fix(ec2): avoid terminated instances (#2063)

This commit is contained in:
Sergio Garcia
2023-03-10 08:11:35 +01:00
committed by GitHub
parent b05f67db19
commit cf28b814cb
6 changed files with 108 additions and 102 deletions

View File

@@ -6,24 +6,25 @@ class ec2_instance_imdsv2_enabled(Check):
def execute(self):
findings = []
for instance in ec2_client.instances:
report = Check_Report_AWS(self.metadata())
report.region = instance.region
report.resource_id = instance.id
report.resource_arn = instance.arn
report.resource_tags = instance.tags
report.status = "FAIL"
report.status_extended = (
f"EC2 Instance {instance.id} has IMDSv2 disabled or not required."
)
if (
instance.http_endpoint == "enabled"
and instance.http_tokens == "required"
):
report.status = "PASS"
if instance.state != "terminated":
report = Check_Report_AWS(self.metadata())
report.region = instance.region
report.resource_id = instance.id
report.resource_arn = instance.arn
report.resource_tags = instance.tags
report.status = "FAIL"
report.status_extended = (
f"EC2 Instance {instance.id} has IMDSv2 enabled and required."
f"EC2 Instance {instance.id} has IMDSv2 disabled or not required."
)
if (
instance.http_endpoint == "enabled"
and instance.http_tokens == "required"
):
report.status = "PASS"
report.status_extended = (
f"EC2 Instance {instance.id} has IMDSv2 enabled and required."
)
findings.append(report)
findings.append(report)
return findings

View File

@@ -6,17 +6,18 @@ class ec2_instance_internet_facing_with_instance_profile(Check):
def execute(self):
findings = []
for instance in ec2_client.instances:
report = Check_Report_AWS(self.metadata())
report.region = instance.region
report.resource_id = instance.id
report.resource_arn = instance.arn
report.resource_tags = instance.tags
report.status = "PASS"
report.status_extended = f"EC2 Instance {instance.id} is not internet facing with an instance profile."
if instance.public_ip and instance.instance_profile:
report.status = "FAIL"
report.status_extended = f"EC2 Instance {instance.id} at IP {instance.public_ip} is internet-facing with Instance Profile {instance.instance_profile['Arn']}."
if instance.state != "terminated":
report = Check_Report_AWS(self.metadata())
report.region = instance.region
report.resource_id = instance.id
report.resource_arn = instance.arn
report.resource_tags = instance.tags
report.status = "PASS"
report.status_extended = f"EC2 Instance {instance.id} is not internet facing with an instance profile."
if instance.public_ip and instance.instance_profile:
report.status = "FAIL"
report.status_extended = f"EC2 Instance {instance.id} at IP {instance.public_ip} is internet-facing with Instance Profile {instance.instance_profile['Arn']}."
findings.append(report)
findings.append(report)
return findings

View File

@@ -7,22 +7,23 @@ class ec2_instance_managed_by_ssm(Check):
def execute(self):
findings = []
for instance in ec2_client.instances:
report = Check_Report_AWS(self.metadata())
report.region = instance.region
report.resource_arn = instance.arn
report.resource_tags = instance.tags
if not ssm_client.managed_instances.get(instance.id):
report.status = "FAIL"
report.status_extended = (
f"EC2 Instance {instance.id} is not managed by Systems Manager."
)
report.resource_id = instance.id
else:
report.status = "PASS"
report.status_extended = (
f"EC2 Instance {instance.id} is managed by Systems Manager."
)
report.resource_id = instance.id
findings.append(report)
if instance.state != "terminated":
report = Check_Report_AWS(self.metadata())
report.region = instance.region
report.resource_arn = instance.arn
report.resource_tags = instance.tags
if not ssm_client.managed_instances.get(instance.id):
report.status = "FAIL"
report.status_extended = (
f"EC2 Instance {instance.id} is not managed by Systems Manager."
)
report.resource_id = instance.id
else:
report.status = "PASS"
report.status_extended = (
f"EC2 Instance {instance.id} is managed by Systems Manager."
)
report.resource_id = instance.id
findings.append(report)
return findings

View File

@@ -6,17 +6,18 @@ class ec2_instance_profile_attached(Check):
def execute(self):
findings = []
for instance in ec2_client.instances:
report = Check_Report_AWS(self.metadata())
report.region = instance.region
report.resource_id = instance.id
report.resource_arn = instance.arn
report.resource_tags = instance.tags
report.status = "FAIL"
report.status_extended = f"EC2 Instance {instance.id} not associated with an Instance Profile Role."
if instance.instance_profile:
report.status = "PASS"
report.status_extended = f"EC2 Instance {instance.id} associated with Instance Profile Role {instance.instance_profile['Arn']}."
if instance.state != "terminated":
report = Check_Report_AWS(self.metadata())
report.region = instance.region
report.resource_id = instance.id
report.resource_arn = instance.arn
report.resource_tags = instance.tags
report.status = "FAIL"
report.status_extended = f"EC2 Instance {instance.id} not associated with an Instance Profile Role."
if instance.instance_profile:
report.status = "PASS"
report.status_extended = f"EC2 Instance {instance.id} associated with Instance Profile Role {instance.instance_profile['Arn']}."
findings.append(report)
findings.append(report)
return findings

View File

@@ -6,20 +6,21 @@ class ec2_instance_public_ip(Check):
def execute(self):
findings = []
for instance in ec2_client.instances:
report = Check_Report_AWS(self.metadata())
report.region = instance.region
report.resource_arn = instance.arn
report.resource_tags = instance.tags
if instance.public_ip:
report.status = "FAIL"
report.status_extended = f"EC2 Instance {instance.id} has a Public IP: {instance.public_ip} ({instance.public_dns})."
report.resource_id = instance.id
else:
report.status = "PASS"
report.status_extended = (
f"EC2 Instance {instance.id} has not a Public IP."
)
report.resource_id = instance.id
findings.append(report)
if instance.state != "terminated":
report = Check_Report_AWS(self.metadata())
report.region = instance.region
report.resource_arn = instance.arn
report.resource_tags = instance.tags
if instance.public_ip:
report.status = "FAIL"
report.status_extended = f"EC2 Instance {instance.id} has a Public IP: {instance.public_ip} ({instance.public_dns})."
report.resource_id = instance.id
else:
report.status = "PASS"
report.status_extended = (
f"EC2 Instance {instance.id} has not a Public IP."
)
report.resource_id = instance.id
findings.append(report)
return findings

View File

@@ -14,43 +14,44 @@ class ec2_instance_secrets_user_data(Check):
def execute(self):
findings = []
for instance in ec2_client.instances:
report = Check_Report_AWS(self.metadata())
report.region = instance.region
report.resource_id = instance.id
report.resource_arn = instance.arn
report.resource_tags = instance.tags
if instance.user_data:
temp_user_data_file = tempfile.NamedTemporaryFile(delete=False)
user_data = b64decode(instance.user_data)
if user_data[0:2] == b"\x1f\x8b": # GZIP magic number
user_data = zlib.decompress(user_data, zlib.MAX_WBITS | 32).decode(
"utf-8"
if instance.state != "terminated":
report = Check_Report_AWS(self.metadata())
report.region = instance.region
report.resource_id = instance.id
report.resource_arn = instance.arn
report.resource_tags = instance.tags
if instance.user_data:
temp_user_data_file = tempfile.NamedTemporaryFile(delete=False)
user_data = b64decode(instance.user_data)
if user_data[0:2] == b"\x1f\x8b": # GZIP magic number
user_data = zlib.decompress(
user_data, zlib.MAX_WBITS | 32
).decode("utf-8")
else:
user_data = user_data.decode("utf-8")
temp_user_data_file.write(
bytes(user_data, encoding="raw_unicode_escape")
)
else:
user_data = user_data.decode("utf-8")
temp_user_data_file.close()
secrets = SecretsCollection()
with default_settings():
secrets.scan_file(temp_user_data_file.name)
temp_user_data_file.write(
bytes(user_data, encoding="raw_unicode_escape")
)
temp_user_data_file.close()
secrets = SecretsCollection()
with default_settings():
secrets.scan_file(temp_user_data_file.name)
if secrets.json():
report.status = "FAIL"
report.status_extended = f"Potential secret found in EC2 instance {instance.id} User Data."
else:
report.status = "PASS"
report.status_extended = (
f"No secrets found in EC2 instance {instance.id} User Data."
)
if secrets.json():
report.status = "FAIL"
report.status_extended = f"Potential secret found in EC2 instance {instance.id} User Data."
os.remove(temp_user_data_file.name)
else:
report.status = "PASS"
report.status_extended = (
f"No secrets found in EC2 instance {instance.id} User Data."
)
report.status_extended = f"No secrets found in EC2 instance {instance.id} since User Data is empty."
os.remove(temp_user_data_file.name)
else:
report.status = "PASS"
report.status_extended = f"No secrets found in EC2 instance {instance.id} since User Data is empty."
findings.append(report)
findings.append(report)
return findings