fix(service errors): solve errors in IAM, S3, Lambda, DS, Cloudfront services (#1882)

Co-authored-by: Pepe Fagoaga <pepe@verica.io>
This commit is contained in:
Sergio Garcia
2023-02-13 10:35:04 +01:00
committed by GitHub
parent 29790b8a5c
commit ab8942d05a
7 changed files with 85 additions and 76 deletions

View File

@@ -69,7 +69,9 @@ class Lambda:
if "Runtime" in function:
self.functions[lambda_name].runtime = function["Runtime"]
if "Environment" in function:
lambda_environment = function["Environment"]["Variables"]
lambda_environment = function["Environment"].get(
"Variables"
)
self.functions[lambda_name].environment = lambda_environment
except Exception as error:

View File

@@ -20,8 +20,8 @@ class CloudFront:
if global_client:
self.client = list(global_client.values())[0]
self.region = self.client.region
self.distributions = self.__list_distributions__(self.client, self.region)
self.distributions = self.__get_distribution_config__(
self.__list_distributions__(self.client, self.region)
self.__get_distribution_config__(
self.client, self.distributions, self.region
)
@@ -30,7 +30,6 @@ class CloudFront:
def __list_distributions__(self, client, region) -> dict:
logger.info("CloudFront - Listing Distributions...")
distributions = {}
try:
list_ditributions_paginator = client.get_paginator("list_distributions")
for page in list_ditributions_paginator.paginate():
@@ -48,9 +47,7 @@ class CloudFront:
origins=origins,
region=region,
)
distributions[distribution_id] = distribution
return distributions
self.distributions[distribution_id] = distribution
except Exception as error:
logger.error(
@@ -99,8 +96,6 @@ class CloudFront:
logger.error(
f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
finally:
return distributions
class OriginsSSLProtocols(Enum):

View File

@@ -19,8 +19,14 @@ class directoryservice_ldap_certificate_expiration(Check):
report.resource_id = certificate.id
remaining_days_to_expire = (
certificate.expiry_date_time - datetime.today()
certificate.expiry_date_time
- datetime.now(
certificate.expiry_date_time.tz_info
if hasattr(certificate.expiry_date_time, "tz_info")
else None
)
).days
if remaining_days_to_expire <= DAYS_TO_EXPIRE_THRESHOLD:
report.status = "FAIL"
report.status_extended = f"LDAP Certificate {certificate.id} configured at {directory.id} is about to expire in {remaining_days_to_expire} days"

View File

@@ -12,35 +12,36 @@ class iam_no_custom_policy_permissive_role_assumption(Check):
report.resource_id = policy["PolicyName"]
report.status = "PASS"
report.status_extended = f"Custom Policy {policy['PolicyName']} does not allow permissive STS Role assumption"
if type(policy["PolicyDocument"]["Statement"]) != list:
policy_statements = [policy["PolicyDocument"]["Statement"]]
else:
policy_statements = policy["PolicyDocument"]["Statement"]
for statement in policy_statements:
if (
statement["Effect"] == "Allow"
and "Action" in statement
and "*" in statement["Resource"]
):
if type(statement["Action"]) == list:
for action in statement["Action"]:
if policy.get("PolicyDocument"):
if type(policy["PolicyDocument"]["Statement"]) != list:
policy_statements = [policy["PolicyDocument"]["Statement"]]
else:
policy_statements = policy["PolicyDocument"]["Statement"]
for statement in policy_statements:
if (
statement["Effect"] == "Allow"
and "Action" in statement
and "*" in statement["Resource"]
):
if type(statement["Action"]) == list:
for action in statement["Action"]:
if (
action == "sts:AssumeRole"
or action == "sts:*"
or action == "*"
):
report.status = "FAIL"
report.status_extended = f"Custom Policy {policy['PolicyName']} allows permissive STS Role assumption"
break
else:
if (
action == "sts:AssumeRole"
or action == "sts:*"
or action == "*"
statement["Action"] == "sts:AssumeRole"
or statement["Action"] == "sts:*"
or statement["Action"] == "*"
):
report.status = "FAIL"
report.status_extended = f"Custom Policy {policy['PolicyName']} allows permissive STS Role assumption"
break
else:
if (
statement["Action"] == "sts:AssumeRole"
or statement["Action"] == "sts:*"
or statement["Action"] == "*"
):
report.status = "FAIL"
report.status_extended = f"Custom Policy {policy['PolicyName']} allows permissive STS Role assumption"
break
break
findings.append(report)

View File

@@ -72,32 +72,33 @@ class iam_policy_allows_privilege_escalation(Check):
denied_not_actions = set()
# Recover all policy actions
if type(policy["PolicyDocument"]["Statement"]) != list:
policy_statements = [policy["PolicyDocument"]["Statement"]]
else:
policy_statements = policy["PolicyDocument"]["Statement"]
for statements in policy_statements:
# Recover allowed actions
if statements["Effect"] == "Allow":
if "Action" in statements:
if type(statements["Action"]) is str:
allowed_actions = {statements["Action"]}
if type(statements["Action"]) is list:
allowed_actions = set(statements["Action"])
if policy.get("PolicyDocument"):
if type(policy["PolicyDocument"]["Statement"]) != list:
policy_statements = [policy["PolicyDocument"]["Statement"]]
else:
policy_statements = policy["PolicyDocument"]["Statement"]
for statements in policy_statements:
# Recover allowed actions
if statements["Effect"] == "Allow":
if "Action" in statements:
if type(statements["Action"]) is str:
allowed_actions = {statements["Action"]}
if type(statements["Action"]) is list:
allowed_actions = set(statements["Action"])
# Recover denied actions
if statements["Effect"] == "Deny":
if "Action" in statements:
if type(statements["Action"]) is str:
denied_actions = {statements["Action"]}
if type(statements["Action"]) is list:
denied_actions = set(statements["Action"])
# Recover denied actions
if statements["Effect"] == "Deny":
if "Action" in statements:
if type(statements["Action"]) is str:
denied_actions = {statements["Action"]}
if type(statements["Action"]) is list:
denied_actions = set(statements["Action"])
if "NotAction" in statements:
if type(statements["NotAction"]) is str:
denied_not_actions = {statements["NotAction"]}
if type(statements["NotAction"]) is list:
denied_not_actions = set(statements["NotAction"])
if "NotAction" in statements:
if type(statements["NotAction"]) is str:
denied_not_actions = {statements["NotAction"]}
if type(statements["NotAction"]) is list:
denied_not_actions = set(statements["NotAction"])
# First, we need to perform a left join with ALLOWED_ACTIONS and DENIED_ACTIONS
left_actions = allowed_actions.difference(denied_actions)

View File

@@ -12,21 +12,25 @@ class iam_policy_no_administrative_privileges(Check):
report.resource_id = policy["PolicyName"]
report.status = "PASS"
report.status_extended = f"Policy {policy['PolicyName']} does not allow '*:*' administrative privileges"
# Check the statements, if one includes *:* stop iterating over the rest
if type(policy["PolicyDocument"]["Statement"]) != list:
policy_statements = [policy["PolicyDocument"]["Statement"]]
else:
policy_statements = policy["PolicyDocument"]["Statement"]
for statement in policy_statements:
# Check policies with "Effect": "Allow" with "Action": "*" over "Resource": "*".
if (
statement["Effect"] == "Allow"
and "Action" in statement
and (statement["Action"] == "*" or statement["Action"] == ["*"])
and (statement["Resource"] == "*" or statement["Resource"] == ["*"])
):
report.status = "FAIL"
report.status_extended = f"Policy {policy['PolicyName']} allows '*:*' administrative privileges"
break
if policy.get("PolicyDocument"):
# Check the statements, if one includes *:* stop iterating over the rest
if type(policy["PolicyDocument"]["Statement"]) != list:
policy_statements = [policy["PolicyDocument"]["Statement"]]
else:
policy_statements = policy["PolicyDocument"]["Statement"]
for statement in policy_statements:
# Check policies with "Effect": "Allow" with "Action": "*" over "Resource": "*".
if (
statement["Effect"] == "Allow"
and "Action" in statement
and (statement["Action"] == "*" or statement["Action"] == ["*"])
and (
statement["Resource"] == "*"
or statement["Resource"] == ["*"]
)
):
report.status = "FAIL"
report.status_extended = f"Policy {policy['PolicyName']} allows '*:*' administrative privileges"
break
findings.append(report)
return findings

View File

@@ -67,7 +67,7 @@ class S3:
buckets.append(Bucket(bucket["Name"], arn, bucket_region))
except Exception as error:
logger.error(
f"{bucket_region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"{bucket} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.error(