fix(iam): Handle ListRoleTags and policy errors (#2319)

This commit is contained in:
Pepe Fagoaga
2023-05-08 14:42:23 +02:00
committed by GitHub
parent 784aaa98c9
commit 5204acb5d0
3 changed files with 44 additions and 24 deletions

View File

@@ -18,19 +18,19 @@ class iam_policy_no_full_access_to_cloudtrail(Check):
report.status = "PASS"
report.status_extended = f"Custom Policy {policy.name} does not allow '{critical_service}:*' privileges"
if policy.document:
# Check the statements, if one includes critical_service:* stop iterating over the rest
if type(policy.document.get("Statement")) != list:
policy_statements = [policy.document.get("Statement")]
if type(policy.document["Statement"]) != list:
policy_statements = [policy.document["Statement"]]
else:
policy_statements = policy.document.get("Statement")
policy_statements = policy.document["Statement"]
# Check the statements, if one includes kms:* stop iterating over the rest
for statement in policy_statements:
# Check policies with "Effect": "Allow" with "Action": "*" over "Resource": "*".
if (
statement.get("Effect") == "Allow"
and critical_service + ":*" in statement.get("Action")
statement["Effect"] == "Allow"
and "Action" in statement
and critical_service + ":*" in statement["Action"]
and (
statement.get("Resource") == "*"
or statement.get("Resource") == ["*"]
statement["Resource"] == "*"
or statement["Resource"] == ["*"]
)
):
report.status = "FAIL"

View File

@@ -18,23 +18,24 @@ class iam_policy_no_full_access_to_kms(Check):
report.status = "PASS"
report.status_extended = f"Custom Policy {policy.name} does not allow '{critical_service}:*' privileges"
if policy.document:
# Check the statements, if one includes critical_service:* stop iterating over the rest
if type(policy.document.get("Statement")) != list:
policy_statements = [policy.document.get("Statement")]
if type(policy.document["Statement"]) != list:
policy_statements = [policy.document["Statement"]]
else:
policy_statements = policy.document.get("Statement")
policy_statements = policy.document["Statement"]
# Check the statements, if one includes kms:* stop iterating over the rest
for statement in policy_statements:
# Check policies with "Effect": "Allow" with "Action": "*" over "Resource": "*".
if (
statement.get("Effect") == "Allow"
and critical_service + ":*" in statement.get("Action")
statement["Effect"] == "Allow"
and "Action" in statement
and critical_service + ":*" in statement["Action"]
and (
statement.get("Resource") == "*"
or statement.get("Resource") == ["*"]
statement["Resource"] == "*"
or statement["Resource"] == ["*"]
)
):
report.status = "FAIL"
report.status_extended = f"Custom Policy {policy.name} allows '{critical_service}:*' privileges"
break
findings.append(report)
return findings

View File

@@ -498,24 +498,43 @@ class IAM:
logger.info("IAM - List Tags...")
try:
for role in self.roles:
response = self.client.list_role_tags(RoleName=role.name)["Tags"]
role.tags = response
try:
response = self.client.list_role_tags(RoleName=role.name)["Tags"]
role.tags = response
except ClientError as error:
if error.response["Error"]["Code"] == "NoSuchEntityException":
role.tags = []
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
try:
for user in self.users:
response = self.client.list_user_tags(UserName=user.name)["Tags"]
user.tags = response
try:
response = self.client.list_user_tags(UserName=user.name)["Tags"]
user.tags = response
except ClientError as error:
if error.response["Error"]["Code"] == "NoSuchEntityException":
user.tags = []
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
try:
for policy in self.policies:
response = self.client.list_policy_tags(PolicyArn=policy.arn)["Tags"]
policy.tags = response
try:
response = self.client.list_policy_tags(PolicyArn=policy.arn)[
"Tags"
]
policy.tags = response
except ClientError as error:
if error.response["Error"]["Code"] == "NoSuchEntityException":
policy.tags = []
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"