feat(ec2): Add ResourceArn (#1649)

Co-authored-by: sergargar <sergio@verica.io>
This commit is contained in:
Gabriel Soltz
2023-01-04 11:55:58 +01:00
committed by GitHub
parent 54fbaa808e
commit 6ed0c59762
75 changed files with 565 additions and 16 deletions

View File

@@ -9,7 +9,7 @@
"SubServiceName": "ami",
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"Severity": "critical",
"ResourceType": "AwsEc2SecurityGroup",
"ResourceType": "Other",
"Description": "Ensure there are no EC2 AMIs set as Public.",
"Risk": "A shared AMI is an AMI that a developer created and made available for other developers to use. If AMIs have embebed information about the environment could pose a security risk. You use a shared AMI at your own risk. Amazon can not vouch for the integrity or security of AMIs shared by Amazon EC2 users.",
"RelatedUrl": "",

View File

@@ -9,6 +9,7 @@ class ec2_ami_public(Check):
report = Check_Report_AWS(self.metadata())
report.region = image.region
report.resource_id = image.id
report.resource_arn = image.arn
report.status = "PASS"
report.status_extended = f"EC2 AMI {image.id} is not public."
if image.public:

View File

@@ -8,6 +8,7 @@ class ec2_ebs_public_snapshot(Check):
for snapshot in ec2_client.snapshots:
report = Check_Report_AWS(self.metadata())
report.region = snapshot.region
report.resource_arn = snapshot.arn
if not snapshot.public:
report.status = "PASS"
report.status_extended = f"EBS Snapshot {snapshot.id} is not Public."

View File

@@ -8,6 +8,7 @@ class ec2_ebs_snapshots_encrypted(Check):
for snapshot in ec2_client.snapshots:
report = Check_Report_AWS(self.metadata())
report.region = snapshot.region
report.resource_arn = snapshot.arn
if snapshot.encrypted:
report.status = "PASS"
report.status_extended = f"EBS Snapshot {snapshot.id} is encrypted."

View File

@@ -9,6 +9,7 @@ class ec2_ebs_volume_encryption(Check):
report = Check_Report_AWS(self.metadata())
report.region = volume.region
report.resource_id = volume.id
report.resource_arn = volume.arn
if volume.encrypted:
report.status = "PASS"
report.status_extended = f"EBS Snapshot {volume.id} is encrypted."

View File

@@ -15,6 +15,7 @@ class ec2_elastic_ip_shodan(Check):
for eip in ec2_client.elastic_ips:
report = Check_Report_AWS(self.metadata())
report.region = eip.region
report.resource_arn = eip.arn
if eip.public_ip:
try:
shodan_info = api.host(eip.public_ip)

View File

@@ -10,6 +10,7 @@ class ec2_elastic_ip_unassgined(Check):
report.region = eip.region
if eip.public_ip:
report.resource_id = eip.public_ip
report.resource_arn = eip.arn
report.status = "FAIL"
report.status_extended = f"Elastic IP {eip.public_ip} is not associated with an instance or network interface."
if eip.association_id:

View File

@@ -9,6 +9,7 @@ class ec2_instance_imdsv2_enabled(Check):
report = Check_Report_AWS(self.metadata())
report.region = instance.region
report.resource_id = instance.id
report.resource_arn = instance.arn
report.status = "FAIL"
report.status_extended = (
f"EC2 Instance {instance.id} has IMDSv2 disabled or not required."

View File

@@ -9,6 +9,7 @@ class ec2_instance_internet_facing_with_instance_profile(Check):
report = Check_Report_AWS(self.metadata())
report.region = instance.region
report.resource_id = instance.id
report.resource_arn = instance.arn
report.status = "PASS"
report.status_extended = f"EC2 Instance {instance.id} is not internet facing with an instance profile."
if instance.public_ip and instance.instance_profile:

View File

@@ -9,6 +9,7 @@ class ec2_instance_managed_by_ssm(Check):
for instance in ec2_client.instances:
report = Check_Report_AWS(self.metadata())
report.region = instance.region
report.resource_arn = instance.arn
if not ssm_client.managed_instances.get(instance.id):
report.status = "FAIL"
report.status_extended = (

View File

@@ -13,6 +13,7 @@ class ec2_instance_older_than_specific_days(Check):
report = Check_Report_AWS(self.metadata())
report.region = instance.region
report.resource_id = instance.id
report.resource_arn = instance.arn
report.status = "PASS"
report.status_extended = f"EC2 Instance {instance.id} is not running."
if instance.state == "running":

View File

@@ -9,6 +9,7 @@ class ec2_instance_profile_attached(Check):
report = Check_Report_AWS(self.metadata())
report.region = instance.region
report.resource_id = instance.id
report.resource_arn = instance.arn
report.status = "FAIL"
report.status_extended = f"EC2 Instance {instance.id} not associated with an Instance Profile Role."
if instance.instance_profile:

View File

@@ -8,6 +8,7 @@ class ec2_instance_public_ip(Check):
for instance in ec2_client.instances:
report = Check_Report_AWS(self.metadata())
report.region = instance.region
report.resource_arn = instance.arn
if instance.public_ip:
report.status = "FAIL"
report.status_extended = f"EC2 Instance {instance.id} has a Public IP: {instance.public_ip} ({instance.public_dns})."

View File

@@ -16,6 +16,7 @@ class ec2_instance_secrets_user_data(Check):
report = Check_Report_AWS(self.metadata())
report.region = instance.region
report.resource_id = instance.id
report.resource_arn = instance.arn
if instance.user_data:
temp_user_data_file = tempfile.NamedTemporaryFile(delete=False)

View File

@@ -12,6 +12,7 @@ class ec2_networkacl_allow_ingress_any_port(Check):
report = Check_Report_AWS(self.metadata())
report.region = network_acl.region
report.resource_id = network_acl.id
report.resource_arn = network_acl.arn
# If some entry allows it, that ACL is not securely configured
if not check_network_acl(network_acl.entries, tcp_protocol, check_port):
report.status = "PASS"

View File

@@ -11,6 +11,7 @@ class ec2_networkacl_allow_ingress_tcp_port_22(Check):
for network_acl in ec2_client.network_acls:
report = Check_Report_AWS(self.metadata())
report.region = network_acl.region
report.resource_arn = network_acl.arn
# If some entry allows it, that ACL is not securely configured
if not check_network_acl(network_acl.entries, tcp_protocol, check_port):
report.status = "PASS"

View File

@@ -11,6 +11,7 @@ class ec2_networkacl_allow_ingress_tcp_port_3389(Check):
for network_acl in ec2_client.network_acls:
report = Check_Report_AWS(self.metadata())
report.region = network_acl.region
report.resource_arn = network_acl.arn
# If some entry allows it, that ACL is not securely configured
if not check_network_acl(network_acl.entries, tcp_protocol, check_port):
report.status = "PASS"

View File

@@ -12,6 +12,7 @@ class ec2_securitygroup_allow_ingress_from_internet_to_any_port(Check):
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not all ports open to the Internet."
report.resource_id = security_group.id
report.resource_arn = security_group.arn
# Loop through every security group's ingress rule and check it
for ingress_rule in security_group.ingress_rules:
if check_security_group(ingress_rule, "-1", any_address=True):

View File

@@ -11,6 +11,7 @@ class ec2_securitygroup_allow_ingress_from_internet_to_port_mongodb_27017_27018(
report = Check_Report_AWS(self.metadata())
report.region = security_group.region
report.resource_id = security_group.id
report.resource_arn = security_group.arn
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not MongoDB ports 27017 and 27018 open to the Internet."
# Loop through every security group's ingress rule and check it

View File

@@ -13,6 +13,7 @@ class ec2_securitygroup_allow_ingress_from_internet_to_tcp_ftp_port_20_21(Check)
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not FTP ports 20 and 21 open to the Internet."
report.resource_id = security_group.id
report.resource_arn = security_group.arn
# Loop through every security group's ingress rule and check it
for ingress_rule in security_group.ingress_rules:
if check_security_group(

View File

@@ -13,6 +13,7 @@ class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_22(Check):
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not SSH port 22 open to the Internet."
report.resource_id = security_group.id
report.resource_arn = security_group.arn
# Loop through every security group's ingress rule and check it
for ingress_rule in security_group.ingress_rules:
if check_security_group(

View File

@@ -13,6 +13,7 @@ class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_3389(Check):
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not Microsoft RDP port 3389 open to the Internet."
report.resource_id = security_group.id
report.resource_arn = security_group.arn
# Loop through every security group's ingress rule and check it
for ingress_rule in security_group.ingress_rules:
if check_security_group(

View File

@@ -13,6 +13,7 @@ class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_cassandra_7199_9
report = Check_Report_AWS(self.metadata())
report.region = security_group.region
report.resource_id = security_group.id
report.resource_arn = security_group.arn
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not Casandra ports 7199, 8888 and 9160 open to the Internet."
# Loop through every security group's ingress rule and check it

View File

@@ -13,6 +13,7 @@ class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_elasticsearch_ki
report = Check_Report_AWS(self.metadata())
report.region = security_group.region
report.resource_id = security_group.id
report.resource_arn = security_group.arn
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not Elasticsearch/Kibana ports 9200, 9300 and 5601 open to the Internet."
# Loop through every security group's ingress rule and check it

View File

@@ -11,6 +11,7 @@ class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_kafka_9092(Check
report = Check_Report_AWS(self.metadata())
report.region = security_group.region
report.resource_id = security_group.id
report.resource_arn = security_group.arn
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not Kafka port 9092 open to the Internet."
# Loop through every security group's ingress rule and check it

View File

@@ -11,6 +11,7 @@ class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_memcached_11211(
report = Check_Report_AWS(self.metadata())
report.region = security_group.region
report.resource_id = security_group.id
report.resource_arn = security_group.arn
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not Memcached port 11211 open to the Internet."
# Loop through every security group's ingress rule and check it

View File

@@ -11,6 +11,7 @@ class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_mysql_3306(Check
report = Check_Report_AWS(self.metadata())
report.region = security_group.region
report.resource_id = security_group.id
report.resource_arn = security_group.arn
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not MySQL port 3306 open to the Internet."
# Loop through every security group's ingress rule and check it

View File

@@ -11,6 +11,7 @@ class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_oracle_1521_2483
report = Check_Report_AWS(self.metadata())
report.region = security_group.region
report.resource_id = security_group.id
report.resource_arn = security_group.arn
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not Oracle ports 1521 and 2483 open to the Internet."
# Loop through every security group's ingress rule and check it

View File

@@ -11,6 +11,7 @@ class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_postgres_5432(Ch
report = Check_Report_AWS(self.metadata())
report.region = security_group.region
report.resource_id = security_group.id
report.resource_arn = security_group.arn
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not Postgres port 5432 open to the Internet."
# Loop through every security group's ingress rule and check it

View File

@@ -11,6 +11,7 @@ class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_redis_6379(Check
report = Check_Report_AWS(self.metadata())
report.region = security_group.region
report.resource_id = security_group.id
report.resource_arn = security_group.arn
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not Redis port 6379 open to the Internet."
# Loop through every security group's ingress rule and check it

View File

@@ -13,6 +13,7 @@ class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_sql_server_1433_
report = Check_Report_AWS(self.metadata())
report.region = security_group.region
report.resource_id = security_group.id
report.resource_arn = security_group.arn
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not Microsoft SQL Server ports 1433 and 1434 open to the Internet."
# Loop through every security group's ingress rule and check it

View File

@@ -11,6 +11,7 @@ class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_telnet_23(Check)
report = Check_Report_AWS(self.metadata())
report.region = security_group.region
report.resource_id = security_group.id
report.resource_arn = security_group.arn
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not Telnet port 23 open to the Internet."
# Loop through every security group's ingress rule and check it

View File

@@ -12,6 +12,7 @@ class ec2_securitygroup_allow_wide_open_public_ipv4(Check):
report = Check_Report_AWS(self.metadata())
report.region = security_group.region
report.resource_id = security_group.id
report.resource_arn = security_group.arn
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has no potential wide-open non-RFC1918 address."
# Loop through every security group's ingress rule and check it

View File

@@ -10,6 +10,7 @@ class ec2_securitygroup_default_restrict_traffic(Check):
report = Check_Report_AWS(self.metadata())
report.region = security_group.region
report.resource_id = security_group.id
report.resource_arn = security_group.arn
# Find default security group
if security_group.name == "default":
report.status = "PASS"

View File

@@ -9,6 +9,7 @@ class ec2_securitygroup_from_launch_wizard(Check):
report = Check_Report_AWS(self.metadata())
report.region = security_group.region
report.resource_id = security_group.id
report.resource_arn = security_group.arn
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) was not created using the EC2 Launch Wizard."
if "launch-wizard" in security_group.name:

View File

@@ -10,6 +10,7 @@ class ec2_securitygroup_in_use_without_ingress_filtering(Check):
report = Check_Report_AWS(self.metadata())
report.region = security_group.region
report.resource_id = security_group.id
report.resource_arn = security_group.arn
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has ingress filtering."
for ingress_rule in security_group.ingress_rules:

View File

@@ -9,6 +9,7 @@ class ec2_securitygroup_not_used(Check):
report = Check_Report_AWS(self.metadata())
report.region = security_group.region
report.resource_id = security_group.id
report.resource_arn = security_group.arn
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) it is being used."
if len(security_group.network_interfaces) == 0:

View File

@@ -11,6 +11,7 @@ class ec2_securitygroup_with_many_ingress_egress_rules(Check):
report = Check_Report_AWS(self.metadata())
report.region = security_group.region
report.resource_id = security_group.id
report.resource_arn = security_group.arn
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has {len(security_group.ingress_rules)} inbound rules and {len(security_group.egress_rules)} outbound rules"
if (

View File

@@ -54,6 +54,7 @@ class EC2:
for page in describe_instances_paginator.paginate():
for reservation in page["Reservations"]:
for instance in reservation["Instances"]:
arn = f"arn:{self.audited_partition}:ec2:{regional_client.region}:{self.audited_account}:instance/{instance['InstanceId']}"
http_tokens = None
http_endpoint = None
public_dns = None
@@ -74,6 +75,7 @@ class EC2:
self.instances.append(
Instance(
instance["InstanceId"],
arn,
instance["State"]["Name"],
regional_client.region,
instance["InstanceType"],
@@ -101,9 +103,11 @@ class EC2:
)
for page in describe_security_groups_paginator.paginate():
for sg in page["SecurityGroups"]:
arn = f"arn:{self.audited_partition}:ec2:{regional_client.region}:{self.audited_account}:security-group/{sg['GroupId']}"
self.security_groups.append(
SecurityGroup(
sg["GroupName"],
arn,
regional_client.region,
sg["GroupId"],
sg["IpPermissions"],
@@ -123,9 +127,11 @@ class EC2:
)
for page in describe_network_acls_paginator.paginate():
for nacl in page["NetworkAcls"]:
arn = f"arn:{self.audited_partition}:ec2:{regional_client.region}:{self.audited_account}:network-acl/{nacl['NetworkAclId']}"
self.network_acls.append(
NetworkACL(
nacl["NetworkAclId"],
arn,
regional_client.region,
nacl["Entries"],
)
@@ -144,11 +150,15 @@ class EC2:
encrypted = False
for page in describe_snapshots_paginator.paginate(OwnerIds=["self"]):
for snapshot in page["Snapshots"]:
arn = f"arn:{self.audited_partition}:ec2:{regional_client.region}:{self.audited_account}:snapshot/{snapshot['SnapshotId']}"
if snapshot["Encrypted"]:
encrypted = True
self.snapshots.append(
Snapshot(
snapshot["SnapshotId"], regional_client.region, encrypted
snapshot["SnapshotId"],
arn,
regional_client.region,
encrypted,
)
)
except Exception as error:
@@ -220,11 +230,16 @@ class EC2:
try:
public = False
for image in regional_client.describe_images(Owners=["self"])["Images"]:
arn = f"arn:{self.audited_partition}:ec2:{regional_client.region}:{self.audited_account}:image/{image['ImageId']}"
if image["Public"]:
public = True
self.images.append(
Image(
image["ImageId"], image["Name"], public, regional_client.region
image["ImageId"],
arn,
image["Name"],
public,
regional_client.region,
)
)
except Exception as error:
@@ -240,9 +255,11 @@ class EC2:
)
for page in describe_volumes_paginator.paginate():
for volume in page["Volumes"]:
arn = f"arn:{self.audited_partition}:ec2:{regional_client.region}:{self.audited_account}:volume/{volume['VolumeId']}"
self.volumes.append(
Volume(
volume["VolumeId"],
arn,
regional_client.region,
volume["Encrypted"],
)
@@ -301,6 +318,7 @@ class EC2:
@dataclass
class Instance:
id: str
arn: str
state: str
region: str
type: str
@@ -318,6 +336,7 @@ class Instance:
def __init__(
self,
id,
arn,
state,
region,
type,
@@ -332,6 +351,7 @@ class Instance:
instance_profile,
):
self.id = id
self.arn = arn
self.state = state
self.region = region
self.type = type
@@ -350,12 +370,14 @@ class Instance:
@dataclass
class Snapshot:
id: str
arn: str
region: str
encrypted: bool
public: bool
def __init__(self, id, region, encrypted):
def __init__(self, id, arn, region, encrypted):
self.id = id
self.arn = arn
self.region = region
self.encrypted = encrypted
self.public = False
@@ -364,11 +386,13 @@ class Snapshot:
@dataclass
class Volume:
id: str
arn: str
region: str
encrypted: bool
def __init__(self, id, region, encrypted):
def __init__(self, id, arn, region, encrypted):
self.id = id
self.arn = arn
self.region = region
self.encrypted = encrypted
@@ -376,14 +400,16 @@ class Volume:
@dataclass
class SecurityGroup:
name: str
arn: str
region: str
id: str
network_interfaces: list[str]
ingress_rules: list[dict]
egress_rules: list[dict]
def __init__(self, name, region, id, ingress_rules, egress_rules):
def __init__(self, name, arn, region, id, ingress_rules, egress_rules):
self.name = name
self.arn = arn
self.region = region
self.id = id
self.ingress_rules = ingress_rules
@@ -394,11 +420,13 @@ class SecurityGroup:
@dataclass
class NetworkACL:
id: str
arn: str
region: str
entries: list[dict]
def __init__(self, id, region, entries):
def __init__(self, id, arn, region, entries):
self.id = id
self.arn = arn
self.region = region
self.entries = entries
@@ -422,12 +450,14 @@ class ElasticIP:
@dataclass
class Image:
id: str
arn: str
name: str
public: bool
region: str
def __init__(self, id, name, public, region):
def __init__(self, id, arn, name, public, region):
self.id = id
self.arn = arn
self.name = name
self.public = public
self.region = region

View File

@@ -65,6 +65,10 @@ class Test_ec2_ami_public:
assert result[0].status == "PASS"
assert result[0].status_extended == f"EC2 AMI {image_id} is not public."
assert result[0].resource_id == image_id
assert (
result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:image/{image_id}"
)
@mock_ec2
def test_one_public_ami(self):
@@ -111,3 +115,7 @@ class Test_ec2_ami_public:
result[0].status_extended == f"EC2 AMI {image_id} is currently public."
)
assert result[0].resource_id == image_id
assert (
result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:image/{image_id}"
)

View File

@@ -72,6 +72,10 @@ class Test_ec2_ebs_public_snapshot:
snap.status_extended
== f"EBS Snapshot {snapshot.id} is currently Public."
)
assert (
snap.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:snapshot/{snapshot.id}"
)
@mock_ec2
def test_ec2_private_snapshot(self):
@@ -109,3 +113,7 @@ class Test_ec2_ebs_public_snapshot:
snap.status_extended
== f"EBS Snapshot {snapshot.id} is not Public."
)
assert (
snap.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:snapshot/{snapshot.id}"
)

View File

@@ -66,6 +66,10 @@ class Test_ec2_ebs_snapshots_encrypted:
snap.status_extended
== f"EBS Snapshot {snapshot.id} is unencrypted."
)
assert (
snap.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:snapshot/{snapshot.id}"
)
@mock_ec2
def test_ec2_encrypted_snapshot(self):
@@ -103,3 +107,7 @@ class Test_ec2_ebs_snapshots_encrypted:
snap.status_extended
== f"EBS Snapshot {snapshot.id} is encrypted."
)
assert (
snap.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:snapshot/{snapshot.id}"
)

View File

@@ -60,6 +60,10 @@ class Test_ec2_ebs_volume_encryption:
assert (
result[0].status_extended == f"EBS Snapshot {volume.id} is unencrypted."
)
assert (
result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:volume/{volume.id}"
)
@mock_ec2
def test_ec2_encrypted_volume(self):
@@ -93,3 +97,7 @@ class Test_ec2_ebs_volume_encryption:
assert (
result[0].status_extended == f"EBS Snapshot {volume.id} is encrypted."
)
assert (
result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:volume/{volume.id}"
)

View File

@@ -36,7 +36,9 @@ class Test_ec2_elastic_ip_unassgined:
def test_eip_unassociated(self):
# Create EC2 Mocked Resources
ec2_client = client("ec2", region_name=AWS_REGION)
ec2_client.allocate_address(Domain="vpc", Address="127.38.43.222")
allocation_id = ec2_client.allocate_address(
Domain="vpc", Address="127.38.43.222"
)["AllocationId"]
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.ec2.ec2_service import EC2
@@ -62,6 +64,10 @@ class Test_ec2_elastic_ip_unassgined:
"is not associated",
results[0].status_extended,
)
assert (
results[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:eip-allocation/{allocation_id}"
)
@mock_ec2
def test_eip_associated(self):
@@ -106,3 +112,7 @@ class Test_ec2_elastic_ip_unassgined:
"is associated",
results[0].status_extended,
)
assert (
results[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:eip-allocation/{eip.allocation_id}"
)

View File

@@ -73,6 +73,10 @@ class Test_ec2_instance_imdsv2_enabled:
result[0].status_extended,
)
assert result[0].resource_id == instance.id
assert (
result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:instance/{instance.id}"
)
@mock_ec2
def test_one_uncompliant_ec2(self):
@@ -115,3 +119,7 @@ class Test_ec2_instance_imdsv2_enabled:
result[0].status_extended,
)
assert result[0].resource_id == instance.id
assert (
result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:instance/{instance.id}"
)

View File

@@ -81,6 +81,10 @@ class Test_ec2_instance_internet_facing_with_instance_profile:
result[0].status_extended,
)
assert result[0].resource_id == instance.id
assert (
result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:instance/{instance.id}"
)
@mock_iam
@mock_ec2
@@ -130,3 +134,7 @@ class Test_ec2_instance_internet_facing_with_instance_profile:
"is internet-facing with Instance Profile", result[0].status_extended
)
assert result[0].resource_id == instance.id
assert (
result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:instance/{instance.id}"
)

View File

@@ -68,6 +68,10 @@ class Test_ec2_instance_older_than_specific_days:
f"EC2 Instance {instance.id} is not older", result[0].status_extended
)
assert result[0].resource_id == instance.id
assert (
result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:instance/{instance.id}"
)
@mock_ec2
def test_one_old_ec2(self):
@@ -107,3 +111,7 @@ class Test_ec2_instance_older_than_specific_days:
f"EC2 Instance {instance.id} is older", result[0].status_extended
)
assert result[0].resource_id == instance.id
assert (
result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:instance/{instance.id}"
)

View File

@@ -81,6 +81,10 @@ class Test_ec2_instance_profile_attached:
result[0].status_extended,
)
assert result[0].resource_id == instance.id
assert (
result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:instance/{instance.id}"
)
@mock_ec2
def test_one_non_compliant_ec2(self):
@@ -123,3 +127,7 @@ class Test_ec2_instance_profile_attached:
"not associated with an Instance Profile", result[0].status_extended
)
assert result[0].resource_id == instance.id
assert (
result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:instance/{instance.id}"
)

View File

@@ -75,6 +75,10 @@ class Test_ec2_instance_public_ip:
result[0].status_extended,
)
assert result[0].resource_id == instance.id
assert (
result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:instance/{instance.id}"
)
@mock_ec2
def test_one_ec2_with_public_ip(self):
@@ -118,3 +122,7 @@ class Test_ec2_instance_public_ip:
f"EC2 Instance {instance.id} has a Public IP", result[0].status_extended
)
assert result[0].resource_id == instance.id
assert (
result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:instance/{instance.id}"
)

