mirror of
https://github.com/ghndrx/prowler.git
synced 2026-02-10 23:05:05 +00:00
* feat(json): add json output * feat(pydantic): add pydantic model to json output Co-authored-by: sergargar <sergio@verica.io>
222 lines
7.3 KiB
Python
222 lines
7.3 KiB
Python
from dataclasses import asdict, dataclass
|
|
from typing import Optional
|
|
|
|
from pydantic import BaseModel
|
|
|
|
from config.config import timestamp
|
|
from lib.check.models import Check_Report
|
|
from providers.aws.models import AWS_Organizations_Info
|
|
|
|
|
|
@dataclass
|
|
class Compliance_Framework:
|
|
Framework: str
|
|
Version: str
|
|
Group: list
|
|
Control: list
|
|
|
|
|
|
class Check_Output_JSON(BaseModel):
|
|
AssessmentStartTime: Optional[str]
|
|
FindingUniqueId: Optional[str]
|
|
Provider: str
|
|
Profile: Optional[str]
|
|
AccountId: Optional[str]
|
|
OrganizationsInfo: Optional[dict]
|
|
Region: Optional[str]
|
|
CheckID: str
|
|
CheckName: str
|
|
CheckTitle: str
|
|
CheckType: str
|
|
ServiceName: str
|
|
SubServiceName: str
|
|
Status: Optional[str]
|
|
StatusExtended: Optional[str]
|
|
Severity: str
|
|
ResourceId: Optional[str]
|
|
ResourceArn: Optional[str]
|
|
ResourceType: str
|
|
ResourceDetails: Optional[str]
|
|
Tags: dict
|
|
Description: str
|
|
Risk: str
|
|
RelatedUrl: str
|
|
Remediation: dict
|
|
Categories: list
|
|
DependsOn: list
|
|
RelatedTo: list
|
|
Notes: str
|
|
Compliance: list
|
|
|
|
|
|
@dataclass
|
|
class Check_Output_CSV:
|
|
assessment_start_time: str
|
|
finding_unique_id: str
|
|
provider: str
|
|
profile: str
|
|
account_id: int
|
|
account_name: str
|
|
account_email: str
|
|
account_arn: str
|
|
account_org: str
|
|
account_tags: str
|
|
region: str
|
|
check_id: str
|
|
check_name: str
|
|
check_title: str
|
|
check_type: str
|
|
status: str
|
|
status_extended: str
|
|
service_name: str
|
|
subservice_name: str
|
|
severity: str
|
|
resource_id: str
|
|
resource_arn: str
|
|
resource_type: str
|
|
resource_details: str
|
|
resource_tags: list
|
|
description: dict
|
|
risk: list
|
|
related_url: list
|
|
remediation_recommendation_text: str
|
|
remediation_recommendation_url: list
|
|
remediation_recommendation_code_nativeiac: str
|
|
remediation_recommendation_code_terraform: str
|
|
remediation_recommendation_code_cli: str
|
|
remediation_recommendation_code_other: str
|
|
categories: str
|
|
depends_on: str
|
|
related_to: str
|
|
notes: str
|
|
compliance: str
|
|
|
|
def get_csv_header(self):
|
|
csv_header = []
|
|
for key in asdict(self):
|
|
csv_header = csv_header.append(key)
|
|
return csv_header
|
|
|
|
def __init__(
|
|
self,
|
|
account: str,
|
|
profile: str,
|
|
report: Check_Report,
|
|
organizations: AWS_Organizations_Info,
|
|
):
|
|
self.assessment_start_time = timestamp.isoformat()
|
|
self.finding_unique_id = ""
|
|
self.provider = report.check_metadata.Provider
|
|
self.profile = profile
|
|
self.account_id = account
|
|
if organizations:
|
|
self.account_name = organizations.account_details_name
|
|
self.account_email = organizations.account_details_email
|
|
self.account_arn = organizations.account_details_arn
|
|
self.account_org = organizations.account_details_org
|
|
self.account_tags = organizations.account_details_tags
|
|
self.region = report.region
|
|
self.check_id = report.check_metadata.CheckID
|
|
self.check_name = report.check_metadata.CheckName
|
|
self.check_title = report.check_metadata.CheckTitle
|
|
self.check_type = report.check_metadata.CheckType
|
|
self.status = report.status
|
|
self.status_extended = report.status_extended
|
|
self.service_name = report.check_metadata.ServiceName
|
|
self.subservice_name = report.check_metadata.SubServiceName
|
|
self.severity = report.check_metadata.Severity
|
|
self.resource_id = report.resource_id
|
|
self.resource_arn = report.resource_arn
|
|
self.resource_type = report.check_metadata.ResourceType
|
|
self.resource_details = report.resource_details
|
|
self.resource_tags = report.resource_tags
|
|
self.description = report.check_metadata.Description
|
|
self.risk = report.check_metadata.Risk
|
|
self.related_url = report.check_metadata.RelatedUrl
|
|
self.remediation_recommendation_text = report.check_metadata.Remediation[
|
|
"Recommendation"
|
|
]["Text"]
|
|
self.remediation_recommendation_url = report.check_metadata.Remediation[
|
|
"Recommendation"
|
|
]["Url"]
|
|
self.remediation_recommendation_code_nativeiac = (
|
|
report.check_metadata.Remediation["Code"]["NativeIaC"]
|
|
)
|
|
self.remediation_recommendation_code_terraform = (
|
|
report.check_metadata.Remediation["Code"]["Terraform"]
|
|
)
|
|
self.remediation_recommendation_code_cli = report.check_metadata.Remediation[
|
|
"Code"
|
|
]["cli"]
|
|
self.remediation_recommendation_code_cli = report.check_metadata.Remediation[
|
|
"Code"
|
|
]["cli"]
|
|
self.remediation_recommendation_code_other = report.check_metadata.Remediation[
|
|
"Code"
|
|
]["other"]
|
|
self.categories = self.__unroll_list__(report.check_metadata.Categories)
|
|
self.depends_on = self.__unroll_list__(report.check_metadata.DependsOn)
|
|
self.related_to = self.__unroll_list__(report.check_metadata.RelatedTo)
|
|
self.notes = report.check_metadata.Notes
|
|
self.compliance = self.__unroll_compliance__(report.check_metadata.Compliance)
|
|
|
|
def __unroll_list__(self, listed_items: list):
|
|
unrolled_items = ""
|
|
separator = "|"
|
|
for item in listed_items:
|
|
if not unrolled_items:
|
|
unrolled_items = f"{item}"
|
|
else:
|
|
unrolled_items = f"{unrolled_items}{separator}{item}"
|
|
|
|
return unrolled_items
|
|
|
|
def __unroll_dict__(self, dict_items: dict):
|
|
unrolled_items = ""
|
|
separator = "|"
|
|
for key, value in dict_items.items():
|
|
unrolled_item = f"{key}:{value}"
|
|
if not unrolled_items:
|
|
unrolled_items = f"{unrolled_item}"
|
|
else:
|
|
unrolled_items = f"{unrolled_items}{separator}{unrolled_item}"
|
|
|
|
return unrolled_items
|
|
|
|
def __unroll_compliance__(self, compliance: list):
|
|
compliance_frameworks = []
|
|
# fill list of dataclasses
|
|
for item in compliance:
|
|
compliance_framework = Compliance_Framework(
|
|
Framework=item["Framework"],
|
|
Version=item["Version"],
|
|
Group=item["Group"],
|
|
Control=item["Control"],
|
|
)
|
|
compliance_frameworks.append(compliance_framework)
|
|
# iterate over list of dataclasses to output info
|
|
unrolled_compliance = ""
|
|
groups = ""
|
|
controls = ""
|
|
item_separator = ","
|
|
framework_separator = "|"
|
|
generic_separator = "/"
|
|
for framework in compliance_frameworks:
|
|
for group in framework.Group:
|
|
if groups:
|
|
groups = f"{groups}{generic_separator}"
|
|
groups = f"{groups}{group}"
|
|
for control in framework.Control:
|
|
if controls:
|
|
controls = f"{controls}{generic_separator}"
|
|
controls = f"{controls}{control}"
|
|
|
|
if unrolled_compliance:
|
|
unrolled_compliance = f"{unrolled_compliance}{framework_separator}"
|
|
unrolled_compliance = f"{unrolled_compliance}{framework.Framework}{item_separator}{framework.Version}{item_separator}{groups}{item_separator}{controls}"
|
|
# unset groups and controls for next framework
|
|
controls = ""
|
|
groups = ""
|
|
|
|
return unrolled_compliance
|