diff --git a/config/config.py b/config/config.py index 71623eec..02a8e57c 100644 --- a/config/config.py +++ b/config/config.py @@ -8,6 +8,10 @@ from lib.logger import logger timestamp = datetime.today() timestamp_utc = datetime.now(timezone.utc).replace(tzinfo=timezone.utc) prowler_version = "3.0-beta-21Nov2022" +html_logo_url = "https://github.com/prowler-cloud/prowler/" +html_logo_img = ( + "https://github.com/prowler-cloud/prowler/raw/master/util/html/prowler-logo-new.png" +) orange_color = "\033[38;5;208m" banner_color = "\033[1;92m" @@ -25,6 +29,7 @@ timestamp_iso = timestamp.isoformat() csv_file_suffix = ".csv" json_file_suffix = ".json" json_asff_file_suffix = ".asff.json" +html_file_suffix = ".html" config_yaml = "providers/aws/config.yaml" diff --git a/lib/outputs/outputs.py b/lib/outputs/outputs.py index 860dd2da..b08f7fbf 100644 --- a/lib/outputs/outputs.py +++ b/lib/outputs/outputs.py @@ -10,14 +10,19 @@ from tabulate import tabulate from config.config import ( csv_file_suffix, + html_file_suffix, + html_logo_img, + html_logo_url, json_asff_file_suffix, json_file_suffix, orange_color, + output_file_timestamp, prowler_version, timestamp, timestamp_iso, timestamp_utc, ) +from lib.check.models import Output_From_Options from lib.logger import logger from lib.outputs.models import ( Check_Output_CSV, @@ -31,223 +36,264 @@ from lib.outputs.models import ( ) from lib.utils.utils import file_exists, hash_sha512, open_file from providers.aws.lib.allowlist.allowlist import is_allowlisted +from providers.aws.lib.audit_info.models import AWS_Audit_Info from providers.aws.lib.security_hub.security_hub import send_to_security_hub def report(check_findings, output_options, audit_info): - # Sort check findings - check_findings.sort(key=lambda x: x.region) + try: + # Sort check findings + check_findings.sort(key=lambda x: x.region) - # Generate the required output files - # csv_fields = [] - file_descriptors = {} - if output_options.output_modes: - # We have to create the required output files - file_descriptors = fill_file_descriptors( - output_options.output_modes, - output_options.output_directory, - output_options.output_filename, - ) + # Generate the required output files + # csv_fields = [] + file_descriptors = {} + if output_options.output_modes: + # We have to create the required output files + file_descriptors = fill_file_descriptors( + output_options.output_modes, + output_options.output_directory, + output_options.output_filename, + audit_info, + ) - if check_findings: - for finding in check_findings: - # Check if finding is allowlisted - if output_options.allowlist_file: - if is_allowlisted( - output_options.allowlist_file, - audit_info.audited_account, - finding.check_metadata.CheckID, - finding.region, - finding.resource_id, - ): - finding.status = "WARNING" - # Print findings by stdout - color = set_report_color(finding.status) - if output_options.is_quiet and "FAIL" in finding.status: - print( - f"\t{color}{finding.status}{Style.RESET_ALL} {finding.region}: {finding.status_extended}" - ) - elif not output_options.is_quiet and output_options.verbose: - print( - f"\t{color}{finding.status}{Style.RESET_ALL} {finding.region}: {finding.status_extended}" - ) - if file_descriptors: - if finding.check_metadata.Provider == "aws": - if "ens_rd2022_aws" in output_options.output_modes: - # We have to retrieve all the check's compliance requirements - check_compliance = output_options.bulk_checks_metadata[ - finding.check_metadata.CheckID - ].Compliance - for compliance in check_compliance: - if ( - compliance.Framework == "ENS" - and compliance.Version == "RD2022" - ): - for requirement in compliance.Requirements: - requirement_description = requirement.Description - requirement_id = requirement.Id - for attribute in requirement.Attributes: - compliance_row = Check_Output_CSV_ENS_RD2022( - Provider=finding.check_metadata.Provider, - AccountId=audit_info.audited_account, - Region=finding.region, - AssessmentDate=timestamp.isoformat(), - Requirements_Id=requirement_id, - Requirements_Description=requirement_description, - Requirements_Attributes_IdGrupoControl=attribute.get( - "IdGrupoControl" - ), - Requirements_Attributes_Marco=attribute.get( - "Marco" - ), - Requirements_Attributes_Categoria=attribute.get( - "Categoria" - ), - Requirements_Attributes_DescripcionControl=attribute.get( - "DescripcionControl" - ), - Requirements_Attributes_Nivel=attribute.get( - "Nivel" - ), - Requirements_Attributes_Tipo=attribute.get( - "Tipo" - ), - Requirements_Attributes_Dimensiones=",".join( - attribute.get("Dimensiones") - ), - Status=finding.status, - StatusExtended=finding.status_extended, - ResourceId=finding.resource_id, - CheckId=finding.check_metadata.CheckID, + if check_findings: + for finding in check_findings: + # Check if finding is allowlisted + if output_options.allowlist_file: + if is_allowlisted( + output_options.allowlist_file, + audit_info.audited_account, + finding.check_metadata.CheckID, + finding.region, + finding.resource_id, + ): + finding.status = "WARNING" + # Print findings by stdout + color = set_report_color(finding.status) + if output_options.is_quiet and "FAIL" in finding.status: + print( + f"\t{color}{finding.status}{Style.RESET_ALL} {finding.region}: {finding.status_extended}" + ) + elif not output_options.is_quiet and output_options.verbose: + print( + f"\t{color}{finding.status}{Style.RESET_ALL} {finding.region}: {finding.status_extended}" + ) + if file_descriptors: + if finding.check_metadata.Provider == "aws": + if "ens_rd2022_aws" in output_options.output_modes: + # We have to retrieve all the check's compliance requirements + check_compliance = output_options.bulk_checks_metadata[ + finding.check_metadata.CheckID + ].Compliance + for compliance in check_compliance: + if ( + compliance.Framework == "ENS" + and compliance.Version == "RD2022" + ): + for requirement in compliance.Requirements: + requirement_description = ( + requirement.Description ) + requirement_id = requirement.Id + for attribute in requirement.Attributes: + compliance_row = Check_Output_CSV_ENS_RD2022( + Provider=finding.check_metadata.Provider, + AccountId=audit_info.audited_account, + Region=finding.region, + AssessmentDate=timestamp.isoformat(), + Requirements_Id=requirement_id, + Requirements_Description=requirement_description, + Requirements_Attributes_IdGrupoControl=attribute.get( + "IdGrupoControl" + ), + Requirements_Attributes_Marco=attribute.get( + "Marco" + ), + Requirements_Attributes_Categoria=attribute.get( + "Categoria" + ), + Requirements_Attributes_DescripcionControl=attribute.get( + "DescripcionControl" + ), + Requirements_Attributes_Nivel=attribute.get( + "Nivel" + ), + Requirements_Attributes_Tipo=attribute.get( + "Tipo" + ), + Requirements_Attributes_Dimensiones=",".join( + attribute.get("Dimensiones") + ), + Status=finding.status, + StatusExtended=finding.status_extended, + ResourceId=finding.resource_id, + CheckId=finding.check_metadata.CheckID, + ) - csv_header = generate_csv_fields( - Check_Output_CSV_ENS_RD2022 - ) - csv_writer = DictWriter( - file_descriptors["ens_rd2022_aws"], - fieldnames=csv_header, - delimiter=";", - ) - csv_writer.writerow(compliance_row.__dict__) + csv_header = generate_csv_fields( + Check_Output_CSV_ENS_RD2022 + ) + csv_writer = DictWriter( + file_descriptors["ens_rd2022_aws"], + fieldnames=csv_header, + delimiter=";", + ) + csv_writer.writerow(compliance_row.__dict__) - if "csv" in file_descriptors: - finding_output = Check_Output_CSV( - audit_info.audited_account, - audit_info.profile, - finding, - audit_info.organizations_metadata, - ) - csv_writer = DictWriter( - file_descriptors["csv"], - fieldnames=generate_csv_fields(Check_Output_CSV), - delimiter=";", - ) - csv_writer.writerow(finding_output.__dict__) + if "csv" in file_descriptors: + finding_output = Check_Output_CSV( + audit_info.audited_account, + audit_info.profile, + finding, + audit_info.organizations_metadata, + ) + csv_writer = DictWriter( + file_descriptors["csv"], + fieldnames=generate_csv_fields(Check_Output_CSV), + delimiter=";", + ) + csv_writer.writerow(finding_output.__dict__) - if "json" in file_descriptors: - finding_output = Check_Output_JSON( - **finding.check_metadata.dict() - ) - fill_json(finding_output, audit_info, finding) + if "json" in file_descriptors: + finding_output = Check_Output_JSON( + **finding.check_metadata.dict() + ) + fill_json(finding_output, audit_info, finding) - json.dump( - finding_output.dict(), file_descriptors["json"], indent=4 - ) - file_descriptors["json"].write(",") + json.dump( + finding_output.dict(), + file_descriptors["json"], + indent=4, + ) + file_descriptors["json"].write(",") - if "json-asff" in file_descriptors: - finding_output = Check_Output_JSON_ASFF() - fill_json_asff(finding_output, audit_info, finding) + if "json-asff" in file_descriptors: + finding_output = Check_Output_JSON_ASFF() + fill_json_asff(finding_output, audit_info, finding) - json.dump( - finding_output.dict(), - file_descriptors["json-asff"], - indent=4, - ) - file_descriptors["json-asff"].write(",") + json.dump( + finding_output.dict(), + file_descriptors["json-asff"], + indent=4, + ) + file_descriptors["json-asff"].write(",") - # Check if it is needed to send findings to security hub - if output_options.security_hub_enabled: - send_to_security_hub( - finding.region, finding_output, audit_info.audit_session - ) - else: # No service resources in the whole account - color = set_report_color("INFO") - if not output_options.is_quiet and output_options.verbose: - print(f"\t{color}INFO{Style.RESET_ALL} There are no resources") - # Separator between findings and bar - if output_options.is_quiet or output_options.verbose: - print() - if file_descriptors: - # Close all file descriptors - for file_descriptor in file_descriptors: - file_descriptors.get(file_descriptor).close() + if "html" in file_descriptors: + fill_html(file_descriptors["html"], audit_info, finding) + + file_descriptors["html"].write("") + + # Check if it is needed to send findings to security hub + if output_options.security_hub_enabled: + send_to_security_hub( + finding.region, finding_output, audit_info.audit_session + ) + else: # No service resources in the whole account + color = set_report_color("INFO") + if not output_options.is_quiet and output_options.verbose: + print(f"\t{color}INFO{Style.RESET_ALL} There are no resources") + # Separator between findings and bar + if output_options.is_quiet or output_options.verbose: + print() + if file_descriptors: + # Close all file descriptors + for file_descriptor in file_descriptors: + file_descriptors.get(file_descriptor).close() + except Exception as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) def initialize_file_descriptor( - filename: str, output_mode: str, format: Any = None + filename: str, + output_mode: str, + audit_info: AWS_Audit_Info, + format: Any = None, ) -> TextIOWrapper: """Open/Create the output file. If needed include headers or the required format""" - - if file_exists(filename): - file_descriptor = open_file( - filename, - "a", - ) - else: - file_descriptor = open_file( - filename, - "a", - ) - - if output_mode in ("csv", "ens_rd2022_aws"): - # Format is the class model of the CSV format to print the headers - csv_header = [x.upper() for x in generate_csv_fields(format)] - csv_writer = DictWriter( - file_descriptor, fieldnames=csv_header, delimiter=";" - ) - csv_writer.writeheader() - - if output_mode in ("json", "json-asff"): + try: + if file_exists(filename): file_descriptor = open_file( filename, "a", ) - file_descriptor.write("[") + else: + file_descriptor = open_file( + filename, + "a", + ) + + if output_mode in ("csv", "ens_rd2022_aws"): + # Format is the class model of the CSV format to print the headers + csv_header = [x.upper() for x in generate_csv_fields(format)] + csv_writer = DictWriter( + file_descriptor, fieldnames=csv_header, delimiter=";" + ) + csv_writer.writeheader() + + if output_mode in ("json", "json-asff"): + file_descriptor.write("[") + + if "html" in output_mode: + add_html_header(file_descriptor, audit_info) + except Exception as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) return file_descriptor -def fill_file_descriptors(output_modes, output_directory, output_filename): - file_descriptors = {} - if output_modes: - for output_mode in output_modes: - if output_mode == "csv": - filename = f"{output_directory}/{output_filename}{csv_file_suffix}" - file_descriptor = initialize_file_descriptor( - filename, output_mode, Check_Output_CSV - ) - file_descriptors.update({output_mode: file_descriptor}) +def fill_file_descriptors(output_modes, output_directory, output_filename, audit_info): + try: + file_descriptors = {} + if output_modes: + for output_mode in output_modes: + if output_mode == "csv": + filename = f"{output_directory}/{output_filename}{csv_file_suffix}" + file_descriptor = initialize_file_descriptor( + filename, + output_mode, + audit_info, + Check_Output_CSV, + ) + file_descriptors.update({output_mode: file_descriptor}) - if output_mode == "json": - filename = f"{output_directory}/{output_filename}{json_file_suffix}" - file_descriptor = initialize_file_descriptor(filename, output_mode) - file_descriptors.update({output_mode: file_descriptor}) + if output_mode == "json": + filename = f"{output_directory}/{output_filename}{json_file_suffix}" + file_descriptor = initialize_file_descriptor( + filename, output_mode, audit_info + ) + file_descriptors.update({output_mode: file_descriptor}) - if output_mode == "json-asff": - filename = ( - f"{output_directory}/{output_filename}{json_asff_file_suffix}" - ) - file_descriptor = initialize_file_descriptor(filename, output_mode) - file_descriptors.update({output_mode: file_descriptor}) + if output_mode == "json-asff": + filename = ( + f"{output_directory}/{output_filename}{json_asff_file_suffix}" + ) + file_descriptor = initialize_file_descriptor( + filename, output_mode, audit_info + ) + file_descriptors.update({output_mode: file_descriptor}) - if output_mode == "ens_rd2022_aws": - filename = f"{output_directory}/{output_filename}_ens_rd2022_aws{csv_file_suffix}" - file_descriptor = initialize_file_descriptor( - filename, output_mode, Check_Output_CSV_ENS_RD2022 - ) - file_descriptors.update({output_mode: file_descriptor}) + if output_mode == "html": + filename = f"{output_directory}/{output_filename}{html_file_suffix}" + file_descriptor = initialize_file_descriptor( + filename, output_mode, audit_info + ) + file_descriptors.update({output_mode: file_descriptor}) + + if output_mode == "ens_rd2022_aws": + filename = f"{output_directory}/{output_filename}_ens_rd2022_aws{csv_file_suffix}" + file_descriptor = initialize_file_descriptor( + filename, output_mode, audit_info, Check_Output_CSV_ENS_RD2022 + ) + file_descriptors.update({output_mode: file_descriptor}) + except Exception as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) return file_descriptors @@ -333,6 +379,34 @@ def fill_json_asff(finding_output, audit_info, finding): return finding_output +def fill_html(file_descriptor, audit_info, finding): + row_class = "p-3 mb-2 bg-success-custom" + if finding.status == "INFO": + row_class = "table-info" + elif finding.status == "FAIL": + row_class = "table-danger" + elif finding.status == "WARNING": + row_class = "table-warning" + file_descriptor.write( + f""" +
{finding.check_metadata.Risk}
{finding.check_metadata.Remediation.Recommendation.Text}
| Status | +Severity | +Account ID | +Region | +Service | +Check ID | +Check Title | +Check Output | +Risk | +Remediation | +Related URL | +Resource ID | +
|---|