chore(aws): Improve tests and status from accessanalyzer to cloudwatch (#2711)

This commit is contained in:
Pepe Fagoaga
2023-08-11 11:04:04 +02:00
committed by GitHub
parent 3fafac75ef
commit 0313dba7b4
78 changed files with 725 additions and 287 deletions

View File

@@ -25,6 +25,7 @@ class accessanalyzer_enabled(Check):
f"IAM Access Analyzer in account {analyzer.name} is not enabled."
)
report.resource_id = analyzer.name
report.resource_arn = analyzer.arn
else:
report.status = "FAIL"
report.status_extended = (

View File

@@ -12,9 +12,7 @@ class accessanalyzer_enabled_without_findings(Check):
report.region = analyzer.region
if analyzer.status == "ACTIVE":
report.status = "PASS"
report.status_extended = (
f"IAM Access Analyzer {analyzer.name} does not have active findings"
)
report.status_extended = f"IAM Access Analyzer {analyzer.name} does not have active findings."
report.resource_id = analyzer.name
report.resource_arn = analyzer.arn
report.resource_tags = analyzer.tags
@@ -26,20 +24,21 @@ class accessanalyzer_enabled_without_findings(Check):
if active_finding_counter > 0:
report.status = "FAIL"
report.status_extended = f"IAM Access Analyzer {analyzer.name} has {active_finding_counter} active findings"
report.status_extended = f"IAM Access Analyzer {analyzer.name} has {active_finding_counter} active findings."
report.resource_id = analyzer.name
report.resource_arn = analyzer.arn
report.resource_tags = analyzer.tags
elif analyzer.status == "NOT_AVAILABLE":
report.status = "FAIL"
report.status_extended = (
f"IAM Access Analyzer in account {analyzer.name} is not enabled"
f"IAM Access Analyzer in account {analyzer.name} is not enabled."
)
report.resource_id = analyzer.name
report.resource_arn = analyzer.arn
else:
report.status = "FAIL"
report.status_extended = (
f"IAM Access Analyzer {analyzer.name} is not active"
f"IAM Access Analyzer {analyzer.name} is not active."
)
report.resource_id = analyzer.name
report.resource_arn = analyzer.arn

View File

@@ -43,7 +43,7 @@ class AccessAnalyzer(AWSService):
if analyzer_count == 0:
self.analyzers.append(
Analyzer(
arn="",
arn=self.audited_account_arn,
name=self.audited_account,
status="NOT_AVAILABLE",
tags=[],

View File

@@ -15,11 +15,13 @@ class apigatewayv2_access_logging_enabled(Check):
report.status = "PASS"
report.status_extended = f"API Gateway V2 {api.name} ID {api.id} in stage {stage.name} has access logging enabled."
report.resource_id = api.name
report.resource_arn = api.arn
report.resource_tags = api.tags
else:
report.status = "FAIL"
report.status_extended = f"API Gateway V2 {api.name} ID {api.id} in stage {stage.name} has access logging disabled."
report.resource_id = api.name
report.resource_arn = api.arn
report.resource_tags = api.tags
findings.append(report)

View File

@@ -19,12 +19,12 @@ class appstream_fleet_default_internet_access_disabled(Check):
if fleet.enable_default_internet_access:
report.status = "FAIL"
report.status_extended = (
f"Fleet {fleet.name} has default internet access enabled"
f"Fleet {fleet.name} has default internet access enabled."
)
else:
report.status = "PASS"
report.status_extended = (
f"Fleet {fleet.name} has default internet access disabled"
f"Fleet {fleet.name} has default internet access disabled."
)
findings.append(report)

View File

@@ -23,10 +23,10 @@ class appstream_fleet_maximum_session_duration(Check):
if fleet.max_user_duration_in_seconds < max_session_duration_seconds:
report.status = "PASS"
report.status_extended = f"Fleet {fleet.name} has the maximum session duration configured for less that 10 hours"
report.status_extended = f"Fleet {fleet.name} has the maximum session duration configured for less that 10 hours."
else:
report.status = "FAIL"
report.status_extended = f"Fleet {fleet.name} has the maximum session duration configured for more that 10 hours"
report.status_extended = f"Fleet {fleet.name} has the maximum session duration configured for more that 10 hours."
findings.append(report)

View File

@@ -23,11 +23,11 @@ class appstream_fleet_session_disconnect_timeout(Check):
if fleet.disconnect_timeout_in_seconds <= max_disconnect_timeout_in_seconds:
report.status = "PASS"
report.status_extended = f"Fleet {fleet.name} has the session disconnect timeout set to less than 5 minutes"
report.status_extended = f"Fleet {fleet.name} has the session disconnect timeout set to less than 5 minutes."
else:
report.status = "FAIL"
report.status_extended = f"Fleet {fleet.name} has the session disconnect timeout set to more than 5 minutes"
report.status_extended = f"Fleet {fleet.name} has the session disconnect timeout set to more than 5 minutes."
findings.append(report)

View File

@@ -27,11 +27,11 @@ class appstream_fleet_session_idle_disconnect_timeout(Check):
<= max_idle_disconnect_timeout_in_seconds
):
report.status = "PASS"
report.status_extended = f"Fleet {fleet.name} has the session idle disconnect timeout set to less than 10 minutes"
report.status_extended = f"Fleet {fleet.name} has the session idle disconnect timeout set to less than 10 minutes."
else:
report.status = "FAIL"
report.status_extended = f"Fleet {fleet.name} has the session idle disconnect timeout set to more than 10 minutes"
report.status_extended = f"Fleet {fleet.name} has the session idle disconnect timeout set to more than 10 minutes."
findings.append(report)

View File

@@ -17,7 +17,7 @@ class awslambda_function_invoke_api_operations_cloudtrail_logging_enabled(Check)
report.status = "FAIL"
report.status_extended = (
f"Lambda function {function.name} is not recorded by CloudTrail"
f"Lambda function {function.name} is not recorded by CloudTrail."
)
lambda_recorded_cloudtrail = False
for trail in cloudtrail_client.trails:
@@ -46,7 +46,7 @@ class awslambda_function_invoke_api_operations_cloudtrail_logging_enabled(Check)
break
if lambda_recorded_cloudtrail:
report.status = "PASS"
report.status_extended = f"Lambda function {function.name} is recorded by CloudTrail trail {trail.name}"
report.status_extended = f"Lambda function {function.name} is recorded by CloudTrail trail {trail.name}."
break
findings.append(report)

View File

@@ -21,7 +21,7 @@ class awslambda_function_no_secrets_in_code(Check):
report.status = "PASS"
report.status_extended = (
f"No secrets found in Lambda function {function.name} code"
f"No secrets found in Lambda function {function.name} code."
)
with tempfile.TemporaryDirectory() as tmp_dir_name:
function.code.code_zip.extractall(tmp_dir_name)
@@ -57,9 +57,9 @@ class awslambda_function_no_secrets_in_code(Check):
report.status = "FAIL"
# report.status_extended = f"Potential {'secrets' if len(secrets_findings)>1 else 'secret'} found in Lambda function {function.name} code. {final_output_string}"
if len(secrets_findings) > 1:
report.status_extended = f"Potential secrets found in Lambda function {function.name} code -> {final_output_string}"
report.status_extended = f"Potential secrets found in Lambda function {function.name} code -> {final_output_string}."
else:
report.status_extended = f"Potential secret found in Lambda function {function.name} code -> {final_output_string}"
report.status_extended = f"Potential secret found in Lambda function {function.name} code -> {final_output_string}."
# break // Don't break as there may be additional findings
findings.append(report)

View File

@@ -21,7 +21,7 @@ class awslambda_function_no_secrets_in_variables(Check):
report.status = "PASS"
report.status_extended = (
f"No secrets found in Lambda function {function.name} variables"
f"No secrets found in Lambda function {function.name} variables."
)
if function.environment:
@@ -47,7 +47,7 @@ class awslambda_function_no_secrets_in_variables(Check):
]
)
report.status = "FAIL"
report.status_extended = f"Potential secret found in Lambda function {function.name} variables -> {secrets_string}"
report.status_extended = f"Potential secret found in Lambda function {function.name} variables -> {secrets_string}."
os.remove(temp_env_data_file.name)

View File

@@ -13,7 +13,7 @@ class awslambda_function_not_publicly_accessible(Check):
report.resource_tags = function.tags
report.status = "PASS"
report.status_extended = f"Lambda function {function.name} has a policy resource-based policy not public"
report.status_extended = f"Lambda function {function.name} has a policy resource-based policy not public."
public_access = False
if function.policy:
@@ -36,7 +36,7 @@ class awslambda_function_not_publicly_accessible(Check):
if public_access:
report.status = "FAIL"
report.status_extended = f"Lambda function {function.name} has a policy resource-based policy with public access"
report.status_extended = f"Lambda function {function.name} has a policy resource-based policy with public access."
findings.append(report)

View File

@@ -17,10 +17,10 @@ class awslambda_function_using_supported_runtimes(Check):
"obsolete_lambda_runtimes", []
):
report.status = "FAIL"
report.status_extended = f"Lambda function {function.name} is using {function.runtime} which is obsolete"
report.status_extended = f"Lambda function {function.name} is using {function.runtime} which is obsolete."
else:
report.status = "PASS"
report.status_extended = f"Lambda function {function.name} is using {function.runtime} which is supported"
report.status_extended = f"Lambda function {function.name} is using {function.runtime} which is supported."
findings.append(report)

View File

@@ -7,15 +7,13 @@ class backup_plans_exist(Check):
findings = []
report = Check_Report_AWS(self.metadata())
report.status = "FAIL"
report.status_extended = "No Backup Plan Exist"
report.status_extended = "No Backup Plan exist."
report.resource_arn = backup_client.audited_account_arn
report.resource_id = backup_client.audited_account
report.region = backup_client.region
if backup_client.backup_plans:
report.status = "PASS"
report.status_extended = (
f"At least one backup plan exists: {backup_client.backup_plans[0].name}"
)
report.status_extended = f"At least one backup plan exists: {backup_client.backup_plans[0].name}."
report.resource_arn = backup_client.backup_plans[0].arn
report.resource_id = backup_client.backup_plans[0].name
report.region = backup_client.backup_plans[0].region

View File

@@ -9,13 +9,13 @@ class backup_reportplans_exist(Check):
if backup_client.backup_plans:
report = Check_Report_AWS(self.metadata())
report.status = "FAIL"
report.status_extended = "No Backup Report Plan Exist"
report.status_extended = "No Backup Report Plan exist."
report.resource_arn = backup_client.audited_account_arn
report.resource_id = backup_client.audited_account
report.region = backup_client.region
if backup_client.backup_report_plans:
report.status = "PASS"
report.status_extended = f"At least one backup report plan exists: { backup_client.backup_report_plans[0].name}"
report.status_extended = f"At least one backup report plan exists: {backup_client.backup_report_plans[0].name}."
report.resource_arn = backup_client.backup_report_plans[0].arn
report.resource_id = backup_client.backup_report_plans[0].name
report.region = backup_client.backup_report_plans[0].region

View File

@@ -11,7 +11,7 @@ class backup_vaults_encrypted(Check):
report = Check_Report_AWS(self.metadata())
report.status = "FAIL"
report.status_extended = (
f"Backup Vault {backup_vault.name} is not encrypted"
f"Backup Vault {backup_vault.name} is not encrypted."
)
report.resource_arn = backup_vault.arn
report.resource_id = backup_vault.name
@@ -20,7 +20,7 @@ class backup_vaults_encrypted(Check):
if backup_vault.encryption:
report.status = "PASS"
report.status_extended = (
f"Backup Vault {backup_vault.name} is encrypted"
f"Backup Vault {backup_vault.name} is encrypted."
)
# then we store the finding
findings.append(report)

View File

@@ -7,13 +7,13 @@ class backup_vaults_exist(Check):
findings = []
report = Check_Report_AWS(self.metadata())
report.status = "FAIL"
report.status_extended = "No Backup Vault Exist"
report.status_extended = "No Backup Vault exist."
report.resource_arn = backup_client.audited_account_arn
report.resource_id = backup_client.audited_account
report.region = backup_client.region
if backup_client.backup_vaults:
report.status = "PASS"
report.status_extended = f"At least one backup vault exists: { backup_client.backup_vaults[0].name}"
report.status_extended = f"At least one backup vault exists: {backup_client.backup_vaults[0].name}."
report.resource_arn = backup_client.backup_vaults[0].arn
report.resource_id = backup_client.backup_vaults[0].name
report.region = backup_client.backup_vaults[0].region

View File

@@ -18,10 +18,10 @@ class cloudfront_distributions_field_level_encryption_enabled(Check):
and distribution.default_cache_config.field_level_encryption_id
):
report.status = "PASS"
report.status_extended = f"CloudFront Distribution {distribution.id} has Field Level Encryption enabled"
report.status_extended = f"CloudFront Distribution {distribution.id} has Field Level Encryption enabled."
else:
report.status = "FAIL"
report.status_extended = f"CloudFront Distribution {distribution.id} has Field Level Encryption disabled"
report.status_extended = f"CloudFront Distribution {distribution.id} has Field Level Encryption disabled."
findings.append(report)

View File

@@ -18,10 +18,10 @@ class cloudfront_distributions_geo_restrictions_enabled(Check):
report.resource_tags = distribution.tags
if distribution.geo_restriction_type == GeoRestrictionType.none:
report.status = "FAIL"
report.status_extended = f"CloudFront Distribution {distribution.id} has Geo restrictions disabled"
report.status_extended = f"CloudFront Distribution {distribution.id} has Geo restrictions disabled."
else:
report.status = "PASS"
report.status_extended = f"CloudFront Distribution {distribution.id} has Geo restrictions enabled"
report.status_extended = f"CloudFront Distribution {distribution.id} has Geo restrictions enabled."
findings.append(report)

View File

