feat(html): add html to Azure and GCP (#2181)

Co-authored-by: Pepe Fagoaga <pepe@verica.io>
This commit is contained in:
Sergio Garcia
2023-04-18 16:13:57 +02:00
committed by GitHub
parent 4536780a19
commit 7b5fe34316
8 changed files with 404 additions and 85 deletions

View File

@@ -58,6 +58,9 @@ def prowler():
severities = args.severity severities = args.severity
compliance_framework = args.compliance compliance_framework = args.compliance
if not args.no_banner:
print_banner(args)
# Set the audit info based on the selected provider # Set the audit info based on the selected provider
audit_info = set_provider_audit_info(provider, args.__dict__) audit_info = set_provider_audit_info(provider, args.__dict__)
@@ -72,9 +75,6 @@ def prowler():
# Set Logger configuration # Set Logger configuration
set_logging_config(args.log_level, args.log_file, args.only_logs) set_logging_config(args.log_level, args.log_file, args.only_logs)
if not args.no_banner:
print_banner(args)
if args.list_services: if args.list_services:
print_services(list_services(provider)) print_services(list_services(provider))
sys.exit() sys.exit()

View File

@@ -100,6 +100,13 @@ def fill_file_descriptors(output_modes, output_directory, output_filename, audit
) )
file_descriptors.update({output_mode: file_descriptor}) file_descriptors.update({output_mode: file_descriptor})
elif output_mode == "html":
filename = f"{output_directory}/{output_filename}{html_file_suffix}"
file_descriptor = initialize_file_descriptor(
filename, output_mode, audit_info
)
file_descriptors.update({output_mode: file_descriptor})
elif isinstance(audit_info, AWS_Audit_Info): elif isinstance(audit_info, AWS_Audit_Info):
if output_mode == "json-asff": if output_mode == "json-asff":
filename = f"{output_directory}/{output_filename}{json_asff_file_suffix}" filename = f"{output_directory}/{output_filename}{json_asff_file_suffix}"
@@ -108,15 +115,6 @@ def fill_file_descriptors(output_modes, output_directory, output_filename, audit
) )
file_descriptors.update({output_mode: file_descriptor}) file_descriptors.update({output_mode: file_descriptor})
elif output_mode == "html":
filename = (
f"{output_directory}/{output_filename}{html_file_suffix}"
)
file_descriptor = initialize_file_descriptor(
filename, output_mode, audit_info
)
file_descriptors.update({output_mode: file_descriptor})
elif output_mode == "ens_rd2022_aws": elif output_mode == "ens_rd2022_aws":
filename = f"{output_directory}/{output_filename}_ens_rd2022_aws{csv_file_suffix}" filename = f"{output_directory}/{output_filename}_ens_rd2022_aws{csv_file_suffix}"
file_descriptor = initialize_file_descriptor( file_descriptor = initialize_file_descriptor(

View File

@@ -1,3 +1,4 @@
import importlib
import sys import sys
from os import path from os import path
@@ -8,6 +9,7 @@ from prowler.config.config import (
prowler_version, prowler_version,
timestamp, timestamp,
) )
from prowler.lib.check.models import Check_Report_AWS, Check_Report_GCP
from prowler.lib.logger import logger from prowler.lib.logger import logger
from prowler.lib.outputs.models import ( from prowler.lib.outputs.models import (
get_check_compliance, get_check_compliance,
@@ -16,18 +18,13 @@ from prowler.lib.outputs.models import (
unroll_tags, unroll_tags,
) )
from prowler.lib.utils.utils import open_file from prowler.lib.utils.utils import open_file
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
from prowler.providers.azure.lib.audit_info.models import Azure_Audit_Info
from prowler.providers.gcp.lib.audit_info.models import GCP_Audit_Info
def add_html_header(file_descriptor, audit_info): def add_html_header(file_descriptor, audit_info):
try: try:
if not audit_info.profile:
audit_info.profile = "ENV"
if isinstance(audit_info.audited_regions, list):
audited_regions = " ".join(audit_info.audited_regions)
elif not audit_info.audited_regions:
audited_regions = "All Regions"
else:
audited_regions = audit_info.audited_regions
file_descriptor.write( file_descriptor.write(
""" """
<!DOCTYPE html> <!DOCTYPE html>
@@ -114,51 +111,9 @@ def add_html_header(file_descriptor, audit_info):
</li> </li>
</ul> </ul>
</div> </div>
</div> </div> """
<div class="col-md-2"> + get_assessment_summary(audit_info)
<div class="card">
<div class="card-header">
AWS Assessment Summary
</div>
<ul class="list-group list-group-flush">
<li class="list-group-item">
<b>AWS Account:</b> """
+ audit_info.audited_account
+ """ + """
</li>
<li class="list-group-item">
<b>AWS-CLI Profile:</b> """
+ audit_info.profile
+ """
</li>
<li class="list-group-item">
<b>Audited Regions:</b> """
+ audited_regions
+ """
</li>
</ul>
</div>
</div>
<div class="col-md-4">
<div class="card">
<div class="card-header">
AWS Credentials
</div>
<ul class="list-group list-group-flush">
<li class="list-group-item">
<b>User Id:</b> """
+ audit_info.audited_user_id
+ """
</li>
<li class="list-group-item">
<b>Caller Identity ARN:</b>
"""
+ audit_info.audited_identity_arn
+ """
</li>
</ul>
</div>
</div>
<div class="col-md-2"> <div class="col-md-2">
<div class="card"> <div class="card">
<div class="card-header"> <div class="card-header">
@@ -205,9 +160,10 @@ def add_html_header(file_descriptor, audit_info):
""" """
) )
except Exception as error: except Exception as error:
logger.error( logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
) )
sys.exit(1)
def fill_html(file_descriptor, finding, output_options): def fill_html(file_descriptor, finding, output_options):
@@ -225,7 +181,7 @@ def fill_html(file_descriptor, finding, output_options):
<td>{finding.status}</td> <td>{finding.status}</td>
<td>{finding.check_metadata.Severity}</td> <td>{finding.check_metadata.Severity}</td>
<td>{finding.check_metadata.ServiceName}</td> <td>{finding.check_metadata.ServiceName}</td>
<td>{finding.region}</td> <td>{finding.location if isinstance(finding, Check_Report_GCP) else finding.region if isinstance(finding, Check_Report_AWS) else ""}</td>
<td>{finding.check_metadata.CheckID.replace("_", "<wbr>_")}</td> <td>{finding.check_metadata.CheckID.replace("_", "<wbr>_")}</td>
<td>{finding.check_metadata.CheckTitle}</td> <td>{finding.check_metadata.CheckTitle}</td>
<td>{finding.resource_id.replace("<", "&lt;").replace(">", "&gt;").replace("_", "<wbr>_")}</td> <td>{finding.resource_id.replace("<", "&lt;").replace(">", "&gt;").replace("_", "<wbr>_")}</td>
@@ -377,3 +333,207 @@ def add_html_footer(output_filename, output_directory):
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}" f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
) )
sys.exit(1) sys.exit(1)
def get_aws_html_assessment_summary(audit_info):
try:
if isinstance(audit_info, AWS_Audit_Info):
if not audit_info.profile:
audit_info.profile = "ENV"
if isinstance(audit_info.audited_regions, list):
audited_regions = " ".join(audit_info.audited_regions)
elif not audit_info.audited_regions:
audited_regions = "All Regions"
else:
audited_regions = audit_info.audited_regions
return (
"""
<div class="col-md-2">
<div class="card">
<div class="card-header">
AWS Assessment Summary
</div>
<ul class="list-group list-group-flush">
<li class="list-group-item">
<b>AWS Account:</b> """
+ audit_info.audited_account
+ """
</li>
<li class="list-group-item">
<b>AWS-CLI Profile:</b> """
+ audit_info.profile
+ """
</li>
<li class="list-group-item">
<b>Audited Regions:</b> """
+ audited_regions
+ """
</li>
</ul>
</div>
</div>
<div class="col-md-4">
<div class="card">
<div class="card-header">
AWS Credentials
</div>
<ul class="list-group list-group-flush">
<li class="list-group-item">
<b>User Id:</b> """
+ audit_info.audited_user_id
+ """
</li>
<li class="list-group-item">
<b>Caller Identity ARN:</b> """
+ audit_info.audited_identity_arn
+ """
</li>
</ul>
</div>
</div>
"""
)
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
sys.exit(1)
def get_azure_html_assessment_summary(audit_info):
try:
if isinstance(audit_info, Azure_Audit_Info):
printed_subscriptions = []
for key, value in audit_info.identity.subscriptions.items():
intermediate = key + " : " + value
printed_subscriptions.append(intermediate)
return (
"""
<div class="col-md-2">
<div class="card">
<div class="card-header">
Azure Assessment Summary
</div>
<ul class="list-group list-group-flush">
<li class="list-group-item">
<b>Azure Tenant IDs:</b> """
+ " ".join(audit_info.identity.tenant_ids)
+ """
</li>
<li class="list-group-item">
<b>Azure Tenant Domain:</b> """
+ audit_info.identity.domain
+ """
</li>
<li class="list-group-item">
<b>Azure Subscriptions:</b> """
+ " ".join(printed_subscriptions)
+ """
</li>
</ul>
</div>
</div>
<div class="col-md-4">
<div class="card">
<div class="card-header">
Azure Credentials
</div>
<ul class="list-group list-group-flush">
<li class="list-group-item">
<b>Azure Identity Type:</b> """
+ audit_info.identity.identity_type
+ """
</li>
<li class="list-group-item">
<b>Azure Identity ID:</b> """
+ audit_info.identity.identity_id
+ """
</li>
</ul>
</div>
</div>
"""
)
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
sys.exit(1)
def get_gcp_html_assessment_summary(audit_info):
try:
if isinstance(audit_info, GCP_Audit_Info):
try:
getattr(audit_info.credentials, "_service_account_email")
profile = (
audit_info.credentials._service_account_email
if audit_info.credentials._service_account_email is not None
else "default"
)
except AttributeError:
profile = "default"
return (
"""
<div class="col-md-2">
<div class="card">
<div class="card-header">
GCP Assessment Summary
</div>
<ul class="list-group list-group-flush">
<li class="list-group-item">
<b>GCP Project ID:</b> """
+ audit_info.project_id
+ """
</li>
</ul>
</div>
</div>
<div class="col-md-4">
<div class="card">
<div class="card-header">
GCP Credentials
</div>
<ul class="list-group list-group-flush">
<li class="list-group-item">
<b>GCP Account:</b> """
+ profile
+ """
</li>
</ul>
</div>
</div>
"""
)
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
sys.exit(1)
def get_assessment_summary(audit_info):
"""
get_assessment_summary gets the HTML assessment summary for the provider
"""
try:
# This is based in the Provider_Audit_Info class
# It is not pretty but useful
# AWS_Audit_Info --> aws
# GCP_Audit_Info --> gcp
# Azure_Audit_Info --> azure
provider = audit_info.__class__.__name__.split("_")[0].lower()
# Dynamically get the Provider quick inventory handler
provider_html_assessment_summary_function = (
f"get_{provider}_html_assessment_summary"
)
return getattr(
importlib.import_module(__name__), provider_html_assessment_summary_function
)(audit_info)
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
sys.exit(1)