View File

@@ -102,6 +102,10 @@ class Test_ec2_instance_secrets_user_data:
== f"Potential secret found in EC2 instance {instance.id} User Data."
)
assert result[0].resource_id == instance.id
assert (
result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:instance/{instance.id}"
)
@mock_ec2
def test_one_ec2_file_with_secrets(self):
@@ -140,6 +144,10 @@ class Test_ec2_instance_secrets_user_data:
== f"Potential secret found in EC2 instance {instance.id} User Data."
)
assert result[0].resource_id == instance.id
assert (
result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:instance/{instance.id}"
)
@mock_ec2
def test_one_launch_configurations_without_user_data(self):
@@ -173,3 +181,7 @@ class Test_ec2_instance_secrets_user_data:
== f"No secrets found in EC2 instance {instance.id} since User Data is empty."
)
assert result[0].resource_id == instance.id
assert (
result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:instance/{instance.id}"
)

View File

@@ -107,6 +107,10 @@ class ec2_networkacl_allow_ingress_any_port:
nacl.status_extended
== f"Network ACL {nacl_id} has every port open to the Internet."
)
assert (
nacl.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:network-acl/{nacl_id}"
)
@mock_ec2
def test_ec2_compliant_nacl(self):
@@ -153,3 +157,7 @@ class ec2_networkacl_allow_ingress_any_port:
nacl.status_extended
== f"Network ACL {nacl_id} has not every port open to the Internet."
)
assert (
nacl.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:network-acl/{nacl_id}"
)

