From 227306c572a272d13248833c789d543f9329d6a3 Mon Sep 17 00:00:00 2001 From: Pepe Fagoaga Date: Fri, 24 Feb 2023 10:12:38 +0100 Subject: [PATCH] fix(acm): Fix issues with list-certificates (#1970) --- .../providers/aws/services/acm/acm_service.py | 52 ++++--- .../acm_certificates_expiration_check_test.py | 106 +++++++++----- ...ificates_transparency_logs_enabled_test.py | 136 +++++++++--------- .../aws/services/acm/acm_service_test.py | 125 ++++++++++++---- ...trustedadvisor_errors_and_warnings_test.py | 1 + 5 files changed, 258 insertions(+), 162 deletions(-) diff --git a/prowler/providers/aws/services/acm/acm_service.py b/prowler/providers/aws/services/acm/acm_service.py index b881a425..1fcf2dbd 100644 --- a/prowler/providers/aws/services/acm/acm_service.py +++ b/prowler/providers/aws/services/acm/acm_service.py @@ -1,7 +1,9 @@ import threading -from dataclasses import dataclass +from datetime import datetime +from typing import Optional + +from pydantic import BaseModel -from prowler.config.config import timestamp_utc from prowler.lib.logger import logger from prowler.lib.scan_filters.scan_filters import is_resource_filtered from prowler.providers.aws.aws_provider import generate_regional_clients @@ -44,12 +46,26 @@ class ACM: certificate["CertificateArn"], self.audit_resources ) ): + if "NotAfter" in certificate: + # We need to get the TZ info to be able to do the math + certificate_expiration_time = ( + certificate["NotAfter"] + - datetime.now( + certificate["NotAfter"].tzinfo + if hasattr(certificate["NotAfter"], "tzinfo") + else None + ) + ).days + else: + certificate_expiration_time = 0 self.certificates.append( Certificate( - certificate["CertificateArn"], - certificate["DomainName"], - False, - regional_client.region, + arn=certificate["CertificateArn"], + name=certificate["DomainName"], + type=certificate["Type"], + expiration_days=certificate_expiration_time, + transparency_logging=False, + region=regional_client.region, ) ) except Exception as error: @@ -65,13 +81,6 @@ class ACM: response = regional_client.describe_certificate( CertificateArn=certificate.arn )["Certificate"] - certificate.type = response["Type"] - if "NotAfter" in response: - certificate.expiration_days = ( - response["NotAfter"] - timestamp_utc - ).days - else: - certificate.expiration_days = 0 if ( response["Options"]["CertificateTransparencyLoggingPreference"] == "ENABLED" @@ -83,23 +92,10 @@ class ACM: ) -@dataclass -class Certificate: +class Certificate(BaseModel): arn: str name: str type: str expiration_days: int - transparency_logging: bool + transparency_logging: Optional[bool] region: str - - def __init__( - self, - arn, - name, - transparency_logging, - region, - ): - self.arn = arn - self.name = name - self.transparency_logging = transparency_logging - self.region = region diff --git a/tests/providers/aws/services/acm/acm_certificates_expiration_check/acm_certificates_expiration_check_test.py b/tests/providers/aws/services/acm/acm_certificates_expiration_check/acm_certificates_expiration_check_test.py index f5904d47..03d4b1d2 100644 --- a/tests/providers/aws/services/acm/acm_certificates_expiration_check/acm_certificates_expiration_check_test.py +++ b/tests/providers/aws/services/acm/acm_certificates_expiration_check/acm_certificates_expiration_check_test.py @@ -1,59 +1,92 @@ +import uuid from unittest import mock -from boto3 import client -from moto import mock_acm +from prowler.providers.aws.services.acm.acm_service import Certificate AWS_REGION = "us-east-1" +AWS_ACCOUNT_NUMBER = 123456789012 +DAYS_TO_EXPIRE_THRESHOLD = 7 class Test_acm_certificates_expiration_check: - @mock_acm - def test_acm_certificate_expirated(self): - # Generate ACM Client - acm_client = client("acm", region_name=AWS_REGION) - # Request ACM certificate - certificate = acm_client.request_certificate( - DomainName="test.com", - ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info - from prowler.providers.aws.services.acm.acm_service import ACM - - current_audit_info.audited_partition = "aws" + def test_no_acm_certificates(self): + acm_client = mock.MagicMock + acm_client.certificates = [] with mock.patch( - "prowler.providers.aws.services.acm.acm_certificates_expiration_check.acm_certificates_expiration_check.acm_client", - new=ACM(current_audit_info), - ) as service_client: + "prowler.providers.aws.services.acm.acm_service.ACM", + new=acm_client, + ): + # Test Check + from prowler.providers.aws.services.acm.acm_certificates_transparency_logs_enabled.acm_certificates_transparency_logs_enabled import ( + acm_certificates_transparency_logs_enabled, + ) + + check = acm_certificates_transparency_logs_enabled() + result = check.execute() + + assert len(result) == 0 + + def test_acm_certificate_expirated(self): + certificate_arn = f"arn:aws:acm:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:certificate/{str(uuid.uuid4())}" + certificate_name = "test-certificate.com" + certificate_type = "AMAZON_ISSUED" + + acm_client = mock.MagicMock + acm_client.certificates = [ + Certificate( + arn=certificate_arn, + name=certificate_name, + type=certificate_type, + expiration_days=5, + transparency_logging=True, + region=AWS_REGION, + ) + ] + + with mock.patch( + "prowler.providers.aws.services.acm.acm_service.ACM", + new=acm_client, + ): # Test Check from prowler.providers.aws.services.acm.acm_certificates_expiration_check.acm_certificates_expiration_check import ( acm_certificates_expiration_check, ) - service_client.certificates[0].expiration_days = 5 check = acm_certificates_expiration_check() result = check.execute() assert len(result) == 1 assert result[0].status == "FAIL" - assert result[0].resource_id == "test.com" - assert result[0].resource_arn == certificate["CertificateArn"] + assert ( + result[0].status_extended + == f"ACM Certificate for {certificate_name} is about to expire in {DAYS_TO_EXPIRE_THRESHOLD} days." + ) + assert result[0].resource_id == certificate_name + assert result[0].resource_arn == certificate_arn + assert result[0].region == AWS_REGION - @mock_acm def test_acm_certificate_not_expirated(self): - # Generate ACM Client - acm_client = client("acm", region_name=AWS_REGION) - # Request ACM certificate - certificate = acm_client.request_certificate( - DomainName="test.com", - ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info - from prowler.providers.aws.services.acm.acm_service import ACM + certificate_arn = f"arn:aws:acm:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:certificate/{str(uuid.uuid4())}" + certificate_name = "test-certificate.com" + certificate_type = "AMAZON_ISSUED" + expiration_days = 365 - current_audit_info.audited_partition = "aws" + acm_client = mock.MagicMock + acm_client.certificates = [ + Certificate( + arn=certificate_arn, + name=certificate_name, + type=certificate_type, + expiration_days=expiration_days, + transparency_logging=True, + region=AWS_REGION, + ) + ] with mock.patch( - "prowler.providers.aws.services.acm.acm_certificates_expiration_check.acm_certificates_expiration_check.acm_client", - new=ACM(current_audit_info), + "prowler.providers.aws.services.acm.acm_service.ACM", + new=acm_client, ): # Test Check from prowler.providers.aws.services.acm.acm_certificates_expiration_check.acm_certificates_expiration_check import ( @@ -65,5 +98,10 @@ class Test_acm_certificates_expiration_check: assert len(result) == 1 assert result[0].status == "PASS" - assert result[0].resource_id == "test.com" - assert result[0].resource_arn == certificate["CertificateArn"] + assert ( + result[0].status_extended + == f"ACM Certificate for {certificate_name} expires in {expiration_days} days." + ) + assert result[0].resource_id == certificate_name + assert result[0].resource_arn == certificate_arn + assert result[0].region == AWS_REGION diff --git a/tests/providers/aws/services/acm/acm_certificates_transparency_logs_enabled/acm_certificates_transparency_logs_enabled_test.py b/tests/providers/aws/services/acm/acm_certificates_transparency_logs_enabled/acm_certificates_transparency_logs_enabled_test.py index 55310729..fa3c43af 100644 --- a/tests/providers/aws/services/acm/acm_certificates_transparency_logs_enabled/acm_certificates_transparency_logs_enabled_test.py +++ b/tests/providers/aws/services/acm/acm_certificates_transparency_logs_enabled/acm_certificates_transparency_logs_enabled_test.py @@ -1,29 +1,51 @@ +import uuid from unittest import mock -from boto3 import client -from moto import mock_acm +from prowler.providers.aws.services.acm.acm_service import Certificate AWS_REGION = "us-east-1" +AWS_ACCOUNT_NUMBER = 123456789012 class Test_acm_certificates_transparency_logs_enabled: - @mock_acm - def test_acm_certificate_with_logging(self): - # Generate ACM Client - acm_client = client("acm", region_name=AWS_REGION) - # Request ACM certificate - certificate = acm_client.request_certificate( - DomainName="test.com", - Options={"CertificateTransparencyLoggingPreference": "ENABLED"}, - ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info - from prowler.providers.aws.services.acm.acm_service import ACM - - current_audit_info.audited_partition = "aws" + def test_no_acm_certificates(self): + acm_client = mock.MagicMock + acm_client.certificates = [] with mock.patch( - "prowler.providers.aws.services.acm.acm_certificates_transparency_logs_enabled.acm_certificates_transparency_logs_enabled.acm_client", - new=ACM(current_audit_info), + "prowler.providers.aws.services.acm.acm_service.ACM", + new=acm_client, + ): + # Test Check + from prowler.providers.aws.services.acm.acm_certificates_transparency_logs_enabled.acm_certificates_transparency_logs_enabled import ( + acm_certificates_transparency_logs_enabled, + ) + + check = acm_certificates_transparency_logs_enabled() + result = check.execute() + + assert len(result) == 0 + + def test_acm_certificate_with_logging(self): + certificate_arn = f"arn:aws:acm:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:certificate/{str(uuid.uuid4())}" + certificate_name = "test-certificate.com" + certificate_type = "AMAZON_ISSUED" + + acm_client = mock.MagicMock + acm_client.certificates = [ + Certificate( + arn=certificate_arn, + name=certificate_name, + type=certificate_type, + expiration_days=365, + transparency_logging=True, + region=AWS_REGION, + ) + ] + + with mock.patch( + "prowler.providers.aws.services.acm.acm_service.ACM", + new=acm_client, ): # Test Check from prowler.providers.aws.services.acm.acm_certificates_transparency_logs_enabled.acm_certificates_transparency_logs_enabled import ( @@ -37,35 +59,38 @@ class Test_acm_certificates_transparency_logs_enabled: assert result[0].status == "PASS" assert ( result[0].status_extended - == "ACM Certificate for test.com has Certificate Transparency logging enabled." + == f"ACM Certificate for {certificate_name} has Certificate Transparency logging enabled." ) - assert result[0].resource_id == "test.com" - assert result[0].resource_arn == certificate["CertificateArn"] + assert result[0].resource_id == certificate_name + assert result[0].resource_arn == certificate_arn + assert result[0].region == AWS_REGION - @mock_acm def test_acm_certificate_without_logging(self): - # Generate ACM Client - acm_client = client("acm", region_name=AWS_REGION) - # Request ACM certificate - certificate = acm_client.request_certificate( - DomainName="test.com", - Options={"CertificateTransparencyLoggingPreference": "ENABLED"}, - ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info - from prowler.providers.aws.services.acm.acm_service import ACM + certificate_arn = f"arn:aws:acm:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:certificate/{str(uuid.uuid4())}" + certificate_name = "test-certificate.com" + certificate_type = "AMAZON_ISSUED" - current_audit_info.audited_partition = "aws" + acm_client = mock.MagicMock + acm_client.certificates = [ + Certificate( + arn=certificate_arn, + name=certificate_name, + type=certificate_type, + expiration_days=365, + transparency_logging=False, + region=AWS_REGION, + ) + ] with mock.patch( - "prowler.providers.aws.services.acm.acm_certificates_transparency_logs_enabled.acm_certificates_transparency_logs_enabled.acm_client", - new=ACM(current_audit_info), - ) as service_client: + "prowler.providers.aws.services.acm.acm_service.ACM", + new=acm_client, + ): # Test Check from prowler.providers.aws.services.acm.acm_certificates_transparency_logs_enabled.acm_certificates_transparency_logs_enabled import ( acm_certificates_transparency_logs_enabled, ) - service_client.certificates[0].transparency_logging = False check = acm_certificates_transparency_logs_enabled() result = check.execute() @@ -73,41 +98,8 @@ class Test_acm_certificates_transparency_logs_enabled: assert result[0].status == "FAIL" assert ( result[0].status_extended - == "ACM Certificate for test.com has Certificate Transparency logging disabled." + == f"ACM Certificate for {certificate_name} has Certificate Transparency logging disabled." ) - assert result[0].resource_id == "test.com" - assert result[0].resource_arn == certificate["CertificateArn"] - - @mock_acm - def test_acm_default_certificate(self): - # Generate ACM Client - acm_client = client("acm", region_name=AWS_REGION) - # Request ACM certificate - certificate = acm_client.request_certificate( - DomainName="test.com", - ) - from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info - from prowler.providers.aws.services.acm.acm_service import ACM - - current_audit_info.audited_partition = "aws" - - with mock.patch( - "prowler.providers.aws.services.acm.acm_certificates_transparency_logs_enabled.acm_certificates_transparency_logs_enabled.acm_client", - new=ACM(current_audit_info), - ): - # Test Check - from prowler.providers.aws.services.acm.acm_certificates_transparency_logs_enabled.acm_certificates_transparency_logs_enabled import ( - acm_certificates_transparency_logs_enabled, - ) - - check = acm_certificates_transparency_logs_enabled() - result = check.execute() - - assert len(result) == 1 - assert result[0].status == "PASS" - assert ( - result[0].status_extended - == "ACM Certificate for test.com has Certificate Transparency logging enabled." - ) - assert result[0].resource_id == "test.com" - assert result[0].resource_arn == certificate["CertificateArn"] + assert result[0].resource_id == certificate_name + assert result[0].resource_arn == certificate_arn + assert result[0].region == AWS_REGION diff --git a/tests/providers/aws/services/acm/acm_service_test.py b/tests/providers/aws/services/acm/acm_service_test.py index 6232ec9e..dbf313ef 100644 --- a/tests/providers/aws/services/acm/acm_service_test.py +++ b/tests/providers/aws/services/acm/acm_service_test.py @@ -1,13 +1,91 @@ -from boto3 import client, session -from moto import mock_acm +import uuid +from datetime import datetime + +import botocore +from boto3 import session +from freezegun import freeze_time +from mock import patch from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info from prowler.providers.aws.services.acm.acm_service import ACM +# from moto import mock_acm + + AWS_ACCOUNT_NUMBER = 123456789012 AWS_REGION = "us-east-1" +# Mocking Access Analyzer Calls +make_api_call = botocore.client.BaseClient._make_api_call +certificate_arn = ( + f"arn:aws:acm:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:certificate/{str(uuid.uuid4())}" +) +certificate_name = "test-certificate.com" +certificate_type = "AMAZON_ISSUED" + + +def mock_make_api_call(self, operation_name, kwargs): + """ + As you can see the operation_name has the list_analyzers snake_case form but + we are using the ListAnalyzers form. + Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816 + + We have to mock every AWS API call using Boto3 + """ + if operation_name == "ListCertificates": + return { + "CertificateSummaryList": [ + { + "CertificateArn": certificate_arn, + "DomainName": certificate_name, + "SubjectAlternativeNameSummaries": [ + "test-certificate-2.com", + ], + "HasAdditionalSubjectAlternativeNames": False, + "Status": "ISSUED", + "Type": certificate_type, + "KeyAlgorithm": "RSA_4096", + "KeyUsages": ["DIGITAL_SIGNATURE"], + "ExtendedKeyUsages": ["TLS_WEB_SERVER_AUTHENTICATION"], + "InUse": True, + "Exported": False, + "RenewalEligibility": "ELIGIBLE", + "NotBefore": datetime(2024, 1, 1), + "NotAfter": datetime(2024, 1, 1), + "CreatedAt": datetime(2024, 1, 1), + "IssuedAt": datetime(2024, 1, 1), + "ImportedAt": datetime(2024, 1, 1), + "RevokedAt": datetime(2024, 1, 1), + } + ] + } + if operation_name == "DescribeCertificate": + if kwargs["CertificateArn"] == certificate_arn: + return { + "Certificate": { + "Options": {"CertificateTransparencyLoggingPreference": "DISABLED"}, + } + } + return make_api_call(self, operation_name, kwargs) + + +# Mock generate_regional_clients() +def mock_generate_regional_clients(service, audit_info): + regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION) + regional_client.region = AWS_REGION + return {AWS_REGION: regional_client} + + +# Patch every AWS call using Boto3 and generate_regional_clients to have 1 client +@patch( + "prowler.providers.aws.services.acm.acm_service.generate_regional_clients", + new=mock_generate_regional_clients, +) +@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call) +# Freeze time +@freeze_time("2023-01-01") +# FIXME: Pending Moto PR to update ACM responses class Test_ACM_Service: # Mocked Audit Info def set_mocked_audit_info(self): @@ -33,7 +111,7 @@ class Test_ACM_Service: return audit_info # Test ACM Service - @mock_acm + # @mock_acm def test_service(self): # ACM client for this test class audit_info = self.set_mocked_audit_info() @@ -41,7 +119,7 @@ class Test_ACM_Service: assert acm.service == "acm" # Test ACM Client - @mock_acm + # @mock_acm def test_client(self): # ACM client for this test class audit_info = self.set_mocked_audit_info() @@ -50,7 +128,7 @@ class Test_ACM_Service: assert regional_client.__class__.__name__ == "ACM" # Test ACM Session - @mock_acm + # @mock_acm def test__get_session__(self): # ACM client for this test class audit_info = self.set_mocked_audit_info() @@ -58,7 +136,7 @@ class Test_ACM_Service: assert acm.session.__class__.__name__ == "Session" # Test ACM Session - @mock_acm + # @mock_acm def test_audited_account(self): # ACM client for this test class audit_info = self.set_mocked_audit_info() @@ -66,31 +144,22 @@ class Test_ACM_Service: assert acm.audited_account == AWS_ACCOUNT_NUMBER # Test ACM List Certificates - @mock_acm - def test__list_certificates__(self): + # @mock_acm + def test__list_and_describe_certificates__(self): # Generate ACM Client - acm_client = client("acm", region_name=AWS_REGION) + # acm_client = client("acm", region_name=AWS_REGION) # Request ACM certificate - certificate = acm_client.request_certificate( - DomainName="test.com", - ) + # certificate = acm_client.request_certificate( + # DomainName="test.com", + # ) + # ACM client for this test class audit_info = self.set_mocked_audit_info() acm = ACM(audit_info) assert len(acm.certificates) == 1 - assert acm.certificates[0].arn == certificate["CertificateArn"] - - # Test ACM Describe Certificates - @mock_acm - def test__describe_certificates__(self): - # Generate ACM Client - acm_client = client("acm", region_name=AWS_REGION) - # Request ACM certificate - certificate = acm_client.request_certificate( - DomainName="test.com", - ) - # ACM client for this test class - audit_info = self.set_mocked_audit_info() - acm = ACM(audit_info) - assert acm.certificates[0].type == "AMAZON_ISSUED" - assert acm.certificates[0].arn == certificate["CertificateArn"] + assert acm.certificates[0].arn == certificate_arn + assert acm.certificates[0].name == certificate_name + assert acm.certificates[0].type == certificate_type + assert acm.certificates[0].expiration_days == 365 + assert acm.certificates[0].transparency_logging is False + assert acm.certificates[0].region == AWS_REGION diff --git a/tests/providers/aws/services/trustedadvisor/trustedadvisor_errors_and_warnings/trustedadvisor_errors_and_warnings_test.py b/tests/providers/aws/services/trustedadvisor/trustedadvisor_errors_and_warnings/trustedadvisor_errors_and_warnings_test.py index 70147bca..a9c3fd18 100644 --- a/tests/providers/aws/services/trustedadvisor/trustedadvisor_errors_and_warnings/trustedadvisor_errors_and_warnings_test.py +++ b/tests/providers/aws/services/trustedadvisor/trustedadvisor_errors_and_warnings/trustedadvisor_errors_and_warnings_test.py @@ -16,6 +16,7 @@ class Test_trustedadvisor_errors_and_warnings: trustedadvisor_client.checks = [] trustedadvisor_client.enabled = False trustedadvisor_client.account = AWS_ACCOUNT_NUMBER + trustedadvisor_client.region = AWS_REGION with mock.patch( "prowler.providers.aws.services.trustedadvisor.trustedadvisor_service.TrustedAdvisor", trustedadvisor_client,