View File

@@ -104,12 +104,6 @@ def report(check_findings, output_options, audit_info):
file_descriptors, file_descriptors,
) )
if "html" in file_descriptors:
fill_html(
file_descriptors["html"], finding, output_options
)
file_descriptors["html"].write("")
if "json-asff" in file_descriptors: if "json-asff" in file_descriptors:
finding_output = Check_Output_JSON_ASFF() finding_output = Check_Output_JSON_ASFF()
fill_json_asff( fill_json_asff(
@@ -137,6 +131,10 @@ def report(check_findings, output_options, audit_info):
) )
# Common outputs # Common outputs
if "html" in file_descriptors:
fill_html(file_descriptors["html"], finding, output_options)
file_descriptors["html"].write("")
if "csv" in file_descriptors: if "csv" in file_descriptors:
csv_writer, finding_output = generate_provider_output_csv( csv_writer, finding_output = generate_provider_output_csv(
finding.check_metadata.Provider, finding.check_metadata.Provider,

View File

@@ -96,7 +96,7 @@ This report is being generated using the identity below:
Azure Tenant IDs: {Fore.YELLOW}[{" ".join(audit_info.identity.tenant_ids)}]{Style.RESET_ALL} Azure Tenant Domain: {Fore.YELLOW}[{audit_info.identity.domain}]{Style.RESET_ALL} Azure Tenant IDs: {Fore.YELLOW}[{" ".join(audit_info.identity.tenant_ids)}]{Style.RESET_ALL} Azure Tenant Domain: {Fore.YELLOW}[{audit_info.identity.domain}]{Style.RESET_ALL}
Azure Subscriptions: {Fore.YELLOW}{printed_subscriptions}{Style.RESET_ALL} Azure Subscriptions: {Fore.YELLOW}{printed_subscriptions}{Style.RESET_ALL}
Azure Identity type: {Fore.YELLOW}[{audit_info.identity.identity_type}]{Style.RESET_ALL} Azure Identity ID: {Fore.YELLOW}[{audit_info.identity.identity_id}]{Style.RESET_ALL} Azure Identity Type: {Fore.YELLOW}[{audit_info.identity.identity_type}]{Style.RESET_ALL} Azure Identity ID: {Fore.YELLOW}[{audit_info.identity.identity_id}]{Style.RESET_ALL}
""" """
print(report) print(report)

View File

@@ -75,10 +75,6 @@ class Azure_Output_Options(Provider_Output_Options):
else: else:
self.output_filename = arguments.output_filename self.output_filename = arguments.output_filename
# Remove HTML Output since it is not supported yet
if "html" in arguments.output_modes:
arguments.output_modes.remove("html")
class Gcp_Output_Options(Provider_Output_Options): class Gcp_Output_Options(Provider_Output_Options):
def __init__(self, arguments, audit_info, allowlist_file, bulk_checks_metadata): def __init__(self, arguments, audit_info, allowlist_file, bulk_checks_metadata):
@@ -96,10 +92,6 @@ class Gcp_Output_Options(Provider_Output_Options):
else: else:
self.output_filename = arguments.output_filename self.output_filename = arguments.output_filename
# Remove HTML Output since it is not supported yet
if "html" in arguments.output_modes:
arguments.output_modes.remove("html")
class Aws_Output_Options(Provider_Output_Options): class Aws_Output_Options(Provider_Output_Options):
security_hub_enabled: bool security_hub_enabled: bool

View File

@@ -7,7 +7,7 @@ from prowler.providers.aws.lib.quick_inventory.quick_inventory import quick_inve
def run_provider_quick_inventory(provider, audit_info, output_directory): def run_provider_quick_inventory(provider, audit_info, output_directory):
""" """
run_provider_quick_inventory executes the quick inventory for te provider run_provider_quick_inventory executes the quick inventory for the provider
""" """
try: try:
# Dynamically get the Provider quick inventory handler # Dynamically get the Provider quick inventory handler

View File

@@ -3,6 +3,7 @@ from argparse import Namespace
from boto3 import session from boto3 import session
from mock import patch from mock import patch
from prowler.lib.outputs.html import get_assessment_summary
from prowler.providers.aws.lib.audit_info.audit_info import AWS_Audit_Info from prowler.providers.aws.lib.audit_info.audit_info import AWS_Audit_Info
from prowler.providers.azure.lib.audit_info.audit_info import ( from prowler.providers.azure.lib.audit_info.audit_info import (
Azure_Audit_Info, Azure_Audit_Info,
@@ -11,8 +12,10 @@ from prowler.providers.azure.lib.audit_info.audit_info import (
from prowler.providers.common.outputs import ( from prowler.providers.common.outputs import (
Aws_Output_Options, Aws_Output_Options,
Azure_Output_Options, Azure_Output_Options,
Gcp_Output_Options,
set_provider_output_options, set_provider_output_options,
) )
from prowler.providers.gcp.lib.audit_info.models import GCP_Audit_Info
AWS_ACCOUNT_NUMBER = "012345678912" AWS_ACCOUNT_NUMBER = "012345678912"
DATETIME = "20230101120000" DATETIME = "20230101120000"
@@ -38,6 +41,16 @@ class Test_Common_Output_Options:
) )
return audit_info return audit_info
# Mocked GCP Audit Info
def set_mocked_gcp_audit_info(self):
audit_info = GCP_Audit_Info(
credentials=None,
project_id="test-project",
audit_resources=None,
audit_metadata=None,
)
return audit_info
# Mocked AWS Audit Info # Mocked AWS Audit Info
def set_mocked_aws_audit_info(self): def set_mocked_aws_audit_info(self):
audit_info = AWS_Audit_Info( audit_info = AWS_Audit_Info(
@@ -48,9 +61,9 @@ class Test_Common_Output_Options:
botocore_session=None, botocore_session=None,
), ),
audited_account=AWS_ACCOUNT_NUMBER, audited_account=AWS_ACCOUNT_NUMBER,
audited_user_id=None, audited_user_id="test-user",
audited_partition="aws", audited_partition="aws",
audited_identity_arn=None, audited_identity_arn="test-user-arn",
profile=None, profile=None,
profile_region=None, profile_region=None,
credentials=None, credentials=None,
@@ -91,6 +104,33 @@ class Test_Common_Output_Options:
assert output_options.verbose assert output_options.verbose
assert output_options.output_filename == arguments.output_filename assert output_options.output_filename == arguments.output_filename
def test_set_provider_output_options_gcp(self):
# Set the cloud provider
provider = "gcp"
# Set the arguments passed
arguments = Namespace()
arguments.quiet = True
arguments.output_modes = ["html", "csv", "json"]
arguments.output_directory = "output_test_directory"
arguments.verbose = True
arguments.output_filename = "output_test_filename"
arguments.only_logs = False
audit_info = self.set_mocked_gcp_audit_info()
allowlist_file = ""
bulk_checks_metadata = {}
output_options = set_provider_output_options(
provider, arguments, audit_info, allowlist_file, bulk_checks_metadata
)
assert isinstance(output_options, Gcp_Output_Options)
assert output_options.is_quiet
assert output_options.output_modes == ["html", "csv", "json"]
assert output_options.output_directory == arguments.output_directory
assert output_options.allowlist_file == ""
assert output_options.bulk_checks_metadata == {}
assert output_options.verbose
assert output_options.output_filename == arguments.output_filename
def test_set_provider_output_options_aws_no_output_filename(self): def test_set_provider_output_options_aws_no_output_filename(self):
# Set the cloud provider # Set the cloud provider
provider = "aws" provider = "aws"
@@ -148,6 +188,7 @@ class Test_Common_Output_Options:
assert isinstance(output_options, Azure_Output_Options) assert isinstance(output_options, Azure_Output_Options)
assert output_options.is_quiet assert output_options.is_quiet
assert output_options.output_modes == [ assert output_options.output_modes == [
"html",
"csv", "csv",
"json", "json",
] ]
@@ -184,6 +225,7 @@ class Test_Common_Output_Options:
assert isinstance(output_options, Azure_Output_Options) assert isinstance(output_options, Azure_Output_Options)
assert output_options.is_quiet assert output_options.is_quiet
assert output_options.output_modes == [ assert output_options.output_modes == [
"html",
"csv", "csv",
"json", "json",
] ]
@@ -195,3 +237,132 @@ class Test_Common_Output_Options:
output_options.output_filename output_options.output_filename
== f"prowler-output-{'-'.join(tenants)}-{DATETIME}" == f"prowler-output-{'-'.join(tenants)}-{DATETIME}"
) )
def test_azure_get_assessment_summary(self):
# Mock Azure Audit Info
audit_info = self.set_mocked_azure_audit_info()
tenants = ["tenant-1", "tenant-2"]
audit_info.identity.tenant_ids = tenants
audit_info.identity.subscriptions = {
"Azure subscription 1": "12345-qwerty",
"Subscription2": "12345-qwerty",
}
printed_subscriptions = []
for key, value in audit_info.identity.subscriptions.items():
intermediate = key + " : " + value
printed_subscriptions.append(intermediate)
assert (
get_assessment_summary(audit_info)
== f"""
<div class="col-md-2">
<div class="card">
<div class="card-header">
Azure Assessment Summary
</div>
<ul class="list-group list-group-flush">
<li class="list-group-item">
<b>Azure Tenant IDs:</b> {" ".join(audit_info.identity.tenant_ids)}
</li>
<li class="list-group-item">
<b>Azure Tenant Domain:</b> {audit_info.identity.domain}
</li>
<li class="list-group-item">
<b>Azure Subscriptions:</b> {" ".join(printed_subscriptions)}
</li>
</ul>
</div>
</div>
<div class="col-md-4">
<div class="card">
<div class="card-header">
Azure Credentials
</div>
<ul class="list-group list-group-flush">
<li class="list-group-item">
<b>Azure Identity Type:</b> {audit_info.identity.identity_type}
</li>
<li class="list-group-item">
<b>Azure Identity ID:</b> {audit_info.identity.identity_id}
</li>
</ul>
</div>
</div>
"""
)
def test_aws_get_assessment_summary(self):
# Mock AWS Audit Info
audit_info = self.set_mocked_aws_audit_info()
assert (
get_assessment_summary(audit_info)
== f"""
<div class="col-md-2">
<div class="card">
<div class="card-header">
AWS Assessment Summary
</div>
<ul class="list-group list-group-flush">
<li class="list-group-item">
<b>AWS Account:</b> {audit_info.audited_account}
</li>
<li class="list-group-item">
<b>AWS-CLI Profile:</b> {audit_info.profile}
</li>
<li class="list-group-item">
<b>Audited Regions:</b> All Regions
</li>
</ul>
</div>
</div>
<div class="col-md-4">
<div class="card">
<div class="card-header">
AWS Credentials
</div>
<ul class="list-group list-group-flush">
<li class="list-group-item">
<b>User Id:</b> {audit_info.audited_user_id}
</li>
<li class="list-group-item">
<b>Caller Identity ARN:</b> {audit_info.audited_identity_arn}
</li>
</ul>
</div>
</div>
"""
)
def test_gcp_get_assessment_summary(self):
# Mock Azure Audit Info
audit_info = self.set_mocked_gcp_audit_info()
profile = "default"
assert (
get_assessment_summary(audit_info)
== f"""
<div class="col-md-2">
<div class="card">
<div class="card-header">
GCP Assessment Summary
</div>
<ul class="list-group list-group-flush">
<li class="list-group-item">
<b>GCP Project ID:</b> {audit_info.project_id}
</li>
</ul>
</div>
</div>
<div class="col-md-4">
<div class="card">
<div class="card-header">
GCP Credentials
</div>
<ul class="list-group list-group-flush">
<li class="list-group-item">
<b>GCP Account:</b> {profile}
</li>
</ul>
</div>
</div>
"""
)