fix(ClientError): handle ClientErrors in DynamoDB and Directory Service (#2400)

This commit is contained in:
Sergio Garcia
2023-05-24 11:50:08 +02:00
committed by GitHub
parent 4329aac377
commit c01c59023a
3 changed files with 69 additions and 39 deletions

View File

@@ -3,6 +3,7 @@ from datetime import datetime
from enum import Enum from enum import Enum
from typing import Optional, Union from typing import Optional, Union
from botocore.client import ClientError
from pydantic import BaseModel from pydantic import BaseModel
from prowler.lib.logger import logger from prowler.lib.logger import logger
@@ -117,21 +118,23 @@ class DirectoryService:
try: try:
for directory in self.directories.values(): for directory in self.directories.values():
if directory.region == regional_client.region: if directory.region == regional_client.region:
describe_event_topics_parameters = {"DirectoryId": directory.id} # Operation is not supported for Shared MicrosoftAD directories.
event_topics = [] if directory.type != DirectoryType.SharedMicrosoftAD:
describe_event_topics = regional_client.describe_event_topics( describe_event_topics_parameters = {"DirectoryId": directory.id}
**describe_event_topics_parameters event_topics = []
) describe_event_topics = regional_client.describe_event_topics(
for event_topic in describe_event_topics["EventTopics"]: **describe_event_topics_parameters
event_topics.append(
EventTopics(
topic_arn=event_topic["TopicArn"],
topic_name=event_topic["TopicName"],
status=event_topic["Status"],
created_date_time=event_topic["CreatedDateTime"],
)
) )
self.directories[directory.id].event_topics = event_topics for event_topic in describe_event_topics["EventTopics"]:
event_topics.append(
EventTopics(
topic_arn=event_topic["TopicArn"],
topic_name=event_topic["TopicName"],
status=event_topic["Status"],
created_date_time=event_topic["CreatedDateTime"],
)
)
self.directories[directory.id].event_topics = event_topics
except Exception as error: except Exception as error:
logger.error( logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
@@ -146,25 +149,42 @@ class DirectoryService:
directory.region == regional_client.region directory.region == regional_client.region
and directory.type != DirectoryType.SimpleAD and directory.type != DirectoryType.SimpleAD
): ):
list_certificates_paginator = regional_client.get_paginator( try:
"list_certificates" list_certificates_paginator = regional_client.get_paginator(
) "list_certificates"
list_certificates_parameters = {"DirectoryId": directory.id} )
certificates = [] list_certificates_parameters = {"DirectoryId": directory.id}
for page in list_certificates_paginator.paginate( certificates = []
**list_certificates_parameters for page in list_certificates_paginator.paginate(
): **list_certificates_parameters
for certificate_info in page["CertificatesInfo"]: ):
certificates.append( for certificate_info in page["CertificatesInfo"]:
Certificate( certificates.append(
id=certificate_info["CertificateId"], Certificate(
common_name=certificate_info["CommonName"], id=certificate_info["CertificateId"],
state=certificate_info["State"], common_name=certificate_info["CommonName"],
expiry_date_time=certificate_info["ExpiryDateTime"], state=certificate_info["State"],
type=certificate_info["Type"], expiry_date_time=certificate_info[
"ExpiryDateTime"
],
type=certificate_info["Type"],
)
) )
self.directories[directory.id].certificates = certificates
except ClientError as error:
if (
error.response["Error"]["Code"]
== "UnsupportedOperationException"
):
logger.warning(
f"{directory.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
) )
self.directories[directory.id].certificates = certificates else:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
continue
except Exception as error: except Exception as error:
logger.error( logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
@@ -194,7 +214,6 @@ class DirectoryService:
"ManualSnapshotsLimitReached" "ManualSnapshotsLimitReached"
], ],
) )
except Exception as error: except Exception as error:
logger.error( logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"

View File

@@ -102,11 +102,22 @@ class DynamoDB:
logger.info("DynamoDB - List Tags...") logger.info("DynamoDB - List Tags...")
try: try:
for table in self.tables: for table in self.tables:
regional_client = self.regional_clients[table.region] try:
response = regional_client.list_tags_of_resource(ResourceArn=table.arn)[ regional_client = self.regional_clients[table.region]
"Tags" response = regional_client.list_tags_of_resource(
] ResourceArn=table.arn
table.tags = response )["Tags"]
table.tags = response
except ClientError as error:
if error.response["Error"]["Code"] == "ResourceNotFoundException":
logger.warning(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
else:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
continue
except Exception as error: except Exception as error:
logger.error( logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"

View File

@@ -326,13 +326,13 @@ class Test_IAM_Service:
def test__get_account_summary__(self): def test__get_account_summary__(self):
# Generate IAM Client # Generate IAM Client
iam_client = client("iam") iam_client = client("iam")
account_summary = iam_client.get_account_summary() account_summary = iam_client.get_account_summary()["SummaryMap"]
# IAM client for this test class # IAM client for this test class
audit_info = self.set_mocked_audit_info() audit_info = self.set_mocked_audit_info()
iam = IAM(audit_info) iam = IAM(audit_info)
assert iam.account_summary == account_summary assert iam.account_summary["SummaryMap"] == account_summary
# Test IAM Get Password Policy # Test IAM Get Password Policy
@mock_iam @mock_iam