@@ -24,7 +24,7 @@ class cloudfront_distributions_https_enabled(Check):
):
report.status = "PASS"
report.status_extended = (
f"CloudFront Distribution {distribution.id} has redirect to HTTPS"
f"CloudFront Distribution {distribution.id} has redirect to HTTPS."
)
elif (
distribution.default_cache_config
@@ -33,11 +33,11 @@ class cloudfront_distributions_https_enabled(Check):
):
report.status = "PASS"
report.status_extended = (
f"CloudFront Distribution {distribution.id} has HTTPS only"
f"CloudFront Distribution {distribution.id} has HTTPS only."
)
else:
report.status = "FAIL"
report.status_extended = f"CloudFront Distribution {distribution.id} viewers can use HTTP or HTTPS"
report.status_extended = f"CloudFront Distribution {distribution.id} viewers can use HTTP or HTTPS."
findings.append(report)

View File

@@ -19,12 +19,12 @@ class cloudfront_distributions_logging_enabled(Check):
):
report.status = "PASS"
report.status_extended = (
f"CloudFront Distribution {distribution.id} has logging enabled"
f"CloudFront Distribution {distribution.id} has logging enabled."
)
else:
report.status = "FAIL"
report.status_extended = (
f"CloudFront Distribution {distribution.id} has logging disabled"
f"CloudFront Distribution {distribution.id} has logging disabled."
)
findings.append(report)

View File

@@ -17,7 +17,7 @@ class cloudfront_distributions_using_deprecated_ssl_protocols(Check):
report.resource_id = distribution.id
report.resource_tags = distribution.tags
report.status = "PASS"
report.status_extended = f"CloudFront Distribution {distribution.id} is not using a deprecated SSL protocol"
report.status_extended = f"CloudFront Distribution {distribution.id} is not using a deprecated SSL protocol."
bad_ssl_protocol = False
for origin in distribution.origins:
@@ -34,7 +34,7 @@ class cloudfront_distributions_using_deprecated_ssl_protocols(Check):
break
if bad_ssl_protocol:
report.status = "FAIL"
report.status_extended = f"CloudFront Distribution {distribution.id} is using a deprecated SSL protocol"
report.status_extended = f"CloudFront Distribution {distribution.id} is using a deprecated SSL protocol."
break
findings.append(report)

View File

@@ -15,10 +15,10 @@ class cloudfront_distributions_using_waf(Check):
report.resource_tags = distribution.tags
if distribution.web_acl_id:
report.status = "PASS"
report.status_extended = f"CloudFront Distribution {distribution.id} is using AWS WAF web ACL {distribution.web_acl_id}"
report.status_extended = f"CloudFront Distribution {distribution.id} is using AWS WAF web ACL {distribution.web_acl_id}."
else:
report.status = "FAIL"
report.status_extended = f"CloudFront Distribution {distribution.id} is not using AWS WAF web ACL"
report.status_extended = f"CloudFront Distribution {distribution.id} is not using AWS WAF web ACL."
findings.append(report)
return findings

View File

@@ -21,10 +21,10 @@ class cloudtrail_cloudwatch_logging_enabled(Check):
report.status = "PASS"
if trail.is_multiregion:
report.status_extended = (
f"Multiregion trail {trail.name} has been logging the last 24h"
f"Multiregion trail {trail.name} has been logging the last 24h."
)
else:
report.status_extended = f"Single region trail {trail.name} has been logging the last 24h"
report.status_extended = f"Single region trail {trail.name} has been logging the last 24h."
if trail.latest_cloudwatch_delivery_time:
last_log_delivery = (
datetime.now().replace(tzinfo=timezone.utc)
@@ -33,15 +33,15 @@ class cloudtrail_cloudwatch_logging_enabled(Check):
if last_log_delivery > timedelta(days=maximum_time_without_logging):
report.status = "FAIL"
if trail.is_multiregion:
report.status_extended = f"Multiregion trail {trail.name} is not logging in the last 24h"
report.status_extended = f"Multiregion trail {trail.name} is not logging in the last 24h."
else:
report.status_extended = f"Single region trail {trail.name} is not logging in the last 24h"
report.status_extended = f"Single region trail {trail.name} is not logging in the last 24h."
else:
report.status = "FAIL"
if trail.is_multiregion:
report.status_extended = f"Multiregion trail {trail.name} is not logging in the last 24h or not configured to deliver logs"
report.status_extended = f"Multiregion trail {trail.name} is not logging in the last 24h or not configured to deliver logs."
else:
report.status_extended = f"Single region trail {trail.name} is not logging in the last 24h or not configured to deliver logs"
report.status_extended = f"Single region trail {trail.name} is not logging in the last 24h or not configured to deliver logs."
findings.append(report)
return findings

View File

@@ -17,21 +17,21 @@ class cloudtrail_kms_encryption_enabled(Check):
report.status = "FAIL"
if trail.is_multiregion:
report.status_extended = (
f"Multiregion trail {trail.name} has encryption disabled"
f"Multiregion trail {trail.name} has encryption disabled."
)
else:
report.status_extended = (
f"Single region trail {trail.name} has encryption disabled"
f"Single region trail {trail.name} has encryption disabled."
)
if trail.kms_key:
report.status = "PASS"
if trail.is_multiregion:
report.status_extended = (
f"Multiregion trail {trail.name} has encryption enabled"
f"Multiregion trail {trail.name} has encryption enabled."
)
else:
report.status_extended = (
f"Single region trail {trail.name} has encryption enabled"
f"Single region trail {trail.name} has encryption enabled."
)
findings.append(report)

View File

@@ -17,18 +17,16 @@ class cloudtrail_log_file_validation_enabled(Check):
report.status = "FAIL"
if trail.is_multiregion:
report.status_extended = (
f"Multiregion trail {trail.name} log file validation disabled"
f"Multiregion trail {trail.name} log file validation disabled."
)
else:
report.status_extended = (
f"Single region trail {trail.name} log file validation disabled"
)
report.status_extended = f"Single region trail {trail.name} log file validation disabled."
if trail.log_file_validation_enabled:
report.status = "PASS"
if trail.is_multiregion:
report.status_extended = f"Multiregion trail {trail.name} log file validation enabled"
report.status_extended = f"Multiregion trail {trail.name} log file validation enabled."
else:
report.status_extended = f"Single region trail {trail.name} log file validation enabled"
report.status_extended = f"Single region trail {trail.name} log file validation enabled."
findings.append(report)
return findings

View File

@@ -19,24 +19,24 @@ class cloudtrail_logs_s3_bucket_access_logging_enabled(Check):
report.resource_tags = trail.tags
report.status = "FAIL"
if trail.is_multiregion:
report.status_extended = f"Multiregion Trail {trail.name} S3 bucket access logging is not enabled for bucket {trail_bucket}"
report.status_extended = f"Multiregion Trail {trail.name} S3 bucket access logging is not enabled for bucket {trail_bucket}."
else:
report.status_extended = f"Single region Trail {trail.name} S3 bucket access logging is not enabled for bucket {trail_bucket}"
report.status_extended = f"Single region Trail {trail.name} S3 bucket access logging is not enabled for bucket {trail_bucket}."
for bucket in s3_client.buckets:
if trail_bucket == bucket.name:
trail_bucket_is_in_account = True
if bucket.logging:
report.status = "PASS"
if trail.is_multiregion:
report.status_extended = f"Multiregion trail {trail.name} S3 bucket access logging is enabled for bucket {trail_bucket}"
report.status_extended = f"Multiregion trail {trail.name} S3 bucket access logging is enabled for bucket {trail_bucket}."
else:
report.status_extended = f"Single region trail {trail.name} S3 bucket access logging is enabled for bucket {trail_bucket}"
report.status_extended = f"Single region trail {trail.name} S3 bucket access logging is enabled for bucket {trail_bucket}."
break
# check if trail is delivering logs in a cross account bucket
if not trail_bucket_is_in_account:
report.status = "INFO"
report.status_extended = f"Trail {trail.name} is delivering logs in a cross-account bucket {trail_bucket} in another account out of Prowler's permissions scope, please check it manually"
report.status_extended = f"Trail {trail.name} is delivering logs in a cross-account bucket {trail_bucket} in another account out of Prowler's permissions scope, please check it manually."
findings.append(report)
return findings

View File

@@ -19,9 +19,9 @@ class cloudtrail_logs_s3_bucket_is_not_publicly_accessible(Check):
report.resource_tags = trail.tags
report.status = "PASS"
if trail.is_multiregion:
report.status_extended = f"S3 Bucket {trail_bucket} from multiregion trail {trail.name} is not publicly accessible"
report.status_extended = f"S3 Bucket {trail_bucket} from multiregion trail {trail.name} is not publicly accessible."
else:
report.status_extended = f"S3 Bucket {trail_bucket} from single region trail {trail.name} is not publicly accessible"
report.status_extended = f"S3 Bucket {trail_bucket} from single region trail {trail.name} is not publicly accessible."
for bucket in s3_client.buckets:
# Here we need to ensure that acl_grantee is filled since if we don't have permissions to query the api for a concrete region
# (for example due to a SCP) we are going to try access an attribute from a None type
@@ -35,14 +35,14 @@ class cloudtrail_logs_s3_bucket_is_not_publicly_accessible(Check):
):
report.status = "FAIL"
if trail.is_multiregion:
report.status_extended = f"S3 Bucket {trail_bucket} from multiregion trail {trail.name} is publicly accessible"
report.status_extended = f"S3 Bucket {trail_bucket} from multiregion trail {trail.name} is publicly accessible."
else:
report.status_extended = f"S3 Bucket {trail_bucket} from single region trail {trail.name} is publicly accessible"
report.status_extended = f"S3 Bucket {trail_bucket} from single region trail {trail.name} is publicly accessible."
break
# check if trail bucket is a cross account bucket
if not trail_bucket_is_in_account:
report.status = "INFO"
report.status_extended = f"Trail {trail.name} bucket ({trail_bucket}) is a cross-account bucket in another account out of Prowler's permissions scope, please check it manually"
report.status_extended = f"Trail {trail.name} bucket ({trail_bucket}) is a cross-account bucket in another account out of Prowler's permissions scope, please check it manually."
findings.append(report)
return findings

View File

@@ -19,10 +19,10 @@ class cloudtrail_multi_region_enabled(Check):
report.resource_tags = trail.tags
if trail.is_multiregion:
report.status_extended = (
f"Trail {trail.name} is multiregion and it is logging"
f"Trail {trail.name} is multiregion and it is logging."
)
else:
report.status_extended = f"Trail {trail.name} is not multiregion and it is logging"
report.status_extended = f"Trail {trail.name} is not multiregion and it is logging."
# Since there exists a logging trail in that region there is no point in checking the reamaining trails
# Store the finding and exit the loop
findings.append(report)
@@ -30,7 +30,7 @@ class cloudtrail_multi_region_enabled(Check):
else:
report.status = "FAIL"
report.status_extended = (
"No CloudTrail trails enabled and logging were found"
"No CloudTrail trails enabled and logging were found."
)
report.resource_arn = cloudtrail_client.audited_account_arn
report.resource_id = cloudtrail_client.audited_account

View File

@@ -7,7 +7,7 @@ class cloudwatch_cross_account_sharing_disabled(Check):
findings = []
report = Check_Report_AWS(self.metadata())
report.status = "PASS"
report.status_extended = "CloudWatch doesn't allow cross-account sharing"
report.status_extended = "CloudWatch doesn't allow cross-account sharing."
report.resource_arn = iam_client.audited_account_arn
report.resource_id = iam_client.audited_account
report.region = iam_client.region

View File

@@ -81,7 +81,7 @@ class cloudwatch_log_group_no_secrets_in_logs(Check):
if log_group_secrets:
secrets_string = "; ".join(log_group_secrets)
report.status = "FAIL"
report.status_extended = f"Potential secrets found in log group {log_group.name} {secrets_string}"
report.status_extended = f"Potential secrets found in log group {log_group.name} {secrets_string}."
findings.append(report)
return findings

View File

