feat(EFS): Service and checks (#1469)

Co-authored-by: Sergio Garcia <38561120+sergargar@users.noreply.github.com>
Co-authored-by: sergargar <sergio@verica.io>
This commit is contained in:
Nacho Rivera
2022-11-14 15:05:41 +01:00
committed by GitHub
parent c9880b953f
commit c87327bb77
19 changed files with 683 additions and 151 deletions

View File

View File

@@ -1,53 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2018) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_extra7143="7.143"
CHECK_TITLE_extra7143="[extra7143] Check if EFS have policies which allow access to everyone"
CHECK_SCORED_extra7143="NOT_SCORED"
CHECK_CIS_LEVEL_extra7143="EXTRA"
CHECK_SEVERITY_extra7143="Critical"
CHECK_ASFF_RESOURCE_TYPE_extra7143="AwsEFS"
CHECK_ALTERNATE_check7143="extra7143"
CHECK_SERVICENAME_extra7143="efs"
CHECK_RISK_extra7143='EFS accessible to everyone could expose sensitive data to bad actors'
CHECK_REMEDIATION_extra7143='Ensure efs has some policy but it does not have principle as *'
CHECK_DOC_extra7143='https://docs.aws.amazon.com/efs/latest/ug/access-control-block-public-access.html'
CHECK_CAF_EPIC_extra7143='Data Protection'
extra7143(){
# "Check if EFS have policies which allow access to everyone (Not Scored) (Not part of CIS benchmark)"
for regx in $REGIONS; do
LIST_OF_EFS_IDS=$($AWSCLI efs describe-file-systems $PROFILE_OPT --region $regx --query FileSystems[*].FileSystemId --output text 2>&1|xargs -n1 )
if [[ $(echo "$LIST_OF_EFS_IDS" | grep -E 'AccessDenied|UnauthorizedOperation|AuthorizationError') ]]; then
textInfo "$regx: Access Denied trying to describe file systems" "$regx"
continue
fi
if [[ $LIST_OF_EFS_IDS ]]; then
for efsId in $LIST_OF_EFS_IDS;do
EFS_POLICY_STATEMENTS=$($AWSCLI efs $PROFILE_OPT describe-file-system-policy --region $regx --file-system-id $efsId --output json --query Policy 2>&1)
if [[ $EFS_POLICY_STATEMENTS == *PolicyNotFound* ]]; then
textFail "$regx: EFS: $efsId doesn't have any policy which means it grants full access to any client" "$regx" "$efsId"
else
EFS_POLICY_BAD_STATEMENTS=$(echo $EFS_POLICY_STATEMENTS | jq '. | fromjson' | jq '.Statement[] | select(.Effect=="Allow") | select(.Principal=="*" or .Principal.AWS=="*" or .Principal.CanonicalUser=="*")')
if [[ $EFS_POLICY_BAD_STATEMENTS != "" ]]; then
textFail "$regx: EFS $efsId has policy which allows access to everyone" "$regx" "$efsId"
else
textPass "$regx: EFS $efsId has policy which does not allow access to everyone" "$regx" "$efsId"
fi
fi
done
else
textInfo "$regx: No EFS found" "$regx"
fi
done
}

View File

