mirror of
https://github.com/ghndrx/prowler.git
synced 2026-02-10 14:55:00 +00:00
feat(outputS): Output generation format CSV (#1230)
* chore(csv): first version csv output * chore(pytest): added pytest dependency * chore(outputs): organizations demo * chore(compliance): Added new dataclass for each compliance framework * fix(test org values): deleted test values in orgs instantiation * fix(csv): formatted to match output format * fix(csv output): Reformulation of check report and minor changes * fix(minor issues): Fix various issues coming from PR comments * fix(csv): Renamed csv output data model * fix(output dir): create default if not present * fix(typo): remove s * fix(oldcode) * fix(typo) * fix(output): Only send to csv when -M is passed Co-authored-by: sergargar <sergio@verica.io> Co-authored-by: Pepe Fagoaga <pepe@verica.io>
This commit is contained in:
189
lib/outputs/models.py
Normal file
189
lib/outputs/models.py
Normal file
@@ -0,0 +1,189 @@
|
||||
from dataclasses import asdict, dataclass
|
||||
|
||||
from config.config import timestamp
|
||||
from lib.check.models import Check_Report, Organizations_Info
|
||||
|
||||
|
||||
@dataclass
|
||||
class Compliance_Framework:
|
||||
Framework: str
|
||||
Version: str
|
||||
Group: list
|
||||
Control: list
|
||||
|
||||
|
||||
@dataclass
|
||||
class Check_Output_CSV:
|
||||
assessment_start_time: str
|
||||
finding_unique_id: str
|
||||
provider: str
|
||||
profile: str
|
||||
account_id: int
|
||||
account_name: str
|
||||
account_email: str
|
||||
account_arn: str
|
||||
account_org: str
|
||||
account_tags: str
|
||||
region: str
|
||||
check_id: str
|
||||
check_name: str
|
||||
check_title: str
|
||||
check_type: str
|
||||
status: str
|
||||
status_extended: str
|
||||
service_name: str
|
||||
subservice_name: str
|
||||
severity: str
|
||||
resource_id: str
|
||||
resource_arn: str
|
||||
resource_type: str
|
||||
resource_details: str
|
||||
resource_tags: list
|
||||
description: dict
|
||||
risk: list
|
||||
related_url: list
|
||||
remediation_recommendation_text: str
|
||||
remediation_recommendation_url: list
|
||||
remediation_recommendation_code_nativeiac: str
|
||||
remediation_recommendation_code_terraform: str
|
||||
remediation_recommendation_code_cli: str
|
||||
remediation_recommendation_code_other: str
|
||||
categories: str
|
||||
depends_on: str
|
||||
related_to: str
|
||||
notes: str
|
||||
compliance: str
|
||||
|
||||
def get_csv_header(self):
|
||||
csv_header = []
|
||||
for key in asdict(self):
|
||||
csv_header = csv_header.append(key)
|
||||
return csv_header
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
account: str,
|
||||
profile: str,
|
||||
report: Check_Report,
|
||||
organizations: Organizations_Info,
|
||||
):
|
||||
self.assessment_start_time = timestamp.isoformat()
|
||||
self.finding_unique_id = ""
|
||||
self.provider = report.check_metadata.Provider
|
||||
self.profile = profile
|
||||
self.account_id = account
|
||||
self.account_name = organizations.account_details_name
|
||||
self.account_email = organizations.account_details_email
|
||||
self.account_arn = organizations.account_details_arn
|
||||
self.account_org = organizations.account_details_org
|
||||
self.account_tags = organizations.account_details_tags
|
||||
self.region = report.region
|
||||
self.check_id = report.check_metadata.CheckID
|
||||
self.check_name = report.check_metadata.CheckName
|
||||
self.check_title = report.check_metadata.CheckTitle
|
||||
self.check_type = report.check_metadata.CheckType
|
||||
self.status = report.status
|
||||
self.status_extended = report.status_extended
|
||||
self.service_name = report.check_metadata.ServiceName
|
||||
self.subservice_name = report.check_metadata.SubServiceName
|
||||
self.severity = report.check_metadata.Severity
|
||||
self.resource_id = report.resource_id
|
||||
self.resource_arn = report.resource_arn
|
||||
self.resource_type = report.check_metadata.ResourceType
|
||||
self.resource_details = report.resource_details
|
||||
self.resource_tags = report.resource_tags
|
||||
self.description = report.check_metadata.Description
|
||||
self.risk = report.check_metadata.Risk
|
||||
self.related_url = report.check_metadata.RelatedUrl
|
||||
self.remediation_recommendation_text = report.check_metadata.Remediation[
|
||||
"Recommendation"
|
||||
]["Text"]
|
||||
self.remediation_recommendation_url = report.check_metadata.Remediation[
|
||||
"Recommendation"
|
||||
]["Url"]
|
||||
self.remediation_recommendation_code_nativeiac = (
|
||||
report.check_metadata.Remediation["Code"]["NativeIaC"]
|
||||
)
|
||||
self.remediation_recommendation_code_terraform = (
|
||||
report.check_metadata.Remediation["Code"]["Terraform"]
|
||||
)
|
||||
self.remediation_recommendation_code_cli = report.check_metadata.Remediation[
|
||||
"Code"
|
||||
]["cli"]
|
||||
self.remediation_recommendation_code_cli = report.check_metadata.Remediation[
|
||||
"Code"
|
||||
]["cli"]
|
||||
self.remediation_recommendation_code_other = report.check_metadata.Remediation[
|
||||
"Code"
|
||||
]["other"]
|
||||
self.categories = self.__unroll_list__(report.check_metadata.Categories)
|
||||
self.depends_on = self.__unroll_list__(report.check_metadata.DependsOn)
|
||||
self.related_to = self.__unroll_list__(report.check_metadata.RelatedTo)
|
||||
self.notes = report.check_metadata.Notes
|
||||
self.compliance = self.__unroll_compliance__(report.check_metadata.Compliance)
|
||||
|
||||
def __unroll_list__(self, listed_items: list):
|
||||
unrolled_items = ""
|
||||
separator = "|"
|
||||
for item in listed_items:
|
||||
if not unrolled_items:
|
||||
unrolled_items = f"{item}"
|
||||
else:
|
||||
unrolled_items = f"{unrolled_items}{separator}{item}"
|
||||
|
||||
return unrolled_items
|
||||
|
||||
def __unroll_dict__(self, dict_items: dict):
|
||||
unrolled_items = ""
|
||||
separator = "|"
|
||||
for key, value in dict_items.items():
|
||||
unrolled_item = f"{key}:{value}"
|
||||
if not unrolled_items:
|
||||
unrolled_items = f"{unrolled_item}"
|
||||
else:
|
||||
unrolled_items = f"{unrolled_items}{separator}{unrolled_item}"
|
||||
|
||||
return unrolled_items
|
||||
|
||||
def __unroll_compliance__(self, compliance: list):
|
||||
compliance_frameworks = []
|
||||
# fill list of dataclasses
|
||||
for item in compliance:
|
||||
compliance_framework = Compliance_Framework(
|
||||
Framework=item["Framework"],
|
||||
Version=item["Version"],
|
||||
Group=item["Group"],
|
||||
Control=item["Control"],
|
||||
)
|
||||
compliance_frameworks.append(compliance_framework)
|
||||
# iterate over list of dataclasses to output info
|
||||
unrolled_compliance = ""
|
||||
groups = ""
|
||||
controls = ""
|
||||
item_separator = ","
|
||||
framework_separator = "|"
|
||||
generic_separator = "/"
|
||||
for framework in compliance_frameworks:
|
||||
for group in framework.Group:
|
||||
if groups:
|
||||
groups = f"{groups}{generic_separator}"
|
||||
groups = f"{groups}{group}"
|
||||
for control in framework.Control:
|
||||
if controls:
|
||||
controls = f"{controls}{generic_separator}"
|
||||
controls = f"{controls}{control}"
|
||||
|
||||
if unrolled_compliance:
|
||||
unrolled_compliance = f"{unrolled_compliance}{framework_separator}"
|
||||
unrolled_compliance = f"{unrolled_compliance}{framework.Framework}{item_separator}{framework.Version}{item_separator}{groups}{item_separator}{controls}"
|
||||
# unset groups and controls for next framework
|
||||
controls = ""
|
||||
groups = ""
|
||||
|
||||
return unrolled_compliance
|
||||
|
||||
def get_csv_header(self):
|
||||
csv_header = []
|
||||
for key in asdict(self):
|
||||
csv_header = csv_header.append(key)
|
||||
return csv_header
|
||||
Reference in New Issue
Block a user