feat(TrustedAvisor): add TrustedAvisor tests and checks (#1498)

Co-authored-by: sergargar <sergio@verica.io>
This commit is contained in:
Sergio Garcia
2022-11-17 22:36:06 +01:00
committed by GitHub
parent 62081cb399
commit e016fb2d6b
9 changed files with 292 additions and 61 deletions

View File

@@ -1,61 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2018) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_extra726="7.26"
CHECK_TITLE_extra726="[extra726] Check Trusted Advisor for errors and warnings"
CHECK_SCORED_extra726="NOT_SCORED"
CHECK_CIS_LEVEL_extra726="EXTRA"
CHECK_SEVERITY_extra726="Medium"
CHECK_ALTERNATE_check726="extra726"
CHECK_SERVICENAME_extra726="trustedadvisor"
CHECK_RISK_extra726='Improve the security of your application by closing gaps; enabling various AWS security features; and examining your permissions.'
CHECK_REMEDIATION_extra726='Review and act upon its recommendations.'
CHECK_DOC_extra726='https://aws.amazon.com/premiumsupport/technology/trusted-advisor/best-practice-checklist/'
CHECK_CAF_EPIC_extra726='IAM'
extra726(){
trap "exit" INT
# forcing REGION if not set will be us-east-1 region only since support only works in that region
TA_CHECKS_ID=$($AWSCLI support describe-trusted-advisor-checks --language en $PROFILE_OPT --region $REGION --query checks[*].id --output text 2>&1)
if [[ $(echo "$TA_CHECKS_ID" | grep SubscriptionRequiredException) ]]; then
textInfo "$REGION: Trusted Advisor requires AWS Premium Support Subscription" "$REGION"
return
fi
if [[ $(echo "$TA_CHECKS_ID" | grep "Could not connect") ]]; then
textInfo "$REGION: Trusted Advisor not available in this region" "$REGION"
return
fi
for checkid in $TA_CHECKS_ID; do
TA_CHECKS_NAME=$($AWSCLI support describe-trusted-advisor-checks --language en $PROFILE_OPT --region $REGION --query "checks[?id==\`$checkid\`].{name:name}[*]" --output text)
QUERY_TA_CHECK_RESULT=$($AWSCLI support describe-trusted-advisor-check-result --check-id $checkid --language en $PROFILE_OPT --region $REGION --query 'result.status' --output text)
# Possible results - https://docs.aws.amazon.com/cli/latest/reference/support/describe-trusted-advisor-check-result.html
case "$QUERY_TA_CHECK_RESULT" in
"ok")
textPass "$REGION: Trusted Advisor check $TA_CHECKS_NAME is in state $QUERY_TA_CHECK_RESULT" "$REGION" "$TA_CHECKS_NAME"
;;
"error")
textFail "$REGION: Trusted Advisor check $TA_CHECKS_NAME is in state $QUERY_TA_CHECK_RESULT" "$REGION" "$TA_CHECKS_NAME"
;;
"warning")
textInfo "$REGION: Trusted Advisor check $TA_CHECKS_NAME is in state $QUERY_TA_CHECK_RESULT" "$REGION" "$TA_CHECKS_NAME"
;;
"not_available")
textInfo "$REGION: Trusted Advisor check $TA_CHECKS_NAME is in state $QUERY_TA_CHECK_RESULT" "$REGION" "$TA_CHECKS_NAME"
;;
"*")
textFail "$REGION: Trusted Advisor check $TA_CHECKS_NAME is in state $QUERY_TA_CHECK_RESULT" "$REGION" "$TA_CHECKS_NAME"
;;
esac
done
}

View File