@@ -1,51 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2018) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_extra7148="7.148"
CHECK_TITLE_extra7148="[extra7148] Check if EFS File systems have backup enabled"
CHECK_SCORED_extra7148="NOT_SCORED"
CHECK_CIS_LEVEL_extra7148="EXTRA"
CHECK_SEVERITY_extra7148="Medium"
CHECK_ASFF_RESOURCE_TYPE_extra7148="AwsEfsFileSystem"
CHECK_ALTERNATE_check7148="extra7148"
CHECK_SERVICENAME_extra7148="efs"
CHECK_RISK_extra7148='If backup is not enabled; data is vulnerable. Human error or bad actors could erase or modify data.'
CHECK_REMEDIATION_extra7148='Enable automated backup for production data. Define a retention period and periodically test backup restoration. A Disaster Recovery process should be in place to govern Data Protection approach.'
CHECK_DOC_extra7148='https://docs.aws.amazon.com/efs/latest/ug/whatisefs.html'
CHECK_CAF_EPIC_extra7148='Data Protection'
extra7148() {
for regx in $REGIONS; do
LIST_OF_EFS_SYSTEMS=$($AWSCLI efs describe-file-systems $PROFILE_OPT --region $regx --query 'FileSystems[*].FileSystemId' --output text 2>&1)
if [[ $(echo "$LIST_OF_EFS_SYSTEMS" | grep -E 'AccessDenied|UnauthorizedOperation|AuthorizationError') ]]; then
textInfo "$regx: Access Denied trying to describe file systems" "$regx"
continue
fi
if [[ $LIST_OF_EFS_SYSTEMS ]]; then
for filesystem in $LIST_OF_EFS_SYSTEMS; do
# if retention is 0 then is disabled
BACKUP_POLICY=$($AWSCLI efs describe-backup-policy $PROFILE_OPT --region $regx --file-system-id $filesystem --query BackupPolicy --output text 2>&1)
if [[ $(echo "$BACKUP_POLICY" | grep -E 'AccessDenied|UnauthorizedOperation|AuthorizationError') ]]; then
textInfo "$regx: Access Denied trying to describe backup policy" "$regx"
continue
fi
if [[ $BACKUP_POLICY == "DISABLED" ]]; then
textFail "$regx: File system $filesystem does not have backup enabled!" "$regx" "$filesystem"
else
textPass "$regx: EFS File system $filesystem has backup enabled" "$regx" "$filesystem"
fi
done
else
textInfo "$regx: No EFS File systems found" "$regx" "$filesystem"
fi
done
}

View File

@@ -1,47 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2018) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_extra7161="7.161"
CHECK_TITLE_extra7161="[extra7161] Check if EFS protects sensitive data with encryption at rest"
CHECK_SCORED_extra7161="NOT_SCORED"
CHECK_CIS_LEVEL_extra7161="EXTRA"
CHECK_SEVERITY_extra7161="Medium"
CHECK_ASFF_RESOURCE_TYPE_extra7161="AwsEfsFileSystem"
CHECK_ALTERNATE_check7161="extra7161"
CHECK_SERVICENAME_extra7161="efs"
CHECK_RISK_extra7161='EFS should be encrypted at rest to prevent exposure of sensitive data to bad actors'
CHECK_REMEDIATION_extra7161='Ensure that encryption at rest is enabled for EFS file systems. Encryption at rest can only be enabled during the file system creation.'
CHECK_DOC_extra7161='https://docs.aws.amazon.com/efs/latest/ug/encryption-at-rest.html'
CHECK_CAF_EPIC_extra7161='Data Protection'
extra7161(){
# "Check if EFS has encryption at rest enabled (Not Scored) (Proposed requirement for 1.5 CIS benchmark)"
for regx in $REGIONS; do
LIST_OF_EFS_IDS=$($AWSCLI efs describe-file-systems $PROFILE_OPT --region $regx --query 'FileSystems[*].FileSystemId' --output text 2>&1| xargs -n1 )
if [[ $(echo "$LIST_OF_EFS_IDS" | grep -E 'AccessDenied|UnauthorizedOperation|AuthorizationError') ]]; then
textInfo "$regx: Access Denied trying to describe file systems" "$regx"
continue
fi
if [[ $LIST_OF_EFS_IDS ]]; then
for efsId in $LIST_OF_EFS_IDS;do
EFS_ENCRYPTION_CHECK=$($AWSCLI efs $PROFILE_OPT describe-file-systems --region $regx --file-system-id $efsId --output json --query 'FileSystems[*].Encrypted' --output text)
if [[ $EFS_ENCRYPTION_CHECK == "True" ]]; then
textPass "$regx: EFS $efsId has has encryption at rest enabled" "$regx" "$efsId"
else
textFail "$regx: EFS: $efsId does not have encryption at rest enabled" "$regx" "$efsId"
fi
done
else
textInfo "$regx: No EFS found" "$regx"
fi
done
}

View File