@@ -4,6 +4,13 @@ from prowler.providers.aws.services.accessanalyzer.accessanalyzer_service import
Analyzer,
)
AWS_REGION_1 = "eu-west-1"
AWS_REGION_2 = "eu-west-2"
AWS_ACCOUNT_NUMBER = "123456789012"
AWS_ACCOUNT_ARN = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
ACCESS_ANALYZER_NAME = "test-analyzer"
ACCESS_ANALYZER_ARN = f"arn:aws:access-analyzer:{AWS_REGION_2}:{AWS_ACCOUNT_NUMBER}:analyzer/{ACCESS_ANALYZER_NAME}"
class Test_accessanalyzer_enabled:
def test_no_analyzers(self):
@@ -28,12 +35,12 @@ class Test_accessanalyzer_enabled:
accessanalyzer_client = mock.MagicMock
accessanalyzer_client.analyzers = [
Analyzer(
arn="",
name="012345678910",
arn=AWS_ACCOUNT_ARN,
name=AWS_ACCOUNT_NUMBER,
status="NOT_AVAILABLE",
tags=[],
type="",
region="eu-west-1",
region=AWS_REGION_1,
)
]
with mock.patch(
@@ -51,28 +58,31 @@ class Test_accessanalyzer_enabled:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "IAM Access Analyzer in account 012345678910 is not enabled."
== f"IAM Access Analyzer in account {AWS_ACCOUNT_NUMBER} is not enabled."
)
assert result[0].resource_id == "012345678910"
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == AWS_ACCOUNT_ARN
assert result[0].region == AWS_REGION_1
assert result[0].resource_tags == []
def test_two_analyzers(self):
accessanalyzer_client = mock.MagicMock
accessanalyzer_client.analyzers = [
Analyzer(
arn="",
name="012345678910",
arn=AWS_ACCOUNT_ARN,
name=AWS_ACCOUNT_NUMBER,
status="NOT_AVAILABLE",
tags=[],
type="",
region="eu-west-1",
region=AWS_REGION_1,
),
Analyzer(
arn="",
name="Test Analyzer",
arn=ACCESS_ANALYZER_ARN,
name=ACCESS_ANALYZER_NAME,
status="ACTIVE",
tags=[],
type="",
region="eu-west-2",
region=AWS_REGION_2,
),
]
@@ -90,31 +100,37 @@ class Test_accessanalyzer_enabled:
result = check.execute()
assert len(result) == 2
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "IAM Access Analyzer in account 012345678910 is not enabled."
== f"IAM Access Analyzer in account {AWS_ACCOUNT_NUMBER} is not enabled."
)
assert result[0].resource_id == "012345678910"
assert result[0].region == "eu-west-1"
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == AWS_ACCOUNT_ARN
assert result[0].resource_tags == []
assert result[0].region == AWS_REGION_1
assert result[1].status == "PASS"
assert (
result[1].status_extended
== "IAM Access Analyzer Test Analyzer is enabled."
== f"IAM Access Analyzer {ACCESS_ANALYZER_NAME} is enabled."
)
assert result[1].resource_id == "Test Analyzer"
assert result[1].region == "eu-west-2"
assert result[1].resource_id == ACCESS_ANALYZER_NAME
assert result[1].resource_arn == ACCESS_ANALYZER_ARN
assert result[1].resource_tags == []
assert result[1].region == AWS_REGION_2
def test_one_active_analyzer(self):
accessanalyzer_client = mock.MagicMock
accessanalyzer_client.analyzers = [
Analyzer(
arn="",
name="Test Analyzer",
arn=ACCESS_ANALYZER_ARN,
name=ACCESS_ANALYZER_NAME,
status="ACTIVE",
tags=[],
type="",
region="eu-west-2",
region=AWS_REGION_2,
)
]
@@ -131,10 +147,13 @@ class Test_accessanalyzer_enabled:
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "IAM Access Analyzer Test Analyzer is enabled."
== f"IAM Access Analyzer {ACCESS_ANALYZER_NAME} is enabled."
)
assert result[0].resource_id == "Test Analyzer"
assert result[0].region == "eu-west-2"
assert result[0].resource_id == ACCESS_ANALYZER_NAME
assert result[0].resource_arn == ACCESS_ANALYZER_ARN
assert result[0].resource_tags == []
assert result[0].region == AWS_REGION_2

View File

@@ -8,6 +8,9 @@ from prowler.providers.aws.services.accessanalyzer.accessanalyzer_service import
AWS_REGION_1 = "eu-west-1"
AWS_REGION_2 = "eu-west-2"
AWS_ACCOUNT_NUMBER = "123456789012"
AWS_ACCOUNT_ARN = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
ACCESS_ANALYZER_NAME = "test-analyzer"
ACCESS_ANALYZER_ARN = f"arn:aws:access-analyzer:{AWS_REGION_2}:{AWS_ACCOUNT_NUMBER}:analyzer/{ACCESS_ANALYZER_NAME}"
class Test_accessanalyzer_enabled_without_findings:
@@ -33,7 +36,7 @@ class Test_accessanalyzer_enabled_without_findings:
accessanalyzer_client = mock.MagicMock
accessanalyzer_client.analyzers = [
Analyzer(
arn="",
arn=AWS_ACCOUNT_ARN,
name=AWS_ACCOUNT_NUMBER,
status="NOT_AVAILABLE",
tags=[],
@@ -57,15 +60,18 @@ class Test_accessanalyzer_enabled_without_findings:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"IAM Access Analyzer in account {AWS_ACCOUNT_NUMBER} is not enabled"
== f"IAM Access Analyzer in account {AWS_ACCOUNT_NUMBER} is not enabled."
)
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == AWS_ACCOUNT_ARN
assert result[0].region == AWS_REGION_1
assert result[0].resource_tags == []
def test_two_analyzers(self):
accessanalyzer_client = mock.MagicMock
accessanalyzer_client.analyzers = [
Analyzer(
arn="",
arn=AWS_ACCOUNT_ARN,
name=AWS_ACCOUNT_NUMBER,
status="NOT_AVAILABLE",
tags=[],
@@ -74,8 +80,8 @@ class Test_accessanalyzer_enabled_without_findings:
region=AWS_REGION_1,
),
Analyzer(
arn="",
name="Test Analyzer",
arn=ACCESS_ANALYZER_ARN,
name=ACCESS_ANALYZER_NAME,
status="ACTIVE",
findings=[
Finding(
@@ -107,27 +113,33 @@ class Test_accessanalyzer_enabled_without_findings:
result = check.execute()
assert len(result) == 2
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"IAM Access Analyzer in account {AWS_ACCOUNT_NUMBER} is not enabled"
== f"IAM Access Analyzer in account {AWS_ACCOUNT_NUMBER} is not enabled."
)
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == AWS_ACCOUNT_ARN
assert result[0].region == AWS_REGION_1
assert result[0].resource_tags == []
assert result[1].status == "FAIL"
assert (
result[1].status_extended
== "IAM Access Analyzer Test Analyzer has 1 active findings"
== f"IAM Access Analyzer {ACCESS_ANALYZER_NAME} has 1 active findings."
)
assert result[1].resource_id == "Test Analyzer"
assert result[1].resource_id == ACCESS_ANALYZER_NAME
assert result[1].resource_arn == ACCESS_ANALYZER_ARN
assert result[1].region == AWS_REGION_2
assert result[1].resource_tags == []
def test_one_active_analyzer_without_findings(self):
accessanalyzer_client = mock.MagicMock
accessanalyzer_client.analyzers = [
Analyzer(
arn="",
name="Test Analyzer",
arn=ACCESS_ANALYZER_ARN,
name=ACCESS_ANALYZER_NAME,
status="ACTIVE",
tags=[],
fidings=[],
@@ -152,16 +164,18 @@ class Test_accessanalyzer_enabled_without_findings:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "IAM Access Analyzer Test Analyzer does not have active findings"
== f"IAM Access Analyzer {ACCESS_ANALYZER_NAME} does not have active findings."
)
assert result[0].resource_id == "Test Analyzer"
assert result[0].resource_id == ACCESS_ANALYZER_NAME
assert result[0].resource_arn == ACCESS_ANALYZER_ARN
assert result[0].region == AWS_REGION_2
assert result[0].resource_tags == []
def test_one_active_analyzer_not_active(self):
accessanalyzer_client = mock.MagicMock
accessanalyzer_client.analyzers = [
Analyzer(
arn="",
arn=AWS_ACCOUNT_ARN,
name=AWS_ACCOUNT_NUMBER,
status="NOT_AVAILABLE",
tags=[],
@@ -187,17 +201,19 @@ class Test_accessanalyzer_enabled_without_findings:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"IAM Access Analyzer in account {AWS_ACCOUNT_NUMBER} is not enabled"
== f"IAM Access Analyzer in account {AWS_ACCOUNT_NUMBER} is not enabled."
)
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == AWS_ACCOUNT_ARN
assert result[0].region == AWS_REGION_1
assert result[0].resource_tags == []
def test_analyzer_finding_without_status(self):
accessanalyzer_client = mock.MagicMock
accessanalyzer_client.analyzers = [
Analyzer(
arn="",
name="Test Analyzer",
arn=ACCESS_ANALYZER_ARN,
name=ACCESS_ANALYZER_NAME,
status="ACTIVE",
findings=[
Finding(
@@ -228,7 +244,9 @@ class Test_accessanalyzer_enabled_without_findings:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "IAM Access Analyzer Test Analyzer does not have active findings"
== f"IAM Access Analyzer {ACCESS_ANALYZER_NAME} does not have active findings."
)
assert result[0].resource_id == "Test Analyzer"
assert result[0].resource_id == ACCESS_ANALYZER_NAME
assert result[0].resource_arn == ACCESS_ANALYZER_ARN
assert result[0].region == AWS_REGION_1
assert result[0].resource_tags == []

View File

@@ -65,6 +65,7 @@ class Test_acm_certificates_expiration_check:
assert result[0].resource_id == certificate_name
assert result[0].resource_arn == certificate_arn
assert result[0].region == AWS_REGION
assert result[0].resource_tags == []
def test_acm_certificate_not_expirated(self):
certificate_arn = f"arn:aws:acm:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:certificate/{str(uuid.uuid4())}"
@@ -105,3 +106,4 @@ class Test_acm_certificates_expiration_check:
assert result[0].resource_id == certificate_name
assert result[0].resource_arn == certificate_arn
assert result[0].region == AWS_REGION
assert result[0].resource_tags == []

View File

@@ -64,6 +64,7 @@ class Test_acm_certificates_transparency_logs_enabled:
assert result[0].resource_id == certificate_name
assert result[0].resource_arn == certificate_arn
assert result[0].region == AWS_REGION
assert result[0].resource_tags == []
def test_acm_certificate_without_logging(self):
certificate_arn = f"arn:aws:acm:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:certificate/{str(uuid.uuid4())}"
@@ -103,3 +104,4 @@ class Test_acm_certificates_transparency_logs_enabled:
assert result[0].resource_id == certificate_name
assert result[0].resource_arn == certificate_arn
assert result[0].region == AWS_REGION
assert result[0].resource_tags == []

View File

@@ -131,6 +131,8 @@ class Test_apigateway_authorizers_enabled:
result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:apigateway:{AWS_REGION}::/restapis/{rest_api['id']}"
)
assert result[0].region == AWS_REGION
assert result[0].resource_tags == [{}]
@mock_apigateway
def test_apigateway_one_rest_api_without_lambda_authorizer(self):
@@ -172,3 +174,5 @@ class Test_apigateway_authorizers_enabled:
result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:apigateway:{AWS_REGION}::/restapis/{rest_api['id']}"
)
assert result[0].region == AWS_REGION
assert result[0].resource_tags == [{}]

View File

@@ -141,6 +141,8 @@ class Test_apigateway_client_certificate_enabled:
result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:apigateway:{AWS_REGION}::/restapis/{rest_api['id']}/stages/test"
)
assert result[0].region == AWS_REGION
assert result[0].resource_tags == [None]
@mock_apigateway
def test_apigateway_one_stage_with_certificate(self):
@@ -192,3 +194,5 @@ class Test_apigateway_client_certificate_enabled:
result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:apigateway:{AWS_REGION}::/restapis/test-rest-api/stages/test"
)
assert result[0].region == AWS_REGION
assert result[0].resource_tags == []

View File

@@ -112,6 +112,8 @@ class Test_apigateway_endpoint_public:
result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:apigateway:{AWS_REGION}::/restapis/{rest_api['id']}"
)
assert result[0].region == AWS_REGION
assert result[0].resource_tags == [{}]
@mock_apigateway
def test_apigateway_one_public_rest_api(self):
@@ -158,3 +160,5 @@ class Test_apigateway_endpoint_public:
result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:apigateway:{AWS_REGION}::/restapis/{rest_api['id']}"
)
assert result[0].region == AWS_REGION
assert result[0].resource_tags == [{}]

View File

@@ -144,6 +144,8 @@ class Test_apigateway_logging_enabled:
result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:apigateway:{AWS_REGION}::/restapis/{rest_api['id']}/stages/test"
)
assert result[0].region == AWS_REGION
assert result[0].resource_tags == [None]
@mock_apigateway
def test_apigateway_one_rest_api_without_logging(self):
@@ -213,3 +215,5 @@ class Test_apigateway_logging_enabled:
result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:apigateway:{AWS_REGION}::/restapis/{rest_api['id']}/stages/test"
)
assert result[0].region == AWS_REGION
assert result[0].resource_tags == [None]

View File

@@ -150,6 +150,8 @@ class Test_apigateway_waf_acl_attached:
result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:apigateway:{AWS_REGION}::/restapis/{rest_api['id']}/stages/test"
)
assert result[0].region == AWS_REGION
assert result[0].resource_tags == [None]
@mock_apigateway
def test_apigateway_one_rest_api_without_waf(self):
@@ -219,3 +221,5 @@ class Test_apigateway_waf_acl_attached:
result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:apigateway:{AWS_REGION}::/restapis/{rest_api['id']}/stages/test"
)
assert result[0].region == AWS_REGION
assert result[0].resource_tags == [None]

View File

@@ -130,4 +130,11 @@ class Test_apigatewayv2_access_logging_enabled:
result[0].status_extended
== f"API Gateway V2 test-api ID {api['ApiId']} in stage test-stage has access logging enabled."
)
assert result[0].resource_id == "test-api"
assert (
result[0].resource_arn
== f"arn:aws:apigateway:{AWS_REGION}::apis/{api['ApiId']}"
)
assert result[0].region == AWS_REGION
assert result[0].resource_tags == [{}]

View File

@@ -137,3 +137,9 @@ class Test_apigatewayv2_authorizers_enabled:
== f"API Gateway V2 test-api ID {api['ApiId']} has an authorizer configured."
)
assert result[0].resource_id == "test-api"
assert (
result[0].resource_arn
== f"arn:aws:apigateway:{AWS_REGION}::apis/{api['ApiId']}"
)
assert result[0].region == AWS_REGION
assert result[0].resource_tags == [{}]

View File

@@ -58,8 +58,9 @@ class Test_appstream_fleet_default_internet_access_disabled:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Fleet {fleet1.name} has default internet access enabled"
== f"Fleet {fleet1.name} has default internet access enabled."
)
assert result[0].resource_tags == []
def test_one_fleet_internet_access_disbaled(self):
appstream_client = mock.MagicMock
@@ -95,8 +96,9 @@ class Test_appstream_fleet_default_internet_access_disabled:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Fleet {fleet1.name} has default internet access disabled"
== f"Fleet {fleet1.name} has default internet access disabled."
)
assert result[0].resource_tags == []
def test_two_fleets_internet_access_one_enabled_two_disabled(self):
appstream_client = mock.MagicMock
@@ -145,8 +147,9 @@ class Test_appstream_fleet_default_internet_access_disabled:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Fleet {fleet1.name} has default internet access enabled"
== f"Fleet {fleet1.name} has default internet access enabled."
)
assert result[0].resource_tags == []
if res.resource_id == fleet2.name:
assert result[1].resource_arn == fleet2.arn
assert result[1].region == fleet2.region
@@ -154,5 +157,6 @@ class Test_appstream_fleet_default_internet_access_disabled:
assert result[1].status == "PASS"
assert (
result[1].status_extended
== f"Fleet {fleet2.name} has default internet access disabled"
== f"Fleet {fleet2.name} has default internet access disabled."
)
assert result[1].resource_tags == []

