mirror of
https://github.com/ghndrx/prowler.git
synced 2026-02-10 14:55:00 +00:00
fix(awslambda_function_no_secrets_in_code): Retrieve Code if set (#1833)
This commit is contained in:
@@ -5,6 +5,7 @@ import sys
|
|||||||
import traceback
|
import traceback
|
||||||
from pkgutil import walk_packages
|
from pkgutil import walk_packages
|
||||||
from types import ModuleType
|
from types import ModuleType
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
from alive_progress import alive_bar
|
from alive_progress import alive_bar
|
||||||
from colorama import Fore, Style
|
from colorama import Fore, Style
|
||||||
@@ -24,7 +25,6 @@ except Exception:
|
|||||||
sys.exit()
|
sys.exit()
|
||||||
|
|
||||||
from prowler.lib.utils.utils import open_file, parse_json_file
|
from prowler.lib.utils.utils import open_file, parse_json_file
|
||||||
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
|
|
||||||
from prowler.providers.common.models import Audit_Metadata
|
from prowler.providers.common.models import Audit_Metadata
|
||||||
from prowler.providers.common.outputs import Provider_Output_Options
|
from prowler.providers.common.outputs import Provider_Output_Options
|
||||||
|
|
||||||
@@ -314,13 +314,23 @@ def run_check(check: Check, output_options: Provider_Output_Options) -> list:
|
|||||||
def execute_checks(
|
def execute_checks(
|
||||||
checks_to_execute: list,
|
checks_to_execute: list,
|
||||||
provider: str,
|
provider: str,
|
||||||
audit_info: AWS_Audit_Info,
|
audit_info: Any,
|
||||||
audit_output_options: Provider_Output_Options,
|
audit_output_options: Provider_Output_Options,
|
||||||
) -> list:
|
) -> list:
|
||||||
|
# List to store all the check's findings
|
||||||
all_findings = []
|
all_findings = []
|
||||||
|
# Services and checks executed for the Audit Status
|
||||||
services_executed = set()
|
services_executed = set()
|
||||||
checks_executed = set()
|
checks_executed = set()
|
||||||
total_checks_to_execute = len(checks_to_execute)
|
|
||||||
|
# Initialize the Audit Metadata
|
||||||
|
audit_info.audit_metadata = Audit_Metadata(
|
||||||
|
services_scanned=0,
|
||||||
|
expected_checks=checks_to_execute,
|
||||||
|
completed_checks=0,
|
||||||
|
audit_progress=0,
|
||||||
|
)
|
||||||
|
|
||||||
# Execution with the --only-logs flag
|
# Execution with the --only-logs flag
|
||||||
if audit_output_options.only_logs:
|
if audit_output_options.only_logs:
|
||||||
for check_name in checks_to_execute:
|
for check_name in checks_to_execute:
|
||||||
@@ -335,7 +345,6 @@ def execute_checks(
|
|||||||
audit_info,
|
audit_info,
|
||||||
services_executed,
|
services_executed,
|
||||||
checks_executed,
|
checks_executed,
|
||||||
total_checks_to_execute,
|
|
||||||
)
|
)
|
||||||
all_findings.extend(check_findings)
|
all_findings.extend(check_findings)
|
||||||
|
|
||||||
@@ -378,7 +387,6 @@ def execute_checks(
|
|||||||
audit_info,
|
audit_info,
|
||||||
services_executed,
|
services_executed,
|
||||||
checks_executed,
|
checks_executed,
|
||||||
total_checks_to_execute,
|
|
||||||
)
|
)
|
||||||
all_findings.extend(check_findings)
|
all_findings.extend(check_findings)
|
||||||
bar()
|
bar()
|
||||||
@@ -403,10 +411,9 @@ def execute(
|
|||||||
check_name: str,
|
check_name: str,
|
||||||
provider: str,
|
provider: str,
|
||||||
audit_output_options: Provider_Output_Options,
|
audit_output_options: Provider_Output_Options,
|
||||||
audit_info: AWS_Audit_Info,
|
audit_info: Any,
|
||||||
services_executed: set,
|
services_executed: set,
|
||||||
checks_executed: set,
|
checks_executed: set,
|
||||||
total_checks_to_execute: int,
|
|
||||||
):
|
):
|
||||||
# Import check module
|
# Import check module
|
||||||
check_module_path = (
|
check_module_path = (
|
||||||
@@ -416,16 +423,40 @@ def execute(
|
|||||||
# Recover functions from check
|
# Recover functions from check
|
||||||
check_to_execute = getattr(lib, check_name)
|
check_to_execute = getattr(lib, check_name)
|
||||||
c = check_to_execute()
|
c = check_to_execute()
|
||||||
|
|
||||||
# Run check
|
# Run check
|
||||||
check_findings = run_check(c, audit_output_options)
|
check_findings = run_check(c, audit_output_options)
|
||||||
|
|
||||||
|
# Update Audit Status
|
||||||
services_executed.add(service)
|
services_executed.add(service)
|
||||||
checks_executed.add(check_name)
|
checks_executed.add(check_name)
|
||||||
audit_info.audit_metadata = Audit_Metadata(
|
audit_info.audit_metadata = update_audit_metadata(
|
||||||
services_scanned=len(services_executed),
|
audit_info.audit_metadata, services_executed, checks_executed
|
||||||
expected_checks=total_checks_to_execute,
|
|
||||||
completed_checks=len(checks_executed),
|
|
||||||
audit_progress=100 * len(checks_executed) / total_checks_to_execute,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Report the check's findings
|
||||||
report(check_findings, audit_output_options, audit_info)
|
report(check_findings, audit_output_options, audit_info)
|
||||||
|
|
||||||
return check_findings
|
return check_findings
|
||||||
|
|
||||||
|
|
||||||
|
def update_audit_metadata(
|
||||||
|
audit_metadata: Audit_Metadata, services_executed: set, checks_executed: set
|
||||||
|
) -> Audit_Metadata:
|
||||||
|
"""update_audit_metadata returns the audit_metadata updated with the new status
|
||||||
|
|
||||||
|
Updates the given audit_metadata using the length of the services_executed and checks_executed
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
audit_metadata.services_scanned = len(services_executed)
|
||||||
|
audit_metadata.completed_checks = len(checks_executed)
|
||||||
|
audit_metadata.audit_progress = (
|
||||||
|
100 * len(checks_executed) / len(audit_metadata.expected_checks)
|
||||||
|
)
|
||||||
|
|
||||||
|
return audit_metadata
|
||||||
|
|
||||||
|
except Exception as error:
|
||||||
|
logger.error(
|
||||||
|
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||||
|
)
|
||||||
|
|||||||
@@ -24,7 +24,15 @@ class Lambda:
|
|||||||
self.regional_clients = generate_regional_clients(self.service, audit_info)
|
self.regional_clients = generate_regional_clients(self.service, audit_info)
|
||||||
self.functions = {}
|
self.functions = {}
|
||||||
self.__threading_call__(self.__list_functions__)
|
self.__threading_call__(self.__list_functions__)
|
||||||
self.__threading_call__(self.__get_function__)
|
|
||||||
|
# We only want to retrieve the Lambda code if the
|
||||||
|
# awslambda_function_no_secrets_in_code check is set
|
||||||
|
if (
|
||||||
|
"awslambda_function_no_secrets_in_code"
|
||||||
|
in audit_info.audit_metadata.expected_checks
|
||||||
|
):
|
||||||
|
self.__threading_call__(self.__get_function__)
|
||||||
|
|
||||||
self.__threading_call__(self.__get_policy__)
|
self.__threading_call__(self.__get_policy__)
|
||||||
self.__threading_call__(self.__get_function_url_config__)
|
self.__threading_call__(self.__get_function_url_config__)
|
||||||
|
|
||||||
|
|||||||
@@ -3,6 +3,8 @@ from pydantic import BaseModel
|
|||||||
|
|
||||||
class Audit_Metadata(BaseModel):
|
class Audit_Metadata(BaseModel):
|
||||||
services_scanned: int
|
services_scanned: int
|
||||||
expected_checks: int
|
# We can't use a set in the expected
|
||||||
|
# checks because the set is unordered
|
||||||
|
expected_checks: list
|
||||||
completed_checks: int
|
completed_checks: int
|
||||||
audit_progress: int
|
audit_progress: int
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ from prowler.lib.check.check import (
|
|||||||
list_services,
|
list_services,
|
||||||
parse_checks_from_file,
|
parse_checks_from_file,
|
||||||
recover_checks_from_provider,
|
recover_checks_from_provider,
|
||||||
|
update_audit_metadata,
|
||||||
)
|
)
|
||||||
from prowler.lib.check.models import load_check_metadata
|
from prowler.lib.check.models import load_check_metadata
|
||||||
|
|
||||||
@@ -317,3 +318,56 @@ class Test_Check:
|
|||||||
# )
|
# )
|
||||||
# == test_case["expected"]
|
# == test_case["expected"]
|
||||||
# )
|
# )
|
||||||
|
|
||||||
|
def test_update_audit_metadata_complete(self):
|
||||||
|
from prowler.providers.common.models import Audit_Metadata
|
||||||
|
|
||||||
|
# Set the expected checks to run
|
||||||
|
expected_checks = ["iam_administrator_access_with_mfa"]
|
||||||
|
services_executed = {"iam"}
|
||||||
|
checks_executed = {"iam_administrator_access_with_mfa"}
|
||||||
|
|
||||||
|
# Set an empty Audit_Metadata
|
||||||
|
audit_metadata = Audit_Metadata(
|
||||||
|
services_scanned=0,
|
||||||
|
expected_checks=expected_checks,
|
||||||
|
completed_checks=0,
|
||||||
|
audit_progress=0,
|
||||||
|
)
|
||||||
|
|
||||||
|
audit_metadata = update_audit_metadata(
|
||||||
|
audit_metadata, services_executed, checks_executed
|
||||||
|
)
|
||||||
|
|
||||||
|
assert audit_metadata.audit_progress == float(100)
|
||||||
|
assert audit_metadata.services_scanned == 1
|
||||||
|
assert audit_metadata.expected_checks == expected_checks
|
||||||
|
assert audit_metadata.completed_checks == 1
|
||||||
|
|
||||||
|
def test_update_audit_metadata_50(self):
|
||||||
|
from prowler.providers.common.models import Audit_Metadata
|
||||||
|
|
||||||
|
# Set the expected checks to run
|
||||||
|
expected_checks = [
|
||||||
|
"iam_administrator_access_with_mfa",
|
||||||
|
"iam_support_role_created",
|
||||||
|
]
|
||||||
|
services_executed = {"iam"}
|
||||||
|
checks_executed = {"iam_administrator_access_with_mfa"}
|
||||||
|
|
||||||
|
# Set an empty Audit_Metadata
|
||||||
|
audit_metadata = Audit_Metadata(
|
||||||
|
services_scanned=0,
|
||||||
|
expected_checks=expected_checks,
|
||||||
|
completed_checks=0,
|
||||||
|
audit_progress=0,
|
||||||
|
)
|
||||||
|
|
||||||
|
audit_metadata = update_audit_metadata(
|
||||||
|
audit_metadata, services_executed, checks_executed
|
||||||
|
)
|
||||||
|
|
||||||
|
assert audit_metadata.audit_progress == float(50)
|
||||||
|
assert audit_metadata.services_scanned == 1
|
||||||
|
assert audit_metadata.expected_checks == expected_checks
|
||||||
|
assert audit_metadata.completed_checks == 1
|
||||||
|
|||||||
@@ -10,9 +10,9 @@ from boto3 import client, resource, session
|
|||||||
from moto import mock_iam, mock_lambda, mock_s3
|
from moto import mock_iam, mock_lambda, mock_s3
|
||||||
from moto.core import DEFAULT_ACCOUNT_ID
|
from moto.core import DEFAULT_ACCOUNT_ID
|
||||||
|
|
||||||
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
|
|
||||||
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
|
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
|
||||||
from prowler.providers.aws.services.awslambda.awslambda_service import AuthType, Lambda
|
from prowler.providers.aws.services.awslambda.awslambda_service import AuthType, Lambda
|
||||||
|
from prowler.providers.common.models import Audit_Metadata
|
||||||
|
|
||||||
# Mock Test Region
|
# Mock Test Region
|
||||||
AWS_REGION = "eu-west-1"
|
AWS_REGION = "eu-west-1"
|
||||||
@@ -75,22 +75,29 @@ class Test_Lambda_Service:
|
|||||||
audited_regions=None,
|
audited_regions=None,
|
||||||
organizations_metadata=None,
|
organizations_metadata=None,
|
||||||
audit_resources=None,
|
audit_resources=None,
|
||||||
|
audit_metadata=Audit_Metadata(
|
||||||
|
services_scanned=0,
|
||||||
|
# We need to set this check to call __list_functions__
|
||||||
|
expected_checks=["awslambda_function_no_secrets_in_code"],
|
||||||
|
completed_checks=0,
|
||||||
|
audit_progress=0,
|
||||||
|
),
|
||||||
)
|
)
|
||||||
return audit_info
|
return audit_info
|
||||||
|
|
||||||
# Test Lambda Client
|
# Test Lambda Client
|
||||||
def test__get_client__(self):
|
def test__get_client__(self):
|
||||||
awslambda = Lambda(current_audit_info)
|
awslambda = Lambda(self.set_mocked_audit_info())
|
||||||
assert awslambda.regional_clients[AWS_REGION].__class__.__name__ == "Lambda"
|
assert awslambda.regional_clients[AWS_REGION].__class__.__name__ == "Lambda"
|
||||||
|
|
||||||
# Test Lambda Session
|
# Test Lambda Session
|
||||||
def test__get_session__(self):
|
def test__get_session__(self):
|
||||||
awslambda = Lambda(current_audit_info)
|
awslambda = Lambda(self.set_mocked_audit_info())
|
||||||
assert awslambda.session.__class__.__name__ == "Session"
|
assert awslambda.session.__class__.__name__ == "Session"
|
||||||
|
|
||||||
# Test Lambda Service
|
# Test Lambda Service
|
||||||
def test__get_service__(self):
|
def test__get_service__(self):
|
||||||
awslambda = Lambda(current_audit_info)
|
awslambda = Lambda(self.set_mocked_audit_info())
|
||||||
assert awslambda.service == "lambda"
|
assert awslambda.service == "lambda"
|
||||||
|
|
||||||
@mock_lambda
|
@mock_lambda
|
||||||
|
|||||||
Reference in New Issue
Block a user