@@ -0,0 +1,4 @@
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.efs.efs_service import EFS
efs_client = EFS(current_audit_info)

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "efs_encryption_at_rest_enabled",
"CheckTitle": "Check if EFS protects sensitive data with encryption at rest",
"CheckType": ["Protect", "Data protection", "Encryption of data at rest"],
"ServiceName": "efs",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"Severity": "medium",
"ResourceType": "AwsEFSFileSystem",
"Description": "Check if EFS protects sensitive data with encryption at rest",
"Risk": "EFS should be encrypted at rest to prevent exposure of sensitive data to bad actors",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "aws efs create-file-system --creation-token $(uuidgen) --performance-mode generalPurpose --encrypted --kms-key-id user/customer-managedCMKalias",
"NativeIaC": "https://docs.bridgecrew.io/docs/general_17#cloudformation",
"Other": "",
"Terraform": "https://docs.bridgecrew.io/docs/general_17#terraform"
},
"Recommendation": {
"Text": "Ensure that encryption at rest is enabled for EFS file systems. Encryption at rest can only be enabled during the file system creation.",
"Url": "https://docs.aws.amazon.com/efs/latest/ug/encryption-at-rest.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,23 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.efs.efs_client import efs_client
class efs_encryption_at_rest_enabled(Check):
def execute(self):
findings = []
for fs in efs_client.filesystems:
report = Check_Report(self.metadata)
report.region = fs.region
report.resource_id = fs.id
report.resource_arn = ""
report.status = "FAIL"
report.status_extended = (
f"EFS {fs.id} does not have encryption at rest enabled"
)
if fs.encrypted:
report.status = "PASS"
report.status_extended = f"EFS {fs.id} has encryption at rest enabled"
findings.append(report)
return findings

View File

@@ -0,0 +1,70 @@
from re import search
from unittest import mock
from providers.aws.services.efs.efs_service import FileSystem
# Mock Test Region
AWS_REGION = "eu-west-1"
AWS_ACCOUNT_NUMBER = "123456789012"
file_system_id = "fs-c7a0456e"
backup_valid_policy_status = "ENABLED"
class Test_efs_encryption_at_rest_enabled:
def test_efs_encryption_enabled(self):
efs_client = mock.MagicMock
efs_client.filesystems = [
FileSystem(
id=file_system_id,
region=AWS_REGION,
policy=None,
backup_policy=backup_valid_policy_status,
encrypted=True,
)
]
with mock.patch(
"providers.aws.services.efs.efs_service.EFS",
efs_client,
):
from providers.aws.services.efs.efs_encryption_at_rest_enabled.efs_encryption_at_rest_enabled import (
efs_encryption_at_rest_enabled,
)
check = efs_encryption_at_rest_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search("has encryption at rest enabled", result[0].status_extended)
assert result[0].resource_id == file_system_id
assert result[0].resource_arn == ""
def test_efs_encryption_disabled(self):
efs_client = mock.MagicMock
efs_client.filesystems = [
FileSystem(
id=file_system_id,
region=AWS_REGION,
policy=None,
backup_policy=backup_valid_policy_status,
encrypted=False,
)
]
with mock.patch(
"providers.aws.services.efs.efs_service.EFS",
efs_client,
):
from providers.aws.services.efs.efs_encryption_at_rest_enabled.efs_encryption_at_rest_enabled import (
efs_encryption_at_rest_enabled,
)
check = efs_encryption_at_rest_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"does not have encryption at rest enabled", result[0].status_extended
)
assert result[0].resource_id == file_system_id
assert result[0].resource_arn == ""

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "efs_have_backup_enabled",
"CheckTitle": "Check if EFS File systems have backup enabled",
"CheckType": ["Recover", "Resilience", "Backup"],
"ServiceName": "efs",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"Severity": "medium",
"ResourceType": "AwsEFSFileSystem",
"Description": "Check if EFS File systems have backup enabled",
"Risk": "If backup is not enabled, data is vulnerable. Human error or bad actors could erase or modify data.",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable automated backup for production data. Define a retention period and periodically test backup restoration. A Disaster Recovery process should be in place to govern Data Protection approach.",
"Url": "https://docs.aws.amazon.com/efs/latest/ug/whatisefs.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,21 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.efs.efs_client import efs_client
class efs_have_backup_enabled(Check):
def execute(self):
findings = []
for fs in efs_client.filesystems:
report = Check_Report(self.metadata)
report.region = fs.region
report.resource_id = fs.id
report.resource_arn = ""
report.status = "PASS"
report.status_extended = f"EFS {fs.id} has backup enabled"
if fs.backup_policy == "DISABLED" or fs.backup_policy == "DISABLING":
report.status = "FAIL"
report.status_extended = f"EFS {fs.id} does not have backup enabled"
findings.append(report)
return findings