@@ -0,0 +1,4 @@
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.trustedadvisor.trustedadvisor_service import TrustedAdvisor
trustedadvisor_client = TrustedAdvisor(current_audit_info)

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "trustedadvisor_errors_and_warnings",
"CheckTitle": "Check Trusted Advisor for errors and warnings.",
"CheckType": [],
"ServiceName": "trustedadvisor",
"SubServiceName": "",
"ResourceIdTemplate": "arn:aws:service:region:account-id",
"Severity": "medium",
"ResourceType": "Other",
"Description": "Check Trusted Advisor for errors and warnings.",
"Risk": "Improve the security of your application by closing gaps, enabling various AWS security features and examining your permissions.",
"RelatedUrl": "https://aws.amazon.com/premiumsupport/technology/trusted-advisor/best-practice-checklist/",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/aws/TrustedAdvisor/checks.html",
"Terraform": ""
},
"Recommendation": {
"Text": "Review and act upon its recommendations.",
"Url": "https://aws.amazon.com/premiumsupport/technology/trusted-advisor/best-practice-checklist/"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,23 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.trustedadvisor.trustedadvisor_client import (
trustedadvisor_client,
)
class trustedadvisor_errors_and_warnings(Check):
def execute(self):
findings = []
if trustedadvisor_client.checks:
for check in trustedadvisor_client.checks:
report = Check_Report(self.metadata)
report.region = check.region
report.resource_id = check.id
report.status = "FAIL"
report.status_extended = (
f"Trusted Advisor check {check.name} is in state {check.status}."
)
if check.status == "ok":
report.status = "PASS"
findings.append(report)
return findings

View File

@@ -0,0 +1,79 @@
from re import search
from unittest import mock
from uuid import uuid4
from providers.aws.services.trustedadvisor.trustedadvisor_service import Check
AWS_REGION = "eu-west-1"
AWS_ACCOUNT_NUMBER = "123456789012"
detector_id = str(uuid4())
class Test_trustedadvisor_errors_and_warnings:
def test_no_detectors(self):
trustedadvisor_client = mock.MagicMock
trustedadvisor_client.checks = []
with mock.patch(
"providers.aws.services.trustedadvisor.trustedadvisor_service.TrustedAdvisor",
trustedadvisor_client,
):
from providers.aws.services.trustedadvisor.trustedadvisor_errors_and_warnings.trustedadvisor_errors_and_warnings import (
trustedadvisor_errors_and_warnings,
)
check = trustedadvisor_errors_and_warnings()
result = check.execute()
assert len(result) == 0
def test_trustedadvisor_all_passed_checks(self):
trustedadvisor_client = mock.MagicMock
trustedadvisor_client.checks = []
trustedadvisor_client.checks.append(
Check(
id="check1",
name="check1",
region=AWS_REGION,
status="ok",
)
)
with mock.patch(
"providers.aws.services.trustedadvisor.trustedadvisor_service.TrustedAdvisor",
trustedadvisor_client,
):
from providers.aws.services.trustedadvisor.trustedadvisor_errors_and_warnings.trustedadvisor_errors_and_warnings import (
trustedadvisor_errors_and_warnings,
)
check = trustedadvisor_errors_and_warnings()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search("ok", result[0].status_extended)
assert result[0].resource_id == "check1"
def test_trustedadvisor_error_check(self):
trustedadvisor_client = mock.MagicMock
trustedadvisor_client.checks = []
trustedadvisor_client.checks.append(
Check(
id="check1",
name="check1",
region=AWS_REGION,
status="error",
)
)
with mock.patch(
"providers.aws.services.trustedadvisor.trustedadvisor_service.TrustedAdvisor",
trustedadvisor_client,
):
from providers.aws.services.trustedadvisor.trustedadvisor_errors_and_warnings.trustedadvisor_errors_and_warnings import (
trustedadvisor_errors_and_warnings,
)
check = trustedadvisor_errors_and_warnings()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search("error", result[0].status_extended)
assert result[0].resource_id == "check1"

View File

@@ -0,0 +1,70 @@
import threading
from typing import Optional
from pydantic import BaseModel
from lib.logger import logger
from providers.aws.aws_provider import generate_regional_clients
################################ TrustedAdvisor
class TrustedAdvisor:
def __init__(self, audit_info):
self.service = "support"
self.session = audit_info.audit_session
self.regional_clients = generate_regional_clients(self.service, audit_info)
self.checks = []
self.__threading_call__(self.__describe_trusted_advisor_checks__)
self.__threading_call__(self.__describe_trusted_advisor_check_result__)
def __get_session__(self):
return self.session
def __threading_call__(self, call):
threads = []
for regional_client in self.regional_clients.values():
threads.append(threading.Thread(target=call, args=(regional_client,)))
for t in threads:
t.start()
for t in threads:
t.join()
def __describe_trusted_advisor_checks__(self, regional_client):
logger.info("TrustedAdvisor - Describing Checks...")
try:
for check in regional_client.describe_trusted_advisor_checks(language="en")[
"checks"
]:
self.checks.append(
Check(
id=check["id"],
name=check["name"],
region=regional_client.region,
)
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __describe_trusted_advisor_check_result__(self, regional_client):
logger.info("TrustedAdvisor - Describing Check Result...")
try:
for check in self.checks:
if check.region == regional_client.region:
response = regional_client.describe_trusted_advisor_check_result(
checkId=check.id
)
if "result" in response:
check.status = response["result"]["status"]
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class Check(BaseModel):
id: str
name: str
status: Optional[str]
region: str

View File

@@ -0,0 +1,81 @@
from unittest.mock import patch
import botocore
from boto3 import session
from moto import mock_support
from providers.aws.lib.audit_info.models import AWS_Audit_Info
from providers.aws.services.trustedadvisor.trustedadvisor_service import TrustedAdvisor
AWS_ACCOUNT_NUMBER = 123456789012
AWS_REGION = "us-east-1"
make_api_call = botocore.client.BaseClient._make_api_call
def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "DescribeTrustedAdvisorCheckResult":
return {}
return make_api_call(self, operation_name, kwarg)
def mock_generate_regional_clients(service, audit_info):
regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION)
regional_client.region = AWS_REGION
return {AWS_REGION: regional_client}
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
@patch(
"providers.aws.services.trustedadvisor.trustedadvisor_service.generate_regional_clients",
new=mock_generate_regional_clients,
)
class Test_TrustedAdvisor_Service:
# Mocked Audit Info
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
),
audited_account=AWS_ACCOUNT_NUMBER,
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=None,
credentials=None,
assumed_role_info=None,
audited_regions=None,
organizations_metadata=None,
)
return audit_info
# Test TrustedAdvisor Service
def test_service(self):
audit_info = self.set_mocked_audit_info()
trustedadvisor = TrustedAdvisor(audit_info)
assert trustedadvisor.service == "support"
# Test TrustedAdvisor client
def test_client(self):
audit_info = self.set_mocked_audit_info()
trustedadvisor = TrustedAdvisor(audit_info)
for reg_client in trustedadvisor.regional_clients.values():
assert reg_client.__class__.__name__ == "Support"
# Test TrustedAdvisor session
def test__get_session__(self):
audit_info = self.set_mocked_audit_info()
trustedadvisor = TrustedAdvisor(audit_info)
assert trustedadvisor.session.__class__.__name__ == "Session"
@mock_support
# Test TrustedAdvisor session
def test__describe_trusted_advisor_checks__(self):
audit_info = self.set_mocked_audit_info()
trustedadvisor = TrustedAdvisor(audit_info)
assert len(trustedadvisor.checks) == 104 # Default checks
assert trustedadvisor.checks[0].region == AWS_REGION