fix(inventory): handle exception for every call (#2457)

This commit is contained in:
Sergio Garcia
2023-06-07 09:33:10 +02:00
committed by GitHub
parent 37e180827a
commit 3e3e8a14ee

View File

@@ -64,27 +64,32 @@ def quick_inventory(audit_info: AWS_Audit_Info, args):
)
# Get all the resources
resources_count = 0
get_resources_paginator = client.get_paginator("get_resources")
for page in get_resources_paginator.paginate():
resources_count += len(page["ResourceTagMappingList"])
for resource in page["ResourceTagMappingList"]:
# Avoid adding S3 buckets again:
if resource["ResourceARN"].split(":")[2] != "s3":
# Check if region is not in ARN --> Global service
if not resource["ResourceARN"].split(":")[3]:
global_resources.append(
{
"arn": resource["ResourceARN"],
"tags": resource["Tags"],
}
)
else:
resources_in_region.append(
{
"arn": resource["ResourceARN"],
"tags": resource["Tags"],
}
)
try:
get_resources_paginator = client.get_paginator("get_resources")
for page in get_resources_paginator.paginate():
resources_count += len(page["ResourceTagMappingList"])
for resource in page["ResourceTagMappingList"]:
# Avoid adding S3 buckets again:
if resource["ResourceARN"].split(":")[2] != "s3":
# Check if region is not in ARN --> Global service
if not resource["ResourceARN"].split(":")[3]:
global_resources.append(
{
"arn": resource["ResourceARN"],
"tags": resource["Tags"],
}
)
else:
resources_in_region.append(
{
"arn": resource["ResourceARN"],
"tags": resource["Tags"],
}
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
bar()
if len(resources_in_region) > 0:
total_resources_per_region[region] = len(resources_in_region)
@@ -294,59 +299,85 @@ def create_output(resources: list, audit_info: AWS_Audit_Info, args):
def get_regional_buckets(audit_info: AWS_Audit_Info, region: str) -> list:
regional_buckets = []
s3_client = audit_info.audit_session.client("s3", region_name=region)
buckets = s3_client.list_buckets()
for bucket in buckets["Buckets"]:
bucket_region = s3_client.get_bucket_location(Bucket=bucket["Name"])[
"LocationConstraint"
]
if bucket_region == "EU": # If EU, bucket_region is eu-west-1
bucket_region = "eu-west-1"
if not bucket_region: # If None, bucket_region is us-east-1
bucket_region = "us-east-1"
if bucket_region == region: # Only add bucket if is in current region
try:
bucket_tags = s3_client.get_bucket_tagging(Bucket=bucket["Name"])[
"TagSet"
]
except ClientError as error:
bucket_tags = []
if error.response["Error"]["Code"] != "NoSuchTagSet":
logger.error(
f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
bucket_arn = (
f"arn:{audit_info.audited_partition}:s3:{region}::{bucket['Name']}"
)
regional_buckets.append({"arn": bucket_arn, "tags": bucket_tags})
try:
buckets = s3_client.list_buckets()
for bucket in buckets["Buckets"]:
bucket_region = s3_client.get_bucket_location(Bucket=bucket["Name"])[
"LocationConstraint"
]
if bucket_region == "EU": # If EU, bucket_region is eu-west-1
bucket_region = "eu-west-1"
if not bucket_region: # If None, bucket_region is us-east-1
bucket_region = "us-east-1"
if bucket_region == region: # Only add bucket if is in current region
try:
bucket_tags = s3_client.get_bucket_tagging(Bucket=bucket["Name"])[
"TagSet"
]
except ClientError as error:
bucket_tags = []
if error.response["Error"]["Code"] != "NoSuchTagSet":
logger.error(
f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
bucket_arn = (
f"arn:{audit_info.audited_partition}:s3:{region}::{bucket['Name']}"
)
regional_buckets.append({"arn": bucket_arn, "tags": bucket_tags})
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return regional_buckets
def get_iam_resources(session) -> list:
iam_resources = []
iam_client = session.client("iam")
get_roles_paginator = iam_client.get_paginator("list_roles")
for page in get_roles_paginator.paginate():
for role in page["Roles"]:
# Avoid aws-service-role roles
if "aws-service-role" not in role["Arn"]:
iam_resources.append({"arn": role["Arn"], "tags": role.get("Tags")})
get_users_paginator = iam_client.get_paginator("list_users")
for page in get_users_paginator.paginate():
for user in page["Users"]:
iam_resources.append({"arn": user["Arn"], "tags": user.get("Tags")})
get_groups_paginator = iam_client.get_paginator("list_groups")
for page in get_groups_paginator.paginate():
for group in page["Groups"]:
iam_resources.append({"arn": group["Arn"], "tags": []})
get_policies_paginator = iam_client.get_paginator("list_policies")
for page in get_policies_paginator.paginate(Scope="Local"):
for policy in page["Policies"]:
iam_resources.append({"arn": policy["Arn"], "tags": policy.get("Tags")})
for saml_provider in iam_client.list_saml_providers()["SAMLProviderList"]:
iam_resources.append({"arn": saml_provider["Arn"], "tags": []})
try:
get_roles_paginator = iam_client.get_paginator("list_roles")
for page in get_roles_paginator.paginate():
for role in page["Roles"]:
# Avoid aws-service-role roles
if "aws-service-role" not in role["Arn"]:
iam_resources.append({"arn": role["Arn"], "tags": role.get("Tags")})
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
try:
get_users_paginator = iam_client.get_paginator("list_users")
for page in get_users_paginator.paginate():
for user in page["Users"]:
iam_resources.append({"arn": user["Arn"], "tags": user.get("Tags")})
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
try:
get_groups_paginator = iam_client.get_paginator("list_groups")
for page in get_groups_paginator.paginate():
for group in page["Groups"]:
iam_resources.append({"arn": group["Arn"], "tags": []})
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
try:
get_policies_paginator = iam_client.get_paginator("list_policies")
for page in get_policies_paginator.paginate(Scope="Local"):
for policy in page["Policies"]:
iam_resources.append({"arn": policy["Arn"], "tags": policy.get("Tags")})
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
try:
for saml_provider in iam_client.list_saml_providers()["SAMLProviderList"]:
iam_resources.append({"arn": saml_provider["Arn"], "tags": []})
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return iam_resources