View File

@@ -0,0 +1,97 @@
from re import search
from unittest import mock
from providers.aws.services.efs.efs_service import FileSystem
# Mock Test Region
AWS_REGION = "eu-west-1"
AWS_ACCOUNT_NUMBER = "123456789012"
file_system_id = "fs-c7a0456e"
backup_valid_policy_status = "ENABLED"
backup_valid_invalid_policy_status_1 = "DISABLING"
backup_valid_invalid_policy_status_2 = "DISABLED"
class Test_efs_have_backup_enabled:
def test_efs_valid_backup_policy(self):
efs_client = mock.MagicMock
efs_client.filesystems = [
FileSystem(
id=file_system_id,
region=AWS_REGION,
policy=None,
backup_policy=backup_valid_policy_status,
encrypted=True,
)
]
with mock.patch(
"providers.aws.services.efs.efs_service.EFS",
efs_client,
):
from providers.aws.services.efs.efs_have_backup_enabled.efs_have_backup_enabled import (
efs_have_backup_enabled,
)
check = efs_have_backup_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search("has backup enabled", result[0].status_extended)
assert result[0].resource_id == file_system_id
assert result[0].resource_arn == ""
def test_efs_invalid_policy_backup_1(self):
efs_client = mock.MagicMock
efs_client.filesystems = [
FileSystem(
id=file_system_id,
region=AWS_REGION,
policy=None,
backup_policy=backup_valid_invalid_policy_status_1,
encrypted=True,
)
]
with mock.patch(
"providers.aws.services.efs.efs_service.EFS",
efs_client,
):
from providers.aws.services.efs.efs_have_backup_enabled.efs_have_backup_enabled import (
efs_have_backup_enabled,
)
check = efs_have_backup_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search("does not have backup enabled", result[0].status_extended)
assert result[0].resource_id == file_system_id
assert result[0].resource_arn == ""
def test_efs_invalid_policy_backup_2(self):
efs_client = mock.MagicMock
efs_client.filesystems = [
FileSystem(
id=file_system_id,
region=AWS_REGION,
policy=None,
backup_policy=backup_valid_invalid_policy_status_2,
encrypted=True,
)
]
with mock.patch(
"providers.aws.services.efs.efs_service.EFS",
efs_client,
):
from providers.aws.services.efs.efs_have_backup_enabled.efs_have_backup_enabled import (
efs_have_backup_enabled,
)
check = efs_have_backup_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search("does not have backup enabled", result[0].status_extended)
assert result[0].resource_id == file_system_id
assert result[0].resource_arn == ""

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "efs_not_publicly_accessible",
"CheckTitle": "Check if EFS have policies which allow access to everyone",
"CheckType": ["Protect", "Data protection"],
"ServiceName": "efs",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"Severity": "critical",
"ResourceType": "AwsEFSFileSystem",
"Description": "Check if EFS have policies which allow access to everyone",
"Risk": "EFS accessible to everyone could expose sensitive data to bad actors",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Ensure efs has some policy but it does not have principle as *",
"Url": "https://docs.aws.amazon.com/efs/latest/ug/access-control-block-public-access.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,36 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.efs.efs_client import efs_client
class efs_not_publicly_accessible(Check):
def execute(self):
findings = []
for fs in efs_client.filesystems:
report = Check_Report(self.metadata)
report.region = fs.region
report.resource_id = fs.id
report.resource_arn = ""
report.status = "PASS"
report.status_extended = (
f"EFS {fs.id} has policy which does not allow access to everyone"
)
if not fs.policy:
report.status = "FAIL"
report.status_extended = f"EFS {fs.id} doesn't have any policy which means it grants full access to any client"
else:
for statement in fs.policy["Statement"]:
if statement["Effect"] == "Allow":
if (
statement["Principal"]["AWS"] == "*"
or statement["Principal"] == "*"
or (
"CanonicalUser" in statement["Principal"]
and statement["Principal"]["CanonicalUser"] == "*"
)
):
report.status = "FAIL"
report.status_extended = f"EFS {fs.id} has policy which allows access to everyone"
break
findings.append(report)
return findings