View File

@@ -61,8 +61,9 @@ class Test_appstream_fleet_maximum_session_duration:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Fleet {fleet1.name} has the maximum session duration configured for more that 10 hours"
== f"Fleet {fleet1.name} has the maximum session duration configured for more that 10 hours."
)
assert result[0].resource_tags == []
def test_one_fleet_maximum_session_duration_less_than_10_hours(self):
appstream_client = mock.MagicMock
@@ -101,8 +102,9 @@ class Test_appstream_fleet_maximum_session_duration:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Fleet {fleet1.name} has the maximum session duration configured for less that 10 hours"
== f"Fleet {fleet1.name} has the maximum session duration configured for less that 10 hours."
)
assert result[0].resource_tags == []
def test_two_fleets_one_maximum_session_duration_less_than_10_hours_on_more_than_10_hours(
self,
@@ -157,8 +159,10 @@ class Test_appstream_fleet_maximum_session_duration:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Fleet {fleet1.name} has the maximum session duration configured for less that 10 hours"
== f"Fleet {fleet1.name} has the maximum session duration configured for less that 10 hours."
)
assert result[0].resource_tags == []
if res.resource_id == fleet2.name:
assert result[1].resource_arn == fleet2.arn
assert result[1].region == fleet2.region
@@ -166,5 +170,6 @@ class Test_appstream_fleet_maximum_session_duration:
assert result[1].status == "FAIL"
assert (
result[1].status_extended
== f"Fleet {fleet2.name} has the maximum session duration configured for more that 10 hours"
== f"Fleet {fleet2.name} has the maximum session duration configured for more that 10 hours."
)
assert result[1].resource_tags == []

View File

@@ -60,8 +60,9 @@ class Test_appstream_fleet_session_disconnect_timeout:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Fleet {fleet1.name} has the session disconnect timeout set to more than 5 minutes"
== f"Fleet {fleet1.name} has the session disconnect timeout set to more than 5 minutes."
)
assert result[0].resource_tags == []
def test_one_fleet_session_disconnect_timeout_less_than_5_minutes(self):
appstream_client = mock.MagicMock
@@ -100,8 +101,9 @@ class Test_appstream_fleet_session_disconnect_timeout:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Fleet {fleet1.name} has the session disconnect timeout set to less than 5 minutes"
== f"Fleet {fleet1.name} has the session disconnect timeout set to less than 5 minutes."
)
assert result[0].resource_tags == []
def test_two_fleets_session_disconnect_timeout_less_than_5_minutes_one_more_than_5_minutes(
self,
@@ -156,8 +158,9 @@ class Test_appstream_fleet_session_disconnect_timeout:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Fleet {fleet1.name} has the session disconnect timeout set to more than 5 minutes"
== f"Fleet {fleet1.name} has the session disconnect timeout set to more than 5 minutes."
)
assert result[0].resource_tags == []
if res.resource_id == fleet2.name:
assert result[1].resource_arn == fleet2.arn
assert result[1].region == fleet2.region
@@ -165,5 +168,6 @@ class Test_appstream_fleet_session_disconnect_timeout:
assert result[1].status == "PASS"
assert (
result[1].status_extended
== f"Fleet {fleet2.name} has the session disconnect timeout set to less than 5 minutes"
== f"Fleet {fleet2.name} has the session disconnect timeout set to less than 5 minutes."
)
assert result[1].resource_tags == []

View File

@@ -61,8 +61,9 @@ class Test_appstream_fleet_session_idle_disconnect_timeout:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Fleet {fleet1.name} has the session idle disconnect timeout set to more than 10 minutes"
== f"Fleet {fleet1.name} has the session idle disconnect timeout set to more than 10 minutes."
)
assert result[0].resource_tags == []
def test_one_fleet_session_idle_disconnect_timeout_less_than_10_minutes(self):
appstream_client = mock.MagicMock
@@ -101,8 +102,9 @@ class Test_appstream_fleet_session_idle_disconnect_timeout:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Fleet {fleet1.name} has the session idle disconnect timeout set to less than 10 minutes"
== f"Fleet {fleet1.name} has the session idle disconnect timeout set to less than 10 minutes."
)
assert result[0].resource_tags == []
def test_two_fleets_session_idle_disconnect_timeout_than_10_minutes_one_more_than_10_minutes(
self,
@@ -157,7 +159,7 @@ class Test_appstream_fleet_session_idle_disconnect_timeout:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Fleet {fleet1.name} has the session idle disconnect timeout set to less than 10 minutes"
== f"Fleet {fleet1.name} has the session idle disconnect timeout set to less than 10 minutes."
)
if res.resource_id == fleet2.name:
assert result[1].resource_arn == fleet2.arn
@@ -166,5 +168,6 @@ class Test_appstream_fleet_session_idle_disconnect_timeout:
assert result[1].status == "FAIL"
assert (
result[1].status_extended
== f"Fleet {fleet2.name} has the session idle disconnect timeout set to more than 10 minutes"
== f"Fleet {fleet2.name} has the session idle disconnect timeout set to more than 10 minutes."
)
assert result[1].resource_tags == []

View File

@@ -73,15 +73,19 @@ class Test_autoscaling_find_secrets_ec2_launch_configuration:
@mock_autoscaling
def test_one_autoscaling_with_no_secrets(self):
# Include launch_configurations to check
launch_configuration_name = "tester"
autoscaling_client = client("autoscaling", region_name=AWS_REGION)
autoscaling_client.create_launch_configuration(
LaunchConfigurationName="tester",
LaunchConfigurationName=launch_configuration_name,
ImageId="ami-12c6146b",
InstanceType="t1.micro",
KeyName="the_keys",
SecurityGroups=["default", "default2"],
UserData="This is some user_data",
)
launch_configuration_arn = autoscaling_client.describe_launch_configurations(
LaunchConfigurationNames=[launch_configuration_name]
)["LaunchConfigurations"][0]["LaunchConfigurationARN"]
from prowler.providers.aws.services.autoscaling.autoscaling_service import (
AutoScaling,
@@ -107,22 +111,28 @@ class Test_autoscaling_find_secrets_ec2_launch_configuration:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "No secrets found in autoscaling tester User Data."
== f"No secrets found in autoscaling {launch_configuration_name} User Data."
)
assert result[0].resource_id == "tester"
assert result[0].resource_id == launch_configuration_name
assert result[0].resource_arn == launch_configuration_arn
assert result[0].region == AWS_REGION
@mock_autoscaling
def test_one_autoscaling_with_secrets(self):
# Include launch_configurations to check
launch_configuration_name = "tester"
autoscaling_client = client("autoscaling", region_name=AWS_REGION)
autoscaling_client.create_launch_configuration(
LaunchConfigurationName="tester",
LaunchConfigurationName=launch_configuration_name,
ImageId="ami-12c6146b",
InstanceType="t1.micro",
KeyName="the_keys",
SecurityGroups=["default", "default2"],
UserData="DB_PASSWORD=foobar123",
)
launch_configuration_arn = autoscaling_client.describe_launch_configurations(
LaunchConfigurationNames=[launch_configuration_name]
)["LaunchConfigurations"][0]["LaunchConfigurationARN"]
from prowler.providers.aws.services.autoscaling.autoscaling_service import (
AutoScaling,
@@ -148,9 +158,11 @@ class Test_autoscaling_find_secrets_ec2_launch_configuration:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Potential secret found in autoscaling tester User Data."
== f"Potential secret found in autoscaling {launch_configuration_name} User Data."
)
assert result[0].resource_id == "tester"
assert result[0].resource_id == launch_configuration_name
assert result[0].resource_arn == launch_configuration_arn
assert result[0].region == AWS_REGION
@mock_autoscaling
def test_one_autoscaling_file_with_secrets(self):
@@ -160,6 +172,7 @@ class Test_autoscaling_find_secrets_ec2_launch_configuration:
"r",
)
secrets = f.read()
launch_configuration_name = "tester"
autoscaling_client = client("autoscaling", region_name=AWS_REGION)
autoscaling_client.create_launch_configuration(
LaunchConfigurationName="tester",
@@ -169,6 +182,9 @@ class Test_autoscaling_find_secrets_ec2_launch_configuration:
SecurityGroups=["default", "default2"],
UserData=secrets,
)
launch_configuration_arn = autoscaling_client.describe_launch_configurations(
LaunchConfigurationNames=[launch_configuration_name]
)["LaunchConfigurations"][0]["LaunchConfigurationARN"]
from prowler.providers.aws.services.autoscaling.autoscaling_service import (
AutoScaling,
@@ -194,21 +210,27 @@ class Test_autoscaling_find_secrets_ec2_launch_configuration:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Potential secret found in autoscaling tester User Data."
== f"Potential secret found in autoscaling {launch_configuration_name} User Data."
)
assert result[0].resource_id == "tester"
assert result[0].resource_id == launch_configuration_name
assert result[0].resource_arn == launch_configuration_arn
assert result[0].region == AWS_REGION
@mock_autoscaling
def test_one_launch_configurations_without_user_data(self):
# Include launch_configurations to check
launch_configuration_name = "tester"
autoscaling_client = client("autoscaling", region_name=AWS_REGION)
autoscaling_client.create_launch_configuration(
LaunchConfigurationName="tester",
LaunchConfigurationName=launch_configuration_name,
ImageId="ami-12c6146b",
InstanceType="t1.micro",
KeyName="the_keys",
SecurityGroups=["default", "default2"],
)
launch_configuration_arn = autoscaling_client.describe_launch_configurations(
LaunchConfigurationNames=[launch_configuration_name]
)["LaunchConfigurations"][0]["LaunchConfigurationARN"]
from prowler.providers.aws.services.autoscaling.autoscaling_service import (
AutoScaling,
@@ -234,6 +256,8 @@ class Test_autoscaling_find_secrets_ec2_launch_configuration:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "No secrets found in autoscaling tester since User Data is empty."
== f"No secrets found in autoscaling {launch_configuration_name} since User Data is empty."
)
assert result[0].resource_id == "tester"
assert result[0].resource_id == launch_configuration_name
assert result[0].resource_arn == launch_configuration_arn
assert result[0].region == AWS_REGION

View File

@@ -80,8 +80,9 @@ class Test_autoscaling_group_multiple_az:
KeyName="the_keys",
SecurityGroups=["default", "default2"],
)
autoscaling_group_name = "my-autoscaling-group"
autoscaling_client.create_auto_scaling_group(
AutoScalingGroupName="my-autoscaling-group",
AutoScalingGroupName=autoscaling_group_name,
LaunchConfigurationName="test",
MinSize=0,
MaxSize=0,
@@ -89,6 +90,10 @@ class Test_autoscaling_group_multiple_az:
AvailabilityZones=["us-east-1a", "us-east-1b"],
)
autoscaling_group_arn = autoscaling_client.describe_auto_scaling_groups(
AutoScalingGroupNames=[autoscaling_group_name]
)["AutoScalingGroups"][0]["AutoScalingGroupARN"]
from prowler.providers.aws.services.autoscaling.autoscaling_service import (
AutoScaling,
)
@@ -114,9 +119,11 @@ class Test_autoscaling_group_multiple_az:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Autoscaling group my-autoscaling-group has multiple availability zones."
== f"Autoscaling group {autoscaling_group_name} has multiple availability zones."
)
assert result[0].resource_id == "my-autoscaling-group"
assert result[0].resource_id == autoscaling_group_name
assert result[0].resource_arn == autoscaling_group_arn
assert result[0].region == AWS_REGION
assert result[0].resource_tags == []
@mock_autoscaling
@@ -129,8 +136,9 @@ class Test_autoscaling_group_multiple_az:
KeyName="the_keys",
SecurityGroups=["default", "default2"],
)
autoscaling_group_name = "my-autoscaling-group"
autoscaling_client.create_auto_scaling_group(
AutoScalingGroupName="my-autoscaling-group",
AutoScalingGroupName=autoscaling_group_name,
LaunchConfigurationName="test",
MinSize=0,
MaxSize=0,
@@ -138,6 +146,10 @@ class Test_autoscaling_group_multiple_az:
AvailabilityZones=["us-east-1a"],
)
autoscaling_group_arn = autoscaling_client.describe_auto_scaling_groups(
AutoScalingGroupNames=[autoscaling_group_name]
)["AutoScalingGroups"][0]["AutoScalingGroupARN"]
from prowler.providers.aws.services.autoscaling.autoscaling_service import (
AutoScaling,
)
@@ -163,10 +175,11 @@ class Test_autoscaling_group_multiple_az:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Autoscaling group my-autoscaling-group has only one availability zones."
== f"Autoscaling group {autoscaling_group_name} has only one availability zones."
)
assert result[0].resource_id == "my-autoscaling-group"
assert result[0].resource_id == autoscaling_group_name
assert result[0].resource_tags == []
assert result[0].resource_arn == autoscaling_group_arn
@mock_autoscaling
def test_groups_witd_and_without(self):
@@ -178,6 +191,7 @@ class Test_autoscaling_group_multiple_az:
KeyName="the_keys",
SecurityGroups=["default", "default2"],
)
autoscaling_group_name_1 = "asg-multiple"
autoscaling_client.create_auto_scaling_group(
AutoScalingGroupName="asg-multiple",
LaunchConfigurationName="test",
@@ -186,6 +200,11 @@ class Test_autoscaling_group_multiple_az:
DesiredCapacity=0,
AvailabilityZones=["us-east-1a", "us-east-1b"],
)
autoscaling_group_arn_1 = autoscaling_client.describe_auto_scaling_groups(
AutoScalingGroupNames=[autoscaling_group_name_1]
)["AutoScalingGroups"][0]["AutoScalingGroupARN"]
autoscaling_group_name_2 = "asg-single"
autoscaling_client.create_auto_scaling_group(
AutoScalingGroupName="asg-single",
LaunchConfigurationName="test",
@@ -194,6 +213,9 @@ class Test_autoscaling_group_multiple_az:
DesiredCapacity=0,
AvailabilityZones=["us-east-1a"],
)
autoscaling_group_arn_2 = autoscaling_client.describe_auto_scaling_groups(
AutoScalingGroupNames=[autoscaling_group_name_2]
)["AutoScalingGroups"][0]["AutoScalingGroupARN"]
from prowler.providers.aws.services.autoscaling.autoscaling_service import (
AutoScaling,
@@ -218,17 +240,21 @@ class Test_autoscaling_group_multiple_az:
assert len(result) == 2
for check in result:
if check.resource_id == "asg-multiple":
if check.resource_id == autoscaling_group_name_1:
assert check.status == "PASS"
assert (
check.status_extended
== "Autoscaling group asg-multiple has multiple availability zones."
== f"Autoscaling group {autoscaling_group_name_1} has multiple availability zones."
)
assert check.resource_arn == autoscaling_group_arn_1
assert check.resource_tags == []
if check.resource_id == "asg-single":
assert check.region == AWS_REGION
if check.resource_id == autoscaling_group_name_2:
assert check.status == "FAIL"
assert (
check.status_extended
== "Autoscaling group asg-single has only one availability zones."
== f"Autoscaling group {autoscaling_group_name_2} has only one availability zones."
)
assert check.resource_tags == []
assert check.resource_arn == autoscaling_group_arn_2
assert check.region == AWS_REGION

View File

@@ -147,8 +147,9 @@ class Test_awslambda_function_invoke_api_operations_cloudtrail_logging_enabled:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Lambda function {function_name} is not recorded by CloudTrail"
== f"Lambda function {function_name} is not recorded by CloudTrail."
)
assert result[0].resource_tags == []
@mock_cloudtrail
@mock_s3
@@ -222,8 +223,9 @@ class Test_awslambda_function_invoke_api_operations_cloudtrail_logging_enabled:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Lambda function {function_name} is recorded by CloudTrail trail {trail_name}"
== f"Lambda function {function_name} is recorded by CloudTrail trail {trail_name}."
)
assert result[0].resource_tags == []
@mock_cloudtrail
@mock_s3
@@ -300,8 +302,9 @@ class Test_awslambda_function_invoke_api_operations_cloudtrail_logging_enabled:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Lambda function {function_name} is recorded by CloudTrail trail {trail_name}"
== f"Lambda function {function_name} is recorded by CloudTrail trail {trail_name}."
)
assert result[0].resource_tags == []
@mock_cloudtrail
@mock_s3
@@ -373,5 +376,6 @@ class Test_awslambda_function_invoke_api_operations_cloudtrail_logging_enabled:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Lambda function {function_name} is recorded by CloudTrail trail {trail_name}"
== f"Lambda function {function_name} is recorded by CloudTrail trail {trail_name}."
)
assert result[0].resource_tags == []

