mirror of https://github.com/ghndrx/prowler.git (synced 2026-02-10 06:45:08 +00:00)
feat(unix timestamp): add unix timestamp to outputs (#2813)
@@ -159,6 +159,12 @@ Detailed documentation at https://docs.prowler.cloud
             action="store_true",
             help="Send a summary of the execution with a Slack APP in your channel. Environment variables SLACK_API_TOKEN and SLACK_CHANNEL_ID are required (see more in https://docs.prowler.cloud/en/latest/tutorials/integrations/#slack).",
         )
+        common_outputs_parser.add_argument(
+            "--unix-timestamp",
+            action="store_true",
+            default=False,
+            help="Set the output timestamp format as unix timestamps instead of iso format timestamps (default mode).",
+        )
 
     def __init_logging_parser__(self):
         # Logging Options
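For reference, a minimal standalone sketch of the new flag as defined above, assuming nothing beyond argparse; the parser object and program name here are illustrative, not Prowler's actual wiring:

import argparse

# Hypothetical stand-in for Prowler's common outputs parser.
parser = argparse.ArgumentParser(prog="example")
parser.add_argument(
    "--unix-timestamp",
    action="store_true",
    default=False,
    help="Set the output timestamp format as unix timestamps instead of iso format timestamps (default mode).",
)

args = parser.parse_args(["--unix-timestamp"])
print(args.unix_timestamp)  # True when the flag is passed, False by default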
@@ -18,6 +18,7 @@ from prowler.lib.outputs.models import (
     generate_csv_fields,
     unroll_list,
 )
+from prowler.lib.utils.utils import outputs_unix_timestamp
 
 
 def add_manual_controls(output_options, audit_info, file_descriptors):
@@ -67,7 +68,9 @@ def fill_compliance(output_options, finding, audit_info, file_descriptors):
                 Description=compliance.Description,
                 AccountId=audit_info.audited_account,
                 Region=finding.region,
-                AssessmentDate=timestamp.isoformat(),
+                AssessmentDate=outputs_unix_timestamp(
+                    output_options.unix_timestamp, timestamp
+                ),
                 Requirements_Id=requirement_id,
                 Requirements_Description=requirement_description,
                 Requirements_Attributes_IdGrupoControl=attribute.IdGrupoControl,
@@ -105,7 +108,9 @@ def fill_compliance(output_options, finding, audit_info, file_descriptors):
                 Description=compliance.Description,
                 AccountId=audit_info.audited_account,
                 Region=finding.region,
-                AssessmentDate=timestamp.isoformat(),
+                AssessmentDate=outputs_unix_timestamp(
+                    output_options.unix_timestamp, timestamp
+                ),
                 Requirements_Id=requirement_id,
                 Requirements_Description=requirement_description,
                 Requirements_Attributes_Section=attribute.Section,
@@ -132,7 +137,9 @@ def fill_compliance(output_options, finding, audit_info, file_descriptors):
                 Description=compliance.Description,
                 ProjectId=finding.project_id,
                 Location=finding.location,
-                AssessmentDate=timestamp.isoformat(),
+                AssessmentDate=outputs_unix_timestamp(
+                    output_options.unix_timestamp, timestamp
+                ),
                 Requirements_Id=requirement_id,
                 Requirements_Description=requirement_description,
                 Requirements_Attributes_Section=attribute.Section,
@@ -176,7 +183,9 @@ def fill_compliance(output_options, finding, audit_info, file_descriptors):
                 Description=compliance.Description,
                 AccountId=audit_info.audited_account,
                 Region=finding.region,
-                AssessmentDate=timestamp.isoformat(),
+                AssessmentDate=outputs_unix_timestamp(
+                    output_options.unix_timestamp, timestamp
+                ),
                 Requirements_Id=requirement_id,
                 Requirements_Description=requirement_description,
                 Requirements_Attributes_Name=attribute.Name,
@@ -221,7 +230,9 @@ def fill_compliance(output_options, finding, audit_info, file_descriptors):
                 Description=compliance.Description,
                 AccountId=audit_info.audited_account,
                 Region=finding.region,
-                AssessmentDate=timestamp.isoformat(),
+                AssessmentDate=outputs_unix_timestamp(
+                    output_options.unix_timestamp, timestamp
+                ),
                 Requirements_Id=requirement_id,
                 Requirements_Name=requirement_name,
                 Requirements_Description=requirement_description,
@@ -268,7 +279,9 @@ def fill_compliance(output_options, finding, audit_info, file_descriptors):
                 Description=compliance.Description,
                 AccountId=audit_info.audited_account,
                 Region=finding.region,
-                AssessmentDate=timestamp.isoformat(),
+                AssessmentDate=outputs_unix_timestamp(
+                    output_options.unix_timestamp, timestamp
+                ),
                 Requirements_Id=requirement_id,
                 Requirements_Description=requirement_description,
                 Requirements_Name=requirement_name,
@@ -308,7 +321,9 @@ def fill_compliance(output_options, finding, audit_info, file_descriptors):
                 Description=compliance.Description,
                 AccountId=audit_info.audited_account,
                 Region=finding.region,
-                AssessmentDate=timestamp.isoformat(),
+                AssessmentDate=outputs_unix_timestamp(
+                    output_options.unix_timestamp, timestamp
+                ),
                 Requirements_Id=requirement_id,
                 Requirements_Description=requirement_description,
                 Requirements_Attributes_Section=attribute.Section,
@@ -30,7 +30,7 @@ from prowler.lib.outputs.models import (
     get_check_compliance,
     unroll_dict_to_list,
 )
-from prowler.lib.utils.utils import hash_sha512, open_file
+from prowler.lib.utils.utils import hash_sha512, open_file, outputs_unix_timestamp
 
 
 def fill_json_asff(finding_output, audit_info, finding, output_options):
@@ -224,7 +224,9 @@ def fill_json_ocsf(audit_info, finding, output_options) -> Check_Output_JSON_OCSF:
                 name=finding.check_metadata.CheckID,
             )
         ),
-        original_time=timestamp.isoformat(),
+        original_time=outputs_unix_timestamp(
+            output_options.unix_timestamp, timestamp
+        ),
         profiles=[audit_info.profile]
         if hasattr(audit_info, "organizations_metadata")
         else [],
@@ -249,7 +251,7 @@ def fill_json_ocsf(audit_info, finding, output_options) -> Check_Output_JSON_OCSF:
         status_id=generate_json_ocsf_status_id(finding.status),
         compliance=compliance,
         cloud=cloud,
-        time=timestamp.isoformat(),
+        time=outputs_unix_timestamp(output_options.unix_timestamp, timestamp),
         metadata=metadata,
     )
     return finding_output
@@ -9,6 +9,7 @@ from pydantic import BaseModel
 from prowler.config.config import prowler_version, timestamp
 from prowler.lib.check.models import Remediation
 from prowler.lib.logger import logger
+from prowler.lib.utils.utils import outputs_unix_timestamp
 from prowler.providers.aws.lib.audit_info.models import AWS_Organizations_Info
 
 
@@ -47,7 +48,7 @@ def generate_provider_output_csv(
     finding_output_model = f"{provider.capitalize()}_Check_Output_{mode.upper()}"
     output_model = getattr(importlib.import_module(__name__), finding_output_model)
     # Fill common data among providers
-    data = fill_common_data_csv(finding)
+    data = fill_common_data_csv(finding, output_options.unix_timestamp)
 
     if provider == "azure":
         data["resource_id"] = finding.resource_id
@@ -120,9 +121,9 @@ def generate_provider_output_csv(
     return csv_writer, finding_output
 
 
-def fill_common_data_csv(finding: dict) -> dict:
+def fill_common_data_csv(finding: dict, unix_timestamp: bool) -> dict:
     data = {
-        "assessment_start_time": timestamp.isoformat(),
+        "assessment_start_time": outputs_unix_timestamp(unix_timestamp, timestamp),
         "finding_unique_id": "",
         "provider": finding.check_metadata.Provider,
         "check_id": finding.check_metadata.CheckID,
@@ -360,7 +361,9 @@ def generate_provider_output_json(
     # Instantiate the class for the cloud provider
     finding_output = output_model(**finding.check_metadata.dict())
     # Fill common fields
-    finding_output.AssessmentStartTime = timestamp.isoformat()
+    finding_output.AssessmentStartTime = outputs_unix_timestamp(
+        output_options.unix_timestamp, timestamp
+    )
     finding_output.Status = finding.status
     finding_output.StatusExtended = finding.status_extended
     finding_output.ResourceDetails = finding.resource_details
@@ -2,10 +2,12 @@ import json
 import os
 import sys
 import tempfile
+from datetime import datetime
 from hashlib import sha512
 from io import TextIOWrapper
 from ipaddress import ip_address
 from os.path import exists
+from time import mktime
 from typing import Any
 
 from detect_secrets import SecretsCollection
@@ -88,3 +90,11 @@ def validate_ip_address(ip_string):
         return True
     except ValueError:
         return False
+
+
+def outputs_unix_timestamp(is_unix_timestamp: bool, timestamp: datetime):
+    if is_unix_timestamp:
+        timestamp = mktime(timestamp.timetuple())
+    else:
+        timestamp = timestamp.isoformat()
+    return timestamp
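A quick sketch of the helper's two output shapes; the function body is copied from the hunk above so the snippet runs standalone, and the example datetime is arbitrary:

from datetime import datetime
from time import mktime


def outputs_unix_timestamp(is_unix_timestamp: bool, timestamp: datetime):
    # Epoch seconds when requested, ISO 8601 string otherwise.
    if is_unix_timestamp:
        timestamp = mktime(timestamp.timetuple())
    else:
        timestamp = timestamp.isoformat()
    return timestamp


now = datetime(2023, 8, 1, 12, 0, 0)
print(outputs_unix_timestamp(False, now))  # "2023-08-01T12:00:00"
print(outputs_unix_timestamp(True, now))   # e.g. 1690887600.0 (epoch seconds, local time)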
@@ -39,6 +39,7 @@ class Provider_Output_Options:
     verbose: str
     output_filename: str
     only_logs: bool
+    unix_timestamp: bool
 
     def __init__(self, arguments, allowlist_file, bulk_checks_metadata):
         self.is_quiet = arguments.quiet
@@ -48,6 +49,7 @@ class Provider_Output_Options:
         self.bulk_checks_metadata = bulk_checks_metadata
         self.allowlist_file = allowlist_file
         self.only_logs = arguments.only_logs
+        self.unix_timestamp = arguments.unix_timestamp
         # Check output directory, if it is not created -> create it
         if arguments.output_directory:
            if not isdir(arguments.output_directory):
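Putting the pieces together, a simplified, hypothetical sketch of how the parsed flag might travel from the CLI namespace into an output-options object and then into a timestamp field; only outputs_unix_timestamp mirrors the diff, the other names are made up for illustration:

from datetime import datetime
from time import mktime
from types import SimpleNamespace


def outputs_unix_timestamp(is_unix_timestamp: bool, timestamp: datetime):
    # Same behavior as the helper added in utils above.
    return mktime(timestamp.timetuple()) if is_unix_timestamp else timestamp.isoformat()


class ExampleOutputOptions:
    # Hypothetical stand-in for Provider_Output_Options.
    def __init__(self, arguments):
        self.unix_timestamp = arguments.unix_timestamp


arguments = SimpleNamespace(unix_timestamp=True)  # as if --unix-timestamp was passed
output_options = ExampleOutputOptions(arguments)

# An output writer would then stamp findings like this:
assessment_start_time = outputs_unix_timestamp(output_options.unix_timestamp, datetime.now())
print(assessment_start_time)  # epoch seconds instead of an ISO string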
@@ -44,6 +44,7 @@ class Test_Parser:
         assert not parsed.verbose
         assert not parsed.no_banner
         assert not parsed.slack
+        assert not parsed.unix_timestamp
         assert parsed.log_level == "CRITICAL"
         assert not parsed.log_file
         assert not parsed.only_logs
@@ -90,6 +91,7 @@ class Test_Parser:
         assert not parsed.verbose
         assert not parsed.no_banner
         assert not parsed.slack
+        assert not parsed.unix_timestamp
         assert parsed.log_level == "CRITICAL"
         assert not parsed.log_file
         assert not parsed.only_logs
@@ -128,6 +130,7 @@ class Test_Parser:
         assert not parsed.verbose
         assert not parsed.no_banner
         assert not parsed.slack
+        assert not parsed.unix_timestamp
         assert parsed.log_level == "CRITICAL"
         assert not parsed.log_file
         assert not parsed.only_logs
@@ -287,6 +290,11 @@ class Test_Parser:
         parsed = self.parser.parse(command)
         assert parsed.slack
 
+    def test_root_parser_unix_timestamp(self):
+        command = [prowler_command, "--unix-timestamp"]
+        parsed = self.parser.parse(command)
+        assert parsed.unix_timestamp
+
     def test_logging_parser_only_logs_set(self):
         command = [prowler_command, "--only-logs"]
         parsed = self.parser.parse(command)
@@ -1,5 +1,6 @@
 import os
 from os import path, remove
+from time import mktime
 from unittest import mock
 
 import boto3
@@ -973,7 +974,7 @@ class Test_Outputs:
             == expected
         )
 
-    def test_fill_json_ocsf(self):
+    def test_fill_json_ocsf_iso_format_timestamp(self):
         input_audit_info = AWS_Audit_Info(
             session_config=None,
             original_session=None,
@@ -1098,6 +1099,135 @@ class Test_Outputs:
             class_name="Security Finding",
         )
         output_options = mock.MagicMock()
+        output_options.unix_timestamp = False
+        assert fill_json_ocsf(input_audit_info, finding, output_options) == expected
+
+    def test_fill_json_ocsf_unix_timestamp(self):
+        input_audit_info = AWS_Audit_Info(
+            session_config=None,
+            original_session=None,
+            audit_session=None,
+            audited_account=AWS_ACCOUNT_ID,
+            audited_account_arn=f"arn:aws:iam::{AWS_ACCOUNT_ID}:root",
+            audited_identity_arn="test-arn",
+            audited_user_id="test",
+            audited_partition="aws",
+            profile="default",
+            profile_region="eu-west-1",
+            credentials=None,
+            assumed_role_info=None,
+            audited_regions=["eu-west-2", "eu-west-1"],
+            organizations_metadata=None,
+            audit_resources=None,
+            mfa_enabled=False,
+            audit_metadata=Audit_Metadata(
+                services_scanned=0,
+                expected_checks=[],
+                completed_checks=0,
+                audit_progress=0,
+            ),
+        )
+        finding = Check_Report(
+            load_check_metadata(
+                f"{path.dirname(path.realpath(__file__))}/fixtures/metadata.json"
+            ).json()
+        )
+        finding.resource_details = "Test resource details"
+        finding.resource_id = "test-resource"
+        finding.resource_arn = "test-arn"
+        finding.region = "eu-west-1"
+        finding.status = "PASS"
+        finding.status_extended = "This is a test"
+
+        expected = Check_Output_JSON_OCSF(
+            finding=Finding(
+                title="Ensure credentials unused for 30 days or greater are disabled",
+                desc="Ensure credentials unused for 30 days or greater are disabled",
+                supporting_data={
+                    "Risk": "Risk associated.",
+                    "Notes": "additional information",
+                },
+                remediation=Remediation_OCSF(
+                    kb_articles=[
+                        "code or URL to the code location.",
+                        "code or URL to the code location.",
+                        "cli command or URL to the cli command location.",
+                        "cli command or URL to the cli command location.",
+                        "https://myfp.com/recommendations/dangerous_things_and_how_to_fix_them.html",
+                    ],
+                    desc="Run sudo yum update and cross your fingers and toes.",
+                ),
+                types=["Software and Configuration Checks"],
+                src_url="https://serviceofficialsiteorpageforthissubject",
+                uid="prowler-aws-iam_disable_30_days_credentials-123456789012-eu-west-1-test-resource",
+                related_events=[
+                    "othercheck1",
+                    "othercheck2",
+                    "othercheck3",
+                    "othercheck4",
+                ],
+            ),
+            resources=[
+                Resources(
+                    group=Group(name="iam"),
+                    region="eu-west-1",
+                    name="test-resource",
+                    uid="test-arn",
+                    labels=[],
+                    type="AwsIamAccessAnalyzer",
+                    details="Test resource details",
+                )
+            ],
+            status_detail="This is a test",
+            compliance=Compliance_OCSF(
+                status="Success", requirements=[], status_detail="This is a test"
+            ),
+            message="This is a test",
+            severity_id=2,
+            severity="Low",
+            cloud=Cloud(
+                account=Account(name="", uid="123456789012"),
+                region="eu-west-1",
+                org=Organization(uid="", name=""),
+                provider="aws",
+                project_uid="",
+            ),
+            time=mktime(timestamp.timetuple()),
+            metadata=Metadata(
+                original_time=mktime(timestamp.timetuple()),
+                profiles=["default"],
+                product=Product(
+                    language="en",
+                    name="Prowler",
+                    version=prowler_version,
+                    vendor_name="Prowler/ProwlerPro",
+                    feature=Feature(
+                        name="iam_disable_30_days_credentials",
+                        uid="iam_disable_30_days_credentials",
+                        version=prowler_version,
+                    ),
+                ),
+                version="1.0.0-rc.3",
+            ),
+            state_id=0,
+            state="New",
+            status_id=1,
+            status="Success",
+            type_uid=200101,
+            type_name="Security Finding: Create",
+            impact_id=0,
+            impact="Unknown",
+            confidence_id=0,
+            confidence="Unknown",
+            activity_id=1,
+            activity_name="Create",
+            category_uid=2,
+            category_name="Findings",
+            class_uid=2001,
+            class_name="Security Finding",
+        )
+        output_options = mock.MagicMock()
+        output_options.unix_timestamp = True
         assert fill_json_ocsf(input_audit_info, finding, output_options) == expected
 
     def test_extract_findings_statistics_different_resources(self):
@@ -1,7 +1,18 @@
-from prowler.lib.utils.utils import validate_ip_address
+from datetime import datetime
+from time import mktime
+
+from prowler.lib.utils.utils import outputs_unix_timestamp, validate_ip_address
 
 
-class Test_Validate_Ip_Address:
+class Test_utils:
     def test_validate_ip_address(self):
         assert validate_ip_address("88.26.151.198")
         assert not validate_ip_address("Not an IP")
+
+    def test_outputs_unix_timestamp_false(self):
+        time = datetime.now()
+        assert outputs_unix_timestamp(False, time) == time.isoformat()
+
+    def test_outputs_unix_timestamp_true(self):
+        time = datetime.now()
+        assert outputs_unix_timestamp(True, time) == mktime(time.timetuple())
@@ -91,6 +91,7 @@ class Test_Common_Output_Options:
         arguments.security_hub = True
         arguments.shodan = "test-api-key"
         arguments.only_logs = False
+        arguments.unix_timestamp = False
 
         audit_info = self.set_mocked_aws_audit_info()
         allowlist_file = ""
@@ -119,6 +120,7 @@ class Test_Common_Output_Options:
         arguments.verbose = True
         arguments.output_filename = "output_test_filename"
         arguments.only_logs = False
+        arguments.unix_timestamp = False
 
         audit_info = self.set_mocked_gcp_audit_info()
         allowlist_file = ""
@@ -147,6 +149,7 @@ class Test_Common_Output_Options:
         arguments.security_hub = True
         arguments.shodan = "test-api-key"
         arguments.only_logs = False
+        arguments.unix_timestamp = False
 
         # Mock AWS Audit Info
         audit_info = self.set_mocked_aws_audit_info()
@@ -179,6 +182,7 @@ class Test_Common_Output_Options:
         arguments.output_directory = "output_test_directory"
         arguments.verbose = True
         arguments.only_logs = False
+        arguments.unix_timestamp = False
 
         # Mock Azure Audit Info
         audit_info = self.set_mocked_azure_audit_info()
@@ -215,6 +219,7 @@ class Test_Common_Output_Options:
         arguments.output_directory = "output_test_directory"
         arguments.verbose = True
         arguments.only_logs = False
+        arguments.unix_timestamp = False
 
         # Mock Azure Audit Info
         audit_info = self.set_mocked_azure_audit_info()