View File

@@ -0,0 +1,124 @@
from re import search
from unittest import mock
from providers.aws.services.efs.efs_service import FileSystem
# Mock Test Region
AWS_REGION = "eu-west-1"
AWS_ACCOUNT_NUMBER = "123456789012"
file_system_id = "fs-c7a0456e"
filesystem_policy = {
"Id": "1",
"Statement": [
{
"Effect": "Allow",
"Action": ["elasticfilesystem:ClientMount"],
"Principal": {"AWS": f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"},
}
],
}
filesystem_invalid_policy = {
"Id": "1",
"Statement": [
{
"Effect": "Allow",
"Action": ["elasticfilesystem:ClientMount"],
"Principal": {"AWS": "*"},
}
],
}
class Test_efs_not_publicly_accessible:
def test_efs_valid_policy(self):
efs_client = mock.MagicMock
efs_client.filesystems = [
FileSystem(
id=file_system_id,
region=AWS_REGION,
policy=filesystem_policy,
backup_policy=None,
encrypted=True,
)
]
with mock.patch(
"providers.aws.services.efs.efs_service.EFS",
efs_client,
):
from providers.aws.services.efs.efs_not_publicly_accessible.efs_not_publicly_accessible import (
efs_not_publicly_accessible,
)
check = efs_not_publicly_accessible()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"has policy which does not allow access to everyone",
result[0].status_extended,
)
assert result[0].resource_id == file_system_id
assert result[0].resource_arn == ""
def test_efs_invalid_policy(self):
efs_client = mock.MagicMock
efs_client.filesystems = [
FileSystem(
id=file_system_id,
region=AWS_REGION,
policy=filesystem_invalid_policy,
backup_policy=None,
encrypted=True,
)
]
with mock.patch(
"providers.aws.services.efs.efs_service.EFS",
efs_client,
):
from providers.aws.services.efs.efs_not_publicly_accessible.efs_not_publicly_accessible import (
efs_not_publicly_accessible,
)
check = efs_not_publicly_accessible()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"has policy which allows access to everyone", result[0].status_extended
)
assert result[0].resource_id == file_system_id
assert result[0].resource_arn == ""
def test_efs_no_policy(self):
efs_client = mock.MagicMock
efs_client.filesystems = [
FileSystem(
id=file_system_id,
region=AWS_REGION,
policy=None,
backup_policy=None,
encrypted=True,
)
]
with mock.patch(
"providers.aws.services.efs.efs_service.EFS",
efs_client,
):
from providers.aws.services.efs.efs_not_publicly_accessible.efs_not_publicly_accessible import (
efs_not_publicly_accessible,
)
check = efs_not_publicly_accessible()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"doesn't have any policy which means it grants full access to any client",
result[0].status_extended,
)
assert result[0].resource_id == file_system_id
assert result[0].resource_arn == ""

View File