View File

@@ -108,6 +108,10 @@ class Test_ec2_networkacl_allow_ingress_tcp_port_22:
nacl.status_extended
== f"Network ACL {nacl_id} has SSH port 22 open to the Internet."
)
assert (
nacl.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:network-acl/{nacl_id}"
)
@mock_ec2
def test_ec2_compliant_nacl(self):
@@ -155,3 +159,7 @@ class Test_ec2_networkacl_allow_ingress_tcp_port_22:
nacl.status_extended
== f"Network ACL {nacl_id} has not SSH port 22 open to the Internet."
)
assert (
nacl.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:network-acl/{nacl_id}"
)

View File

@@ -108,6 +108,10 @@ class Test_ec2_networkacl_allow_ingress_tcp_port_3389:
nacl.status_extended
== f"Network ACL {nacl_id} has Microsoft RDP port 3389 open to the Internet."
)
assert (
nacl.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:network-acl/{nacl_id}"
)
@mock_ec2
def test_ec2_compliant_nacl(self):
@@ -155,3 +159,7 @@ class Test_ec2_networkacl_allow_ingress_tcp_port_3389:
nacl.status_extended
== f"Network ACL {nacl_id} has not Microsoft RDP port 3389 open to the Internet."
)
assert (
nacl.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:network-acl/{nacl_id}"
)

