fix(checks): solve different errors in EFS, S3 and VPC (#1841)

Co-authored-by: sergargar <sergargar@users.noreply.github.com>
This commit is contained in:
Sergio Garcia
2023-02-07 09:42:10 +01:00
committed by GitHub
parent 043986f35b
commit 13316b68aa
4 changed files with 21 additions and 13 deletions

View File

@@ -1,3 +1,4 @@
import json
import threading import threading
from dataclasses import dataclass from dataclasses import dataclass
@@ -74,7 +75,7 @@ class EFS:
FileSystemId=filesystem.id FileSystemId=filesystem.id
) )
if "Policy" in fs_policy: if "Policy" in fs_policy:
filesystem.policy = fs_policy["Policy"] filesystem.policy = json.loads(fs_policy["Policy"])
except ClientError as e: except ClientError as e:
if e.response["Error"]["Code"] == "PolicyNotFound": if e.response["Error"]["Code"] == "PolicyNotFound":
filesystem.policy = {} filesystem.policy = {}

View File

@@ -53,7 +53,8 @@ class s3_bucket_public_access(Check):
report.status_extended = f"S3 Bucket {bucket.name} has public access due to bucket policy." report.status_extended = f"S3 Bucket {bucket.name} has public access due to bucket policy."
else: else:
if ( if (
"AWS" in statement["Principal"] "Principal" in statement
and "AWS" in statement["Principal"]
and statement["Effect"] == "Allow" and statement["Effect"] == "Allow"
): ):
if type(statement["Principal"]["AWS"]) == str: if type(statement["Principal"]["AWS"]) == str:

View File

@@ -26,20 +26,25 @@ class vpc_endpoint_connections_trust_boundaries(Check):
else: else:
principals = statement["Principal"]["AWS"] principals = statement["Principal"]["AWS"]
for principal_arn in principals: for principal_arn in principals:
account_id = principal_arn.split(":")[4]
report = Check_Report_AWS(self.metadata()) report = Check_Report_AWS(self.metadata())
report.region = endpoint.region report.region = endpoint.region
if ( if principal_arn == "*":
account_id in trusted_account_ids report.status = "FAIL"
or account_id in vpc_client.audited_account report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} has full access."
):
report.status = "PASS"
report.status_extended = f"Found trusted account {account_id} in VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id}."
report.resource_id = endpoint.id report.resource_id = endpoint.id
else: else:
report.status = "FAIL" account_id = principal_arn.split(":")[4]
report.status_extended = f"Found untrusted account {account_id} in VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id}." if (
report.resource_id = endpoint.id account_id in trusted_account_ids
or account_id in vpc_client.audited_account
):
report.status = "PASS"
report.status_extended = f"Found trusted account {account_id} in VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id}."
report.resource_id = endpoint.id
else:
report.status = "FAIL"
report.status_extended = f"Found untrusted account {account_id} in VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id}."
report.resource_id = endpoint.id
findings.append(report) findings.append(report)
return findings return findings

View File

@@ -1,3 +1,4 @@
import json
from unittest.mock import patch from unittest.mock import patch
import botocore import botocore
@@ -34,7 +35,7 @@ filesystem_policy = {
def mock_make_api_call(self, operation_name, kwarg): def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "DescribeFileSystemPolicy": if operation_name == "DescribeFileSystemPolicy":
return {"FileSystemId": file_system_id, "Policy": filesystem_policy} return {"FileSystemId": file_system_id, "Policy": json.dumps(filesystem_policy)}
if operation_name == "DescribeBackupPolicy": if operation_name == "DescribeBackupPolicy":
return {"BackupPolicy": {"Status": backup_policy_status}} return {"BackupPolicy": {"Status": backup_policy_status}}
return make_api_call(self, operation_name, kwarg) return make_api_call(self, operation_name, kwarg)