fix: Linter issues (#1471)

Co-authored-by: Sergio Garcia <38561120+sergargar@users.noreply.github.com>
This commit is contained in:
Pepe Fagoaga
2022-11-14 16:21:51 +01:00
committed by GitHub
parent 3b86b3ac77
commit 9d3bff9e54
89 changed files with 1439 additions and 986 deletions

View File

@@ -136,7 +136,7 @@ def provider_set_session(
current_audit_info.organizations_metadata = get_organizations_metadata(
current_audit_info.audited_account, assumed_credentials
)
logger.info(f"Organizations metadata retrieved")
logger.info("Organizations metadata retrieved")
logger.info("Checking if role assumption is needed ...")
if input_role:
@@ -194,11 +194,11 @@ def print_audit_credentials(audit_info: AWS_Audit_Info):
# Beautify audited regions, set "all" if there is no filter region
regions = (
", ".join(audit_info.audited_regions)
if audit_info.audited_regions != None
if audit_info.audited_regions is not None
else "all"
)
# Beautify audited profile, set "default" if there is no profile set
profile = audit_info.profile if audit_info.profile != None else "default"
profile = audit_info.profile if audit_info.profile is not None else "default"
report = f"""
This report is being generated using credentials below:
@@ -208,7 +208,7 @@ AWS Account: {Fore.YELLOW}[{audit_info.audited_account}]{Style.RESET_ALL} UserId
Caller Identity ARN: {Fore.YELLOW}[{audit_info.audited_identity_arn}]{Style.RESET_ALL}
"""
# If -A is set, print Assumed Role ARN
if audit_info.assumed_role_info.role_arn != None:
if audit_info.assumed_role_info.role_arn is not None:
report += f"Assumed Role ARN: {Fore.YELLOW}[{audit_info.assumed_role_info.role_arn}]{Style.RESET_ALL}"
print(report)

View File

@@ -91,19 +91,19 @@ class Test_AWS_Provider:
# Recover credentials for the assume role operation
credentials = assume_role_response["Credentials"]
# Test the response
## SessionToken
# SessionToken
credentials["SessionToken"].should.have.length_of(356)
credentials["SessionToken"].startswith("FQoGZXIvYXdzE")
## AccessKeyId
# AccessKeyId
credentials["AccessKeyId"].should.have.length_of(20)
credentials["AccessKeyId"].startswith("ASIA")
## SecretAccessKey
# SecretAccessKey
credentials["SecretAccessKey"].should.have.length_of(40)
## Assumed Role
# Assumed Role
assume_role_response["AssumedRoleUser"]["Arn"].should.equal(
f"arn:aws:sts::{ACCOUNT_ID}:assumed-role/{role_name}/{sessionName}"
)
## AssumedRoleUser
# AssumedRoleUser
assert assume_role_response["AssumedRoleUser"]["AssumedRoleId"].startswith(
"AROA"
)

View File

@@ -113,30 +113,20 @@ class Test_Allowlist:
}
}
assert (
is_allowlisted(
allowlist, AWS_ACCOUNT_NUMBER, "check_test", AWS_REGION, "prowler"
)
== True
assert is_allowlisted(
allowlist, AWS_ACCOUNT_NUMBER, "check_test", AWS_REGION, "prowler"
)
assert (
is_allowlisted(
allowlist, AWS_ACCOUNT_NUMBER, "check_test", AWS_REGION, "prowler-test"
)
== True
assert is_allowlisted(
allowlist, AWS_ACCOUNT_NUMBER, "check_test", AWS_REGION, "prowler-test"
)
assert (
is_allowlisted(
allowlist, AWS_ACCOUNT_NUMBER, "check_test", AWS_REGION, "test-prowler"
)
== True
assert is_allowlisted(
allowlist, AWS_ACCOUNT_NUMBER, "check_test", AWS_REGION, "test-prowler"
)
assert (
assert not (
is_allowlisted(
allowlist, AWS_ACCOUNT_NUMBER, "check_test", "us-east-2", "test"
)
== False
)

View File

@@ -18,7 +18,7 @@ def arn_parsing(arn):
else:
arn_parsed = arnparse(arn)
# First check if region is empty (in IAM arns region is always empty)
if arn_parsed.region != None:
if arn_parsed.region is not None:
raise RoleArnParsingIAMRegionNotEmpty
else:
# check if needed fields are filled:
@@ -27,12 +27,12 @@ def arn_parsing(arn):
# - account_id
# - resource_type
# - resource
if arn_parsed.partition == None:
if arn_parsed.partition is None:
raise RoleArnParsingPartitionEmpty
elif arn_parsed.service != "iam":
raise RoleArnParsingServiceNotIAM
elif (
arn_parsed.account_id == None
arn_parsed.account_id is None
or len(arn_parsed.account_id) != 12
or not arn_parsed.account_id.isnumeric()
):

View File

@@ -94,7 +94,7 @@ class Test_accessanalyzer_enabled_without_findings:
assert result[1].status == "FAIL"
assert (
result[1].status_extended
== f"IAM Access Analyzer Test Analyzer has 10 active findings"
== "IAM Access Analyzer Test Analyzer has 10 active findings"
)
assert result[1].resource_id == "Test Analyzer"
@@ -128,7 +128,7 @@ class Test_accessanalyzer_enabled_without_findings:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"IAM Access Analyzer Test Analyzer has no active findings"
== "IAM Access Analyzer Test Analyzer has no active findings"
)
assert result[0].resource_id == "Test Analyzer"
@@ -162,6 +162,6 @@ class Test_accessanalyzer_enabled_without_findings:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"IAM Access Analyzer Test Analyzer is not active"
== "IAM Access Analyzer Test Analyzer is not active"
)
assert result[0].resource_id == "Test Analyzer"

View File

@@ -11,12 +11,15 @@ AWS_REGION = "eu-west-1"
# Mocking Access Analyzer Calls
make_api_call = botocore.client.BaseClient._make_api_call
# As you can see the operation_name has the list_analyzers snake_case form but
# we are using the ListAnalyzers form.
# Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816
#
# We have to mock every AWS API call using Boto3
def mock_make_api_call(self, operation_name, kwarg):
"""
Mock every AWS API call using Boto3
As you can see the operation_name has the list_analyzers snake_case form but
we are using the ListAnalyzers form.
Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816
"""
if operation_name == "ListAnalyzers":
return {
"analyzers": [

View File

@@ -2,7 +2,7 @@ from colorama import Fore, Style
from lib.check.models import Check
### This check has no findings since it is manual
# This check has no findings since it is manual
class account_maintain_current_contact_details(Check):

View File

@@ -2,7 +2,7 @@ from colorama import Fore, Style
from lib.check.models import Check
### This check has no findings since it is manual
# This check has no findings since it is manual
class account_security_contact_information_is_registered(Check):

View File

@@ -2,7 +2,7 @@ from colorama import Fore, Style
from lib.check.models import Check
### This check has no findings since it is manual
# This check has no findings since it is manual
class account_security_questions_are_registered_in_the_aws_account(Check):

View File

@@ -44,8 +44,8 @@ class Test_ACM_Service:
# ACM client for this test class
audit_info = self.set_mocked_audit_info()
acm = ACM(audit_info)
for client in acm.regional_clients.values():
assert client.__class__.__name__ == "ACM"
for regional_client in acm.regional_clients.values():
assert regional_client.__class__.__name__ == "ACM"
# Test ACM Session
@mock_acm

View File

@@ -44,8 +44,8 @@ class Test_APIGateway_Service:
# APIGateway client for this test class
audit_info = self.set_mocked_audit_info()
apigateway = APIGateway(audit_info)
for client in apigateway.regional_clients.values():
assert client.__class__.__name__ == "APIGateway"
for regional_client in apigateway.regional_clients.values():
assert regional_client.__class__.__name__ == "APIGateway"
# Test APIGateway Session
@mock_apigateway
@@ -97,7 +97,7 @@ class Test_APIGateway_Service:
# APIGateway client for this test class
audit_info = self.set_mocked_audit_info()
apigateway = APIGateway(audit_info)
assert apigateway.rest_apis[0].authorizer == True
assert apigateway.rest_apis[0].authorizer is True
# Test APIGateway Get Rest API
@mock_apigateway
@@ -111,7 +111,7 @@ class Test_APIGateway_Service:
# APIGateway client for this test class
audit_info = self.set_mocked_audit_info()
apigateway = APIGateway(audit_info)
assert apigateway.rest_apis[0].public_endpoint == False
assert apigateway.rest_apis[0].public_endpoint is False
# Test APIGateway Get Stages
@mock_apigateway
@@ -162,4 +162,4 @@ class Test_APIGateway_Service:
)
audit_info = self.set_mocked_audit_info()
apigateway = APIGateway(audit_info)
assert apigateway.rest_apis[0].stages[0].logging == True
assert apigateway.rest_apis[0].stages[0].logging is True

View File

@@ -9,10 +9,14 @@ AWS_REGION = "us-east-1"
# Mocking ApiGatewayV2 Calls
make_api_call = botocore.client.BaseClient._make_api_call
# Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816
#
# We have to mock every AWS API call using Boto3
def mock_make_api_call(self, operation_name, kwarg):
"""
We have to mock every AWS API call using Boto3
Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816
"""
if operation_name == "GetAuthorizers":
return {"Items": [{"AuthorizerId": "authorizer-id", "Name": "test-authorizer"}]}
elif operation_name == "GetStages":

View File

@@ -9,10 +9,14 @@ AWS_REGION = "us-east-1"
# Mocking ApiGatewayV2 Calls
make_api_call = botocore.client.BaseClient._make_api_call
# Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816
#
# We have to mock every AWS API call using Boto3
def mock_make_api_call(self, operation_name, kwarg):
"""
We have to mock every AWS API call using Boto3
Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816
"""
if operation_name == "GetAuthorizers":
return {"Items": [{"AuthorizerId": "authorizer-id", "Name": "test-authorizer"}]}
elif operation_name == "GetStages":

View File

@@ -11,10 +11,14 @@ AWS_REGION = "us-east-1"
# Mocking ApiGatewayV2 Calls
make_api_call = botocore.client.BaseClient._make_api_call
# Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816
#
# We have to mock every AWS API call using Boto3
def mock_make_api_call(self, operation_name, kwarg):
"""
We have to mock every AWS API call using Boto3
Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816
"""
if operation_name == "GetAuthorizers":
return {"Items": [{"AuthorizerId": "authorizer-id", "Name": "test-authorizer"}]}
elif operation_name == "GetStages":
@@ -69,8 +73,8 @@ class Test_ApiGatewayV2_Service:
# ApiGatewayV2 client for this test class
audit_info = self.set_mocked_audit_info()
apigatewayv2 = ApiGatewayV2(audit_info)
for client in apigatewayv2.regional_clients.values():
assert client.__class__.__name__ == "ApiGatewayV2"
for regional_client in apigatewayv2.regional_clients.values():
assert regional_client.__class__.__name__ == "ApiGatewayV2"
# Test ApiGatewayV2 Session
@mock_apigatewayv2
@@ -118,7 +122,7 @@ class Test_ApiGatewayV2_Service:
# ApiGatewayV2 client for this test class
audit_info = self.set_mocked_audit_info()
apigatewayv2 = ApiGatewayV2(audit_info)
assert apigatewayv2.apis[0].authorizer == True
assert apigatewayv2.apis[0].authorizer is True
# Test ApiGatewayV2 Get Stages
@mock_apigatewayv2
@@ -130,4 +134,4 @@ class Test_ApiGatewayV2_Service:
audit_info = self.set_mocked_audit_info()
apigatewayv2 = ApiGatewayV2(audit_info)
assert apigatewayv2.apis[0].stages[0].logging == True
assert apigatewayv2.apis[0].stages[0].logging is True

View File

@@ -4,7 +4,8 @@ from providers.aws.services.appstream.appstream_client import appstream_client
max_session_duration_seconds = get_config_var("max_session_duration_seconds")
"""max_session_duration_seconds, default: 36000 seconds (10 hours)"""
# Check if there are AppStream Fleets with the user maximum session duration no longer than 10 hours
class appstream_fleet_maximum_session_duration(Check):
"""Check if there are AppStream Fleets with the user maximum session duration no longer than 10 hours"""

View File

@@ -5,7 +5,7 @@ from providers.aws.services.appstream.appstream_client import appstream_client
max_disconnect_timeout_in_seconds = get_config_var("max_disconnect_timeout_in_seconds")
"""max_disconnect_timeout_in_seconds, default: 300 seconds (5 minutes)"""
# Check if there are AppStream Fleets with the session disconnect timeout set to 5 minutes or less
class appstream_fleet_session_disconnect_timeout(Check):
"""Check if there are AppStream Fleets with the session disconnect timeout set to 5 minutes or less"""

View File

@@ -7,7 +7,7 @@ max_idle_disconnect_timeout_in_seconds = get_config_var(
)
"""max_idle_disconnect_timeout_in_seconds, default: 600 seconds (10 minutes)"""
# Check if there are AppStream Fleets with the idle disconnect timeout set to 10 minutes or less
class appstream_fleet_session_idle_disconnect_timeout(Check):
"""Check if there are AppStream Fleets with the idle disconnect timeout set to 10 minutes or less"""

View File

@@ -13,12 +13,15 @@ AWS_REGION = "eu-west-1"
# Mocking Access Analyzer Calls
make_api_call = botocore.client.BaseClient._make_api_call
# As you can see the operation_name has the list_analyzers snake_case form but
# we are using the ListAnalyzers form.
# Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816
#
# We have to mock every AWS API call using Boto3
def mock_make_api_call(self, operation_name, kwarg):
"""
We have to mock every AWS API call using Boto3
As you can see the operation_name has the list_analyzers snake_case form but
we are using the ListAnalyzers form.
Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816
"""
if operation_name == "DescribeFleets":
return {
"Fleets": [
@@ -86,7 +89,7 @@ class Test_AppStream_Service:
assert appstream.fleets[0].max_user_duration_in_seconds == 100
assert appstream.fleets[0].disconnect_timeout_in_seconds == 900
assert appstream.fleets[0].idle_disconnect_timeout_in_seconds == 900
assert appstream.fleets[0].enable_default_internet_access == False
assert appstream.fleets[0].enable_default_internet_access is False
assert appstream.fleets[0].region == AWS_REGION
assert (
@@ -97,5 +100,5 @@ class Test_AppStream_Service:
assert appstream.fleets[1].max_user_duration_in_seconds == 57600
assert appstream.fleets[1].disconnect_timeout_in_seconds == 900
assert appstream.fleets[1].idle_disconnect_timeout_in_seconds == 900
assert appstream.fleets[1].enable_default_internet_access == True
assert appstream.fleets[1].enable_default_internet_access is True
assert appstream.fleets[1].region == AWS_REGION

View File

@@ -46,8 +46,8 @@ class Test_AutoScaling_Service:
# AutoScaling client for this test class
audit_info = self.set_mocked_audit_info()
autoscaling = AutoScaling(audit_info)
for client in autoscaling.regional_clients.values():
assert client.__class__.__name__ == "AutoScaling"
for regional_client in autoscaling.regional_clients.values():
assert regional_client.__class__.__name__ == "AutoScaling"
# Test AutoScaling Session
@mock_autoscaling

View File

@@ -39,12 +39,15 @@ dummy_template = {
# Mocking Access Analyzer Calls
make_api_call = botocore.client.BaseClient._make_api_call
# As you can see the operation_name has the list_analyzers snake_case form but
# we are using the ListAnalyzers form.
# Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816
#
# We have to mock every AWS API call using Boto3
def mock_make_api_call(self, operation_name, kwarg):
"""
As you can see the operation_name has the list_analyzers snake_case form but
we are using the ListAnalyzers form.
Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816
We have to mock every AWS API call using Boto3
"""
if operation_name == "CreateStack":
return {
"StackId": "arn:aws:cloudformation:eu-west-1:123456789012:stack/Test-Stack/796c8d26-b390-41d7-a23c-0702c4e78b60"
@@ -122,13 +125,6 @@ def mock_generate_regional_clients(service, audit_info):
return {AWS_REGION: regional_client}
# Mock generate_regional_clients()
def mock_generate_regional_clients(service, audit_info):
regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION)
regional_client.region = AWS_REGION
return {AWS_REGION: regional_client}
# Patch every AWS call using Boto3 and generate_regional_clients to have 1 client
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
@patch(
@@ -207,7 +203,7 @@ class Test_CloudFormation_Service:
assert cloudformation.stacks[0].arn == stack_arn["StackId"]
assert cloudformation.stacks[0].name == "Test-Stack"
assert cloudformation.stacks[0].outputs == ["TestOutput1:TestValue1"]
assert cloudformation.stacks[0].enable_termination_protection == True
assert cloudformation.stacks[0].is_nested_stack == False
assert cloudformation.stacks[0].enable_termination_protection is True
assert cloudformation.stacks[0].is_nested_stack is False
assert cloudformation.stacks[0].root_nested_stack == ""
assert cloudformation.stacks[0].region == AWS_REGION

View File

@@ -21,8 +21,8 @@ class cloudtrail_logs_s3_bucket_is_not_publicly_accessible(Check):
for bucket in s3_client.buckets:
# Here we need to ensure that acl_grantee is filled since if we don't have permissions to query the api for a concrete region
# (for example due to a SCP) we are going to try access an attribute from a None type
if trail_bucket == bucket.name and bucket.acl_grantee:
for grant in bucket.acl_grantee:
if trail_bucket == bucket.name and bucket.acl_grantees:
for grant in bucket.acl_grantees:
if (
grant.URI
== "http://acs.amazonaws.com/groups/global/AllUsers"

View File

@@ -50,16 +50,10 @@ class Test_cloudtrail_logs_s3_bucket_is_not_publicly_accessible:
@mock_cloudtrail
@mock_s3
def test_trail_bucket_not_valid_acl(self):
cloudtrail_client = client("cloudtrail", region_name="us-east-1")
def test_trail_bucket_public_acl(self):
s3_client = client("s3", region_name="us-east-1")
trail_name_us = "trail_test_us"
bucket_name_us = "bucket_test_us"
s3_client.create_bucket(Bucket=bucket_name_us)
trail_us = cloudtrail_client.create_trail(
Name=trail_name_us, S3BucketName=bucket_name_us, IsMultiRegionTrail=False
)
s3_client.put_bucket_acl(
AccessControlPolicy={
"Grants": [
@@ -78,6 +72,13 @@ class Test_cloudtrail_logs_s3_bucket_is_not_publicly_accessible:
},
Bucket=bucket_name_us,
)
trail_name_us = "trail_test_us"
cloudtrail_client = client("cloudtrail", region_name="us-east-1")
trail_us = cloudtrail_client.create_trail(
Name=trail_name_us, S3BucketName=bucket_name_us, IsMultiRegionTrail=False
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.cloudtrail.cloudtrail_service import Cloudtrail
from providers.aws.services.s3.s3_service import S3
@@ -89,7 +90,7 @@ class Test_cloudtrail_logs_s3_bucket_is_not_publicly_accessible:
new=Cloudtrail(current_audit_info),
):
with mock.patch(
"providers.aws.services.cloudtrail.cloudtrail_logs_s3_bucket_access_logging_enabled.cloudtrail_logs_s3_bucket_access_logging_enabled.s3_client",
"providers.aws.services.cloudtrail.cloudtrail_logs_s3_bucket_is_not_publicly_accessible.cloudtrail_logs_s3_bucket_is_not_publicly_accessible.s3_client",
new=S3(current_audit_info),
):
# Test Check
@@ -111,7 +112,7 @@ class Test_cloudtrail_logs_s3_bucket_is_not_publicly_accessible:
@mock_cloudtrail
@mock_s3
def test_trail_bucket_not_valid_acl(self):
def test_trail_bucket_not_public_acl(self):
cloudtrail_client = client("cloudtrail", region_name="us-east-1")
s3_client = client("s3", region_name="us-east-1")
trail_name_us = "trail_test_us"

View File

@@ -30,7 +30,7 @@ class cloudtrail_multi_region_enabled(Check):
else:
report.status = "FAIL"
report.status_extended = (
f"No CloudTrail trails enabled and logging were found"
"No CloudTrail trails enabled and logging were found"
)
report.region = cloudtrail_client.region
report.resource_arn = "No trails"

View File

@@ -54,10 +54,10 @@ class Test_cloudtrail_multi_region_enabled:
Bucket=bucket_name_eu,
CreateBucketConfiguration={"LocationConstraint": "eu-west-1"},
)
trail_us = cloudtrail_client_us_east_1.create_trail(
_ = cloudtrail_client_us_east_1.create_trail(
Name=trail_name_us, S3BucketName=bucket_name_us, IsMultiRegionTrail=False
)
trail_eu = cloudtrail_client_eu_west_1.create_trail(
_ = cloudtrail_client_eu_west_1.create_trail(
Name=trail_name_eu, S3BucketName=bucket_name_eu, IsMultiRegionTrail=False
)
@@ -113,8 +113,8 @@ class Test_cloudtrail_multi_region_enabled:
cloudtrail_client_eu_west_1.create_trail(
Name=trail_name_eu, S3BucketName=bucket_name_eu, IsMultiRegionTrail=False
)
response = cloudtrail_client_us_east_1.start_logging(Name=trail_name_us)
status = cloudtrail_client_us_east_1.get_trail_status(Name=trail_name_us)
_ = cloudtrail_client_us_east_1.start_logging(Name=trail_name_us)
_ = cloudtrail_client_us_east_1.get_trail_status(Name=trail_name_us)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.cloudtrail.cloudtrail_service import Cloudtrail

View File

@@ -10,7 +10,7 @@ class cloudtrail_s3_dataevents_enabled(Check):
report.resource_id = "No trails"
report.resource_arn = "No trails"
report.status = "FAIL"
report.status_extended = f"No CloudTrail trails have a data event to record all S3 object-level API operations."
report.status_extended = "No CloudTrail trails have a data event to record all S3 object-level API operations."
for trail in cloudtrail_client.trails:
for data_event in trail.data_events:
# Check if trail has a data event for all S3 Buckets for write/read

View File

@@ -55,7 +55,7 @@ class Test_cloudtrail_s3_dataevents_enabled:
cloudtrail_client_us_east_1.create_trail(
Name=trail_name_us, S3BucketName=bucket_name_us, IsMultiRegionTrail=False
)
data_events_response = cloudtrail_client_us_east_1.put_event_selectors(
_ = cloudtrail_client_us_east_1.put_event_selectors(
TrailName=trail_name_us,
EventSelectors=[
{
@@ -104,7 +104,7 @@ class Test_cloudtrail_s3_dataevents_enabled:
trail_us = cloudtrail_client_us_east_1.create_trail(
Name=trail_name_us, S3BucketName=bucket_name_us, IsMultiRegionTrail=False
)
data_events_response = cloudtrail_client_us_east_1.put_event_selectors(
_ = cloudtrail_client_us_east_1.put_event_selectors(
TrailName=trail_name_us,
EventSelectors=[
{

View File

@@ -41,8 +41,8 @@ class Test_Cloudtrail_Service:
def test_client(self):
audit_info = self.set_mocked_audit_info()
cloudtrail = Cloudtrail(audit_info)
for client in cloudtrail.regional_clients.values():
assert client.__class__.__name__ == "CloudTrail"
for regional_client in cloudtrail.regional_clients.values():
assert regional_client.__class__.__name__ == "CloudTrail"
# Test Cloudtrail session
@mock_cloudtrail

View File

@@ -13,7 +13,7 @@ class cloudwatch_changes_to_network_acls_alarm_configured(Check):
report = Check_Report(self.metadata)
report.status = "FAIL"
report.status_extended = (
f"No CloudWatch log groups found with metric filters or alarms associated."
"No CloudWatch log groups found with metric filters or alarms associated."
)
report.region = "us-east-1"
report.resource_id = ""

View File

@@ -202,7 +202,7 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"CloudWatch log group /log-group/test found with metric filter test-filter but no alarms associated."
== "CloudWatch log group /log-group/test found with metric filter test-filter but no alarms associated."
)
assert result[0].resource_id == "/log-group/test"
@@ -277,6 +277,6 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -13,7 +13,7 @@ class cloudwatch_changes_to_network_gateways_alarm_configured(Check):
report = Check_Report(self.metadata)
report.status = "FAIL"
report.status_extended = (
f"No CloudWatch log groups found with metric filters or alarms associated."
"No CloudWatch log groups found with metric filters or alarms associated."
)
report.region = "us-east-1"
report.resource_id = ""

View File

@@ -202,7 +202,7 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"CloudWatch log group /log-group/test found with metric filter test-filter but no alarms associated."
== "CloudWatch log group /log-group/test found with metric filter test-filter but no alarms associated."
)
assert result[0].resource_id == "/log-group/test"
@@ -277,6 +277,6 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -13,7 +13,7 @@ class cloudwatch_changes_to_network_route_tables_alarm_configured(Check):
report = Check_Report(self.metadata)
report.status = "FAIL"
report.status_extended = (
f"No CloudWatch log groups found with metric filters or alarms associated."
"No CloudWatch log groups found with metric filters or alarms associated."
)
report.region = "us-east-1"
report.resource_id = ""

View File

@@ -202,7 +202,7 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"CloudWatch log group /log-group/test found with metric filter test-filter but no alarms associated."
== "CloudWatch log group /log-group/test found with metric filter test-filter but no alarms associated."
)
assert result[0].resource_id == "/log-group/test"
@@ -277,6 +277,6 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -13,7 +13,7 @@ class cloudwatch_changes_to_vpcs_alarm_configured(Check):
report = Check_Report(self.metadata)
report.status = "FAIL"
report.status_extended = (
f"No CloudWatch log groups found with metric filters or alarms associated."
"No CloudWatch log groups found with metric filters or alarms associated."
)
report.region = "us-east-1"
report.resource_id = ""

View File

@@ -202,7 +202,7 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"CloudWatch log group /log-group/test found with metric filter test-filter but no alarms associated."
== "CloudWatch log group /log-group/test found with metric filter test-filter but no alarms associated."
)
assert result[0].resource_id == "/log-group/test"
@@ -277,6 +277,6 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -15,7 +15,7 @@ class cloudwatch_log_metric_filter_and_alarm_for_aws_config_configuration_change
report = Check_Report(self.metadata)
report.status = "FAIL"
report.status_extended = (
f"No CloudWatch log groups found with metric filters or alarms associated."
"No CloudWatch log groups found with metric filters or alarms associated."
)
report.region = "us-east-1"
report.resource_id = ""

View File

@@ -210,7 +210,7 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"CloudWatch log group /log-group/test found with metric filter test-filter but no alarms associated."
== "CloudWatch log group /log-group/test found with metric filter test-filter but no alarms associated."
)
assert result[0].resource_id == "/log-group/test"
@@ -287,6 +287,6 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -15,7 +15,7 @@ class cloudwatch_log_metric_filter_and_alarm_for_cloudtrail_configuration_change
report = Check_Report(self.metadata)
report.status = "FAIL"
report.status_extended = (
f"No CloudWatch log groups found with metric filters or alarms associated."
"No CloudWatch log groups found with metric filters or alarms associated."
)
report.region = "us-east-1"
report.resource_id = ""

View File

@@ -210,7 +210,7 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"CloudWatch log group /log-group/test found with metric filter test-filter but no alarms associated."
== "CloudWatch log group /log-group/test found with metric filter test-filter but no alarms associated."
)
assert result[0].resource_id == "/log-group/test"
@@ -287,6 +287,6 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -13,7 +13,7 @@ class cloudwatch_log_metric_filter_authentication_failures(Check):
report = Check_Report(self.metadata)
report.status = "FAIL"
report.status_extended = (
f"No CloudWatch log groups found with metric filters or alarms associated."
"No CloudWatch log groups found with metric filters or alarms associated."
)
report.region = "us-east-1"
report.resource_id = ""

View File

@@ -202,7 +202,7 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"CloudWatch log group /log-group/test found with metric filter test-filter but no alarms associated."
== "CloudWatch log group /log-group/test found with metric filter test-filter but no alarms associated."
)
assert result[0].resource_id == "/log-group/test"
@@ -277,6 +277,6 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -13,7 +13,7 @@ class cloudwatch_log_metric_filter_disable_or_scheduled_deletion_of_kms_cmk(Chec
report = Check_Report(self.metadata)
report.status = "FAIL"
report.status_extended = (
f"No CloudWatch log groups found with metric filters or alarms associated."
"No CloudWatch log groups found with metric filters or alarms associated."
)
report.region = "us-east-1"
report.resource_id = ""

View File

@@ -210,7 +210,7 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"CloudWatch log group /log-group/test found with metric filter test-filter but no alarms associated."
== "CloudWatch log group /log-group/test found with metric filter test-filter but no alarms associated."
)
assert result[0].resource_id == "/log-group/test"
@@ -287,6 +287,6 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -13,7 +13,7 @@ class cloudwatch_log_metric_filter_for_s3_bucket_policy_changes(Check):
report = Check_Report(self.metadata)
report.status = "FAIL"
report.status_extended = (
f"No CloudWatch log groups found with metric filters or alarms associated."
"No CloudWatch log groups found with metric filters or alarms associated."
)
report.region = "us-east-1"
report.resource_id = ""

View File

@@ -202,7 +202,7 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"CloudWatch log group /log-group/test found with metric filter test-filter but no alarms associated."
== "CloudWatch log group /log-group/test found with metric filter test-filter but no alarms associated."
)
assert result[0].resource_id == "/log-group/test"
@@ -277,6 +277,6 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -13,7 +13,7 @@ class cloudwatch_log_metric_filter_policy_changes(Check):
report = Check_Report(self.metadata)
report.status = "FAIL"
report.status_extended = (
f"No CloudWatch log groups found with metric filters or alarms associated."
"No CloudWatch log groups found with metric filters or alarms associated."
)
report.region = "us-east-1"
report.resource_id = ""

View File

@@ -202,7 +202,7 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"CloudWatch log group /log-group/test found with metric filter test-filter but no alarms associated."
== "CloudWatch log group /log-group/test found with metric filter test-filter but no alarms associated."
)
assert result[0].resource_id == "/log-group/test"
@@ -277,6 +277,6 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -13,7 +13,7 @@ class cloudwatch_log_metric_filter_root_usage(Check):
report = Check_Report(self.metadata)
report.status = "FAIL"
report.status_extended = (
f"No CloudWatch log groups found with metric filters or alarms associated."
"No CloudWatch log groups found with metric filters or alarms associated."
)
report.region = "us-east-1"
report.resource_id = ""

View File

@@ -202,7 +202,7 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"CloudWatch log group /log-group/test found with metric filter test-filter but no alarms associated."
== "CloudWatch log group /log-group/test found with metric filter test-filter but no alarms associated."
)
assert result[0].resource_id == "/log-group/test"
@@ -277,6 +277,6 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -13,7 +13,7 @@ class cloudwatch_log_metric_filter_security_group_changes(Check):
report = Check_Report(self.metadata)
report.status = "FAIL"
report.status_extended = (
f"No CloudWatch log groups found with metric filters or alarms associated."
"No CloudWatch log groups found with metric filters or alarms associated."
)
report.region = "us-east-1"
report.resource_id = ""

View File

@@ -202,7 +202,7 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"CloudWatch log group /log-group/test found with metric filter test-filter but no alarms associated."
== "CloudWatch log group /log-group/test found with metric filter test-filter but no alarms associated."
)
assert result[0].resource_id == "/log-group/test"
@@ -277,6 +277,6 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -13,7 +13,7 @@ class cloudwatch_log_metric_filter_sign_in_without_mfa(Check):
report = Check_Report(self.metadata)
report.status = "FAIL"
report.status_extended = (
f"No CloudWatch log groups found with metric filters or alarms associated."
"No CloudWatch log groups found with metric filters or alarms associated."
)
report.region = "us-east-1"
report.resource_id = ""

View File

@@ -202,7 +202,7 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"CloudWatch log group /log-group/test found with metric filter test-filter but no alarms associated."
== "CloudWatch log group /log-group/test found with metric filter test-filter but no alarms associated."
)
assert result[0].resource_id == "/log-group/test"
@@ -277,6 +277,6 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -13,7 +13,7 @@ class cloudwatch_log_metric_filter_unauthorized_api_calls(Check):
report = Check_Report(self.metadata)
report.status = "FAIL"
report.status_extended = (
f"No CloudWatch log groups found with metric filters or alarms associated."
"No CloudWatch log groups found with metric filters or alarms associated."
)
report.region = "us-east-1"
report.resource_id = ""

View File

@@ -202,7 +202,7 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"CloudWatch log group /log-group/test found with metric filter test-filter but no alarms associated."
== "CloudWatch log group /log-group/test found with metric filter test-filter but no alarms associated."
)
assert result[0].resource_id == "/log-group/test"
@@ -277,6 +277,6 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -12,7 +12,7 @@ class config_recorder_all_regions_enabled(Check):
# Check if Config is enabled in region
if not recorder.name:
report.status = "FAIL"
report.status_extended = f"No AWS Config recorders in region."
report.status_extended = "No AWS Config recorders in region."
else:
if recorder.recording:
if recorder.last_status == "Failure":

View File

@@ -62,7 +62,7 @@ class Test_config_recorder_all_regions_enabled:
assert recorder.status == "FAIL"
assert (
recorder.status_extended
== f"AWS Config recorder default is disabled."
== "AWS Config recorder default is disabled."
)
assert recorder.resource_id == "default"
@@ -102,6 +102,6 @@ class Test_config_recorder_all_regions_enabled:
assert recorder.status == "PASS"
assert (
recorder.status_extended
== f"AWS Config recorder default is enabled."
== "AWS Config recorder default is enabled."
)
assert recorder.resource_id == "default"

View File

@@ -44,8 +44,8 @@ class Test_Config_Service:
# Config client for this test class
audit_info = self.set_mocked_audit_info()
config = Config(audit_info)
for client in config.regional_clients.values():
assert client.__class__.__name__ == "ConfigService"
for regional_client in config.regional_clients.values():
assert regional_client.__class__.__name__ == "ConfigService"
# Test Config Session
@mock_config
@@ -86,4 +86,4 @@ class Test_Config_Service:
# Search for the recorder just created
for recorder in config.recorders:
if recorder.name == "default":
assert recorder.recording == True
assert recorder.recording is True

View File

@@ -41,6 +41,6 @@ class ec2_elastic_ip_shodan(Check):
findings.append(report)
else:
logger.error(
f"ERROR: No Shodan API Key -- Please input a Shodan API Key with -N/--shodan or in config.yaml"
"ERROR: No Shodan API Key -- Please input a Shodan API Key with -N/--shodan or in config.yaml"
)
return findings

View File

@@ -44,8 +44,8 @@ class Test_EC2_Service:
# EC2 client for this test class
audit_info = self.set_mocked_audit_info()
ec2 = EC2(audit_info)
for client in ec2.regional_clients.values():
assert client.__class__.__name__ == "EC2"
for regional_client in ec2.regional_clients.values():
assert regional_client.__class__.__name__ == "EC2"
# Test EC2 Session
@mock_ec2
@@ -164,4 +164,4 @@ class Test_EC2_Service:
ec2 = EC2(audit_info)
for snapshot in ec2.snapshots:
if snapshot.id == snapshot_id:
assert snapshot.public == True
assert snapshot.public is True

View File

@@ -1,4 +1,3 @@
from re import T
from typing import Any
@@ -8,7 +7,8 @@ def check_network_acl(rules: Any, protocol: str, port: str) -> bool:
# Spliting IPv6 from IPv4 rules
rules_IPv6 = list(
filter(lambda rule: rule.get("CidrBlock") is None and not rule["Egress"], rules))
filter(lambda rule: rule.get("CidrBlock") is None and not rule["Egress"], rules)
)
# For IPv6
# Rules must order by RuleNumber
@@ -18,11 +18,9 @@ def check_network_acl(rules: Any, protocol: str, port: str) -> bool:
and rule["RuleAction"] == "deny"
and (
rule["Protocol"] == "-1"
or
(
or (
rule["Protocol"] == protocol
and
rule["PortRange"]["From"] <= port <= rule["PortRange"]["To"]
and rule["PortRange"]["From"] <= port <= rule["PortRange"]["To"]
)
)
):
@@ -34,11 +32,9 @@ def check_network_acl(rules: Any, protocol: str, port: str) -> bool:
and rule["RuleAction"] == "allow"
and (
rule["Protocol"] == "-1"
or
(
or (
rule["Protocol"] == protocol
and
rule["PortRange"]["From"] <= port <= rule["PortRange"]["To"]
and rule["PortRange"]["From"] <= port <= rule["PortRange"]["To"]
)
)
):
@@ -48,7 +44,11 @@ def check_network_acl(rules: Any, protocol: str, port: str) -> bool:
# There are not IPv6 Public access here
# Spliting IPv4 from IPv6 rules
rules_IPv4 = list(filter(lambda rule: rule.get("Ipv6CidrBlock") is None and not rule["Egress"], rules))
rules_IPv4 = list(
filter(
lambda rule: rule.get("Ipv6CidrBlock") is None and not rule["Egress"], rules
)
)
# For IPv4
# Rules must order by RuleNumber
@@ -58,11 +58,9 @@ def check_network_acl(rules: Any, protocol: str, port: str) -> bool:
and rule["RuleAction"] == "deny"
and (
rule["Protocol"] == "-1"
or
(
or (
rule["Protocol"] == protocol
and
rule["PortRange"]["From"] <= port <= rule["PortRange"]["To"]
and rule["PortRange"]["From"] <= port <= rule["PortRange"]["To"]
)
)
):
@@ -75,11 +73,9 @@ def check_network_acl(rules: Any, protocol: str, port: str) -> bool:
and rule["RuleAction"] == "allow"
and (
rule["Protocol"] == "-1"
or
(
or (
rule["Protocol"] == protocol
and
rule["PortRange"]["From"] <= port <= rule["PortRange"]["To"]
and rule["PortRange"]["From"] <= port <= rule["PortRange"]["To"]
)
)
):

File diff suppressed because it is too large Load Diff

View File

@@ -35,7 +35,7 @@ class Test_iam_no_root_access_key_test:
# raise Exception
assert result[0].status == "PASS"
assert search(
f"User <root_account> has not access keys.",
"User <root_account> has not access keys.",
result[0].status_extended,
)
assert result[0].resource_id == "<root_account>"
@@ -73,7 +73,7 @@ class Test_iam_no_root_access_key_test:
# raise Exception
assert result[0].status == "FAIL"
assert search(
f"User <root_account> has one active access key.",
"User <root_account> has one active access key.",
result[0].status_extended,
)
assert result[0].resource_id == "<root_account>"
@@ -111,7 +111,7 @@ class Test_iam_no_root_access_key_test:
# raise Exception
assert result[0].status == "FAIL"
assert search(
f"User <root_account> has one active access key.",
"User <root_account> has one active access key.",
result[0].status_extended,
)
assert result[0].resource_id == "<root_account>"
@@ -149,7 +149,7 @@ class Test_iam_no_root_access_key_test:
# raise Exception
assert result[0].status == "FAIL"
assert search(
f"User <root_account> has two active access key.",
"User <root_account> has two active access key.",
result[0].status_extended,
)
assert result[0].resource_id == "<root_account>"

View File

@@ -21,6 +21,6 @@ class iam_password_policy_lowercase(Check):
)
else:
report.status = "FAIL"
report.status_extended = f"Password policy cannot be found"
report.status_extended = "Password policy cannot be found"
findings.append(report)
return findings

View File

@@ -25,9 +25,7 @@ class iam_policy_attached_only_to_group_or_roles(Check):
report = Check_Report(self.metadata)
report.region = iam_client.region
report.status = "FAIL"
report.status_extended = (
f"User {user.name} has the following inline policy {policy}"
)
report.status_extended = f"User {user.name} has the following inline policy {policy}"
report.resource_id = user.name
findings.append(report)

View File

@@ -81,7 +81,10 @@ class Test_iam_policy_attached_only_to_group_or_roles:
f"User {user} has attached the following policy",
result[0].status_extended,
)
assert search(f"User {user} has the following inline policy", result[1].status_extended)
assert search(
f"User {user} has the following inline policy",
result[1].status_extended,
)
@mock_iam
def test_iam_user_inline_policy(self):

View File

@@ -382,10 +382,10 @@ class MFADevice:
class User:
name: str
arn: str
mfa_devices: "list[MFADevice]"
mfa_devices: list[MFADevice]
password_last_used: str
attached_policies: "list[dict]"
inline_policies: "list[str]"
attached_policies: list[dict]
inline_policies: list[str]
def __init__(self, name, arn, password_last_used):
self.name = name
@@ -400,8 +400,8 @@ class User:
class Group:
name: str
arn: str
attached_policies: "list[dict]"
users: " list[User]"
attached_policies: list[dict]
users: list[User]
def __init__(self, name, arn):
self.name = name

View File

@@ -230,7 +230,7 @@ class Test_IAM_Service:
assert iam.password_policy.uppercase == require_upper
assert iam.password_policy.lowercase == require_lower
assert iam.password_policy.allow_change == allow_users_to_change
assert iam.password_policy.expiration == True
assert iam.password_policy.expiration is True
assert iam.password_policy.max_age == max_password_age
assert iam.password_policy.reuse_prevention == password_reuse_prevention
assert iam.password_policy.hard_expiry == hard_expiry
@@ -379,7 +379,7 @@ class Test_IAM_Service:
@mock_iam
def test__get_entities_attached_to_support_roles__no_roles(self):
iam_client = client("iam")
support_roles = iam_client.list_entities_for_policy(
_ = iam_client.list_entities_for_policy(
PolicyArn="arn:aws:iam::aws:policy/aws-service-role/AWSSupportServiceRolePolicy",
EntityFilter="Role",
)["PolicyRoles"]
@@ -458,7 +458,7 @@ class Test_IAM_Service:
assert iam.list_policies_version[0]["Statement"][0]["Effect"] == "Allow"
assert iam.list_policies_version[0]["Statement"][0]["Action"] == "*"
assert iam.list_policies_version[0]["Statement"][0]["Resource"] == "*"
# Test IAM List SAML Providers
@mock_iam
def test__list_saml_providers__(self):

View File

@@ -16,6 +16,6 @@ class iam_support_role_created(Check):
report.status_extended = f"Support policy attached to role {iam_client.entities_attached_to_support_roles[0]['RoleName']}"
else:
report.status = "FAIL"
report.status_extended = f"Support policy is not attached to any role"
report.status_extended = "Support policy is not attached to any role"
findings.append(report)
return findings

View File

@@ -17,7 +17,7 @@ class kms_key_not_publicly_accessible(Check):
report.region = key.region
# If the "Principal" element value is set to { "AWS": "*" } and the policy statement is not using any Condition clauses to filter the access, the selected AWS KMS master key is publicly accessible.
for statement in key.policy["Statement"]:
if "*" == statement["Principal"] and not "Condition" in statement:
if "*" == statement["Principal"] and "Condition" not in statement:
report.status = "FAIL"
report.status_extended = (
f"KMS key {key.id} may be publicly accessible!"
@@ -28,7 +28,7 @@ class kms_key_not_publicly_accessible(Check):
else:
principals = statement["Principal"]["AWS"]
for principal_arn in principals:
if principal_arn == "*" and not "Condition" in statement:
if principal_arn == "*" and "Condition" not in statement:
report.status = "FAIL"
report.status_extended = (
f"KMS key {key.id} may be publicly accessible!"

View File

@@ -46,8 +46,8 @@ class Test_ACM_Service:
# KMS client for this test class
audit_info = self.set_mocked_audit_info()
kms = KMS(audit_info)
for client in kms.regional_clients.values():
assert client.__class__.__name__ == "KMS"
for regional_client in kms.regional_clients.values():
assert regional_client.__class__.__name__ == "KMS"
# Test KMS Session
@mock_kms
@@ -110,9 +110,9 @@ class Test_ACM_Service:
kms = KMS(audit_info)
assert len(kms.keys) == 2
assert kms.keys[0].arn == key1["Arn"]
assert kms.keys[0].rotation_enabled == False
assert kms.keys[0].rotation_enabled is False
assert kms.keys[1].arn == key2["Arn"]
assert kms.keys[1].rotation_enabled == True
assert kms.keys[1].rotation_enabled is True
# Test KMS Key policy
@mock_kms

View File

@@ -13,7 +13,7 @@ class s3_bucket_public_access(Check):
):
report = Check_Report(self.metadata)
report.status = "PASS"
report.status_extended = f"All S3 public access blocked at account level."
report.status_extended = "All S3 public access blocked at account level."
report.region = s3control_client.region
report.resource_id = s3_client.audited_account
findings.append(report)

View File

@@ -251,7 +251,6 @@ class Bucket:
policy: dict
encryption: str
region: str
acl_grantee: list[ACL_Grantee]
logging_target_bucket: str
ownership: str
@@ -272,6 +271,5 @@ class Bucket:
self.policy = {}
self.encryption = None
self.region = region
self.acl_grantee = None
self.logging_target_bucket = None
self.ownership = None

View File

@@ -99,7 +99,7 @@ class Test_S3_Service:
s3 = S3(audit_info)
assert len(s3.buckets) == 1
assert s3.buckets[0].name == bucket_name
assert s3.buckets[0].versioning == True
assert s3.buckets[0].versioning is True
# Test S3 Get Bucket ACL
@mock_s3
@@ -204,7 +204,7 @@ class Test_S3_Service:
s3 = S3(audit_info)
assert len(s3.buckets) == 1
assert s3.buckets[0].name == bucket_name
assert s3.buckets[0].logging == True
assert s3.buckets[0].logging is True
# Test S3 Get Bucket Policy
@mock_s3
@@ -270,24 +270,6 @@ class Test_S3_Service:
assert s3.buckets[0].name == bucket_name
assert s3.buckets[0].ownership == "BucketOwnerEnforced"
# Test S3 Get Bucket Ownership Controls
@mock_s3
def test__get_bucket_ownership_controls__(self):
# Generate S3 Client
s3_client = client("s3")
# Create S3 Bucket
bucket_name = "test-bucket"
s3_client.create_bucket(
Bucket=bucket_name, ObjectOwnership="BucketOwnerEnforced"
)
# S3 client for this test class
audit_info = self.set_mocked_audit_info()
s3 = S3(audit_info)
assert len(s3.buckets) == 1
assert s3.buckets[0].name == bucket_name
assert s3.buckets[0].ownership == "BucketOwnerEnforced"
# Test S3 Get Public Access Block
@mock_s3
def test__get_public_access_block__(self):
@@ -319,7 +301,7 @@ class Test_S3_Service:
# Test S3 Control Account Get Public Access Block
@mock_s3control
def test__get_public_access_block__(self):
def test__get_public_access_block__s3_control(self):
# Generate S3Control Client
s3control_client = client("s3control", region_name=AWS_REGION)
s3control_client.put_public_access_block(

View File

@@ -15,7 +15,7 @@ class securityhub_enabled(Check):
)
else:
report.status = "FAIL"
report.status_extended = f"Security Hub is not enabled"
report.status_extended = "Security Hub is not enabled"
report.resource_id = securityhub.id
report.resource_arn = securityhub.arn
findings.append(report)

View File

@@ -11,12 +11,14 @@ AWS_REGION = "eu-west-1"
# Mocking Access Analyzer Calls
make_api_call = botocore.client.BaseClient._make_api_call
# As you can see the operation_name has the list_analyzers snake_case form but
# we are using the ListAnalyzers form.
# Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816
#
# We have to mock every AWS API call using Boto3
def mock_make_api_call(self, operation_name, kwarg):
"""
We have to mock every AWS API call using Boto3
As you can see the operation_name has the snake_case
Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816
"""
if operation_name == "GetEnabledStandards":
return {
"StandardsSubscriptions": [

View File

@@ -10,10 +10,14 @@ ACCOUNT_ID = "123456789012"
# Mocking VPC Calls
make_api_call = botocore.client.BaseClient._make_api_call
# Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816
#
# We have to mock every AWS API call using Boto3
def mock_make_api_call(self, operation_name, kwarg):
"""
We have to mock every AWS API call using Boto3
Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816
"""
if operation_name == "DescribeVpcEndpointServices":
return {
"ServiceDetails": [
@@ -67,7 +71,7 @@ class Test_vpc_endpoint_services_allowed_principals_trust_boundaries:
AvailabilityZone=f"{AWS_REGION}a",
)
lb_name = "lb_vpce-test"
lb_arn = elbv2_client.create_load_balancer(
_ = elbv2_client.create_load_balancer(
Name=lb_name,
Subnets=[subnet["Subnet"]["SubnetId"]],
Scheme="internal",

View File

@@ -46,8 +46,8 @@ class Test_VPC_Service:
# VPC client for this test class
audit_info = self.set_mocked_audit_info()
vpc = VPC(audit_info)
for client in vpc.regional_clients.values():
assert client.__class__.__name__ == "EC2"
for regional_client in vpc.regional_clients.values():
assert regional_client.__class__.__name__ == "EC2"
# Test VPC Session
@mock_ec2
@@ -102,29 +102,7 @@ class Test_VPC_Service:
# Search created VPC among default ones
for vpc in vpc.vpcs:
if vpc.id == new_vpc["VpcId"]:
assert vpc.flow_log == True
# Test VPC Describe VPC Peering connections
@mock_ec2
def test__describe_vpc_peering_connections__(self):
# Generate VPC Client
ec2_client = client("ec2", region_name=AWS_REGION)
# Create VPCs peers
vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")
peer_vpc = ec2_client.create_vpc(CidrBlock="11.0.0.0/16")
vpc_pcx = ec2_client.create_vpc_peering_connection(
VpcId=vpc["Vpc"]["VpcId"], PeerVpcId=peer_vpc["Vpc"]["VpcId"]
)
vpc_pcx_id = vpc_pcx["VpcPeeringConnection"]["VpcPeeringConnectionId"]
vpc_pcx = ec2_client.accept_vpc_peering_connection(
VpcPeeringConnectionId=vpc_pcx_id
)
# VPC client for this test class
audit_info = self.set_mocked_audit_info()
vpc = VPC(audit_info)
assert len(vpc.vpc_peering_connections) == 1
assert vpc.vpc_peering_connections[0].id == vpc_pcx_id
assert vpc.flow_log is True
# Test VPC Describe VPC Peering connections
@mock_ec2
@@ -153,7 +131,7 @@ class Test_VPC_Service:
def test__describe_route_tables__(self):
# Generate VPC Client
ec2_client = client("ec2", region_name=AWS_REGION)
ec2_resource = resource("ec2", region_name=AWS_REGION)
_ = resource("ec2", region_name=AWS_REGION)
# Create VPCs peers as well as a route
vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")
@@ -247,7 +225,7 @@ class Test_VPC_Service:
Type="network",
)["LoadBalancers"][0]["LoadBalancerArn"]
service = ec2_client.create_vpc_endpoint_service_configuration(
_ = ec2_client.create_vpc_endpoint_service_configuration(
NetworkLoadBalancerArns=[lb_arn]
)
# VPC client for this test class