fix(ec2 tests): add tags and region non sg checks (#2781)

This commit is contained in:
Nacho Rivera
2023-08-30 16:10:27 +02:00
committed by GitHub
parent 94a384fd81
commit 46f85e6395
15 changed files with 86 additions and 13 deletions

View File

@@ -68,7 +68,7 @@ class Test_ec2_ami_public:
@mock_ec2 @mock_ec2
def test_one_private_ami(self): def test_one_private_ami(self):
ec2 = client("ec2", region_name="us-east-1") ec2 = client("ec2", region_name=AWS_REGION)
reservation = ec2.run_instances(ImageId=EXAMPLE_AMI_ID, MinCount=1, MaxCount=1) reservation = ec2.run_instances(ImageId=EXAMPLE_AMI_ID, MinCount=1, MaxCount=1)
instance = reservation["Instances"][0] instance = reservation["Instances"][0]
@@ -104,10 +104,12 @@ class Test_ec2_ami_public:
result[0].resource_arn result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:image/{image_id}" == f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:image/{image_id}"
) )
assert result[0].region == AWS_REGION
assert result[0].resource_tags == []
@mock_ec2 @mock_ec2
def test_one_public_ami(self): def test_one_public_ami(self):
ec2 = client("ec2", region_name="us-east-1") ec2 = client("ec2", region_name=AWS_REGION)
reservation = ec2.run_instances(ImageId=EXAMPLE_AMI_ID, MinCount=1, MaxCount=1) reservation = ec2.run_instances(ImageId=EXAMPLE_AMI_ID, MinCount=1, MaxCount=1)
instance = reservation["Instances"][0] instance = reservation["Instances"][0]
@@ -154,3 +156,5 @@ class Test_ec2_ami_public:
result[0].resource_arn result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:image/{image_id}" == f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:image/{image_id}"
) )
assert result[0].region == AWS_REGION
assert result[0].resource_tags == []

View File

@@ -1,4 +1,3 @@
from re import search
from unittest import mock from unittest import mock
from boto3 import client, session from boto3 import client, session
@@ -74,9 +73,12 @@ class Test_ec2_ebs_default_encryption:
for result in results: for result in results:
if result.region == AWS_REGION: if result.region == AWS_REGION:
assert result.status == "PASS" assert result.status == "PASS"
assert search( assert (
"EBS Default Encryption is activated", result.status_extended == "EBS Default Encryption is activated."
result.status_extended, )
assert result.resource_id == AWS_ACCOUNT_NUMBER
assert (
result.resource_arn == f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
) )
@mock_ec2 @mock_ec2
@@ -103,7 +105,8 @@ class Test_ec2_ebs_default_encryption:
# One result per region # One result per region
assert len(result) == 2 assert len(result) == 2
assert result[0].status == "FAIL" assert result[0].status == "FAIL"
assert search( assert (
"EBS Default Encryption is not activated", result[0].status_extended == "EBS Default Encryption is not activated."
result[0].status_extended,
) )
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"

View File

