Files
prowler/providers/aws/services/accessanalyzer/accessanalyzer_service.py
2022-10-20 14:32:35 +02:00

118 lines
3.7 KiB
Python

import threading
from dataclasses import dataclass
from lib.logger import logger
from providers.aws.aws_provider import generate_regional_clients
################## AccessAnalyzer
class AccessAnalyzer:
    """Collect Access Analyzer analyzers and their finding counts per region.

    On construction, every regional client is queried for its analyzers (one
    thread per client) and the findings of each analyzer found are counted.

    Attributes:
        service: Service name handed to ``generate_regional_clients``.
        session: ``audit_info.audit_session``.
        audited_account: ``audit_info.audited_account``.
        regional_clients: Mapping of regional clients; it is indexed with the
            ``region`` attribute of the clients it holds.
        analyzers: List of ``Analyzer`` records. A region that reports no
            analyzer contributes a single placeholder record whose status is
            ``"NOT_AVAILABLE"``.
    """

    def __init__(self, audit_info):
        """Create the regional clients, then load analyzers and findings.

        Args:
            audit_info: Object exposing ``audit_session`` and
                ``audited_account``; it is also passed on to
                ``generate_regional_clients``.
        """
        self.service = "accessanalyzer"
        self.session = audit_info.audit_session
        self.audited_account = audit_info.audited_account
        self.regional_clients = generate_regional_clients(self.service, audit_info)
        self.analyzers = []
        self.__threading_call__(self.__list_analyzers__)
        # Findings are counted only after every listing thread has been joined.
        self.__list_findings__()

    def __get_session__(self):
        """Return the session stored at construction time."""
        return self.session

    def __threading_call__(self, call):
        """Run ``call(regional_client)`` in one thread per regional client.

        Blocks until every started thread has finished.

        Args:
            call: Callable accepting a single regional client.
        """
        threads = [
            threading.Thread(target=call, args=(regional_client,))
            for regional_client in self.regional_clients.values()
        ]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

    def __list_analyzers__(self, regional_client):
        """Append one ``Analyzer`` per analyzer listed by ``regional_client``.

        When the region lists no analyzer at all, a placeholder record with
        status ``"NOT_AVAILABLE"`` is appended instead. Any error is logged
        together with the client's region and not re-raised.

        Args:
            regional_client: Client exposing ``get_paginator`` and ``region``.
        """
        logger.info("AccessAnalyzer - Listing Analyzers...")
        try:
            list_analyzers_paginator = regional_client.get_paginator("list_analyzers")
            analyzer_count = 0
            for page in list_analyzers_paginator.paginate():
                analyzer_count += len(page["analyzers"])
                for analyzer in page["analyzers"]:
                    self.analyzers.append(
                        Analyzer(
                            analyzer["arn"],
                            analyzer["name"],
                            analyzer["status"],
                            0,
                            # "tags" may be missing from the summary; a plain
                            # subscript would raise and abort the listing of
                            # the whole region.
                            str(analyzer.get("tags", {})),
                            analyzer["type"],
                            regional_client.region,
                        )
                    )
            # No analyzers in region
            if analyzer_count == 0:
                self.analyzers.append(
                    Analyzer(
                        "",
                        "",
                        "NOT_AVAILABLE",
                        0,  # findings_count is an int field; keep the type consistent
                        "",
                        "",
                        regional_client.region,
                    )
                )
        except Exception as error:
            logger.error(
                f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
            )

    def __list_findings__(self):
        """Store the number of findings on every available analyzer.

        Placeholder records (status ``"NOT_AVAILABLE"``) are skipped. Each
        analyzer is handled on its own: an error is logged with the analyzer's
        region and the remaining analyzers are still processed.
        """
        logger.info("AccessAnalyzer - Listing Findings per Analyzer...")
        for analyzer in self.analyzers:
            if analyzer.status == "NOT_AVAILABLE":
                continue
            try:
                regional_client = self.regional_clients[analyzer.region]
                list_findings_paginator = regional_client.get_paginator(
                    "list_findings"
                )
                # The count is assigned only once pagination has completed.
                analyzer.findings_count = sum(
                    len(page["findings"])
                    for page in list_findings_paginator.paginate(
                        analyzerArn=analyzer.arn
                    )
                )
            except Exception as error:
                # Log analyzer.region rather than a local that may still be
                # unbound when the failure happens before the client lookup.
                logger.error(
                    f"{analyzer.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
                )
@dataclass
class Analyzer:
    """Record describing one Access Analyzer analyzer in a region.

    The constructor is the one generated by ``@dataclass``: it takes the
    fields below, in declaration order, positionally or by keyword.
    Instances are mutable so that ``findings_count`` can be updated after
    creation.
    """

    # ARN of the analyzer
    arn: str
    # Name of the analyzer
    name: str
    # Analyzer status; "NOT_AVAILABLE" marks a region without any analyzer
    status: str
    # Number of findings counted for the analyzer
    findings_count: int
    # Stringified tags of the analyzer
    tags: str
    # Analyzer type
    type: str
    # Region of the client that listed the analyzer
    region: str