fix(aws): Handle unique map keys (#2390)

Co-authored-by: Sergio Garcia <sergargar1@gmail.com>
Author: Pepe Fagoaga
Date: 2023-05-23 15:54:22 +02:00 (committed by GitHub)
Parent: d34e0341e2
Commit: 9e9e7e1e96
45 changed files with 944 additions and 305 deletions
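
The services touched below stored resources in dicts keyed by resource name, so two resources with the same name in different regions collapsed into a single entry. The commit keys those dicts by ARN instead (Lambda, CodeArtifact, Glacier, Global Accelerator, RDS clusters, Secrets Manager, SSM documents) and threads the missing resource ARNs through the RDS checks. A minimal, self-contained sketch of the collision being avoided, using illustrative ARNs that are not taken from the diff:

# Two Lambdas sharing a name in different regions: keyed by name, one entry
# silently overwrites the other; keyed by ARN, both survive.
eu = {"FunctionName": "test-lambda",
      "FunctionArn": "arn:aws:lambda:eu-west-1:123456789012:function:test-lambda"}
us = {"FunctionName": "test-lambda",
      "FunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:test-lambda"}

by_name = {f["FunctionName"]: f for f in (eu, us)}
by_arn = {f["FunctionArn"]: f for f in (eu, us)}

assert len(by_name) == 1  # the us-east-1 entry replaced the eu-west-1 one
assert len(by_arn) == 2   # ARNs are region-qualified, so both functions are kept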

poetry.lock (generated)

@@ -1006,7 +1006,7 @@ name = "importlib-metadata"
version = "6.6.0"
description = "Read metadata from Python packages"
category = "main"
optional = true
optional = false
python-versions = ">=3.7"
files = [
{file = "importlib_metadata-6.6.0-py3-none-any.whl", hash = "sha256:43dd286a2cd8995d5eaef7fee2066340423b818ed3fd70adf0bad5f1fac53fed"},
@@ -1989,6 +1989,22 @@ tomli = {version = ">=1.0.0", markers = "python_version < \"3.11\""}
[package.extras]
testing = ["argcomplete", "attrs (>=19.2.0)", "hypothesis (>=3.56)", "mock", "nose", "pygments (>=2.7.2)", "requests", "xmlschema"]
[[package]]
name = "pytest-randomly"
version = "3.12.0"
description = "Pytest plugin to randomly order tests and control random.seed."
category = "dev"
optional = false
python-versions = ">=3.7"
files = [
{file = "pytest-randomly-3.12.0.tar.gz", hash = "sha256:d60c2db71ac319aee0fc6c4110a7597d611a8b94a5590918bfa8583f00caccb2"},
{file = "pytest_randomly-3.12.0-py3-none-any.whl", hash = "sha256:f4f2e803daf5d1ba036cc22bf4fe9dbbf99389ec56b00e5cba732fb5c1d07fdd"},
]
[package.dependencies]
importlib-metadata = {version = ">=3.6.0", markers = "python_version < \"3.10\""}
pytest = "*"
[[package]]
name = "pytest-xdist"
version = "3.3.0"
@@ -2858,7 +2874,7 @@ name = "zipp"
version = "3.15.0"
description = "Backport of pathlib-compatible object wrapper for zip files"
category = "main"
optional = true
optional = false
python-versions = ">=3.7"
files = [
{file = "zipp-3.15.0-py3-none-any.whl", hash = "sha256:48904fc76a60e542af151aded95726c1a5c34ed43ab4134b597665c86d7ad556"},
@@ -2875,4 +2891,4 @@ docs = ["mkdocs", "mkdocs-material"]
[metadata]
lock-version = "2.0"
python-versions = "^3.9"
content-hash = "fad0b94d53f2c82f07c7cdded0469947ce28c7b854d8c8133d7460e92b56b495"
content-hash = "25f5c9874335d9fe564abc5bd7938c9350fc083dc6074f5de9f75a916ba4d71c"


@@ -62,18 +62,19 @@ class Lambda:
):
lambda_name = function["FunctionName"]
lambda_arn = function["FunctionArn"]
self.functions[lambda_name] = Function(
# We must use the Lambda ARN as the dict key since we could have Lambdas in different regions with the same name
self.functions[lambda_arn] = Function(
name=lambda_name,
arn=lambda_arn,
region=regional_client.region,
)
if "Runtime" in function:
self.functions[lambda_name].runtime = function["Runtime"]
self.functions[lambda_arn].runtime = function["Runtime"]
if "Environment" in function:
lambda_environment = function["Environment"].get(
"Variables"
)
self.functions[lambda_name].environment = lambda_environment
self.functions[lambda_arn].environment = lambda_environment
except Exception as error:
logger.error(
@@ -93,7 +94,7 @@ class Lambda:
if "Location" in function_information["Code"]:
code_location_uri = function_information["Code"]["Location"]
raw_code_zip = requests.get(code_location_uri).content
self.functions[function.name].code = LambdaCode(
self.functions[function.arn].code = LambdaCode(
location=code_location_uri,
code_zip=zipfile.ZipFile(io.BytesIO(raw_code_zip)),
)
@@ -114,12 +115,12 @@ class Lambda:
function_policy = regional_client.get_policy(
FunctionName=function.name
)
self.functions[function.name].policy = json.loads(
self.functions[function.arn].policy = json.loads(
function_policy["Policy"]
)
except ClientError as e:
if e.response["Error"]["Code"] == "ResourceNotFoundException":
self.functions[function.name].policy = {}
self.functions[function.arn].policy = {}
except Exception as error:
logger.error(
@@ -141,14 +142,14 @@ class Lambda:
allow_origins = function_url_config["Cors"]["AllowOrigins"]
else:
allow_origins = []
self.functions[function.name].url_config = URLConfig(
self.functions[function.arn].url_config = URLConfig(
auth_type=function_url_config["AuthType"],
url=function_url_config["FunctionUrl"],
cors_config=URLConfigCORS(allow_origins=allow_origins),
)
except ClientError as e:
if e.response["Error"]["Code"] == "ResourceNotFoundException":
self.functions[function.name].url_config = None
self.functions[function.arn].url_config = None
except Exception as error:
logger.error(


@@ -52,7 +52,8 @@ class CodeArtifact:
package_domain_owner = repository["domainOwner"]
package_arn = repository["arn"]
# Save Repository
self.repositories[package_name] = Repository(
# We must use the Package ARN as the dict key to have unique keys
self.repositories[package_arn] = Repository(
name=package_name,
arn=package_arn,
domain_name=package_domain_name,


@@ -46,7 +46,8 @@ class Glacier:
):
vault_name = vault["VaultName"]
vault_arn = vault["VaultARN"]
self.vaults[vault_name] = Vault(
# We must use the Vault ARN as the dict key to have unique keys
self.vaults[vault_arn] = Vault(
name=vault_name,
arn=vault_arn,
region=regional_client.region,
@@ -68,12 +69,12 @@ class Glacier:
vault_access_policy = regional_client.get_vault_access_policy(
vaultName=vault.name
)
self.vaults[vault.name].access_policy = json.loads(
self.vaults[vault.arn].access_policy = json.loads(
vault_access_policy["policy"]["Policy"]
)
except ClientError as e:
if e.response["Error"]["Code"] == "ResourceNotFoundException":
self.vaults[vault.name].access_policy = {}
self.vaults[vault.arn].access_policy = {}
except Exception as error:
logger.error(
f"{regional_client.region} --"


@@ -37,7 +37,8 @@ class GlobalAccelerator:
accelerator_arn = accelerator["AcceleratorArn"]
accelerator_name = accelerator["Name"]
enabled = accelerator["Enabled"]
self.accelerators[accelerator_name] = Accelerator(
# We must use the Accelerator ARN as the dict key to have unique keys
self.accelerators[accelerator_arn] = Accelerator(
name=accelerator_name,
arn=accelerator_arn,
region=self.region,


@@ -9,6 +9,7 @@ class rds_instance_backup_enabled(Check):
report = Check_Report_AWS(self.metadata())
report.region = db_instance.region
report.resource_id = db_instance.id
report.resource_arn = db_instance.arn
report.resource_tags = db_instance.tags
if db_instance.backup_retention_period > 0:
report.status = "PASS"


@@ -9,10 +9,11 @@ class rds_instance_deletion_protection(Check):
report = Check_Report_AWS(self.metadata())
report.region = db_instance.region
report.resource_id = db_instance.id
report.resource_arn = db_instance.arn
report.resource_tags = db_instance.tags
# Check if is member of a cluster
if db_instance.cluster_id:
if rds_client.db_clusters[db_instance.cluster_id].deletion_protection:
if rds_client.db_clusters[db_instance.cluster_arn].deletion_protection:
report.status = "PASS"
report.status_extended = f"RDS Instance {db_instance.id} deletion protection is enabled at cluster {db_instance.cluster_id} level."
else:


@@ -5,23 +5,23 @@ from prowler.providers.aws.services.rds.rds_client import rds_client
class rds_instance_deprecated_engine_version(Check):
def execute(self):
findings = []
for instance in rds_client.db_instances:
for db_instance in rds_client.db_instances:
report = Check_Report_AWS(self.metadata())
report.region = instance.region
report.region = db_instance.region
report.status = "FAIL"
report.resource_id = instance.id
report.resource_tags = instance.tags
report.status_extended = f"RDS instance {instance.id} is using a deprecated engine {instance.engine} with version {instance.engine_version}."
report.resource_id = db_instance.id
report.resource_arn = db_instance.arn
report.resource_tags = db_instance.tags
report.status_extended = f"RDS instance {db_instance.id} is using a deprecated engine {db_instance.engine} with version {db_instance.engine_version}."
if (
instance.engine_version
in rds_client.db_engines[instance.region][
instance.engine
db_instance.engine_version
in rds_client.db_engines[db_instance.region][
db_instance.engine
].engine_versions
):
report.status = "PASS"
report.status_extended = f"RDS instance {instance.id} is not using a deprecated engine {instance.engine} with version {instance.engine_version}."
report.status_extended = f"RDS instance {db_instance.id} is not using a deprecated engine {db_instance.engine} with version {db_instance.engine_version}."
findings.append(report)


@@ -9,6 +9,7 @@ class rds_instance_enhanced_monitoring_enabled(Check):
report = Check_Report_AWS(self.metadata())
report.region = db_instance.region
report.resource_id = db_instance.id
report.resource_arn = db_instance.arn
report.resource_tags = db_instance.tags
if db_instance.enhanced_monitoring_arn:
report.status = "PASS"


@@ -9,6 +9,7 @@ class rds_instance_integration_cloudwatch_logs(Check):
report = Check_Report_AWS(self.metadata())
report.region = db_instance.region
report.resource_id = db_instance.id
report.resource_arn = db_instance.arn
report.resource_tags = db_instance.tags
if db_instance.cloudwatch_logs:
report.status = "PASS"


@@ -9,6 +9,7 @@ class rds_instance_minor_version_upgrade_enabled(Check):
report = Check_Report_AWS(self.metadata())
report.region = db_instance.region
report.resource_id = db_instance.id
report.resource_arn = db_instance.arn
report.resource_tags = db_instance.tags
if db_instance.auto_minor_version_upgrade:
report.status = "PASS"


@@ -9,10 +9,11 @@ class rds_instance_multi_az(Check):
report = Check_Report_AWS(self.metadata())
report.region = db_instance.region
report.resource_id = db_instance.id
report.resource_arn = db_instance.arn
report.resource_tags = db_instance.tags
# Check if is member of a cluster
if db_instance.cluster_id:
if rds_client.db_clusters[db_instance.cluster_id].multi_az:
if rds_client.db_clusters[db_instance.cluster_arn].multi_az:
report.status = "PASS"
report.status_extended = f"RDS Instance {db_instance.id} has multi-AZ enabled at cluster {db_instance.cluster_id} level."
else:


@@ -9,6 +9,7 @@ class rds_instance_no_public_access(Check):
report = Check_Report_AWS(self.metadata())
report.region = db_instance.region
report.resource_id = db_instance.id
report.resource_arn = db_instance.arn
report.resource_tags = db_instance.tags
if not db_instance.public:
report.status = "PASS"


@@ -9,6 +9,7 @@ class rds_instance_storage_encrypted(Check):
report = Check_Report_AWS(self.metadata())
report.region = db_instance.region
report.resource_id = db_instance.id
report.resource_arn = db_instance.arn
report.resource_tags = db_instance.tags
if db_instance.encrypted:
report.status = "PASS"


@@ -10,6 +10,7 @@ class rds_instance_transport_encrypted(Check):
report = Check_Report_AWS(self.metadata())
report.region = db_instance.region
report.resource_id = db_instance.id
report.resource_arn = db_instance.arn
report.resource_tags = db_instance.tags
report.status = "FAIL"
report.status_extended = (


@@ -15,6 +15,7 @@ class RDS:
self.service = "rds"
self.session = audit_info.audit_session
self.audited_account = audit_info.audited_account
self.audited_partition = audit_info.audited_partition
self.audit_resources = audit_info.audit_resources
self.regional_clients = generate_regional_clients(self.service, audit_info)
self.db_instances = []
@@ -60,6 +61,7 @@ class RDS:
self.db_instances.append(
DBInstance(
id=instance["DBInstanceIdentifier"],
arn=f"arn:{self.audited_partition}:rds:{regional_client.region}:{self.audited_account}:db:{instance['DBInstanceIdentifier']}",
endpoint=instance.get("Endpoint"),
engine=instance["Engine"],
engine_version=instance["EngineVersion"],
@@ -85,8 +87,9 @@ class RDS:
],
multi_az=instance["MultiAZ"],
cluster_id=instance.get("DBClusterIdentifier"),
cluster_arn=f"arn:{self.audited_partition}:rds:{regional_client.region}:{self.audited_account}:cluster:{instance.get('DBClusterIdentifier')}",
region=regional_client.region,
tags=instance.get("TagList"),
tags=instance.get("TagList", []),
)
)
except Exception as error:
@@ -131,9 +134,10 @@ class RDS:
self.db_snapshots.append(
DBSnapshot(
id=snapshot["DBSnapshotIdentifier"],
arn=f"arn:{self.audited_partition}:rds:{regional_client.region}:{self.audited_account}:snapshot:{snapshot['DBSnapshotIdentifier']}",
instance_id=snapshot["DBInstanceIdentifier"],
region=regional_client.region,
tags=snapshot.get("TagList"),
tags=snapshot.get("TagList", []),
)
)
except Exception as error:
@@ -177,8 +181,10 @@ class RDS:
)
):
if cluster["Engine"] != "docdb":
db_cluster_arn = f"arn:{self.audited_partition}:rds:{regional_client.region}:{self.audited_account}:cluster:{cluster['DBClusterIdentifier']}"
db_cluster = DBCluster(
id=cluster["DBClusterIdentifier"],
arn=db_cluster_arn,
endpoint=cluster.get("Endpoint"),
engine=cluster["Engine"],
status=cluster["Status"],
@@ -197,11 +203,10 @@ class RDS:
parameter_group=cluster["DBClusterParameterGroup"],
multi_az=cluster["MultiAZ"],
region=regional_client.region,
tags=cluster.get("TagList"),
tags=cluster.get("TagList", []),
)
self.db_clusters[
cluster["DBClusterIdentifier"]
] = db_cluster
# We must use a unique value as the dict key to have unique keys
self.db_clusters[db_cluster_arn] = db_cluster
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
@@ -225,9 +230,10 @@ class RDS:
self.db_cluster_snapshots.append(
ClusterSnapshot(
id=snapshot["DBClusterSnapshotIdentifier"],
arn=f"arn:{self.audited_partition}:rds:{regional_client.region}:{self.audited_account}:cluster-snapshot:{snapshot['DBClusterSnapshotIdentifier']}",
cluster_id=snapshot["DBClusterIdentifier"],
region=regional_client.region,
tags=snapshot.get("TagList"),
tags=snapshot.get("TagList", []),
)
)
except Exception as error:
@@ -285,6 +291,8 @@ class RDS:
class DBInstance(BaseModel):
id: str
# arn:{partition}:rds:{region}:{account}:db:{resource_id}
arn: str
endpoint: Optional[dict]
engine: str
engine_version: str
@@ -300,12 +308,14 @@ class DBInstance(BaseModel):
parameter_groups: list[str] = []
parameters: list[dict] = []
cluster_id: Optional[str]
cluster_arn: Optional[str]
region: str
tags: Optional[list] = []
class DBCluster(BaseModel):
id: str
arn: str
endpoint: Optional[str]
engine: str
status: str
@@ -323,6 +333,8 @@ class DBCluster(BaseModel):
class DBSnapshot(BaseModel):
id: str
# arn:{partition}:rds:{region}:{account}:snapshot:{resource_id}
arn: str
instance_id: str
public: bool = False
region: str
@@ -332,6 +344,8 @@ class DBSnapshot(BaseModel):
class ClusterSnapshot(BaseModel):
id: str
cluster_id: str
# arn:{partition}:rds:{region}:{account}:cluster-snapshot:{resource_id}
arn: str
public: bool = False
region: str
tags: Optional[list] = []
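
The RDS resource ARNs added above are assembled from the audited partition, region, account and resource identifier, following the patterns noted in the model comments (db, snapshot, cluster, cluster-snapshot). A small illustrative helper mirroring those formats with placeholder values (not part of the service class):

def rds_arn(partition, region, account, resource_type, identifier):
    # Mirrors the f-strings used in the RDS service above; every value passed
    # in below is a placeholder.
    return f"arn:{partition}:rds:{region}:{account}:{resource_type}:{identifier}"

print(rds_arn("aws", "eu-west-1", "123456789012", "db", "db-master-1"))
# arn:aws:rds:eu-west-1:123456789012:db:db-master-1
print(rds_arn("aws", "eu-west-1", "123456789012", "cluster-snapshot", "snap-1"))
# arn:aws:rds:eu-west-1:123456789012:cluster-snapshot:snap-1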


@@ -9,6 +9,7 @@ class rds_snapshots_public_access(Check):
report = Check_Report_AWS(self.metadata())
report.region = db_snap.region
report.resource_id = db_snap.id
report.resource_arn = db_snap.arn
report.resource_tags = db_snap.tags
if db_snap.public:
report.status = "FAIL"
@@ -27,6 +28,7 @@ class rds_snapshots_public_access(Check):
report = Check_Report_AWS(self.metadata())
report.region = db_snap.region
report.resource_id = db_snap.id
report.resource_arn = db_snap.arn
report.resource_tags = db_snap.tags
if db_snap.public:
report.status = "FAIL"


@@ -40,14 +40,15 @@ class SecretsManager:
if not self.audit_resources or (
is_resource_filtered(secret["ARN"], self.audit_resources)
):
self.secrets[secret["Name"]] = Secret(
# We must use the Secret ARN as the dict key to have unique keys
self.secrets[secret["ARN"]] = Secret(
arn=secret["ARN"],
name=secret["Name"],
region=regional_client.region,
tags=secret.get("Tags"),
)
if "RotationEnabled" in secret:
self.secrets[secret["Name"]].rotation_enabled = secret[
self.secrets[secret["ARN"]].rotation_enabled = secret[
"RotationEnabled"
]


@@ -63,7 +63,8 @@ class SSM:
):
document_name = document["Name"]
document_arn = f"arn:{self.audited_partition}:ssm:{regional_client.region}:{self.audited_account}:document/{document_name}"
self.documents[document_name] = Document(
# We must use the Document ARN as the dict key to have unique keys
self.documents[document_arn] = Document(
arn=document_arn,
name=document_name,
region=regional_client.region,
@@ -83,7 +84,7 @@ class SSM:
try:
if document.region == regional_client.region:
document_info = regional_client.get_document(Name=document.name)
self.documents[document.name].content = json.loads(
self.documents[document.arn].content = json.loads(
document_info["Content"]
)
@@ -111,7 +112,7 @@ class SSM:
document_permissions = regional_client.describe_document_permission(
Name=document.name, PermissionType="Share"
)
self.documents[document.name].account_owners = document_permissions[
self.documents[document.arn].account_owners = document_permissions[
"AccountIds"
]


@@ -123,7 +123,7 @@ class SSMIncidents:
for response_plan in page["responsePlanSummaries"]:
self.response_plans.append(
ResponsePlan(
arn=response_plan["Arn"],
arn=response_plan.get("Arn", ""),
region=regional_client.region,
name=response_plan["Name"],
)


@@ -61,6 +61,7 @@ moto = "4.1.10"
openapi-spec-validator = "0.5.6"
pylint = "2.17.4"
pytest = "7.3.1"
pytest-randomly = "3.12.0"
pytest-xdist = "3.3.0"
safety = "2.3.5"
sure = "2.0.1"


@@ -16,6 +16,7 @@ from prowler.providers.common.models import Audit_Metadata
# Mock Test Region
AWS_REGION = "eu-west-1"
AWS_REGION_NORTH_VIRGINIA = "us-east-1"
def create_zip_file(code: str = "") -> io.BytesIO:
@@ -47,9 +48,18 @@ def mock_request_get(_):
# Mock generate_regional_clients()
def mock_generate_regional_clients(service, audit_info):
regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION)
regional_client.region = AWS_REGION
return {AWS_REGION: regional_client}
regional_client_eu_west_1 = audit_info.audit_session.client(
service, region_name=AWS_REGION
)
regional_client_us_east_1 = audit_info.audit_session.client(
service, region_name=AWS_REGION_NORTH_VIRGINIA
)
regional_client_eu_west_1.region = AWS_REGION
regional_client_us_east_1.region = AWS_REGION_NORTH_VIRGINIA
return {
AWS_REGION: regional_client_eu_west_1,
AWS_REGION_NORTH_VIRGINIA: regional_client_us_east_1,
}
@patch(
@@ -118,7 +128,7 @@ class Test_Lambda_Service:
Bucket="test-bucket",
CreateBucketConfiguration={"LocationConstraint": AWS_REGION},
)
# Create Test Lambda
# Create Test Lambda 1
lambda_client = client("lambda", region_name=AWS_REGION)
lambda_name = "test-lambda"
resp = lambda_client.create_function(
@@ -139,6 +149,7 @@ class Test_Lambda_Service:
Environment={"Variables": {"db-password": "test-password"}},
Tags={"test": "test"},
)
lambda_arn_1 = resp["FunctionArn"]
# Update Lambda Policy
lambda_policy = {
"Version": "2012-10-17",
@@ -181,49 +192,72 @@ class Test_Lambda_Service:
},
)
lambda_arn = resp["FunctionArn"]
# Create Test Lambda 2 (with the same attributes but different region)
lambda_client_2 = client("lambda", region_name=AWS_REGION_NORTH_VIRGINIA)
lambda_name = "test-lambda"
resp_2 = lambda_client_2.create_function(
FunctionName=lambda_name,
Runtime="python3.7",
Role=iam_role,
Handler="lambda_function.lambda_handler",
Code={"ZipFile": create_zip_file().read()},
Description="test lambda function",
Timeout=3,
MemorySize=128,
PackageType="ZIP",
Publish=True,
VpcConfig={
"SecurityGroupIds": ["sg-123abc"],
"SubnetIds": ["subnet-123abc"],
},
Environment={"Variables": {"db-password": "test-password"}},
Tags={"test": "test"},
)
lambda_arn_2 = resp_2["FunctionArn"]
with mock.patch(
"prowler.providers.aws.services.awslambda.awslambda_service.requests.get",
new=mock_request_get,
):
awslambda = Lambda(self.set_mocked_audit_info())
assert awslambda.functions
assert awslambda.functions[lambda_name].name == lambda_name
assert awslambda.functions[lambda_name].arn == lambda_arn
assert awslambda.functions[lambda_name].runtime == "python3.7"
assert awslambda.functions[lambda_name].environment == {
assert len(awslambda.functions) == 2
# Lambda 1
assert awslambda.functions[lambda_arn_1].name == lambda_name
assert awslambda.functions[lambda_arn_1].arn == lambda_arn_1
assert awslambda.functions[lambda_arn_1].runtime == "python3.7"
assert awslambda.functions[lambda_arn_1].environment == {
"db-password": "test-password"
}
assert awslambda.functions[lambda_name].region == AWS_REGION
assert awslambda.functions[lambda_name].policy == lambda_policy
assert awslambda.functions[lambda_arn_1].region == AWS_REGION
assert awslambda.functions[lambda_arn_1].policy == lambda_policy
assert awslambda.functions[lambda_name].code
assert awslambda.functions[lambda_arn_1].code
assert search(
f"s3://awslambda-{AWS_REGION}-tasks.s3-{AWS_REGION}.amazonaws.com",
awslambda.functions[lambda_name].code.location,
awslambda.functions[lambda_arn_1].code.location,
)
assert awslambda.functions[lambda_name].url_config
assert awslambda.functions[lambda_arn_1].url_config
assert (
awslambda.functions[lambda_name].url_config.auth_type
awslambda.functions[lambda_arn_1].url_config.auth_type
== AuthType.AWS_IAM
)
assert search(
"lambda-url.eu-west-1.on.aws",
awslambda.functions[lambda_name].url_config.url,
awslambda.functions[lambda_arn_1].url_config.url,
)
assert awslambda.functions[lambda_name].url_config.cors_config
assert awslambda.functions[lambda_arn_1].url_config.cors_config
assert awslambda.functions[
lambda_name
lambda_arn_1
].url_config.cors_config.allow_origins == ["*"]
assert awslambda.functions[lambda_name].tags == [{"test": "test"}]
assert awslambda.functions[lambda_arn_1].tags == [{"test": "test"}]
# Pending ZipFile tests
with tempfile.TemporaryDirectory() as tmp_dir_name:
awslambda.functions[lambda_name].code.code_zip.extractall(tmp_dir_name)
awslambda.functions[lambda_arn_1].code.code_zip.extractall(tmp_dir_name)
files_in_zip = next(os.walk(tmp_dir_name))[2]
assert len(files_in_zip) == 1
assert files_in_zip[0] == "lambda_function.py"
@@ -237,3 +271,24 @@ class Test_Lambda_Service:
# return event
# """
# )
# Lambda 2
assert awslambda.functions[lambda_arn_2].name == lambda_name
assert awslambda.functions[lambda_arn_2].arn == lambda_arn_2
assert awslambda.functions[lambda_arn_2].runtime == "python3.7"
assert awslambda.functions[lambda_arn_2].environment == {
"db-password": "test-password"
}
assert awslambda.functions[lambda_arn_2].region == AWS_REGION_NORTH_VIRGINIA
# Empty policy
assert awslambda.functions[lambda_arn_2].policy == {
"Id": "default",
"Statement": [],
"Version": "2012-10-17",
}
assert awslambda.functions[lambda_arn_2].code
assert search(
f"s3://awslambda-{AWS_REGION_NORTH_VIRGINIA}-tasks.s3-{AWS_REGION_NORTH_VIRGINIA}.amazonaws.com",
awslambda.functions[lambda_arn_2].code.location,
)


@@ -20,6 +20,10 @@ AWS_ACCOUNT_NUMBER = "123456789012"
# Mocking Access Analyzer Calls
make_api_call = botocore.client.BaseClient._make_api_call
TEST_REPOSITORY_ARN = (
f"arn:aws:codebuild:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:repository/test-repository"
)
def mock_make_api_call(self, operation_name, kwarg):
"""We have to mock every AWS API call using Boto3"""
@@ -31,7 +35,7 @@ def mock_make_api_call(self, operation_name, kwarg):
"administratorAccount": DEFAULT_ACCOUNT_ID,
"domainName": "test-domain",
"domainOwner": DEFAULT_ACCOUNT_ID,
"arn": f"arn:aws:codebuild:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:repository/test-repository",
"arn": TEST_REPOSITORY_ARN,
"description": "test description",
},
]
@@ -146,63 +150,70 @@ class Test_CodeArtifact_Service:
assert len(codeartifact.repositories) == 1
assert codeartifact.repositories
assert codeartifact.repositories["test-repository"]
assert codeartifact.repositories["test-repository"].name == "test-repository"
assert codeartifact.repositories["test-repository"].tags == [
assert codeartifact.repositories[
f"arn:aws:codebuild:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:repository/test-repository"
]
assert codeartifact.repositories[TEST_REPOSITORY_ARN].name == "test-repository"
assert codeartifact.repositories[
f"arn:aws:codebuild:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:repository/test-repository"
].tags == [
{"key": "test", "value": "test"},
]
assert codeartifact.repositories[TEST_REPOSITORY_ARN].arn == TEST_REPOSITORY_ARN
assert (
codeartifact.repositories["test-repository"].arn
== f"arn:aws:codebuild:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:repository/test-repository"
codeartifact.repositories[TEST_REPOSITORY_ARN].domain_name == "test-domain"
)
assert codeartifact.repositories["test-repository"].domain_name == "test-domain"
assert (
codeartifact.repositories["test-repository"].domain_owner
codeartifact.repositories[TEST_REPOSITORY_ARN].domain_owner
== DEFAULT_ACCOUNT_ID
)
assert codeartifact.repositories["test-repository"].region == AWS_REGION
assert codeartifact.repositories[TEST_REPOSITORY_ARN].region == AWS_REGION
assert codeartifact.repositories["test-repository"].packages
assert len(codeartifact.repositories["test-repository"].packages) == 1
assert codeartifact.repositories[
f"arn:aws:codebuild:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:repository/test-repository"
].packages
assert len(codeartifact.repositories[TEST_REPOSITORY_ARN].packages) == 1
assert (
codeartifact.repositories["test-repository"].packages[0].name
codeartifact.repositories[TEST_REPOSITORY_ARN].packages[0].name
== "test-package"
)
assert (
codeartifact.repositories["test-repository"].packages[0].namespace
codeartifact.repositories[TEST_REPOSITORY_ARN].packages[0].namespace
== "test-namespace"
)
assert codeartifact.repositories["test-repository"].packages[0].format == "pypi"
assert (
codeartifact.repositories["test-repository"]
codeartifact.repositories[TEST_REPOSITORY_ARN].packages[0].format == "pypi"
)
assert (
codeartifact.repositories[TEST_REPOSITORY_ARN]
.packages[0]
.origin_configuration.restrictions.publish
== RestrictionValues.ALLOW
)
assert (
codeartifact.repositories["test-repository"]
codeartifact.repositories[TEST_REPOSITORY_ARN]
.packages[0]
.origin_configuration.restrictions.upstream
== RestrictionValues.ALLOW
)
assert (
codeartifact.repositories["test-repository"]
codeartifact.repositories[TEST_REPOSITORY_ARN]
.packages[0]
.latest_version.version
== "latest"
)
assert (
codeartifact.repositories["test-repository"]
codeartifact.repositories[TEST_REPOSITORY_ARN]
.packages[0]
.latest_version.status
== LatestPackageVersionStatus.Published
)
assert (
codeartifact.repositories["test-repository"]
codeartifact.repositories[TEST_REPOSITORY_ARN]
.packages[0]
.latest_version.origin.origin_type
== OriginInformationValues.INTERNAL


@@ -39,6 +39,9 @@ class Test_elb_ssl_listeners:
from prowler.providers.aws.services.elb.elb_service import ELB
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=self.set_mocked_audit_info(),
), mock.patch(
"prowler.providers.aws.services.elb.elb_ssl_listeners.elb_ssl_listeners.elb_client",
new=ELB(self.set_mocked_audit_info()),
):


@@ -24,7 +24,7 @@ class Test_elbv2_desync_mitigation_mode:
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=None,
profile_region=AWS_REGION,
credentials=None,
assumed_role_info=None,
audited_regions=["us-east-1", "eu-west-1"],
@@ -39,6 +39,9 @@ class Test_elbv2_desync_mitigation_mode:
from prowler.providers.aws.services.elbv2.elbv2_service import ELBv2
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=self.set_mocked_audit_info(),
), mock.patch(
"prowler.providers.aws.services.elbv2.elbv2_desync_mitigation_mode.elbv2_desync_mitigation_mode.elbv2_client",
new=ELBv2(self.set_mocked_audit_info()),
):


@@ -15,6 +15,9 @@ AWS_ACCOUNT_NUMBER = "123456789012"
# Mocking Access Analyzer Calls
make_api_call = botocore.client.BaseClient._make_api_call
TEST_VAULT_ARN = (
f"arn:aws:glacier:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:vaults/examplevault"
)
vault_json_policy = {
"Version": "2012-10-17",
"Statement": [
@@ -28,9 +31,7 @@ vault_json_policy = {
"glacier:AbortMultipartUpload",
"glacier:CompleteMultipartUpload",
],
"Resource": [
f"arn:aws:glacier:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:vaults/examplevault"
],
"Resource": [TEST_VAULT_ARN],
}
],
}
@@ -42,7 +43,7 @@ def mock_make_api_call(self, operation_name, kwarg):
return {
"VaultList": [
{
"VaultARN": f"arn:aws:glacier:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:vaults/examplevault",
"VaultARN": TEST_VAULT_ARN,
"VaultName": "examplevault",
"CreationDate": "2012-03-16T22:22:47.214Z",
"LastInventoryDate": "2012-03-21T22:06:51.218Z",
@@ -118,25 +119,25 @@ class Test_Glacier_Service:
glacier = Glacier(self.set_mocked_audit_info())
vault_name = "examplevault"
assert len(glacier.vaults) == 1
assert glacier.vaults[vault_name]
assert glacier.vaults[vault_name].name == vault_name
assert glacier.vaults[TEST_VAULT_ARN]
assert glacier.vaults[TEST_VAULT_ARN].name == vault_name
assert (
glacier.vaults[vault_name].arn
glacier.vaults[TEST_VAULT_ARN].arn
== f"arn:aws:glacier:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:vaults/examplevault"
)
assert glacier.vaults[vault_name].region == AWS_REGION
assert glacier.vaults[vault_name].tags == [{"test": "test"}]
assert glacier.vaults[TEST_VAULT_ARN].region == AWS_REGION
assert glacier.vaults[TEST_VAULT_ARN].tags == [{"test": "test"}]
def test__get_vault_access_policy__(self):
# Set partition for the service
glacier = Glacier(self.set_mocked_audit_info())
vault_name = "examplevault"
assert len(glacier.vaults) == 1
assert glacier.vaults[vault_name]
assert glacier.vaults[vault_name].name == vault_name
assert glacier.vaults[TEST_VAULT_ARN]
assert glacier.vaults[TEST_VAULT_ARN].name == vault_name
assert (
glacier.vaults[vault_name].arn
glacier.vaults[TEST_VAULT_ARN].arn
== f"arn:aws:glacier:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:vaults/examplevault"
)
assert glacier.vaults[vault_name].region == AWS_REGION
assert glacier.vaults[vault_name].access_policy == vault_json_policy
assert glacier.vaults[TEST_VAULT_ARN].region == AWS_REGION
assert glacier.vaults[TEST_VAULT_ARN].access_policy == vault_json_policy


@@ -14,6 +14,8 @@ AWS_REGION = "us-west-2"
# Mocking Access Analyzer Calls
make_api_call = botocore.client.BaseClient._make_api_call
TEST_ACCELERATOR_ARN = f"arn:aws:globalaccelerator::{DEFAULT_ACCOUNT_ID}:accelerator/5555abcd-abcd-5555-abcd-5555EXAMPLE1"
def mock_make_api_call(self, operation_name, kwarg):
"""We have to mock every AWS API call using Boto3"""
@@ -21,7 +23,7 @@ def mock_make_api_call(self, operation_name, kwarg):
return {
"Accelerators": [
{
"AcceleratorArn": f"arn:aws:globalaccelerator::{DEFAULT_ACCOUNT_ID}:accelerator/5555abcd-abcd-5555-abcd-5555EXAMPLE1",
"AcceleratorArn": TEST_ACCELERATOR_ARN,
"Name": "TestAccelerator",
"IpAddressType": "IPV4",
"Enabled": True,
@@ -96,13 +98,18 @@ class Test_GlobalAccelerator_Service:
audit_info = self.set_mocked_audit_info()
globalaccelerator = GlobalAccelerator(audit_info)
accelerator_arn = f"arn:aws:globalaccelerator::{DEFAULT_ACCOUNT_ID}:accelerator/5555abcd-abcd-5555-abcd-5555EXAMPLE1"
accelerator_name = "TestAccelerator"
assert globalaccelerator.accelerators
assert len(globalaccelerator.accelerators) == 1
assert globalaccelerator.accelerators[accelerator_name]
assert globalaccelerator.accelerators[accelerator_name].name == accelerator_name
assert globalaccelerator.accelerators[accelerator_name].arn == accelerator_arn
assert globalaccelerator.accelerators[accelerator_name].region == AWS_REGION
assert globalaccelerator.accelerators[accelerator_name].enabled
assert globalaccelerator.accelerators[TEST_ACCELERATOR_ARN]
assert (
globalaccelerator.accelerators[TEST_ACCELERATOR_ARN].name
== accelerator_name
)
assert (
globalaccelerator.accelerators[TEST_ACCELERATOR_ARN].arn
== TEST_ACCELERATOR_ARN
)
assert globalaccelerator.accelerators[TEST_ACCELERATOR_ARN].region == AWS_REGION
assert globalaccelerator.accelerators[TEST_ACCELERATOR_ARN].enabled


@@ -81,9 +81,13 @@ class Test_iam_role_cross_service_confused_deputy_prevention:
current_audit_info = self.set_mocked_audit_info()
current_audit_info.audited_account = AWS_ACCOUNT_ID
with mock.patch(
"prowler.providers.aws.services.iam.iam_service.IAM",
iam_client,
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
), mock.patch(
"prowler.providers.aws.services.iam.iam_role_cross_service_confused_deputy_prevention.iam_role_cross_service_confused_deputy_prevention.iam_client",
new=iam_client,
):
# Test Check
from prowler.providers.aws.services.iam.iam_role_cross_service_confused_deputy_prevention.iam_role_cross_service_confused_deputy_prevention import (


@@ -1,11 +1,15 @@
from unittest import mock
from boto3 import session
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
from prowler.providers.aws.services.networkfirewall.networkfirewall_service import (
Firewall,
)
from prowler.providers.aws.services.vpc.vpc_service import VPCs, VpcSubnet
AWS_REGION = "us-east-1"
AWS_ACCOUNT_NUMBER = "123456789012"
FIREWALL_ARN = "arn:aws:network-firewall:us-east-1:123456789012:firewall/my-firewall"
FIREWALL_NAME = "my-firewall"
VPC_ID_PROTECTED = "vpc-12345678901234567"
@@ -14,6 +18,30 @@ POLICY_ARN = "arn:aws:network-firewall:us-east-1:123456789012:firewall-policy/my
class Test_networkfirewall_in_all_vpc:
# Mocked Audit Info
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
session_config=None,
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
region_name=AWS_REGION,
),
audited_account=AWS_ACCOUNT_NUMBER,
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=AWS_REGION,
credentials=None,
assumed_role_info=None,
audited_regions=None,
organizations_metadata=None,
audit_resources=None,
)
return audit_info
def test_no_vpcs(self):
networkfirewall_client = mock.MagicMock
networkfirewall_client.region = AWS_REGION
@@ -21,23 +49,30 @@ class Test_networkfirewall_in_all_vpc:
vpc_client = mock.MagicMock
vpc_client.region = AWS_REGION
vpc_client.vpcs = {}
audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.services.networkfirewall.networkfirewall_service.NetworkFirewall",
new=networkfirewall_client,
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=audit_info,
):
with mock.patch(
"prowler.providers.aws.services.vpc.vpc_service.VPC",
"prowler.providers.aws.services.networkfirewall.networkfirewall_in_all_vpc.networkfirewall_in_all_vpc.vpc_client",
new=vpc_client,
):
# Test Check
from prowler.providers.aws.services.networkfirewall.networkfirewall_in_all_vpc.networkfirewall_in_all_vpc import (
networkfirewall_in_all_vpc,
)
with mock.patch(
"prowler.providers.aws.services.networkfirewall.networkfirewall_in_all_vpc.networkfirewall_in_all_vpc.networkfirewall_client",
new=networkfirewall_client,
):
# Test Check
from prowler.providers.aws.services.networkfirewall.networkfirewall_in_all_vpc.networkfirewall_in_all_vpc import (
networkfirewall_in_all_vpc,
)
check = networkfirewall_in_all_vpc()
result = check.execute()
check = networkfirewall_in_all_vpc()
result = check.execute()
assert len(result) == 0
assert len(result) == 0
def test_vpcs_with_firewall_all(self):
networkfirewall_client = mock.MagicMock
@@ -78,32 +113,39 @@ class Test_networkfirewall_in_all_vpc:
tags=[],
)
}
audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.services.networkfirewall.networkfirewall_service.NetworkFirewall",
new=networkfirewall_client,
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=audit_info,
):
with mock.patch(
"prowler.providers.aws.services.vpc.vpc_service.VPC",
"prowler.providers.aws.services.networkfirewall.networkfirewall_in_all_vpc.networkfirewall_in_all_vpc.vpc_client",
new=vpc_client,
):
# Test Check
from prowler.providers.aws.services.networkfirewall.networkfirewall_in_all_vpc.networkfirewall_in_all_vpc import (
networkfirewall_in_all_vpc,
)
with mock.patch(
"prowler.providers.aws.services.networkfirewall.networkfirewall_in_all_vpc.networkfirewall_in_all_vpc.networkfirewall_client",
new=networkfirewall_client,
):
# Test Check
from prowler.providers.aws.services.networkfirewall.networkfirewall_in_all_vpc.networkfirewall_in_all_vpc import (
networkfirewall_in_all_vpc,
)
check = networkfirewall_in_all_vpc()
result = check.execute()
check = networkfirewall_in_all_vpc()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"VPC {VPC_ID_PROTECTED} has Network Firewall enabled."
)
assert result[0].region == AWS_REGION
assert result[0].resource_id == VPC_ID_PROTECTED
assert result[0].resource_tags == []
assert result[0].resource_arn == ""
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"VPC {VPC_ID_PROTECTED} has Network Firewall enabled."
)
assert result[0].region == AWS_REGION
assert result[0].resource_id == VPC_ID_PROTECTED
assert result[0].resource_tags == []
assert result[0].resource_arn == ""
def test_vpcs_without_firewall(self):
networkfirewall_client = mock.MagicMock
@@ -134,32 +176,39 @@ class Test_networkfirewall_in_all_vpc:
tags=[],
)
}
audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.services.networkfirewall.networkfirewall_service.NetworkFirewall",
new=networkfirewall_client,
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=audit_info,
):
with mock.patch(
"prowler.providers.aws.services.vpc.vpc_service.VPC",
"prowler.providers.aws.services.networkfirewall.networkfirewall_in_all_vpc.networkfirewall_in_all_vpc.vpc_client",
new=vpc_client,
):
# Test Check
from prowler.providers.aws.services.networkfirewall.networkfirewall_in_all_vpc.networkfirewall_in_all_vpc import (
networkfirewall_in_all_vpc,
)
with mock.patch(
"prowler.providers.aws.services.networkfirewall.networkfirewall_in_all_vpc.networkfirewall_in_all_vpc.networkfirewall_client",
new=networkfirewall_client,
):
# Test Check
from prowler.providers.aws.services.networkfirewall.networkfirewall_in_all_vpc.networkfirewall_in_all_vpc import (
networkfirewall_in_all_vpc,
)
check = networkfirewall_in_all_vpc()
result = check.execute()
check = networkfirewall_in_all_vpc()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"VPC {VPC_ID_UNPROTECTED} does not have Network Firewall enabled."
)
assert result[0].region == AWS_REGION
assert result[0].resource_id == VPC_ID_UNPROTECTED
assert result[0].resource_tags == []
assert result[0].resource_arn == ""
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"VPC {VPC_ID_UNPROTECTED} does not have Network Firewall enabled."
)
assert result[0].region == AWS_REGION
assert result[0].resource_id == VPC_ID_UNPROTECTED
assert result[0].resource_tags == []
assert result[0].resource_arn == ""
def test_vpcs_with_and_without_firewall(self):
networkfirewall_client = mock.MagicMock
@@ -221,41 +270,48 @@ class Test_networkfirewall_in_all_vpc:
tags=[],
),
}
audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.services.networkfirewall.networkfirewall_service.NetworkFirewall",
new=networkfirewall_client,
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=audit_info,
):
with mock.patch(
"prowler.providers.aws.services.vpc.vpc_service.VPC",
"prowler.providers.aws.services.networkfirewall.networkfirewall_in_all_vpc.networkfirewall_in_all_vpc.vpc_client",
new=vpc_client,
):
# Test Check
from prowler.providers.aws.services.networkfirewall.networkfirewall_in_all_vpc.networkfirewall_in_all_vpc import (
networkfirewall_in_all_vpc,
)
with mock.patch(
"prowler.providers.aws.services.networkfirewall.networkfirewall_in_all_vpc.networkfirewall_in_all_vpc.networkfirewall_client",
new=networkfirewall_client,
):
# Test Check
from prowler.providers.aws.services.networkfirewall.networkfirewall_in_all_vpc.networkfirewall_in_all_vpc import (
networkfirewall_in_all_vpc,
)
check = networkfirewall_in_all_vpc()
result = check.execute()
check = networkfirewall_in_all_vpc()
result = check.execute()
assert len(result) == 2
for r in result:
if r.resource_id == VPC_ID_PROTECTED:
assert r.status == "PASS"
assert (
r.status_extended
== f"VPC {VPC_ID_PROTECTED} has Network Firewall enabled."
)
assert r.region == AWS_REGION
assert r.resource_id == VPC_ID_PROTECTED
assert r.resource_tags == []
assert r.resource_arn == ""
if r.resource_id == VPC_ID_UNPROTECTED:
assert r.status == "FAIL"
assert (
r.status_extended
== f"VPC {VPC_ID_UNPROTECTED} does not have Network Firewall enabled."
)
assert r.region == AWS_REGION
assert r.resource_id == VPC_ID_UNPROTECTED
assert r.resource_tags == []
assert r.resource_arn == ""
assert len(result) == 2
for r in result:
if r.resource_id == VPC_ID_PROTECTED:
assert r.status == "PASS"
assert (
r.status_extended
== f"VPC {VPC_ID_PROTECTED} has Network Firewall enabled."
)
assert r.region == AWS_REGION
assert r.resource_id == VPC_ID_PROTECTED
assert r.resource_tags == []
assert r.resource_arn == ""
if r.resource_id == VPC_ID_UNPROTECTED:
assert r.status == "FAIL"
assert (
r.status_extended
== f"VPC {VPC_ID_UNPROTECTED} does not have Network Firewall enabled."
)
assert r.region == AWS_REGION
assert r.resource_id == VPC_ID_UNPROTECTED
assert r.resource_tags == []
assert r.resource_arn == ""


@@ -1,17 +1,44 @@
from unittest import mock
from boto3 import session
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
from prowler.providers.aws.services.organizations.organizations_service import (
Organization,
Policy,
)
AWS_REGION = "us-east-1"
AWS_ACCOUNT_NUMBER = "123456789012"
# Moto: NotImplementedError: The TAG_POLICY policy type has not been implemented
# Needs to Mock manually
class Test_organizations_tags_policies_enabled_and_attached:
# Mocked Audit Info
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
session_config=None,
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
region_name=AWS_REGION,
),
audited_account=AWS_ACCOUNT_NUMBER,
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=AWS_REGION,
credentials=None,
assumed_role_info=None,
audited_regions=None,
organizations_metadata=None,
audit_resources=None,
)
return audit_info
def test_organization_no_organization(self):
organizations_client = mock.MagicMock
organizations_client.region = AWS_REGION
@@ -24,27 +51,33 @@ class Test_organizations_tags_policies_enabled_and_attached:
)
]
audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.services.organizations.organizations_service.Organizations",
new=organizations_client,
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=audit_info,
):
# Test Check
from prowler.providers.aws.services.organizations.organizations_tags_policies_enabled_and_attached.organizations_tags_policies_enabled_and_attached import (
organizations_tags_policies_enabled_and_attached,
)
with mock.patch(
"prowler.providers.aws.services.organizations.organizations_tags_policies_enabled_and_attached.organizations_tags_policies_enabled_and_attached.organizations_client",
new=organizations_client,
):
# Test Check
from prowler.providers.aws.services.organizations.organizations_tags_policies_enabled_and_attached.organizations_tags_policies_enabled_and_attached import (
organizations_tags_policies_enabled_and_attached,
)
check = organizations_tags_policies_enabled_and_attached()
result = check.execute()
check = organizations_tags_policies_enabled_and_attached()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "AWS Organizations is not in-use for this AWS Account"
)
assert result[0].resource_id == "AWS Organization"
assert result[0].resource_arn == ""
assert result[0].region == AWS_REGION
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "AWS Organizations is not in-use for this AWS Account"
)
assert result[0].resource_id == "AWS Organization"
assert result[0].resource_arn == ""
assert result[0].region == AWS_REGION
def test_organization_with_tag_policies_not_attached(self):
organizations_client = mock.MagicMock
@@ -69,30 +102,36 @@ class Test_organizations_tags_policies_enabled_and_attached:
)
]
audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.services.organizations.organizations_tags_policies_enabled_and_attached.organizations_tags_policies_enabled_and_attached.organizations_client",
new=organizations_client,
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=audit_info,
):
# Test Check
from prowler.providers.aws.services.organizations.organizations_tags_policies_enabled_and_attached.organizations_tags_policies_enabled_and_attached import (
organizations_tags_policies_enabled_and_attached,
)
with mock.patch(
"prowler.providers.aws.services.organizations.organizations_tags_policies_enabled_and_attached.organizations_tags_policies_enabled_and_attached.organizations_client",
new=organizations_client,
):
# Test Check
from prowler.providers.aws.services.organizations.organizations_tags_policies_enabled_and_attached.organizations_tags_policies_enabled_and_attached import (
organizations_tags_policies_enabled_and_attached,
)
check = organizations_tags_policies_enabled_and_attached()
result = check.execute()
check = organizations_tags_policies_enabled_and_attached()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "AWS Organization o-1234567890 has tag policies enabled but not attached"
)
assert result[0].resource_id == "o-1234567890"
assert (
result[0].resource_arn
== "arn:aws:organizations::1234567890:organization/o-1234567890"
)
assert result[0].region == AWS_REGION
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "AWS Organization o-1234567890 has tag policies enabled but not attached"
)
assert result[0].resource_id == "o-1234567890"
assert (
result[0].resource_arn
== "arn:aws:organizations::1234567890:organization/o-1234567890"
)
assert result[0].region == AWS_REGION
def test_organization_with_tag_policies_attached(self):
organizations_client = mock.MagicMock
@@ -117,27 +156,33 @@ class Test_organizations_tags_policies_enabled_and_attached:
)
]
audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.services.organizations.organizations_tags_policies_enabled_and_attached.organizations_tags_policies_enabled_and_attached.organizations_client",
new=organizations_client,
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=audit_info,
):
# Test Check
from prowler.providers.aws.services.organizations.organizations_tags_policies_enabled_and_attached.organizations_tags_policies_enabled_and_attached import (
organizations_tags_policies_enabled_and_attached,
)
with mock.patch(
"prowler.providers.aws.services.organizations.organizations_tags_policies_enabled_and_attached.organizations_tags_policies_enabled_and_attached.organizations_client",
new=organizations_client,
):
# Test Check
from prowler.providers.aws.services.organizations.organizations_tags_policies_enabled_and_attached.organizations_tags_policies_enabled_and_attached import (
organizations_tags_policies_enabled_and_attached,
)
check = organizations_tags_policies_enabled_and_attached()
result = check.execute()
check = organizations_tags_policies_enabled_and_attached()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "AWS Organization o-1234567890 has tag policies enabled and attached to an AWS account"
)
assert result[0].resource_id == "o-1234567890"
assert (
result[0].resource_arn
== "arn:aws:organizations::1234567890:organization/o-1234567890"
)
assert result[0].region == AWS_REGION
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "AWS Organization o-1234567890 has tag policies enabled and attached to an AWS account"
)
assert result[0].resource_id == "o-1234567890"
assert (
result[0].resource_arn
== "arn:aws:organizations::1234567890:organization/o-1234567890"
)
assert result[0].region == AWS_REGION


@@ -1,6 +1,7 @@
from re import search
from unittest import mock
import botocore
from boto3 import client, session
from moto import mock_rds
@@ -10,6 +11,25 @@ AWS_ACCOUNT_NUMBER = "123456789012"
AWS_REGION = "us-east-1"
make_api_call = botocore.client.BaseClient._make_api_call
def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "DescribeDBEngineVersions":
return {
"DBEngineVersions": [
{
"Engine": "mysql",
"EngineVersion": "8.0.32",
"DBEngineDescription": "description",
"DBEngineVersionDescription": "description",
},
]
}
return make_api_call(self, operation_name, kwarg)
@mock.patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
class Test_rds_instance_backup_enabled:
# Mocked Audit Info
def set_mocked_audit_info(self):
@@ -99,6 +119,12 @@ class Test_rds_instance_backup_enabled:
result[0].status_extended,
)
assert result[0].resource_id == "db-master-1"
assert result[0].region == AWS_REGION
assert (
result[0].resource_arn
== f"arn:aws:rds:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:db:db-master-1"
)
assert result[0].resource_tags == []
@mock_rds
def test_rds_instance_with_backup(self):
@@ -138,3 +164,9 @@ class Test_rds_instance_backup_enabled:
result[0].status_extended,
)
assert result[0].resource_id == "db-master-1"
assert result[0].region == AWS_REGION
assert (
result[0].resource_arn
== f"arn:aws:rds:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:db:db-master-1"
)
assert result[0].resource_tags == []


@@ -1,6 +1,7 @@
from re import search
from unittest import mock
import botocore
from boto3 import client, session
from moto import mock_rds
@@ -10,6 +11,25 @@ AWS_ACCOUNT_NUMBER = "123456789012"
AWS_REGION = "us-east-1"
make_api_call = botocore.client.BaseClient._make_api_call
def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "DescribeDBEngineVersions":
return {
"DBEngineVersions": [
{
"Engine": "mysql",
"EngineVersion": "8.0.32",
"DBEngineDescription": "description",
"DBEngineVersionDescription": "description",
},
]
}
return make_api_call(self, operation_name, kwarg)
@mock.patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
class Test_rds_instance_deletion_protection:
# Mocked Audit Info
def set_mocked_audit_info(self):
@@ -96,6 +116,12 @@ class Test_rds_instance_deletion_protection:
result[0].status_extended,
)
assert result[0].resource_id == "db-master-1"
assert result[0].region == AWS_REGION
assert (
result[0].resource_arn
== f"arn:aws:rds:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:db:db-master-1"
)
assert result[0].resource_tags == []
@mock_rds
def test_rds_instance_with_deletion_protection(self):
@@ -136,6 +162,12 @@ class Test_rds_instance_deletion_protection:
result[0].status_extended,
)
assert result[0].resource_id == "db-master-1"
assert result[0].region == AWS_REGION
assert (
result[0].resource_arn
== f"arn:aws:rds:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:db:db-master-1"
)
assert result[0].resource_tags == []
@mock_rds
def test_rds_instance_without_cluster_deletion_protection(self):
@@ -188,6 +220,12 @@ class Test_rds_instance_deletion_protection:
result[0].status_extended,
)
assert result[0].resource_id == "db-master-1"
assert result[0].region == AWS_REGION
assert (
result[0].resource_arn
== f"arn:aws:rds:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:db:db-master-1"
)
assert result[0].resource_tags == []
@mock_rds
def test_rds_instance_with_cluster_deletion_protection(self):
@@ -240,3 +278,9 @@ class Test_rds_instance_deletion_protection:
result[0].status_extended,
)
assert result[0].resource_id == "db-master-1"
assert result[0].region == AWS_REGION
assert (
result[0].resource_arn
== f"arn:aws:rds:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:db:db-master-1"
)
assert result[0].resource_tags == []


@@ -117,6 +117,11 @@ class Test_rds_instance_deprecated_engine_version:
== "RDS instance db-master-1 is not using a deprecated engine mysql with version 8.0.32."
)
assert result[0].resource_id == "db-master-1"
assert result[0].region == AWS_REGION
assert (
result[0].resource_arn
== f"arn:aws:rds:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:db:db-master-1"
)
assert result[0].resource_tags == []
@mock_rds
@@ -158,4 +163,9 @@ class Test_rds_instance_deprecated_engine_version:
== "RDS instance db-master-2 is using a deprecated engine mysql with version 8.0.23."
)
assert result[0].resource_id == "db-master-2"
assert result[0].region == AWS_REGION
assert (
result[0].resource_arn
== f"arn:aws:rds:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:db:db-master-2"
)
assert result[0].resource_tags == []


@@ -1,6 +1,7 @@
from re import search
from unittest import mock
import botocore
from boto3 import client, session
from moto import mock_rds
@@ -9,7 +10,25 @@ from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
AWS_ACCOUNT_NUMBER = "123456789012"
AWS_REGION = "us-east-1"
make_api_call = botocore.client.BaseClient._make_api_call
def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "DescribeDBEngineVersions":
return {
"DBEngineVersions": [
{
"Engine": "mysql",
"EngineVersion": "8.0.32",
"DBEngineDescription": "description",
"DBEngineVersionDescription": "description",
},
]
}
return make_api_call(self, operation_name, kwarg)
@mock.patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
class Test_rds_instance_enhanced_monitoring_enabled:
# Mocked Audit Info
def set_mocked_audit_info(self):
@@ -97,6 +116,12 @@ class Test_rds_instance_enhanced_monitoring_enabled:
result[0].status_extended,
)
assert result[0].resource_id == "db-master-1"
assert result[0].region == AWS_REGION
assert (
result[0].resource_arn
== f"arn:aws:rds:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:db:db-master-1"
)
assert result[0].resource_tags == []
@mock_rds
def test_rds_instance_with_monitoring(self):
@@ -137,3 +162,9 @@ class Test_rds_instance_enhanced_monitoring_enabled:
result[0].status_extended,
)
assert result[0].resource_id == "db-master-1"
assert result[0].region == AWS_REGION
assert (
result[0].resource_arn
== f"arn:aws:rds:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:db:db-master-1"
)
assert result[0].resource_tags == []


@@ -1,6 +1,7 @@
from re import search
from unittest import mock
import botocore
from boto3 import client, session
from moto import mock_rds
@@ -9,7 +10,25 @@ from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
AWS_ACCOUNT_NUMBER = "123456789012"
AWS_REGION = "us-east-1"
make_api_call = botocore.client.BaseClient._make_api_call
def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "DescribeDBEngineVersions":
return {
"DBEngineVersions": [
{
"Engine": "mysql",
"EngineVersion": "8.0.32",
"DBEngineDescription": "description",
"DBEngineVersionDescription": "description",
},
]
}
return make_api_call(self, operation_name, kwarg)
@mock.patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
class Test_rds_instance_integration_cloudwatch_logs:
# Mocked Audit Info
def set_mocked_audit_info(self):
@@ -97,6 +116,12 @@ class Test_rds_instance_integration_cloudwatch_logs:
result[0].status_extended,
)
assert result[0].resource_id == "db-master-1"
assert result[0].region == AWS_REGION
assert (
result[0].resource_arn
== f"arn:aws:rds:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:db:db-master-1"
)
assert result[0].resource_tags == []
@mock_rds
def test_rds_instance_with_logs(self):
@@ -137,3 +162,9 @@ class Test_rds_instance_integration_cloudwatch_logs:
result[0].status_extended,
)
assert result[0].resource_id == "db-master-1"
assert result[0].region == AWS_REGION
assert (
result[0].resource_arn
== f"arn:aws:rds:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:db:db-master-1"
)
assert result[0].resource_tags == []


@@ -1,6 +1,7 @@
from re import search
from unittest import mock
import botocore
from boto3 import client, session
from moto import mock_rds
@@ -9,7 +10,25 @@ from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
AWS_ACCOUNT_NUMBER = "123456789012"
AWS_REGION = "us-east-1"
make_api_call = botocore.client.BaseClient._make_api_call
def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "DescribeDBEngineVersions":
return {
"DBEngineVersions": [
{
"Engine": "mysql",
"EngineVersion": "8.0.32",
"DBEngineDescription": "description",
"DBEngineVersionDescription": "description",
},
]
}
return make_api_call(self, operation_name, kwarg)
@mock.patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
class Test_rds_instance_minor_version_upgrade_enabled:
# Mocked Audit Info
def set_mocked_audit_info(self):
@@ -97,6 +116,12 @@ class Test_rds_instance_minor_version_upgrade_enabled:
result[0].status_extended,
)
assert result[0].resource_id == "db-master-1"
assert result[0].region == AWS_REGION
assert (
result[0].resource_arn
== f"arn:aws:rds:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:db:db-master-1"
)
assert result[0].resource_tags == []
@mock_rds
def test_rds_instance_with_auto_upgrade(self):
@@ -137,3 +162,9 @@ class Test_rds_instance_minor_version_upgrade_enabled:
result[0].status_extended,
)
assert result[0].resource_id == "db-master-1"
assert result[0].region == AWS_REGION
assert (
result[0].resource_arn
== f"arn:aws:rds:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:db:db-master-1"
)
assert result[0].resource_tags == []

View File

@@ -1,6 +1,7 @@
from re import search
from unittest import mock
import botocore
from boto3 import client, session
from moto import mock_rds
@@ -10,7 +11,25 @@ from prowler.providers.aws.services.rds.rds_service import DBCluster, DBInstance
AWS_ACCOUNT_NUMBER = "123456789012"
AWS_REGION = "us-east-1"
make_api_call = botocore.client.BaseClient._make_api_call
def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "DescribeDBEngineVersions":
return {
"DBEngineVersions": [
{
"Engine": "mysql",
"EngineVersion": "8.0.32",
"DBEngineDescription": "description",
"DBEngineVersionDescription": "description",
},
]
}
return make_api_call(self, operation_name, kwarg)
@mock.patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
class Test_rds_instance_multi_az:
# Mocked Audit Info
def set_mocked_audit_info(self):
@@ -97,6 +116,12 @@ class Test_rds_instance_multi_az:
result[0].status_extended,
)
assert result[0].resource_id == "db-master-1"
assert result[0].region == AWS_REGION
assert (
result[0].resource_arn
== f"arn:aws:rds:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:db:db-master-1"
)
assert result[0].resource_tags == []
@mock_rds
def test_rds_instance_multi_az(self):
@@ -137,12 +162,22 @@ class Test_rds_instance_multi_az:
result[0].status_extended,
)
assert result[0].resource_id == "db-master-1"
assert result[0].region == AWS_REGION
assert (
result[0].resource_arn
== f"arn:aws:rds:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:db:db-master-1"
)
assert result[0].resource_tags == []
def test_rds_instance_in_cluster_multi_az(self):
rds_client = mock.MagicMock
cluster_arn = (
f"arn:aws:rds:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:cluster:test-cluster"
)
rds_client.db_clusters = {
"test-cluster": DBCluster(
cluster_arn: DBCluster(
id="test-cluster",
arn=cluster_arn,
endpoint="",
engine="aurora",
status="available",
@@ -161,6 +196,7 @@ class Test_rds_instance_multi_az:
rds_client.db_instances = [
DBInstance(
id="test-instance",
arn=f"arn:aws:rds:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:db:test-instance",
endpoint="",
engine="aurora",
engine_version="1.0.0",
@@ -174,36 +210,53 @@ class Test_rds_instance_multi_az:
parameter_group=[],
multi_az=False,
cluster_id="test-cluster",
cluster_arn=cluster_arn,
region=AWS_REGION,
tags=[],
)
]
audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.services.rds.rds_instance_multi_az.rds_instance_multi_az.rds_client",
new=rds_client,
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=audit_info,
):
# Test Check
from prowler.providers.aws.services.rds.rds_instance_multi_az.rds_instance_multi_az import (
rds_instance_multi_az,
)
with mock.patch(
"prowler.providers.aws.services.rds.rds_instance_multi_az.rds_instance_multi_az.rds_client",
new=rds_client,
):
# Test Check
from prowler.providers.aws.services.rds.rds_instance_multi_az.rds_instance_multi_az import (
rds_instance_multi_az,
)
check = rds_instance_multi_az()
result = check.execute()
check = rds_instance_multi_az()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"has multi-AZ enabled at cluster",
result[0].status_extended,
)
assert result[0].resource_id == "test-instance"
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"has multi-AZ enabled at cluster",
result[0].status_extended,
)
assert result[0].resource_id == "test-instance"
assert result[0].region == AWS_REGION
assert (
result[0].resource_arn
== f"arn:aws:rds:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:db:test-instance"
)
assert result[0].resource_tags == []
def test_rds_instance_in_cluster_without_multi_az(self):
rds_client = mock.MagicMock
cluster_arn = (
f"arn:aws:rds:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:cluster:test-cluster"
)
rds_client.db_clusters = {
"test-cluster": DBCluster(
cluster_arn: DBCluster(
id="test-cluster",
arn=cluster_arn,
endpoint="",
engine="aurora",
status="available",
@@ -222,6 +275,7 @@ class Test_rds_instance_multi_az:
rds_client.db_instances = [
DBInstance(
id="test-instance",
arn=f"arn:aws:rds:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:db:test-instance",
endpoint="",
engine="aurora",
engine_version="1.0.0",
@@ -235,27 +289,40 @@ class Test_rds_instance_multi_az:
parameter_group=[],
multi_az=False,
cluster_id="test-cluster",
cluster_arn=cluster_arn,
region=AWS_REGION,
tags=[],
)
]
audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.services.rds.rds_instance_multi_az.rds_instance_multi_az.rds_client",
new=rds_client,
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=audit_info,
):
# Test Check
from prowler.providers.aws.services.rds.rds_instance_multi_az.rds_instance_multi_az import (
rds_instance_multi_az,
)
with mock.patch(
"prowler.providers.aws.services.rds.rds_instance_multi_az.rds_instance_multi_az.rds_client",
new=rds_client,
):
# Test Check
from prowler.providers.aws.services.rds.rds_instance_multi_az.rds_instance_multi_az import (
rds_instance_multi_az,
)
check = rds_instance_multi_az()
result = check.execute()
check = rds_instance_multi_az()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"does not have multi-AZ enabled at cluster",
result[0].status_extended,
)
assert result[0].resource_id == "test-instance"
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"does not have multi-AZ enabled at cluster",
result[0].status_extended,
)
assert result[0].resource_id == "test-instance"
assert result[0].region == AWS_REGION
assert (
result[0].resource_arn
== f"arn:aws:rds:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:db:test-instance"
)
assert result[0].resource_tags == []
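
The restructured tests above wrap the original mock.patch of the service client inside an additional patch of current_audit_info, and move the check import inside the inner block so the imported module binds to both fakes. A self-contained stand-in for that nested-patch shape follows; platform.system and platform.release are used here only because patching the real prowler targets would require the package to be importable.

import platform
from unittest import mock

# platform.system stands in for current_audit_info, platform.release for
# rds_client; the point is only the nesting, not the targets themselves.
with mock.patch("platform.system", new=lambda: "audit-info-stub"):
    with mock.patch("platform.release", new=lambda: "rds-client-stub"):
        # Anything resolved inside both blocks observes the patched values,
        # which is why the tests perform the check import at this depth.
        assert platform.system() == "audit-info-stub"
        assert platform.release() == "rds-client-stub"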

View File

@@ -1,6 +1,7 @@
from re import search
from unittest import mock
import botocore
from boto3 import client, session
from moto import mock_rds
@@ -9,7 +10,25 @@ from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
AWS_ACCOUNT_NUMBER = "123456789012"
AWS_REGION = "us-east-1"
make_api_call = botocore.client.BaseClient._make_api_call
def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "DescribeDBEngineVersions":
return {
"DBEngineVersions": [
{
"Engine": "mysql",
"EngineVersion": "8.0.32",
"DBEngineDescription": "description",
"DBEngineVersionDescription": "description",
},
]
}
return make_api_call(self, operation_name, kwarg)
@mock.patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
class Test_rds_instance_no_public_access:
# Mocked Audit Info
def set_mocked_audit_info(self):
@@ -97,6 +116,12 @@ class Test_rds_instance_no_public_access:
result[0].status_extended,
)
assert result[0].resource_id == "db-master-1"
assert result[0].region == AWS_REGION
assert (
result[0].resource_arn
== f"arn:aws:rds:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:db:db-master-1"
)
assert result[0].resource_tags == []
@mock_rds
def test_rds_instance_public(self):
@@ -137,3 +162,9 @@ class Test_rds_instance_no_public_access:
result[0].status_extended,
)
assert result[0].resource_id == "db-master-1"
assert result[0].region == AWS_REGION
assert (
result[0].resource_arn
== f"arn:aws:rds:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:db:db-master-1"
)
assert result[0].resource_tags == []

View File

@@ -1,6 +1,7 @@
from re import search
from unittest import mock
import botocore
from boto3 import client, session
from moto import mock_rds
@@ -9,7 +10,25 @@ from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
AWS_ACCOUNT_NUMBER = "123456789012"
AWS_REGION = "us-east-1"
make_api_call = botocore.client.BaseClient._make_api_call
def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "DescribeDBEngineVersions":
return {
"DBEngineVersions": [
{
"Engine": "mysql",
"EngineVersion": "8.0.32",
"DBEngineDescription": "description",
"DBEngineVersionDescription": "description",
},
]
}
return make_api_call(self, operation_name, kwarg)
@mock.patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
class Test_rds_instance_storage_encrypted:
# Mocked Audit Info
def set_mocked_audit_info(self):
@@ -96,6 +115,12 @@ class Test_rds_instance_storage_encrypted:
result[0].status_extended,
)
assert result[0].resource_id == "db-master-1"
assert result[0].region == AWS_REGION
assert (
result[0].resource_arn
== f"arn:aws:rds:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:db:db-master-1"
)
assert result[0].resource_tags == []
@mock_rds
def test_rds_instance_with_encryption(self):
@@ -136,3 +161,9 @@ class Test_rds_instance_storage_encrypted:
result[0].status_extended,
)
assert result[0].resource_id == "db-master-1"
assert result[0].region == AWS_REGION
assert (
result[0].resource_arn
== f"arn:aws:rds:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:db:db-master-1"
)
assert result[0].resource_tags == []

View File

@@ -1,6 +1,7 @@
from re import search
from unittest import mock
import botocore
from boto3 import client, session
from moto import mock_rds
@@ -9,7 +10,25 @@ from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
AWS_ACCOUNT_NUMBER = "123456789012"
AWS_REGION = "us-east-1"
make_api_call = botocore.client.BaseClient._make_api_call
def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "DescribeDBEngineVersions":
return {
"DBEngineVersions": [
{
"Engine": "mysql",
"EngineVersion": "8.0.32",
"DBEngineDescription": "description",
"DBEngineVersionDescription": "description",
},
]
}
return make_api_call(self, operation_name, kwarg)
@mock.patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
class Test_rds_instance_transport_encrypted:
# Mocked Audit Info
def set_mocked_audit_info(self):
@@ -152,6 +171,12 @@ class Test_rds_instance_transport_encrypted:
result[0].status_extended,
)
assert result[0].resource_id == "db-master-1"
assert result[0].region == AWS_REGION
assert (
result[0].resource_arn
== f"arn:aws:rds:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:db:db-master-1"
)
assert result[0].resource_tags == []
@mock_rds
def test_rds_instance_with_ssl(self):
@@ -208,3 +233,9 @@ class Test_rds_instance_transport_encrypted:
result[0].status_extended,
)
assert result[0].resource_id == "db-master-1"
assert result[0].region == AWS_REGION
assert (
result[0].resource_arn
== f"arn:aws:rds:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:db:db-master-1"
)
assert result[0].resource_tags == []

View File

@@ -228,23 +228,31 @@ class Test_RDS_Service:
# RDS client for this test class
audit_info = self.set_mocked_audit_info()
rds = RDS(audit_info)
db_cluster_arn = (
f"arn:aws:rds:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:cluster:{cluster_id}"
)
assert len(rds.db_clusters) == 1
assert rds.db_clusters[cluster_id].id == "db-master-1"
assert rds.db_clusters[cluster_id].engine == "postgres"
assert rds.db_clusters[cluster_id].region == AWS_REGION
assert f"{AWS_REGION}.rds.amazonaws.com" in rds.db_clusters[cluster_id].endpoint
assert rds.db_clusters[cluster_id].status == "available"
assert not rds.db_clusters[cluster_id].public
assert not rds.db_clusters[cluster_id].encrypted
assert rds.db_clusters[cluster_id].backup_retention_period == 1
assert rds.db_clusters[cluster_id].cloudwatch_logs == ["audit", "error"]
assert rds.db_clusters[cluster_id].deletion_protection
assert not rds.db_clusters[cluster_id].auto_minor_version_upgrade
assert not rds.db_clusters[cluster_id].multi_az
assert rds.db_clusters[cluster_id].tags == [
assert rds.db_clusters[db_cluster_arn].id == "db-master-1"
assert rds.db_clusters[db_cluster_arn].engine == "postgres"
assert rds.db_clusters[db_cluster_arn].region == AWS_REGION
assert (
f"{AWS_REGION}.rds.amazonaws.com"
in rds.db_clusters[db_cluster_arn].endpoint
)
assert rds.db_clusters[db_cluster_arn].status == "available"
assert not rds.db_clusters[db_cluster_arn].public
assert not rds.db_clusters[db_cluster_arn].encrypted
assert rds.db_clusters[db_cluster_arn].backup_retention_period == 1
assert rds.db_clusters[db_cluster_arn].cloudwatch_logs == ["audit", "error"]
assert rds.db_clusters[db_cluster_arn].deletion_protection
assert not rds.db_clusters[db_cluster_arn].auto_minor_version_upgrade
assert not rds.db_clusters[db_cluster_arn].multi_az
assert rds.db_clusters[db_cluster_arn].tags == [
{"Key": "test", "Value": "test"},
]
assert rds.db_clusters[cluster_id].parameter_group == "test"
assert rds.db_clusters[db_cluster_arn].parameter_group == "test"
# Test RDS Describe DB Cluster Snapshots
@mock_rds

View File

@@ -1,6 +1,7 @@
from re import search
from unittest import mock
import botocore
from boto3 import client, session
from moto import mock_rds
@@ -9,6 +10,32 @@ from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
AWS_ACCOUNT_NUMBER = "123456789012"
AWS_REGION = "us-east-1"
make_api_call = botocore.client.BaseClient._make_api_call
def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "DescribeDBEngineVersions":
return {
"DBEngineVersions": [
{
"Engine": "mysql",
"EngineVersion": "8.0.32",
"DBEngineDescription": "description",
"DBEngineVersionDescription": "description",
},
]
}
# if operation_name == "DescribeDBClusterSnapshotAttributes":
# return {
# "DBClusterSnapshotAttributesResult": {
# "DBClusterSnapshotIdentifier": "test-snapshot",
# "DBClusterSnapshotAttributes": [
# {"AttributeName": "restore", "AttributeValues": ["all"]}
# ],
# }
# }
return make_api_call(self, operation_name, kwarg)
class Test_rds_snapshots_public_access:
# Mocked Audit Info
@@ -29,13 +56,14 @@ class Test_rds_snapshots_public_access:
profile_region=AWS_REGION,
credentials=None,
assumed_role_info=None,
audited_regions=None,
audited_regions=[AWS_REGION],
organizations_metadata=None,
audit_resources=None,
)
return audit_info
@mock_rds
@mock.patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
def test_rds_no_snapshots(self):
from prowler.providers.aws.services.rds.rds_service import RDS
@@ -60,6 +88,7 @@ class Test_rds_snapshots_public_access:
assert len(result) == 0
@mock_rds
@mock.patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
def test_rds_private_snapshot(self):
conn = client("rds", region_name=AWS_REGION)
conn.create_db_instance(
@@ -103,6 +132,7 @@ class Test_rds_snapshots_public_access:
assert result[0].resource_id == "snapshot-1"
@mock_rds
@mock.patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
def test_rds_public_snapshot(self):
conn = client("rds", region_name=AWS_REGION)
conn.create_db_instance(
@@ -145,8 +175,15 @@ class Test_rds_snapshots_public_access:
result[0].status_extended,
)
assert result[0].resource_id == "snapshot-1"
assert result[0].region == AWS_REGION
assert (
result[0].resource_arn
== f"arn:aws:rds:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:snapshot:snapshot-1"
)
assert result[0].resource_tags == []
@mock_rds
@mock.patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
def test_rds_cluster_private_snapshot(self):
conn = client("rds", region_name=AWS_REGION)
conn.create_db_cluster(
@@ -188,8 +225,15 @@ class Test_rds_snapshots_public_access:
result[0].status_extended,
)
assert result[0].resource_id == "snapshot-1"
assert result[0].region == AWS_REGION
assert (
result[0].resource_arn
== f"arn:aws:rds:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:cluster-snapshot:snapshot-1"
)
assert result[0].resource_tags == []
@mock_rds
@mock.patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
def test_rds_cluster_public_snapshot(self):
conn = client("rds", region_name=AWS_REGION)
conn.create_db_cluster(
@@ -232,3 +276,9 @@ class Test_rds_snapshots_public_access:
result[0].status_extended,
)
assert result[0].resource_id == "snapshot-1"
assert result[0].region == AWS_REGION
assert (
result[0].resource_arn
== f"arn:aws:rds:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:cluster-snapshot:snapshot-1"
)
assert result[0].resource_tags == []

View File

@@ -154,11 +154,11 @@ class Test_SecretsManager_Service:
assert len(secretsmanager.secrets) == 1
assert secretsmanager.secrets
assert secretsmanager.secrets[secret_name]
assert secretsmanager.secrets[secret_name].name == secret_name
assert secretsmanager.secrets[secret_name].arn == secret_arn
assert secretsmanager.secrets[secret_name].region == AWS_REGION
assert secretsmanager.secrets[secret_name].rotation_enabled is True
assert secretsmanager.secrets[secret_name].tags == [
assert secretsmanager.secrets[secret_arn]
assert secretsmanager.secrets[secret_arn].name == secret_name
assert secretsmanager.secrets[secret_arn].arn == secret_arn
assert secretsmanager.secrets[secret_arn].region == AWS_REGION
assert secretsmanager.secrets[secret_arn].rotation_enabled is True
assert secretsmanager.secrets[secret_arn].tags == [
{"Key": "test", "Value": "test"},
]
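
These assertions reflect the commit's central change: resource maps are now keyed by ARN rather than by name, so same-named resources in different regions no longer overwrite each other. A hypothetical sketch of the collision the new keys avoid follows; the account number, secret name, and simplified ARN format are made up for illustration.

from dataclasses import dataclass


@dataclass
class Secret:
    name: str
    arn: str
    region: str


secrets = {}
for region in ("us-east-1", "eu-west-1"):
    # Two secrets share a name but live in different regions.
    arn = f"arn:aws:secretsmanager:{region}:123456789012:secret:test-secret"
    secrets[arn] = Secret(name="test-secret", arn=arn, region=region)

# Keyed by name the dict would hold a single entry; keyed by ARN it holds both.
assert len(secrets) == 2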

View File

@@ -5,7 +5,7 @@ from prowler.providers.aws.services.securityhub.securityhub_service import (
)
class Test_accessanalyzer_enabled_without_findings:
class Test_securityhub_enabled:
def test_securityhub_hub_inactive(self):
securityhub_client = mock.MagicMock
securityhub_client.securityhubs = [

View File

@@ -53,6 +53,14 @@ def mock_make_api_call(self, operation_name, kwarg):
},
],
}
if operation_name == "DescribeInstanceInformation":
return {
"InstanceInformationList": [
{
"InstanceId": "test-instance-id",
},
],
}
return make_api_call(self, operation_name, kwarg)
@@ -132,7 +140,7 @@ class Test_SSM_Service:
profile_name=None,
botocore_session=None,
),
audited_account=None,
audited_account=DEFAULT_ACCOUNT_ID,
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
@@ -187,18 +195,19 @@ class Test_SSM_Service:
ssm = SSM(self.set_mocked_audit_info())
document_arn = f"arn:aws:ssm:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:document/{ssm_document_name}"
assert len(ssm.documents) == 1
assert ssm.documents
assert ssm.documents[ssm_document_name]
assert ssm.documents[ssm_document_name].name == ssm_document_name
assert ssm.documents[ssm_document_name].region == AWS_REGION
assert ssm.documents[ssm_document_name].tags == [
assert ssm.documents[document_arn]
assert ssm.documents[document_arn].arn == document_arn
assert ssm.documents[document_arn].name == ssm_document_name
assert ssm.documents[document_arn].region == AWS_REGION
assert ssm.documents[document_arn].tags == [
{"Key": "test", "Value": "test"},
]
assert ssm.documents[ssm_document_name].content == yaml.safe_load(
ssm_document_yaml
)
assert ssm.documents[ssm_document_name].account_owners == [DEFAULT_ACCOUNT_ID]
assert ssm.documents[document_arn].content == yaml.safe_load(ssm_document_yaml)
assert ssm.documents[document_arn].account_owners == [DEFAULT_ACCOUNT_ID]
@mock_ssm
def test__list_resource_compliance_summaries__(self):