feat(cis_ouput): add csv output and table (#1532)

Co-authored-by: sergargar <sergio@verica.io>
This commit is contained in:
Sergio Garcia
2022-12-07 12:06:28 +01:00
committed by GitHub
parent 5e40d93d63
commit 05075d6508
5 changed files with 323 additions and 117 deletions

View File

@@ -111,6 +111,29 @@ class Check_Output_CSV_ENS_RD2022(BaseModel):
CheckId: str
class Check_Output_CSV_CIS(BaseModel):
Provider: str
AccountId: str
Region: str
AssessmentDate: str
Requirements_Id: str
Requirements_Description: str
Requirements_Attributes_Section: str
Requirements_Attributes_Profile: str
Requirements_Attributes_AssessmentStatus: str
Requirements_Attributes_Description: str
Requirements_Attributes_RationaleStatement: str
Requirements_Attributes_ImpactStatement: str
Requirements_Attributes_RemediationProcedure: str
Requirements_Attributes_AuditProcedure: str
Requirements_Attributes_AdditionalInformation: str
Requirements_Attributes_References: str
Status: str
StatusExtended: str
ResourceId: str
CheckId: str
@dataclass
class Check_Output_CSV:
assessment_start_time: str

View File

@@ -26,6 +26,7 @@ from lib.check.models import Output_From_Options
from lib.logger import logger
from lib.outputs.models import (
Check_Output_CSV,
Check_Output_CSV_CIS,
Check_Output_CSV_ENS_RD2022,
Check_Output_JSON,
Check_Output_JSON_ASFF,
@@ -141,6 +142,73 @@ def report(check_findings, output_options, audit_info):
delimiter=";",
)
csv_writer.writerow(compliance_row.__dict__)
elif "cis" in str(output_options.output_modes):
# We have to retrieve all the check's compliance requirements
check_compliance = output_options.bulk_checks_metadata[
finding.check_metadata.CheckID
].Compliance
for compliance in check_compliance:
if compliance.Framework == "CIS-AWS":
for requirement in compliance.Requirements:
requirement_description = (
requirement.Description
)
requirement_id = requirement.Id
for attribute in requirement.Attributes:
compliance_row = Check_Output_CSV_CIS(
Provider=finding.check_metadata.Provider,
AccountId=audit_info.audited_account,
Region=finding.region,
AssessmentDate=timestamp.isoformat(),
Requirements_Id=requirement_id,
Requirements_Description=requirement_description,
Requirements_Attributes_Section=attribute.get(
"Section"
),
Requirements_Attributes_Profile=attribute.get(
"Profile"
),
Requirements_Attributes_AssessmentStatus=attribute.get(
"AssessmentStatus"
),
Requirements_Attributes_Description=attribute.get(
"Description"
),
Requirements_Attributes_RationaleStatement=attribute.get(
"RationaleStatement"
),
Requirements_Attributes_ImpactStatement=attribute.get(
"ImpactStatement"
),
Requirements_Attributes_RemediationProcedure=attribute.get(
"RemediationProcedure"
),
Requirements_Attributes_AuditProcedure=attribute.get(
"AuditProcedure"
),
Requirements_Attributes_AdditionalInformation=attribute.get(
"AdditionalInformation"
),
Requirements_Attributes_References=attribute.get(
"References"
),
Status=finding.status,
StatusExtended=finding.status_extended,
ResourceId=finding.resource_id,
CheckId=finding.check_metadata.CheckID,
)
csv_header = generate_csv_fields(
Check_Output_CSV_CIS
)
csv_writer = DictWriter(
file_descriptors[
output_options.output_modes[-1]
],
fieldnames=csv_header,
delimiter=";",
)
csv_writer.writerow(compliance_row.__dict__)
if "csv" in file_descriptors:
finding_output = Check_Output_CSV(
@@ -226,7 +294,7 @@ def initialize_file_descriptor(
"a",
)
if output_mode in ("csv", "ens_rd2022_aws"):
if output_mode in ("csv", "ens_rd2022_aws", "cis_1.5_aws", "cis_1.4_aws"):
# Format is the class model of the CSV format to print the headers
csv_header = [x.upper() for x in generate_csv_fields(format)]
csv_writer = DictWriter(
@@ -291,6 +359,20 @@ def fill_file_descriptors(output_modes, output_directory, output_filename, audit
filename, output_mode, audit_info, Check_Output_CSV_ENS_RD2022
)
file_descriptors.update({output_mode: file_descriptor})
if output_mode == "cis_1.5_aws":
filename = f"{output_directory}/{output_filename}_cis_1.5_aws{csv_file_suffix}"
file_descriptor = initialize_file_descriptor(
filename, output_mode, audit_info, Check_Output_CSV_CIS
)
file_descriptors.update({output_mode: file_descriptor})
if output_mode == "cis_1.4_aws":
filename = f"{output_directory}/{output_filename}_cis_1.4_aws{csv_file_suffix}"
file_descriptor = initialize_file_descriptor(
filename, output_mode, audit_info, Check_Output_CSV_CIS
)
file_descriptors.update({output_mode: file_descriptor})
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
@@ -673,29 +755,129 @@ def display_compliance_table(
ens_compliance_table["Bajo"].append(
f"{Fore.BLUE}{marcos[marco]['Bajo']}{Style.RESET_ALL}"
)
print(
f"\nEstado de Cumplimiento de {Fore.YELLOW}ENS RD2022 - AWS{Style.RESET_ALL}:"
)
overview_table = [
[
f"{Fore.RED}{round(fail_count/(fail_count+pass_count)*100, 2)}% ({fail_count}) NO CUMPLE{Style.RESET_ALL}",
f"{Fore.GREEN}{round(pass_count/(fail_count+pass_count)*100, 2)}% ({pass_count}) CUMPLE{Style.RESET_ALL}",
if fail_count + pass_count < 0:
print(
f"\n {Style.BRIGHT}There are no resources for {Fore.YELLOW}ENS RD2022 - AWS{Style.RESET_ALL}.\n"
)
else:
print(
f"\nEstado de Cumplimiento de {Fore.YELLOW}ENS RD2022 - AWS{Style.RESET_ALL}:"
)
overview_table = [
[
f"{Fore.RED}{round(fail_count/(fail_count+pass_count)*100, 2)}% ({fail_count}) NO CUMPLE{Style.RESET_ALL}",
f"{Fore.GREEN}{round(pass_count/(fail_count+pass_count)*100, 2)}% ({pass_count}) CUMPLE{Style.RESET_ALL}",
]
]
]
print(tabulate(overview_table, tablefmt="rounded_grid"))
print(f"\nResultados de {Fore.YELLOW}ENS RD2022 - AWS{Style.RESET_ALL}:")
print(
tabulate(ens_compliance_table, headers="keys", tablefmt="rounded_grid")
)
print(
f"{Style.BRIGHT}* Solo aparece el Marco/Categoria que contiene resultados.{Style.RESET_ALL}"
)
print("\nResultados detallados en:")
print(
f" - CSV: {output_directory}/{output_filename}_{compliance_framework[0]}.csv\n"
)
print(tabulate(overview_table, tablefmt="rounded_grid"))
print(
f"\nResultados de {Fore.YELLOW}ENS RD2022 - AWS{Style.RESET_ALL}:"
)
print(
tabulate(
ens_compliance_table, headers="keys", tablefmt="rounded_grid"
)
)
print(
f"{Style.BRIGHT}* Solo aparece el Marco/Categoria que contiene resultados.{Style.RESET_ALL}"
)
print("\nResultados detallados en:")
print(
f" - CSV: {output_directory}/{output_filename}_{compliance_framework[0]}.csv\n"
)
if "cis" in str(compliance_framework):
sections = {}
cis_compliance_table = {
"Provider": [],
"Section": [],
"Level 1": [],
"Level 2": [],
}
pass_count = fail_count = 0
for finding in findings:
check = bulk_checks_metadata[finding.check_metadata.CheckID]
check_compliances = check.Compliance
for compliance in check_compliances:
if compliance.Framework == "CIS-AWS" and compliance.Version in str(
compliance_framework
):
compliance_version = compliance.Version
for requirement in compliance.Requirements:
for attribute in requirement.Attributes:
section = attribute["Section"]
# Check if Section exists
if section not in sections:
sections[section] = {
"Status": f"{Fore.GREEN}PASS{Style.RESET_ALL}",
"Level 1": {"FAIL": 0, "PASS": 0},
"Level 2": {"FAIL": 0, "PASS": 0},
}
if finding.status == "FAIL":
fail_count += 1
elif finding.status == "PASS":
pass_count += 1
if attribute["Profile"] == "Level 1":
if finding.status == "FAIL":
sections[section]["Level 1"]["FAIL"] += 1
else:
sections[section]["Level 1"]["PASS"] += 1
elif attribute["Profile"] == "Level 2":
if finding.status == "FAIL":
sections[section]["Level 2"]["FAIL"] += 1
else:
sections[section]["Level 2"]["PASS"] += 1
# Add results to table
sections = dict(sorted(sections.items()))
for section in sections:
cis_compliance_table["Provider"].append("aws")
cis_compliance_table["Section"].append(section)
if sections[section]["Level 1"]["FAIL"] > 0:
cis_compliance_table["Level 1"].append(
f"{Fore.RED}FAIL({sections[section]['Level 1']['FAIL']}){Style.RESET_ALL}"
)
else:
cis_compliance_table["Level 1"].append(
f"{Fore.GREEN}PASS({sections[section]['Level 1']['PASS']}){Style.RESET_ALL}"
)
if sections[section]["Level 2"]["FAIL"] > 0:
cis_compliance_table["Level 2"].append(
f"{Fore.RED}FAIL({sections[section]['Level 2']['FAIL']}){Style.RESET_ALL}"
)
else:
cis_compliance_table["Level 2"].append(
f"{Fore.GREEN}PASS({sections[section]['Level 2']['PASS']}){Style.RESET_ALL}"
)
if fail_count + pass_count < 0:
print(
f"\n {Style.BRIGHT}There are no resources for {Fore.YELLOW}{compliance.Framework}-{compliance.Version}{Style.RESET_ALL}.\n"
)
else:
print(
f"\nCompliance Status of {Fore.YELLOW}{compliance.Framework}-{compliance_version}{Style.RESET_ALL} Framework:"
)
overview_table = [
[
f"{Fore.RED}{round(fail_count/(fail_count+pass_count)*100, 2)}% ({fail_count}) FAIL{Style.RESET_ALL}",
f"{Fore.GREEN}{round(pass_count/(fail_count+pass_count)*100, 2)}% ({pass_count}) PASS{Style.RESET_ALL}",
]
]
print(tabulate(overview_table, tablefmt="rounded_grid"))
print(
f"\nFramework {Fore.YELLOW}{compliance.Framework}-{compliance_version}{Style.RESET_ALL} Results:"
)
print(
tabulate(
cis_compliance_table, headers="keys", tablefmt="rounded_grid"
)
)
print(
f"{Style.BRIGHT}* Only sections containing results appear.{Style.RESET_ALL}"
)
print("\nDetailed Results in:")
print(
f" - CSV: {output_directory}/{output_filename}_{compliance_framework[0]}.csv\n"
)
except Exception as error:
logger.critical(
f"{error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}"