feat(compliance): add compliance field to HTML, CSV and JSON outputs including frameworks and reqs (#2060)

Author: Sergio Garcia
Co-authored-by: Pepe Fagoaga <pepe@verica.io>
Committed: 2023-03-14 14:20:46 +01:00 (committed by GitHub)
parent 43c0540de7
commit 738fc9acad
8 changed files with 442 additions and 108 deletions
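
Concretely, each finding gains a "compliance" column in the CSV output (one flattened string) and a Compliance mapping in the JSON output. A sketch of both shapes, using values borrowed from the tests at the bottom of this commit:

# CSV column "compliance": frameworks joined with " | " (built by unroll_dict below)
csv_compliance = "CIS-1.4: 2.1.1 | CIS-1.5: 2.1.1 | GDPR: article_32"

# JSON field "Compliance": framework -> list of requirement ids (built by get_check_compliance below)
json_compliance = {"CIS-1.4": ["2.1.1"], "CIS-1.5": ["2.1.1"], "GDPR": ["article_32"]}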

poetry.lock (generated)

@@ -1,4 +1,4 @@
-# This file is automatically @generated by Poetry and should not be changed by hand.
+# This file is automatically @generated by Poetry 1.4.0 and should not be changed by hand.
[[package]]
name = "about-time"
@@ -754,14 +754,14 @@ pipenv = ["pipenv"]
[[package]]
name = "exceptiongroup"
version = "1.1.0"
version = "1.1.1"
description = "Backport of PEP 654 (exception groups)"
category = "dev"
optional = false
python-versions = ">=3.7"
files = [
{file = "exceptiongroup-1.1.0-py3-none-any.whl", hash = "sha256:327cbda3da756e2de031a3107b81ab7b3770a602c4d16ca618298c526f4bec1e"},
{file = "exceptiongroup-1.1.0.tar.gz", hash = "sha256:bcb67d800a4497e1b404c2dd44fca47d3b7a5e5433dbab67f96c1a685cdfdf23"},
{file = "exceptiongroup-1.1.1-py3-none-any.whl", hash = "sha256:232c37c63e4f682982c8b6459f33a8981039e5fb8756b2074364e5055c498c9e"},
{file = "exceptiongroup-1.1.1.tar.gz", hash = "sha256:d484c3090ba2889ae2928419117447a14daf3c1231d5e30d0aae34f354f01785"},
]
[package.extras]
@@ -1526,14 +1526,14 @@ files = [
[[package]]
name = "platformdirs"
version = "3.1.0"
version = "3.1.1"
description = "A small Python package for determining appropriate platform-specific dirs, e.g. a \"user data dir\"."
category = "dev"
optional = false
python-versions = ">=3.7"
files = [
{file = "platformdirs-3.1.0-py3-none-any.whl", hash = "sha256:13b08a53ed71021350c9e300d4ea8668438fb0046ab3937ac9a29913a1a1350a"},
{file = "platformdirs-3.1.0.tar.gz", hash = "sha256:accc3665857288317f32c7bebb5a8e482ba717b474f3fc1d18ca7f9214be0cef"},
{file = "platformdirs-3.1.1-py3-none-any.whl", hash = "sha256:e5986afb596e4bb5bde29a79ac9061aa955b94fca2399b7aaac4090860920dd8"},
{file = "platformdirs-3.1.1.tar.gz", hash = "sha256:024996549ee88ec1a9aa99ff7f8fc819bb59e2c3477b410d90a16d32d6e707aa"},
]
[package.extras]
@@ -2089,24 +2089,24 @@ rsa = ["oauthlib[signedtoken] (>=3.0.0)"]
[[package]]
name = "responses"
version = "0.22.0"
version = "0.23.1"
description = "A utility library for mocking out the `requests` Python library."
category = "dev"
optional = false
python-versions = ">=3.7"
files = [
{file = "responses-0.22.0-py3-none-any.whl", hash = "sha256:dcf294d204d14c436fddcc74caefdbc5764795a40ff4e6a7740ed8ddbf3294be"},
{file = "responses-0.22.0.tar.gz", hash = "sha256:396acb2a13d25297789a5866b4881cf4e46ffd49cc26c43ab1117f40b973102e"},
{file = "responses-0.23.1-py3-none-any.whl", hash = "sha256:8a3a5915713483bf353b6f4079ba8b2a29029d1d1090a503c70b0dc5d9d0c7bd"},
{file = "responses-0.23.1.tar.gz", hash = "sha256:c4d9aa9fc888188f0c673eff79a8dadbe2e75b7fe879dc80a221a06e0a68138f"},
]
[package.dependencies]
pyyaml = "*"
requests = ">=2.22.0,<3.0"
toml = "*"
types-toml = "*"
types-PyYAML = "*"
urllib3 = ">=1.25.10"
[package.extras]
tests = ["coverage (>=6.0.0)", "flake8", "mypy", "pytest (>=7.0.0)", "pytest-asyncio", "pytest-cov", "pytest-httpserver", "types-requests"]
tests = ["coverage (>=6.0.0)", "flake8", "mypy", "pytest (>=7.0.0)", "pytest-asyncio", "pytest-cov", "pytest-httpserver", "tomli", "tomli-w", "types-requests"]
[[package]]
name = "rfc3339-validator"
@@ -2266,14 +2266,14 @@ contextlib2 = ">=0.5.5"
[[package]]
name = "setuptools"
version = "67.5.1"
version = "67.6.0"
description = "Easily download, build, install, upgrade, and uninstall Python packages"
category = "dev"
optional = false
python-versions = ">=3.7"
files = [
{file = "setuptools-67.5.1-py3-none-any.whl", hash = "sha256:1c39d42bda4cb89f7fdcad52b6762e3c309ec8f8715b27c684176b7d71283242"},
{file = "setuptools-67.5.1.tar.gz", hash = "sha256:15136a251127da2d2e77ac7a1bc231eb504654f7e3346d93613a13f2e2787535"},
{file = "setuptools-67.6.0-py3-none-any.whl", hash = "sha256:b78aaa36f6b90a074c1fa651168723acbf45d14cb1196b6f02c0fd07f17623b2"},
{file = "setuptools-67.6.0.tar.gz", hash = "sha256:2ee892cd5f29f3373097f5a814697e397cf3ce313616df0af11231e2ad118077"},
]
[package.extras]
@@ -2405,15 +2405,15 @@ files = [
]
[[package]]
name = "types-toml"
version = "0.10.8.5"
description = "Typing stubs for toml"
name = "types-pyyaml"
version = "6.0.12.8"
description = "Typing stubs for PyYAML"
category = "dev"
optional = false
python-versions = "*"
files = [
{file = "types-toml-0.10.8.5.tar.gz", hash = "sha256:bf80fce7d2d74be91148f47b88d9ae5adeb1024abef22aa2fdbabc036d6b8b3c"},
{file = "types_toml-0.10.8.5-py3-none-any.whl", hash = "sha256:2432017febe43174af0f3c65f03116e3d3cf43e7e1406b8200e106da8cf98992"},
{file = "types-PyYAML-6.0.12.8.tar.gz", hash = "sha256:19304869a89d49af00be681e7b267414df213f4eb89634c4495fa62e8f942b9f"},
{file = "types_PyYAML-6.0.12.8-py3-none-any.whl", hash = "sha256:5314a4b2580999b2ea06b2e5f9a7763d860d6e09cdf21c0e9561daa9cbd60178"},
]
[[package]]
@@ -2430,14 +2430,14 @@ files = [
[[package]]
name = "urllib3"
version = "1.26.14"
version = "1.26.15"
description = "HTTP library with thread-safe connection pooling, file post, and more."
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*"
files = [
{file = "urllib3-1.26.14-py2.py3-none-any.whl", hash = "sha256:75edcdc2f7d85b137124a6c3c9fc3933cdeaa12ecb9a6a959f22797a0feca7e1"},
{file = "urllib3-1.26.14.tar.gz", hash = "sha256:076907bf8fd355cde77728471316625a4d2f7e713c125f51953bb5b3eecf4f72"},
{file = "urllib3-1.26.15-py2.py3-none-any.whl", hash = "sha256:aa751d169e23c7479ce47a0cb0da579e3ede798f994f5816a74e4f4500dcea42"},
{file = "urllib3-1.26.15.tar.gz", hash = "sha256:8a388717b9476f934a21484e8c8e61875ab60644d29b9b39e11e4b9dc1c6b305"},
]
[package.extras]
@@ -2623,14 +2623,14 @@ files = [
[[package]]
name = "xlsxwriter"
version = "3.0.8"
version = "3.0.9"
description = "A Python module for creating Excel XLSX files."
category = "main"
optional = false
python-versions = ">=3.6"
files = [
{file = "XlsxWriter-3.0.8-py3-none-any.whl", hash = "sha256:f5c7491b8450cf49968428f062355de16c9140aa24eafc466c9dfe107610bd44"},
{file = "XlsxWriter-3.0.8.tar.gz", hash = "sha256:ec77335fb118c36bc5ed1c89e33904d649e4989df2d7980f7d6a9dd95ee5874e"},
{file = "XlsxWriter-3.0.9-py3-none-any.whl", hash = "sha256:5eaaf3c6f791cba1dd1c3065147c35982180f693436093aabe5b7d6c16148e95"},
{file = "XlsxWriter-3.0.9.tar.gz", hash = "sha256:7216d39a2075afac7a28cad81f6ac31b0b16d8976bf1b775577d157346f891dd"},
]
[[package]]


@@ -80,26 +80,19 @@ def prowler():
    # Load compliance frameworks
    logger.debug("Loading compliance frameworks from .json files")

-    # Load the compliance framework if specified with --compliance
-    # If some compliance argument is specified we have to load it
-    if (
-        args.list_compliance
-        or args.list_compliance_requirements
-        or compliance_framework
-    ):
-        bulk_compliance_frameworks = bulk_load_compliance_frameworks(provider)
-        # Complete checks metadata with the compliance framework specification
-        update_checks_metadata_with_compliance(
-            bulk_compliance_frameworks, bulk_checks_metadata
-        )
-        if args.list_compliance:
-            print_compliance_frameworks(bulk_compliance_frameworks)
-            sys.exit()
-        if args.list_compliance_requirements:
-            print_compliance_requirements(
-                bulk_compliance_frameworks, args.list_compliance_requirements
-            )
-            sys.exit()
+    bulk_compliance_frameworks = bulk_load_compliance_frameworks(provider)
+    # Complete checks metadata with the compliance framework specification
+    update_checks_metadata_with_compliance(
+        bulk_compliance_frameworks, bulk_checks_metadata
+    )
+    if args.list_compliance:
+        print_compliance_frameworks(bulk_compliance_frameworks)
+        sys.exit()
+    if args.list_compliance_requirements:
+        print_compliance_requirements(
+            bulk_compliance_frameworks, args.list_compliance_requirements
+        )
+        sys.exit()

    # Load checks to execute
    checks_to_execute = load_checks_to_execute(
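
Context for the hunk above: the compliance frameworks used to load only when a --compliance or --list-compliance* argument was present; now they load on every run, so the output writers can always annotate findings. A minimal sketch of the resulting flow; the import paths and the bulk_load_checks_metadata loader are assumptions, since file names are not shown in this view:

from prowler.lib.check.check import bulk_load_checks_metadata  # assumed location
from prowler.lib.check.check import bulk_load_compliance_frameworks  # assumed location
from prowler.lib.check.compliance import update_checks_metadata_with_compliance  # assumed location

provider = "aws"  # example provider
bulk_checks_metadata = bulk_load_checks_metadata(provider)
bulk_compliance_frameworks = bulk_load_compliance_frameworks(provider)
# Mutates and returns the same dict: every check now carries its Compliance list
bulk_checks_metadata = update_checks_metadata_with_compliance(
    bulk_compliance_frameworks, bulk_checks_metadata
)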


@@ -1,10 +1,12 @@
import sys

+from pydantic import parse_obj_as

from prowler.lib.check.compliance_models import (
    Compliance_Base_Model,
    Compliance_Requirement,
)
-from prowler.lib.check.models import Check_Report_AWS
+from prowler.lib.check.models import Check_Metadata_Model
from prowler.lib.logger import logger
@@ -62,44 +64,33 @@ def update_checks_metadata_with_compliance(
                # Include the compliance framework for the check
                check_compliance.append(compliance)

        # Create metadata for Manual Control
-        manual_check_metadata = """{
-            "Provider" : "aws",
-            "CheckID" : "manual_check",
-            "CheckTitle" : "Manual Check",
-            "CheckType" : [],
-            "ServiceName" : "",
-            "SubServiceName" : "",
-            "ResourceIdTemplate" : "",
-            "Severity" : "",
-            "ResourceType" : "",
-            "Description" : "",
-            "Risk" : "",
-            "RelatedUrl" : "",
-            "Remediation": {
-                "Code": {
-                    "CLI": "",
-                    "NativeIaC": "",
-                    "Other": "",
-                    "Terraform": ""
-                },
-                "Recommendation": {
-                    "Text": "",
-                    "Url": ""
-                }
-            },
-            "Categories" : [],
-            "Tags" : {},
-            "DependsOn" : [],
-            "RelatedTo" : [],
-            "Notes" : ""
-        }"""
-        manual_check = Check_Report_AWS(manual_check_metadata)
-        manual_check.status = "INFO"
-        manual_check.status_extended = "Manual check"
-        manual_check.resource_id = "manual_check"
-        manual_check.Compliance = check_compliance
+        manual_check_metadata = {
+            "Provider": "aws",
+            "CheckID": "manual_check",
+            "CheckTitle": "Manual Check",
+            "CheckType": [],
+            "ServiceName": "",
+            "SubServiceName": "",
+            "ResourceIdTemplate": "",
+            "Severity": "",
+            "ResourceType": "",
+            "Description": "",
+            "Risk": "",
+            "RelatedUrl": "",
+            "Remediation": {
+                "Code": {"CLI": "", "NativeIaC": "", "Other": "", "Terraform": ""},
+                "Recommendation": {"Text": "", "Url": ""},
+            },
+            "Categories": [],
+            "Tags": {},
+            "DependsOn": [],
+            "RelatedTo": [],
+            "Notes": "",
+        }
+        manual_check = parse_obj_as(Check_Metadata_Model, manual_check_metadata)
        # Save it into the check's metadata
        bulk_checks_metadata["manual_check"] = manual_check
+        bulk_checks_metadata["manual_check"].Compliance = check_compliance
        return bulk_checks_metadata
    except Exception as e:
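
The hunk above swaps a hand-written JSON string plus Check_Report_AWS for a plain dict validated by pydantic's parse_obj_as. A self-contained sketch of that pattern; CheckMeta here is a hypothetical stand-in, not Prowler's actual Check_Metadata_Model:

from typing import List

from pydantic import BaseModel, parse_obj_as


class CheckMeta(BaseModel):
    # Hypothetical minimal model, just to show the parse_obj_as pattern
    Provider: str
    CheckID: str
    Categories: List[str] = []


meta = parse_obj_as(CheckMeta, {"Provider": "aws", "CheckID": "manual_check"})
print(meta.CheckID)  # manual_check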


@@ -5,6 +5,7 @@ from colorama import Fore, Style
from tabulate import tabulate
from prowler.config.config import orange_color, timestamp
+from prowler.lib.check.models import Check_Report
from prowler.lib.logger import logger
from prowler.lib.outputs.models import (
Check_Output_CSV_CIS,
@@ -18,7 +19,13 @@ def add_manual_controls(output_options, audit_info, file_descriptors):
    try:
        # Check if MANUAL control was already added to output
        if "manual_check" in output_options.bulk_checks_metadata:
-            manual_finding = output_options.bulk_checks_metadata["manual_check"]
+            manual_finding = Check_Report(
+                output_options.bulk_checks_metadata["manual_check"].json()
+            )
+            manual_finding.status = "INFO"
+            manual_finding.status_extended = "Manual check"
+            manual_finding.resource_id = "manual_check"
+            manual_finding.region = ""
            fill_compliance(
                output_options, manual_finding, audit_info, file_descriptors
            )


@@ -9,6 +9,12 @@ from prowler.config.config import (
timestamp,
)
from prowler.lib.logger import logger
+from prowler.lib.outputs.models import (
+    get_check_compliance,
+    parse_html_string,
+    unroll_dict,
+    unroll_tags,
+)
from prowler.lib.utils.utils import open_file
@@ -183,17 +189,16 @@ def add_html_header(file_descriptor, audit_info):
<tr>
<th scope="col">Status</th>
<th scope="col">Severity</th>
<th style="width:5%" scope="col">Service Name</th>
<th scope="col">Service Name</th>
<th scope="col">Region</th>
<th style="width:20%" scope="col">Check ID</th>
<th style="width:20%" scope="col">Check Title</th>
<th scope="col">Resource ID</th>
<th scope="col">Resource Tags</th>
<th style="width:15%" scope="col">Check Description</th>
<th scope="col">Check ID</th>
<th scope="col">Status Extended</th>
<th scope="col">Risk</th>
<th scope="col">Recomendation</th>
<th style="width:5%" scope="col">Recomendation URL</th>
<th scope="col">Compliance</th>
</tr>
</thead>
<tbody>
@@ -205,7 +210,7 @@ def add_html_header(file_descriptor, audit_info):
)
-def fill_html(file_descriptor, finding):
+def fill_html(file_descriptor, finding, output_options):
    row_class = "p-3 mb-2 bg-success-custom"
    if finding.status == "INFO":
        row_class = "table-info"
@@ -220,15 +225,14 @@ def fill_html(file_descriptor, finding):
<td>{finding.check_metadata.Severity}</td>
<td>{finding.check_metadata.ServiceName}</td>
<td>{finding.region}</td>
<td>{finding.check_metadata.CheckID.replace("_", "<wbr>_")}</td>
<td>{finding.check_metadata.CheckTitle}</td>
<td>{finding.resource_id.replace("<", "&lt;").replace(">", "&gt;").replace("_", "<wbr>_")}</td>
<td>{str(finding.resource_tags)}</td>
<td>{finding.check_metadata.Description}</td>
<td>{finding.check_metadata.CheckID.replace("_", "<wbr>_")}</td>
<td>{parse_html_string(unroll_tags(finding.resource_tags))}</td>
<td>{finding.status_extended.replace("<", "&lt;").replace(">", "&gt;").replace("_", "<wbr>_")}</td>
<td><p class="show-read-more">{finding.check_metadata.Risk}</p></td>
<td><p class="show-read-more">{finding.check_metadata.Remediation.Recommendation.Text}</p></td>
<td><a class="read-more" href="{finding.check_metadata.Remediation.Recommendation.Url}"><i class="fas fa-external-link-alt"></i></a></td>
<td><p class="show-read-more">{finding.check_metadata.Remediation.Recommendation.Text}</p> <a class="read-more" href="{finding.check_metadata.Remediation.Recommendation.Url}"><i class="fas fa-external-link-alt"></i></a></td>
<td><p class="show-read-more">{parse_html_string(unroll_dict(get_check_compliance(finding, finding.check_metadata.Provider, output_options)))}</p></td>
</tr>
"""
)
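
The new Compliance cell chains three helpers added in the next file: get_check_compliance builds the framework-to-requirements dict, unroll_dict flattens it, and parse_html_string turns it into bullet lines. A small sketch of the intermediate values (the import matches the one this commit adds):

from prowler.lib.outputs.models import parse_html_string, unroll_dict

compliance = {"CIS-1.4": ["2.1.1"], "GDPR": ["article_32"]}
flat = unroll_dict(compliance)  # "CIS-1.4: 2.1.1 | GDPR: article_32"
print(parse_html_string(flat))  # one "&#x2022;" bullet line per framework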


@@ -11,7 +11,26 @@ from prowler.lib.logger import logger
from prowler.providers.aws.lib.audit_info.models import AWS_Organizations_Info


-def generate_provider_output_csv(provider: str, finding, audit_info, mode: str, fd):
+def get_check_compliance(finding, provider, output_options):
+    check_compliance = {}
+    # We have to retrieve all the check's compliance requirements
+    for compliance in output_options.bulk_checks_metadata[
+        finding.check_metadata.CheckID
+    ].Compliance:
+        compliance_fw = compliance.Framework
+        if compliance.Version:
+            compliance_fw = f"{compliance_fw}-{compliance.Version}"
+        if compliance.Provider == provider.upper():
+            if compliance_fw not in check_compliance:
+                check_compliance[compliance_fw] = []
+            for requirement in compliance.Requirements:
+                check_compliance[compliance_fw].append(requirement.Id)
+    return check_compliance
+
+
+def generate_provider_output_csv(
+    provider: str, finding, audit_info, mode: str, fd, output_options
+):
"""
set_provider_output_options configures automatically the outputs based on the selected provider and returns the Provider_Output_Options object.
"""
@@ -32,6 +51,9 @@ def generate_provider_output_csv(provider: str, finding, audit_info, mode: str,
        data[
            "finding_unique_id"
        ] = f"prowler-{provider}-{finding.check_metadata.CheckID}-{finding.subscription}-{finding.resource_id}"
+        data["compliance"] = unroll_dict(
+            get_check_compliance(finding, provider, output_options)
+        )
        finding_output = output_model(**data)

    if provider == "aws":
@@ -43,6 +65,9 @@ def generate_provider_output_csv(provider: str, finding, audit_info, mode: str,
        data[
            "finding_unique_id"
        ] = f"prowler-{provider}-{finding.check_metadata.CheckID}-{audit_info.audited_account}-{finding.region}-{finding.resource_id}"
+        data["compliance"] = unroll_dict(
+            get_check_compliance(finding, provider, output_options)
+        )
        finding_output = output_model(**data)

        if audit_info.organizations_metadata:
@@ -91,7 +116,7 @@ def fill_common_data_csv(finding: dict) -> dict:
"severity": finding.check_metadata.Severity,
"resource_type": finding.check_metadata.ResourceType,
"resource_details": finding.resource_details,
"resource_tags": finding.resource_tags,
"resource_tags": unroll_tags(finding.resource_tags),
"description": finding.check_metadata.Description,
"risk": finding.check_metadata.Risk,
"related_url": finding.check_metadata.RelatedUrl,
@@ -113,26 +138,99 @@ def fill_common_data_csv(finding: dict) -> dict:
"remediation_recommendation_code_other": (
finding.check_metadata.Remediation.Code.Other
),
"categories": __unroll_list__(finding.check_metadata.Categories),
"depends_on": __unroll_list__(finding.check_metadata.DependsOn),
"related_to": __unroll_list__(finding.check_metadata.RelatedTo),
"categories": unroll_list(finding.check_metadata.Categories),
"depends_on": unroll_list(finding.check_metadata.DependsOn),
"related_to": unroll_list(finding.check_metadata.RelatedTo),
"notes": finding.check_metadata.Notes,
}
return data
-def __unroll_list__(listed_items: list):
+def unroll_list(listed_items: list):
    unrolled_items = ""
    separator = "|"
-    for item in listed_items:
-        if not unrolled_items:
-            unrolled_items = f"{item}"
-        else:
-            unrolled_items = f"{unrolled_items}{separator}{item}"
+    if listed_items:
+        for item in listed_items:
+            if not unrolled_items:
+                unrolled_items = f"{item}"
+            else:
+                unrolled_items = f"{unrolled_items} {separator} {item}"
    return unrolled_items
+def unroll_tags(tags: list):
+    unrolled_items = ""
+    separator = "|"
+    if tags:
+        for item in tags:
+            # Check if there are tags in list
+            if type(item) == dict:
+                for key, value in item.items():
+                    if not unrolled_items:
+                        # Check the pattern of tags (Key:Value or Key:key/Value:value)
+                        if "Key" != key and "Value" != key:
+                            unrolled_items = f"{key}={value}"
+                        else:
+                            if "Key" == key:
+                                unrolled_items = f"{value}="
+                            else:
+                                unrolled_items = f"{value}"
+                    else:
+                        if "Key" != key and "Value" != key:
+                            unrolled_items = (
+                                f"{unrolled_items} {separator} {key}={value}"
+                            )
+                        else:
+                            if "Key" == key:
+                                unrolled_items = (
+                                    f"{unrolled_items} {separator} {value}="
+                                )
+                            else:
+                                unrolled_items = f"{unrolled_items}{value}"
+            elif not unrolled_items:
+                unrolled_items = f"{item}"
+            else:
+                unrolled_items = f"{unrolled_items} {separator} {item}"
+    return unrolled_items
+
+
+def unroll_dict(dict: dict):
+    unrolled_items = ""
+    separator = "|"
+    for key, value in dict.items():
+        if type(value) == list:
+            value = ", ".join(value)
+        if not unrolled_items:
+            unrolled_items = f"{key}: {value}"
+        else:
+            unrolled_items = f"{unrolled_items} {separator} {key}: {value}"
+    return unrolled_items
+
+
+def parse_html_string(str: str):
+    string = ""
+    for elem in str.split(" | "):
+        if elem:
+            string += f"\n&#x2022;{elem}\n"
+    return string
+
+
+def parse_json_tags(tags: list):
+    dict_tags = {}
+    if tags and tags != [{}] and tags != [None]:
+        for tag in tags:
+            if "Key" in tag and "Value" in tag:
+                dict_tags[tag["Key"]] = tag["Value"]
+            else:
+                dict_tags.update(tag)
+    return dict_tags


def generate_csv_fields(format: Any) -> list[str]:
    """Generates the CSV headers for the given class"""
    csv_fields = []
@@ -162,7 +260,7 @@ class Check_Output_CSV(BaseModel):
    severity: str
    resource_type: str
    resource_details: str
-    resource_tags: Optional[list]
+    resource_tags: str
    description: str
    risk: str
    related_url: str
@@ -172,6 +270,7 @@ class Check_Output_CSV(BaseModel):
    remediation_recommendation_code_terraform: str
    remediation_recommendation_code_cli: str
    remediation_recommendation_code_other: str
+    compliance: str
    categories: str
    depends_on: str
    related_to: str
@@ -206,7 +305,9 @@ class Azure_Check_Output_CSV(Check_Output_CSV):
resource_name: str = ""
def generate_provider_output_json(provider: str, finding, audit_info, mode: str, fd):
def generate_provider_output_json(
provider: str, finding, audit_info, mode: str, output_options
):
"""
generate_provider_output_json configures automatically the outputs based on the selected provider and returns the Check_Output_JSON object.
"""
@@ -228,6 +329,9 @@ def generate_provider_output_json(provider: str, finding, audit_info, mode: str,
        finding_output.ResourceId = finding.resource_id
        finding_output.ResourceName = finding.resource_name
        finding_output.FindingUniqueId = f"prowler-{provider}-{finding.check_metadata.CheckID}-{finding.subscription}-{finding.resource_id}"
+        finding_output.Compliance = get_check_compliance(
+            finding, provider, output_options
+        )

    if provider == "aws":
        finding_output.Profile = audit_info.profile
@@ -235,8 +339,11 @@ def generate_provider_output_json(provider: str, finding, audit_info, mode: str,
        finding_output.Region = finding.region
        finding_output.ResourceId = finding.resource_id
        finding_output.ResourceArn = finding.resource_arn
-        finding_output.ResourceTags = finding.resource_tags
+        finding_output.ResourceTags = parse_json_tags(finding.resource_tags)
        finding_output.FindingUniqueId = f"prowler-{provider}-{finding.check_metadata.CheckID}-{audit_info.audited_account}-{finding.region}-{finding.resource_id}"
+        finding_output.Compliance = get_check_compliance(
+            finding, provider, output_options
+        )

        if audit_info.organizations_metadata:
            finding_output.OrganizationsInfo = (
@@ -276,6 +383,7 @@ class Check_Output_JSON(BaseModel):
    Risk: str
    RelatedUrl: str
    Remediation: Remediation
+    Compliance: Optional[dict]
    Categories: List[str]
    DependsOn: List[str]
    RelatedTo: List[str]
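
The behavior of the new helpers in one place; expected values mirror the unit tests below, and the import line matches the test file's imports:

from prowler.lib.outputs.models import parse_json_tags, unroll_dict, unroll_tags

tags = [{"Key": "env", "Value": "dev"}, {"Key": "team", "Value": "sec"}]
print(unroll_tags(tags))      # env=dev | team=sec
print(parse_json_tags(tags))  # {'env': 'dev', 'team': 'sec'}
print(unroll_dict({"CIS-1.4": ["2.1.1", "2.1.3"]}))  # CIS-1.4: 2.1.1, 2.1.3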


@@ -101,7 +101,9 @@ def report(check_findings, output_options, audit_info):
                        )
                    if "html" in file_descriptors:
-                        fill_html(file_descriptors["html"], finding)
+                        fill_html(
+                            file_descriptors["html"], finding, output_options
+                        )
                        file_descriptors["html"].write("")

                    if "json-asff" in file_descriptors:
@@ -136,6 +138,7 @@ def report(check_findings, output_options, audit_info):
                            audit_info,
                            "csv",
                            file_descriptors["csv"],
+                            output_options,
                        )
                        csv_writer.writerow(finding_output.__dict__)
@@ -145,7 +148,7 @@ def report(check_findings, output_options, audit_info):
                            finding,
                            audit_info,
                            "json",
-                            file_descriptors["json"],
+                            output_options,
                        )
                        json.dump(
                            finding_output.dict(),


@@ -17,6 +17,11 @@ from prowler.config.config import (
    prowler_version,
    timestamp_utc,
)
+from prowler.lib.check.compliance_models import (
+    CIS_Requirements,
+    Compliance_Base_Model,
+    Compliance_Requirement,
+)
from prowler.lib.check.models import Check_Report, load_check_metadata
from prowler.lib.outputs.file_descriptors import fill_file_descriptors
from prowler.lib.outputs.json import fill_json_asff
@@ -28,6 +33,12 @@ from prowler.lib.outputs.models import (
    Resource,
    Severity,
    generate_csv_fields,
+    get_check_compliance,
+    parse_html_string,
+    parse_json_tags,
+    unroll_dict,
+    unroll_list,
+    unroll_tags,
)
from prowler.lib.outputs.outputs import (
    extract_findings_statistics,
@@ -193,6 +204,7 @@ class Test_Outputs:
"remediation_recommendation_code_terraform",
"remediation_recommendation_code_cli",
"remediation_recommendation_code_other",
"compliance",
"categories",
"depends_on",
"related_to",
@@ -201,6 +213,142 @@ class Test_Outputs:
        assert generate_csv_fields(Check_Output_CSV) == expected

+    def test_unroll_list(self):
+        list = ["test", "test1", "test2"]
+        assert unroll_list(list) == "test | test1 | test2"
+
+    def test_unroll_tags(self):
+        dict_list = [
+            {"Key": "name", "Value": "test"},
+            {"Key": "project", "Value": "prowler"},
+            {"Key": "environment", "Value": "dev"},
+            {"Key": "terraform", "Value": "true"},
+        ]
+        unique_dict_list = [
+            {
+                "test1": "value1",
+                "test2": "value2",
+                "test3": "value3",
+            }
+        ]
+        assert (
+            unroll_tags(dict_list)
+            == "name=test | project=prowler | environment=dev | terraform=true"
+        )
+        assert (
+            unroll_tags(unique_dict_list)
+            == "test1=value1 | test2=value2 | test3=value3"
+        )
+
+    def test_unroll_dict(self):
+        test_compliance_dict = {
+            "CISA": ["your-systems-3", "your-data-1", "your-data-2"],
+            "CIS-1.4": ["2.1.1"],
+            "CIS-1.5": ["2.1.1"],
+            "GDPR": ["article_32"],
+            "AWS-Foundational-Security-Best-Practices": ["s3"],
+            "HIPAA": [
+                "164_308_a_1_ii_b",
+                "164_308_a_4_ii_a",
+                "164_312_a_2_iv",
+                "164_312_c_1",
+                "164_312_c_2",
+                "164_312_e_2_ii",
+            ],
+            "GxP-21-CFR-Part-11": ["11.10-c", "11.30"],
+            "GxP-EU-Annex-11": ["7.1-data-storage-damage-protection"],
+            "NIST-800-171-Revision-2": ["3_3_8", "3_5_10", "3_13_11", "3_13_16"],
+            "NIST-800-53-Revision-4": ["sc_28"],
+            "NIST-800-53-Revision-5": [
+                "au_9_3",
+                "cm_6_a",
+                "cm_9_b",
+                "cp_9_d",
+                "cp_9_8",
+                "pm_11_b",
+                "sc_8_3",
+                "sc_8_4",
+                "sc_13_a",
+                "sc_16_1",
+                "sc_28_1",
+                "si_19_4",
+            ],
+            "ENS-RD2022": ["mp.si.2.aws.s3.1"],
+            "NIST-CSF-1.1": ["ds_1"],
+            "RBI-Cyber-Security-Framework": ["annex_i_1_3"],
+            "FFIEC": ["d3-pc-am-b-12"],
+            "PCI-3.2.1": ["s3"],
+            "FedRamp-Moderate-Revision-4": ["sc-13", "sc-28"],
+            "FedRAMP-Low-Revision-4": ["sc-13"],
+        }
+        assert (
+            unroll_dict(test_compliance_dict)
+            == "CISA: your-systems-3, your-data-1, your-data-2 | CIS-1.4: 2.1.1 | CIS-1.5: 2.1.1 | GDPR: article_32 | AWS-Foundational-Security-Best-Practices: s3 | HIPAA: 164_308_a_1_ii_b, 164_308_a_4_ii_a, 164_312_a_2_iv, 164_312_c_1, 164_312_c_2, 164_312_e_2_ii | GxP-21-CFR-Part-11: 11.10-c, 11.30 | GxP-EU-Annex-11: 7.1-data-storage-damage-protection | NIST-800-171-Revision-2: 3_3_8, 3_5_10, 3_13_11, 3_13_16 | NIST-800-53-Revision-4: sc_28 | NIST-800-53-Revision-5: au_9_3, cm_6_a, cm_9_b, cp_9_d, cp_9_8, pm_11_b, sc_8_3, sc_8_4, sc_13_a, sc_16_1, sc_28_1, si_19_4 | ENS-RD2022: mp.si.2.aws.s3.1 | NIST-CSF-1.1: ds_1 | RBI-Cyber-Security-Framework: annex_i_1_3 | FFIEC: d3-pc-am-b-12 | PCI-3.2.1: s3 | FedRamp-Moderate-Revision-4: sc-13, sc-28 | FedRAMP-Low-Revision-4: sc-13"
+        )
+
+    def test_parse_html_string(self):
+        string = "CISA: your-systems-3, your-data-1, your-data-2 | CIS-1.4: 2.1.1 | CIS-1.5: 2.1.1 | GDPR: article_32 | AWS-Foundational-Security-Best-Practices: s3 | HIPAA: 164_308_a_1_ii_b, 164_308_a_4_ii_a, 164_312_a_2_iv, 164_312_c_1, 164_312_c_2, 164_312_e_2_ii | GxP-21-CFR-Part-11: 11.10-c, 11.30 | GxP-EU-Annex-11: 7.1-data-storage-damage-protection | NIST-800-171-Revision-2: 3_3_8, 3_5_10, 3_13_11, 3_13_16 | NIST-800-53-Revision-4: sc_28 | NIST-800-53-Revision-5: au_9_3, cm_6_a, cm_9_b, cp_9_d, cp_9_8, pm_11_b, sc_8_3, sc_8_4, sc_13_a, sc_16_1, sc_28_1, si_19_4 | ENS-RD2022: mp.si.2.aws.s3.1 | NIST-CSF-1.1: ds_1 | RBI-Cyber-Security-Framework: annex_i_1_3 | FFIEC: d3-pc-am-b-12 | PCI-3.2.1: s3 | FedRamp-Moderate-Revision-4: sc-13, sc-28 | FedRAMP-Low-Revision-4: sc-13"
+        assert (
+            parse_html_string(string)
+            == """
+&#x2022;CISA: your-systems-3, your-data-1, your-data-2
+&#x2022;CIS-1.4: 2.1.1
+&#x2022;CIS-1.5: 2.1.1
+&#x2022;GDPR: article_32
+&#x2022;AWS-Foundational-Security-Best-Practices: s3
+&#x2022;HIPAA: 164_308_a_1_ii_b, 164_308_a_4_ii_a, 164_312_a_2_iv, 164_312_c_1, 164_312_c_2, 164_312_e_2_ii
+&#x2022;GxP-21-CFR-Part-11: 11.10-c, 11.30
+&#x2022;GxP-EU-Annex-11: 7.1-data-storage-damage-protection
+&#x2022;NIST-800-171-Revision-2: 3_3_8, 3_5_10, 3_13_11, 3_13_16
+&#x2022;NIST-800-53-Revision-4: sc_28
+&#x2022;NIST-800-53-Revision-5: au_9_3, cm_6_a, cm_9_b, cp_9_d, cp_9_8, pm_11_b, sc_8_3, sc_8_4, sc_13_a, sc_16_1, sc_28_1, si_19_4
+&#x2022;ENS-RD2022: mp.si.2.aws.s3.1
+&#x2022;NIST-CSF-1.1: ds_1
+&#x2022;RBI-Cyber-Security-Framework: annex_i_1_3
+&#x2022;FFIEC: d3-pc-am-b-12
+&#x2022;PCI-3.2.1: s3
+&#x2022;FedRamp-Moderate-Revision-4: sc-13, sc-28
+&#x2022;FedRAMP-Low-Revision-4: sc-13
+"""
+        )
+
+    def test_parse_json_tags(self):
+        json_tags = [
+            {"Key": "name", "Value": "test"},
+            {"Key": "project", "Value": "prowler"},
+            {"Key": "environment", "Value": "dev"},
+            {"Key": "terraform", "Value": "true"},
+        ]
+        assert parse_json_tags(json_tags) == {
+            "name": "test",
+            "project": "prowler",
+            "environment": "dev",
+            "terraform": "true",
+        }
+        assert parse_json_tags([]) == {}
+        assert parse_json_tags([None]) == {}
+        assert parse_json_tags([{}]) == {}
+        assert parse_json_tags(None) == {}

    # def test_fill_json(self):
    #     input_audit_info = AWS_Audit_Info(
    #         session_config=None,
@@ -527,3 +675,83 @@ class Test_Outputs:
            )
            == 0
        )

+    def test_get_check_compliance(self):
+        bulk_check_metadata = [
+            Compliance_Base_Model(
+                Framework="CIS",
+                Provider="AWS",
+                Version="1.4",
+                Description="The CIS Benchmark for CIS Amazon Web Services Foundations Benchmark, v1.4.0, Level 1 and 2 provides prescriptive guidance for configuring security options for a subset of Amazon Web Services. It has an emphasis on foundational, testable, and architecture agnostic settings",
+                Requirements=[
+                    Compliance_Requirement(
+                        Checks=[],
+                        Id="2.1.3",
+                        Description="Ensure MFA Delete is enabled on S3 buckets",
+                        Attributes=[
+                            CIS_Requirements(
+                                Section="2.1. Simple Storage Service (S3)",
+                                Profile="Level 1",
+                                AssessmentStatus="Automated",
+                                Description="Once MFA Delete is enabled on your sensitive and classified S3 bucket it requires the user to have two forms of authentication.",
+                                RationaleStatement="Adding MFA delete to an S3 bucket, requires additional authentication when you change the version state of your bucket or you delete and object version adding another layer of security in the event your security credentials are compromised or unauthorized access is granted.",
+                                ImpactStatement="",
+                                RemediationProcedure="Perform the steps below to enable MFA delete on an S3 bucket.\n\nNote:\n-You cannot enable MFA Delete using the AWS Management Console. You must use the AWS CLI or API.\n-You must use your 'root' account to enable MFA Delete on S3 buckets.\n\n**From Command line:**\n\n1. Run the s3api put-bucket-versioning command\n\n```\naws s3api put-bucket-versioning --profile my-root-profile --bucket Bucket_Name --versioning-configuration Status=Enabled,MFADelete=Enabled --mfa “arn:aws:iam::aws_account_id:mfa/root-account-mfa-device passcode”\n```",
+                                AuditProcedure='Perform the steps below to confirm MFA delete is configured on an S3 Bucket\n\n**From Console:**\n\n1. Login to the S3 console at `https://console.aws.amazon.com/s3/`\n\n2. Click the `Check` box next to the Bucket name you want to confirm\n\n3. In the window under `Properties`\n\n4. Confirm that Versioning is `Enabled`\n\n5. Confirm that MFA Delete is `Enabled`\n\n**From Command Line:**\n\n1. Run the `get-bucket-versioning`\n```\naws s3api get-bucket-versioning --bucket my-bucket\n```\n\nOutput example:\n```\n<VersioningConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/"> \n <Status>Enabled</Status>\n <MfaDelete>Enabled</MfaDelete> \n</VersioningConfiguration>\n```\n\nIf the Console or the CLI output does not show Versioning and MFA Delete `enabled` refer to the remediation below.',
+                                AdditionalInformation="",
+                                References="https://docs.aws.amazon.com/AmazonS3/latest/dev/Versioning.html#MultiFactorAuthenticationDelete:https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMFADelete.html:https://aws.amazon.com/blogs/security/securing-access-to-aws-using-mfa-part-3/:https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_mfa_lost-or-broken.html",
+                            )
+                        ],
+                    )
+                ],
+            ),
+            Compliance_Base_Model(
+                Framework="CIS",
+                Provider="AWS",
+                Version="1.5",
+                Description="The CIS Amazon Web Services Foundations Benchmark provides prescriptive guidance for configuring security options for a subset of Amazon Web Services with an emphasis on foundational, testable, and architecture agnostic settings.",
+                Requirements=[
+                    Compliance_Requirement(
+                        Checks=[],
+                        Id="2.1.3",
+                        Description="Ensure MFA Delete is enabled on S3 buckets",
+                        Attributes=[
+                            CIS_Requirements(
+                                Section="2.1. Simple Storage Service (S3)",
+                                Profile="Level 1",
+                                AssessmentStatus="Automated",
+                                Description="Once MFA Delete is enabled on your sensitive and classified S3 bucket it requires the user to have two forms of authentication.",
+                                RationaleStatement="Adding MFA delete to an S3 bucket, requires additional authentication when you change the version state of your bucket or you delete and object version adding another layer of security in the event your security credentials are compromised or unauthorized access is granted.",
+                                ImpactStatement="",
+                                RemediationProcedure="Perform the steps below to enable MFA delete on an S3 bucket.\n\nNote:\n-You cannot enable MFA Delete using the AWS Management Console. You must use the AWS CLI or API.\n-You must use your 'root' account to enable MFA Delete on S3 buckets.\n\n**From Command line:**\n\n1. Run the s3api put-bucket-versioning command\n\n```\naws s3api put-bucket-versioning --profile my-root-profile --bucket Bucket_Name --versioning-configuration Status=Enabled,MFADelete=Enabled --mfa “arn:aws:iam::aws_account_id:mfa/root-account-mfa-device passcode”\n```",
+                                AuditProcedure='Perform the steps below to confirm MFA delete is configured on an S3 Bucket\n\n**From Console:**\n\n1. Login to the S3 console at `https://console.aws.amazon.com/s3/`\n\n2. Click the `Check` box next to the Bucket name you want to confirm\n\n3. In the window under `Properties`\n\n4. Confirm that Versioning is `Enabled`\n\n5. Confirm that MFA Delete is `Enabled`\n\n**From Command Line:**\n\n1. Run the `get-bucket-versioning`\n```\naws s3api get-bucket-versioning --bucket my-bucket\n```\n\nOutput example:\n```\n<VersioningConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/"> \n <Status>Enabled</Status>\n <MfaDelete>Enabled</MfaDelete> \n</VersioningConfiguration>\n```\n\nIf the Console or the CLI output does not show Versioning and MFA Delete `enabled` refer to the remediation below.',
+                                AdditionalInformation="",
+                                References="https://docs.aws.amazon.com/AmazonS3/latest/dev/Versioning.html#MultiFactorAuthenticationDelete:https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMFADelete.html:https://aws.amazon.com/blogs/security/securing-access-to-aws-using-mfa-part-3/:https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_mfa_lost-or-broken.html",
+                            )
+                        ],
+                    )
+                ],
+            ),
+        ]
+        finding = Check_Report(
+            load_check_metadata(
+                f"{path.dirname(path.realpath(__file__))}/fixtures/metadata.json"
+            ).json()
+        )
+        finding.resource_details = "Test resource details"
+        finding.resource_id = "test-resource"
+        finding.resource_arn = "test-arn"
+        finding.region = "eu-west-1"
+        finding.status = "PASS"
+        finding.status_extended = "This is a test"
+
+        output_options = mock.MagicMock()
+        output_options.bulk_checks_metadata[
+            "iam_disable_30_days_credentials"
+        ].Compliance = bulk_check_metadata
+
+        assert get_check_compliance(finding, "aws", output_options) == {
+            "CIS-1.4": ["2.1.3"],
+            "CIS-1.5": ["2.1.3"],
+        }
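
For completeness, the shape a JSON finding takes after this change; only the key names and the FindingUniqueId pattern come from the diff, the concrete values are made up:

# Illustrative only: check id, account and resource below are invented examples.
finding_fragment = {
    "FindingUniqueId": "prowler-aws-s3_bucket_no_mfa_delete-123456789012-eu-west-1-my-bucket",
    "Status": "FAIL",
    "Compliance": {"CIS-1.4": ["2.1.3"], "CIS-1.5": ["2.1.3"]},
}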