@@ -0,0 +1,93 @@
import threading
from dataclasses import dataclass
from lib.logger import logger
from providers.aws.aws_provider import generate_regional_clients
################### EFS
class EFS:
def __init__(self, audit_info):
self.service = "efs"
self.session = audit_info.audit_session
self.regional_clients = generate_regional_clients(self.service, audit_info)
self.filesystems = []
self.__threading_call__(self.__describe_file_systems__)
self.__describe_file_system_policies__()
def __get_session__(self):
return self.session
def __threading_call__(self, call):
threads = []
for regional_client in self.regional_clients.values():
threads.append(threading.Thread(target=call, args=(regional_client,)))
for t in threads:
t.start()
for t in threads:
t.join()
def __describe_file_systems__(self, regional_client):
logger.info("EFS - Describing file systems...")
try:
describe_efs_paginator = regional_client.get_paginator(
"describe_file_systems"
)
for page in describe_efs_paginator.paginate():
for efs in page["FileSystems"]:
self.filesystems.append(
FileSystem(
id=efs["FileSystemId"],
region=regional_client.region,
policy=None,
backup_policy=None,
encrypted=efs["Encrypted"],
)
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __describe_file_system_policies__(self):
logger.info("EFS - Describing file system policies...")
try:
for filesystem in self.filesystems:
for region, client in self.regional_clients.items():
if filesystem.region == region:
filesystem.backup_policy = client.describe_backup_policy(
FileSystemId=filesystem.id
)["BackupPolicy"]["Status"]
fs_policy = client.describe_file_system_policy(
FileSystemId=filesystem.id
)
if "Policy" in fs_policy:
filesystem.policy = fs_policy["Policy"]
except Exception as error:
logger.error(
f"{client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
@dataclass
class FileSystem:
id: str
region: str
policy: dict
backup_policy: str
encrypted: bool
def __init__(
self,
id,
region,
policy,
backup_policy,
encrypted,
):
self.id = id
self.region = region
self.policy = policy
self.backup_policy = backup_policy
self.encrypted = encrypted

View File

@@ -0,0 +1,110 @@
from unittest.mock import patch
import botocore
from boto3 import client, session
from moto import mock_efs
from providers.aws.lib.audit_info.models import AWS_Audit_Info
from providers.aws.services.efs.efs_service import EFS
# Mock Test Region
AWS_REGION = "eu-west-1"
AWS_ACCOUNT_NUMBER = "123456789012"
# Mocking Access Analyzer Calls
make_api_call = botocore.client.BaseClient._make_api_call
file_system_id = "fs-c7a0456e"
creation_token = "console-d215fa78-1f83-4651-b026-facafd8a7da7"
backup_policy_status = "ENABLED"
filesystem_policy = {
"Id": "1",
"Statement": [
{
"Effect": "Allow",
"Action": ["elasticfilesystem:ClientMount"],
"Principal": {"AWS": f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"},
}
],
}
def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "DescribeFileSystemPolicy":
return {"FileSystemId": file_system_id, "Policy": filesystem_policy}
if operation_name == "DescribeBackupPolicy":
return {"BackupPolicy": {"Status": backup_policy_status}}
return make_api_call(self, operation_name, kwarg)
def mock_generate_regional_clients(service, audit_info):
regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION)
regional_client.region = AWS_REGION
return {AWS_REGION: regional_client}
# Patch every AWS call using Boto3 and generate_regional_clients to have 1 client
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
@patch(
"providers.aws.services.efs.efs_service.generate_regional_clients",
new=mock_generate_regional_clients,
)
class Test_EFS:
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
),
audited_account=AWS_ACCOUNT_NUMBER,
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=None,
credentials=None,
assumed_role_info=None,
audited_regions=None,
organizations_metadata=None,
)
return audit_info
# Test EFS Session
def test__get_session__(self):
access_analyzer = EFS(self.set_mocked_audit_info())
assert access_analyzer.session.__class__.__name__ == "Session"
# Test EFS Service
def test__get_service__(self):
access_analyzer = EFS(self.set_mocked_audit_info())
assert access_analyzer.service == "efs"
@mock_efs
# Test EFS describe file systems
def test__describe_file_systems__(self):
efs_client = client("efs", AWS_REGION)
efs = efs_client.create_file_system(
CreationToken=creation_token, Encrypted=True
)
filesystem = EFS(self.set_mocked_audit_info())
assert len(filesystem.filesystems) == 1
assert filesystem.filesystems[0].id == efs["FileSystemId"]
assert filesystem.filesystems[0].encrypted == efs["Encrypted"]
@mock_efs
# Test EFS describe file systems
def test__describe_file_system_policies__(self):
efs_client = client("efs", AWS_REGION)
efs = efs_client.create_file_system(
CreationToken=creation_token, Encrypted=True
)
filesystem = EFS(self.set_mocked_audit_info())
assert len(filesystem.filesystems) == 1
assert filesystem.filesystems[0].id == efs["FileSystemId"]
assert filesystem.filesystems[0].encrypted == efs["Encrypted"]
assert filesystem.filesystems[0].backup_policy == backup_policy_status
assert filesystem.filesystems[0].policy == filesystem_policy