View File

@@ -83,6 +83,10 @@ class Test_ec2_securitygroup_allow_ingress_from_internet_to_any_port:
"has all ports open to the Internet",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)
@mock_ec2
def test_ec2_compliant_default_sg(self):
@@ -130,3 +134,7 @@ class Test_ec2_securitygroup_allow_ingress_from_internet_to_any_port:
"has not all ports open to the Internet",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)

View File

@@ -89,6 +89,10 @@ class Test_ec2_securitygroup_allow_ingress_from_internet_to_port_mongodb_27017_2
"has MongoDB ports 27017 and 27018 open to the Internet",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)
@mock_ec2
def test_ec2_compliant_default_sg(self):
@@ -140,3 +144,7 @@ class Test_ec2_securitygroup_allow_ingress_from_internet_to_port_mongodb_27017_2
"has not MongoDB ports 27017 and 27018 open to the Internet",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)

View File

@@ -89,6 +89,10 @@ class Test_ec2_securitygroup_allow_ingress_from_internet_to_tcp_ftp_port_20_21:
"has FTP ports 20 and 21 open to the Internet",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)
@mock_ec2
def test_ec2_compliant_default_sg(self):
@@ -140,3 +144,7 @@ class Test_ec2_securitygroup_allow_ingress_from_internet_to_tcp_ftp_port_20_21:
"has not FTP ports 20 and 21 open to the Internet",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)