View File

@@ -76,8 +76,9 @@ class Test_awslambda_function_no_secrets_in_code:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Potential secret found in Lambda function {function_name} code -> lambda_function.py: Secret Keyword on line 3"
== f"Potential secret found in Lambda function {function_name} code -> lambda_function.py: Secret Keyword on line 3."
)
assert result[0].resource_tags == []
def test_function_code_without_secrets(self):
lambda_client = mock.MagicMock
@@ -123,5 +124,6 @@ class Test_awslambda_function_no_secrets_in_code:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"No secrets found in Lambda function {function_name} code"
== f"No secrets found in Lambda function {function_name} code."
)
assert result[0].resource_tags == []

View File

@@ -62,8 +62,9 @@ class Test_awslambda_function_no_secrets_in_variables:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"No secrets found in Lambda function {function_name} variables"
== f"No secrets found in Lambda function {function_name} variables."
)
assert result[0].resource_tags == []
def test_function_secrets_in_variables(self):
lambda_client = mock.MagicMock
@@ -102,8 +103,9 @@ class Test_awslambda_function_no_secrets_in_variables:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Potential secret found in Lambda function {function_name} variables -> Secret Keyword in variable db_password"
== f"Potential secret found in Lambda function {function_name} variables -> Secret Keyword in variable db_password."
)
assert result[0].resource_tags == []
def test_function_no_secrets_in_variables(self):
lambda_client = mock.MagicMock
@@ -142,5 +144,6 @@ class Test_awslambda_function_no_secrets_in_variables:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"No secrets found in Lambda function {function_name} variables"
== f"No secrets found in Lambda function {function_name} variables."
)
assert result[0].resource_tags == []

View File

@@ -77,8 +77,9 @@ class Test_awslambda_function_not_publicly_accessible:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Lambda function {function_name} has a policy resource-based policy with public access"
== f"Lambda function {function_name} has a policy resource-based policy with public access."
)
assert result[0].resource_tags == []
def test_function_not_public(self):
lambda_client = mock.MagicMock
@@ -131,8 +132,9 @@ class Test_awslambda_function_not_publicly_accessible:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Lambda function {function_name} has a policy resource-based policy not public"
== f"Lambda function {function_name} has a policy resource-based policy not public."
)
assert result[0].resource_tags == []
def test_function_public_with_canonical(self):
lambda_client = mock.MagicMock
@@ -185,5 +187,6 @@ class Test_awslambda_function_not_publicly_accessible:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Lambda function {function_name} has a policy resource-based policy with public access"
== f"Lambda function {function_name} has a policy resource-based policy with public access."
)
assert result[0].resource_tags == []

View File

@@ -73,6 +73,7 @@ class Test_awslambda_function_url_cors_policy:
result[0].status_extended
== f"Lambda function {function_name} URL has a wide CORS configuration."
)
assert result[0].resource_tags == []
def test_function_cors_not_wide(self):
lambda_client = mock.MagicMock
@@ -116,6 +117,7 @@ class Test_awslambda_function_url_cors_policy:
result[0].status_extended
== f"Lambda function {function_name} does not have a wide CORS configuration."
)
assert result[0].resource_tags == []
def test_function_cors_wide_with_two_origins(self):
lambda_client = mock.MagicMock
@@ -161,3 +163,4 @@ class Test_awslambda_function_url_cors_policy:
result[0].status_extended
== f"Lambda function {function_name} URL has a wide CORS configuration."
)
assert result[0].resource_tags == []

View File

@@ -73,6 +73,7 @@ class Test_awslambda_function_url_public:
result[0].status_extended
== f"Lambda function {function_name} has a publicly accessible function URL."
)
assert result[0].resource_tags == []
def test_function_private_url(self):
lambda_client = mock.MagicMock
@@ -116,3 +117,4 @@ class Test_awslambda_function_url_public:
result[0].status_extended
== f"Lambda function {function_name} does not have a publicly accessible function URL."
)
assert result[0].resource_tags == []

View File

@@ -79,8 +79,9 @@ class Test_awslambda_function_using_supported_runtimes:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Lambda function {function_name} is using {function_runtime} which is obsolete"
== f"Lambda function {function_name} is using {function_runtime} which is obsolete."
)
assert result[0].resource_tags == []
def test_function_supported_runtime(self):
lambda_client = mock.MagicMock
@@ -135,8 +136,9 @@ class Test_awslambda_function_using_supported_runtimes:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Lambda function {function_name} is using {function_runtime} which is supported"
== f"Lambda function {function_name} is using {function_runtime} which is supported."
)
assert result[0].resource_tags == []
def test_function_no_runtime(self):
lambda_client = mock.MagicMock

View File

@@ -1,5 +1,6 @@
from datetime import datetime
from unittest import mock
from uuid import uuid4
from prowler.providers.aws.services.backup.backup_service import BackupPlan
@@ -28,7 +29,7 @@ class Test_backup_plans_exist:
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].status_extended == "No Backup Plan Exist"
assert result[0].status_extended == "No Backup Plan exist."
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
assert result[0].region == AWS_REGION
@@ -38,10 +39,14 @@ class Test_backup_plans_exist:
backup_client.audited_account = AWS_ACCOUNT_NUMBER
backup_client.audited_account_arn = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
backup_client.region = AWS_REGION
backup_plan_id = str(uuid4()).upper()
backup_plan_arn = (
f"arn:aws:backup:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:plan:{backup_plan_id}"
)
backup_client.backup_plans = [
BackupPlan(
arn="ARN",
id="MyBackupPlan",
arn=backup_plan_arn,
id=backup_plan_id,
region=AWS_REGION,
name="MyBackupPlan",
version_id="version_id",
@@ -65,8 +70,11 @@ class Test_backup_plans_exist:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "At least one backup plan exists: " + result[0].resource_id
== f"At least one backup plan exists: {result[0].resource_id}."
)
assert result[0].resource_id == "MyBackupPlan"
assert result[0].resource_arn == "ARN"
assert (
result[0].resource_arn
== f"arn:aws:backup:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:plan:{backup_plan_id}"
)
assert result[0].region == AWS_REGION

View File

@@ -1,5 +1,6 @@
from datetime import datetime
from unittest import mock
from uuid import uuid4
from prowler.providers.aws.services.backup.backup_service import (
BackupPlan,
@@ -34,10 +35,14 @@ class Test_backup_reportplans_exist:
backup_client.audited_account = AWS_ACCOUNT_NUMBER
backup_client.audited_account_arn = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
backup_client.region = AWS_REGION
backup_plan_id = str(uuid4()).upper()
backup_plan_arn = (
f"arn:aws:backup:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:plan:{backup_plan_id}"
)
backup_client.backup_plans = [
BackupPlan(
arn="ARN",
id="MyBackupPlan",
arn=backup_plan_arn,
id=backup_plan_arn,
region=AWS_REGION,
name="MyBackupPlan",
version_id="version_id",
@@ -60,7 +65,7 @@ class Test_backup_reportplans_exist:
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].status_extended == "No Backup Report Plan Exist"
assert result[0].status_extended == "No Backup Report Plan exist."
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
assert result[0].region == AWS_REGION
@@ -70,10 +75,14 @@ class Test_backup_reportplans_exist:
backup_client.audited_account = AWS_ACCOUNT_NUMBER
backup_client.audited_account_arn = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
backup_client.region = AWS_REGION
backup_plan_id = str(uuid4()).upper()
backup_plan_arn = (
f"arn:aws:backup:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:plan:{backup_plan_id}"
)
backup_client.backup_plans = [
BackupPlan(
arn="ARN",
id="MyBackupPlan",
arn=backup_plan_arn,
id=backup_plan_id,
region=AWS_REGION,
name="MyBackupPlan",
version_id="version_id",
@@ -81,9 +90,11 @@ class Test_backup_reportplans_exist:
advanced_settings=[],
)
]
backup_report_plan_id = str(uuid4()).upper()
backup_report_plan_arn = f"arn:aws:backup:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:report-plan:MyBackupReportPlan-{backup_report_plan_id}"
backup_client.backup_report_plans = [
BackupReportPlan(
arn="ARN",
arn=backup_report_plan_arn,
region=AWS_REGION,
name="MyBackupReportPlan",
last_attempted_execution_date=datetime(2015, 1, 1),
@@ -107,8 +118,8 @@ class Test_backup_reportplans_exist:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "At least one backup report plan exists: " + result[0].resource_id
== f"At least one backup report plan exists: {result[0].resource_id}."
)
assert result[0].resource_id == "MyBackupReportPlan"
assert result[0].resource_arn == "ARN"
assert result[0].resource_arn == backup_report_plan_arn
assert result[0].region == AWS_REGION

View File

@@ -3,6 +3,7 @@ from unittest import mock
from prowler.providers.aws.services.backup.backup_service import BackupVault
AWS_REGION = "eu-west-1"
AWS_ACCOUNT_NUMBER = "0123456789012"
class Test_backup_vaults_encrypted:
@@ -25,9 +26,10 @@ class Test_backup_vaults_encrypted:
def test_one_backup_vault_unencrypted(self):
backup_client = mock.MagicMock
backup_vault_arn = f"arn:aws:backup:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:backup-vault:MyBackupVault"
backup_client.backup_vaults = [
BackupVault(
arn="ARN",
arn=backup_vault_arn,
name="MyBackupVault",
region=AWS_REGION,
encryption="",
@@ -54,17 +56,18 @@ class Test_backup_vaults_encrypted:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Backup Vault " + result[0].resource_id + " is not encrypted"
== f"Backup Vault {result[0].resource_id} is not encrypted."
)
assert result[0].resource_id == "MyBackupVault"
assert result[0].resource_arn == "ARN"
assert result[0].resource_arn == backup_vault_arn
assert result[0].region == AWS_REGION
def test_one_backup_vault_encrypted(self):
backup_client = mock.MagicMock
backup_vault_arn = f"arn:aws:backup:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:backup-vault:MyBackupVault"
backup_client.backup_vaults = [
BackupVault(
arn="ARN",
arn=backup_vault_arn,
name="MyBackupVault",
region=AWS_REGION,
encryption="test",
@@ -91,8 +94,8 @@ class Test_backup_vaults_encrypted:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Backup Vault " + result[0].resource_id + " is encrypted"
== f"Backup Vault {result[0].resource_id} is encrypted."
)
assert result[0].resource_id == "MyBackupVault"
assert result[0].resource_arn == "ARN"
assert result[0].resource_arn == backup_vault_arn
assert result[0].region == AWS_REGION

View File

@@ -27,7 +27,7 @@ class Test_backup_vaults_exist:
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].status_extended == "No Backup Vault Exist"
assert result[0].status_extended == "No Backup Vault exist."
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
assert result[0].region == AWS_REGION
@@ -37,9 +37,10 @@ class Test_backup_vaults_exist:
backup_client.audited_account = AWS_ACCOUNT_NUMBER
backup_client.audited_account_arn = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
backup_client.region = AWS_REGION
backup_vault_arn = f"arn:aws:backup:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:backup-vault:MyBackupVault"
backup_client.backup_vaults = [
BackupVault(
arn="ARN",
arn=backup_vault_arn,
name="MyBackupVault",
region=AWS_REGION,
encryption="",
@@ -66,8 +67,8 @@ class Test_backup_vaults_exist:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "At least one backup vault exists: " + result[0].resource_id
== f"At least one backup vault exists: {result[0].resource_id}."
)
assert result[0].resource_id == "MyBackupVault"
assert result[0].resource_arn == "ARN"
assert result[0].resource_arn == backup_vault_arn
assert result[0].region == AWS_REGION

View File

@@ -59,6 +59,7 @@ class Test_cloudformation_stack_outputs_find_secrets:
== "arn:aws:cloudformation:eu-west-1:123456789012:stack/Test-Stack/796c8d26-b390-41d7-a23c-0702c4e78b60"
)
assert result[0].region == AWS_REGION
assert result[0].resource_tags == []
def test_stack_no_secret_in_outputs(self):
cloudformation_client = mock.MagicMock
@@ -95,6 +96,7 @@ class Test_cloudformation_stack_outputs_find_secrets:
== "arn:aws:cloudformation:eu-west-1:123456789012:stack/Test-Stack/796c8d26-b390-41d7-a23c-0702c4e78b60"
)
assert result[0].region == AWS_REGION
assert result[0].resource_tags == []
def test_stack_no_outputs(self):
cloudformation_client = mock.MagicMock
@@ -131,3 +133,4 @@ class Test_cloudformation_stack_outputs_find_secrets:
== "arn:aws:cloudformation:eu-west-1:123456789012:stack/Test-Stack/796c8d26-b390-41d7-a23c-0702c4e78b60"
)
assert result[0].region == AWS_REGION
assert result[0].resource_tags == []

View File

@@ -60,6 +60,7 @@ class Test_cloudformation_stacks_termination_protection_enabled:
== "arn:aws:cloudformation:eu-west-1:123456789012:stack/Test-Stack/796c8d26-b390-41d7-a23c-0702c4e78b60"
)
assert result[0].region == AWS_REGION
assert result[0].resource_tags == []
def test_stack_termination_protection_disabled(self):
cloudformation_client = mock.MagicMock
@@ -97,3 +98,4 @@ class Test_cloudformation_stacks_termination_protection_enabled:
== "arn:aws:cloudformation:eu-west-1:123456789012:stack/Test-Stack/796c8d26-b390-41d7-a23c-0702c4e78b60"
)
assert result[0].region == AWS_REGION
assert result[0].resource_tags == []

View File

@@ -68,8 +68,9 @@ class Test_cloudfront_distributions_field_level_encryption_enabled:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"CloudFront Distribution {DISTRIBUTION_ID} has Field Level Encryption enabled"
== f"CloudFront Distribution {DISTRIBUTION_ID} has Field Level Encryption enabled."
)
assert result[0].resource_tags == []
def test_one_distribution_field_level_encryption_disabled(self):
cloudfront_client = mock.MagicMock
@@ -106,5 +107,6 @@ class Test_cloudfront_distributions_field_level_encryption_enabled:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"CloudFront Distribution {DISTRIBUTION_ID} has Field Level Encryption disabled"
== f"CloudFront Distribution {DISTRIBUTION_ID} has Field Level Encryption disabled."
)
assert result[0].resource_tags == []

