diff --git a/prowler/providers/aws/services/athena/__init__.py b/prowler/providers/aws/services/athena/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/prowler/providers/aws/services/athena/athena_client.py b/prowler/providers/aws/services/athena/athena_client.py new file mode 100644 index 00000000..23d3d4ad --- /dev/null +++ b/prowler/providers/aws/services/athena/athena_client.py @@ -0,0 +1,4 @@ +from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info +from prowler.providers.aws.services.athena.athena_service import Athena + +athena_client = Athena(current_audit_info) diff --git a/prowler/providers/aws/services/athena/athena_service.py b/prowler/providers/aws/services/athena/athena_service.py new file mode 100644 index 00000000..1092e0d3 --- /dev/null +++ b/prowler/providers/aws/services/athena/athena_service.py @@ -0,0 +1,109 @@ +from typing import Optional + +from pydantic import BaseModel + +from prowler.lib.logger import logger +from prowler.lib.scan_filters.scan_filters import is_resource_filtered +from prowler.providers.aws.lib.service.service import AWSService + + +################## Athena +class Athena(AWSService): + def __init__(self, audit_info): + # Call AWSService's __init__ + super().__init__(__class__.__name__, audit_info) + self.workgroups = {} + self.__threading_call__(self.__list_workgroups__) + self.__get_workgroups__() + self.__list_tags_for_resource__() + + def __list_workgroups__(self, regional_client): + logger.info("Athena - Listing WorkGroups...") + try: + list_workgroups = regional_client.list_work_groups() + for workgroup in list_workgroups["WorkGroups"]: + workgroup_name = workgroup["Name"] + workgroup_arn = f"arn:{self.audited_partition}:athena:{regional_client.region}:{self.audited_account}:workgroup/{workgroup_name}" + if not self.audit_resources or ( + is_resource_filtered(workgroup_arn, self.audit_resources) + ): + self.workgroups[workgroup_arn] = WorkGroup( + arn=workgroup_arn, + name=workgroup_name, + region=regional_client.region, + ) + + except Exception as error: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + def __get_workgroups__(self): + logger.info("Athena - Getting WorkGroups...") + try: + for workgroup in self.workgroups.values(): + wg = self.regional_clients[workgroup.region].get_work_group( + WorkGroup=workgroup.name + ) + + wg_configuration = wg.get("WorkGroup").get("Configuration") + self.workgroups[ + workgroup.arn + ].enforce_workgroup_configuration = wg_configuration.get( + "EnforceWorkGroupConfiguration", False + ) + + # We include an empty EncryptionConfiguration to handle if the workgroup does not have encryption configured + encryption = ( + wg_configuration.get( + "ResultConfiguration", + {"EncryptionConfiguration": {}}, + ) + .get( + "EncryptionConfiguration", + {"EncryptionOption": ""}, + ) + .get("EncryptionOption") + ) + + if encryption in ["SSE_S3", "SSE_KMS", "CSE_KMS"]: + encryption_configuration = EncryptionConfiguration( + encryption_option=encryption, encrypted=True + ) + self.workgroups[ + workgroup.arn + ].encryption_configuration = encryption_configuration + + except Exception as error: + logger.error( + f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + def __list_tags_for_resource__(self): + logger.info("Athena - Listing Tags...") + try: + for workgroup in self.workgroups.values(): + regional_client = self.regional_clients[workgroup.region] + workgroup.tags = regional_client.list_tags_for_resource( + ResourceARN=workgroup.arn + )["Tags"] + except Exception as error: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + +class EncryptionConfiguration(BaseModel): + encryption_option: str + encrypted: bool + + +class WorkGroup(BaseModel): + arn: str + name: str + encryption_configuration: EncryptionConfiguration = EncryptionConfiguration( + encryption_option="", encrypted=False + ) + enforce_workgroup_configuration: bool = False + region: str + tags: Optional[list] = [] diff --git a/prowler/providers/aws/services/athena/athena_workgroup_encryption/__init__.py b/prowler/providers/aws/services/athena/athena_workgroup_encryption/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/prowler/providers/aws/services/athena/athena_workgroup_encryption/athena_workgroup_encryption.metadata.json b/prowler/providers/aws/services/athena/athena_workgroup_encryption/athena_workgroup_encryption.metadata.json new file mode 100644 index 00000000..105a1085 --- /dev/null +++ b/prowler/providers/aws/services/athena/athena_workgroup_encryption/athena_workgroup_encryption.metadata.json @@ -0,0 +1,34 @@ +{ + "Provider": "aws", + "CheckID": "athena_workgroup_encryption", + "CheckTitle": "Ensure that encryption at rest is enabled for Amazon Athena query results stored in Amazon S3 in order to secure data and meet compliance requirements for data-at-rest encryption.", + "CheckType": [ + "Software and Configuration Checks" + ], + "ServiceName": "athena", + "SubServiceName": "", + "ResourceIdTemplate": "arn:partition:athena:region:account-id:workgroup/resource-id", + "Severity": "high", + "ResourceType": "WorkGroup", + "Description": "Ensure that encryption at rest is enabled for Amazon Athena query results stored in Amazon S3 in order to secure data and meet compliance requirements for data-at-rest encryption.", + "Risk": "If not enabled sensitive information at rest is not protected.", + "RelatedUrl": "https://docs.aws.amazon.com/athena/latest/ug/encryption.html", + "Remediation": { + "Code": { + "CLI": "aws athena update-work-group --region --work-group --configuration-updates ResultConfigurationUpdates={EncryptionConfiguration={EncryptionOption=SSE_S3|SSE_KMS|CSE_KMS}}", + "NativeIaC": "", + "Other": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/aws/Athena/encryption-enabled.html", + "Terraform": "https://docs.bridgecrew.io/docs/ensure-that-athena-workgroup-is-encrypted#terraform" + }, + "Recommendation": { + "Text": "Enable Encryption. Use a CMK where possible. It will provide additional management and privacy benefits.", + "Url": "https://docs.aws.amazon.com/athena/latest/ug/encrypting-query-results-stored-in-s3.html" + } + }, + "Categories": [ + "encryption" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/aws/services/athena/athena_workgroup_encryption/athena_workgroup_encryption.py b/prowler/providers/aws/services/athena/athena_workgroup_encryption/athena_workgroup_encryption.py new file mode 100644 index 00000000..7bf98ced --- /dev/null +++ b/prowler/providers/aws/services/athena/athena_workgroup_encryption/athena_workgroup_encryption.py @@ -0,0 +1,27 @@ +from prowler.lib.check.models import Check, Check_Report_AWS +from prowler.providers.aws.services.athena.athena_client import athena_client + + +class athena_workgroup_encryption(Check): + """Check if there are Athena workgroups not encrypting query results""" + + def execute(self): + """Execute the athena_workgroup_encryption check""" + findings = [] + for workgroup in athena_client.workgroups.values(): + report = Check_Report_AWS(self.metadata()) + report.region = workgroup.region + report.resource_id = workgroup.name + report.resource_arn = workgroup.arn + report.resource_tags = workgroup.tags + + if workgroup.encryption_configuration.encrypted: + report.status = "PASS" + report.status_extended = f"Athena WorkGroup {workgroup.name} encrypts the query results using {workgroup.encryption_configuration.encryption_option}." + else: + report.status = "FAIL" + report.status_extended = f"Athena WorkGroup {workgroup.name} does not encrypt the query results." + + findings.append(report) + + return findings diff --git a/prowler/providers/aws/services/athena/athena_workgroup_enforce_configuration/__init__.py b/prowler/providers/aws/services/athena/athena_workgroup_enforce_configuration/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/prowler/providers/aws/services/athena/athena_workgroup_enforce_configuration/athena_workgroup_enforce_configuration.metadata.json b/prowler/providers/aws/services/athena/athena_workgroup_enforce_configuration/athena_workgroup_enforce_configuration.metadata.json new file mode 100644 index 00000000..e09e0b56 --- /dev/null +++ b/prowler/providers/aws/services/athena/athena_workgroup_enforce_configuration/athena_workgroup_enforce_configuration.metadata.json @@ -0,0 +1,32 @@ +{ + "Provider": "aws", + "CheckID": "athena_workgroup_enforce_configuration", + "CheckTitle": "Ensure that workgroup configuration is enforced so it cannot be overriden by client-side settings.", + "CheckType": [ + "Software and Configuration Checks" + ], + "ServiceName": "athena", + "SubServiceName": "", + "ResourceIdTemplate": "arn:partition:athena:region:account-id:workgroup/resource-id", + "Severity": "medium", + "ResourceType": "WorkGroup", + "Description": "Ensure that workgroup configuration is enforced so it cannot be overriden by client-side settings.", + "Risk": "If workgroup configuration is not enforced security settings like encryption can be overriden by client-side settings.", + "RelatedUrl": "https://docs.aws.amazon.com/athena/latest/ug/workgroups-settings-override.html", + "Remediation": { + "Code": { + "CLI": "aws athena update-work-group --region --work-group --configuration-updates EnforceWorkGroupConfiguration=True", + "NativeIaC": "https://docs.bridgecrew.io/docs/bc_aws_general_33#cloudformation", + "Other": "", + "Terraform": "https://docs.bridgecrew.io/docs/bc_aws_general_33#terraform" + }, + "Recommendation": { + "Text": "Ensure that workgroup configuration is enforced so it cannot be overriden by client-side settings.", + "Url": "https://docs.aws.amazon.com/athena/latest/ug/workgroups-settings-override.html" + } + }, + "Categories": [], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} \ No newline at end of file diff --git a/prowler/providers/aws/services/athena/athena_workgroup_enforce_configuration/athena_workgroup_enforce_configuration.py b/prowler/providers/aws/services/athena/athena_workgroup_enforce_configuration/athena_workgroup_enforce_configuration.py new file mode 100644 index 00000000..42fd7011 --- /dev/null +++ b/prowler/providers/aws/services/athena/athena_workgroup_enforce_configuration/athena_workgroup_enforce_configuration.py @@ -0,0 +1,27 @@ +from prowler.lib.check.models import Check, Check_Report_AWS +from prowler.providers.aws.services.athena.athena_client import athena_client + + +class athena_workgroup_enforce_configuration(Check): + """Check if there are Athena workgroups not encrypting query results""" + + def execute(self): + """Execute the athena_workgroup_enforce_configuration check""" + findings = [] + for workgroup in athena_client.workgroups.values(): + report = Check_Report_AWS(self.metadata()) + report.region = workgroup.region + report.resource_id = workgroup.name + report.resource_arn = workgroup.arn + report.resource_tags = workgroup.tags + + if workgroup.enforce_workgroup_configuration: + report.status = "PASS" + report.status_extended = f"Athena WorkGroup {workgroup.name} enforces the workgroup configuration, so it cannot be overridden by the client-side settings." + else: + report.status = "FAIL" + report.status_extended = f"Athena WorkGroup {workgroup.name} does not enforce the workgroup configuration, so it can be overridden by the client-side settings." + + findings.append(report) + + return findings diff --git a/tests/providers/aws/services/athena/athena_service_test.py b/tests/providers/aws/services/athena/athena_service_test.py new file mode 100644 index 00000000..d241d7e0 --- /dev/null +++ b/tests/providers/aws/services/athena/athena_service_test.py @@ -0,0 +1,147 @@ +from boto3 import session +from botocore.client import BaseClient +from mock import patch +from moto import mock_athena + +from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info +from prowler.providers.aws.services.athena.athena_service import Athena +from prowler.providers.common.models import Audit_Metadata + +AWS_ACCOUNT_NUMBER = "123456789012" +AWS_REGION = "eu-west-1" + + +# Mocking Access Analyzer Calls +make_api_call = BaseClient._make_api_call + + +def mock_make_api_call(self, operation_name, kwarg): + """ + Mock every AWS API call using Boto3 + + As you can see the operation_name has the get_work_group snake_case form but + we are using the GetWorkGroup form. + Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816 + """ + if operation_name == "GetWorkGroup": + return { + "WorkGroup": { + "Name": "primary", + "State": "ENABLED", + "Configuration": { + "ResultConfiguration": { + "EncryptionConfiguration": { + "EncryptionOption": "SSE_S3", + }, + }, + "EnforceWorkGroupConfiguration": True, + }, + } + } + return make_api_call(self, operation_name, kwarg) + + +# Mock generate_regional_clients() +def mock_generate_regional_clients(service, audit_info, _): + regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION) + regional_client.region = AWS_REGION + return {AWS_REGION: regional_client} + + +# Patch every AWS call using Boto3 and generate_regional_clients to have 1 client +@patch( + "prowler.providers.aws.lib.service.service.generate_regional_clients", + new=mock_generate_regional_clients, +) +class Test_Athena_Service: + # Mocked Audit Info + def set_mocked_audit_info(self): + audit_info = AWS_Audit_Info( + session_config=None, + original_session=None, + audit_session=session.Session( + profile_name=None, + botocore_session=None, + ), + audited_account=AWS_ACCOUNT_NUMBER, + audited_account_arn=f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root", + audited_user_id=None, + audited_partition="aws", + audited_identity_arn=None, + profile=None, + profile_region=None, + credentials=None, + assumed_role_info=None, + audited_regions=None, + organizations_metadata=None, + audit_resources=None, + mfa_enabled=False, + audit_metadata=Audit_Metadata( + services_scanned=0, + expected_checks=[], + completed_checks=0, + audit_progress=0, + ), + ) + return audit_info + + # Test Athena Get Workgrups + @mock_athena + def test__get_workgroups__not_encrypted(self): + default_workgroup_name = "primary" + audit_info = self.set_mocked_audit_info() + workgroup_arn = f"arn:{audit_info.audited_partition}:athena:{AWS_REGION}:{audit_info.audited_account}:workgroup/{default_workgroup_name}" + athena = Athena(audit_info) + assert len(athena.workgroups) == 1 + assert athena.workgroups[workgroup_arn] + assert athena.workgroups[workgroup_arn].arn == workgroup_arn + assert athena.workgroups[workgroup_arn].name == default_workgroup_name + assert athena.workgroups[workgroup_arn].region == AWS_REGION + assert athena.workgroups[workgroup_arn].tags == [] + assert ( + athena.workgroups[workgroup_arn].encryption_configuration.encrypted is False + ) + assert ( + athena.workgroups[workgroup_arn].encryption_configuration.encryption_option + == "" + ) + assert athena.workgroups[workgroup_arn].enforce_workgroup_configuration is False + + # Test Athena Get Workgrups + # We mock the get_work_group to return an encrypted workgroup + @patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call) + @mock_athena + def test__get_workgroups__encrypted(self): + default_workgroup_name = "primary" + audit_info = self.set_mocked_audit_info() + + # Athena client + # This API call is not implemented by Moto + # athena_client = audit_info.audit_session.client( + # "athena", region_name=AWS_REGION + # ) + # athena_client.update_work_group( + # WorkGroup=default_workgroup_name, + # ConfigurationUpdates={ + # "ResultConfigurationUpdates": { + # "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"} + # } + # }, + # ) + + workgroup_arn = f"arn:{audit_info.audited_partition}:athena:{AWS_REGION}:{audit_info.audited_account}:workgroup/{default_workgroup_name}" + athena = Athena(audit_info) + assert len(athena.workgroups) == 1 + assert athena.workgroups[workgroup_arn] + assert athena.workgroups[workgroup_arn].arn == workgroup_arn + assert athena.workgroups[workgroup_arn].name == default_workgroup_name + assert athena.workgroups[workgroup_arn].region == AWS_REGION + assert athena.workgroups[workgroup_arn].tags == [] + assert ( + athena.workgroups[workgroup_arn].encryption_configuration.encrypted is True + ) + assert ( + athena.workgroups[workgroup_arn].encryption_configuration.encryption_option + == "SSE_S3" + ) + assert athena.workgroups[workgroup_arn].enforce_workgroup_configuration is True diff --git a/tests/providers/aws/services/athena/athena_workgroup_encryption/athena_workgroup_encryption_test.py b/tests/providers/aws/services/athena/athena_workgroup_encryption/athena_workgroup_encryption_test.py new file mode 100644 index 00000000..0200f73e --- /dev/null +++ b/tests/providers/aws/services/athena/athena_workgroup_encryption/athena_workgroup_encryption_test.py @@ -0,0 +1,111 @@ +from unittest import mock + +from boto3 import session +from mock import patch +from moto import mock_athena + +from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info +from prowler.providers.common.models import Audit_Metadata +from tests.providers.aws.services.athena.athena_service_test import mock_make_api_call + +AWS_REGION = "eu-west-1" +AWS_ACCOUNT_NUMBER = "123456789012" +ATHENA_PRIMARY_WORKGROUP = "primary" +ATHENA_PRIMARY_WORKGROUP_ARN = f"arn:aws:athena:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:workgroup/{ATHENA_PRIMARY_WORKGROUP}" + + +class Test_athena_workgroup_encryption: + def set_mocked_audit_info(self): + audit_info = AWS_Audit_Info( + session_config=None, + original_session=None, + audit_session=session.Session( + profile_name=None, + botocore_session=None, + ), + audited_account=AWS_ACCOUNT_NUMBER, + audited_account_arn=f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root", + audited_user_id=None, + audited_partition="aws", + audited_identity_arn=None, + profile=None, + profile_region=None, + credentials=None, + assumed_role_info=None, + audited_regions=[AWS_REGION], + organizations_metadata=None, + audit_resources=None, + mfa_enabled=False, + audit_metadata=Audit_Metadata( + services_scanned=0, + expected_checks=[], + completed_checks=0, + audit_progress=0, + ), + ) + + return audit_info + + @mock_athena + def test_primary_workgroup_not_encrypted(self): + from prowler.providers.aws.services.athena.athena_service import Athena + + current_audit_info = self.set_mocked_audit_info() + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ), mock.patch( + "prowler.providers.aws.services.athena.athena_workgroup_encryption.athena_workgroup_encryption.athena_client", + new=Athena(current_audit_info), + ): + from prowler.providers.aws.services.athena.athena_workgroup_encryption.athena_workgroup_encryption import ( + athena_workgroup_encryption, + ) + + check = athena_workgroup_encryption() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"Athena WorkGroup {ATHENA_PRIMARY_WORKGROUP} does not encrypt the query results." + ) + assert result[0].resource_id == ATHENA_PRIMARY_WORKGROUP + assert result[0].resource_arn == ATHENA_PRIMARY_WORKGROUP_ARN + assert result[0].region == AWS_REGION + assert result[0].resource_tags == [] + + @mock_athena + # We mock the get_work_group to return an encrypted workgroup + @patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call) + def test_primary_workgroup_encrypted(self): + from prowler.providers.aws.services.athena.athena_service import Athena + + current_audit_info = self.set_mocked_audit_info() + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ), mock.patch( + "prowler.providers.aws.services.athena.athena_workgroup_encryption.athena_workgroup_encryption.athena_client", + new=Athena(current_audit_info), + ): + from prowler.providers.aws.services.athena.athena_workgroup_encryption.athena_workgroup_encryption import ( + athena_workgroup_encryption, + ) + + check = athena_workgroup_encryption() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"Athena WorkGroup {ATHENA_PRIMARY_WORKGROUP} encrypts the query results using SSE_S3." + ) + assert result[0].resource_id == ATHENA_PRIMARY_WORKGROUP + assert result[0].resource_arn == ATHENA_PRIMARY_WORKGROUP_ARN + assert result[0].region == AWS_REGION + assert result[0].resource_tags == [] diff --git a/tests/providers/aws/services/athena/athena_workgroup_enforce_configuration/athena_workgroup_enforce_configuration_test.py b/tests/providers/aws/services/athena/athena_workgroup_enforce_configuration/athena_workgroup_enforce_configuration_test.py new file mode 100644 index 00000000..5868a5ca --- /dev/null +++ b/tests/providers/aws/services/athena/athena_workgroup_enforce_configuration/athena_workgroup_enforce_configuration_test.py @@ -0,0 +1,111 @@ +from unittest import mock + +from boto3 import session +from mock import patch +from moto import mock_athena + +from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info +from prowler.providers.common.models import Audit_Metadata +from tests.providers.aws.services.athena.athena_service_test import mock_make_api_call + +AWS_REGION = "eu-west-1" +AWS_ACCOUNT_NUMBER = "123456789012" +ATHENA_PRIMARY_WORKGROUP = "primary" +ATHENA_PRIMARY_WORKGROUP_ARN = f"arn:aws:athena:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:workgroup/{ATHENA_PRIMARY_WORKGROUP}" + + +class Test_athena_workgroup_enforce_configuration: + def set_mocked_audit_info(self): + audit_info = AWS_Audit_Info( + session_config=None, + original_session=None, + audit_session=session.Session( + profile_name=None, + botocore_session=None, + ), + audited_account=AWS_ACCOUNT_NUMBER, + audited_account_arn=f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root", + audited_user_id=None, + audited_partition="aws", + audited_identity_arn=None, + profile=None, + profile_region=None, + credentials=None, + assumed_role_info=None, + audited_regions=[AWS_REGION], + organizations_metadata=None, + audit_resources=None, + mfa_enabled=False, + audit_metadata=Audit_Metadata( + services_scanned=0, + expected_checks=[], + completed_checks=0, + audit_progress=0, + ), + ) + + return audit_info + + @mock_athena + def test_primary_workgroup_configuration_not_enforced(self): + from prowler.providers.aws.services.athena.athena_service import Athena + + current_audit_info = self.set_mocked_audit_info() + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ), mock.patch( + "prowler.providers.aws.services.athena.athena_workgroup_enforce_configuration.athena_workgroup_enforce_configuration.athena_client", + new=Athena(current_audit_info), + ): + from prowler.providers.aws.services.athena.athena_workgroup_enforce_configuration.athena_workgroup_enforce_configuration import ( + athena_workgroup_enforce_configuration, + ) + + check = athena_workgroup_enforce_configuration() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"Athena WorkGroup {ATHENA_PRIMARY_WORKGROUP} does not enforce the workgroup configuration, so it can be overridden by the client-side settings." + ) + assert result[0].resource_id == ATHENA_PRIMARY_WORKGROUP + assert result[0].resource_arn == ATHENA_PRIMARY_WORKGROUP_ARN + assert result[0].region == AWS_REGION + assert result[0].resource_tags == [] + + @mock_athena + # We mock the get_work_group to return a workgroup not enforcing configuration + @patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call) + def test_primary_workgroup_configuration_enforced(self): + from prowler.providers.aws.services.athena.athena_service import Athena + + current_audit_info = self.set_mocked_audit_info() + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ), mock.patch( + "prowler.providers.aws.services.athena.athena_workgroup_enforce_configuration.athena_workgroup_enforce_configuration.athena_client", + new=Athena(current_audit_info), + ): + from prowler.providers.aws.services.athena.athena_workgroup_enforce_configuration.athena_workgroup_enforce_configuration import ( + athena_workgroup_enforce_configuration, + ) + + check = athena_workgroup_enforce_configuration() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"Athena WorkGroup {ATHENA_PRIMARY_WORKGROUP} enforces the workgroup configuration, so it cannot be overridden by the client-side settings." + ) + assert result[0].resource_id == ATHENA_PRIMARY_WORKGROUP + assert result[0].resource_arn == ATHENA_PRIMARY_WORKGROUP_ARN + assert result[0].region == AWS_REGION + assert result[0].resource_tags == []