View File

@@ -85,6 +85,10 @@ class Test_ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_22:
"has SSH port 22 open to the Internet",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)
@mock_ec2
def test_ec2_compliant_default_sg(self):
@@ -134,3 +138,7 @@ class Test_ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_22:
"has not SSH port 22 open to the Internet",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)

View File

@@ -85,6 +85,10 @@ class Test_ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_3389:
"has Microsoft RDP port 3389 open to the Internet",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)
@mock_ec2
def test_ec2_compliant_default_sg(self):
@@ -134,3 +138,7 @@ class Test_ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_3389:
"has not Microsoft RDP port 3389 open to the Internet",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)

View File

@@ -89,6 +89,10 @@ class Test_ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_cassandra_7
"has Casandra ports 7199, 8888 and 9160 open to the Internet",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)
@mock_ec2
def test_ec2_compliant_default_sg(self):
@@ -140,3 +144,7 @@ class Test_ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_cassandra_7
"has not Casandra ports 7199, 8888 and 9160 open to the Internet",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)

View File

@@ -89,6 +89,10 @@ class Test_ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_elasticsear
"has Elasticsearch/Kibana ports 9200, 9300 and 5601 open to the Internet",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)
@mock_ec2
def test_ec2_compliant_default_sg(self):
@@ -140,3 +144,7 @@ class Test_ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_elasticsear
"has not Elasticsearch/Kibana ports 9200, 9300 and 5601 open to the Internet",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)