View File

@@ -63,8 +63,9 @@ class Test_cloudfront_distributions_geo_restrictions_enabled:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"CloudFront Distribution {DISTRIBUTION_ID} has Geo restrictions disabled"
== f"CloudFront Distribution {DISTRIBUTION_ID} has Geo restrictions disabled."
)
assert result[0].resource_tags == []
def test_one_distribution_geo_restriction_enabled_whitelist(self):
cloudfront_client = mock.MagicMock
@@ -97,8 +98,9 @@ class Test_cloudfront_distributions_geo_restrictions_enabled:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"CloudFront Distribution {DISTRIBUTION_ID} has Geo restrictions enabled"
== f"CloudFront Distribution {DISTRIBUTION_ID} has Geo restrictions enabled."
)
assert result[0].resource_tags == []
def test_one_distribution_geo_restriction_enabled_blacklist(self):
cloudfront_client = mock.MagicMock
@@ -131,5 +133,6 @@ class Test_cloudfront_distributions_geo_restrictions_enabled:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"CloudFront Distribution {DISTRIBUTION_ID} has Geo restrictions enabled"
== f"CloudFront Distribution {DISTRIBUTION_ID} has Geo restrictions enabled."
)
assert result[0].resource_tags == []

View File

@@ -68,8 +68,9 @@ class Test_cloudfront_distributions_https_enabled:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"CloudFront Distribution {DISTRIBUTION_ID} viewers can use HTTP or HTTPS"
== f"CloudFront Distribution {DISTRIBUTION_ID} viewers can use HTTP or HTTPS."
)
assert result[0].resource_tags == []
def test_one_distribution_https_redirect(self):
cloudfront_client = mock.MagicMock
@@ -106,8 +107,9 @@ class Test_cloudfront_distributions_https_enabled:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"CloudFront Distribution {DISTRIBUTION_ID} has redirect to HTTPS"
== f"CloudFront Distribution {DISTRIBUTION_ID} has redirect to HTTPS."
)
assert result[0].resource_tags == []
def test_one_distribution_https_only(self):
cloudfront_client = mock.MagicMock
@@ -144,5 +146,6 @@ class Test_cloudfront_distributions_https_enabled:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"CloudFront Distribution {DISTRIBUTION_ID} has HTTPS only"
== f"CloudFront Distribution {DISTRIBUTION_ID} has HTTPS only."
)
assert result[0].resource_tags == []

View File

@@ -64,7 +64,7 @@ class Test_cloudfront_distributions_logging_enabled:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"CloudFront Distribution {DISTRIBUTION_ID} has logging enabled"
== f"CloudFront Distribution {DISTRIBUTION_ID} has logging enabled."
)
def test_one_distribution_logging_disabled_realtime_disabled(self):
@@ -103,8 +103,9 @@ class Test_cloudfront_distributions_logging_enabled:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"CloudFront Distribution {DISTRIBUTION_ID} has logging disabled"
== f"CloudFront Distribution {DISTRIBUTION_ID} has logging disabled."
)
assert result[0].resource_tags == []
def test_one_distribution_logging_disabled_realtime_enabled(self):
cloudfront_client = mock.MagicMock
@@ -142,8 +143,9 @@ class Test_cloudfront_distributions_logging_enabled:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"CloudFront Distribution {DISTRIBUTION_ID} has logging enabled"
== f"CloudFront Distribution {DISTRIBUTION_ID} has logging enabled."
)
assert result[0].resource_tags == []
def test_one_distribution_logging_enabled_realtime_enabled(self):
cloudfront_client = mock.MagicMock
@@ -181,5 +183,6 @@ class Test_cloudfront_distributions_logging_enabled:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"CloudFront Distribution {DISTRIBUTION_ID} has logging enabled"
== f"CloudFront Distribution {DISTRIBUTION_ID} has logging enabled."
)
assert result[0].resource_tags == []

View File

@@ -95,8 +95,9 @@ class Test_cloudfront_distributions_using_deprecated_ssl_protocols:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"CloudFront Distribution {DISTRIBUTION_ID} is using a deprecated SSL protocol"
== f"CloudFront Distribution {DISTRIBUTION_ID} is using a deprecated SSL protocol."
)
assert result[0].resource_tags == []
def test_one_distribution_using_SSL_and_TLS(self):
cloudfront_client = mock.MagicMock
@@ -165,8 +166,9 @@ class Test_cloudfront_distributions_using_deprecated_ssl_protocols:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"CloudFront Distribution {DISTRIBUTION_ID} is using a deprecated SSL protocol"
== f"CloudFront Distribution {DISTRIBUTION_ID} is using a deprecated SSL protocol."
)
assert result[0].resource_tags == []
def test_one_distribution_using_SSL_and_bad_TLS(self):
cloudfront_client = mock.MagicMock
@@ -235,8 +237,9 @@ class Test_cloudfront_distributions_using_deprecated_ssl_protocols:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"CloudFront Distribution {DISTRIBUTION_ID} is using a deprecated SSL protocol"
== f"CloudFront Distribution {DISTRIBUTION_ID} is using a deprecated SSL protocol."
)
assert result[0].resource_tags == []
def test_one_distribution_not_using_deprecated_ssl_protocols(self):
cloudfront_client = mock.MagicMock
@@ -302,5 +305,6 @@ class Test_cloudfront_distributions_using_deprecated_ssl_protocols:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"CloudFront Distribution {DISTRIBUTION_ID} is not using a deprecated SSL protocol"
== f"CloudFront Distribution {DISTRIBUTION_ID} is not using a deprecated SSL protocol."
)
assert result[0].resource_tags == []

View File

@@ -61,8 +61,9 @@ class Test_cloudfront_distributions_using_waf:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"CloudFront Distribution {DISTRIBUTION_ID} is using AWS WAF web ACL {wef_acl_id}"
== f"CloudFront Distribution {DISTRIBUTION_ID} is using AWS WAF web ACL {wef_acl_id}."
)
assert result[0].resource_tags == []
def test_one_distribution_no_waf(self):
cloudfront_client = mock.MagicMock
@@ -94,5 +95,6 @@ class Test_cloudfront_distributions_using_waf:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"CloudFront Distribution {DISTRIBUTION_ID} is not using AWS WAF web ACL"
== f"CloudFront Distribution {DISTRIBUTION_ID} is not using AWS WAF web ACL."
)
assert result[0].resource_tags == []

View File

@@ -109,6 +109,7 @@ class Test_cloudtrail_bucket_requires_mfa_delete:
assert result[0].resource_id == trail_name_us
assert result[0].region == "us-east-1"
assert result[0].resource_arn == trail_us["TrailARN"]
assert result[0].resource_tags == []
# Create an MFA device is not supported for moto, so we mock the call:
def mock_make_api_call_getbucketversioning_mfadelete_enabled(
@@ -169,6 +170,7 @@ class Test_cloudtrail_bucket_requires_mfa_delete:
assert result[0].resource_id == trail_name_us
assert result[0].region == "us-east-1"
assert result[0].resource_arn == trail_us["TrailARN"]
assert result[0].resource_tags == []
@mock_cloudtrail
@mock_s3
@@ -215,6 +217,7 @@ class Test_cloudtrail_bucket_requires_mfa_delete:
assert result[0].resource_id == trail_name_us
assert result[0].region == "us-east-1"
assert result[0].resource_arn == trail_us["TrailARN"]
assert result[0].resource_tags == []
@mock_cloudtrail
@mock_s3
@@ -267,3 +270,4 @@ class Test_cloudtrail_bucket_requires_mfa_delete:
assert result[0].resource_id == trail_name_us
assert result[0].region == "us-east-1"
assert result[0].resource_arn == trail_us["TrailARN"]
assert result[0].resource_tags == []

View File

@@ -42,6 +42,32 @@ class Test_cloudtrail_cloudwatch_logging_enabled:
)
return audit_info
@mock_cloudtrail
@mock_s3
def test_no_trails(self):
current_audit_info = self.set_mocked_audit_info()
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
Cloudtrail,
)
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
):
with mock.patch(
"prowler.providers.aws.services.cloudtrail.cloudtrail_cloudwatch_logging_enabled.cloudtrail_cloudwatch_logging_enabled.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudtrail.cloudtrail_cloudwatch_logging_enabled.cloudtrail_cloudwatch_logging_enabled import (
cloudtrail_cloudwatch_logging_enabled,
)
check = cloudtrail_cloudwatch_logging_enabled()
result = check.execute()
assert len(result) == 0
@mock_cloudtrail
@mock_s3
def test_trails_sending_logs_during_and_not_last_day(self):
@@ -107,16 +133,20 @@ class Test_cloudtrail_cloudwatch_logging_enabled:
assert report.status == "PASS"
assert search(
report.status_extended,
f"Single region trail {trail_name_us} has been logging the last 24h",
f"Single region trail {trail_name_us} has been logging the last 24h.",
)
assert report.resource_tags == []
assert report.region == "us-east-1"
if report.resource_id == trail_name_eu:
assert report.resource_id == trail_name_eu
assert report.resource_arn == trail_eu["TrailARN"]
assert report.status == "FAIL"
assert search(
report.status_extended,
f"Single region trail {trail_name_eu} is not logging in the last 24h",
f"Single region trail {trail_name_eu} is not logging in the last 24h.",
)
assert report.resource_tags == []
assert report.region == "eu-west-1"
@mock_cloudtrail
@mock_s3
@@ -183,8 +213,9 @@ class Test_cloudtrail_cloudwatch_logging_enabled:
assert report.status == "PASS"
assert search(
report.status_extended,
f"Multiregion trail {trail_name_us} has been logging the last 24h",
f"Multiregion trail {trail_name_us} has been logging the last 24h.",
)
assert report.resource_tags == []
if (
report.resource_id == trail_name_eu
and report.region == "eu-west-1"
@@ -194,8 +225,9 @@ class Test_cloudtrail_cloudwatch_logging_enabled:
assert report.status == "FAIL"
assert search(
report.status_extended,
f"Single region trail {trail_name_eu} is not logging in the last 24h",
f"Single region trail {trail_name_eu} is not logging in the last 24h.",
)
assert report.resource_tags == []
@mock_cloudtrail
@mock_s3
@@ -260,13 +292,15 @@ class Test_cloudtrail_cloudwatch_logging_enabled:
assert report.status == "PASS"
assert (
report.status_extended
== f"Single region trail {trail_name_us} has been logging the last 24h"
== f"Single region trail {trail_name_us} has been logging the last 24h."
)
assert report.resource_tags == []
if report.resource_id == trail_name_eu:
assert report.resource_id == trail_name_eu
assert report.resource_arn == trail_eu["TrailARN"]
assert report.status == "FAIL"
assert (
report.status_extended
== f"Single region trail {trail_name_eu} is not logging in the last 24h or not configured to deliver logs"
== f"Single region trail {trail_name_eu} is not logging in the last 24h or not configured to deliver logs."
)
assert report.resource_tags == []

View File

@@ -102,6 +102,7 @@ class Test_cloudtrail_insights_exist:
assert result[0].resource_id == trail_name_us
assert result[0].region == "us-east-1"
assert result[0].resource_arn == trail_us["TrailARN"]
assert result[0].resource_tags == []
@mock_cloudtrail
@mock_s3
@@ -147,3 +148,4 @@ class Test_cloudtrail_insights_exist:
assert result[0].resource_id == trail_name_us
assert result[0].region == "us-east-1"
assert result[0].resource_arn == trail_us["TrailARN"]
assert result[0].resource_tags == []

View File

@@ -41,6 +41,30 @@ class Test_cloudtrail_kms_encryption_enabled:
)
return audit_info
@mock_cloudtrail
@mock_s3
def test_no_trails(self):
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
Cloudtrail,
)
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=self.set_mocked_audit_info(),
), mock.patch(
"prowler.providers.aws.services.cloudtrail.cloudtrail_kms_encryption_enabled.cloudtrail_kms_encryption_enabled.cloudtrail_client",
new=Cloudtrail(self.set_mocked_audit_info()),
):
# Test Check
from prowler.providers.aws.services.cloudtrail.cloudtrail_kms_encryption_enabled.cloudtrail_kms_encryption_enabled import (
cloudtrail_kms_encryption_enabled,
)
check = cloudtrail_kms_encryption_enabled()
result = check.execute()
assert len(result) == 0
@mock_cloudtrail
@mock_s3
def test_trail_no_kms(self):
@@ -80,6 +104,8 @@ class Test_cloudtrail_kms_encryption_enabled:
)
assert result[0].resource_id == trail_name_us
assert result[0].resource_arn == trail_us["TrailARN"]
assert result[0].resource_tags == []
assert result[0].region == "us-east-1"
@mock_cloudtrail
@mock_s3
@@ -126,3 +152,5 @@ class Test_cloudtrail_kms_encryption_enabled:
)
assert result[0].resource_id == trail_name_us
assert result[0].resource_arn == trail_us["TrailARN"]
assert result[0].resource_tags == []
assert result[0].region == "us-east-1"

