Files
prowler/tests/providers/aws/services/ec2/ec2_service_test.py
Sergio Garcia 3ac4dc8392 feat(scanner): Tag-based scan (#1751)
Co-authored-by: Toni de la Fuente <toni@blyx.com>
Co-authored-by: Pepe Fagoaga <pepe@verica.io>
2023-01-31 12:19:29 +01:00

412 lines
15 KiB
Python

import ipaddress
import re
from base64 import b64decode
from datetime import datetime
from boto3 import client, resource, session
from dateutil.tz import tzutc
from freezegun import freeze_time
from moto import mock_ec2
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
from prowler.providers.aws.services.ec2.ec2_service import EC2
AWS_ACCOUNT_NUMBER = 123456789012
AWS_REGION = "us-east-1"
EXAMPLE_AMI_ID = "ami-12c6146b"
MOCK_DATETIME = datetime(2023, 1, 4, 7, 27, 30, tzinfo=tzutc())
class Test_EC2_Service:
# Mocked Audit Info
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
),
audited_account=AWS_ACCOUNT_NUMBER,
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=None,
credentials=None,
assumed_role_info=None,
audited_regions=["eu-west-1", "us-east-1"],
organizations_metadata=None,
audit_resources=None,
)
return audit_info
# Test EC2 Service
@mock_ec2
def test_service(self):
# EC2 client for this test class
audit_info = self.set_mocked_audit_info()
ec2 = EC2(audit_info)
assert ec2.service == "ec2"
# Test EC2 Client
@mock_ec2
def test_client(self):
# EC2 client for this test class
audit_info = self.set_mocked_audit_info()
ec2 = EC2(audit_info)
for regional_client in ec2.regional_clients.values():
assert regional_client.__class__.__name__ == "EC2"
# Test EC2 Session
@mock_ec2
def test__get_session__(self):
# EC2 client for this test class
audit_info = self.set_mocked_audit_info()
ec2 = EC2(audit_info)
assert ec2.session.__class__.__name__ == "Session"
# Test EC2 Session
@mock_ec2
def test_audited_account(self):
# EC2 client for this test class
audit_info = self.set_mocked_audit_info()
ec2 = EC2(audit_info)
assert ec2.audited_account == AWS_ACCOUNT_NUMBER
# Test EC2 Describe Instances
@mock_ec2
@freeze_time(MOCK_DATETIME)
def test__describe_instances__(self):
# Generate EC2 Client
ec2_resource = resource("ec2", region_name=AWS_REGION)
ec2_client = client("ec2", region_name=AWS_REGION)
# Get AMI image
image_response = ec2_client.describe_images()
image_id = image_response["Images"][0]["ImageId"]
# Create EC2 Instances running
ec2_resource.create_instances(
MinCount=1,
MaxCount=1,
ImageId=image_id,
)
# EC2 client for this test class
audit_info = self.set_mocked_audit_info()
ec2 = EC2(audit_info)
assert len(ec2.instances) == 1
assert re.match(r"i-[0-9a-z]{17}", ec2.instances[0].id)
assert (
ec2.instances[0].arn
== f"arn:{audit_info.audited_partition}:ec2:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:instance/{ec2.instances[0].id}"
)
assert ec2.instances[0].type == "m1.small"
assert ec2.instances[0].state == "running"
assert re.match(r"ami-[0-9a-z]{8}", ec2.instances[0].image_id)
assert ec2.instances[0].launch_time == MOCK_DATETIME
assert not ec2.instances[0].user_data
assert not ec2.instances[0].http_tokens
assert not ec2.instances[0].http_endpoint
assert not ec2.instances[0].instance_profile
assert ipaddress.ip_address(ec2.instances[0].private_ip).is_private
assert (
ec2.instances[0].private_dns
== f"ip-{ec2.instances[0].private_ip.replace('.', '-')}.ec2.internal"
)
assert ipaddress.ip_address(ec2.instances[0].public_ip).is_global
assert (
ec2.instances[0].public_dns
== f"ec2-{ec2.instances[0].public_ip.replace('.', '-')}.compute-1.amazonaws.com"
)
# Test EC2 Describe Security Groups
@mock_ec2
def test__describe_security_groups__(self):
# Generate EC2 Client
ec2_client = client("ec2", region_name=AWS_REGION)
# Create EC2 Security Group
sg_id = ec2_client.create_security_group(
Description="test-description",
GroupName="test-security-group",
)["GroupId"]
# EC2 client for this test class
audit_info = self.set_mocked_audit_info()
ec2 = EC2(audit_info)
assert sg_id in str(ec2.security_groups)
for security_group in ec2.security_groups:
if security_group.id == sg_id:
assert security_group.name == "test-security-group"
assert (
security_group.arn
== f"arn:{audit_info.audited_partition}:ec2:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:security-group/{security_group.id}"
)
assert re.match(r"sg-[0-9a-z]{17}", security_group.id)
assert security_group.region == AWS_REGION
assert security_group.network_interfaces == []
assert security_group.ingress_rules == []
assert security_group.egress_rules == [
{
"IpProtocol": "-1",
"IpRanges": [{"CidrIp": "0.0.0.0/0"}],
"Ipv6Ranges": [],
"PrefixListIds": [],
"UserIdGroupPairs": [],
}
]
# Test EC2 Describe Nacls
@mock_ec2
def test__describe_network_acls__(self):
# Generate EC2 Client
ec2_client = client("ec2", region_name=AWS_REGION)
ec2_resource = resource("ec2", region_name=AWS_REGION)
# Create EC2 VPC and SG
vpc_id = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]["VpcId"]
nacl_id = ec2_resource.create_network_acl(
VpcId=vpc_id,
).id
# EC2 client for this test class
audit_info = self.set_mocked_audit_info()
ec2 = EC2(audit_info)
assert nacl_id in str(ec2.network_acls)
for acl in ec2.network_acls:
if acl.id == nacl_id:
assert re.match(r"acl-[0-9a-z]{8}", acl.id)
assert (
acl.arn
== f"arn:{audit_info.audited_partition}:ec2:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:network-acl/{acl.id}"
)
assert acl.entries == []
# Test EC2 Describe Snapshots
@mock_ec2
def test__describe_snapshots__(self):
# Generate EC2 Client
ec2_client = client("ec2", region_name=AWS_REGION)
ec2_resource = resource("ec2", region_name=AWS_REGION)
# Create EC2 Volume and Snapshot
volume_id = ec2_resource.create_volume(
AvailabilityZone="us-east-1a",
Size=80,
VolumeType="gp2",
).id
snapshot_id = ec2_client.create_snapshot(
VolumeId=volume_id,
)["SnapshotId"]
# EC2 client for this test class
audit_info = self.set_mocked_audit_info()
ec2 = EC2(audit_info)
assert snapshot_id in str(ec2.snapshots)
for snapshot in ec2.snapshots:
if snapshot.id == snapshot_id:
assert re.match(r"snap-[0-9a-z]{8}", snapshot.id)
assert (
snapshot.arn
== f"arn:{audit_info.audited_partition}:ec2:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:snapshot/{snapshot.id}"
)
assert snapshot.region == AWS_REGION
assert not snapshot.encrypted
assert not snapshot.public
# Test EC2 Get Snapshot Public
@mock_ec2
def test__get_snapshot_public__(self):
# Generate EC2 Client
ec2_client = client("ec2", region_name=AWS_REGION)
ec2_resource = resource("ec2", region_name=AWS_REGION)
# Create EC2 Volume and Snapshot
volume_id = ec2_resource.create_volume(
AvailabilityZone="us-east-1a",
Size=80,
VolumeType="gp2",
).id
snapshot_id = ec2_client.create_snapshot(
VolumeId=volume_id,
)["SnapshotId"]
ec2_client.modify_snapshot_attribute(
Attribute="createVolumePermission",
GroupNames=[
"all",
],
OperationType="add",
SnapshotId=snapshot_id,
)
# EC2 client for this test class
audit_info = self.set_mocked_audit_info()
ec2 = EC2(audit_info)
assert snapshot_id in str(ec2.snapshots)
for snapshot in ec2.snapshots:
if snapshot.id == snapshot_id:
assert re.match(r"snap-[0-9a-z]{8}", snapshot.id)
assert (
snapshot.arn
== f"arn:{audit_info.audited_partition}:ec2:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:snapshot/{snapshot.id}"
)
assert snapshot.region == AWS_REGION
assert not snapshot.encrypted
assert snapshot.public
# Test EC2 Instance User Data
@mock_ec2
def test__get_instance_user_data__(self):
user_data = "This is some user_data"
ec2 = resource("ec2", region_name=AWS_REGION)
ec2.create_instances(
ImageId=EXAMPLE_AMI_ID,
MinCount=1,
MaxCount=1,
UserData="This is some user_data",
)
# EC2 client for this test class
audit_info = self.set_mocked_audit_info()
ec2 = EC2(audit_info)
assert user_data == b64decode(ec2.instances[0].user_data).decode("utf-8")
# Test EC2 Get EBS Encryption by default
@mock_ec2
def test__get_ebs_encryption_by_default__(self):
ec2_client = client("ec2", region_name=AWS_REGION)
ec2_client.enable_ebs_encryption_by_default()
# EC2 client for this test class
audit_info = self.set_mocked_audit_info()
ec2 = EC2(audit_info)
# One result per region
assert len(ec2.ebs_encryption_by_default) == 2
for result in ec2.ebs_encryption_by_default:
if result.region == AWS_REGION:
assert result.status
# Test EC2 Describe Addresses
@mock_ec2
def test__describe_addresses__(self):
# Generate EC2 Client
ec2_client = client("ec2", region_name=AWS_REGION)
allocation_id = ec2_client.allocate_address(
Domain="vpc", Address="127.38.43.222"
)["AllocationId"]
# EC2 client for this test class
audit_info = self.set_mocked_audit_info()
ec2 = EC2(audit_info)
assert "127.38.43.222" in str(ec2.elastic_ips)
assert (
ec2.elastic_ips[0].arn
== f"arn:aws:ec2:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:eip-allocation/{allocation_id}"
)
# Test EC2 Describe Network Interfaces
@mock_ec2
def test__describe_network_interfaces__(self):
# Generate EC2 Client
ec2_client = client("ec2", region_name=AWS_REGION)
ec2_resource = resource("ec2", region_name=AWS_REGION)
# Create VPC, Subnet, SecurityGroup and Network Interface
vpc = ec2_resource.create_vpc(CidrBlock="10.0.0.0/16")
subnet = ec2_resource.create_subnet(VpcId=vpc.id, CidrBlock="10.0.0.0/18")
sg = ec2_resource.create_security_group(
GroupName="test-securitygroup", Description="n/a"
)
eni_id = subnet.create_network_interface(Groups=[sg.id]).id
print(eni_id)
ec2_client.modify_network_interface_attribute(
NetworkInterfaceId=eni_id, Groups=[sg.id]
)
# EC2 client for this test class
audit_info = self.set_mocked_audit_info()
ec2 = EC2(audit_info)
assert sg.id in str(ec2.security_groups)
for security_group in ec2.security_groups:
if security_group.id == sg.id:
assert security_group.name == "test-securitygroup"
assert (
security_group.arn
== f"arn:{audit_info.audited_partition}:ec2:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:security-group/{security_group.id}"
)
assert re.match(r"sg-[0-9a-z]{17}", security_group.id)
assert security_group.region == AWS_REGION
assert eni_id in security_group.network_interfaces
assert security_group.ingress_rules == []
assert security_group.egress_rules == [
{
"IpProtocol": "-1",
"IpRanges": [{"CidrIp": "0.0.0.0/0"}],
"Ipv6Ranges": [],
"PrefixListIds": [],
"UserIdGroupPairs": [],
}
]
# Test EC2 Describe Images
@mock_ec2
def test__describe_images__(self):
# Generate EC2 Client
ec2_client = client("ec2", region_name=AWS_REGION)
ec2_resource = resource("ec2", region_name=AWS_REGION)
# Create AMI
tag_specifications = [
{
"ResourceType": "image",
"Tags": [
{
"Key": "Base_AMI_Name",
"Value": "Deep Learning Base AMI (Amazon Linux 2) Version 31.0",
},
{"Key": "OS_Version", "Value": "AWS Linux 2"},
],
},
]
instance = ec2_resource.create_instances(
ImageId=EXAMPLE_AMI_ID, MinCount=1, MaxCount=1
)[0]
image_id = ec2_client.create_image(
InstanceId=instance.instance_id,
Name="test-image",
Description="test ami",
TagSpecifications=tag_specifications,
)["ImageId"]
# EC2 client for this test class
audit_info = self.set_mocked_audit_info()
ec2 = EC2(audit_info)
assert len(ec2.images) == 1
assert ec2.images[0].id == image_id
assert re.match(r"ami-[0-9a-z]{8}", ec2.images[0].id)
assert (
ec2.images[0].arn
== f"arn:{audit_info.audited_partition}:ec2:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:image/{ec2.images[0].id}"
)
assert not ec2.images[0].public
assert ec2.images[0].region == AWS_REGION
# Test EC2 Describe Volumes
@mock_ec2
def test__describe_volumes__(self):
# Generate EC2 Client
ec2_client = client("ec2", region_name=AWS_REGION)
# Create Volume
volume_id = ec2_client.create_volume(
AvailabilityZone=AWS_REGION,
Encrypted=False,
Size=40,
TagSpecifications=[],
)["VolumeId"]
# EC2 client for this test class
audit_info = self.set_mocked_audit_info()
ec2 = EC2(audit_info)
assert len(ec2.volumes) == 1
assert ec2.volumes[0].id == volume_id
assert re.match(r"vol-[0-9a-z]{8}", ec2.volumes[0].id)
assert (
ec2.volumes[0].arn
== f"arn:{audit_info.audited_partition}:ec2:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:volume/{ec2.volumes[0].id}"
)
assert ec2.volumes[0].region == AWS_REGION
assert not ec2.volumes[0].encrypted