View File

@@ -88,6 +88,10 @@ class Test_ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_kafka_9092:
assert search(
"has Kafka port 9092 open to the Internet", sg.status_extended
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)
@mock_ec2
def test_ec2_compliant_default_sg(self):
@@ -139,3 +143,7 @@ class Test_ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_kafka_9092:
"has not Kafka port 9092 open to the Internet",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)

View File

@@ -89,6 +89,10 @@ class Test_ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_memcached_1
"has Memcached port 11211 open to the Internet",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)
@mock_ec2
def test_ec2_compliant_default_sg(self):
@@ -140,3 +144,7 @@ class Test_ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_memcached_1
"has not Memcached port 11211 open to the Internet",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)

View File

@@ -89,6 +89,10 @@ class Test_ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_mysql_3306:
"has MySQL port 3306 open to the Internet",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)
@mock_ec2
def test_ec2_compliant_default_sg(self):
@@ -140,3 +144,7 @@ class Test_ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_mysql_3306:
"has not MySQL port 3306 open to the Internet",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)

View File

@@ -89,6 +89,10 @@ class Test_ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_oracle_1521
"has Oracle ports 1521 and 2483 open to the Internet",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)
@mock_ec2
def test_ec2_compliant_default_sg(self):
@@ -140,3 +144,7 @@ class Test_ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_oracle_1521
"has not Oracle ports 1521 and 2483 open to the Internet",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)

View File

@@ -89,6 +89,10 @@ class Test_ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_postgres_54
"has Postgres port 5432 open to the Internet",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)
@mock_ec2
def test_ec2_compliant_default_sg(self):
@@ -140,3 +144,7 @@ class Test_ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_postgres_54
"has not Postgres port 5432 open to the Internet",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)

View File

@@ -88,6 +88,10 @@ class Test_ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_redis_6379:
assert search(
"has Redis port 6379 open to the Internet", sg.status_extended
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)
@mock_ec2
def test_ec2_compliant_default_sg(self):
@@ -139,3 +143,7 @@ class Test_ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_redis_6379:
"has not Redis port 6379 open to the Internet",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)

View File

@@ -89,6 +89,10 @@ class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_sql_server_1433_
"has Microsoft SQL Server ports 1433 and 1434 open to the Internet",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)
@mock_ec2
def test_ec2_compliant_default_sg(self):
@@ -140,3 +144,7 @@ class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_sql_server_1433_
"has not Microsoft SQL Server ports 1433 and 1434 open to the Internet",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)

View File

@@ -88,6 +88,10 @@ class Test_ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_telnet_23:
assert search(
"has Telnet port 23 open to the Internet", sg.status_extended
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)
@mock_ec2
def test_ec2_compliant_default_sg(self):
@@ -139,3 +143,7 @@ class Test_ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_telnet_23:
"has not Telnet port 23 open to the Internet",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)

View File

@@ -83,6 +83,10 @@ class Test_ec2_securitygroup_allow_wide_open_public_ipv4:
"has no potential wide-open non-RFC1918 address",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)
@mock_ec2
def test_ec2_default_sg_with_non_RFC1918_address(self):
@@ -130,3 +134,7 @@ class Test_ec2_securitygroup_allow_wide_open_public_ipv4:
"has potential wide-open non-RFC1918 address",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)

View File

@@ -77,6 +77,10 @@ class Test_ec2_securitygroup_default_restrict_traffic:
sg.status_extended
== f"Default Security Group ({default_sg_id}) is open to the Internet."
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)
@mock_ec2
def test_ec2_compliant_default_sg(self):
@@ -121,3 +125,7 @@ class Test_ec2_securitygroup_default_restrict_traffic:
sg.status_extended
== f"Default Security Group ({default_sg_id}) is not open to the Internet."
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)

View File

@@ -75,6 +75,10 @@ class Test_ec2_securitygroup_from_launch_wizard:
"was created using the EC2 Launch Wizard",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{sg_id}"
)
@mock_ec2
def test_ec2_compliant_default_sg(self):
@@ -122,3 +126,7 @@ class Test_ec2_securitygroup_from_launch_wizard:
"was not created using the EC2 Launch Wizard",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)

View File

