feat(compliance): Loader and Execute (#1465)

This commit is contained in:
Pepe Fagoaga
2022-11-23 15:53:53 +01:00
committed by GitHub
parent 1a70a45805
commit b3e57ca3e5
515 changed files with 6018 additions and 5614 deletions

View File

@@ -1,4 +1,7 @@
import functools
import importlib
import os
import sys
from pkgutil import walk_packages
from types import ModuleType
from typing import Any
@@ -6,7 +9,8 @@ from typing import Any
from alive_progress import alive_bar
from colorama import Fore, Style
from config.config import groups_file, orange_color
from config.config import compliance_specification_dir, groups_file, orange_color
from lib.check.compliance_models import load_compliance_framework
from lib.check.models import Check, Output_From_Options, load_check_metadata
from lib.logger import logger
from lib.outputs.outputs import report
@@ -31,6 +35,29 @@ def bulk_load_checks_metadata(provider: str) -> dict:
return bulk_check_metadata
# Bulk load all compliance frameworks specification
def bulk_load_compliance_frameworks(provider: str) -> dict:
"""Bulk load all compliance frameworks specification into a dict"""
bulk_compliance_frameworks = {}
compliance_specification_dir_path = f"{compliance_specification_dir}/{provider}"
try:
for filename in os.listdir(compliance_specification_dir_path):
file_path = os.path.join(compliance_specification_dir_path, filename)
# Check if it is a file
if os.path.isfile(file_path):
# Open Compliance file in JSON
# cis_v1.4_aws.json --> cis_v1.4_aws
compliance_framework_name = filename.split(".json")[0]
# Store the compliance info
bulk_compliance_frameworks[
compliance_framework_name
] = load_compliance_framework(file_path)
except Exception as e:
logger.error(f"{e.__class__.__name__} -- {e}")
return bulk_compliance_frameworks
# Exclude checks to run
def exclude_checks_to_run(checks_to_execute: set, excluded_checks: list) -> set:
for check in excluded_checks:
@@ -101,16 +128,43 @@ def print_services(service_list: set):
print(f"- {service}")
def print_checks(provider: str, check_list: set, bulk_checks_metadata: dict):
def print_compliance_frameworks(bulk_compliance_frameworks: dict):
print(
f"There are {Fore.YELLOW}{len(bulk_compliance_frameworks.keys())}{Style.RESET_ALL} available Compliance Frameworks: \n"
)
for framework in bulk_compliance_frameworks.keys():
print(f"\t- {Fore.YELLOW}{framework}{Style.RESET_ALL}")
def print_compliance_requirements(bulk_compliance_frameworks: dict):
if bulk_compliance_frameworks and "ens_rd2022_aws" in bulk_compliance_frameworks:
print("Listing ENS RD2022 AWS Compliance Requirements:\n")
for compliance in bulk_compliance_frameworks.values():
for requirement in compliance.Requirements:
checks = ""
for check in requirement.Checks:
checks += f" {Fore.YELLOW}\t\t{check}\n{Style.RESET_ALL}"
print(
f"Requirement Id: {Fore.MAGENTA}{requirement.Id}{Style.RESET_ALL}\n\t- Description: {requirement.Description}\n\t- Checks:\n{checks}"
)
def print_checks(
provider: str,
check_list: set,
bulk_checks_metadata: dict,
):
for check in check_list:
try:
print(
f"[{bulk_checks_metadata[check].CheckID}] {bulk_checks_metadata[check].CheckTitle} - {Fore.MAGENTA}{bulk_checks_metadata[check].ServiceName} {Fore.YELLOW}[{bulk_checks_metadata[check].Severity}]{Style.RESET_ALL}"
)
except KeyError as error:
logger.error(
logger.critical(
f"Check {error} was not found for the {provider.upper()} provider"
)
sys.exit()
print(
f"\nThere are {Fore.YELLOW}{len(check_list)}{Style.RESET_ALL} available checks.\n"
)
@@ -150,21 +204,51 @@ def load_checks_to_execute_from_groups(
return checks_to_execute
# Parse checks from compliance frameworks specification
def parse_checks_from_compliance_framework(
compliance_frameworks: list, bulk_compliance_frameworks: dict
) -> list:
"""Parse checks from compliance frameworks specification"""
checks_to_execute = set()
try:
for framework in compliance_frameworks:
# compliance_framework_json["Requirements"][*]["Checks"]
compliance_framework_checks_list = [
requirement.Checks
for requirement in bulk_compliance_frameworks[framework].Requirements
]
# Reduce nested list into a list
# Pythonic functional magic
compliance_framework_checks = functools.reduce(
lambda x, y: x + y, compliance_framework_checks_list
)
# Then union this list of checks with the initial one
checks_to_execute = checks_to_execute.union(compliance_framework_checks)
except Exception as e:
logger.error(f"{e.__class__.__name__}[{e.__traceback__.tb_lineno}] -- {e}")
return checks_to_execute
# Recover all checks from the selected provider and service
def recover_checks_from_provider(provider: str, service: str = None) -> list:
checks = []
modules = list_modules(provider, service)
for module_name in modules:
# Format: "providers.{provider}.services.{service}.{check_name}.{check_name}"
check_name = module_name.name
# We need to exclude common shared libraries in services
if (
check_name.count(".") == 5
and "lib" not in check_name
and "test" not in check_name
):
checks.append(check_name)
return checks
try:
checks = []
modules = list_modules(provider, service)
for module_name in modules:
# Format: "providers.{provider}.services.{service}.{check_name}.{check_name}"
check_name = module_name.name
# We need to exclude common shared libraries in services
if (
check_name.count(".") == 5
and "lib" not in check_name
and "test" not in check_name
):
checks.append(check_name)
return checks
except Exception as e:
logger.critical(f"{e.__class__.__name__}[{e.__traceback__.tb_lineno}]: {e}")
sys.exit()
# List all available modules in the selected provider and service
@@ -184,6 +268,7 @@ def import_check(check_path: str) -> ModuleType:
return lib
# Sets the Output_From_Options to be used in the output modes
def set_output_options(
quiet: bool,
output_modes: list,
@@ -191,8 +276,10 @@ def set_output_options(
security_hub_enabled: bool,
output_filename: str,
allowlist_file: str,
bulk_checks_metadata: dict,
verbose: bool,
):
"""Sets the Output_From_Options to be used in the output modes"""
global output_options
output_options = Output_From_Options(
is_quiet=quiet,
@@ -201,6 +288,7 @@ def set_output_options(
security_hub_enabled=security_hub_enabled,
output_filename=output_filename,
allowlist_file=allowlist_file,
bulk_checks_metadata=bulk_checks_metadata,
verbose=verbose,
# set input options here
)
@@ -211,15 +299,15 @@ def run_check(check: Check, output_options: Output_From_Options) -> list:
findings = []
if output_options.verbose or output_options.is_quiet:
print(
f"\nCheck ID: {check.checkID} - {Fore.MAGENTA}{check.serviceName}{Fore.YELLOW} [{check.severity}]{Style.RESET_ALL}"
f"\nCheck ID: {check.CheckID} - {Fore.MAGENTA}{check.ServiceName}{Fore.YELLOW} [{check.Severity}]{Style.RESET_ALL}"
)
logger.debug(f"Executing check: {check.checkID}")
logger.debug(f"Executing check: {check.CheckID}")
try:
findings = check.execute()
except Exception as error:
print(f"Something went wrong in {check.checkID}, please use --log-level ERROR")
print(f"Something went wrong in {check.CheckID}, please use --log-level ERROR")
logger.error(
f"{check.checkID} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"{check.CheckID} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
finally:
return findings
@@ -264,13 +352,14 @@ def execute_checks(
# If check does not exists in the provider or is from another provider
except ModuleNotFoundError:
logger.error(
logger.critical(
f"Check '{check_name}' was not found for the {provider.upper()} provider"
)
bar.title = f"-> {Fore.RED}Scan was aborted!{Style.RESET_ALL}"
sys.exit()
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
bar.title = f"-> {Fore.GREEN}Scan is completed!"
print(Style.RESET_ALL)
bar.title = f"-> {Fore.GREEN}Scan is completed!{Style.RESET_ALL}"
return all_findings

View File

@@ -1,10 +1,13 @@
import os
from unittest import mock
from lib.check.check import (
bulk_load_compliance_frameworks,
exclude_checks_to_run,
exclude_groups_to_run,
exclude_services_to_run,
load_checks_to_execute_from_groups,
parse_checks_from_compliance_framework,
parse_checks_from_file,
parse_groups_from_file,
)
@@ -12,17 +15,6 @@ from lib.check.models import load_check_metadata
class Test_Check:
# def test_import_check(self):
# test_cases = [
# {
# "name": "Test valid check path",
# "input": "providers.aws.services.iam.iam_disable_30_days_credentials.iam_disable_30_days_credentials",
# "expected": "providers.aws.services.iam.iam_disable_30_days_credentials.iam_disable_30_days_credentials",
# }
# ]
# for test in test_cases:
# assert importlib.import_module(test["input"]).__name__ == test["expected"
def test_parse_groups_from_file(self):
test_cases = [
{
@@ -222,3 +214,75 @@ class Test_Check:
exclude_services_to_run(checks_to_run, excluded_services, provider)
== test["expected"]
)
def test_parse_checks_from_compliance_framework_two(self):
test_case = {
"input": {"compliance_frameworks": ["cis_v1.4_aws", "ens_v3_aws"]},
"expected": {
"vpc_flow_logs_enabled",
"ec2_ebs_snapshot_encryption",
"iam_user_mfa_enabled_console_access",
"cloudtrail_multi_region_enabled",
"ec2_elbv2_insecure_ssl_ciphers",
"guardduty_is_enabled",
"s3_bucket_default_encryption",
"cloudfront_distributions_https_enabled",
"iam_avoid_root_usage",
"s3_bucket_secure_transport_policy",
},
}
with mock.patch(
"lib.check.check.compliance_specification_dir",
new=f"{os.path.dirname(os.path.realpath(__file__))}/fixtures",
):
provider = "aws"
bulk_compliance_frameworks = bulk_load_compliance_frameworks(provider)
compliance_frameworks = test_case["input"]["compliance_frameworks"]
assert (
parse_checks_from_compliance_framework(
compliance_frameworks, bulk_compliance_frameworks
)
== test_case["expected"]
)
def test_parse_checks_from_compliance_framework_one(self):
test_case = {
"input": {"compliance_frameworks": ["cis_v1.4_aws"]},
"expected": {
"iam_user_mfa_enabled_console_access",
"s3_bucket_default_encryption",
"iam_avoid_root_usage",
},
}
with mock.patch(
"lib.check.check.compliance_specification_dir",
new=f"{os.path.dirname(os.path.realpath(__file__))}/fixtures",
):
provider = "aws"
bulk_compliance_frameworks = bulk_load_compliance_frameworks(provider)
compliance_frameworks = test_case["input"]["compliance_frameworks"]
assert (
parse_checks_from_compliance_framework(
compliance_frameworks, bulk_compliance_frameworks
)
== test_case["expected"]
)
def test_parse_checks_from_compliance_framework_no_compliance(self):
test_case = {
"input": {"compliance_frameworks": []},
"expected": set(),
}
with mock.patch(
"lib.check.check.compliance_specification_dir",
new=f"{os.path.dirname(os.path.realpath(__file__))}/fixtures",
):
provider = "aws"
bulk_compliance_frameworks = bulk_load_compliance_frameworks(provider)
compliance_frameworks = test_case["input"]["compliance_frameworks"]
assert (
parse_checks_from_compliance_framework(
compliance_frameworks, bulk_compliance_frameworks
)
== test_case["expected"]
)

View File

@@ -1,6 +1,7 @@
from config.config import groups_file
from lib.check.check import (
from lib.check.check import ( # load_checks_to_execute_from_compliance_framework,
load_checks_to_execute_from_groups,
parse_checks_from_compliance_framework,
parse_checks_from_file,
parse_groups_from_file,
recover_checks_from_provider,
@@ -8,18 +9,20 @@ from lib.check.check import (
from lib.logger import logger
# Generate the list of checks to execute
# test this function
# Generate the list of checks to execute
# PENDING Test for this function
def load_checks_to_execute(
bulk_checks_metadata: dict,
bulk_compliance_frameworks: dict,
checks_file: str,
check_list: list,
service_list: list,
group_list: list,
severities: list,
compliance_frameworks: list,
provider: str,
) -> set:
"""Generate the list of checks to execute based on the cloud provider and input arguments specified"""
checks_to_execute = set()
# Handle if there are checks passed using -c/--checks
@@ -39,7 +42,7 @@ def load_checks_to_execute(
try:
checks_to_execute = parse_checks_from_file(checks_file, provider)
except Exception as e:
logger.error(f"{e.__class__.__name__} -- {e}")
logger.error(f"{e.__class__.__name__}[{e.__traceback__.tb_lineno}] -- {e}")
# Handle if there are services passed using -s/--services
elif service_list:
@@ -65,7 +68,16 @@ def load_checks_to_execute(
available_groups, group_list, provider
)
except Exception as e:
logger.error(f"{e.__class__.__name__} -- {e}")
logger.error(f"{e.__class__.__name__}[{e.__traceback__.tb_lineno}] -- {e}")
# Handle if there are compliance frameworks passed using --compliance
elif compliance_frameworks:
try:
checks_to_execute = parse_checks_from_compliance_framework(
compliance_frameworks, bulk_compliance_frameworks
)
except Exception as e:
logger.error(f"{e.__class__.__name__}[{e.__traceback__.tb_lineno}] -- {e}")
# If there are no checks passed as argument
else:
@@ -73,7 +85,7 @@ def load_checks_to_execute(
# Get all check modules to run with the specific provider
checks = recover_checks_from_provider(provider)
except Exception as e:
logger.error(f"{e.__class__.__name__} -- {e}")
logger.error(f"{e.__class__.__name__}[{e.__traceback__.tb_lineno}] -- {e}")
else:
for check_name in checks:
# Recover check name from import path (last part)

43
lib/check/compliance.py Normal file
View File

@@ -0,0 +1,43 @@
import sys
from lib.check.compliance_models import Compliance_Base_Model, Compliance_Requirement
from lib.logger import logger
def update_checks_metadata_with_compliance(
bulk_compliance_frameworks: dict, bulk_checks_metadata: dict
):
"""Update the check metadata model with the compliance framework"""
try:
for check in bulk_checks_metadata:
check_compliance = []
for framework in bulk_compliance_frameworks.values():
for requirement in framework.Requirements:
compliance_requirements = []
if check in requirement.Checks:
# Create the Compliance_Requirement
requirement = Compliance_Requirement(
Id=requirement.Id,
Description=requirement.Description,
Attributes=requirement.Attributes,
Checks=requirement.Checks,
)
# For the check metadata we don't need the "Checks" key
delattr(requirement, "Checks")
# Include the requirment into the check's framework requirements
compliance_requirements.append(requirement)
# Create the Compliance_Model
compliance = Compliance_Base_Model(
Framework=framework.Framework,
Provider=framework.Provider,
Version=framework.Version,
Requirements=compliance_requirements,
)
# Include the compliance framework for the check
check_compliance.append(compliance)
# Save it into the check's metadata
bulk_checks_metadata[check].Compliance = check_compliance
return bulk_checks_metadata
except Exception as e:
logger.critical(f"{e.__class__.__name__}[{e.__traceback__.tb_lineno}] -- {e}")
sys.exit()

View File

@@ -0,0 +1,75 @@
import sys
from enum import Enum
from typing import Any, List, Optional, Union
from pydantic import BaseModel, ValidationError
from lib.logger import logger
# ENS - Esquema Nacional de Seguridad - España
class ENS_Requirements_Nivel(str, Enum):
"""ENS V3 Requirements Level"""
bajo = "bajo"
medio = "medio"
alto = "alto"
pytec = "pytec"
class ENS_Requirements_Dimensiones(str, Enum):
"""ENS V3 Requirements Dimensions"""
confidencialidad = "confidencialidad"
integridad = "integridad"
trazabilidad = "trazabilidad"
autenticidad = "autenticidad"
disponibilidad = "disponibilidad"
class ENS_Requirements(BaseModel):
"""ENS V3 Framework Requirements"""
IdGrupoControl: str
Marco: str
Categoria: str
Descripcion_Control: str
Nivel: list[ENS_Requirements_Nivel]
Dimensiones: list[ENS_Requirements_Dimensiones]
# Base Compliance Model
class Compliance_Requirement(BaseModel):
"""Compliance_Requirement holds the base model for every requirement within a compliance framework"""
Id: str
Description: str
Attributes: list[Union[ENS_Requirements, Any]]
Checks: List[str]
class Compliance_Base_Model(BaseModel):
"""Compliance_Base_Model holds the base model for every compliance framework"""
Framework: str
Provider: Optional[str]
Version: str
Requirements: list[Compliance_Requirement]
# Testing Pending
def load_compliance_framework(
compliance_specification_file: str,
) -> Compliance_Base_Model:
"""load_compliance_framework loads and parse a Compliance Framework Specification"""
try:
compliance_framework = Compliance_Base_Model.parse_file(
compliance_specification_file
)
except ValidationError as error:
logger.critical(
f"Compliance Framework Specification from {compliance_specification_file} is not valid: {error}"
)
sys.exit()
else:
return compliance_framework

View File

@@ -0,0 +1,82 @@
{
"Framework": "CIS",
"Provider": "AWS",
"Version": "1.4",
"Requirements": [
{
"Id": "1.4",
"Description": "Ensure no 'root' user account access key exists (Automated)",
"Attributes": [
{
"Section": "1. Identity and Access Management (IAM)",
"Level": [
"level1"
],
"Rationale": "Removing access keys associated with the 'root' user account limits vectors by which the account can be compromised. Additionally, removing the 'root' access keys encourages the creation and use of role based accounts that are least privileged.",
"Guidance": "The 'root' user account is the most privileged user in an AWS account. AWS Access Keys provide programmatic access to a given AWS account. It is recommended that all access keys associated with the 'root' user account be removed.",
"Additional information": "IAM User account \"root\" for us-gov cloud regions is not enabled by default. However, on request to AWS support enables 'root' access only through access-keys (CLI, API methods) for us-gov cloud region.",
"References": [
"CCE-78910-7",
"https://docs.aws.amazon.com/general/latest/gr/aws-access-keys-best-practices.html",
"https://docs.aws.amazon.com/general/latest/gr/managing-aws-access-keys.html",
"https://docs.aws.amazon.com/IAM/latest/APIReference/API_GetAccountSummary.html",
"https://aws.amazon.com/blogs/security/an-easier-way-to-determine-the-presence-of-aws-account-access-keys/"
]
}
],
"Checks": [
"iam_avoid_root_usage"
]
},
{
"Id": "1.10",
"Description": "Ensure multi-factor authentication (MFA) is enabled for all IAM users that have a console password (Automated)",
"Attributes": [
{
"Section": "1. Identity and Access Management (IAM)",
"Level": [
"level1"
],
"Guidance": "Multi-Factor Authentication (MFA) adds an extra layer of authentication assurance beyond traditional credentials. With MFA enabled, when a user signs in to the AWS Console, they will be prompted for their user name and password as well as for an authentication code from their physical or virtual MFA token. It is recommended that MFA be enabled for all accounts that have a console password.",
"Rationale": "Enabling MFA provides increased security for console access as it requires the authenticating principal to possess a device that displays a time-sensitive key and have knowledge of a credential.",
"Impact": "AWS will soon end support for SMS multi-factor authentication (MFA). New customers are not allowed to use this feature. We recommend that existing customers switch to one of the following alternative methods of MFA.",
"Additional information": "Forced IAM User Self-Service Remediation. Amazon has published a pattern that forces users to self-service setup MFA before they have access to their complete permissions set. Until they complete this step, they cannot access their full permissions. This pattern can be used on new AWS accounts. It can also be used on existing accounts - it is recommended users are given instructions and a grace period to accomplish MFA enrollment before active enforcement on existing AWS accounts.",
"References": [
"CCE-78901-6",
"https://tools.ietf.org/html/rfc6238",
"https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_mfa.html",
"https://docs.aws.amazon.com/IAM/latest/UserGuide/best-practices.html#enable-mfa-for-privileged-users",
"https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_mfa_enable_virtual.html",
"https://blogs.aws.amazon.com/security/post/Tx2SJJYE082KBUK/How-to-Delegate-Management-of-Multi-Factor-Authentication-to-AWS-IAM-Users"
]
}
],
"Checks": [
"iam_user_mfa_enabled_console_access"
]
},
{
"Id": "2.1.1",
"Description": "Ensure all S3 buckets employ encryption-at-rest (Automated)",
"Attributes": [
{
"Section": "2. Storage",
"Level": [
"level2"
],
"Guidance": "Amazon S3 provides a variety of no, or low, cost encryption options to protect data at rest.",
"Rationale": "Encrypting data at rest reduces the likelihood that it is unintentionally exposed and can nullify the impact of disclosure if the encryption remains unbroken.",
"Impact": "Amazon S3 buckets with default bucket encryption using SSE-KMS cannot be used as destination buckets for Amazon S3 server access logging. Only SSE-S3 default encryption is supported for server access log destination buckets.",
"Additional information": "S3 bucket encryption only applies to objects as they are placed in the bucket. Enabling S3 bucket encryption does not encrypt objects previously stored within the bucket",
"References": [
"https://docs.aws.amazon.com/AmazonS3/latest/user-guide/default-bucket-encryption.html",
"https://docs.aws.amazon.com/AmazonS3/latest/dev/bucket-encryption.html#bucket-encryption-related-resources"
]
}
],
"Checks": [
"s3_bucket_default_encryption"
]
}
]
}

View File

@@ -0,0 +1,82 @@
{
"Framework": "ENS",
"Version": "3",
"Requirements": [
{
"Id": "op.mon.1",
"Description": "Detección de intrusión",
"Attributes": [
{
"Marco": "operacional",
"Categoria": "monitorización del sistema",
"Descripcion_Control": "- En ausencia de otras herramientas de terceros, habilitar Amazon GuarDuty para la detección de amenazas e intrusiones..- Activar el servicio de eventos AWS CloudTrail para todas las regiones..- Activar el servicio VPC FlowLogs..-Deberá habilitarse Amazon GuardDuty para todas las regiones tanto en la cuenta raíz como en las cuentas miembro de un entorno multi-cuenta..-Todas las cuentas miembro deberán estar añadidas para la supervisión bajo la cuenta raíz..-La adminsitración de Amazon GuardDuty quedará delegada exclusivamente a la cuenta de seguridad para garantizar una correcta asignación de los roles para este servicio.",
"Nivel": [
"bajo",
"medio",
"alto"
],
"Dimensiones": [
"confidencialidad",
"integridad",
"trazabilidad",
"autenticidad",
"disponibilidad"
]
}
],
"Checks": [
"guardduty_is_enabled",
"cloudtrail_multi_region_enabled",
"vpc_flow_logs_enabled",
"guardduty_is_enabled"
]
},
{
"Id": "op.mon.3",
"Description": "Protección de la integridad y de la autenticidad",
"Attributes": [
{
"Marco": "operacional",
"Categoria": "protección de las comunicaciones",
"Descripcion_Control": "- Habilitar TLS en los balanceadores de carga ELB.- Evitar el uso de protocolos de cifrado inseguros en la conexión TLS entre clientes y balanceadores de carga.- Asegurar que los Buckets de almacenamiento S3 apliquen cifrado para la transferencia de datos empleando TLS.- Asegurar que la distribución entre frontales CloudFront y sus orígenes únicamente emplee tráfico HTTPS.",
"Nivel": [
"bajo",
"medio",
"alto"
],
"Dimensiones": [
"integridad",
"autenticidad"
]
}
],
"Checks": [
"ec2_elbv2_insecure_ssl_ciphers",
"ec2_elbv2_insecure_ssl_ciphers",
"s3_bucket_secure_transport_policy",
"cloudfront_distributions_https_enabled"
]
},
{
"Id": "mp.si.2.r2.1",
"Description": "Copias de seguridad",
"Attributes": [
{
"Marco": "medidas de protección",
"Categoria": "protección de los soportes de información",
"Descripcion_Control": "Se deberá asegurar el cifrado de las copias de seguridad de EBS.",
"Nivel": [
"alto"
],
"Dimensiones": [
"confidencialidad",
"integridad"
]
}
],
"Checks": [
"ec2_ebs_snapshot_encryption"
]
}
]
}

View File

@@ -1,7 +1,6 @@
import sys
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List
from pydantic import BaseModel, ValidationError
@@ -10,34 +9,21 @@ from lib.logger import logger
@dataclass
class Output_From_Options:
"""Class to store the Prowler output modes options"""
is_quiet: bool
output_modes: list
output_directory: str
security_hub_enabled: bool
output_filename: str
allowlist_file: str
bulk_checks_metadata: dict
verbose: str
# Testing Pending
def load_check_metadata(metadata_file: str) -> dict:
try:
check_metadata = Check_Metadata_Model.parse_file(metadata_file)
except ValidationError as error:
logger.critical(f"Metadata from {metadata_file} is not valid: {error}")
sys.exit()
else:
return check_metadata
class ComplianceItem(BaseModel):
Control: List[str]
Framework: str
Group: List[str]
Version: str
class Code(BaseModel):
"""Check's remediation information using IaC like CloudFormation, Terraform or the native CLI"""
NativeIaC: str
Terraform: str
CLI: str
@@ -45,22 +31,26 @@ class Code(BaseModel):
class Recommendation(BaseModel):
"""Check's recommendation information"""
Text: str
Url: str
class Remediation(BaseModel):
"""Check's remediation: Code and Recommendation"""
Code: Code
Recommendation: Recommendation
class Check_Metadata_Model(BaseModel):
"""Check Metadata Model"""
Provider: str
CheckID: str
# CheckName: str
CheckTitle: str
# CheckAlias: str
CheckType: List[str]
CheckType: list[str]
ServiceName: str
SubServiceName: str
ResourceIdTemplate: str
@@ -70,151 +60,67 @@ class Check_Metadata_Model(BaseModel):
Risk: str
RelatedUrl: str
Remediation: Remediation
Categories: List[str]
Categories: list[str]
Tags: dict
DependsOn: List[str]
RelatedTo: List[str]
DependsOn: list[str]
RelatedTo: list[str]
Notes: str
Compliance: List[ComplianceItem]
# We set the compliance to None to
# store the compliance later if supplied
Compliance: list = None
class Check(ABC):
def __init__(self):
# Load metadata from check
class Check(ABC, Check_Metadata_Model):
"""Prowler Check"""
def __init__(self, **data):
"""Check's init function. Calls the CheckMetadataModel init."""
# Parse the Check's metadata file
check_path_name = self.__class__.__module__.replace(".", "/")
metadata_file = f"{check_path_name}.metadata.json"
self.__check_metadata__ = load_check_metadata(metadata_file)
# Assign metadata values
self.__Provider__ = self.__check_metadata__.Provider
self.__CheckID__ = self.__check_metadata__.CheckID
# self.__CheckName__ = self.__check_metadata__.CheckName
self.__CheckTitle__ = self.__check_metadata__.CheckTitle
# self.__CheckAlias__ = self.__check_metadata__.CheckAlias
self.__CheckType__ = self.__check_metadata__.CheckType
self.__ServiceName__ = self.__check_metadata__.ServiceName
self.__SubServiceName__ = self.__check_metadata__.SubServiceName
self.__ResourceIdTemplate__ = self.__check_metadata__.ResourceIdTemplate
self.__Severity__ = self.__check_metadata__.Severity
self.__ResourceType__ = self.__check_metadata__.ResourceType
self.__Description__ = self.__check_metadata__.Description
self.__Risk__ = self.__check_metadata__.Risk
self.__RelatedUrl__ = self.__check_metadata__.RelatedUrl
self.__Remediation__ = self.__check_metadata__.Remediation
self.__Categories__ = self.__check_metadata__.Categories
self.__Tags__ = self.__check_metadata__.Tags
self.__DependsOn__ = self.__check_metadata__.DependsOn
self.__RelatedTo__ = self.__check_metadata__.RelatedTo
self.__Notes__ = self.__check_metadata__.Notes
self.__Compliance__ = self.__check_metadata__.Compliance
# Store it to validate them with Pydantic
data = Check_Metadata_Model.parse_file(metadata_file).dict()
# Calls parents init function
super().__init__(**data)
@property
def provider(self):
return self.__Provider__
@property
def checkID(self):
return self.__CheckID__
# @property
# def checkName(self):
# return self.__CheckName__
@property
def checkTitle(self):
return self.__CheckTitle__
# @property
# def checkAlias(self):
# return self.__CheckAlias__
@property
def checkType(self):
return self.__CheckType__
@property
def serviceName(self):
return self.__ServiceName__
@property
def subServiceName(self):
return self.__SubServiceName__
@property
def resourceIdTemplate(self):
return self.__ResourceIdTemplate__
@property
def severity(self):
return self.__Severity__
@property
def resourceType(self):
return self.__ResourceType__
@property
def description(self):
return self.__Description__
@property
def relatedUrl(self):
return self.__RelatedUrl__
@property
def risk(self):
return self.__Risk__
@property
def remediation(self):
return self.__Remediation__
@property
def categories(self):
return self.__Categories__
@property
def tags(self):
return self.__Tags__
@property
def dependsOn(self):
return self.__DependsOn__
@property
def relatedTo(self):
return self.__RelatedTo__
@property
def notes(self):
return self.__Notes__
@property
def compliance(self):
return self.__Compliance__
@property
def metadata(self):
return self.__check_metadata__
def metadata(self) -> dict:
"""Return the JSON representation of the check's metadata"""
return self.json()
@abstractmethod
def execute(self):
pass
"""Execute the check's logic"""
@dataclass
class Check_Report:
"""Contains the Check's finding information."""
status: str
region: str
status_extended: str
check_metadata: dict
check_metadata: Check_Metadata_Model
resource_id: str
resource_details: str
resource_tags: list
resource_arn: str
def __init__(self, metadata):
self.check_metadata = metadata
self.check_metadata = Check_Metadata_Model.parse_raw(metadata)
self.status_extended = ""
self.resource_details = ""
self.resource_tags = []
self.resource_id = ""
self.resource_arn = ""
# Testing Pending
def load_check_metadata(metadata_file: str) -> Check_Metadata_Model:
"""load_check_metadata loads and parse a Check's metadata file"""
try:
check_metadata = Check_Metadata_Model.parse_file(metadata_file)
except ValidationError as error:
logger.critical(f"Metadata from {metadata_file} is not valid: {error}")
sys.exit()
else:
return check_metadata

View File

@@ -4,7 +4,7 @@ from typing import List, Optional
from pydantic import BaseModel
from config.config import timestamp
from lib.check.models import Check_Report, ComplianceItem, Remediation
from lib.check.models import Check_Report, Remediation
from providers.aws.lib.audit_info.models import AWS_Organizations_Info
@@ -25,7 +25,6 @@ class Check_Output_JSON(BaseModel):
OrganizationsInfo: Optional[AWS_Organizations_Info]
Region: str = ""
CheckID: str
# CheckName: str
CheckTitle: str
CheckType: List[str]
ServiceName: str
@@ -46,7 +45,7 @@ class Check_Output_JSON(BaseModel):
DependsOn: List[str]
RelatedTo: List[str]
Notes: str
Compliance: List[ComplianceItem]
# Compliance: List[ComplianceItem]
# JSON ASFF Output
@@ -92,6 +91,26 @@ class Check_Output_JSON_ASFF(BaseModel):
Remediation: dict = None
class Check_Output_CSV_ENS_RD2022(BaseModel):
Provider: str
AccountId: str
Region: str
AssessmentDate: str
Requirements_Id: str
Requirements_Description: str
Requirements_Attributes_IdGrupoControl: str
Requirements_Attributes_Marco: str
Requirements_Attributes_Categoria: str
Requirements_Attributes_DescripcionControl: str
Requirements_Attributes_Nivel: str
Requirements_Attributes_Tipo: str
Requirements_Attributes_Dimensiones: str
Status: str
StatusExtended: str
ResourceId: str
CheckId: str
@dataclass
class Check_Output_CSV:
assessment_start_time: str
@@ -106,7 +125,6 @@ class Check_Output_CSV:
account_tags: str
region: str
check_id: str
# check_name: str
check_title: str
check_type: str
status: str
@@ -132,7 +150,7 @@ class Check_Output_CSV:
depends_on: str
related_to: str
notes: str
compliance: str
# compliance: str
def get_csv_header(self):
csv_header = []
@@ -160,7 +178,6 @@ class Check_Output_CSV:
self.account_tags = organizations.account_details_tags
self.region = report.region
self.check_id = report.check_metadata.CheckID
# self.check_name = report.check_metadata.CheckName
self.check_title = report.check_metadata.CheckTitle
self.check_type = report.check_metadata.CheckType
self.status = report.status
@@ -198,7 +215,7 @@ class Check_Output_CSV:
self.depends_on = self.__unroll_list__(report.check_metadata.DependsOn)
self.related_to = self.__unroll_list__(report.check_metadata.RelatedTo)
self.notes = report.check_metadata.Notes
self.compliance = self.__unroll_compliance__(report.check_metadata.Compliance)
# self.compliance = self.__unroll_compliance__(report.check_metadata.Compliance)
def __unroll_list__(self, listed_items: list):
unrolled_items = ""

View File

@@ -2,6 +2,8 @@ import json
import os
import sys
from csv import DictWriter
from io import TextIOWrapper
from typing import Any
from colorama import Fore, Style
from tabulate import tabulate
@@ -12,12 +14,14 @@ from config.config import (
json_file_suffix,
orange_color,
prowler_version,
timestamp,
timestamp_iso,
timestamp_utc,
)
from lib.logger import logger
from lib.outputs.models import (
Check_Output_CSV,
Check_Output_CSV_ENS_RD2022,
Check_Output_JSON,
Check_Output_JSON_ASFF,
Compliance,
@@ -32,18 +36,17 @@ from providers.aws.lib.security_hub.security_hub import send_to_security_hub
def report(check_findings, output_options, audit_info):
# Sort check findings
check_findings.sort(key=lambda x: x.region)
csv_fields = []
# check output options
# Generate the required output files
# csv_fields = []
file_descriptors = {}
if output_options.output_modes:
if "csv" in output_options.output_modes:
csv_fields = generate_csv_fields()
# We have to create the required output files
file_descriptors = fill_file_descriptors(
output_options.output_modes,
output_options.output_directory,
csv_fields,
output_options.output_filename,
)
@@ -70,7 +73,64 @@ def report(check_findings, output_options, audit_info):
f"\t{color}{finding.status}{Style.RESET_ALL} {finding.region}: {finding.status_extended}"
)
if file_descriptors:
# sending the finding to input options
if "ens_rd2022_aws" in output_options.output_modes:
# We have to retrieve all the check's compliance requirements
check_compliance = output_options.bulk_checks_metadata[
finding.check_metadata.CheckID
].Compliance
for compliance in check_compliance:
if (
compliance.Framework == "ENS"
and compliance.Version == "RD2022"
):
for requirement in compliance.Requirements:
requirement_description = requirement.Description
requirement_id = requirement.Id
for attribute in requirement.Attributes:
compliance_row = Check_Output_CSV_ENS_RD2022(
Provider=finding.check_metadata.Provider,
AccountId=audit_info.audited_account,
Region=finding.region,
AssessmentDate=timestamp.isoformat(),
Requirements_Id=requirement_id,
Requirements_Description=requirement_description,
Requirements_Attributes_IdGrupoControl=attribute.get(
"IdGrupoControl"
),
Requirements_Attributes_Marco=attribute.get(
"Marco"
),
Requirements_Attributes_Categoria=attribute.get(
"Categoria"
),
Requirements_Attributes_DescripcionControl=attribute.get(
"DescripcionControl"
),
Requirements_Attributes_Nivel=attribute.get(
"Nivel"
),
Requirements_Attributes_Tipo=attribute.get(
"Tipo"
),
Requirements_Attributes_Dimensiones=",".join(
attribute.get("Dimensiones")
),
Status=finding.status,
StatusExtended=finding.status_extended,
ResourceId=finding.resource_id,
CheckId=finding.check_metadata.CheckID,
)
csv_header = generate_csv_fields(
Check_Output_CSV_ENS_RD2022
)
csv_writer = DictWriter(
file_descriptors["ens_rd2022_aws"],
fieldnames=csv_header,
delimiter=";",
)
csv_writer.writerow(compliance_row.__dict__)
if "csv" in file_descriptors:
finding_output = Check_Output_CSV(
audit_info.audited_account,
@@ -79,7 +139,9 @@ def report(check_findings, output_options, audit_info):
audit_info.organizations_metadata,
)
csv_writer = DictWriter(
file_descriptors["csv"], fieldnames=csv_fields, delimiter=";"
file_descriptors["csv"],
fieldnames=generate_csv_fields(Check_Output_CSV),
delimiter=";",
)
csv_writer.writerow(finding_output.__dict__)
@@ -117,65 +179,75 @@ def report(check_findings, output_options, audit_info):
file_descriptors.get(file_descriptor).close()
def fill_file_descriptors(output_modes, output_directory, csv_fields, output_filename):
def initialize_file_descriptor(
filename: str, output_mode: str, format: Any = None
) -> TextIOWrapper:
"""Open/Create the output file. If needed include headers or the required format"""
if file_exists(filename):
file_descriptor = open_file(
filename,
"a",
)
else:
file_descriptor = open_file(
filename,
"a",
)
if output_mode in ("csv", "ens_rd2022_aws"):
# Format is the class model of the CSV format to print the headers
csv_header = [x.upper() for x in generate_csv_fields(format)]
csv_writer = DictWriter(
file_descriptor, fieldnames=csv_header, delimiter=";"
)
csv_writer.writeheader()
if output_mode in ("json", "json-asff"):
file_descriptor = open_file(
filename,
"a",
)
file_descriptor.write("[")
return file_descriptor
def fill_file_descriptors(output_modes, output_directory, output_filename):
file_descriptors = {}
for output_mode in output_modes:
if output_mode == "csv":
filename = f"{output_directory}/{output_filename}{csv_file_suffix}"
if file_exists(filename):
file_descriptor = open_file(
filename,
"a",
if output_modes:
for output_mode in output_modes:
if output_mode == "csv":
filename = f"{output_directory}/{output_filename}{csv_file_suffix}"
file_descriptor = initialize_file_descriptor(
filename, output_mode, Check_Output_CSV
)
else:
file_descriptor = open_file(
filename,
"a",
)
csv_header = [x.upper() for x in csv_fields]
csv_writer = DictWriter(
file_descriptor, fieldnames=csv_header, delimiter=";"
)
csv_writer.writeheader()
file_descriptors.update({output_mode: file_descriptor})
file_descriptors.update({output_mode: file_descriptor})
if output_mode == "json":
filename = f"{output_directory}/{output_filename}{json_file_suffix}"
file_descriptor = initialize_file_descriptor(filename, output_mode)
file_descriptors.update({output_mode: file_descriptor})
if output_mode == "json":
filename = f"{output_directory}/{output_filename}{json_file_suffix}"
if file_exists(filename):
file_descriptor = open_file(
filename,
"a",
if output_mode == "json-asff":
filename = (
f"{output_directory}/{output_filename}{json_asff_file_suffix}"
)
else:
file_descriptor = open_file(
filename,
"a",
)
file_descriptor.write("[")
file_descriptor = initialize_file_descriptor(filename, output_mode)
file_descriptors.update({output_mode: file_descriptor})
file_descriptors.update({output_mode: file_descriptor})
if output_mode == "json-asff":
filename = f"{output_directory}/{output_filename}{json_asff_file_suffix}"
if file_exists(filename):
file_descriptor = open_file(
filename,
"a",
if output_mode == "ens_rd2022_aws":
filename = f"{output_directory}/{output_filename}_ens_rd2022_aws{csv_file_suffix}"
file_descriptor = initialize_file_descriptor(
filename, output_mode, Check_Output_CSV_ENS_RD2022
)
else:
file_descriptor = open_file(
filename,
"a",
)
file_descriptor.write("[")
file_descriptors.update({output_mode: file_descriptor})
file_descriptors.update({output_mode: file_descriptor})
return file_descriptors
def set_report_color(status):
def set_report_color(status: str) -> str:
"""Return the color for a give result status"""
color = ""
if status == "PASS":
color = Fore.GREEN
@@ -192,9 +264,10 @@ def set_report_color(status):
return color
def generate_csv_fields():
def generate_csv_fields(format: Any) -> list[str]:
"""Generates the CSV headers for the given class"""
csv_fields = []
for field in Check_Output_CSV.__dict__["__annotations__"].keys():
for field in format.__dict__.get("__annotations__").keys():
csv_fields.append(field)
return csv_fields
@@ -271,7 +344,9 @@ def close_json(output_filename, output_directory, mode):
file_descriptor.write("]")
file_descriptor.close()
except Exception as error:
logger.critical(f"{error.__class__.__name__} -- {error}")
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
sys.exit()
@@ -294,7 +369,9 @@ def send_to_s3_bucket(
s3_client.upload_file(file_name, bucket_name, object_name)
except Exception as error:
logger.critical(f"{error.__class__.__name__} -- {error}")
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
sys.exit()
@@ -305,75 +382,76 @@ def display_summary_table(
output_directory: str,
):
try:
if findings:
current = {
"Service": "",
"Provider": "",
"Critical": 0,
"High": 0,
"Medium": 0,
"Low": 0,
}
findings_table = {
"Provider": [],
"Service": [],
"Status": [],
"Critical": [],
"High": [],
"Medium": [],
"Low": [],
}
pass_count = fail_count = 0
for finding in findings:
# If new service and not first, add previous row
if (
current["Service"] != finding.check_metadata.ServiceName
and current["Service"]
):
current = {
"Service": "",
"Provider": "",
"Total": 0,
"Critical": 0,
"High": 0,
"Medium": 0,
"Low": 0,
}
findings_table = {
"Provider": [],
"Service": [],
"Status": [],
"Critical": [],
"High": [],
"Medium": [],
"Low": [],
}
pass_count = fail_count = 0
for finding in findings:
# If new service and not first, add previous row
if (
current["Service"] != finding.check_metadata.ServiceName
and current["Service"]
):
add_service_to_table(findings_table, current)
add_service_to_table(findings_table, current)
current["Critical"] = current["High"] = current["Medium"] = current[
"Low"
] = 0
current["Total"] = current["Critical"] = current["High"] = current[
"Medium"
] = current["Low"] = 0
current["Service"] = finding.check_metadata.ServiceName
current["Provider"] = finding.check_metadata.Provider
current["Service"] = finding.check_metadata.ServiceName
current["Provider"] = finding.check_metadata.Provider
if finding.status == "PASS":
pass_count += 1
elif finding.status == "FAIL":
fail_count += 1
if finding.check_metadata.Severity == "critical":
current["Critical"] += 1
elif finding.check_metadata.Severity == "high":
current["High"] += 1
elif finding.check_metadata.Severity == "medium":
current["Medium"] += 1
elif finding.check_metadata.Severity == "low":
current["Low"] += 1
current["Total"] += 1
if finding.status == "PASS":
pass_count += 1
elif finding.status == "FAIL":
fail_count += 1
if finding.check_metadata.Severity == "critical":
current["Critical"] += 1
elif finding.check_metadata.Severity == "high":
current["High"] += 1
elif finding.check_metadata.Severity == "medium":
current["Medium"] += 1
elif finding.check_metadata.Severity == "low":
current["Low"] += 1
# Add final service
add_service_to_table(findings_table, current)
# Add final service
add_service_to_table(findings_table, current)
print("\nOverview Results:")
overview_table = [
[
f"{Fore.RED}{round(fail_count/len(findings)*100, 2)}% ({fail_count}) Failed{Style.RESET_ALL}",
f"{Fore.GREEN}{round(pass_count/len(findings)*100, 2)}% ({pass_count}) Passed{Style.RESET_ALL}",
]
print("\nOverview Results:")
overview_table = [
[
f"{Fore.RED}{round(fail_count/len(findings)*100, 2)}% ({fail_count}) Failed{Style.RESET_ALL}",
f"{Fore.GREEN}{round(pass_count/len(findings)*100, 2)}% ({pass_count}) Passed{Style.RESET_ALL}",
]
print(tabulate(overview_table, tablefmt="rounded_grid"))
print(
f"\nAccount {Fore.YELLOW}{audit_info.audited_account}{Style.RESET_ALL} Scan Results (severity columns are for fails only):"
)
print(tabulate(findings_table, headers="keys", tablefmt="rounded_grid"))
print(
f"{Style.BRIGHT}* You only see here those services that contains resources.{Style.RESET_ALL}"
)
print("\nDetailed results are in:")
print(f" - CSV: {output_directory}/{output_filename}.csv")
print(f" - JSON: {output_directory}/{output_filename}.json\n")
]
print(tabulate(overview_table, tablefmt="rounded_grid"))
print(
f"\nAccount {Fore.YELLOW}{audit_info.audited_account}{Style.RESET_ALL} Scan Results (severity columns are for fails only):"
)
print(tabulate(findings_table, headers="keys", tablefmt="rounded_grid"))
print(
f"{Style.BRIGHT}* You only see here those services that contains resources.{Style.RESET_ALL}"
)
print("\nDetailed results are in:")
print(f" - CSV: {output_directory}/{output_filename}.csv")
print(f" - JSON: {output_directory}/{output_filename}.json\n")
except Exception as error:
logger.critical(
@@ -389,9 +467,12 @@ def add_service_to_table(findings_table, current):
or current["Medium"] > 0
or current["Low"] > 0
):
current["Status"] = f"{Fore.RED}FAIL{Style.RESET_ALL}"
total_fails = (
current["Critical"] + current["High"] + current["Medium"] + current["Low"]
)
current["Status"] = f"{Fore.RED}FAIL ({total_fails}){Style.RESET_ALL}"
else:
current["Status"] = f"{Fore.GREEN}PASS{Style.RESET_ALL}"
current["Status"] = f"{Fore.GREEN}PASS ({current['Total']}){Style.RESET_ALL}"
findings_table["Provider"].append(current["Provider"])
findings_table["Service"].append(current["Service"])
findings_table["Status"].append(current["Status"])
@@ -403,3 +484,109 @@ def add_service_to_table(findings_table, current):
f"{Fore.YELLOW}{current['Medium']}{Style.RESET_ALL}"
)
findings_table["Low"].append(f"{Fore.BLUE}{current['Low']}{Style.RESET_ALL}")
def display_compliance_table(
findings: list,
bulk_checks_metadata: dict,
compliance_framework: str,
output_filename: str,
output_directory: str,
):
try:
if "ens_rd2022_aws" in compliance_framework:
marcos = {}
ens_compliance_table = {
"Proveedor": [],
"Marco/Categoria": [],
"Estado": [],
"PYTEC": [],
"Alto": [],
"Medio": [],
"Bajo": [],
}
pass_count = fail_count = 0
for finding in findings:
check = bulk_checks_metadata[finding.check_metadata.CheckID]
check_compliances = check.Compliance
for compliance in check_compliances:
if (
compliance.Framework == "ENS"
and compliance.Provider == "AWS"
and compliance.Version == "RD2022"
):
for requirement in compliance.Requirements:
for attribute in requirement.Attributes:
marco_categoria = (
f"{attribute['Marco']}/{attribute['Categoria']}"
)
# Check if Marco/Categoria exists
if marco_categoria not in marcos:
marcos[marco_categoria] = {
"Estado": f"{Fore.GREEN}CUMPLE{Style.RESET_ALL}",
"Pytec": 0,
"Alto": 0,
"Medio": 0,
"Bajo": 0,
}
if finding.status == "FAIL":
fail_count += 1
marcos[marco_categoria][
"Estado"
] = f"{Fore.RED}NO CUMPLE{Style.RESET_ALL}"
elif finding.status == "PASS":
pass_count += 1
if attribute["Nivel"] == "pytec":
marcos[marco_categoria]["Pytec"] += 1
elif attribute["Nivel"] == "alto":
marcos[marco_categoria]["Alto"] += 1
elif attribute["Nivel"] == "medio":
marcos[marco_categoria]["Medio"] += 1
elif attribute["Nivel"] == "bajo":
marcos[marco_categoria]["Bajo"] += 1
# Add results to table
for marco in marcos:
ens_compliance_table["Proveedor"].append("aws")
ens_compliance_table["Marco/Categoria"].append(marco)
ens_compliance_table["Estado"].append(marcos[marco]["Estado"])
ens_compliance_table["PYTEC"].append(
f"{Fore.LIGHTRED_EX}{marcos[marco]['Pytec']}{Style.RESET_ALL}"
)
ens_compliance_table["Alto"].append(
f"{Fore.RED}{marcos[marco]['Alto']}{Style.RESET_ALL}"
)
ens_compliance_table["Medio"].append(
f"{Fore.YELLOW}{marcos[marco]['Medio']}{Style.RESET_ALL}"
)
ens_compliance_table["Bajo"].append(
f"{Fore.BLUE}{marcos[marco]['Bajo']}{Style.RESET_ALL}"
)
print(
f"\nEstado de Cumplimiento de {Fore.YELLOW}ENS RD2022 - AWS{Style.RESET_ALL}:"
)
overview_table = [
[
f"{Fore.RED}{round(fail_count/(fail_count+pass_count)*100, 2)}% ({fail_count}) NO CUMPLE{Style.RESET_ALL}",
f"{Fore.GREEN}{round(pass_count/(fail_count+pass_count)*100, 2)}% ({pass_count}) CUMPLE{Style.RESET_ALL}",
]
]
print(tabulate(overview_table, tablefmt="rounded_grid"))
print(f"\nResultados de {Fore.YELLOW}ENS RD2022 - AWS{Style.RESET_ALL}:")
print(
tabulate(ens_compliance_table, headers="keys", tablefmt="rounded_grid")
)
print(
f"{Style.BRIGHT}* Solo aparece el Marco/Categoria que contiene resultados.{Style.RESET_ALL}"
)
print("\nResultados detallados en:")
print(
f" - CSV: {output_directory}/{output_filename}_{compliance_framework[0]}.csv\n"
)
except Exception as error:
logger.critical(
f"{error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}"
)
sys.exit()

View File

@@ -2,6 +2,7 @@ import os
from os import path, remove
import boto3
import pytest
from colorama import Fore
from moto import mock_s3
@@ -9,14 +10,15 @@ from config.config import (
csv_file_suffix,
json_asff_file_suffix,
json_file_suffix,
orange_color,
output_file_timestamp,
prowler_version,
timestamp_iso,
timestamp_utc,
orange_color,
)
from lib.check.models import Check_Report, load_check_metadata
from lib.outputs.models import (
Check_Output_CSV,
Check_Output_JSON,
Check_Output_JSON_ASFF,
Compliance,
@@ -40,7 +42,7 @@ class Test_Outputs:
def test_fill_file_descriptors(self):
audited_account = "123456789012"
output_directory = f"{os.path.dirname(os.path.realpath(__file__))}"
csv_fields = generate_csv_fields()
generate_csv_fields(Check_Output_CSV)
test_output_modes = [
["csv"],
["json"],
@@ -98,7 +100,6 @@ class Test_Outputs:
test_output_file_descriptors = fill_file_descriptors(
output_mode_list,
output_directory,
csv_fields,
output_filename,
)
for output_mode in output_mode_list:
@@ -115,6 +116,17 @@ class Test_Outputs:
for status in test_status:
assert set_report_color(status) in test_colors
def test_set_report_color_invalid(self):
test_status = "INVALID"
with pytest.raises(Exception) as exc:
set_report_color(test_status)
assert "Invalid Report Status. Must be PASS, FAIL, ERROR or WARNING" in str(
exc.value
)
assert exc.type == Exception
def test_generate_csv_fields(self):
expected = [
"assessment_start_time",
@@ -154,10 +166,10 @@ class Test_Outputs:
"depends_on",
"related_to",
"notes",
"compliance",
# "compliance",
]
assert generate_csv_fields() == expected
assert generate_csv_fields(Check_Output_CSV) == expected
def test_fill_json(self):
input_audit_info = AWS_Audit_Info(
@@ -177,7 +189,7 @@ class Test_Outputs:
finding = Check_Report(
load_check_metadata(
f"{path.dirname(path.realpath(__file__))}/fixtures/metadata.json"
)
).json()
)
finding.resource_details = "Test resource details"
finding.resource_id = "test-resource"
@@ -221,7 +233,7 @@ class Test_Outputs:
finding = Check_Report(
load_check_metadata(
f"{path.dirname(path.realpath(__file__))}/fixtures/metadata.json"
)
).json()
)
finding.resource_details = "Test resource details"
finding.resource_id = "test-resource"

View File

@@ -12,7 +12,9 @@ def open_file(input_file: str, mode: str = "r") -> TextIOWrapper:
try:
f = open(input_file, mode)
except Exception as e:
logger.critical(f"{input_file}: {e.__class__.__name__}")
logger.critical(
f"{input_file}: {e.__class__.__name__}[{e.__traceback__.tb_lineno}]"
)
sys.exit()
else:
return f
@@ -23,7 +25,9 @@ def parse_json_file(input_file: TextIOWrapper) -> Any:
try:
json_file = json.load(input_file)
except Exception as e:
logger.critical(f"{input_file.name}: {e.__class__.__name__}")
logger.critical(
f"{input_file.name}: {e.__class__.__name__}[{e.__traceback__.tb_lineno}]"
)
sys.exit()
else:
return json_file
@@ -34,13 +38,12 @@ def file_exists(filename: str):
try:
exists_filename = exists(filename)
except Exception as e:
logger.critical(f"{exists_filename.name}: {e.__class__.__name__}")
quit()
logger.critical(
f"{exists_filename.name}: {e.__class__.__name__}[{e.__traceback__.tb_lineno}]"
)
sys.exit()
else:
if exists_filename:
return True
else:
return False
return exists_filename
# create sha512 hash for string