feat(json-asff): add json-asff ouput (#1252)

* feat(json): add json output

* feat(pydantic): add pydantic model to json output

* feat(json-asff): add json-asff ouput

* Update config/config.py

Co-authored-by: Pepe Fagoaga <pepe@verica.io>

* Update models.py

* fix(comments): Resolve comments.

Co-authored-by: sergargar <sergio@verica.io>
Co-authored-by: Pepe Fagoaga <pepe@verica.io>
This commit is contained in:
Sergio Garcia
2022-07-08 09:37:32 +02:00
committed by GitHub
parent db3de2d69e
commit 611bd909ef
9 changed files with 205 additions and 63 deletions

View File

@@ -12,5 +12,7 @@ aws_services_json_file = "providers/aws/aws_regions_by_service.json"
default_output_directory = getcwd() + "/output"
csv_file_suffix = timestamp.strftime("%Y%m%d%H%M%S") + ".csv"
json_file_suffix = timestamp.strftime("%Y%m%d%H%M%S") + ".json"
output_file_timestamp = timestamp.strftime("%Y%m%d%H%M%S")
csv_file_suffix = f"{output_file_timestamp}.csv"
json_file_suffix = f"{output_file_timestamp}.json"
json_asff_file_suffix = f"{output_file_timestamp}.asff.json"

View File

@@ -35,10 +35,10 @@
"RelatedUrl": "https://serviceofficialsiteorpageforthissubject",
"Remediation": {
"Code": {
"CLI": "cli command or URL to the cli command location.",
"NativeIaC": "code or URL to the code location.",
"Terraform": "code or URL to the code location.",
"cli": "cli command or URL to the cli command location.",
"other": "cli command or URL to the cli command location."
"Other": "cli command or URL to the cli command location.",
"Terraform": "code or URL to the code location."
},
"Recommendation": {
"Text": "Run sudo yum update and cross your fingers and toes.",

View File

@@ -26,7 +26,30 @@ def load_check_metadata(metadata_file: str) -> dict:
return check_metadata
# Check all values
class ComplianceItem(BaseModel):
Control: List[str]
Framework: str
Group: List[str]
Version: str
class Code(BaseModel):
NativeIaC: str
Terraform: str
CLI: str
Other: str
class Recommendation(BaseModel):
Text: str
Url: str
class Remediation(BaseModel):
Code: Code
Recommendation: Recommendation
class Check_Metadata_Model(BaseModel):
Provider: str
CheckID: str
@@ -42,13 +65,13 @@ class Check_Metadata_Model(BaseModel):
Description: str
Risk: str
RelatedUrl: str
Remediation: dict
Remediation: Remediation
Categories: List[str]
Tags: dict
DependsOn: List[str]
RelatedTo: List[str]
Notes: str
Compliance: List
Compliance: List[ComplianceItem]
class Check(ABC):

View File

@@ -1,10 +1,10 @@
from dataclasses import asdict, dataclass
from typing import Optional
from typing import List, Optional
from pydantic import BaseModel
from config.config import timestamp
from lib.check.models import Check_Report
from lib.check.models import Check_Report, ComplianceItem, Remediation
from providers.aws.models import AWS_Organizations_Info
@@ -17,36 +17,78 @@ class Compliance_Framework:
class Check_Output_JSON(BaseModel):
AssessmentStartTime: Optional[str]
FindingUniqueId: Optional[str]
AssessmentStartTime: str = ""
FindingUniqueId: str = ""
Provider: str
Profile: Optional[str]
AccountId: Optional[str]
OrganizationsInfo: Optional[dict]
Region: Optional[str]
Profile: str = ""
AccountId: str = ""
OrganizationsInfo: Optional[AWS_Organizations_Info]
Region: str = ""
CheckID: str
CheckName: str
CheckTitle: str
CheckType: str
ServiceName: str
SubServiceName: str
Status: Optional[str]
StatusExtended: Optional[str]
Status: str = ""
StatusExtended: str = ""
Severity: str
ResourceId: Optional[str]
ResourceArn: Optional[str]
ResourceId: str = ""
ResourceArn: str = ""
ResourceType: str
ResourceDetails: Optional[str]
ResourceDetails: str = ""
Tags: dict
Description: str
Risk: str
RelatedUrl: str
Remediation: dict
Categories: list
DependsOn: list
RelatedTo: list
Remediation: Remediation
Categories: List[str]
DependsOn: List[str]
RelatedTo: List[str]
Notes: str
Compliance: list
Compliance: List[ComplianceItem]
# JSON ASFF Output
class ProductFields(BaseModel):
ProviderName: str = "Prowler"
ProviderVersion: str
ProwlerResourceName: str
class Severity(BaseModel):
Label: str
class Resource(BaseModel):
Type: str
Id: str
Partition: str
Region: str
class Compliance(BaseModel):
Status: str
RelatedRequirements: List[str]
class Check_Output_JSON_ASFF(BaseModel):
SchemaVersion: str = "2018-10-08"
Id: str = ""
ProductArn: str = ""
RecordState: str = "ACTIVE"
ProductFields: ProductFields = None
GeneratorId: str = ""
AwsAccountId: str = ""
Types: List[str] = None
FirstObservedAt: str = ""
UpdatedAt: str = ""
CreatedAt: str = ""
Severity: Severity = None
Title: str = ""
Description: str = ""
Resources: List[Resource] = None
Compliance: Compliance = None
@dataclass
@@ -133,27 +175,27 @@ class Check_Output_CSV:
self.description = report.check_metadata.Description
self.risk = report.check_metadata.Risk
self.related_url = report.check_metadata.RelatedUrl
self.remediation_recommendation_text = report.check_metadata.Remediation[
"Recommendation"
]["Text"]
self.remediation_recommendation_url = report.check_metadata.Remediation[
"Recommendation"
]["Url"]
self.remediation_recommendation_text = (
report.check_metadata.Remediation.Recommendation.Text
)
self.remediation_recommendation_url = (
report.check_metadata.Remediation.Recommendation.Url
)
self.remediation_recommendation_code_nativeiac = (
report.check_metadata.Remediation["Code"]["NativeIaC"]
report.check_metadata.Remediation.Code.NativeIaC
)
self.remediation_recommendation_code_terraform = (
report.check_metadata.Remediation["Code"]["Terraform"]
report.check_metadata.Remediation.Code.Terraform
)
self.remediation_recommendation_code_cli = (
report.check_metadata.Remediation.Code.cli
)
self.remediation_recommendation_code_cli = (
report.check_metadata.Remediation.Code.cli
)
self.remediation_recommendation_code_other = (
report.check_metadata.Remediation.Code.other
)
self.remediation_recommendation_code_cli = report.check_metadata.Remediation[
"Code"
]["cli"]
self.remediation_recommendation_code_cli = report.check_metadata.Remediation[
"Code"
]["cli"]
self.remediation_recommendation_code_other = report.check_metadata.Remediation[
"Code"
]["other"]
self.categories = self.__unroll_list__(report.check_metadata.Categories)
self.depends_on = self.__unroll_list__(report.check_metadata.DependsOn)
self.related_to = self.__unroll_list__(report.check_metadata.RelatedTo)
@@ -188,10 +230,10 @@ class Check_Output_CSV:
# fill list of dataclasses
for item in compliance:
compliance_framework = Compliance_Framework(
Framework=item["Framework"],
Version=item["Version"],
Group=item["Group"],
Control=item["Control"],
Framework=item.Framework,
Version=item.Version,
Group=item.Group,
Control=item.Control,
)
compliance_frameworks.append(compliance_framework)
# iterate over list of dataclasses to output info

View File

@@ -4,8 +4,22 @@ from csv import DictWriter
from colorama import Fore, Style
from config.config import csv_file_suffix, json_file_suffix, timestamp
from lib.outputs.models import Check_Output_CSV, Check_Output_JSON
from config.config import (
csv_file_suffix,
json_asff_file_suffix,
json_file_suffix,
prowler_version,
timestamp,
)
from lib.outputs.models import (
Check_Output_CSV,
Check_Output_JSON,
Check_Output_JSON_ASFF,
Compliance,
ProductFields,
Resource,
Severity,
)
from lib.utils.utils import file_exists, open_file
@@ -60,6 +74,15 @@ def report(check_findings, output_options, audit_info):
json.dump(finding_output.dict(), file_descriptors["json"], indent=4)
file_descriptors["json"].write(",")
if "json-asff" in file_descriptors:
finding_output = Check_Output_JSON_ASFF()
fill_json_asff(finding_output, audit_info, finding)
json.dump(
finding_output.dict(), file_descriptors["json-asff"], indent=4
)
file_descriptors["json-asff"].write(",")
if file_descriptors:
# Close all file descriptors
for file_descriptor in file_descriptors:
@@ -107,6 +130,22 @@ def fill_file_descriptors(output_modes, audited_account, output_directory, csv_f
file_descriptors.update({output_mode: file_descriptor})
if output_mode == "json-asff":
filename = f"{output_directory}/prowler-output-{audited_account}-{json_asff_file_suffix}"
if file_exists(filename):
file_descriptor = open_file(
filename,
"a",
)
else:
file_descriptor = open_file(
filename,
"a",
)
file_descriptor.write("[")
file_descriptors.update({output_mode: file_descriptor})
return file_descriptors
@@ -149,12 +188,46 @@ def fill_json(finding_output, audit_info, finding):
return finding_output
def close_json(output_directory, audited_account):
filename = f"{output_directory}/prowler-output-{audited_account}-{json_file_suffix}"
def fill_json_asff(finding_output, audit_info, finding):
finding_output.Id = f"prowler-{finding.check_metadata.CheckID}-{audit_info.audited_account}-{finding.region}-{str(hash(finding.resource_id))}"
finding_output.ProductArn = f"arn:{audit_info.audited_partition}:securityhub:{finding.region}::product/prowler/prowler"
finding_output.ProductFields = ProductFields(
ProviderVersion=prowler_version, ProwlerResourceName=finding.resource_id
)
finding_output.GeneratorId = "prowler-" + finding.check_metadata.CheckID
finding_output.AwsAccountId = audit_info.audited_account
finding_output.Types = finding.check_metadata.CheckType
finding_output.FirstObservedAt = (
finding_output.UpdatedAt
) = finding_output.CreatedAt = timestamp.isoformat()
finding_output.Severity = Severity(Label=finding.check_metadata.Severity)
finding_output.Title = finding.check_metadata.CheckTitle
finding_output.Description = finding.check_metadata.Description
finding_output.Resources = [
Resource(
Id=finding.resource_id,
Type=finding.check_metadata.ResourceType,
Partition=audit_info.audited_partition,
Region=finding.region,
)
]
finding_output.Compliance = Compliance(
Status=finding.status, RelatedRequirements=[finding.check_metadata.CheckType]
)
return finding_output
def close_json(output_directory, audited_account, mode):
suffix = json_file_suffix
if mode == "json-asff":
suffix = json_asff_file_suffix
filename = f"{output_directory}/prowler-output-{audited_account}-{suffix}"
file_descriptor = open_file(
filename,
"a",
)
# Replace last comma for square bracket
file_descriptor.seek(file_descriptor.tell() - 1, os.SEEK_SET)
file_descriptor.truncate()
file_descriptor.write("]")

View File

@@ -46,10 +46,10 @@
"RelatedUrl": "https://serviceofficialsiteorpageforthissubject",
"Remediation": {
"Code": {
"CLI": "cli command or URL to the cli command location.",
"NativeIaC": "code or URL to the code location.",
"Terraform": "code or URL to the code location.",
"cli": "cli command or URL to the cli command location.",
"other": "cli command or URL to the cli command location."
"Other": "cli command or URL to the cli command location.",
"Terraform": "code or URL to the code location."
},
"Recommendation": {
"Text": "Run sudo yum update and cross your fingers and toes.",

View File

@@ -35,10 +35,10 @@
"RelatedUrl": "https://serviceofficialsiteorpageforthissubject",
"Remediation": {
"Code": {
"CLI": "cli command or URL to the cli command location.",
"NativeIaC": "code or URL to the code location.",
"Terraform": "code or URL to the code location.",
"cli": "cli command or URL to the cli command location.",
"other": "cli command or URL to the cli command location."
"Other": "cli command or URL to the cli command location.",
"Terraform": "code or URL to the code location."
},
"Recommendation": {
"Text": "Run sudo yum update and cross your fingers and toes.",

View File

@@ -35,10 +35,10 @@
"RelatedUrl": "https://serviceofficialsiteorpageforthissubject",
"Remediation": {
"Code": {
"CLI": "cli command or URL to the cli command location.",
"NativeIaC": "code or URL to the code location.",
"Terraform": "code or URL to the code location.",
"cli": "cli command or URL to the cli command location.",
"other": "cli command or URL to the cli command location."
"Other": "cli command or URL to the cli command location.",
"Terraform": "code or URL to the code location."
},
"Recommendation": {
"Text": "Run sudo yum update and cross your fingers and toes.",

View File

@@ -125,7 +125,7 @@ if __name__ == "__main__":
"--output-modes",
nargs="+",
help="Output mode, by default csv",
choices=["csv", "json"],
choices=["csv", "json", "json-asff"],
)
parser.add_argument(
"-o",
@@ -272,5 +272,7 @@ if __name__ == "__main__":
)
# Close json file if exists
if "json" in output_modes:
close_json(output_directory, audit_info.audited_account)
if output_modes:
for mode in output_modes:
if mode == "json" or mode == "json-asff":
close_json(output_directory, audit_info.audited_account, mode)