@@ -84,6 +84,10 @@ class Test_ec2_securitygroup_in_use_without_ingress_filtering:
"has no ingress filtering and it is not being used",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)
@mock_ec2
def test_ec2_used_public_default_sg(self):
@@ -139,6 +143,10 @@ class Test_ec2_securitygroup_in_use_without_ingress_filtering:
"has no ingress filtering and it is being used",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)
@mock_ec2
def test_ec2_private_default_sg(self):
@@ -177,3 +185,7 @@ class Test_ec2_securitygroup_in_use_without_ingress_filtering:
"has ingress filtering",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)

View File

@@ -75,6 +75,10 @@ class Test_ec2_securitygroup_not_used:
"it is not being used",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)
@mock_ec2
def test_ec2_used_default_sg(self):
@@ -122,3 +126,7 @@ class Test_ec2_securitygroup_not_used:
"it is being used",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)

View File

@@ -85,6 +85,10 @@ class Test_ec2_securitygroup_with_many_ingress_egress_rules:
assert search(
"has 60 inbound rules and 1 outbound rules", sg.status_extended
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)
@mock_ec2
def test_ec2_compliant_default_sg(self):
@@ -134,3 +138,7 @@ class Test_ec2_securitygroup_with_many_ingress_egress_rules:
"has 1 inbound rules and 1 outbound rules",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)

View File

