mirror of
https://github.com/ghndrx/prowler.git
synced 2026-02-10 14:55:00 +00:00
feat(athena): New AWS Athena service + 2 workgroup checks (#2696)
This commit is contained in:
0
prowler/providers/aws/services/athena/__init__.py
Normal file
0
prowler/providers/aws/services/athena/__init__.py
Normal file
4
prowler/providers/aws/services/athena/athena_client.py
Normal file
4
prowler/providers/aws/services/athena/athena_client.py
Normal file
@@ -0,0 +1,4 @@
|
||||
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
|
||||
from prowler.providers.aws.services.athena.athena_service import Athena
|
||||
|
||||
athena_client = Athena(current_audit_info)
|
||||
109
prowler/providers/aws/services/athena/athena_service.py
Normal file
109
prowler/providers/aws/services/athena/athena_service.py
Normal file
@@ -0,0 +1,109 @@
|
||||
from typing import Optional
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from prowler.lib.logger import logger
|
||||
from prowler.lib.scan_filters.scan_filters import is_resource_filtered
|
||||
from prowler.providers.aws.lib.service.service import AWSService
|
||||
|
||||
|
||||
################## Athena
|
||||
class Athena(AWSService):
|
||||
def __init__(self, audit_info):
|
||||
# Call AWSService's __init__
|
||||
super().__init__(__class__.__name__, audit_info)
|
||||
self.workgroups = {}
|
||||
self.__threading_call__(self.__list_workgroups__)
|
||||
self.__get_workgroups__()
|
||||
self.__list_tags_for_resource__()
|
||||
|
||||
def __list_workgroups__(self, regional_client):
|
||||
logger.info("Athena - Listing WorkGroups...")
|
||||
try:
|
||||
list_workgroups = regional_client.list_work_groups()
|
||||
for workgroup in list_workgroups["WorkGroups"]:
|
||||
workgroup_name = workgroup["Name"]
|
||||
workgroup_arn = f"arn:{self.audited_partition}:athena:{regional_client.region}:{self.audited_account}:workgroup/{workgroup_name}"
|
||||
if not self.audit_resources or (
|
||||
is_resource_filtered(workgroup_arn, self.audit_resources)
|
||||
):
|
||||
self.workgroups[workgroup_arn] = WorkGroup(
|
||||
arn=workgroup_arn,
|
||||
name=workgroup_name,
|
||||
region=regional_client.region,
|
||||
)
|
||||
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
def __get_workgroups__(self):
|
||||
logger.info("Athena - Getting WorkGroups...")
|
||||
try:
|
||||
for workgroup in self.workgroups.values():
|
||||
wg = self.regional_clients[workgroup.region].get_work_group(
|
||||
WorkGroup=workgroup.name
|
||||
)
|
||||
|
||||
wg_configuration = wg.get("WorkGroup").get("Configuration")
|
||||
self.workgroups[
|
||||
workgroup.arn
|
||||
].enforce_workgroup_configuration = wg_configuration.get(
|
||||
"EnforceWorkGroupConfiguration", False
|
||||
)
|
||||
|
||||
# We include an empty EncryptionConfiguration to handle if the workgroup does not have encryption configured
|
||||
encryption = (
|
||||
wg_configuration.get(
|
||||
"ResultConfiguration",
|
||||
{"EncryptionConfiguration": {}},
|
||||
)
|
||||
.get(
|
||||
"EncryptionConfiguration",
|
||||
{"EncryptionOption": ""},
|
||||
)
|
||||
.get("EncryptionOption")
|
||||
)
|
||||
|
||||
if encryption in ["SSE_S3", "SSE_KMS", "CSE_KMS"]:
|
||||
encryption_configuration = EncryptionConfiguration(
|
||||
encryption_option=encryption, encrypted=True
|
||||
)
|
||||
self.workgroups[
|
||||
workgroup.arn
|
||||
].encryption_configuration = encryption_configuration
|
||||
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
def __list_tags_for_resource__(self):
|
||||
logger.info("Athena - Listing Tags...")
|
||||
try:
|
||||
for workgroup in self.workgroups.values():
|
||||
regional_client = self.regional_clients[workgroup.region]
|
||||
workgroup.tags = regional_client.list_tags_for_resource(
|
||||
ResourceARN=workgroup.arn
|
||||
)["Tags"]
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
|
||||
class EncryptionConfiguration(BaseModel):
|
||||
encryption_option: str
|
||||
encrypted: bool
|
||||
|
||||
|
||||
class WorkGroup(BaseModel):
|
||||
arn: str
|
||||
name: str
|
||||
encryption_configuration: EncryptionConfiguration = EncryptionConfiguration(
|
||||
encryption_option="", encrypted=False
|
||||
)
|
||||
enforce_workgroup_configuration: bool = False
|
||||
region: str
|
||||
tags: Optional[list] = []
|
||||
@@ -0,0 +1,34 @@
|
||||
{
|
||||
"Provider": "aws",
|
||||
"CheckID": "athena_workgroup_encryption",
|
||||
"CheckTitle": "Ensure that encryption at rest is enabled for Amazon Athena query results stored in Amazon S3 in order to secure data and meet compliance requirements for data-at-rest encryption.",
|
||||
"CheckType": [
|
||||
"Software and Configuration Checks"
|
||||
],
|
||||
"ServiceName": "athena",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "arn:partition:athena:region:account-id:workgroup/resource-id",
|
||||
"Severity": "high",
|
||||
"ResourceType": "WorkGroup",
|
||||
"Description": "Ensure that encryption at rest is enabled for Amazon Athena query results stored in Amazon S3 in order to secure data and meet compliance requirements for data-at-rest encryption.",
|
||||
"Risk": "If not enabled sensitive information at rest is not protected.",
|
||||
"RelatedUrl": "https://docs.aws.amazon.com/athena/latest/ug/encryption.html",
|
||||
"Remediation": {
|
||||
"Code": {
|
||||
"CLI": "aws athena update-work-group --region <REGION> --work-group <workgroup_name> --configuration-updates ResultConfigurationUpdates={EncryptionConfiguration={EncryptionOption=SSE_S3|SSE_KMS|CSE_KMS}}",
|
||||
"NativeIaC": "",
|
||||
"Other": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/aws/Athena/encryption-enabled.html",
|
||||
"Terraform": "https://docs.bridgecrew.io/docs/ensure-that-athena-workgroup-is-encrypted#terraform"
|
||||
},
|
||||
"Recommendation": {
|
||||
"Text": "Enable Encryption. Use a CMK where possible. It will provide additional management and privacy benefits.",
|
||||
"Url": "https://docs.aws.amazon.com/athena/latest/ug/encrypting-query-results-stored-in-s3.html"
|
||||
}
|
||||
},
|
||||
"Categories": [
|
||||
"encryption"
|
||||
],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": ""
|
||||
}
|
||||
@@ -0,0 +1,27 @@
|
||||
from prowler.lib.check.models import Check, Check_Report_AWS
|
||||
from prowler.providers.aws.services.athena.athena_client import athena_client
|
||||
|
||||
|
||||
class athena_workgroup_encryption(Check):
|
||||
"""Check if there are Athena workgroups not encrypting query results"""
|
||||
|
||||
def execute(self):
|
||||
"""Execute the athena_workgroup_encryption check"""
|
||||
findings = []
|
||||
for workgroup in athena_client.workgroups.values():
|
||||
report = Check_Report_AWS(self.metadata())
|
||||
report.region = workgroup.region
|
||||
report.resource_id = workgroup.name
|
||||
report.resource_arn = workgroup.arn
|
||||
report.resource_tags = workgroup.tags
|
||||
|
||||
if workgroup.encryption_configuration.encrypted:
|
||||
report.status = "PASS"
|
||||
report.status_extended = f"Athena WorkGroup {workgroup.name} encrypts the query results using {workgroup.encryption_configuration.encryption_option}."
|
||||
else:
|
||||
report.status = "FAIL"
|
||||
report.status_extended = f"Athena WorkGroup {workgroup.name} does not encrypt the query results."
|
||||
|
||||
findings.append(report)
|
||||
|
||||
return findings
|
||||
@@ -0,0 +1,32 @@
|
||||
{
|
||||
"Provider": "aws",
|
||||
"CheckID": "athena_workgroup_enforce_configuration",
|
||||
"CheckTitle": "Ensure that workgroup configuration is enforced so it cannot be overriden by client-side settings.",
|
||||
"CheckType": [
|
||||
"Software and Configuration Checks"
|
||||
],
|
||||
"ServiceName": "athena",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "arn:partition:athena:region:account-id:workgroup/resource-id",
|
||||
"Severity": "medium",
|
||||
"ResourceType": "WorkGroup",
|
||||
"Description": "Ensure that workgroup configuration is enforced so it cannot be overriden by client-side settings.",
|
||||
"Risk": "If workgroup configuration is not enforced security settings like encryption can be overriden by client-side settings.",
|
||||
"RelatedUrl": "https://docs.aws.amazon.com/athena/latest/ug/workgroups-settings-override.html",
|
||||
"Remediation": {
|
||||
"Code": {
|
||||
"CLI": "aws athena update-work-group --region <REGION> --work-group <workgroup_name> --configuration-updates EnforceWorkGroupConfiguration=True",
|
||||
"NativeIaC": "https://docs.bridgecrew.io/docs/bc_aws_general_33#cloudformation",
|
||||
"Other": "",
|
||||
"Terraform": "https://docs.bridgecrew.io/docs/bc_aws_general_33#terraform"
|
||||
},
|
||||
"Recommendation": {
|
||||
"Text": "Ensure that workgroup configuration is enforced so it cannot be overriden by client-side settings.",
|
||||
"Url": "https://docs.aws.amazon.com/athena/latest/ug/workgroups-settings-override.html"
|
||||
}
|
||||
},
|
||||
"Categories": [],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": ""
|
||||
}
|
||||
@@ -0,0 +1,27 @@
|
||||
from prowler.lib.check.models import Check, Check_Report_AWS
|
||||
from prowler.providers.aws.services.athena.athena_client import athena_client
|
||||
|
||||
|
||||
class athena_workgroup_enforce_configuration(Check):
|
||||
"""Check if there are Athena workgroups not encrypting query results"""
|
||||
|
||||
def execute(self):
|
||||
"""Execute the athena_workgroup_enforce_configuration check"""
|
||||
findings = []
|
||||
for workgroup in athena_client.workgroups.values():
|
||||
report = Check_Report_AWS(self.metadata())
|
||||
report.region = workgroup.region
|
||||
report.resource_id = workgroup.name
|
||||
report.resource_arn = workgroup.arn
|
||||
report.resource_tags = workgroup.tags
|
||||
|
||||
if workgroup.enforce_workgroup_configuration:
|
||||
report.status = "PASS"
|
||||
report.status_extended = f"Athena WorkGroup {workgroup.name} enforces the workgroup configuration, so it cannot be overridden by the client-side settings."
|
||||
else:
|
||||
report.status = "FAIL"
|
||||
report.status_extended = f"Athena WorkGroup {workgroup.name} does not enforce the workgroup configuration, so it can be overridden by the client-side settings."
|
||||
|
||||
findings.append(report)
|
||||
|
||||
return findings
|
||||
147
tests/providers/aws/services/athena/athena_service_test.py
Normal file
147
tests/providers/aws/services/athena/athena_service_test.py
Normal file
@@ -0,0 +1,147 @@
|
||||
from boto3 import session
|
||||
from botocore.client import BaseClient
|
||||
from mock import patch
|
||||
from moto import mock_athena
|
||||
|
||||
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
|
||||
from prowler.providers.aws.services.athena.athena_service import Athena
|
||||
from prowler.providers.common.models import Audit_Metadata
|
||||
|
||||
AWS_ACCOUNT_NUMBER = "123456789012"
|
||||
AWS_REGION = "eu-west-1"
|
||||
|
||||
|
||||
# Mocking Access Analyzer Calls
|
||||
make_api_call = BaseClient._make_api_call
|
||||
|
||||
|
||||
def mock_make_api_call(self, operation_name, kwarg):
|
||||
"""
|
||||
Mock every AWS API call using Boto3
|
||||
|
||||
As you can see the operation_name has the get_work_group snake_case form but
|
||||
we are using the GetWorkGroup form.
|
||||
Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816
|
||||
"""
|
||||
if operation_name == "GetWorkGroup":
|
||||
return {
|
||||
"WorkGroup": {
|
||||
"Name": "primary",
|
||||
"State": "ENABLED",
|
||||
"Configuration": {
|
||||
"ResultConfiguration": {
|
||||
"EncryptionConfiguration": {
|
||||
"EncryptionOption": "SSE_S3",
|
||||
},
|
||||
},
|
||||
"EnforceWorkGroupConfiguration": True,
|
||||
},
|
||||
}
|
||||
}
|
||||
return make_api_call(self, operation_name, kwarg)
|
||||
|
||||
|
||||
# Mock generate_regional_clients()
|
||||
def mock_generate_regional_clients(service, audit_info, _):
|
||||
regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION)
|
||||
regional_client.region = AWS_REGION
|
||||
return {AWS_REGION: regional_client}
|
||||
|
||||
|
||||
# Patch every AWS call using Boto3 and generate_regional_clients to have 1 client
|
||||
@patch(
|
||||
"prowler.providers.aws.lib.service.service.generate_regional_clients",
|
||||
new=mock_generate_regional_clients,
|
||||
)
|
||||
class Test_Athena_Service:
|
||||
# Mocked Audit Info
|
||||
def set_mocked_audit_info(self):
|
||||
audit_info = AWS_Audit_Info(
|
||||
session_config=None,
|
||||
original_session=None,
|
||||
audit_session=session.Session(
|
||||
profile_name=None,
|
||||
botocore_session=None,
|
||||
),
|
||||
audited_account=AWS_ACCOUNT_NUMBER,
|
||||
audited_account_arn=f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root",
|
||||
audited_user_id=None,
|
||||
audited_partition="aws",
|
||||
audited_identity_arn=None,
|
||||
profile=None,
|
||||
profile_region=None,
|
||||
credentials=None,
|
||||
assumed_role_info=None,
|
||||
audited_regions=None,
|
||||
organizations_metadata=None,
|
||||
audit_resources=None,
|
||||
mfa_enabled=False,
|
||||
audit_metadata=Audit_Metadata(
|
||||
services_scanned=0,
|
||||
expected_checks=[],
|
||||
completed_checks=0,
|
||||
audit_progress=0,
|
||||
),
|
||||
)
|
||||
return audit_info
|
||||
|
||||
# Test Athena Get Workgrups
|
||||
@mock_athena
|
||||
def test__get_workgroups__not_encrypted(self):
|
||||
default_workgroup_name = "primary"
|
||||
audit_info = self.set_mocked_audit_info()
|
||||
workgroup_arn = f"arn:{audit_info.audited_partition}:athena:{AWS_REGION}:{audit_info.audited_account}:workgroup/{default_workgroup_name}"
|
||||
athena = Athena(audit_info)
|
||||
assert len(athena.workgroups) == 1
|
||||
assert athena.workgroups[workgroup_arn]
|
||||
assert athena.workgroups[workgroup_arn].arn == workgroup_arn
|
||||
assert athena.workgroups[workgroup_arn].name == default_workgroup_name
|
||||
assert athena.workgroups[workgroup_arn].region == AWS_REGION
|
||||
assert athena.workgroups[workgroup_arn].tags == []
|
||||
assert (
|
||||
athena.workgroups[workgroup_arn].encryption_configuration.encrypted is False
|
||||
)
|
||||
assert (
|
||||
athena.workgroups[workgroup_arn].encryption_configuration.encryption_option
|
||||
== ""
|
||||
)
|
||||
assert athena.workgroups[workgroup_arn].enforce_workgroup_configuration is False
|
||||
|
||||
# Test Athena Get Workgrups
|
||||
# We mock the get_work_group to return an encrypted workgroup
|
||||
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
|
||||
@mock_athena
|
||||
def test__get_workgroups__encrypted(self):
|
||||
default_workgroup_name = "primary"
|
||||
audit_info = self.set_mocked_audit_info()
|
||||
|
||||
# Athena client
|
||||
# This API call is not implemented by Moto
|
||||
# athena_client = audit_info.audit_session.client(
|
||||
# "athena", region_name=AWS_REGION
|
||||
# )
|
||||
# athena_client.update_work_group(
|
||||
# WorkGroup=default_workgroup_name,
|
||||
# ConfigurationUpdates={
|
||||
# "ResultConfigurationUpdates": {
|
||||
# "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"}
|
||||
# }
|
||||
# },
|
||||
# )
|
||||
|
||||
workgroup_arn = f"arn:{audit_info.audited_partition}:athena:{AWS_REGION}:{audit_info.audited_account}:workgroup/{default_workgroup_name}"
|
||||
athena = Athena(audit_info)
|
||||
assert len(athena.workgroups) == 1
|
||||
assert athena.workgroups[workgroup_arn]
|
||||
assert athena.workgroups[workgroup_arn].arn == workgroup_arn
|
||||
assert athena.workgroups[workgroup_arn].name == default_workgroup_name
|
||||
assert athena.workgroups[workgroup_arn].region == AWS_REGION
|
||||
assert athena.workgroups[workgroup_arn].tags == []
|
||||
assert (
|
||||
athena.workgroups[workgroup_arn].encryption_configuration.encrypted is True
|
||||
)
|
||||
assert (
|
||||
athena.workgroups[workgroup_arn].encryption_configuration.encryption_option
|
||||
== "SSE_S3"
|
||||
)
|
||||
assert athena.workgroups[workgroup_arn].enforce_workgroup_configuration is True
|
||||
@@ -0,0 +1,111 @@
|
||||
from unittest import mock
|
||||
|
||||
from boto3 import session
|
||||
from mock import patch
|
||||
from moto import mock_athena
|
||||
|
||||
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
|
||||
from prowler.providers.common.models import Audit_Metadata
|
||||
from tests.providers.aws.services.athena.athena_service_test import mock_make_api_call
|
||||
|
||||
AWS_REGION = "eu-west-1"
|
||||
AWS_ACCOUNT_NUMBER = "123456789012"
|
||||
ATHENA_PRIMARY_WORKGROUP = "primary"
|
||||
ATHENA_PRIMARY_WORKGROUP_ARN = f"arn:aws:athena:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:workgroup/{ATHENA_PRIMARY_WORKGROUP}"
|
||||
|
||||
|
||||
class Test_athena_workgroup_encryption:
|
||||
def set_mocked_audit_info(self):
|
||||
audit_info = AWS_Audit_Info(
|
||||
session_config=None,
|
||||
original_session=None,
|
||||
audit_session=session.Session(
|
||||
profile_name=None,
|
||||
botocore_session=None,
|
||||
),
|
||||
audited_account=AWS_ACCOUNT_NUMBER,
|
||||
audited_account_arn=f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root",
|
||||
audited_user_id=None,
|
||||
audited_partition="aws",
|
||||
audited_identity_arn=None,
|
||||
profile=None,
|
||||
profile_region=None,
|
||||
credentials=None,
|
||||
assumed_role_info=None,
|
||||
audited_regions=[AWS_REGION],
|
||||
organizations_metadata=None,
|
||||
audit_resources=None,
|
||||
mfa_enabled=False,
|
||||
audit_metadata=Audit_Metadata(
|
||||
services_scanned=0,
|
||||
expected_checks=[],
|
||||
completed_checks=0,
|
||||
audit_progress=0,
|
||||
),
|
||||
)
|
||||
|
||||
return audit_info
|
||||
|
||||
@mock_athena
|
||||
def test_primary_workgroup_not_encrypted(self):
|
||||
from prowler.providers.aws.services.athena.athena_service import Athena
|
||||
|
||||
current_audit_info = self.set_mocked_audit_info()
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
|
||||
new=current_audit_info,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.athena.athena_workgroup_encryption.athena_workgroup_encryption.athena_client",
|
||||
new=Athena(current_audit_info),
|
||||
):
|
||||
from prowler.providers.aws.services.athena.athena_workgroup_encryption.athena_workgroup_encryption import (
|
||||
athena_workgroup_encryption,
|
||||
)
|
||||
|
||||
check = athena_workgroup_encryption()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"Athena WorkGroup {ATHENA_PRIMARY_WORKGROUP} does not encrypt the query results."
|
||||
)
|
||||
assert result[0].resource_id == ATHENA_PRIMARY_WORKGROUP
|
||||
assert result[0].resource_arn == ATHENA_PRIMARY_WORKGROUP_ARN
|
||||
assert result[0].region == AWS_REGION
|
||||
assert result[0].resource_tags == []
|
||||
|
||||
@mock_athena
|
||||
# We mock the get_work_group to return an encrypted workgroup
|
||||
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
|
||||
def test_primary_workgroup_encrypted(self):
|
||||
from prowler.providers.aws.services.athena.athena_service import Athena
|
||||
|
||||
current_audit_info = self.set_mocked_audit_info()
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
|
||||
new=current_audit_info,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.athena.athena_workgroup_encryption.athena_workgroup_encryption.athena_client",
|
||||
new=Athena(current_audit_info),
|
||||
):
|
||||
from prowler.providers.aws.services.athena.athena_workgroup_encryption.athena_workgroup_encryption import (
|
||||
athena_workgroup_encryption,
|
||||
)
|
||||
|
||||
check = athena_workgroup_encryption()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"Athena WorkGroup {ATHENA_PRIMARY_WORKGROUP} encrypts the query results using SSE_S3."
|
||||
)
|
||||
assert result[0].resource_id == ATHENA_PRIMARY_WORKGROUP
|
||||
assert result[0].resource_arn == ATHENA_PRIMARY_WORKGROUP_ARN
|
||||
assert result[0].region == AWS_REGION
|
||||
assert result[0].resource_tags == []
|
||||
@@ -0,0 +1,111 @@
|
||||
from unittest import mock
|
||||
|
||||
from boto3 import session
|
||||
from mock import patch
|
||||
from moto import mock_athena
|
||||
|
||||
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
|
||||
from prowler.providers.common.models import Audit_Metadata
|
||||
from tests.providers.aws.services.athena.athena_service_test import mock_make_api_call
|
||||
|
||||
AWS_REGION = "eu-west-1"
|
||||
AWS_ACCOUNT_NUMBER = "123456789012"
|
||||
ATHENA_PRIMARY_WORKGROUP = "primary"
|
||||
ATHENA_PRIMARY_WORKGROUP_ARN = f"arn:aws:athena:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:workgroup/{ATHENA_PRIMARY_WORKGROUP}"
|
||||
|
||||
|
||||
class Test_athena_workgroup_enforce_configuration:
|
||||
def set_mocked_audit_info(self):
|
||||
audit_info = AWS_Audit_Info(
|
||||
session_config=None,
|
||||
original_session=None,
|
||||
audit_session=session.Session(
|
||||
profile_name=None,
|
||||
botocore_session=None,
|
||||
),
|
||||
audited_account=AWS_ACCOUNT_NUMBER,
|
||||
audited_account_arn=f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root",
|
||||
audited_user_id=None,
|
||||
audited_partition="aws",
|
||||
audited_identity_arn=None,
|
||||
profile=None,
|
||||
profile_region=None,
|
||||
credentials=None,
|
||||
assumed_role_info=None,
|
||||
audited_regions=[AWS_REGION],
|
||||
organizations_metadata=None,
|
||||
audit_resources=None,
|
||||
mfa_enabled=False,
|
||||
audit_metadata=Audit_Metadata(
|
||||
services_scanned=0,
|
||||
expected_checks=[],
|
||||
completed_checks=0,
|
||||
audit_progress=0,
|
||||
),
|
||||
)
|
||||
|
||||
return audit_info
|
||||
|
||||
@mock_athena
|
||||
def test_primary_workgroup_configuration_not_enforced(self):
|
||||
from prowler.providers.aws.services.athena.athena_service import Athena
|
||||
|
||||
current_audit_info = self.set_mocked_audit_info()
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
|
||||
new=current_audit_info,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.athena.athena_workgroup_enforce_configuration.athena_workgroup_enforce_configuration.athena_client",
|
||||
new=Athena(current_audit_info),
|
||||
):
|
||||
from prowler.providers.aws.services.athena.athena_workgroup_enforce_configuration.athena_workgroup_enforce_configuration import (
|
||||
athena_workgroup_enforce_configuration,
|
||||
)
|
||||
|
||||
check = athena_workgroup_enforce_configuration()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"Athena WorkGroup {ATHENA_PRIMARY_WORKGROUP} does not enforce the workgroup configuration, so it can be overridden by the client-side settings."
|
||||
)
|
||||
assert result[0].resource_id == ATHENA_PRIMARY_WORKGROUP
|
||||
assert result[0].resource_arn == ATHENA_PRIMARY_WORKGROUP_ARN
|
||||
assert result[0].region == AWS_REGION
|
||||
assert result[0].resource_tags == []
|
||||
|
||||
@mock_athena
|
||||
# We mock the get_work_group to return a workgroup not enforcing configuration
|
||||
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
|
||||
def test_primary_workgroup_configuration_enforced(self):
|
||||
from prowler.providers.aws.services.athena.athena_service import Athena
|
||||
|
||||
current_audit_info = self.set_mocked_audit_info()
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
|
||||
new=current_audit_info,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.athena.athena_workgroup_enforce_configuration.athena_workgroup_enforce_configuration.athena_client",
|
||||
new=Athena(current_audit_info),
|
||||
):
|
||||
from prowler.providers.aws.services.athena.athena_workgroup_enforce_configuration.athena_workgroup_enforce_configuration import (
|
||||
athena_workgroup_enforce_configuration,
|
||||
)
|
||||
|
||||
check = athena_workgroup_enforce_configuration()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"Athena WorkGroup {ATHENA_PRIMARY_WORKGROUP} enforces the workgroup configuration, so it cannot be overridden by the client-side settings."
|
||||
)
|
||||
assert result[0].resource_id == ATHENA_PRIMARY_WORKGROUP
|
||||
assert result[0].resource_arn == ATHENA_PRIMARY_WORKGROUP_ARN
|
||||
assert result[0].region == AWS_REGION
|
||||
assert result[0].resource_tags == []
|
||||
Reference in New Issue
Block a user