feat(tags): add resource tags to S-W services (#2020)

This commit is contained in:
Sergio Garcia
2023-03-02 14:21:05 +01:00
committed by GitHub
parent 24e8286f35
commit 24711a2f39
56 changed files with 349 additions and 98 deletions

View File

@@ -10,6 +10,7 @@ class s3_bucket_acl_prohibited(Check):
report.region = bucket.region
report.resource_id = bucket.name
report.resource_arn = bucket.arn
report.resource_tags = bucket.tags
report.status = "FAIL"
report.status_extended = f"S3 Bucket {bucket.name} has bucket ACLs enabled."
if bucket.ownership:

View File

@@ -10,6 +10,7 @@ class s3_bucket_default_encryption(Check):
report.region = bucket.region
report.resource_id = bucket.name
report.resource_arn = bucket.arn
report.resource_tags = bucket.tags
if bucket.encryption:
report.status = "PASS"
report.status_extended = f"S3 Bucket {bucket.name} has Server Side Encryption with {bucket.encryption}."

View File

@@ -10,6 +10,7 @@ class s3_bucket_level_public_access_block(Check):
report.region = bucket.region
report.resource_id = bucket.name
report.resource_arn = bucket.arn
report.resource_tags = bucket.tags
report.status = "PASS"
report.status_extended = (
f"Block Public Access is configured for the S3 Bucket {bucket.name}."

View File

@@ -10,6 +10,7 @@ class s3_bucket_no_mfa_delete(Check):
report.region = bucket.region
report.resource_id = bucket.name
report.resource_arn = bucket.arn
report.resource_tags = bucket.tags
if bucket.mfa_delete:
report.status = "PASS"
report.status_extended = (

View File

@@ -10,6 +10,7 @@ class s3_bucket_object_versioning(Check):
report.region = bucket.region
report.resource_id = bucket.name
report.resource_arn = bucket.arn
report.resource_tags = bucket.tags
if bucket.versioning:
report.status = "PASS"
report.status_extended = (

View File

@@ -10,6 +10,7 @@ class s3_bucket_policy_public_write_access(Check):
report.region = bucket.region
report.resource_id = bucket.name
report.resource_arn = bucket.arn
report.resource_tags = bucket.tags
# Check if bucket policy allow public write access
if not bucket.policy:
report.status = "PASS"

View File

@@ -25,6 +25,7 @@ class s3_bucket_public_access(Check):
report.region = bucket.region
report.resource_id = bucket.name
report.resource_arn = bucket.arn
report.resource_tags = bucket.tags
report.status = "PASS"
report.status_extended = f"S3 Bucket {bucket.name} is not public."
if not (

View File

@@ -10,6 +10,7 @@ class s3_bucket_secure_transport_policy(Check):
report.region = bucket.region
report.resource_id = bucket.name
report.resource_arn = bucket.arn
report.resource_tags = bucket.tags
# Check if bucket policy enforces SSL
if not bucket.policy:
report.status = "FAIL"

View File

@@ -10,6 +10,7 @@ class s3_bucket_server_access_logging_enabled(Check):
report.region = bucket.region
report.resource_id = bucket.name
report.resource_arn = bucket.arn
report.resource_tags = bucket.tags
if bucket.logging:
report.status = "PASS"
report.status_extended = (

View File

@@ -1,8 +1,9 @@
import json
import threading
from dataclasses import dataclass
from typing import Optional
from botocore.client import ClientError
from pydantic import BaseModel
from prowler.lib.logger import logger
from prowler.lib.scan_filters.scan_filters import is_resource_filtered
@@ -27,6 +28,7 @@ class S3:
self.__threading_call__(self.__get_public_access_block__)
self.__threading_call__(self.__get_bucket_encryption__)
self.__threading_call__(self.__get_bucket_ownership_controls__)
self.__threading_call__(self.__get_bucket_tagging__)
def __get_session__(self):
return self.session
@@ -61,9 +63,15 @@ class S3:
# Check if there are filter regions
if audit_info.audited_regions:
if bucket_region in audit_info.audited_regions:
buckets.append(Bucket(bucket["Name"], arn, bucket_region))
buckets.append(
Bucket(
name=bucket["Name"], arn=arn, region=bucket_region
)
)
else:
buckets.append(Bucket(bucket["Name"], arn, bucket_region))
buckets.append(
Bucket(name=bucket["Name"], arn=arn, region=bucket_region)
)
except ClientError as error:
if error.response["Error"]["Code"] == "NoSuchBucket":
logger.warning(
@@ -72,7 +80,7 @@ class S3:
except Exception as error:
if bucket:
logger.error(
f"{bucket} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"{bucket['Name']} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
else:
logger.error(
@@ -150,21 +158,23 @@ class S3:
logger.info("S3 - Get buckets public access block...")
try:
regional_client = self.regional_clients[bucket.region]
public_access_block = regional_client.get_public_access_block(
Bucket=bucket.name
)["PublicAccessBlockConfiguration"]
bucket.public_access_block = PublicAccessBlock(
regional_client.get_public_access_block(Bucket=bucket.name)[
"PublicAccessBlockConfiguration"
]
block_public_acls=public_access_block["BlockPublicAcls"],
ignore_public_acls=public_access_block["IgnorePublicAcls"],
block_public_policy=public_access_block["BlockPublicPolicy"],
restrict_public_buckets=public_access_block["RestrictPublicBuckets"],
)
except Exception as error:
if "NoSuchPublicAccessBlockConfiguration" in str(error):
# Set all block as False
bucket.public_access_block = PublicAccessBlock(
{
"BlockPublicAcls": False,
"IgnorePublicAcls": False,
"BlockPublicPolicy": False,
"RestrictPublicBuckets": False,
}
block_public_acls=False,
ignore_public_acls=False,
block_public_policy=False,
restrict_public_buckets=False,
)
else:
if regional_client:
@@ -244,6 +254,27 @@ class S3:
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __get_bucket_tagging__(self, bucket):
logger.info("S3 - Get buckets logging...")
try:
regional_client = self.regional_clients[bucket.region]
bucket_tags = regional_client.get_bucket_tagging(Bucket=bucket.name)[
"TagSet"
]
bucket.tags = bucket_tags
except ClientError as e:
if e.response["Error"]["Code"] == "NoSuchTagSet":
bucket.tags = []
except Exception as error:
if regional_client:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
else:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
################## S3Control
class S3Control:
@@ -265,90 +296,55 @@ class S3Control:
def __get_public_access_block__(self):
logger.info("S3 - Get account public access block...")
try:
public_access_block = self.client.get_public_access_block(
AccountId=self.audited_account
)["PublicAccessBlockConfiguration"]
return PublicAccessBlock(
self.client.get_public_access_block(AccountId=self.audited_account)[
"PublicAccessBlockConfiguration"
]
block_public_acls=public_access_block["BlockPublicAcls"],
ignore_public_acls=public_access_block["IgnorePublicAcls"],
block_public_policy=public_access_block["BlockPublicPolicy"],
restrict_public_buckets=public_access_block["RestrictPublicBuckets"],
)
except Exception as error:
if "NoSuchPublicAccessBlockConfiguration" in str(error):
# Set all block as False
return PublicAccessBlock(
{
"BlockPublicAcls": False,
"IgnorePublicAcls": False,
"BlockPublicPolicy": False,
"RestrictPublicBuckets": False,
}
block_public_acls=False,
ignore_public_acls=False,
block_public_policy=False,
restrict_public_buckets=False,
)
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
@dataclass
class ACL_Grantee:
display_name: str
ID: str
class ACL_Grantee(BaseModel):
display_name: Optional[str]
ID: Optional[str]
type: str
URI: str
permission: str
def __init__(self, type):
self.display_name = None
self.ID = None
self.type = type
self.URI = None
self.permission = None
URI: Optional[str]
permission: Optional[str]
@dataclass
class PublicAccessBlock:
class PublicAccessBlock(BaseModel):
block_public_acls: bool
ignore_public_acls: bool
block_public_policy: bool
restrict_public_buckets: bool
def __init__(self, configuration):
self.block_public_acls = configuration["BlockPublicAcls"]
self.ignore_public_acls = configuration["IgnorePublicAcls"]
self.block_public_policy = configuration["BlockPublicPolicy"]
self.restrict_public_buckets = configuration["RestrictPublicBuckets"]
@dataclass
class Bucket:
class Bucket(BaseModel):
name: str
arn: str
versioning: bool
logging: bool
public_access_block: PublicAccessBlock
acl_grantees: list[ACL_Grantee]
policy: dict
encryption: str
versioning: bool = False
logging: bool = False
public_access_block: Optional[PublicAccessBlock]
acl_grantees: list[ACL_Grantee] = []
policy: dict = {}
encryption: Optional[str]
region: str
logging_target_bucket: str
ownership: str
mfa_delete: bool
def __init__(self, name, arn, region):
self.name = name
self.arn = arn
self.versioning = False
self.logging = False
# Set all block as False
self.public_access_block = PublicAccessBlock(
{
"BlockPublicAcls": False,
"IgnorePublicAcls": False,
"BlockPublicPolicy": False,
"RestrictPublicBuckets": False,
}
)
self.acl_grantees = []
self.policy = {}
self.encryption = None
self.region = region
self.logging_target_bucket = None
self.ownership = None
self.mfa_delete = False
logging_target_bucket: Optional[str]
ownership: Optional[str]
mfa_delete: bool = False
tags: Optional[list] = []

View File

@@ -10,6 +10,7 @@ class sagemaker_models_network_isolation_enabled(Check):
report.region = model.region
report.resource_id = model.name
report.resource_arn = model.arn
report.resource_tags = model.tags
report.status = "PASS"
report.status_extended = f"Sagemaker notebook instance {model.name} has network isolation enabled"
if not model.network_isolation:

View File

@@ -10,6 +10,7 @@ class sagemaker_models_vpc_settings_configured(Check):
report.region = model.region
report.resource_id = model.name
report.resource_arn = model.arn
report.resource_tags = model.tags
report.status = "PASS"
report.status_extended = (
f"Sagemaker notebook instance {model.name} has VPC settings enabled"

View File

@@ -10,6 +10,7 @@ class sagemaker_notebook_instance_encryption_enabled(Check):
report.region = notebook_instance.region
report.resource_id = notebook_instance.name
report.resource_arn = notebook_instance.arn
report.resource_tags = notebook_instance.tags
report.status = "PASS"
report.status_extended = f"Sagemaker notebook instance {notebook_instance.name} has data encryption enabled"
if not notebook_instance.kms_key_id:

View File

@@ -10,6 +10,7 @@ class sagemaker_notebook_instance_root_access_disabled(Check):
report.region = notebook_instance.region
report.resource_id = notebook_instance.name
report.resource_arn = notebook_instance.arn
report.resource_tags = notebook_instance.tags
report.status = "PASS"
report.status_extended = f"Sagemaker notebook instance {notebook_instance.name} has root access disabled"
if notebook_instance.root_access:

View File

@@ -10,6 +10,7 @@ class sagemaker_notebook_instance_vpc_settings_configured(Check):
report.region = notebook_instance.region
report.resource_id = notebook_instance.name
report.resource_arn = notebook_instance.arn
report.resource_tags = notebook_instance.tags
report.status = "PASS"
report.status_extended = (
f"Sagemaker notebook instance {notebook_instance.name} is in a VPC"

View File

@@ -10,6 +10,7 @@ class sagemaker_notebook_instance_without_direct_internet_access_configured(Chec
report.region = notebook_instance.region
report.resource_id = notebook_instance.name
report.resource_arn = notebook_instance.arn
report.resource_tags = notebook_instance.tags
report.status = "PASS"
report.status_extended = f"Sagemaker notebook instance {notebook_instance.name} has direct internet access disabled"
if notebook_instance.direct_internet_access:

View File

@@ -1,4 +1,5 @@
import threading
from typing import Optional
from pydantic import BaseModel
@@ -23,6 +24,7 @@ class SageMaker:
self.__describe_model__(self.regional_clients)
self.__describe_notebook_instance__(self.regional_clients)
self.__describe_training_job__(self.regional_clients)
self.__list_tags_for_resource__()
def __get_session__(self):
return self.session
@@ -190,6 +192,36 @@ class SageMaker:
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __list_tags_for_resource__(self):
logger.info("SageMaker - List Tags...")
try:
for model in self.sagemaker_models:
regional_client = self.regional_clients[model.region]
response = regional_client.list_tags(ResourceArn=model.arn)["Tags"]
model.tags = response
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
try:
for instance in self.sagemaker_notebook_instances:
regional_client = self.regional_clients[instance.region]
response = regional_client.list_tags(ResourceArn=instance.arn)["Tags"]
instance.tags = response
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
try:
for job in self.sagemaker_training_jobs:
regional_client = self.regional_clients[job.region]
response = regional_client.list_tags(ResourceArn=job.arn)["Tags"]
job.tags = response
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class NotebookInstance(BaseModel):
name: str
@@ -199,6 +231,7 @@ class NotebookInstance(BaseModel):
subnet_id: str = None
direct_internet_access: bool = None
kms_key_id: str = None
tags: Optional[list] = []
class Model(BaseModel):
@@ -207,6 +240,7 @@ class Model(BaseModel):
arn: str
network_isolation: bool = None
vpc_config_subnets: list[str] = []
tags: Optional[list] = []
class TrainingJob(BaseModel):
@@ -217,3 +251,4 @@ class TrainingJob(BaseModel):
volume_kms_key_id: str = None
network_isolation: bool = None
vpc_config_subnets: list[str] = []
tags: Optional[list] = []

View File

@@ -10,6 +10,7 @@ class sagemaker_training_jobs_intercontainer_encryption_enabled(Check):
report.region = training_job.region
report.resource_id = training_job.name
report.resource_arn = training_job.arn
report.resource_tags = training_job.tags
report.status = "PASS"
report.status_extended = f"Sagemaker training job {training_job.name} has intercontainer encryption enabled"
if not training_job.container_traffic_encryption:

View File

@@ -10,6 +10,7 @@ class sagemaker_training_jobs_network_isolation_enabled(Check):
report.region = training_job.region
report.resource_id = training_job.name
report.resource_arn = training_job.arn
report.resource_tags = training_job.tags
report.status = "PASS"
report.status_extended = f"Sagemaker training job {training_job.name} has network isolation enabled"
if not training_job.network_isolation:

View File

@@ -10,6 +10,7 @@ class sagemaker_training_jobs_volume_and_output_encryption_enabled(Check):
report.region = training_job.region
report.resource_id = training_job.name
report.resource_arn = training_job.arn
report.resource_tags = training_job.tags
report.status = "PASS"
report.status_extended = (
f"Sagemaker training job {training_job.name} has KMS encryption enabled"

View File

@@ -10,6 +10,7 @@ class sagemaker_training_jobs_vpc_settings_configured(Check):
report.region = training_job.region
report.resource_id = training_job.name
report.resource_arn = training_job.arn
report.resource_tags = training_job.tags
report.status = "PASS"
report.status_extended = f"Sagemaker training job {training_job.name} has VPC settings for the training job volume and output enabled"
if not training_job.vpc_config_subnets:

View File

@@ -12,7 +12,7 @@ class secretsmanager_automatic_rotation_enabled(Check):
report.region = secret.region
report.resource_id = secret.name
report.resource_arn = secret.arn
report.resource_tags = secret.tags
if secret.rotation_enabled:
report.status = "PASS"
report.status_extended = (

View File

@@ -1,4 +1,5 @@
import threading
from typing import Optional
from pydantic import BaseModel
@@ -43,6 +44,7 @@ class SecretsManager:
arn=secret["ARN"],
name=secret["Name"],
region=regional_client.region,
tags=secret.get("Tags"),
)
if "RotationEnabled" in secret:
self.secrets[secret["Name"]].rotation_enabled = secret[
@@ -62,3 +64,4 @@ class Secret(BaseModel):
name: str
region: str
rotation_enabled: bool = False
tags: Optional[list] = []

View File

@@ -12,6 +12,7 @@ class shield_advanced_protection_in_associated_elastic_ips(Check):
report.region = shield_client.region
report.resource_id = elastic_ip.allocation_id
report.resource_arn = elastic_ip.arn
report.resource_tags = elastic_ip.tags
report.status = "FAIL"
report.status_extended = f"Elastic IP {elastic_ip.allocation_id} is not protected by AWS Shield Advanced"

View File

@@ -12,6 +12,7 @@ class shield_advanced_protection_in_classic_load_balancers(Check):
report.region = shield_client.region
report.resource_id = elb.name
report.resource_arn = elb.arn
report.resource_tags = elb.tags
report.status = "FAIL"
report.status_extended = (
f"ELB {elb.name} is not protected by AWS Shield Advanced"

View File

@@ -14,6 +14,7 @@ class shield_advanced_protection_in_cloudfront_distributions(Check):
report.region = shield_client.region
report.resource_id = distribution.id
report.resource_arn = distribution.arn
report.resource_tags = distribution.tags
report.status = "FAIL"
report.status_extended = f"CloudFront distribution {distribution.id} is not protected by AWS Shield Advanced"

View File

@@ -13,6 +13,7 @@ class shield_advanced_protection_in_internet_facing_load_balancers(Check):
report.region = shield_client.region
report.resource_id = elbv2.name
report.resource_arn = elbv2.arn
report.resource_tags = elbv2.tags
report.status = "FAIL"
report.status_extended = f"ELBv2 ALB {elbv2.name} is not protected by AWS Shield Advanced"

View File

@@ -12,6 +12,7 @@ class shield_advanced_protection_in_route53_hosted_zones(Check):
report.region = shield_client.region
report.resource_id = hosted_zone.id
report.resource_arn = hosted_zone.arn
report.resource_tags = hosted_zone.tags
report.status = "FAIL"
report.status_extended = f"Route53 Hosted Zone {hosted_zone.id} is not protected by AWS Shield Advanced"

View File

@@ -1,5 +1,6 @@
import threading
from json import loads
from typing import Optional
from pydantic import BaseModel
@@ -18,6 +19,7 @@ class SNS:
self.topics = []
self.__threading_call__(self.__list_topics__)
self.__get_topic_attributes__(self.regional_clients)
self.__list_tags_for_resource__()
def __get_session__(self):
return self.session
@@ -73,6 +75,20 @@ class SNS:
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __list_tags_for_resource__(self):
logger.info("SNS - List Tags...")
try:
for topic in self.topics:
regional_client = self.regional_clients[topic.region]
response = regional_client.list_tags_for_resource(
ResourceArn=topic.arn
)["Tags"]
topic.tags = response
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class Topic(BaseModel):
name: str
@@ -80,3 +96,4 @@ class Topic(BaseModel):
region: str
policy: dict = None
kms_master_key_id: str = None
tags: Optional[list] = []

View File

@@ -10,6 +10,7 @@ class sns_topics_kms_encryption_at_rest_enabled(Check):
report.region = topic.region
report.resource_id = topic.name
report.resource_arn = topic.arn
report.resource_tags = topic.tags
report.status = "PASS"
report.status_extended = f"SNS topic {topic.arn} is encrypted"
if not topic.kms_master_key_id:

View File

@@ -10,6 +10,7 @@ class sns_topics_not_publicly_accessible(Check):
report.region = topic.region
report.resource_id = topic.name
report.resource_arn = topic.arn
report.resource_tags = topic.tags
report.status = "PASS"
report.status_extended = f"SNS topic {topic.name} without public access"
if topic.policy:

View File

@@ -10,6 +10,7 @@ class sqs_queues_not_publicly_accessible(Check):
report.region = queue.region
report.resource_id = queue.id
report.resource_arn = queue.arn
report.resource_tags = queue.tags
report.status = "PASS"
report.status_extended = f"SQS queue {queue.id} is not public"
if queue.policy:

View File

@@ -10,6 +10,7 @@ class sqs_queues_server_side_encryption_enabled(Check):
report.region = queue.region
report.resource_id = queue.id
report.resource_arn = queue.arn
report.resource_tags = queue.tags
report.status = "PASS"
report.status_extended = (
f"SQS queue {queue.id} is using Server Side Encryption"

View File

@@ -1,5 +1,6 @@
import threading
from json import loads
from typing import Optional
from pydantic import BaseModel
@@ -18,6 +19,7 @@ class SQS:
self.queues = []
self.__threading_call__(self.__list_queues__)
self.__get_queue_attributes__(self.regional_clients)
self.__list_queue_tags__()
def __get_session__(self):
return self.session
@@ -79,6 +81,20 @@ class SQS:
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __list_queue_tags__(self):
logger.info("SQS - List Tags...")
try:
for queue in self.queues:
regional_client = self.regional_clients[queue.region]
response = regional_client.list_queue_tags(QueueUrl=queue.id).get(
"Tags"
)
queue.tags = [response]
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class Queue(BaseModel):
id: str
@@ -86,3 +102,4 @@ class Queue(BaseModel):
region: str
policy: dict = None
kms_key_id: str = None
tags: Optional[list] = []

View File

@@ -15,9 +15,9 @@ class ssm_document_secrets(Check):
for document in ssm_client.documents.values():
report = Check_Report_AWS(self.metadata())
report.region = document.region
report.resource_arn = f"arn:aws:ssm:{document.region}:{ssm_client.audited_account}:document/{document.name}"
report.resource_arn = document.arn
report.resource_id = document.name
report.resource_tags = document.tags
report.status = "PASS"
report.status_extended = f"No secrets found in SSM Document {document.name}"

View File

@@ -8,9 +8,9 @@ class ssm_documents_set_as_public(Check):
for document in ssm_client.documents.values():
report = Check_Report_AWS(self.metadata())
report.region = document.region
report.resource_arn = f"arn:aws:ssm:{document.region}:{ssm_client.audited_account}:document/{document.name}"
report.resource_arn = document.arn
report.resource_id = document.name
report.resource_tags = document.tags
if document.account_owners:
report.status = "FAIL"
report.status_extended = f"SSM Document {document.name} is public"

View File

@@ -9,7 +9,6 @@ class ssm_managed_compliant_patching(Check):
for resource in ssm_client.compliance_resources.values():
report = Check_Report_AWS(self.metadata())
report.region = resource.region
report.resource_arn = f"arn:aws:ec2:{resource.region}:{ssm_client.audited_account}:instance/{resource.id}"
report.resource_id = resource.id
if resource.status == ResourceStatus.COMPLIANT:

View File

@@ -1,6 +1,7 @@
import json
import threading
from enum import Enum
from typing import Optional
from pydantic import BaseModel
@@ -15,6 +16,7 @@ class SSM:
self.service = "ssm"
self.session = audit_info.audit_session
self.audited_account = audit_info.audited_account
self.audited_partition = audit_info.audited_partition
self.audit_resources = audit_info.audit_resources
self.regional_clients = generate_regional_clients(self.service, audit_info)
self.documents = {}
@@ -59,10 +61,12 @@ class SSM:
is_resource_filtered(document["Name"], self.audit_resources)
):
document_name = document["Name"]
document_arn = f"arn:{self.audited_partition}:ssm:{regional_client.region}:{self.audited_account}:document/{document_name}"
self.documents[document_name] = Document(
arn=document_arn,
name=document_name,
region=regional_client.region,
tags=document.get("Tags"),
)
except Exception as error:
@@ -141,8 +145,9 @@ class SSM:
for page in describe_instance_information_paginator.paginate():
for item in page["InstanceInformationList"]:
resource_id = item["InstanceId"]
resource_arn = f"arn:{self.audited_partition}:ec2:{regional_client.region}:{self.audited_account}:instance/{resource_id}"
self.managed_instances[resource_id] = ManagedInstance(
arn=resource_arn,
id=resource_id,
region=regional_client.region,
)
@@ -167,12 +172,15 @@ class ComplianceResource(BaseModel):
class Document(BaseModel):
arn: str
name: str
region: str
content: dict = None
account_owners: list[str] = None
tags: Optional[list] = []
class ManagedInstance(BaseModel):
arn: str
id: str
region: str

View File

@@ -18,6 +18,7 @@ class vpc_endpoint_connections_trust_boundaries(Check):
report.status = "FAIL"
report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} has full access."
report.resource_id = endpoint.id
report.resource_tags = endpoint.tags
findings.append(report)
break
@@ -33,6 +34,7 @@ class vpc_endpoint_connections_trust_boundaries(Check):
report.status = "FAIL"
report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} has full access."
report.resource_id = endpoint.id
report.resource_tags = endpoint.tags
else:
account_id = principal_arn.split(":")[4]
if (
@@ -42,10 +44,12 @@ class vpc_endpoint_connections_trust_boundaries(Check):
report.status = "PASS"
report.status_extended = f"Found trusted account {account_id} in VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id}."
report.resource_id = endpoint.id
report.resource_tags = endpoint.tags
else:
report.status = "FAIL"
report.status_extended = f"Found untrusted account {account_id} in VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id}."
report.resource_id = endpoint.id
report.resource_tags = endpoint.tags
findings.append(report)
return findings

View File

@@ -17,6 +17,7 @@ class vpc_endpoint_services_allowed_principals_trust_boundaries(Check):
f"VPC Endpoint Service {service.id} has no allowed principals."
)
report.resource_id = service.id
report.resource_tags = service.tags
findings.append(report)
else:
for principal in service.allowed_principals:
@@ -30,10 +31,12 @@ class vpc_endpoint_services_allowed_principals_trust_boundaries(Check):
report.status = "PASS"
report.status_extended = f"Found trusted account {account_id} in VPC Endpoint Service {service.id}."
report.resource_id = service.id
report.resource_tags = service.tags
else:
report.status = "FAIL"
report.status_extended = f"Found untrusted account {account_id} in VPC Endpoint Service {service.id}."
report.resource_id = service.id
report.resource_tags = service.tags
findings.append(report)
return findings

View File

@@ -8,6 +8,7 @@ class vpc_flow_logs_enabled(Check):
for vpc in vpc_client.vpcs:
report = Check_Report_AWS(self.metadata())
report.region = vpc.region
report.resource_tags = vpc.tags
if vpc.flow_log:
report.status = "PASS"
report.status_extended = f"VPC {vpc.id} Flow logs are enabled."

View File

@@ -8,6 +8,7 @@ class vpc_peering_routing_tables_with_least_privilege(Check):
for peer in vpc_client.vpc_peering_connections:
report = Check_Report_AWS(self.metadata())
report.region = peer.region
report.resource_tags = peer.tags
comply = True
# Check each cidr in the peering route table
for route_table in peer.route_tables:

View File

@@ -56,6 +56,7 @@ class VPC:
default=vpc["IsDefault"],
cidr_block=vpc["CidrBlock"],
region=regional_client.region,
tags=vpc.get("Tags"),
)
)
except Exception as error:
@@ -87,6 +88,7 @@ class VPC:
"CidrBlock"
),
region=regional_client.region,
tags=conn.get("Tags"),
)
)
except Exception as error:
@@ -171,6 +173,7 @@ class VPC:
policy_document=endpoint_policy,
owner_id=endpoint["OwnerId"],
region=regional_client.region,
tags=endpoint.get("Tags"),
)
)
except Exception as error:
@@ -198,6 +201,7 @@ class VPC:
service=endpoint["ServiceName"],
owner_id=endpoint["Owner"],
region=regional_client.region,
tags=endpoint.get("Tags"),
)
)
except Exception as error:
@@ -228,6 +232,7 @@ class VPCs(BaseModel):
cidr_block: str
flow_log: bool = False
region: str
tags: Optional[list] = []
class Route(BaseModel):
@@ -243,6 +248,7 @@ class VpcPeeringConnection(BaseModel):
requester_cidr: Optional[str]
route_tables: list[Route] = []
region: str
tags: Optional[list] = []
class VpcEndpoint(BaseModel):
@@ -252,6 +258,7 @@ class VpcEndpoint(BaseModel):
policy_document: Optional[dict]
owner_id: str
region: str
tags: Optional[list] = []
class VpcEndpointService(BaseModel):
@@ -260,3 +267,4 @@ class VpcEndpointService(BaseModel):
owner_id: str
allowed_principals: list = []
region: str
tags: Optional[list] = []

View File

@@ -1,4 +1,5 @@
import threading
from typing import Optional
from pydantic import BaseModel
@@ -16,6 +17,7 @@ class WorkSpaces:
self.regional_clients = generate_regional_clients(self.service, audit_info)
self.workspaces = []
self.__threading_call__(self.__describe_workspaces__)
self.__describe_tags__()
def __get_session__(self):
return self.session
@@ -62,6 +64,20 @@ class WorkSpaces:
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __describe_tags__(self):
logger.info("Workspaces - List Tags...")
try:
for workspace in self.workspaces:
regional_client = self.regional_clients[workspace.region]
response = regional_client.describe_tags(ResourceId=workspace.id)[
"TagList"
]
workspace.tags = response
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class WorkSpace(BaseModel):
id: str
@@ -69,3 +85,4 @@ class WorkSpace(BaseModel):
region: str
user_volume_encryption_enabled: bool = None
root_volume_encryption_enabled: bool = None
tags: Optional[list] = []

View File

@@ -12,6 +12,7 @@ class workspaces_volume_encryption_enabled(Check):
report.region = workspace.region
report.resource_id = workspace.id
report.resource_arn = workspace.arn
report.resource_tags = workspace.tags
report.status = "PASS"
report.status_extended = f"WorkSpaces workspace {workspace.id} without root or user unencrypted volumes"
if not workspace.user_volume_encryption_enabled:

View File

@@ -333,6 +333,31 @@ class Test_S3_Service:
assert s3.buckets[0].public_access_block.block_public_policy
assert s3.buckets[0].public_access_block.restrict_public_buckets
# Test S3 Get Bucket Tagging
@mock_s3
def test__get_bucket_tagging__(self):
# Generate S3 Client
s3_client = client("s3")
# Create S3 Bucket
bucket_name = "test-bucket"
s3_client.create_bucket(Bucket=bucket_name)
s3_client.put_bucket_tagging(
Bucket=bucket_name,
Tagging={
"TagSet": [
{"Key": "test", "Value": "test"},
]
},
)
# S3 client for this test class
audit_info = self.set_mocked_audit_info()
s3 = S3(audit_info)
assert len(s3.buckets) == 1
assert s3.buckets[0].tags == [
{"Key": "test", "Value": "test"},
]
# Test S3 Control Account Get Public Access Block
@mock_s3control
def test__get_public_access_block__s3_control(self):

View File

@@ -83,6 +83,12 @@ def mock_make_api_call(self, operation_name, kwarg):
"EnableNetworkIsolation": True,
"EnableInterContainerTrafficEncryption": True,
}
if operation_name == "ListTags":
return {
"Tags": [
{"Key": "test", "Value": "test"},
],
}
return make_api_call(self, operation_name, kwarg)
@@ -148,6 +154,9 @@ class Test_SageMaker_Service:
assert sagemaker.sagemaker_notebook_instances[0].name == test_notebook_instance
assert sagemaker.sagemaker_notebook_instances[0].arn == notebook_instance_arn
assert sagemaker.sagemaker_notebook_instances[0].region == AWS_REGION
assert sagemaker.sagemaker_notebook_instances[0].tags == [
{"Key": "test", "Value": "test"},
]
# Test SageMaker list models
def test_list_models(self):
@@ -157,6 +166,9 @@ class Test_SageMaker_Service:
assert sagemaker.sagemaker_models[0].name == test_model
assert sagemaker.sagemaker_models[0].arn == test_arn_model
assert sagemaker.sagemaker_models[0].region == AWS_REGION
assert sagemaker.sagemaker_models[0].tags == [
{"Key": "test", "Value": "test"},
]
# Test SageMaker list training jobs
def test_list_training_jobs(self):
@@ -166,6 +178,9 @@ class Test_SageMaker_Service:
assert sagemaker.sagemaker_training_jobs[0].name == test_training_job
assert sagemaker.sagemaker_training_jobs[0].arn == test_arn_training_job
assert sagemaker.sagemaker_training_jobs[0].region == AWS_REGION
assert sagemaker.sagemaker_training_jobs[0].tags == [
{"Key": "test", "Value": "test"},
]
# Test SageMaker describe notebook instance
def test_describe_notebook_instance(self):

View File

@@ -83,7 +83,11 @@ class Test_SecretsManager_Service:
secretsmanager_client = client("secretsmanager", region_name=AWS_REGION)
# Create Secret
resp = secretsmanager_client.create_secret(
Name="test-secret", SecretString="test-secret"
Name="test-secret",
SecretString="test-secret",
Tags=[
{"Key": "test", "Value": "test"},
],
)
secret_arn = resp["ARN"]
secret_name = resp["Name"]
@@ -155,3 +159,6 @@ class Test_SecretsManager_Service:
assert secretsmanager.secrets[secret_name].arn == secret_arn
assert secretsmanager.secrets[secret_name].region == AWS_REGION
assert secretsmanager.secrets[secret_name].rotation_enabled is True
assert secretsmanager.secrets[secret_name].tags == [
{"Key": "test", "Value": "test"},
]

View File

@@ -94,7 +94,12 @@ class Test_SNS_Service:
# Test SNS session
def test__list_topics__(self):
sns_client = client("sns", region_name=AWS_REGION)
sns_client.create_topic(Name=topic_name)
sns_client.create_topic(
Name=topic_name,
Tags=[
{"Key": "test", "Value": "test"},
],
)
audit_info = self.set_mocked_audit_info()
sns = SNS(audit_info)
@@ -106,6 +111,9 @@ class Test_SNS_Service:
== f"arn:aws:sns:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:{topic_name}"
)
assert sns.topics[0].region == AWS_REGION
assert sns.topics[0].tags == [
{"Key": "test", "Value": "test"},
]
@mock_sns
# Test SNS session

View File

@@ -96,12 +96,13 @@ class Test_SQS_Service:
# Test SQS list queues
def test__list_queues__(self):
sqs_client = client("sqs", region_name=AWS_REGION)
queue = sqs_client.create_queue(QueueName=test_queue)
queue = sqs_client.create_queue(QueueName=test_queue, tags={"test": "test"})
audit_info = self.set_mocked_audit_info()
sqs = SQS(audit_info)
assert len(sqs.queues) == 1
assert sqs.queues[0].id == queue["QueueUrl"]
assert sqs.queues[0].region == AWS_REGION
assert sqs.queues[0].tags == [{"test": "test"}]
@mock_sqs
# Test SQS list queues

View File

@@ -34,6 +34,7 @@ class Test_ssm_documents_secrets:
ssm_client.audited_account = DEFAULT_ACCOUNT_ID
ssm_client.documents = {
document_name: Document(
arn=document_arn,
name=document_name,
region=AWS_REGION,
content={"db_password": "test-password"},
@@ -71,6 +72,7 @@ class Test_ssm_documents_secrets:
ssm_client.audited_account = DEFAULT_ACCOUNT_ID
ssm_client.documents = {
document_name: Document(
arn=document_arn,
name=document_name,
region=AWS_REGION,
content={"profile": "test"},

View File

@@ -34,6 +34,7 @@ class Test_ssm_documents_set_as_public:
ssm_client.audited_account = DEFAULT_ACCOUNT_ID
ssm_client.documents = {
document_name: Document(
arn=document_arn,
name=document_name,
region=AWS_REGION,
content="",
@@ -70,6 +71,7 @@ class Test_ssm_documents_set_as_public:
ssm_client.audited_account = DEFAULT_ACCOUNT_ID
ssm_client.documents = {
document_name: Document(
arn=document_arn,
name=document_name,
region=AWS_REGION,
content="",

View File

@@ -31,9 +31,6 @@ class Test_ssm_managed_compliant_patching:
def test_compliance_resources_compliant(self):
ssm_client = mock.MagicMock
instance_id = "i-1234567890abcdef0"
instance_arn = (
f"arn:aws:ec2:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:instance/{instance_id}"
)
ssm_client.audited_account = DEFAULT_ACCOUNT_ID
ssm_client.compliance_resources = {
instance_id: ComplianceResource(
@@ -58,7 +55,6 @@ class Test_ssm_managed_compliant_patching:
assert len(result) == 1
assert result[0].region == AWS_REGION
assert result[0].resource_id == instance_id
assert result[0].resource_arn == instance_arn
assert result[0].status == "PASS"
assert (
result[0].status_extended
@@ -68,9 +64,6 @@ class Test_ssm_managed_compliant_patching:
def test_compliance_resources_non_compliant(self):
ssm_client = mock.MagicMock
instance_id = "i-1234567890abcdef0"
instance_arn = (
f"arn:aws:ec2:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:instance/{instance_id}"
)
ssm_client.audited_account = DEFAULT_ACCOUNT_ID
ssm_client.compliance_resources = {
instance_id: ComplianceResource(
@@ -95,7 +88,6 @@ class Test_ssm_managed_compliant_patching:
assert len(result) == 1
assert result[0].region == AWS_REGION
assert result[0].resource_id == instance_id
assert result[0].resource_arn == instance_arn
assert result[0].status == "FAIL"
assert (
result[0].status_extended

View File

@@ -174,6 +174,9 @@ class Test_SSM_Service:
Name=ssm_document_name,
DocumentType="Command",
DocumentFormat="YAML",
Tags=[
{"Key": "test", "Value": "test"},
],
)
# Add permissions
ssm_client.modify_document_permission(
@@ -189,6 +192,9 @@ class Test_SSM_Service:
assert ssm.documents[ssm_document_name]
assert ssm.documents[ssm_document_name].name == ssm_document_name
assert ssm.documents[ssm_document_name].region == AWS_REGION
assert ssm.documents[ssm_document_name].tags == [
{"Key": "test", "Value": "test"},
]
assert ssm.documents[ssm_document_name].content == yaml.safe_load(
ssm_document_yaml
)

View File

@@ -73,13 +73,28 @@ class Test_VPC_Service:
# Generate VPC Client
ec2_client = client("ec2", region_name=AWS_REGION)
# Create VPC
vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]
vpc = ec2_client.create_vpc(
CidrBlock="10.0.0.0/16",
TagSpecifications=[
{
"ResourceType": "vpc",
"Tags": [
{"Key": "test", "Value": "test"},
],
},
],
)["Vpc"]
# VPC client for this test class
audit_info = self.set_mocked_audit_info()
vpc = VPC(audit_info)
assert (
len(vpc.vpcs) == 3
) # Number of AWS regions + created VPC, one default VPC per region
for vpc in vpc.vpcs:
if vpc.cidr_block == "10.0.0.0/16":
assert vpc.tags == [
{"Key": "test", "Value": "test"},
]
# Test VPC Describe Flow Logs
@mock_ec2
@@ -115,7 +130,16 @@ class Test_VPC_Service:
vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")
peer_vpc = ec2_client.create_vpc(CidrBlock="11.0.0.0/16")
vpc_pcx = ec2_client.create_vpc_peering_connection(
VpcId=vpc["Vpc"]["VpcId"], PeerVpcId=peer_vpc["Vpc"]["VpcId"]
VpcId=vpc["Vpc"]["VpcId"],
PeerVpcId=peer_vpc["Vpc"]["VpcId"],
TagSpecifications=[
{
"ResourceType": "vpc-peering-connection",
"Tags": [
{"Key": "test", "Value": "test"},
],
},
],
)
vpc_pcx_id = vpc_pcx["VpcPeeringConnection"]["VpcPeeringConnectionId"]
@@ -127,6 +151,9 @@ class Test_VPC_Service:
vpc = VPC(audit_info)
assert len(vpc.vpc_peering_connections) == 1
assert vpc.vpc_peering_connections[0].id == vpc_pcx_id
assert vpc.vpc_peering_connections[0].tags == [
{"Key": "test", "Value": "test"},
]
# Test VPC Describe VPC Peering connections
@mock_ec2
@@ -196,12 +223,23 @@ class Test_VPC_Service:
]
}
),
TagSpecifications=[
{
"ResourceType": "vpc-endpoint",
"Tags": [
{"Key": "test", "Value": "test"},
],
},
],
)["VpcEndpoint"]["VpcEndpointId"]
# VPC client for this test class
audit_info = self.set_mocked_audit_info()
vpc = VPC(audit_info)
assert len(vpc.vpc_endpoints) == 1
assert vpc.vpc_endpoints[0].id == endpoint
assert vpc.vpc_endpoints[0].tags == [
{"Key": "test", "Value": "test"},
]
# Test VPC Describe VPC Endpoint Services
@mock_ec2
@@ -228,7 +266,15 @@ class Test_VPC_Service:
)["LoadBalancers"][0]["LoadBalancerArn"]
_ = ec2_client.create_vpc_endpoint_service_configuration(
NetworkLoadBalancerArns=[lb_arn]
NetworkLoadBalancerArns=[lb_arn],
TagSpecifications=[
{
"ResourceType": "vpc-endpoint-service-configuration",
"Tags": [
{"Key": "test", "Value": "test"},
],
},
],
)
# VPC client for this test class
audit_info = self.set_mocked_audit_info()

View File

@@ -27,6 +27,12 @@ def mock_make_api_call(self, operation_name, kwarg):
},
],
}
if operation_name == "DescribeTags":
return {
"TagList": [
{"Key": "test", "Value": "test"},
]
}
return make_api_call(self, operation_name, kwarg)
@@ -91,5 +97,8 @@ class Test_WorkSpaces_Service:
assert len(workspaces.workspaces) == 1
assert workspaces.workspaces[0].id == workspace_id
assert workspaces.workspaces[0].region == AWS_REGION
assert workspaces.workspaces[0].tags == [
{"Key": "test", "Value": "test"},
]
assert workspaces.workspaces[0].user_volume_encryption_enabled
assert workspaces.workspaces[0].root_volume_encryption_enabled