@@ -1,6 +1,11 @@
import ipaddress
import re
from base64 import b64decode
from datetime import datetime
from boto3 import client, resource, session
from dateutil.tz import tzutc
from freezegun import freeze_time
from moto import mock_ec2
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
@@ -9,6 +14,7 @@ from prowler.providers.aws.services.ec2.ec2_service import EC2
AWS_ACCOUNT_NUMBER = 123456789012
AWS_REGION = "us-east-1"
EXAMPLE_AMI_ID = "ami-12c6146b"
MOCK_DATETIME = datetime(2023, 1, 4, 7, 27, 30, tzinfo=tzutc())
class Test_EC2_Service:
@@ -68,6 +74,7 @@ class Test_EC2_Service:
# Test EC2 Describe Instances
@mock_ec2
@freeze_time(MOCK_DATETIME)
def test__describe_instances__(self):
# Generate EC2 Client
ec2_resource = resource("ec2", region_name=AWS_REGION)
@@ -75,17 +82,38 @@ class Test_EC2_Service:
# Get AMI image
image_response = ec2_client.describe_images()
image_id = image_response["Images"][0]["ImageId"]
# Create EC2 Instances
# Create EC2 Instances running
ec2_resource.create_instances(
MinCount=2,
MaxCount=2,
MinCount=1,
MaxCount=1,
ImageId=image_id,
)
# EC2 client for this test class
audit_info = self.set_mocked_audit_info()
ec2 = EC2(audit_info)
assert len(ec2.instances) == len(
ec2_client.describe_instances()["Reservations"][0]["Instances"]
assert len(ec2.instances) == 1
assert re.match(r"i-[0-9a-z]{17}", ec2.instances[0].id)
assert (
ec2.instances[0].arn
== f"arn:{audit_info.audited_partition}:ec2:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:instance/{ec2.instances[0].id}"
)
assert ec2.instances[0].type == "m1.small"
assert ec2.instances[0].state == "running"
assert re.match(r"ami-[0-9a-z]{8}", ec2.instances[0].image_id)
assert ec2.instances[0].launch_time == MOCK_DATETIME
assert not ec2.instances[0].user_data
assert not ec2.instances[0].http_tokens
assert not ec2.instances[0].http_endpoint
assert not ec2.instances[0].instance_profile
assert ipaddress.ip_address(ec2.instances[0].private_ip).is_private
assert (
ec2.instances[0].private_dns
== f"ip-{ec2.instances[0].private_ip.replace('.', '-')}.ec2.internal"
)
assert ipaddress.ip_address(ec2.instances[0].public_ip).is_global
assert (
ec2.instances[0].public_dns
== f"ec2-{ec2.instances[0].public_ip.replace('.', '-')}.compute-1.amazonaws.com"
)
# Test EC2 Describe Security Groups
@@ -101,7 +129,28 @@ class Test_EC2_Service:
# EC2 client for this test class
audit_info = self.set_mocked_audit_info()
ec2 = EC2(audit_info)
assert sg_id in str(ec2.security_groups)
for security_group in ec2.security_groups:
if security_group.id == sg_id:
assert security_group.name == "test-security-group"
assert (
security_group.arn
== f"arn:{audit_info.audited_partition}:ec2:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:security-group/{security_group.id}"
)
assert re.match(r"sg-[0-9a-z]{17}", security_group.id)
assert security_group.region == AWS_REGION
assert security_group.network_interfaces == []
assert security_group.ingress_rules == []
assert security_group.egress_rules == [
{
"IpProtocol": "-1",
"IpRanges": [{"CidrIp": "0.0.0.0/0"}],
"Ipv6Ranges": [],
"PrefixListIds": [],
"UserIdGroupPairs": [],
}
]
# Test EC2 Describe Nacls
@mock_ec2
@@ -117,7 +166,16 @@ class Test_EC2_Service:
# EC2 client for this test class
audit_info = self.set_mocked_audit_info()
ec2 = EC2(audit_info)
assert nacl_id in str(ec2.network_acls)
for acl in ec2.network_acls:
if acl.id == nacl_id:
assert re.match(r"acl-[0-9a-z]{8}", acl.id)
assert (
acl.arn
== f"arn:{audit_info.audited_partition}:ec2:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:network-acl/{acl.id}"
)
assert acl.entries == []
# Test EC2 Describe Snapshots
@mock_ec2
@@ -137,7 +195,18 @@ class Test_EC2_Service:
# EC2 client for this test class
audit_info = self.set_mocked_audit_info()
ec2 = EC2(audit_info)
assert snapshot_id in str(ec2.snapshots)
for snapshot in ec2.snapshots:
if snapshot.id == snapshot_id:
assert re.match(r"snap-[0-9a-z]{8}", snapshot.id)
assert (
snapshot.arn
== f"arn:{audit_info.audited_partition}:ec2:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:snapshot/{snapshot.id}"
)
assert snapshot.region == AWS_REGION
assert not snapshot.encrypted
assert not snapshot.public
# Test EC2 Get Snapshot Public
@mock_ec2
@@ -165,8 +234,17 @@ class Test_EC2_Service:
# EC2 client for this test class
audit_info = self.set_mocked_audit_info()
ec2 = EC2(audit_info)
assert snapshot_id in str(ec2.snapshots)
for snapshot in ec2.snapshots:
if snapshot.id == snapshot_id:
assert re.match(r"snap-[0-9a-z]{8}", snapshot.id)
assert (
snapshot.arn
== f"arn:{audit_info.audited_partition}:ec2:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:snapshot/{snapshot.id}"
)
assert snapshot.region == AWS_REGION
assert not snapshot.encrypted
assert snapshot.public
# Test EC2 Instance User Data
@@ -185,7 +263,7 @@ class Test_EC2_Service:
ec2 = EC2(audit_info)
assert user_data == b64decode(ec2.instances[0].user_data).decode("utf-8")
# Test EC2 Instance User Data
# Test EC2 Get EBS Encryption by default
@mock_ec2
def test__get_ebs_encryption_by_default__(self):
ec2_client = client("ec2", region_name=AWS_REGION)
@@ -200,7 +278,7 @@ class Test_EC2_Service:
if result.region == AWS_REGION:
assert result.status
# Test EC2 Describe Snapshots
# Test EC2 Describe Addresses
@mock_ec2
def test__describe_addresses__(self):
# Generate EC2 Client
@@ -216,3 +294,117 @@ class Test_EC2_Service:
ec2.elastic_ips[0].arn
== f"arn:aws:ec2:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:eip-allocation/{allocation_id}"
)
# Test EC2 Describe Network Interfaces
@mock_ec2
def test__describe_network_interfaces__(self):
# Generate EC2 Client
ec2_client = client("ec2", region_name=AWS_REGION)
ec2_resource = resource("ec2", region_name=AWS_REGION)
# Create VPC, Subnet, SecurityGroup and Network Interface
vpc = ec2_resource.create_vpc(CidrBlock="10.0.0.0/16")
subnet = ec2_resource.create_subnet(VpcId=vpc.id, CidrBlock="10.0.0.0/18")
sg = ec2_resource.create_security_group(
GroupName="test-securitygroup", Description="n/a"
)
eni_id = subnet.create_network_interface(Groups=[sg.id]).id
print(eni_id)
ec2_client.modify_network_interface_attribute(
NetworkInterfaceId=eni_id, Groups=[sg.id]
)
# EC2 client for this test class
audit_info = self.set_mocked_audit_info()
ec2 = EC2(audit_info)
assert sg.id in str(ec2.security_groups)
for security_group in ec2.security_groups:
if security_group.id == sg.id:
assert security_group.name == "test-securitygroup"
assert (
security_group.arn
== f"arn:{audit_info.audited_partition}:ec2:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:security-group/{security_group.id}"
)
assert re.match(r"sg-[0-9a-z]{17}", security_group.id)
assert security_group.region == AWS_REGION
assert eni_id in security_group.network_interfaces
assert security_group.ingress_rules == []
assert security_group.egress_rules == [
{
"IpProtocol": "-1",
"IpRanges": [{"CidrIp": "0.0.0.0/0"}],
"Ipv6Ranges": [],
"PrefixListIds": [],
"UserIdGroupPairs": [],
}
]
# Test EC2 Describe Images
@mock_ec2
def test__describe_images__(self):
# Generate EC2 Client
ec2_client = client("ec2", region_name=AWS_REGION)
ec2_resource = resource("ec2", region_name=AWS_REGION)
# Create AMI
tag_specifications = [
{
"ResourceType": "image",
"Tags": [
{
"Key": "Base_AMI_Name",
"Value": "Deep Learning Base AMI (Amazon Linux 2) Version 31.0",
},
{"Key": "OS_Version", "Value": "AWS Linux 2"},
],
},
]
instance = ec2_resource.create_instances(
ImageId=EXAMPLE_AMI_ID, MinCount=1, MaxCount=1
)[0]
image_id = ec2_client.create_image(
InstanceId=instance.instance_id,
Name="test-image",
Description="test ami",
TagSpecifications=tag_specifications,
)["ImageId"]
# EC2 client for this test class
audit_info = self.set_mocked_audit_info()
ec2 = EC2(audit_info)
assert len(ec2.images) == 1
assert ec2.images[0].id == image_id
assert re.match(r"ami-[0-9a-z]{8}", ec2.images[0].id)
assert (
ec2.images[0].arn
== f"arn:{audit_info.audited_partition}:ec2:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:image/{ec2.images[0].id}"
)
assert not ec2.images[0].public
assert ec2.images[0].region == AWS_REGION
# Test EC2 Describe Volumes
@mock_ec2
def test__describe_volumes__(self):
# Generate EC2 Client
ec2_client = client("ec2", region_name=AWS_REGION)
# Create Volume
volume_id = ec2_client.create_volume(
AvailabilityZone=AWS_REGION,
Encrypted=False,
Size=40,
TagSpecifications=[],
)["VolumeId"]
# EC2 client for this test class
audit_info = self.set_mocked_audit_info()
ec2 = EC2(audit_info)
assert len(ec2.volumes) == 1
assert ec2.volumes[0].id == volume_id
assert re.match(r"vol-[0-9a-z]{8}", ec2.volumes[0].id)
assert (
ec2.volumes[0].arn
== f"arn:{audit_info.audited_partition}:ec2:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:volume/{ec2.volumes[0].id}"
)
assert ec2.volumes[0].region == AWS_REGION
assert not ec2.volumes[0].encrypted