View File

@@ -41,6 +41,30 @@ class Test_cloudtrail_log_file_validation_enabled:
)
return audit_info
@mock_cloudtrail
@mock_s3
def test_no_trails(self):
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
Cloudtrail,
)
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=self.set_mocked_audit_info(),
), mock.patch(
"prowler.providers.aws.services.cloudtrail.cloudtrail_log_file_validation_enabled.cloudtrail_log_file_validation_enabled.cloudtrail_client",
new=Cloudtrail(self.set_mocked_audit_info()),
):
# Test Check
from prowler.providers.aws.services.cloudtrail.cloudtrail_log_file_validation_enabled.cloudtrail_log_file_validation_enabled import (
cloudtrail_log_file_validation_enabled,
)
check = cloudtrail_log_file_validation_enabled()
result = check.execute()
assert len(result) == 0
@mock_cloudtrail
@mock_s3
def test_no_logging_validation(self):
@@ -76,6 +100,8 @@ class Test_cloudtrail_log_file_validation_enabled:
assert search("log file validation disabled", result[0].status_extended)
assert result[0].resource_id == trail_name_us
assert result[0].resource_arn == trail_us["TrailARN"]
assert result[0].resource_tags == []
assert result[0].region == "us-east-1"
@mock_cloudtrail
@mock_s3
@@ -132,6 +158,8 @@ class Test_cloudtrail_log_file_validation_enabled:
assert search("log file validation enabled", report.status_extended)
assert report.resource_id == trail_name_us
assert report.resource_arn == trail_us["TrailARN"]
assert report.resource_tags == []
assert report.region == "us-east-1"
elif report.resource_id == trail_name_eu:
assert report.status == "FAIL"
assert search(
@@ -139,3 +167,5 @@ class Test_cloudtrail_log_file_validation_enabled:
)
assert report.resource_id == trail_name_eu
assert report.resource_arn == trail_eu["TrailARN"]
assert report.resource_tags == []
assert report.region == "eu-west-1"

View File

@@ -41,6 +41,34 @@ class Test_cloudtrail_logs_s3_bucket_access_logging_enabled:
)
return audit_info
@mock_cloudtrail
@mock_s3
def test_no_trails(self):
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
Cloudtrail,
)
from prowler.providers.aws.services.s3.s3_service import S3
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=self.set_mocked_audit_info(),
), mock.patch(
"prowler.providers.aws.services.cloudtrail.cloudtrail_logs_s3_bucket_access_logging_enabled.cloudtrail_logs_s3_bucket_access_logging_enabled.cloudtrail_client",
new=Cloudtrail(self.set_mocked_audit_info()),
), mock.patch(
"prowler.providers.aws.services.cloudtrail.cloudtrail_logs_s3_bucket_access_logging_enabled.cloudtrail_logs_s3_bucket_access_logging_enabled.s3_client",
new=S3(self.set_mocked_audit_info()),
):
# Test Check
from prowler.providers.aws.services.cloudtrail.cloudtrail_logs_s3_bucket_access_logging_enabled.cloudtrail_logs_s3_bucket_access_logging_enabled import (
cloudtrail_logs_s3_bucket_access_logging_enabled,
)
check = cloudtrail_logs_s3_bucket_access_logging_enabled()
result = check.execute()
assert len(result) == 0
@mock_cloudtrail
@mock_s3
def test_bucket_not_logging(self):
@@ -84,6 +112,8 @@ class Test_cloudtrail_logs_s3_bucket_access_logging_enabled:
)
assert result[0].resource_id == trail_name_us
assert result[0].resource_arn == trail_us["TrailARN"]
assert result[0].resource_tags == []
assert result[0].region == "us-east-1"
@mock_cloudtrail
@mock_s3
@@ -148,6 +178,8 @@ class Test_cloudtrail_logs_s3_bucket_access_logging_enabled:
)
assert result[0].resource_id == trail_name_us
assert result[0].resource_arn == trail_us["TrailARN"]
assert result[0].resource_tags == []
assert result[0].region == "us-east-1"
@mock_cloudtrail
@mock_s3
@@ -195,3 +227,5 @@ class Test_cloudtrail_logs_s3_bucket_access_logging_enabled:
)
assert result[0].resource_id == trail_name_us
assert result[0].resource_arn == trail_us["TrailARN"]
assert result[0].resource_tags == []
assert result[0].region == "us-east-1"

View File

@@ -41,6 +41,34 @@ class Test_cloudtrail_logs_s3_bucket_is_not_publicly_accessible:
)
return audit_info
@mock_cloudtrail
@mock_s3
def test_not_trails(self):
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
Cloudtrail,
)
from prowler.providers.aws.services.s3.s3_service import S3
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=self.set_mocked_audit_info(),
), mock.patch(
"prowler.providers.aws.services.cloudtrail.cloudtrail_logs_s3_bucket_is_not_publicly_accessible.cloudtrail_logs_s3_bucket_is_not_publicly_accessible.cloudtrail_client",
new=Cloudtrail(self.set_mocked_audit_info()),
), mock.patch(
"prowler.providers.aws.services.cloudtrail.cloudtrail_logs_s3_bucket_is_not_publicly_accessible.cloudtrail_logs_s3_bucket_is_not_publicly_accessible.s3_client",
new=S3(self.set_mocked_audit_info()),
):
# Test Check
from prowler.providers.aws.services.cloudtrail.cloudtrail_logs_s3_bucket_is_not_publicly_accessible.cloudtrail_logs_s3_bucket_is_not_publicly_accessible import (
cloudtrail_logs_s3_bucket_is_not_publicly_accessible,
)
check = cloudtrail_logs_s3_bucket_is_not_publicly_accessible()
result = check.execute()
assert len(result) == 0
@mock_cloudtrail
@mock_s3
def test_trail_bucket_no_acl(self):
@@ -83,8 +111,10 @@ class Test_cloudtrail_logs_s3_bucket_is_not_publicly_accessible:
assert result[0].resource_arn == trail_us["TrailARN"]
assert search(
result[0].status_extended,
f"S3 Bucket {bucket_name_us} from single region trail {trail_name_us} is not publicly accessible",
f"S3 Bucket {bucket_name_us} from single region trail {trail_name_us} is not publicly accessible.",
)
assert result[0].resource_tags == []
assert result[0].region == "us-east-1"
@mock_cloudtrail
@mock_s3
@@ -146,8 +176,10 @@ class Test_cloudtrail_logs_s3_bucket_is_not_publicly_accessible:
assert result[0].resource_arn == trail_us["TrailARN"]
assert search(
result[0].status_extended,
f"S3 Bucket {bucket_name_us} from single region trail {trail_name_us} is publicly accessible",
f"S3 Bucket {bucket_name_us} from single region trail {trail_name_us} is publicly accessible.",
)
assert result[0].resource_tags == []
assert result[0].region == "us-east-1"
@mock_cloudtrail
@mock_s3
@@ -208,8 +240,10 @@ class Test_cloudtrail_logs_s3_bucket_is_not_publicly_accessible:
assert result[0].resource_arn == trail_us["TrailARN"]
assert search(
result[0].status_extended,
f"S3 Bucket {bucket_name_us} from single region trail {trail_name_us} is not publicly accessible",
f"S3 Bucket {bucket_name_us} from single region trail {trail_name_us} is not publicly accessible.",
)
assert result[0].resource_tags == []
assert result[0].region == "us-east-1"
@mock_cloudtrail
@mock_s3
@@ -254,6 +288,8 @@ class Test_cloudtrail_logs_s3_bucket_is_not_publicly_accessible:
assert result[0].resource_id == trail_name_us
assert result[0].resource_arn == trail_us["TrailARN"]
assert search(
"is a cross-account bucket in another account out of Prowler's permissions scope",
"is a cross-account bucket in another account out of Prowler's permissions scope.",
result[0].status_extended,
)
assert result[0].resource_tags == []
assert result[0].region == "us-east-1"

View File