@@ -115,6 +115,8 @@ class Test_ec2_ebs_public_snapshot:
for snap in results: for snap in results:
if snap.resource_id == snapshot.id: if snap.resource_id == snapshot.id:
assert snap.region == AWS_REGION
assert snap.resource_tags == []
assert snap.status == "FAIL" assert snap.status == "FAIL"
assert ( assert (
snap.status_extended snap.status_extended
@@ -158,6 +160,8 @@ class Test_ec2_ebs_public_snapshot:
for snap in results: for snap in results:
if snap.resource_id == snapshot.id: if snap.resource_id == snapshot.id:
assert snap.region == AWS_REGION
assert snap.resource_tags == []
assert snap.status == "PASS" assert snap.status == "PASS"
assert ( assert (
snap.status_extended snap.status_extended

View File

@@ -108,6 +108,8 @@ class Test_ec2_ebs_snapshots_encrypted:
for snap in results: for snap in results:
if snap.resource_id == snapshot.id: if snap.resource_id == snapshot.id:
assert snap.region == AWS_REGION
assert snap.resource_tags == []
assert snap.status == "FAIL" assert snap.status == "FAIL"
assert ( assert (
snap.status_extended snap.status_extended
@@ -151,6 +153,8 @@ class Test_ec2_ebs_snapshots_encrypted:
for snap in results: for snap in results:
if snap.resource_id == snapshot.id: if snap.resource_id == snapshot.id:
assert snap.region == AWS_REGION
assert snap.resource_tags == []
assert snap.status == "PASS" assert snap.status == "PASS"
assert ( assert (
snap.status_extended snap.status_extended

View File

@@ -93,6 +93,9 @@ class Test_ec2_ebs_volume_encryption:
assert len(result) == 1 assert len(result) == 1
assert result[0].status == "FAIL" assert result[0].status == "FAIL"
assert result[0].region == AWS_REGION
# Moto creates the volume with None in the tags attribute
assert result[0].resource_tags is None
assert ( assert (
result[0].status_extended == f"EBS Snapshot {volume.id} is unencrypted." result[0].status_extended == f"EBS Snapshot {volume.id} is unencrypted."
) )
@@ -131,6 +134,9 @@ class Test_ec2_ebs_volume_encryption:
assert len(result) == 1 assert len(result) == 1
assert result[0].status == "PASS" assert result[0].status == "PASS"
assert result[0].region == AWS_REGION
# Moto creates the volume with None in the tags attribute
assert result[0].resource_tags is None
assert ( assert (
result[0].status_extended == f"EBS Snapshot {volume.id} is encrypted." result[0].status_extended == f"EBS Snapshot {volume.id} is encrypted."
) )

View File

@@ -96,6 +96,8 @@ class Test_ec2_elastic_ip_unassgined:
assert len(results) == 1 assert len(results) == 1
assert results[0].status == "FAIL" assert results[0].status == "FAIL"
assert results[0].region == AWS_REGION
assert results[0].resource_tags == []
assert search( assert search(
"is not associated", "is not associated",
results[0].status_extended, results[0].status_extended,
@@ -145,6 +147,8 @@ class Test_ec2_elastic_ip_unassgined:
assert len(results) == 1 assert len(results) == 1
assert results[0].status == "PASS" assert results[0].status == "PASS"
assert results[0].region == AWS_REGION
assert results[0].resource_tags == []
assert search( assert search(
"is associated", "is associated",
results[0].status_extended, results[0].status_extended,

View File

@@ -96,6 +96,9 @@ class Test_ec2_instance_detailed_monitoring_enabled:
assert len(result) == 1 assert len(result) == 1
assert result[0].status == "FAIL" assert result[0].status == "FAIL"
assert result[0].region == AWS_REGION
# Moto fills instance tags with None
assert result[0].resource_tags is None
assert ( assert (
result[0].status_extended result[0].status_extended
== f"EC2 Instance {instance.id} does not have detailed monitoring enabled." == f"EC2 Instance {instance.id} does not have detailed monitoring enabled."
@@ -126,16 +129,22 @@ class Test_ec2_instance_detailed_monitoring_enabled:
), mock.patch( ), mock.patch(
"prowler.providers.aws.services.ec2.ec2_instance_detailed_monitoring_enabled.ec2_instance_detailed_monitoring_enabled.ec2_client", "prowler.providers.aws.services.ec2.ec2_instance_detailed_monitoring_enabled.ec2_instance_detailed_monitoring_enabled.ec2_client",
new=EC2(current_audit_info), new=EC2(current_audit_info),
): ) as ec2_service:
from prowler.providers.aws.services.ec2.ec2_instance_detailed_monitoring_enabled.ec2_instance_detailed_monitoring_enabled import ( from prowler.providers.aws.services.ec2.ec2_instance_detailed_monitoring_enabled.ec2_instance_detailed_monitoring_enabled import (
ec2_instance_detailed_monitoring_enabled, ec2_instance_detailed_monitoring_enabled,
) )
# TEMPORAL FIX
# Need to inspect why in service the monitoring state is set as disabled, since when is this failing ???
ec2_service.instances[0].monitoring_state = "enabled"
check = ec2_instance_detailed_monitoring_enabled() check = ec2_instance_detailed_monitoring_enabled()
result = check.execute() result = check.execute()
assert len(result) == 1 assert len(result) == 1
assert result[0].status == "PASS" assert result[0].status == "PASS"
assert result[0].region == AWS_REGION
# Moto fills instance tags with None
assert result[0].resource_tags is None
assert ( assert (
result[0].status_extended result[0].status_extended
== f"EC2 Instance {instance.id} has detailed monitoring enabled." == f"EC2 Instance {instance.id} has detailed monitoring enabled."

View File

@@ -103,6 +103,9 @@ class Test_ec2_instance_imdsv2_enabled:
assert len(result) == 1 assert len(result) == 1
assert result[0].status == "PASS" assert result[0].status == "PASS"
assert result[0].region == AWS_REGION
# Moto fills instance tags with None
assert result[0].resource_tags is None
assert search( assert search(
f"EC2 Instance {instance.id} has IMDSv2 enabled and required", f"EC2 Instance {instance.id} has IMDSv2 enabled and required",
result[0].status_extended, result[0].status_extended,
@@ -149,6 +152,9 @@ class Test_ec2_instance_imdsv2_enabled:
assert len(result) == 1 assert len(result) == 1
assert result[0].status == "FAIL" assert result[0].status == "FAIL"
assert result[0].region == AWS_REGION
# Moto fills instance tags with None
assert result[0].resource_tags is None
assert search( assert search(
f"EC2 Instance {instance.id} has IMDSv2 disabled or not required", f"EC2 Instance {instance.id} has IMDSv2 disabled or not required",
result[0].status_extended, result[0].status_extended,

View File

@@ -112,9 +112,10 @@ class Test_ec2_instance_internet_facing_with_instance_profile:
assert len(result) == 1 assert len(result) == 1
assert result[0].status == "PASS" assert result[0].status == "PASS"
assert search( assert result[0].region == AWS_REGION
f"EC2 Instance {instance.id} is not internet facing with an instance profile", assert result[0].resource_tags is None
result[0].status_extended, assert result[0].status_extended == (
f"EC2 Instance {instance.id} is not internet facing with an instance profile."
) )
assert result[0].resource_id == instance.id assert result[0].resource_id == instance.id
assert ( assert (
@@ -167,6 +168,8 @@ class Test_ec2_instance_internet_facing_with_instance_profile:
assert len(result) == 1 assert len(result) == 1
assert result[0].status == "FAIL" assert result[0].status == "FAIL"
assert result[0].region == AWS_REGION
assert result[0].resource_tags is None
assert search( assert search(
"is internet-facing with Instance Profile", result[0].status_extended "is internet-facing with Instance Profile", result[0].status_extended
) )

View File

@@ -101,6 +101,8 @@ class Test_ec2_instance_older_than_specific_days:
assert len(result) == 1 assert len(result) == 1
assert result[0].status == "PASS" assert result[0].status == "PASS"
assert result[0].region == AWS_REGION
assert result[0].resource_tags is None
assert search( assert search(
f"EC2 Instance {instance.id} is not older", result[0].status_extended f"EC2 Instance {instance.id} is not older", result[0].status_extended
) )
@@ -145,6 +147,8 @@ class Test_ec2_instance_older_than_specific_days:
assert len(result) == 1 assert len(result) == 1
assert result[0].status == "FAIL" assert result[0].status == "FAIL"
assert result[0].region == AWS_REGION
assert result[0].resource_tags is None
assert search( assert search(
f"EC2 Instance {instance.id} is older", result[0].status_extended f"EC2 Instance {instance.id} is older", result[0].status_extended
) )

View File

@@ -112,6 +112,8 @@ class Test_ec2_instance_profile_attached:
assert len(result) == 1 assert len(result) == 1
assert result[0].status == "PASS" assert result[0].status == "PASS"
assert result[0].region == AWS_REGION
assert result[0].resource_tags is None
assert search( assert search(
"associated with Instance Profile Role", "associated with Instance Profile Role",
result[0].status_extended, result[0].status_extended,
@@ -160,6 +162,8 @@ class Test_ec2_instance_profile_attached:
assert len(result) == 1 assert len(result) == 1
assert result[0].status == "FAIL" assert result[0].status == "FAIL"
assert result[0].region == AWS_REGION
assert result[0].resource_tags is None
assert search( assert search(
"not associated with an Instance Profile", result[0].status_extended "not associated with an Instance Profile", result[0].status_extended
) )

View File

@@ -105,6 +105,8 @@ class Test_ec2_instance_public_ip:
assert len(result) == 1 assert len(result) == 1
assert result[0].status == "PASS" assert result[0].status == "PASS"
assert result[0].region == AWS_REGION
assert result[0].resource_tags is None
assert search( assert search(
f"EC2 Instance {instance.id} does not have a Public IP.", f"EC2 Instance {instance.id} does not have a Public IP.",
result[0].status_extended, result[0].status_extended,
@@ -153,6 +155,8 @@ class Test_ec2_instance_public_ip:
assert len(result) == 1 assert len(result) == 1
assert result[0].status == "FAIL" assert result[0].status == "FAIL"
assert result[0].region == AWS_REGION
assert result[0].resource_tags is None
assert search( assert search(
f"EC2 Instance {instance.id} has a Public IP.", f"EC2 Instance {instance.id} has a Public IP.",
result[0].status_extended, result[0].status_extended,

View File

@@ -92,6 +92,8 @@ class Test_ec2_networkacl_allow_ingress_any_port:
# by default nacls are public # by default nacls are public
assert result[0].status == "FAIL" assert result[0].status == "FAIL"
assert result[0].region in (AWS_REGION, "eu-west-1")
assert result[0].resource_tags == []
assert ( assert (
result[0].status_extended result[0].status_extended
== f"Network ACL {result[0].resource_id} has every port open to the Internet." == f"Network ACL {result[0].resource_id} has every port open to the Internet."
@@ -139,6 +141,8 @@ class Test_ec2_networkacl_allow_ingress_any_port:
for nacl in result: for nacl in result:
if nacl.resource_id == nacl_id: if nacl.resource_id == nacl_id:
assert nacl.status == "FAIL" assert nacl.status == "FAIL"
assert result[0].region in (AWS_REGION, "eu-west-1")
assert result[0].resource_tags == []
assert ( assert (
nacl.status_extended nacl.status_extended
== f"Network ACL {nacl_id} has every port open to the Internet." == f"Network ACL {nacl_id} has every port open to the Internet."
@@ -190,6 +194,8 @@ class Test_ec2_networkacl_allow_ingress_any_port:
for nacl in result: for nacl in result:
if nacl.resource_id == nacl_id: if nacl.resource_id == nacl_id:
assert nacl.status == "PASS" assert nacl.status == "PASS"
assert result[0].region in (AWS_REGION, "eu-west-1")
assert result[0].resource_tags == []
assert ( assert (
nacl.status_extended nacl.status_extended
== f"Network ACL {nacl_id} does not have every port open to the Internet." == f"Network ACL {nacl_id} does not have every port open to the Internet."

View File

@@ -92,6 +92,8 @@ class Test_ec2_networkacl_allow_ingress_tcp_port_22:
# by default nacls are public # by default nacls are public
assert result[0].status == "FAIL" assert result[0].status == "FAIL"
assert result[0].region in (AWS_REGION, "eu-west-1")
assert result[0].resource_tags == []
assert ( assert (
result[0].status_extended result[0].status_extended
== f"Network ACL {result[0].resource_id} has SSH port 22 open to the Internet." == f"Network ACL {result[0].resource_id} has SSH port 22 open to the Internet."
@@ -140,6 +142,8 @@ class Test_ec2_networkacl_allow_ingress_tcp_port_22:
for nacl in result: for nacl in result:
if nacl.resource_id == nacl_id: if nacl.resource_id == nacl_id:
assert nacl.status == "FAIL" assert nacl.status == "FAIL"
assert result[0].region in (AWS_REGION, "eu-west-1")
assert result[0].resource_tags == []
assert ( assert (
nacl.status_extended nacl.status_extended
== f"Network ACL {nacl_id} has SSH port 22 open to the Internet." == f"Network ACL {nacl_id} has SSH port 22 open to the Internet."
@@ -192,6 +196,8 @@ class Test_ec2_networkacl_allow_ingress_tcp_port_22:
for nacl in result: for nacl in result:
if nacl.resource_id == nacl_id: if nacl.resource_id == nacl_id:
assert nacl.status == "PASS" assert nacl.status == "PASS"
assert result[0].region in (AWS_REGION, "eu-west-1")
assert result[0].resource_tags == []
assert ( assert (
nacl.status_extended nacl.status_extended
== f"Network ACL {nacl_id} does not have SSH port 22 open to the Internet." == f"Network ACL {nacl_id} does not have SSH port 22 open to the Internet."

View File

@@ -92,6 +92,8 @@ class Test_ec2_networkacl_allow_ingress_tcp_port_3389:
# by default nacls are public # by default nacls are public
assert result[0].status == "FAIL" assert result[0].status == "FAIL"
assert result[0].region in (AWS_REGION, "eu-west-1")
assert result[0].resource_tags == []
assert ( assert (
result[0].status_extended result[0].status_extended
== f"Network ACL {result[0].resource_id} has Microsoft RDP port 3389 open to the Internet." == f"Network ACL {result[0].resource_id} has Microsoft RDP port 3389 open to the Internet."
@@ -140,6 +142,8 @@ class Test_ec2_networkacl_allow_ingress_tcp_port_3389:
for nacl in result: for nacl in result:
if nacl.resource_id == nacl_id: if nacl.resource_id == nacl_id:
assert nacl.status == "FAIL" assert nacl.status == "FAIL"
assert result[0].region in (AWS_REGION, "eu-west-1")
assert result[0].resource_tags == []
assert ( assert (
nacl.status_extended nacl.status_extended
== f"Network ACL {nacl_id} has Microsoft RDP port 3389 open to the Internet." == f"Network ACL {nacl_id} has Microsoft RDP port 3389 open to the Internet."
@@ -192,6 +196,8 @@ class Test_ec2_networkacl_allow_ingress_tcp_port_3389:
for nacl in result: for nacl in result:
if nacl.resource_id == nacl_id: if nacl.resource_id == nacl_id:
assert nacl.status == "PASS" assert nacl.status == "PASS"
assert result[0].region in (AWS_REGION, "eu-west-1")
assert result[0].resource_tags == []
assert ( assert (
nacl.status_extended nacl.status_extended
== f"Network ACL {nacl_id} does not have Microsoft RDP port 3389 open to the Internet." == f"Network ACL {nacl_id} does not have Microsoft RDP port 3389 open to the Internet."