feat(inspector2): New Service and Check (#2250)

Co-authored-by: Pepe Fagoaga <pepe@verica.io>
Co-authored-by: Sergio Garcia <38561120+sergargar@users.noreply.github.com>
This commit is contained in:
Gabriel Soltz
2023-04-24 12:15:16 +02:00
committed by GitHub
parent 828fb37ca8
commit 63501a0d59
8 changed files with 465 additions and 0 deletions

View File

@@ -0,0 +1,4 @@
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.inspector2.inspector2_service import Inspector2
inspector2_client = Inspector2(current_audit_info)

View File

@@ -0,0 +1,30 @@
{
"Provider": "aws",
"CheckID": "inspector2_findings_exist",
"CheckTitle": "Check if Inspector2 findings exist",
"CheckType": [],
"ServiceName": "inspector2",
"SubServiceName": "",
"ResourceIdTemplate": "arn:aws:inspector2:region:account-id/detector-id",
"Severity": "medium",
"ResourceType": "Other",
"Description": "Check if Inspector2 findings exist",
"Risk": "Without using AWS Inspector, you may not be aware of all the security vulnerabilities in your AWS resources, which could lead to unauthorized access, data breaches, or other security incidents.",
"RelatedUrl": "https://docs.aws.amazon.com/inspector/latest/user/findings-understanding.html",
"Remediation": {
"Code": {
"CLI": "aws inspector2 enable",
"NativeIaC": "",
"Other": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/aws/Inspector/amazon-inspector-findings.html",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable Inspector2",
"Url": "https://docs.aws.amazon.com/inspector/latest/user/what-is-inspector.html"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,31 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.inspector2.inspector2_client import (
inspector2_client,
)
class inspector2_findings_exist(Check):
def execute(self):
findings = []
for inspector in inspector2_client.inspectors:
report = Check_Report_AWS(self.metadata())
report.status = "FAIL"
report.status_extended = "Inspector2 is not enabled."
report.resource_id = inspector.id
report.resource_arn = ""
report.region = inspector.region
if inspector.status == "ENABLED":
report.status = "PASS"
report.status_extended = "Inspector2 is enabled with no findings"
for finding in inspector.findings:
report.status_extended = (
"Inspector2 is enabled with no active findings"
)
if finding.status == "ACTIVE":
report.status = "FAIL"
report.status_extended = f"There are {str(len(inspector.findings))} ACTIVE Inspector2 findings."
break
findings.append(report)
return findings

View File

@@ -0,0 +1,108 @@
import threading
from pydantic import BaseModel
from prowler.lib.logger import logger
from prowler.lib.scan_filters.scan_filters import is_resource_filtered
from prowler.providers.aws.aws_provider import generate_regional_clients
################################ Inspector2
class Inspector2:
def __init__(self, audit_info):
self.service = "inspector2"
self.session = audit_info.audit_session
self.audited_account = audit_info.audited_account
self.audit_resources = audit_info.audit_resources
self.audited_partition = audit_info.audited_partition
self.regional_clients = generate_regional_clients(self.service, audit_info)
# If the region is not set in the audit profile,
# we pick the first region from the regional clients list
self.region = (
audit_info.profile_region
if audit_info.profile_region
else list(self.regional_clients.keys())[0]
)
self.inspectors = []
self.__threading_call__(self.__batch_get_account_status__)
self.__list_findings__()
def __get_session__(self):
return self.session
def __threading_call__(self, call):
threads = []
for regional_client in self.regional_clients.values():
threads.append(threading.Thread(target=call, args=(regional_client,)))
for t in threads:
t.start()
for t in threads:
t.join()
def __batch_get_account_status__(self, regional_client):
# We use this function to check if inspector2 is enabled
logger.info("Inspector2 - batch_get_account_status...")
try:
batch_get_account_status = regional_client.batch_get_account_status()[
"accounts"
][0]
self.inspectors.append(
Inspector(
id="Inspector2",
status=batch_get_account_status.get("state").get("status"),
region=regional_client.region,
)
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __list_findings__(self):
logger.info("Inspector2 - listing findings...")
try:
for inspector in self.inspectors:
try:
regional_client = self.regional_clients[inspector.region]
list_findings_paginator = regional_client.get_paginator(
"list_findings"
)
for page in list_findings_paginator.paginate():
for finding in page["findings"]:
if not self.audit_resources or (
is_resource_filtered(finding, self.audit_resources)
):
inspector.findings.append(
InspectorFinding(
arn=finding.get("findingArn"),
region=regional_client.region,
severity=finding.get("severity"),
status=finding.get("status"),
title=finding.get("title"),
)
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
continue
except Exception as error:
logger.error(
f"{error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}"
)
class InspectorFinding(BaseModel):
arn: str
region: str
severity: str
status: str
title: str
class Inspector(BaseModel):
id: str
region: str
status: str
findings: list[InspectorFinding] = []

View File

@@ -0,0 +1,153 @@
from unittest import mock
from prowler.providers.aws.services.inspector2.inspector2_service import (
Inspector,
InspectorFinding,
)
AWS_REGION = "us-east-1"
AWS_ACCOUNT_ID = "123456789012"
FINDING_ARN = (
"arn:aws:inspector2:us-east-1:123456789012:finding/0e436649379db5f327e3cf5bb4421d76"
)
class Test_inspector2_findings_exist:
def test_inspector2_disabled(self):
# Mock the inspector2 client
inspector2_client = mock.MagicMock
inspector2_client.region = AWS_REGION
inspector2_client.inspectors = [
Inspector(
id="Inspector2", status="DISABLED", region=AWS_REGION, findings=[]
)
]
with mock.patch(
"prowler.providers.aws.services.inspector2.inspector2_service.Inspector2",
new=inspector2_client,
):
# Test Check
from prowler.providers.aws.services.inspector2.inspector2_findings_exist.inspector2_findings_exist import (
inspector2_findings_exist,
)
check = inspector2_findings_exist()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].status_extended == "Inspector2 is not enabled."
assert result[0].resource_id == "Inspector2"
assert result[0].resource_arn == ""
assert result[0].region == AWS_REGION
def test_enabled_no_finding(self):
# Mock the inspector2 client
inspector2_client = mock.MagicMock
inspector2_client.region = AWS_REGION
inspector2_client.inspectors = [
Inspector(id="Inspector2", status="ENABLED", region=AWS_REGION, findings=[])
]
with mock.patch(
"prowler.providers.aws.services.inspector2.inspector2_service.Inspector2",
new=inspector2_client,
):
# Test Check
from prowler.providers.aws.services.inspector2.inspector2_findings_exist.inspector2_findings_exist import (
inspector2_findings_exist,
)
check = inspector2_findings_exist()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].status_extended == "Inspector2 is enabled with no findings"
assert result[0].resource_id == "Inspector2"
assert result[0].resource_arn == ""
assert result[0].region == AWS_REGION
def test_enabled_with_no_active_finding(self):
# Mock the inspector2 client
inspector2_client = mock.MagicMock
inspector2_client.region = AWS_REGION
inspector2_client.inspectors = [
Inspector(
id="Inspector2",
region=AWS_REGION,
status="ENABLED",
findings=[
InspectorFinding(
arn=FINDING_ARN,
region=AWS_REGION,
severity="MEDIUM",
status="NOT_ACTIVE",
title="CVE-2022-40897 - setuptools",
)
],
)
]
with mock.patch(
"prowler.providers.aws.services.inspector2.inspector2_service.Inspector2",
new=inspector2_client,
):
# Test Check
from prowler.providers.aws.services.inspector2.inspector2_findings_exist.inspector2_findings_exist import (
inspector2_findings_exist,
)
check = inspector2_findings_exist()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Inspector2 is enabled with no active findings"
)
assert result[0].resource_id == "Inspector2"
assert result[0].resource_arn == ""
assert result[0].region == AWS_REGION
def test_enabled_with_active_finding(self):
# Mock the inspector2 client
inspector2_client = mock.MagicMock
inspector2_client.region = AWS_REGION
inspector2_client.inspectors = [
Inspector(
id="Inspector2",
region=AWS_REGION,
status="ENABLED",
findings=[
InspectorFinding(
arn=FINDING_ARN,
region=AWS_REGION,
severity="MEDIUM",
status="ACTIVE",
title="CVE-2022-40897 - setuptools",
)
],
)
]
with mock.patch(
"prowler.providers.aws.services.inspector2.inspector2_service.Inspector2",
new=inspector2_client,
):
# Test Check
from prowler.providers.aws.services.inspector2.inspector2_findings_exist.inspector2_findings_exist import (
inspector2_findings_exist,
)
check = inspector2_findings_exist()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended == "There are 1 ACTIVE Inspector2 findings."
)
assert result[0].resource_id == "Inspector2"
assert result[0].resource_arn == ""
assert result[0].region == AWS_REGION

View File

@@ -0,0 +1,139 @@
from datetime import datetime
from unittest.mock import patch
import botocore
from boto3 import session
from prowler.providers.aws.lib.audit_info.audit_info import AWS_Audit_Info
from prowler.providers.aws.services.inspector2.inspector2_service import Inspector2
AWS_REGION = "us-east-1"
AWS_ACCOUNT_ID = "123456789012"
FINDING_ARN = (
"arn:aws:inspector2:us-east-1:123456789012:finding/0e436649379db5f327e3cf5bb4421d76"
)
# Mocking Calls
make_api_call = botocore.client.BaseClient._make_api_call
def mock_make_api_call(self, operation_name, kwargs):
"""We have to mock every AWS API call using Boto3"""
if operation_name == "BatchGetAccountStatus":
return {
"accounts": [
{
"accountId": "string",
"resourceState": {
"ec2": {
"errorCode": "ALREADY_ENABLED",
"errorMessage": "string",
"status": "ENABLED",
},
"ecr": {
"errorCode": "ALREADY_ENABLED",
"errorMessage": "string",
"status": "ENABLED",
},
"lambda": {
"errorCode": "ALREADY_ENABLED",
"errorMessage": "string",
"status": "ENABLED",
},
},
"state": {
"errorCode": "ALREADY_ENABLED",
"errorMessage": "string",
"status": "ENABLED",
},
}
]
}
if operation_name == "ListFindings":
return {
"findings": [
{
"awsAccountId": AWS_ACCOUNT_ID,
"findingArn": FINDING_ARN,
"description": "Finding Description",
"severity": "MEDIUM",
"status": "ACTIVE",
"title": "CVE-2022-40897 - setuptools",
"type": "PACKAGE_VULNERABILITY",
"updatedAt": datetime(2024, 1, 1),
}
]
}
return make_api_call(self, operation_name, kwargs)
def mock_generate_regional_clients(service, audit_info):
regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION)
regional_client.region = AWS_REGION
return {AWS_REGION: regional_client}
# Patch every AWS call using Boto3 and generate_regional_clients to have 1 client
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
@patch(
"prowler.providers.aws.services.inspector2.inspector2_service.generate_regional_clients",
new=mock_generate_regional_clients,
)
class Test_Inspector2_Service:
# Mocked Audit Info
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
session_config=None,
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
),
audited_account=None,
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=None,
credentials=None,
assumed_role_info=None,
audited_regions=None,
organizations_metadata=None,
audit_resources=None,
)
return audit_info
def test__get_client__(self):
audit_info = self.set_mocked_audit_info()
ssmincidents = Inspector2(audit_info)
assert (
ssmincidents.regional_clients[AWS_REGION].__class__.__name__ == "Inspector2"
)
def test__get_service__(self):
audit_info = self.set_mocked_audit_info()
ssmincidents = Inspector2(audit_info)
assert ssmincidents.service == "inspector2"
def test__batch_get_account_status__(self):
audit_info = self.set_mocked_audit_info()
ssmincidents = Inspector2(audit_info)
assert len(ssmincidents.inspectors) == 1
assert ssmincidents.inspectors[0].id == "Inspector2"
assert ssmincidents.inspectors[0].region == AWS_REGION
assert ssmincidents.inspectors[0].status == "ENABLED"
def test__list_findings__(self):
audit_info = self.set_mocked_audit_info()
ssmincidents = Inspector2(audit_info)
assert len(ssmincidents.inspectors[0].findings) == 1
assert ssmincidents.inspectors[0].findings[0].arn == FINDING_ARN
assert ssmincidents.inspectors[0].findings[0].region == AWS_REGION
assert ssmincidents.inspectors[0].findings[0].severity == "MEDIUM"
assert ssmincidents.inspectors[0].findings[0].status == "ACTIVE"
assert (
ssmincidents.inspectors[0].findings[0].title
== "CVE-2022-40897 - setuptools"
)