@@ -9,6 +9,8 @@ from prowler.providers.aws.services.cloudtrail.cloudtrail_service import Trail
from prowler.providers.common.models import Audit_Metadata
AWS_ACCOUNT_NUMBER = "123456789012"
AWS_REGION_US_EAST_1 = "us-east-1"
AWS_REGION_EU_WEST_1 = "eu-west-1"
class Test_cloudtrail_multi_region_enabled:
@@ -29,7 +31,7 @@ class Test_cloudtrail_multi_region_enabled:
profile_region=None,
credentials=None,
assumed_role_info=None,
audited_regions=["us-east-1", "eu-west-1"],
audited_regions=[AWS_REGION_US_EAST_1, AWS_REGION_EU_WEST_1],
organizations_metadata=None,
audit_resources=None,
mfa_enabled=False,
@@ -67,23 +69,42 @@ class Test_cloudtrail_multi_region_enabled:
result = check.execute()
assert len(result) == len(current_audit_info.audited_regions)
for report in result:
assert report.status == "FAIL"
assert search(
"No CloudTrail trails enabled and logging were found",
report.status_extended,
)
assert report.resource_id == AWS_ACCOUNT_NUMBER
assert (
report.resource_arn == f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
)
if report.region == AWS_REGION_US_EAST_1:
assert report.status == "FAIL"
assert (
report.status_extended
== "No CloudTrail trails enabled and logging were found."
)
assert report.resource_id == AWS_ACCOUNT_NUMBER
assert (
report.resource_arn
== f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
)
assert report.resource_tags == []
elif report.region == AWS_REGION_EU_WEST_1:
assert report.status == "FAIL"
assert (
report.status_extended
== "No CloudTrail trails enabled and logging were found."
)
assert report.resource_id == AWS_ACCOUNT_NUMBER
assert (
report.resource_arn
== f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
)
assert report.resource_tags == []
@mock_cloudtrail
@mock_s3
def test_various_trails_no_login(self):
cloudtrail_client_us_east_1 = client("cloudtrail", region_name="us-east-1")
s3_client_us_east_1 = client("s3", region_name="us-east-1")
cloudtrail_client_eu_west_1 = client("cloudtrail", region_name="eu-west-1")
s3_client_eu_west_1 = client("s3", region_name="eu-west-1")
cloudtrail_client_us_east_1 = client(
"cloudtrail", region_name=AWS_REGION_US_EAST_1
)
s3_client_us_east_1 = client("s3", region_name=AWS_REGION_US_EAST_1)
cloudtrail_client_eu_west_1 = client(
"cloudtrail", region_name=AWS_REGION_EU_WEST_1
)
s3_client_eu_west_1 = client("s3", region_name=AWS_REGION_EU_WEST_1)
trail_name_us = "trail_test_us"
bucket_name_us = "bucket_test_us"
trail_name_eu = "trail_test_eu"
@@ -91,7 +112,7 @@ class Test_cloudtrail_multi_region_enabled:
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
s3_client_eu_west_1.create_bucket(
Bucket=bucket_name_eu,
CreateBucketConfiguration={"LocationConstraint": "eu-west-1"},
CreateBucketConfiguration={"LocationConstraint": AWS_REGION_EU_WEST_1},
)
_ = cloudtrail_client_us_east_1.create_trail(
Name=trail_name_us, S3BucketName=bucket_name_us, IsMultiRegionTrail=False
@@ -123,23 +144,42 @@ class Test_cloudtrail_multi_region_enabled:
result = check.execute()
assert len(result) == len(current_audit_info.audited_regions)
for report in result:
assert report.status == "FAIL"
assert search(
"No CloudTrail trails enabled and logging were found",
report.status_extended,
)
assert report.resource_id == AWS_ACCOUNT_NUMBER
assert (
report.resource_arn == f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
)
if report.region == AWS_REGION_US_EAST_1:
assert report.status == "FAIL"
assert (
report.status_extended
== "No CloudTrail trails enabled and logging were found."
)
assert report.resource_id == AWS_ACCOUNT_NUMBER
assert (
report.resource_arn
== f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
)
assert report.resource_tags == []
elif report.region == AWS_REGION_EU_WEST_1:
assert report.status == "FAIL"
assert (
report.status_extended
== "No CloudTrail trails enabled and logging were found."
)
assert report.resource_id == AWS_ACCOUNT_NUMBER
assert (
report.resource_arn
== f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
)
assert report.resource_tags == []
@mock_cloudtrail
@mock_s3
def test_various_trails_with_and_without_login(self):
cloudtrail_client_us_east_1 = client("cloudtrail", region_name="us-east-1")
s3_client_us_east_1 = client("s3", region_name="us-east-1")
cloudtrail_client_eu_west_1 = client("cloudtrail", region_name="eu-west-1")
s3_client_eu_west_1 = client("s3", region_name="eu-west-1")
cloudtrail_client_us_east_1 = client(
"cloudtrail", region_name=AWS_REGION_US_EAST_1
)
s3_client_us_east_1 = client("s3", region_name=AWS_REGION_US_EAST_1)
cloudtrail_client_eu_west_1 = client(
"cloudtrail", region_name=AWS_REGION_EU_WEST_1
)
s3_client_eu_west_1 = client("s3", region_name=AWS_REGION_EU_WEST_1)
trail_name_us = "trail_test_us"
bucket_name_us = "bucket_test_us"
trail_name_eu = "trail_test_eu"
@@ -147,7 +187,7 @@ class Test_cloudtrail_multi_region_enabled:
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
s3_client_eu_west_1.create_bucket(
Bucket=bucket_name_eu,
CreateBucketConfiguration={"LocationConstraint": "eu-west-1"},
CreateBucketConfiguration={"LocationConstraint": AWS_REGION_EU_WEST_1},
)
trail_us = cloudtrail_client_us_east_1.create_trail(
Name=trail_name_us, S3BucketName=bucket_name_us, IsMultiRegionTrail=False
@@ -189,10 +229,12 @@ class Test_cloudtrail_multi_region_enabled:
)
assert report.resource_id == trail_name_us
assert report.resource_arn == trail_us["TrailARN"]
assert report.resource_tags == []
assert report.region == AWS_REGION_US_EAST_1
else:
assert report.status == "FAIL"
assert search(
"No CloudTrail trails enabled and logging were found",
"No CloudTrail trails enabled and logging were found.",
report.status_extended,
)
assert report.resource_id == AWS_ACCOUNT_NUMBER
@@ -200,14 +242,20 @@ class Test_cloudtrail_multi_region_enabled:
report.resource_arn
== f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
)
assert report.resource_tags == []
assert report.region == AWS_REGION_EU_WEST_1
@mock_cloudtrail
@mock_s3
def test_trail_multiregion_logging_and_single_region_not_login(self):
cloudtrail_client_us_east_1 = client("cloudtrail", region_name="us-east-1")
s3_client_us_east_1 = client("s3", region_name="us-east-1")
cloudtrail_client_eu_west_1 = client("cloudtrail", region_name="eu-west-1")
s3_client_eu_west_1 = client("s3", region_name="eu-west-1")
cloudtrail_client_us_east_1 = client(
"cloudtrail", region_name=AWS_REGION_US_EAST_1
)
s3_client_us_east_1 = client("s3", region_name=AWS_REGION_US_EAST_1)
cloudtrail_client_eu_west_1 = client(
"cloudtrail", region_name=AWS_REGION_EU_WEST_1
)
s3_client_eu_west_1 = client("s3", region_name=AWS_REGION_EU_WEST_1)
trail_name_us = "trail_test_us"
bucket_name_us = "bucket_test_us"
trail_name_eu = "aaaaa"
@@ -215,7 +263,7 @@ class Test_cloudtrail_multi_region_enabled:
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
s3_client_eu_west_1.create_bucket(
Bucket=bucket_name_eu,
CreateBucketConfiguration={"LocationConstraint": "eu-west-1"},
CreateBucketConfiguration={"LocationConstraint": AWS_REGION_EU_WEST_1},
)
trail_us = cloudtrail_client_us_east_1.create_trail(
Name=trail_name_us, S3BucketName=bucket_name_us, IsMultiRegionTrail=True
@@ -251,25 +299,25 @@ class Test_cloudtrail_multi_region_enabled:
Trail(
name=trail_name_us,
is_multiregion=True,
home_region="us-east-1",
home_region=AWS_REGION_US_EAST_1,
arn=trail_us["TrailARN"],
region="us-east-1",
region=AWS_REGION_US_EAST_1,
is_logging=True,
),
Trail(
name=trail_name_eu,
is_multiregion=False,
home_region="eu-west-1",
home_region=AWS_REGION_EU_WEST_1,
arn="",
region="eu-west-1",
region=AWS_REGION_EU_WEST_1,
is_logging=False,
),
Trail(
name=trail_name_us,
is_multiregion=True,
home_region="us-east-1",
home_region=AWS_REGION_US_EAST_1,
arn=trail_us["TrailARN"],
region="eu-west-1",
region=AWS_REGION_EU_WEST_1,
is_logging=True,
),
]
@@ -279,19 +327,21 @@ class Test_cloudtrail_multi_region_enabled:
result = check.execute()
assert len(result) == len(current_audit_info.audited_regions)
for report in result:
if report.region == "us-east-1":
if report.region == AWS_REGION_US_EAST_1:
assert report.status == "PASS"
assert search(
f"Trail {trail_name_us} is multiregion and it is logging",
f"Trail {trail_name_us} is multiregion and it is logging.",
report.status_extended,
)
assert report.resource_id == trail_name_us
assert report.resource_arn == trail_us["TrailARN"]
elif report.region == "eu-west-1":
assert report.resource_tags == []
elif report.region == AWS_REGION_EU_WEST_1:
assert report.status == "PASS"
assert search(
f"Trail {trail_name_us} is multiregion and it is logging",
f"Trail {trail_name_us} is multiregion and it is logging.",
report.status_extended,
)
assert report.resource_id == trail_name_us
assert report.resource_arn == trail_us["TrailARN"]
assert report.resource_tags == []

View File

@@ -8,6 +8,7 @@ from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
from prowler.providers.common.models import Audit_Metadata
AWS_ACCOUNT_NUMBER = "123456789012"
AWS_REGION = "us-east-1"
class Test_cloudtrail_s3_dataevents_read_enabled:
@@ -28,7 +29,7 @@ class Test_cloudtrail_s3_dataevents_read_enabled:
profile_region=None,
credentials=None,
assumed_role_info=None,
audited_regions=["us-east-1"],
audited_regions=[AWS_REGION],
organizations_metadata=None,
audit_resources=None,
mfa_enabled=False,
@@ -44,8 +45,8 @@ class Test_cloudtrail_s3_dataevents_read_enabled:
@mock_cloudtrail
@mock_s3
def test_trail_without_data_events(self):
cloudtrail_client_us_east_1 = client("cloudtrail", region_name="us-east-1")
s3_client_us_east_1 = client("s3", region_name="us-east-1")
cloudtrail_client_us_east_1 = client("cloudtrail", region_name=AWS_REGION)
s3_client_us_east_1 = client("s3", region_name=AWS_REGION)
trail_name_us = "trail_test_us"
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
@@ -85,12 +86,14 @@ class Test_cloudtrail_s3_dataevents_read_enabled:
assert (
result[0].resource_arn == f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
)
assert result[0].resource_tags == []
assert result[0].region == AWS_REGION
@mock_cloudtrail
@mock_s3
def test_trail_without_s3_data_events(self):
cloudtrail_client_us_east_1 = client("cloudtrail", region_name="us-east-1")
s3_client_us_east_1 = client("s3", region_name="us-east-1")
cloudtrail_client_us_east_1 = client("cloudtrail", region_name=AWS_REGION)
s3_client_us_east_1 = client("s3", region_name=AWS_REGION)
trail_name_us = "trail_test_us"
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
@@ -142,12 +145,14 @@ class Test_cloudtrail_s3_dataevents_read_enabled:
assert (
result[0].resource_arn == f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
)
assert result[0].resource_tags == []
assert result[0].region == AWS_REGION
@mock_cloudtrail
@mock_s3
def test_trail_with_s3_classic_data_events(self):
cloudtrail_client_us_east_1 = client("cloudtrail", region_name="us-east-1")
s3_client_us_east_1 = client("s3", region_name="us-east-1")
cloudtrail_client_us_east_1 = client("cloudtrail", region_name=AWS_REGION)
s3_client_us_east_1 = client("s3", region_name=AWS_REGION)
trail_name_us = "trail_test_us"
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
@@ -197,12 +202,14 @@ class Test_cloudtrail_s3_dataevents_read_enabled:
)
assert result[0].resource_id == trail_name_us
assert result[0].resource_arn == trail_us["TrailARN"]
assert result[0].resource_tags == []
assert result[0].region == AWS_REGION
@mock_cloudtrail
@mock_s3
def test_trail_with_s3_advanced_data_events(self):
cloudtrail_client_us_east_1 = client("cloudtrail", region_name="us-east-1")
s3_client_us_east_1 = client("s3", region_name="us-east-1")
cloudtrail_client_us_east_1 = client("cloudtrail", region_name=AWS_REGION)
s3_client_us_east_1 = client("s3", region_name=AWS_REGION)
trail_name_us = "trail_test_us"
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
@@ -252,3 +259,5 @@ class Test_cloudtrail_s3_dataevents_read_enabled:
)
assert result[0].resource_id == trail_name_us
assert result[0].resource_arn == trail_us["TrailARN"]
assert result[0].resource_tags == []
assert result[0].region == AWS_REGION

View File

@@ -8,6 +8,7 @@ from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
from prowler.providers.common.models import Audit_Metadata
AWS_ACCOUNT_NUMBER = "123456789012"
AWS_REGION = "us-east-1"
class Test_cloudtrail_s3_dataevents_write_enabled:
@@ -28,7 +29,7 @@ class Test_cloudtrail_s3_dataevents_write_enabled:
profile_region=None,
credentials=None,
assumed_role_info=None,
audited_regions=["us-east-1"],
audited_regions=[AWS_REGION],
organizations_metadata=None,
audit_resources=None,
mfa_enabled=False,
@@ -44,8 +45,8 @@ class Test_cloudtrail_s3_dataevents_write_enabled:
@mock_cloudtrail
@mock_s3
def test_trail_without_data_events(self):
cloudtrail_client_us_east_1 = client("cloudtrail", region_name="us-east-1")
s3_client_us_east_1 = client("s3", region_name="us-east-1")
cloudtrail_client_us_east_1 = client("cloudtrail", region_name=AWS_REGION)
s3_client_us_east_1 = client("s3", region_name=AWS_REGION)
trail_name_us = "trail_test_us"
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
@@ -85,12 +86,14 @@ class Test_cloudtrail_s3_dataevents_write_enabled:
assert (
result[0].resource_arn == f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
)
assert result[0].resource_tags == []
assert result[0].region == AWS_REGION
@mock_cloudtrail
@mock_s3
def test_trail_without_s3_data_events(self):
cloudtrail_client_us_east_1 = client("cloudtrail", region_name="us-east-1")
s3_client_us_east_1 = client("s3", region_name="us-east-1")
cloudtrail_client_us_east_1 = client("cloudtrail", region_name=AWS_REGION)
s3_client_us_east_1 = client("s3", region_name=AWS_REGION)
trail_name_us = "trail_test_us"
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
@@ -141,12 +144,14 @@ class Test_cloudtrail_s3_dataevents_write_enabled:
assert (
result[0].resource_arn == f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
)
assert result[0].resource_tags == []
assert result[0].region == AWS_REGION
@mock_cloudtrail
@mock_s3
def test_trail_with_s3_data_events(self):
cloudtrail_client_us_east_1 = client("cloudtrail", region_name="us-east-1")
s3_client_us_east_1 = client("s3", region_name="us-east-1")
cloudtrail_client_us_east_1 = client("cloudtrail", region_name=AWS_REGION)
s3_client_us_east_1 = client("s3", region_name=AWS_REGION)
trail_name_us = "trail_test_us"
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
@@ -196,12 +201,14 @@ class Test_cloudtrail_s3_dataevents_write_enabled:
)
assert result[0].resource_id == trail_name_us
assert result[0].resource_arn == trail_us["TrailARN"]
assert result[0].resource_tags == []
assert result[0].region == AWS_REGION
@mock_cloudtrail
@mock_s3
def test_trail_with_s3_advanced_data_events(self):
cloudtrail_client_us_east_1 = client("cloudtrail", region_name="us-east-1")
s3_client_us_east_1 = client("s3", region_name="us-east-1")
cloudtrail_client_us_east_1 = client("cloudtrail", region_name=AWS_REGION)
s3_client_us_east_1 = client("s3", region_name=AWS_REGION)
trail_name_us = "trail_test_us"
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
@@ -250,3 +257,5 @@ class Test_cloudtrail_s3_dataevents_write_enabled:
)
assert result[0].resource_id == trail_name_us
assert result[0].resource_arn == trail_us["TrailARN"]
assert result[0].resource_tags == []
assert result[0].region == AWS_REGION

View File

@@ -77,7 +77,7 @@ class Test_cloudwatch_cross_account_sharing_disabled:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "CloudWatch doesn't allow cross-account sharing"
== "CloudWatch doesn't allow cross-account sharing."
)
assert result[0].resource_id == AWS_ACCOUNT_NUMBER