fix(accessanalyzer_enabled_without_findings): fixed status findings (#1799)

This commit is contained in:
Nacho Rivera
2023-01-30 13:22:05 +01:00
committed by GitHub
parent cb7439a831
commit 552e0fefc3
4 changed files with 126 additions and 90 deletions

View File

@@ -11,21 +11,28 @@ class accessanalyzer_enabled_without_findings(Check):
report = Check_Report_AWS(self.metadata())
report.region = analyzer.region
if analyzer.status == "ACTIVE":
if analyzer.findings_count > 0:
report.status = "FAIL"
report.status_extended = f"IAM Access Analyzer {analyzer.name} has {analyzer.findings_count} active findings"
report.resource_id = analyzer.name
report.resource_arn = analyzer.arn
else:
report.status = "PASS"
report.status_extended = (
f"IAM Access Analyzer {analyzer.name} has no active findings"
)
report.resource_id = analyzer.name
report.resource_arn = analyzer.arn
report.status = "PASS"
report.status_extended = (
f"IAM Access Analyzer {analyzer.name} does not have active findings"
)
report.resource_id = analyzer.name
report.resource_arn = analyzer.arn
if len(analyzer.findings) != 0:
active_finding_counter = 0
for finding in analyzer.findings:
if finding.status == "ACTIVE":
active_finding_counter += 1
if active_finding_counter > 0:
report.status = "FAIL"
report.status_extended = f"IAM Access Analyzer {analyzer.name} has {active_finding_counter} active findings"
report.resource_id = analyzer.name
report.resource_arn = analyzer.arn
elif analyzer.status == "NOT_AVAILABLE":
report.status = "FAIL"
report.status_extended = "IAM Access Analyzer is not enabled"
report.status_extended = (
f"IAM Access Analyzer {analyzer.name} is not enabled"
)
report.resource_id = analyzer.name
else:
report.status = "FAIL"

View File

@@ -1,5 +1,6 @@
import threading
from dataclasses import dataclass
from pydantic import BaseModel
from prowler.lib.logger import logger
from prowler.providers.aws.aws_provider import generate_regional_clients
@@ -15,6 +16,7 @@ class AccessAnalyzer:
self.analyzers = []
self.__threading_call__(self.__list_analyzers__)
self.__list_findings__()
self.__get_finding_status__()
def __get_session__(self):
return self.session
@@ -38,13 +40,12 @@ class AccessAnalyzer:
for analyzer in page["analyzers"]:
self.analyzers.append(
Analyzer(
analyzer["arn"],
analyzer["name"],
analyzer["status"],
0,
str(analyzer["tags"]),
analyzer["type"],
regional_client.region,
arn=analyzer["arn"],
name=analyzer["name"],
status=analyzer["status"],
tags=str(analyzer["tags"]),
type=analyzer["type"],
region=regional_client.region,
)
)
# No analyzers in region
@@ -56,7 +57,6 @@ class AccessAnalyzer:
"NOT_AVAILABLE",
"",
"",
"",
regional_client.region,
)
)
@@ -66,12 +66,28 @@ class AccessAnalyzer:
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __get_finding_status__(self):
logger.info("AccessAnalyzer - Get Finding status...")
try:
for analyzer in self.analyzers:
if analyzer.status != "NOT_AVAILABLE":
regional_client = self.regional_clients[analyzer.region]
for finding in analyzer.findings:
finding_information = regional_client.get_finding(
analyzerArn=analyzer.arn, id=finding.id
)
finding.status = finding_information["finding"]["status"]
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __list_findings__(self):
logger.info("AccessAnalyzer - Listing Findings per Analyzer...")
try:
for analyzer in self.analyzers:
if analyzer.status != "NOT_AVAILABLE":
findings_count = 0
regional_client = self.regional_clients[analyzer.region]
list_findings_paginator = regional_client.get_paginator(
"list_findings"
@@ -79,8 +95,8 @@ class AccessAnalyzer:
for page in list_findings_paginator.paginate(
analyzerArn=analyzer.arn
):
findings_count += len(page["findings"])
analyzer.findings_count = findings_count
for finding in page["findings"]:
analyzer.findings.append(Finding(id=finding["id"]))
except Exception as error:
logger.error(
@@ -88,30 +104,16 @@ class AccessAnalyzer:
)
@dataclass
class Analyzer:
class Finding(BaseModel):
id: str
status: str = ""
class Analyzer(BaseModel):
arn: str
name: str
status: str
findings_count: int
findings: list[Finding] = []
tags: str
type: str
region: str
def __init__(
self,
arn,
name,
status,
findings_count,
tags,
type,
region,
):
self.arn = arn
self.name = name
self.status = status
self.findings_count = findings_count
self.tags = tags
self.type = type
self.region = region

View File

@@ -2,6 +2,7 @@ from unittest import mock
from prowler.providers.aws.services.accessanalyzer.accessanalyzer_service import (
Analyzer,
Finding,
)
@@ -28,13 +29,12 @@ class Test_accessanalyzer_enabled_without_findings:
accessanalyzer_client = mock.MagicMock
accessanalyzer_client.analyzers = [
Analyzer(
"",
"Test Analyzer",
"NOT_AVAILABLE",
"",
"",
"",
"eu-west-1",
arn="",
name="Test Analyzer",
status="NOT_AVAILABLE",
tags="",
type="",
region="eu-west-1",
)
]
with mock.patch(
@@ -50,29 +50,40 @@ class Test_accessanalyzer_enabled_without_findings:
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].status_extended == "IAM Access Analyzer is not enabled"
assert (
result[0].status_extended
== "IAM Access Analyzer Test Analyzer is not enabled"
)
assert result[0].resource_id == "Test Analyzer"
def test_two_analyzers(self):
accessanalyzer_client = mock.MagicMock
accessanalyzer_client.analyzers = [
Analyzer(
"",
"Test Analyzer",
"NOT_AVAILABLE",
"",
"",
"",
"eu-west-1",
arn="",
name="Test Analyzer",
status="NOT_AVAILABLE",
tags="",
type="",
region="eu-west-1",
),
Analyzer(
"",
"Test Analyzer",
"ACTIVE",
10,
"",
"",
"eu-west-1",
arn="",
name="Test Analyzer",
status="ACTIVE",
findings=[
Finding(
id="test-finding-1",
status="ACTIVE",
),
Finding(
id="test-finding-2",
status="ARCHIVED",
),
],
tags="",
type="",
region="eu-west-2",
),
]
@@ -91,26 +102,30 @@ class Test_accessanalyzer_enabled_without_findings:
assert len(result) == 2
assert result[0].status == "FAIL"
assert result[0].status_extended == "IAM Access Analyzer is not enabled"
assert (
result[0].status_extended
== "IAM Access Analyzer Test Analyzer is not enabled"
)
assert result[0].resource_id == "Test Analyzer"
assert result[0].region == "eu-west-1"
assert result[1].status == "FAIL"
assert (
result[1].status_extended
== "IAM Access Analyzer Test Analyzer has 10 active findings"
== "IAM Access Analyzer Test Analyzer has 1 active findings"
)
assert result[1].resource_id == "Test Analyzer"
assert result[1].region == "eu-west-2"
def test_one_active_analyzer_without_findings(self):
accessanalyzer_client = mock.MagicMock
accessanalyzer_client.analyzers = [
Analyzer(
"",
"Test Analyzer",
"ACTIVE",
0,
"",
"",
"eu-west-1",
arn="",
name="Test Analyzer",
status="ACTIVE",
tags="",
type="",
region="eu-west-2",
)
]
@@ -130,22 +145,22 @@ class Test_accessanalyzer_enabled_without_findings:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "IAM Access Analyzer Test Analyzer has no active findings"
== "IAM Access Analyzer Test Analyzer does not have active findings"
)
assert result[0].resource_id == "Test Analyzer"
assert result[0].region == "eu-west-2"
def test_one_active_analyzer_not_active(self):
accessanalyzer_client = mock.MagicMock
accessanalyzer_client.analyzers = [
Analyzer(
"",
"Test Analyzer",
"FAILED",
0,
"",
"",
"eu-west-1",
)
arn="",
name="Test Analyzer",
status="NOT_AVAILABLE",
tags="",
type="",
region="eu-west-1",
),
]
# Patch AccessAnalyzer Client
with mock.patch(
@@ -164,6 +179,7 @@ class Test_accessanalyzer_enabled_without_findings:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "IAM Access Analyzer Test Analyzer is not active"
== "IAM Access Analyzer Test Analyzer is not enabled"
)
assert result[0].resource_id == "Test Analyzer"
assert result[0].region == "eu-west-1"

View File

@@ -39,11 +39,20 @@ def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "ListFindings":
# If we only want to count the number of findings
# we return a list of values just to count them
return {"findings": [0, 1, 2]}
return {
"findings": [
{
"id": "test_id1",
}
]
}
if operation_name == "GetFinding":
# If we only want to count the number of findings
# we return a list of values just to count them
return {"finding": {"id": "test_id1", "status": "ARCHIVED"}}
return make_api_call(self, operation_name, kwarg)
# Mock generate_regional_clients()
def mock_generate_regional_clients(service, audit_info):
regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION)
regional_client.region = AWS_REGION
@@ -92,4 +101,6 @@ class Test_AccessAnalyzer_Service:
current_audit_info.audited_partition = "aws"
access_analyzer = AccessAnalyzer(current_audit_info)
assert len(access_analyzer.analyzers) == 1
assert access_analyzer.analyzers[0].findings_count == 3
assert len(access_analyzer.analyzers[0].findings) == 1
assert access_analyzer.analyzers[0].findings[0].status == "ARCHIVED"
assert access_analyzer.analyzers[0].findings[0].id == "test_id1"