feat(S3): add S3 service and checks (#1450)

Co-authored-by: sergargar <sergio@verica.io>
Co-authored-by: Pepe Fagoaga <pepe@verica.io>
This commit is contained in:
Sergio Garcia
2022-11-08 18:06:06 +01:00
committed by GitHub
parent bbecd505eb
commit 19ab29628f
47 changed files with 2095 additions and 757 deletions

View File

@@ -14,7 +14,7 @@ class cloudtrail_cloudwatch_logging_enabled(Check):
report = Check_Report(self.metadata)
report.region = trail.region
report.resource_id = trail.name
report.resource_arn = trail.trail_arn
report.resource_arn = trail.arn
report.status = "PASS"
if trail.is_multiregion:
report.status_extended = (

View File

@@ -10,7 +10,7 @@ class cloudtrail_kms_encryption_enabled(Check):
report = Check_Report(self.metadata)
report.region = trail.region
report.resource_id = trail.name
report.resource_arn = trail.trail_arn
report.resource_arn = trail.arn
report.status = "FAIL"
if trail.is_multiregion:
report.status_extended = (

View File

@@ -10,7 +10,7 @@ class cloudtrail_log_file_validation_enabled(Check):
report = Check_Report(self.metadata)
report.region = trail.region
report.resource_id = trail.name
report.resource_arn = trail.trail_arn
report.resource_arn = trail.arn
report.status = "FAIL"
if trail.is_multiregion:
report.status_extended = (

View File

@@ -12,7 +12,7 @@ class cloudtrail_logs_s3_bucket_access_logging_enabled(Check):
report = Check_Report(self.metadata)
report.region = trail.region
report.resource_id = trail.name
report.resource_arn = trail.trail_arn
report.resource_arn = trail.arn
report.status = "FAIL"
if trail.is_multiregion:
report.status_extended = f"Multiregion Trail {trail.name} S3 bucket access logging is not enabled for bucket {trail_bucket}"

View File

@@ -12,7 +12,7 @@ class cloudtrail_logs_s3_bucket_is_not_publicly_accessible(Check):
report = Check_Report(self.metadata)
report.region = trail.region
report.resource_id = trail.name
report.resource_arn = trail.trail_arn
report.resource_arn = trail.arn
report.status = "PASS"
if trail.is_multiregion:
report.status_extended = f"S3 Bucket {trail_bucket} from multiregion trail {trail.name} is not publicly accessible"

View File

@@ -25,13 +25,14 @@ class cloudtrail_multi_region_enabled(Check):
else:
report.status_extended = f"Trail {trail.name} is not multiregion and it is logging"
report.resource_id = trail.name
report.resource_arn = trail.trail_arn
report.resource_arn = trail.arn
trail_in_region = True # Trail enabled in region
else:
report.status = "FAIL"
report.status_extended = (
f"No CloudTrail trails enabled and logging were found"
)
report.region = cloudtrail_client.region
report.resource_arn = "No trails"
report.resource_id = "No trails"
actual_region = trail.region
@@ -42,6 +43,7 @@ class cloudtrail_multi_region_enabled(Check):
)
report.resource_arn = "No trails"
report.resource_id = "No trails"
report.region = cloudtrail_client.region
findings.append(report)
return findings

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "cloudtrail_s3_dataevents_enabled",
"CheckTitle": "Check if S3 buckets have Object-level logging enabled in CloudTrail.",
"CheckType": ["Logging and Monitoring"],
"ServiceName": "s3",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"Severity": "low",
"ResourceType": "AwsS3Bucket",
"Description": "Ensure that all your AWS CloudTrail trails are configured to log Data events in order to record S3 object-level API operations, such as GetObject, DeleteObject and PutObject, for individual S3 buckets or for all current and future S3 buckets provisioned in your AWS account.",
"Risk": "If logs are not enabled, monitoring of service use and threat analysis is not possible.",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "aws cloudtrail put-event-selectors --trail-name <YOUR_TRAIL_NAME_HERE> --event-selectors '[{ 'ReadWriteType': 'All', 'IncludeManagementEvents':true, 'DataResources': [{ 'Type': 'AWS::S3::Object', 'Values': ['arn:aws:s3'] }] }]'",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable logs. Create an S3 lifecycle policy. Define use cases, metrics and automated responses where applicable.",
"Url": "https://docs.aws.amazon.com/AmazonS3/latest/userguide/enable-cloudtrail-logging-for-s3.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,30 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.cloudtrail.cloudtrail_client import cloudtrail_client
class cloudtrail_s3_dataevents_enabled(Check):
def execute(self):
findings = []
report = Check_Report(self.metadata)
report.region = cloudtrail_client.region
report.resource_id = "No trails"
report.resource_arn = "No trails"
report.status = "FAIL"
report.status_extended = f"No CloudTrail trails have a data event to record all S3 object-level API operations."
for trail in cloudtrail_client.trails:
for data_event in trail.data_events:
# Check if trail has a data event for all S3 Buckets for write/read
if data_event["ReadWriteType"] == "All":
for resource in data_event["DataResources"]:
if "AWS::S3::Object" == resource["Type"] and (
"arn:aws:s3" in resource["Values"]
or "arn:aws:s3:::*/*" in resource["Values"]
):
report.region = trail.region
report.resource_id = trail.name
report.resource_arn = trail.arn
report.status = "PASS"
report.status_extended = f"Trail {trail.name} have a data event to record all S3 object-level API operations."
findings.append(report)
return findings

View File

@@ -0,0 +1,143 @@
from re import search
from unittest import mock
from boto3 import client
from moto import mock_cloudtrail, mock_s3
class Test_cloudtrail_s3_dataevents_enabled:
@mock_cloudtrail
@mock_s3
def test_trail_without_data_events(self):
cloudtrail_client_us_east_1 = client("cloudtrail", region_name="us-east-1")
s3_client_us_east_1 = client("s3", region_name="us-east-1")
trail_name_us = "trail_test_us"
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
cloudtrail_client_us_east_1.create_trail(
Name=trail_name_us, S3BucketName=bucket_name_us, IsMultiRegionTrail=False
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.cloudtrail.cloudtrail_service import Cloudtrail
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_enabled.cloudtrail_s3_dataevents_enabled.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_enabled.cloudtrail_s3_dataevents_enabled import (
cloudtrail_s3_dataevents_enabled,
)
check = cloudtrail_s3_dataevents_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"No CloudTrail trails have a data event to record all S3 object-level API operations.",
result[0].status_extended,
)
assert result[0].resource_id == "No trails"
assert result[0].resource_arn == "No trails"
@mock_cloudtrail
@mock_s3
def test_trail_without_s3_data_events(self):
cloudtrail_client_us_east_1 = client("cloudtrail", region_name="us-east-1")
s3_client_us_east_1 = client("s3", region_name="us-east-1")
trail_name_us = "trail_test_us"
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
cloudtrail_client_us_east_1.create_trail(
Name=trail_name_us, S3BucketName=bucket_name_us, IsMultiRegionTrail=False
)
data_events_response = cloudtrail_client_us_east_1.put_event_selectors(
TrailName=trail_name_us,
EventSelectors=[
{
"ReadWriteType": "All",
"IncludeManagementEvents": True,
"DataResources": [
{"Type": "AWS::Lambda::Function", "Values": ["arn:aws:lambda"]}
],
}
],
)["EventSelectors"]
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.cloudtrail.cloudtrail_service import Cloudtrail
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_enabled.cloudtrail_s3_dataevents_enabled.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_enabled.cloudtrail_s3_dataevents_enabled import (
cloudtrail_s3_dataevents_enabled,
)
check = cloudtrail_s3_dataevents_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"No CloudTrail trails have a data event to record all S3 object-level API operations.",
result[0].status_extended,
)
assert result[0].resource_id == "No trails"
assert result[0].resource_arn == "No trails"
@mock_cloudtrail
@mock_s3
def test_trail_with_s3_data_events(self):
cloudtrail_client_us_east_1 = client("cloudtrail", region_name="us-east-1")
s3_client_us_east_1 = client("s3", region_name="us-east-1")
trail_name_us = "trail_test_us"
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
trail_us = cloudtrail_client_us_east_1.create_trail(
Name=trail_name_us, S3BucketName=bucket_name_us, IsMultiRegionTrail=False
)
data_events_response = cloudtrail_client_us_east_1.put_event_selectors(
TrailName=trail_name_us,
EventSelectors=[
{
"ReadWriteType": "All",
"IncludeManagementEvents": True,
"DataResources": [
{"Type": "AWS::S3::Object", "Values": ["arn:aws:s3:::*/*"]}
],
}
],
)["EventSelectors"]
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.cloudtrail.cloudtrail_service import Cloudtrail
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_enabled.cloudtrail_s3_dataevents_enabled.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_enabled.cloudtrail_s3_dataevents_enabled import (
cloudtrail_s3_dataevents_enabled,
)
check = cloudtrail_s3_dataevents_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"have a data event to record all S3 object-level API operations.",
result[0].status_extended,
)
assert result[0].resource_id == trail_name_us
assert result[0].resource_arn == trail_us["TrailARN"]

View File

@@ -3,7 +3,10 @@ import threading
from dataclasses import dataclass
from lib.logger import logger
from providers.aws.aws_provider import generate_regional_clients
from providers.aws.aws_provider import (
generate_regional_clients,
get_region_global_service,
)
################### CLOUDTRAIL
@@ -12,10 +15,12 @@ class Cloudtrail:
self.service = "cloudtrail"
self.session = audit_info.audit_session
self.audited_account = audit_info.audited_account
self.region = get_region_global_service(audit_info)
self.regional_clients = generate_regional_clients(self.service, audit_info)
self.trails = []
self.__threading_call__(self.__get_trails__)
self.__get_trail_status__()
self.__get_event_selectors__()
def __get_session__(self):
return self.session
@@ -44,7 +49,7 @@ class Cloudtrail:
name=trail["Name"],
is_multiregion=trail["IsMultiRegionTrail"],
home_region=trail["HomeRegion"],
trail_arn=trail["TrailARN"],
arn=trail["TrailARN"],
region=regional_client.region,
is_logging=False,
log_file_validation_enabled=trail[
@@ -53,6 +58,7 @@ class Cloudtrail:
latest_cloudwatch_delivery_time=None,
s3_bucket=trail["S3BucketName"],
kms_key=kms_key_id,
data_events=[],
)
)
else:
@@ -61,19 +67,20 @@ class Cloudtrail:
name=None,
is_multiregion=None,
home_region=None,
trail_arn=None,
arn=None,
region=regional_client.region,
is_logging=None,
log_file_validation_enabled=None,
latest_cloudwatch_delivery_time=None,
s3_bucket=None,
kms_key=None,
data_events=[],
)
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}: {error}"
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __get_trail_status__(self):
@@ -82,7 +89,7 @@ class Cloudtrail:
for trail in self.trails:
for region, client in self.regional_clients.items():
if trail.region == region and trail.name:
status = client.get_trail_status(Name=trail.trail_arn)
status = client.get_trail_status(Name=trail.arn)
trail.is_logging = status["IsLogging"]
if "LatestCloudWatchLogsDeliveryTime" in status:
trail.latest_cloudwatch_delivery_time = status[
@@ -90,7 +97,24 @@ class Cloudtrail:
]
except Exception as error:
logger.error(f"{client.region} -- {error.__class__.__name__}: {error}")
logger.error(
f"{client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __get_event_selectors__(self):
logger.info("Cloudtrail - Getting event selector")
try:
for trail in self.trails:
for region, client in self.regional_clients.items():
if trail.region == region and trail.name:
data_events = client.get_event_selectors(TrailName=trail.arn)
if "EventSelectors" in data_events:
for event in data_events["EventSelectors"]:
trail.data_events.append(event)
except Exception as error:
logger.error(
f"{client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
@dataclass
@@ -98,34 +122,37 @@ class Trail:
name: str
is_multiregion: bool
home_region: str
trail_arn: str
arn: str
region: str
is_logging: bool
log_file_validation_enabled: bool
latest_cloudwatch_delivery_time: datetime
s3_bucket: str
kms_key: str
data_events: list
def __init__(
self,
name,
is_multiregion,
home_region,
trail_arn,
arn,
region,
is_logging,
log_file_validation_enabled,
latest_cloudwatch_delivery_time,
s3_bucket,
kms_key,
data_events,
):
self.name = name
self.is_multiregion = is_multiregion
self.home_region = home_region
self.trail_arn = trail_arn
self.arn = arn
self.region = region
self.is_logging = is_logging
self.log_file_validation_enabled = log_file_validation_enabled
self.latest_cloudwatch_delivery_time = latest_cloudwatch_delivery_time
self.s3_bucket = s3_bucket
self.kms_key = kms_key
self.data_events = data_events

View File

@@ -58,8 +58,6 @@ class Test_Cloudtrail_Service:
cloudtrail = Cloudtrail(audit_info)
assert cloudtrail.audited_account == AWS_ACCOUNT_NUMBER
# WAITING FOR MOTO PR TO BE APPROVED (https://github.com/spulec/moto/pull/5607)
@mock_cloudtrail
@mock_s3
def test_describe_trails(self):
@@ -85,7 +83,7 @@ class Test_Cloudtrail_Service:
)
audit_info = self.set_mocked_audit_info()
cloudtrail = Cloudtrail(audit_info)
# Here we are expecting 2, but moto does something weird and return 46 records
# 1 None result per region plus 2 created
assert len(cloudtrail.trails) == 23
for trail in cloudtrail.trails:
if trail.name:
@@ -131,7 +129,7 @@ class Test_Cloudtrail_Service:
)
audit_info = self.set_mocked_audit_info()
cloudtrail = Cloudtrail(audit_info)
# Here we are expecting 2, but moto does something weird and return 46 records
# 1 None result per region plus 2 created
assert len(cloudtrail.trails) == 23
for trail in cloudtrail.trails:
if trail.name:
@@ -143,3 +141,46 @@ class Test_Cloudtrail_Service:
assert trail.log_file_validation_enabled
assert not trail.latest_cloudwatch_delivery_time
assert trail.s3_bucket == bucket_name_us
@mock_cloudtrail
@mock_s3
def test_get_event_selectors(self):
cloudtrail_client_us_east_1 = client("cloudtrail", region_name="us-east-1")
s3_client_us_east_1 = client("s3", region_name="us-east-1")
trail_name_us = "trail_test_us"
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
cloudtrail_client_us_east_1.create_trail(
Name=trail_name_us,
S3BucketName=bucket_name_us,
IsMultiRegionTrail=False,
EnableLogFileValidation=True,
)
cloudtrail_client_us_east_1.start_logging(Name=trail_name_us)
data_events_response = cloudtrail_client_us_east_1.put_event_selectors(
TrailName=trail_name_us,
EventSelectors=[
{
"ReadWriteType": "All",
"IncludeManagementEvents": True,
"DataResources": [
{"Type": "AWS::S3::Object", "Values": ["arn:aws:s3:::*/*"]}
],
}
],
)["EventSelectors"]
audit_info = self.set_mocked_audit_info()
cloudtrail = Cloudtrail(audit_info)
# 1 None result per region plus 2 created
assert len(cloudtrail.trails) == 23
for trail in cloudtrail.trails:
if trail.name:
if trail.name == trail_name_us:
assert not trail.is_multiregion
assert trail.home_region == "us-east-1"
assert trail.region == "us-east-1"
assert trail.is_logging
assert trail.log_file_validation_enabled
assert not trail.latest_cloudwatch_delivery_time
assert trail.s3_bucket == bucket_name_us
assert trail.data_events == data_events_response

View File

@@ -1,66 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2018) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_extra7172="7.172"
CHECK_TITLE_extra7172="[extra7172] Check if S3 buckets have ACLs enabled"
CHECK_SCORED_extra7172="NOT_SCORED"
CHECK_CIS_LEVEL_extra7172="EXTRA"
CHECK_SEVERITY_extra7172="Medium"
CHECK_ASFF_RESOURCE_TYPE_extra7172="AwsS3Bucket"
CHECK_ALTERNATE_check7172="extra7172"
CHECK_SERVICENAME_extra7172="s3"
CHECK_RISK_extra7172='S3 ACLs are a legacy access control mechanism that predates IAM. IAM and bucket policies are currently the preferred methods.'
CHECK_REMEDIATION_extra7172='Ensure that S3 ACLs are disabled (BucketOwnerEnforced). Use IAM policies and bucket policies to manage access.'
CHECK_DOC_extra7172='https://docs.aws.amazon.com/AmazonS3/latest/userguide/about-object-ownership.html'
CHECK_CAF_EPIC_extra7172='Logging and Monitoring'
extra7172(){
# "Check if S3 buckets have server access logging enabled"
LIST_OF_BUCKETS=$("${AWSCLI}" s3api list-buckets ${PROFILE_OPT} --query Buckets[*].Name --region "${REGION}" --output text 2>&1)
if grep -q -E 'AccessDenied|UnauthorizedOperation|AuthorizationError' <<< "$LIST_OF_BUCKETS"; then
textInfo "${REGION}: Access Denied Trying to list buckets" "${REGION}"
exit
fi
if [[ $LIST_OF_BUCKETS ]]; then
for bucket in $LIST_OF_BUCKETS;do
# Recover Bucket region
BUCKET_REGION=$("${AWSCLI}" ${PROFILE_OPT} s3api get-bucket-location --bucket "${bucket}" --region "${REGION}" --query LocationConstraint --output text)
if grep -q -E 'AccessDenied|UnauthorizedOperation|AuthorizationError' <<< "${BUCKET_REGION}"; then
textInfo "${REGION}: Access Denied trying to get bucket location for ${bucket}" "${REGION}"
fi
# If None use default region
if [[ "${BUCKET_REGION}" == "None" ]]; then
BUCKET_REGION="${REGION}"
fi
BUCKET_ACLS_DISABLED=$(${AWSCLI} ${PROFILE_OPT} s3api get-bucket-ownership-controls --bucket "${bucket}" --region "${BUCKET_REGION}" --output text 2>&1)
if grep -q -E 'AccessDenied|UnauthorizedOperation|AuthorizationError' <<< "${BUCKET_ACLS_DISABLED}" ; then
textInfo "${BUCKET_REGION}: Access Denied Trying to Get Bucket Ownership Controls for ${bucket}" "${BUCKET_REGION}" "${bucket}"
continue
elif grep -q -E 'IllegalLocationConstraintException' <<< "${BUCKET_ACLS_DISABLED}"; then
textInfo "${BUCKET_REGION}: Location Constraint Trying to Get Bucket Ownership Controls for ${bucket}" "${BUCKET_REGION}" "${bucket}"
continue
fi
if grep -q "BucketOwnerEnforced" <<< "${BUCKET_ACLS_DISABLED}"; then
textPass "${BUCKET_REGION}: Bucket ${bucket} has bucket ACLs disabled!" "${BUCKET_REGION}" "${bucket}"
elif grep -q "BucketOwnerPreferred" <<< "${BUCKET_ACLS_DISABLED}"; then
textFail "${BUCKET_REGION}: Bucket ${bucket} has bucket ACLs enabled!" "${BUCKET_REGION}" "${bucket}"
elif grep -q "OwnershipControlsNotFoundError" <<< "${BUCKET_ACLS_DISABLED}"; then
textFail "${BUCKET_REGION}: Bucket ${bucket} has bucket ACLs enabled!" "${BUCKET_REGION}" "${bucket}"
else
textFail "${BUCKET_REGION}: Bucket ${bucket} returned an unknown error" "${BUCKET_REGION}" "${bucket}"
fi
done
else
textInfo "${REGION}: No S3 Buckets found" "${REGION}"
fi
}

View File

@@ -1,56 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2018) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_extra718="7.18"
CHECK_TITLE_extra718="[extra718] Check if S3 buckets have server access logging enabled"
CHECK_SCORED_extra718="NOT_SCORED"
CHECK_CIS_LEVEL_extra718="EXTRA"
CHECK_SEVERITY_extra718="Medium"
CHECK_ASFF_RESOURCE_TYPE_extra718="AwsS3Bucket"
CHECK_ALTERNATE_check718="extra718"
CHECK_SERVICENAME_extra718="s3"
CHECK_RISK_extra718='Server access logs can assist you in security and access audits; help you learn about your customer base; and understand your Amazon S3 bill.'
CHECK_REMEDIATION_extra718='Ensure that S3 buckets have Logging enabled. CloudTrail data events can be used in place of S3 bucket logging. If that is the case; this finding can be considered a false positive.'
CHECK_DOC_extra718='https://docs.aws.amazon.com/AmazonS3/latest/dev/security-best-practices.html'
CHECK_CAF_EPIC_extra718='Logging and Monitoring'
extra718(){
# "Check if S3 buckets have server access logging enabled "
LIST_OF_BUCKETS=$("${AWSCLI}" s3api list-buckets ${PROFILE_OPT} --query Buckets[*].Name --output text|xargs -n1)
if [[ $LIST_OF_BUCKETS ]]; then
for bucket in $LIST_OF_BUCKETS;do
# Recover Bucket region
BUCKET_REGION=$("${AWSCLI}" ${PROFILE_OPT} s3api get-bucket-location --bucket "${bucket}" --region "${REGION}" --query LocationConstraint --output text)
if grep -q -E 'AccessDenied|UnauthorizedOperation|AuthorizationError' <<< "${BUCKET_REGION}"; then
textInfo "${REGION}: Access Denied trying to get bucket location for ${bucket}" "${REGION}"
continue
fi
# If None use default region
if [[ "${BUCKET_REGION}" == "None" ]]; then
BUCKET_REGION="${REGION}"
fi
BUCKET_SERVER_LOG_ENABLED=$("${AWSCLI}" s3api get-bucket-logging --bucket "${bucket}" ${PROFILE_OPT} --region "${BUCKET_REGION}" --query [LoggingEnabled] --output text 2>&1)
if grep -q AccessDenied <<< "${BUCKET_SERVER_LOG_ENABLED}"; then
textInfo "${BUCKET_REGION}: Access Denied Trying to Get Bucket Logging for ${bucket}" "${BUCKET_REGION}" "${bucket}"
continue
fi
if grep -q "^None$" <<< "${BUCKET_SERVER_LOG_ENABLED}"; then
textFail "${BUCKET_REGION}: Bucket ${bucket} has server access logging disabled!" "${BUCKET_REGION}" "${bucket}"
else
textPass "${BUCKET_REGION}: Bucket ${bucket} has server access logging enabled" "${BUCKET_REGION}" "${bucket}"
fi
done
else
textInfo "${REGION}: No S3 Buckets found" "${REGION}" "${bucket}"
fi
}

View File

@@ -1,67 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2018) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_extra725="7.25"
CHECK_TITLE_extra725="[extra725] Check if S3 buckets have Object-level logging enabled in CloudTrail"
CHECK_SCORED_extra725="NOT_SCORED"
CHECK_CIS_LEVEL_extra725="EXTRA"
CHECK_SEVERITY_extra725="Medium"
CHECK_ASFF_RESOURCE_TYPE_extra725="AwsS3Bucket"
CHECK_ALTERNATE_check725="extra725"
CHECK_SERVICENAME_extra725="s3"
CHECK_RISK_extra725='If logs are not enabled; monitoring of service use and threat analysis is not possible.'
CHECK_REMEDIATION_extra725='Enable logs. Create an S3 lifecycle policy. Define use cases; metrics and automated responses where applicable.'
CHECK_DOC_extra725='https://docs.aws.amazon.com/AmazonS3/latest/userguide/enable-cloudtrail-logging-for-s3.html'
CHECK_CAF_EPIC_extra725='Logging and Monitoring'
# per Object-level logging is not configured at Bucket level but at CloudTrail trail level
extra725(){
# "Check if S3 buckets have Object-level logging enabled in CloudTrail "
LIST_OF_BUCKETS=$($AWSCLI s3api list-buckets $PROFILE_OPT --region $REGION --query 'Buckets[*].{Name:Name}' --output text 2>&1)
if [[ $(echo "$LIST_OF_BUCKETS" | grep AccessDenied) ]]; then
textInfo "$REGION: Access Denied trying to list buckets"
return
fi
LIST_OF_TRAILS=$($AWSCLI cloudtrail describe-trails $PROFILE_OPT --region $REGION --query 'trailList[].TrailARN' --output text 2>&1)
if [[ $(echo "$LIST_OF_TRAILS" | grep AccessDenied) ]]; then
textInfo "$REGION: Access Denied trying to describe trails"
return
fi
if [[ $LIST_OF_BUCKETS ]]; then
for bucketName in $LIST_OF_BUCKETS; do
if [[ $LIST_OF_TRAILS ]]; then
BUCKET_ENABLED_TRAILS=()
for trail in $LIST_OF_TRAILS; do
BUCKET_ENABLED_IN_TRAIL=$($AWSCLI cloudtrail get-event-selectors --region $REGION $PROFILE_OPT --trail-name $trail --query "EventSelectors[*].DataResources[?Type == \`AWS::S3::Object\`].Values" --output text |xargs -n1| grep -E "^arn:${AWS_PARTITION}:s3:::$bucketName/\S*$|^arn:${AWS_PARTITION}:s3$|^arn:${AWS_PARTITION}:s3:::$")
if [[ $BUCKET_ENABLED_IN_TRAIL ]]; then
BUCKET_ENABLED_TRAILS+=($trail)
fi
done
if [[ ${#BUCKET_ENABLED_TRAILS[@]} -gt 0 ]]; then
for trail in "${BUCKET_ENABLED_TRAILS[@]}"; do
textPass "$REGION: S3 bucket $bucketName has Object-level logging enabled in trail $trail" "$REGION" "$bucketName"
done
else
textFail "$REGION: S3 bucket $bucketName has Object-level logging disabled" "$REGION" "$bucketName"
fi
else
textFail "$REGION: S3 bucket $bucketName is not being recorded no CloudTrail found!" "$REGION" "$bucketName"
fi
done
else
textInfo "$REGION: No S3 buckets found" "$REGION"
fi
}

View File

@@ -1,158 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2018) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_extra73="7.3"
CHECK_TITLE_extra73="[extra73] Ensure there are no S3 buckets open to Everyone or Any AWS user"
CHECK_SCORED_extra73="NOT_SCORED"
CHECK_CIS_LEVEL_extra73="EXTRA"
CHECK_SEVERITY_extra73="Critical"
CHECK_ASFF_RESOURCE_TYPE_extra73="AwsS3Bucket"
CHECK_ALTERNATE_extra703="extra73"
CHECK_ALTERNATE_check73="extra73"
CHECK_ALTERNATE_check703="extra73"
CHECK_SERVICENAME_extra73="s3"
CHECK_RISK_extra73='Even if you enable all possible bucket ACL options available in the Amazon S3 console the ACL alone does not allow everyone to download objects from your bucket. Depending on which option you select any user could perform some actions.'
CHECK_REMEDIATION_extra73='You can enable block public access settings only for access points; buckets; and AWS accounts. Amazon S3 does not support block public access settings on a per-object basis. When you apply block public access settings to an account; the settings apply to all AWS Regions globally. The settings might not take effect in all Regions immediately or simultaneously; but they eventually propagate to all Regions.'
CHECK_DOC_extra73='https://docs.aws.amazon.com/AmazonS3/latest/userguide/access-control-block-public-access.html'
CHECK_CAF_EPIC_extra73='Data Protection'
# Verified with AWS support that if get-bucket-acl doesn't return a grant
# for All and get-bucket-policy-status returns IsPublic false or bad request
# (no policy) then the bucket can be considered not public - though
# individual objects may still be. If in addition put-public-access-block is
# used to set IgnorePublicAcls and RestrictPublicBuckets to true then that
# causes Amazon S3 to ignore all public ACLs on a bucket and any objects that
# it contains.
#
# This check does not address legacy ACLs or policies that would give
# public access if not blocked at account or bucket level, instead it tries
# to reward the use of more broadly restrictive controls with quicker and less
# computational intensive checks.
#
# If we are assembling an inventory then maybe that is not what we want but
# for day to day usage that is probably desirable.
extra73(){
#
# If public ACLs disabled at account level then look no further
#
ACCOUNT_PUBLIC_ACCESS_BLOCK=$($AWSCLI s3control get-public-access-block $PROFILE_OPT --region $REGION --account-id $ACCOUNT_NUM --output json 2>&1)
if [[ $(echo "$ACCOUNT_PUBLIC_ACCESS_BLOCK" | grep AccessDenied) ]]; then
textInfo "$REGION: Access Denied getting PublicAccessBlock configuration for AWS account" "$REGION" "$bucket"
return
fi
if [[ $(echo "$ACCOUNT_PUBLIC_ACCESS_BLOCK" | grep NoSuchPublicAccessBlockConfiguration) ]]; then
ACCOUNTIGNOREPUBLICACLS=""
ACCOUNTRESTRICTPUBLICBUCKETS=""
else
ACCOUNTIGNOREPUBLICACLS=$(echo "$ACCOUNT_PUBLIC_ACCESS_BLOCK" | jq -r '.PublicAccessBlockConfiguration.IgnorePublicAcls')
ACCOUNTRESTRICTPUBLICBUCKETS=$(echo "$ACCOUNT_PUBLIC_ACCESS_BLOCK" | jq -r '.PublicAccessBlockConfiguration.RestrictPublicBuckets')
fi
if [[ $ACCOUNTIGNOREPUBLICACLS == "true" && $ACCOUNTRESTRICTPUBLICBUCKETS == "true" ]]; then
textPass "$REGION: All S3 public access blocked at account level" "$REGION" "$bucket"
return
fi
#
# Otherwise start to iterate bucket
#
ALL_BUCKETS_LIST=$($AWSCLI s3api list-buckets --query 'Buckets[*].{Name:Name}' $PROFILE_OPT --output text 2>&1)
if [[ $(echo "$ALL_BUCKETS_LIST" | grep AccessDenied) ]]; then
textInfo "$REGION: Access Denied Trying to List Buckets" "$REGION" "$bucket"
return
fi
if [[ "$ALL_BUCKETS_LIST" == "" ]]; then
textInfo "$REGION: No buckets found" "$REGION" "$bucket"
return
fi
for bucket in $ALL_BUCKETS_LIST; do
#
# LOCATION - requests referencing buckets created after March 20, 2019
# must be made to S3 endpoints in the same region as the bucket was
# created.
#
BUCKET_LOCATION=$($AWSCLI s3api get-bucket-location $PROFILE_OPT --region $REGION --bucket $bucket --output text 2>&1)
if [[ $(echo "$BUCKET_LOCATION" | grep AccessDenied) ]]; then
textInfo "$REGION: Access Denied Trying to Get Bucket Location for $bucket" "$REGION" "$bucket"
continue
fi
if [[ $BUCKET_LOCATION == "None" ]]; then
BUCKET_LOCATION="us-east-1"
fi
if [[ $BUCKET_LOCATION == "EU" ]]; then
BUCKET_LOCATION="eu-west-1"
fi
#
# If public ACLs disabled at bucket level then look no further
#
BUCKET_PUBLIC_ACCESS_BLOCK=$($AWSCLI s3api get-public-access-block $PROFILE_OPT --region $BUCKET_LOCATION --bucket $bucket --output json 2>&1)
if [[ $(echo "$BUCKET_PUBLIC_ACCESS_BLOCK" | grep AccessDenied) ]]; then
textInfo "$BUCKET_LOCATION: Access Denied Trying to Get Public Access Block for $bucket" "$BUCKET_LOCATION" "$bucket"
continue
fi
if [[ $(echo "$BUCKET_PUBLIC_ACCESS_BLOCK" | grep NoSuchPublicAccessBlockConfiguration) ]]; then
BUCKETIGNOREPUBLICACLS=""
BUCKETRESTRICTPUBLICBUCKETS=""
else
BUCKETIGNOREPUBLICACLS=$(echo "$BUCKET_PUBLIC_ACCESS_BLOCK" | jq -r '.PublicAccessBlockConfiguration.IgnorePublicAcls')
BUCKETRESTRICTPUBLICBUCKETS=$(echo "$BUCKET_PUBLIC_ACCESS_BLOCK" | jq -r '.PublicAccessBlockConfiguration.RestrictPublicBuckets')
fi
if [[ $BUCKETIGNOREPUBLICACLS == "true" && $BUCKETRESTRICTPUBLICBUCKETS == "true" ]]; then
textPass "$BUCKET_LOCATION: $bucket bucket is not Public" "$BUCKET_LOCATION" "$bucket"
continue
fi
#
# Check for public ACL grants
#
BUCKET_ACL=$($AWSCLI s3api get-bucket-acl $PROFILE_OPT --region $BUCKET_LOCATION --bucket $bucket --output json 2>&1)
if [[ $(echo "$BUCKET_ACL" | grep AccessDenied) ]]; then
textInfo "$BUCKET_LOCATION: Access Denied Trying to Get Bucket Acl for $bucket" "$BUCKET_LOCATION" "$bucket"
continue
fi
ALLUSERS_ACL=$(echo "$BUCKET_ACL" | jq '.Grants[]|select(.Grantee.URI != null)|select(.Grantee.URI | endswith("/AllUsers"))')
if [[ $ALLUSERS_ACL != "" ]]; then
textFail "$BUCKET_LOCATION: $bucket bucket is Public!" "$BUCKET_LOCATION" "$bucket"
continue
fi
AUTHENTICATEDUSERS_ACL=$(echo "$BUCKET_ACL" | jq '.Grants[]|select(.Grantee.URI != null)|select(.Grantee.URI | endswith("/AuthenticatedUsers"))')
if [[ $AUTHENTICATEDUSERS_ACL != "" ]]; then
textFail "$BUCKET_LOCATION: $bucket bucket is Public!" "$BUCKET_LOCATION" "$bucket"
continue
fi
#
# Check for public access in policy
#
BUCKET_POLICY_STATUS=$($AWSCLI s3api get-bucket-policy-status $PROFILE_OPT --region $BUCKET_LOCATION --bucket $bucket --query PolicyStatus.IsPublic --output text 2>&1)
if [[ $(echo "$BUCKET_POLICY_STATUS" | grep AccessDenied) ]]; then
textInfo "$BUCKET_LOCATION: Access Denied Trying to Get Bucket Policy Status for $bucket" "$BUCKET_LOCATION" "$bucket"
continue
fi
if [[ $(echo "$BUCKET_POLICY_STATUS" | grep NoSuchBucketPolicy) ]]; then
BUCKET_POLICY_STATUS="False"
fi
if [[ $BUCKET_POLICY_STATUS != "" && $BUCKET_POLICY_STATUS != "False" ]]; then
textFail "$BUCKET_LOCATION: $bucket bucket is Public!" "$BUCKET_LOCATION" "$bucket"
continue
fi
textPass "$BUCKET_LOCATION: $bucket bucket is not Public" "$BUCKET_LOCATION" "$bucket"
done
}

View File

@@ -1,95 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2018) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_extra734="7.34"
CHECK_TITLE_extra734="[extra734] Check if S3 buckets have default encryption (SSE) enabled or use a bucket policy to enforce it"
CHECK_SCORED_extra734="NOT_SCORED"
CHECK_CIS_LEVEL_extra734="EXTRA"
CHECK_SEVERITY_extra734="Medium"
CHECK_ASFF_RESOURCE_TYPE_extra734="AwsS3Bucket"
CHECK_ALTERNATE_check734="extra734"
CHECK_ASFF_COMPLIANCE_TYPE_extra734="ens-mp.info.3.s3.1"
CHECK_SERVICENAME_extra734="s3"
CHECK_RISK_extra734='Amazon S3 default encryption provides a way to set the default encryption behavior for an S3 bucket. This will ensure data-at-rest is encrypted.'
CHECK_REMEDIATION_extra734='Ensure that S3 buckets has encryption at rest enabled.'
CHECK_DOC_extra734='https://docs.aws.amazon.com/AmazonS3/latest/dev/bucket-encryption.html'
CHECK_CAF_EPIC_extra734='Data Protection'
extra734(){
LIST_OF_BUCKETS=$("${AWSCLI}" s3api list-buckets ${PROFILE_OPT} --region "${REGION}" --query Buckets[*].Name --output text|xargs -n1)
if [[ $LIST_OF_BUCKETS ]]; then
for bucket in $LIST_OF_BUCKETS;do
BUCKET_LOCATION=$($AWSCLI s3api get-bucket-location ${PROFILE_OPT} --region "${REGION}" --bucket "${bucket}" --output text 2>&1)
if grep -q 'AccessDenied' <<< "${BUCKET_LOCATION}"; then
textInfo "${REGION}: Access Denied Trying to Get Bucket Location for ${bucket}" "${REGION}" "${bucket}"
continue
fi
if [[ $BUCKET_LOCATION == "None" ]]; then
BUCKET_LOCATION="us-east-1"
fi
if [[ $BUCKET_LOCATION == "EU" ]]; then
BUCKET_LOCATION="eu-west-1"
fi
# For this test to pass one of the following must be present:
# - Configure ServerSideEncryptionConfiguration rule for AES256 or aws:kms
# OR
# - Have bucket policy denying s3:PutObject when s3:x-amz-server-side-encryption is absent
# query to get if has encryption enabled or not
RESULT=$("${AWSCLI}" s3api get-bucket-encryption ${PROFILE_OPT} --region ${BUCKET_LOCATION} --bucket "${bucket}" --query ServerSideEncryptionConfiguration.Rules[].ApplyServerSideEncryptionByDefault[].SSEAlgorithm --output text 2>&1)
if grep -q 'AccessDenied' <<< "${RESULT}"; then
textInfo "${BUCKET_LOCATION}: Access Denied Trying to Get Encryption for ${bucket}" "${BUCKET_LOCATION}" "${bucket}"
continue
elif grep -q 'ServerSideEncryptionConfigurationNotFoundError' <<< "${RESULT}"
then
textFail "${BUCKET_LOCATION}: Server Side Encryption configuration is not configured for ${bucket}" "${BUCKET_LOCATION}" "${bucket}"
continue
fi
if [[ "${RESULT}" == "AES256" || "${RESULT}" == "aws:kms" ]];
then
textPass "${BUCKET_LOCATION}: Bucket $bucket is enabled for default encryption with ${RESULT}" "${BUCKET_LOCATION}" "${bucket}"
continue
fi
TEMP_SSE_POLICY_FILE=$(mktemp -t prowler-"${ACCOUNT_NUM}"-"${bucket}".policy.XXXXXXXXXX)
# get bucket policy
"${AWSCLI}" s3api get-bucket-policy ${PROFILE_OPT} --bucket "${bucket}" --region "${BUCKET_LOCATION}" --output text --query Policy > "${TEMP_SSE_POLICY_FILE}" 2>&1
if grep -q 'AccessDenied' <<< "${TEMP_SSE_POLICY_FILE}"; then
textInfo "${BUCKET_LOCATION}: Access Denied Trying to Get Bucket Policy for ${bucket}" "${BUCKET_LOCATION}" "${bucket}"
rm -f "${TEMP_SSE_POLICY_FILE}"
continue
fi
if grep -q 'NoSuchBucketPolicy' <<< "${TEMP_SSE_POLICY_FILE}"; then
textFail "${BUCKET_LOCATION}: No bucket policy for ${bucket}" "${BUCKET_LOCATION}" "${bucket}"
rm -f "${TEMP_SSE_POLICY_FILE}"
continue
fi
# check if the S3 policy forces SSE s3:x-amz-server-side-encryption:true
CHECK_BUCKET_SSE_POLICY_PRESENT=$(jq --arg arn "arn:${AWS_PARTITION}:s3:::${bucket}/*" '.Statement[]|select(.Effect=="Deny" and ((.Principal|type == "object") and .Principal.AWS == "*") or ((.Principal|type == "string") and .Principal == "*") and .Action=="s3:PutObject" and .Resource==$arn and .Condition.StringNotEquals."s3:x-amz-server-side-encryption" != null)' < "${TEMP_SSE_POLICY_FILE}")
if [[ "${CHECK_BUCKET_SSE_POLICY_PRESENT}" == "" ]]; then
textFail "${BUCKET_LOCATION}: Bucket ${bucket} does not enforce encryption!" "${BUCKET_LOCATION}" "${bucket}"
rm -f "${TEMP_SSE_POLICY_FILE}"
continue
fi
CHECK_BUCKET_SSE_POLICY_VALUE=$(jq -r '.Condition.StringNotEquals."s3:x-amz-server-side-encryption"' <<< "${CHECK_BUCKET_SSE_POLICY_PRESENT}")
textPass "${BUCKET_LOCATION}: Bucket ${bucket} has S3 bucket policy to enforce encryption with ${CHECK_BUCKET_SSE_POLICY_VALUE}" "${BUCKET_LOCATION}" "${bucket}"
rm -f "${TEMP_SSE_POLICY_FILE}"
done
else
textInfo "${REGION}: No S3 Buckets found" "${REGION}"
fi
}

View File

@@ -1,56 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2018) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_extra763="7.63"
CHECK_TITLE_extra763="[extra763] Check if S3 buckets have object versioning enabled "
CHECK_SCORED_extra763="NOT_SCORED"
CHECK_CIS_LEVEL_extra763="EXTRA"
CHECK_SEVERITY_extra763="Medium"
CHECK_ASFF_RESOURCE_TYPE_extra763="AwsS3Bucket"
CHECK_ALTERNATE_check763="extra763"
CHECK_SERVICENAME_extra763="s3"
CHECK_RISK_extra763=' With versioning; you can easily recover from both unintended user actions and application failures.'
CHECK_REMEDIATION_extra763='Configure versioning using the Amazon console or API for buckets with sensitive information that is changing frecuently; and backup may not be enough to capture all the changes.'
CHECK_DOC_extra763='https://docs.aws.amazon.com/AmazonS3/latest/dev-retired/Versioning.html'
CHECK_CAF_EPIC_extra763='Data Protection'
extra763(){
# "Check if S3 buckets have object versioning enabled "
LIST_OF_BUCKETS=$($AWSCLI s3api list-buckets ${PROFILE_OPT} --region "${REGION}" --query Buckets[*].Name --output text|xargs -n1)
if [[ $LIST_OF_BUCKETS ]]; then
for bucket in $LIST_OF_BUCKETS; do
# Recover Bucket region
BUCKET_REGION=$("${AWSCLI}" ${PROFILE_OPT} s3api get-bucket-location --bucket "${bucket}" --region "${REGION}" --query LocationConstraint --output text)
if grep -q -E 'AccessDenied|UnauthorizedOperation|AuthorizationError' <<< "${BUCKET_REGION}"; then
textInfo "${REGION}: Access Denied trying to get bucket location for ${bucket}" "${REGION}"
continue
fi
# If None use default region
if [[ "${BUCKET_REGION}" == "None" ]]; then
BUCKET_REGION="${REGION}"
fi
BUCKET_VERSIONING_ENABLED=$("${AWSCLI}" s3api get-bucket-versioning --bucket "${bucket}" ${PROFILE_OPT} --region "${BUCKET_REGION}" --query Status --output text 2>&1)
if grep -q 'AccessDenied' <<< "${BUCKET_VERSIONING_ENABLED}"; then
textInfo "${BUCKET_REGION}: Access Denied Trying to Get Bucket Versioning for $bucket"
continue
fi
if grep -q "^Enabled$" <<< "${BUCKET_VERSIONING_ENABLED}"; then
textPass "${BUCKET_REGION}: Bucket ${bucket} has versioning enabled" "${BUCKET_REGION}" "${bucket}"
else
textFail "${BUCKET_REGION}: Bucket ${bucket} has versioning disabled!" "${BUCKET_REGION}" "${bucket}"
fi
done
else
textInfo "${REGION}: No S3 Buckets found" "${REGION}"
fi
}

View File

@@ -1,75 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2018) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_extra764="7.64"
CHECK_TITLE_extra764="[extra764] Check if S3 buckets have secure transport policy "
CHECK_SCORED_extra764="NOT_SCORED"
CHECK_CIS_LEVEL_extra764="EXTRA"
CHECK_SEVERITY_extra764="Medium"
CHECK_ASFF_RESOURCE_TYPE_extra764="AwsS3Bucket"
CHECK_ALTERNATE_check764="extra764"
CHECK_ASFF_COMPLIANCE_TYPE_extra764="ens-mp.com.2.aws.s3.1"
CHECK_SERVICENAME_extra764="s3"
CHECK_RISK_extra764='If HTTPS is not enforced on the bucket policy; communication between clients and S3 buckets can use unencrypted HTTP. As a result; sensitive information could be transmitted in clear text over the network or internet.'
CHECK_REMEDIATION_extra764='Ensure that S3 buckets has encryption in transit enabled.'
CHECK_DOC_extra764='https://docs.aws.amazon.com/AmazonS3/latest/dev/security-best-practices.html'
CHECK_CAF_EPIC_extra764='Data Protection'
extra764(){
LIST_OF_BUCKETS=$(${AWSCLI} s3api list-buckets ${PROFILE_OPT} --query Buckets[*].Name --output text --region ${REGION}|xargs -n1)
if [[ $LIST_OF_BUCKETS ]]; then
for bucket in $LIST_OF_BUCKETS;do
BUCKET_LOCATION=$(${AWSCLI} s3api get-bucket-location ${PROFILE_OPT} --region ${REGION} --bucket ${bucket} --output text 2>&1)
if grep -q -E 'AccessDenied|UnauthorizedOperation|AuthorizationError' <<< "${BUCKET_LOCATION}"; then
textInfo "${REGION}: Access Denied Trying to Get Bucket Location for ${bucket}" "${REGION}" "${bucket}"
continue
elif grep -E 'NoSuchBucket' <<< "${BUCKET_LOCATION}"; then
textInfo "${REGION}: NoSuchBucket error Bucket ${bucket} does not exist" "${REGION}" "${bucket}"
continue
fi
if [[ "${BUCKET_LOCATION}" == "None" ]]; then
BUCKET_LOCATION="us-east-1"
fi
if [[ "${BUCKET_LOCATION}" == "EU" ]]; then
BUCKET_LOCATION="eu-west-1"
fi
# get bucket policy
TEMP_STP_POLICY_FILE=$(${AWSCLI} s3api get-bucket-policy ${PROFILE_OPT} --bucket "${bucket}" --output text --query Policy --region "${BUCKET_LOCATION}" 2>&1)
if grep -q -E 'AccessDenied|UnauthorizedOperation|AuthorizationError' <<< "${TEMP_STP_POLICY_FILE}"; then
textInfo "${BUCKET_LOCATION}: Access Denied Trying to Get Bucket Policy for ${bucket}" "${BUCKET_LOCATION}" "${bucket}"
continue
fi
if grep -q "NoSuchBucketPolicy" <<< "${TEMP_STP_POLICY_FILE}" ; then
textFail "${BUCKET_LOCATION}: No bucket policy for ${bucket}" "${BUCKET_LOCATION}" "${bucket}"
continue
fi
# https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-policy-for-config-rule/
# checking if $TEMP_STP_POLICY_FILE is a valid json before converting it to json with jq
if jq -e . >/dev/null 2>&1 <<< "${TEMP_STP_POLICY_FILE}"; then
CHECK_BUCKET_STP_POLICY_PRESENT=$(jq --arg arn "arn:${AWS_PARTITION}:s3:::${bucket}" \
'.Statement[]|select((((.Principal|type == "object") and .Principal.AWS == "*") or ((.Principal|type == "string") and .Principal == "*")) and .Effect=="Deny" and (.Action=="s3:*" or .Action=="*") and (.Resource|type == "array") and (.Resource|map({(.):0})[]|has($arn)) and (.Resource|map({(.):0})[]|has($arn+"/*")) and .Condition.Bool."aws:SecureTransport" == "false")' <<< "${TEMP_STP_POLICY_FILE}")
if [[ "${CHECK_BUCKET_STP_POLICY_PRESENT}" ]]; then
textPass "${BUCKET_LOCATION}: Bucket ${bucket} has S3 bucket policy to deny requests over insecure transport" "${BUCKET_LOCATION}" "${bucket}"
else
textFail "${BUCKET_LOCATION}: Bucket ${bucket} allows requests over insecure transport" "${BUCKET_LOCATION}" "${bucket}"
fi
else
textInfo "${BUCKET_LOCATION}: Bucket ${bucket} returned an unknown error" "${BUCKET_LOCATION}" "${bucket}"
fi
done
else
textInfo "${REGION}: No S3 Buckets found" "${REGION}"
fi
}

View File

@@ -1,78 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2018) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_extra771="7.71"
CHECK_TITLE_extra771="[extra771] Check if S3 buckets have policies which allow WRITE access "
CHECK_SCORED_extra771="NOT_SCORED"
CHECK_CIS_LEVEL_extra771="EXTRA"
CHECK_SEVERITY_extra771="Critical"
CHECK_ASFF_RESOURCE_TYPE_extra771="AwsS3Bucket"
CHECK_ALTERNATE_check771="extra771"
CHECK_SERVICENAME_extra771="s3"
CHECK_RISK_extra771='Non intended users can put objects in a given bucket.'
CHECK_REMEDIATION_extra771='Ensure proper bucket policy is in place with the least privilege principle applied.'
CHECK_DOC_extra771='https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_examples_s3_rw-bucket.html'
CHECK_CAF_EPIC_extra771='IAM'
extra771(){
LIST_OF_BUCKETS=$("${AWSCLI}" s3api list-buckets ${PROFILE_OPT} --region "${REGION}" --query "sort_by(Buckets, &Name)[].Name" --output text 2>&1)
if grep -q -E 'AccessDenied|UnauthorizedOperation|AuthorizationError' <<< "${LIST_OF_BUCKETS}"; then
textInfo "${REGION}: Access Denied trying to list buckets" "${REGION}"
return
fi
if [[ "${LIST_OF_BUCKETS}" ]]; then
for bucket in ${LIST_OF_BUCKETS};do
# Recover Bucket region
BUCKET_REGION=$("${AWSCLI}" ${PROFILE_OPT} s3api get-bucket-location --bucket "${bucket}" --query LocationConstraint --output text)
if grep -q -E 'AccessDenied|UnauthorizedOperation|AuthorizationError' <<< "${BUCKET_POLICY_STATEMENTS}"; then
textInfo "${REGION}: Access Denied trying to get bucket policy for ${bucket}" "${REGION}"
fi
# If None use default region
if [[ "${BUCKET_REGION}" == "None" ]]; then
BUCKET_REGION="${REGION}"
fi
# Recover Bucket policy statements
BUCKET_POLICY_STATEMENTS=$("${AWSCLI}" s3api ${PROFILE_OPT} get-bucket-policy --region "${BUCKET_REGION}" --bucket "${bucket}" --output json --query Policy 2>&1)
if grep -q -E 'AccessDenied|UnauthorizedOperation|AuthorizationError' <<< "${BUCKET_POLICY_STATEMENTS}"; then
textInfo "${REGION}: Access Denied trying to get bucket policy for ${bucket}" "${REGION}"
continue
fi
if grep -q -E 'NoSuchBucketPolicy'<<< "${BUCKET_POLICY_STATEMENTS}"; then
textInfo "${REGION}: Bucket policy does not exist for bucket ${bucket}" "${REGION}"
else
BUCKET_POLICY_BAD_STATEMENTS=$(jq --compact-output --arg arn "arn:${AWS_PARTITION}:s3:::$bucket" 'fromjson | .Statement[]|select(
.Effect=="Allow" and
(
( (.Principal|type == "object") and (.Principal.AWS == "*") ) or
( (.Principal|type == "string") and (.Principal == "*") )
) and
(
( (.Action|type == "string") and (.Action|startswith("s3:Put")) ) or
( (.Action|type == "string") and (.Action|startswith("s3:*")) ) or
( (.Action|type == "array") and (.Action[]|startswith("s3:Put")) ) or
( (.Action|type == "array") and (.Action[]|startswith("s3:*")) )
) and
.Condition == null
)' <<< "${BUCKET_POLICY_STATEMENTS}" | tr '\n' ' ')
# Make sure JSON comma characted will not break CSV output. Replace "," by word "[comma]"
BUCKET_POLICY_BAD_STATEMENTS="${BUCKET_POLICY_BAD_STATEMENTS//,/[comma]}"
if [[ "${BUCKET_POLICY_BAD_STATEMENTS}" != "" ]]; then
textFail "${REGION}: Bucket ${bucket} allows public write: ${BUCKET_POLICY_BAD_STATEMENTS}" "${REGION}" "${bucket}"
else
textPass "${REGION}: Bucket ${bucket} has S3 bucket policy which does not allow public write access" "${REGION}" "${bucket}"
fi
fi
done
else
textInfo "${REGION}: No S3 Buckets found" "${REGION}"
fi
}

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "s3_bucket_acl_prohibited",
"CheckTitle": "Check if S3 buckets have ACLs enabled",
"CheckType": ["Logging and Monitoring"],
"ServiceName": "s3",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"Severity": "medium",
"ResourceType": "AwsS3Bucket",
"Description": "Check if S3 buckets have ACLs enabled",
"Risk": "S3 ACLs are a legacy access control mechanism that predates IAM. IAM and bucket policies are currently the preferred methods.",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "aws s3api put-bucket-ownership-controls --bucket <bucket-name> --ownership-controls Rules=[{ObjectOwnership=BucketOwnerEnforced}]",
"NativeIaC": "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-s3-bucket-ownershipcontrols.html",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Ensure that S3 ACLs are disabled (BucketOwnerEnforced). Use IAM policies and bucket policies to manage access.",
"Url": "https://docs.aws.amazon.com/AmazonS3/latest/userguide/about-object-ownership.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,22 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.s3.s3_client import s3_client
class s3_bucket_acl_prohibited(Check):
def execute(self):
findings = []
for bucket in s3_client.buckets:
report = Check_Report(self.metadata)
report.region = bucket.region
report.resource_id = bucket.name
report.status = "FAIL"
report.status_extended = f"S3 Bucket {bucket.name} has bucket ACLs enabled."
if bucket.ownership:
if "BucketOwnerEnforced" in bucket.ownership:
report.status = "PASS"
report.status_extended = (
f"S3 Bucket {bucket.name} has bucket ACLs disabled."
)
findings.append(report)
return findings

View File

@@ -0,0 +1,105 @@
from re import search
from unittest import mock
from boto3 import client
from moto import mock_s3
class Test_s3_bucket_acl_prohibited:
@mock_s3
def test_bucket_no_ownership(self):
s3_client_us_east_1 = client("s3", region_name="us-east-1")
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.s3.s3_service import S3
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.s3.s3_bucket_acl_prohibited.s3_bucket_acl_prohibited.s3_client",
new=S3(current_audit_info),
):
# Test Check
from providers.aws.services.s3.s3_bucket_acl_prohibited.s3_bucket_acl_prohibited import (
s3_bucket_acl_prohibited,
)
check = s3_bucket_acl_prohibited()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"ACLs enabled",
result[0].status_extended,
)
assert result[0].resource_id == bucket_name_us
assert result[0].region == "us-east-1"
@mock_s3
def test_bucket_without_ownership(self):
s3_client_us_east_1 = client("s3", region_name="us-east-1")
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.s3.s3_service import S3
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.s3.s3_bucket_acl_prohibited.s3_bucket_acl_prohibited.s3_client",
new=S3(current_audit_info),
):
# Test Check
from providers.aws.services.s3.s3_bucket_acl_prohibited.s3_bucket_acl_prohibited import (
s3_bucket_acl_prohibited,
)
check = s3_bucket_acl_prohibited()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"ACLs enabled",
result[0].status_extended,
)
assert result[0].resource_id == bucket_name_us
assert result[0].region == "us-east-1"
@mock_s3
def test_bucket_acl_disabled(self):
s3_client_us_east_1 = client("s3", region_name="us-east-1")
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(
Bucket=bucket_name_us, ObjectOwnership="BucketOwnerEnforced"
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.s3.s3_service import S3
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.s3.s3_bucket_acl_prohibited.s3_bucket_acl_prohibited.s3_client",
new=S3(current_audit_info),
):
# Test Check
from providers.aws.services.s3.s3_bucket_acl_prohibited.s3_bucket_acl_prohibited import (
s3_bucket_acl_prohibited,
)
check = s3_bucket_acl_prohibited()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"ACLs disabled",
result[0].status_extended,
)
assert result[0].resource_id == bucket_name_us
assert result[0].region == "us-east-1"

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "s3_bucket_default_encryption",
"CheckTitle": "Check if S3 buckets have default encryption (SSE) enabled or use a bucket policy to enforce it.",
"CheckType": ["Data Protection"],
"ServiceName": "s3",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"Severity": "medium",
"ResourceType": "AwsS3Bucket",
"Description": "Check if S3 buckets have default encryption (SSE) enabled or use a bucket policy to enforce it.",
"Risk": "Amazon S3 default encryption provides a way to set the default encryption behavior for an S3 bucket. This will ensure data-at-rest is encrypted.",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "aws s3api put-bucket-encryption --bucket <bucket_name> --server-side-encryption-configuration '{'Rules': [{'ApplyServerSideEncryptionByDefault': {'SSEAlgorithm': 'AES256'}}]}'",
"NativeIaC": "https://docs.bridgecrew.io/docs/s3_14-data-encrypted-at-rest#cloudformation",
"Other": "",
"Terraform": "https://docs.bridgecrew.io/docs/s3_14-data-encrypted-at-rest#terraform"
},
"Recommendation": {
"Text": "Ensure that S3 buckets has encryption at rest enabled.",
"Url": "https://aws.amazon.com/blogs/security/how-to-prevent-uploads-of-unencrypted-objects-to-amazon-s3/"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,19 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.s3.s3_client import s3_client
class s3_bucket_default_encryption(Check):
def execute(self):
findings = []
for bucket in s3_client.buckets:
report = Check_Report(self.metadata)
report.region = bucket.region
report.resource_id = bucket.name
if bucket.encryption:
report.status = "PASS"
report.status_extended = f"S3 Bucket {bucket.name} has Server Side Encryption with {bucket.encryption}."
else:
report.status = "FAIL"
report.status_extended = f"Server Side Encryption is not configured for S3 Bucket {bucket.name}."
findings.append(report)
return findings

View File

@@ -0,0 +1,86 @@
from re import search
from unittest import mock
from boto3 import client
from moto import mock_s3
class Test_s3_bucket_default_encryption:
@mock_s3
def test_bucket_no_encryption(self):
s3_client_us_east_1 = client("s3", region_name="us-east-1")
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.s3.s3_service import S3
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.s3.s3_bucket_default_encryption.s3_bucket_default_encryption.s3_client",
new=S3(current_audit_info),
):
# Test Check
from providers.aws.services.s3.s3_bucket_default_encryption.s3_bucket_default_encryption import (
s3_bucket_default_encryption,
)
check = s3_bucket_default_encryption()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"Server Side Encryption is not configured",
result[0].status_extended,
)
assert result[0].resource_id == bucket_name_us
assert result[0].region == "us-east-1"
@mock_s3
def test_bucket_kms_encryption(self):
s3_client_us_east_1 = client("s3", region_name="us-east-1")
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(
Bucket=bucket_name_us, ObjectOwnership="BucketOwnerEnforced"
)
sse_config = {
"Rules": [
{
"ApplyServerSideEncryptionByDefault": {
"SSEAlgorithm": "aws:kms",
"KMSMasterKeyID": "12345678",
}
}
]
}
s3_client_us_east_1.put_bucket_encryption(
Bucket=bucket_name_us, ServerSideEncryptionConfiguration=sse_config
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.s3.s3_service import S3
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.s3.s3_bucket_default_encryption.s3_bucket_default_encryption.s3_client",
new=S3(current_audit_info),
):
# Test Check
from providers.aws.services.s3.s3_bucket_default_encryption.s3_bucket_default_encryption import (
s3_bucket_default_encryption,
)
check = s3_bucket_default_encryption()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"has Server Side Encryption",
result[0].status_extended,
)
assert result[0].resource_id == bucket_name_us
assert result[0].region == "us-east-1"

View File

@@ -15,8 +15,8 @@
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
"Other": "https://docs.bridgecrew.io/docs/s3_16-enable-versioning#aws-console",
"Terraform": "https://docs.bridgecrew.io/docs/s3_16-enable-versioning#terraform"
},
"Recommendation": {
"Text": "Configure versioning using the Amazon console or API for buckets with sensitive information that is changing frecuently; and backup may not be enough to capture all the changes.",

View File

@@ -0,0 +1,76 @@
from re import search
from unittest import mock
from boto3 import client
from moto import mock_s3
class Test_s3_bucket_object_versioning:
@mock_s3
def test_bucket_no_object_versioning(self):
s3_client_us_east_1 = client("s3", region_name="us-east-1")
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.s3.s3_service import S3
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.s3.s3_bucket_object_versioning.s3_bucket_object_versioning.s3_client",
new=S3(current_audit_info),
):
# Test Check
from providers.aws.services.s3.s3_bucket_object_versioning.s3_bucket_object_versioning import (
s3_bucket_object_versioning,
)
check = s3_bucket_object_versioning()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"versioning disabled",
result[0].status_extended,
)
assert result[0].resource_id == bucket_name_us
assert result[0].region == "us-east-1"
@mock_s3
def test_bucket_object_versioning_enabled(self):
s3_client_us_east_1 = client("s3", region_name="us-east-1")
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(
Bucket=bucket_name_us, ObjectOwnership="BucketOwnerEnforced"
)
s3_client_us_east_1.put_bucket_versioning(
Bucket=bucket_name_us,
VersioningConfiguration={"Status": "Enabled"},
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.s3.s3_service import S3
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.s3.s3_bucket_object_versioning.s3_bucket_object_versioning.s3_client",
new=S3(current_audit_info),
):
# Test Check
from providers.aws.services.s3.s3_bucket_object_versioning.s3_bucket_object_versioning import (
s3_bucket_object_versioning,
)
check = s3_bucket_object_versioning()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"versioning enabled",
result[0].status_extended,
)
assert result[0].resource_id == bucket_name_us
assert result[0].region == "us-east-1"

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "s3_bucket_policy_public_write_access",
"CheckTitle": "Check if S3 buckets have policies which allow WRITE access.",
"CheckType": ["IAM"],
"ServiceName": "s3",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"Severity": "critical",
"ResourceType": "AwsS3Bucket",
"Description": "Check if S3 buckets have policies which allow WRITE access.",
"Risk": "Non intended users can put objects in a given bucket.",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "https://docs.bridgecrew.io/docs/s3_18-write-permissions-public#aws-console",
"Terraform": ""
},
"Recommendation": {
"Text": "Ensure proper bucket policy is in place with the least privilege principle applied.",
"Url": "https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_examples_s3_rw-bucket.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,36 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.s3.s3_client import s3_client
class s3_bucket_policy_public_write_access(Check):
def execute(self):
findings = []
for bucket in s3_client.buckets:
report = Check_Report(self.metadata)
report.region = bucket.region
report.resource_id = bucket.name
# Check if bucket policy allow public write access
if not bucket.policy:
report.status = "PASS"
report.status_extended = (
f"S3 Bucket {bucket.name} does not have a bucket policy."
)
else:
report.status = "PASS"
report.status_extended = f"S3 Bucket {bucket.name} does not allow public write access in the bucket policy."
for statement in bucket.policy["Statement"]:
if (
statement["Effect"] == "Allow"
and "Condition" not in statement
and "*" in str(statement["Principal"])
and (
"s3:PutObject" in statement["Action"]
or "*" in statement["Action"]
or "s3:*" in statement["Action"]
)
):
report.status = "FAIL"
report.status_extended = f"S3 Bucket {bucket.name} allows public write access in the bucket policy.."
findings.append(report)
return findings

View File

@@ -0,0 +1,116 @@
from re import search
from unittest import mock
from boto3 import client
from moto import mock_s3
class Test_s3_bucket_policy_public_write_access:
@mock_s3
def test_bucket_no_policy(self):
s3_client_us_east_1 = client("s3", region_name="us-east-1")
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.s3.s3_service import S3
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.s3.s3_bucket_policy_public_write_access.s3_bucket_policy_public_write_access.s3_client",
new=S3(current_audit_info),
):
# Test Check
from providers.aws.services.s3.s3_bucket_policy_public_write_access.s3_bucket_policy_public_write_access import (
s3_bucket_policy_public_write_access,
)
check = s3_bucket_policy_public_write_access()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"does not have a bucket policy",
result[0].status_extended,
)
assert result[0].resource_id == bucket_name_us
assert result[0].region == "us-east-1"
@mock_s3
def test_bucket_comply_policy(self):
s3_client_us_east_1 = client("s3", region_name="us-east-1")
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(
Bucket=bucket_name_us, ObjectOwnership="BucketOwnerEnforced"
)
encryption_policy = '{"Version": "2012-10-17","Id": "PutObjPolicy","Statement": [{"Sid": "DenyIncorrectEncryptionHeader","Effect": "Deny","Principal": "*","Action": "s3:PutObject","Resource": "arn:aws:s3:::bucket_test_us/*","Condition": {"StringNotEquals": {"s3:x-amz-server-side-encryption": "aws:kms"}}}]}'
s3_client_us_east_1.put_bucket_policy(
Bucket=bucket_name_us,
Policy=encryption_policy,
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.s3.s3_service import S3
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.s3.s3_bucket_policy_public_write_access.s3_bucket_policy_public_write_access.s3_client",
new=S3(current_audit_info),
):
# Test Check
from providers.aws.services.s3.s3_bucket_policy_public_write_access.s3_bucket_policy_public_write_access import (
s3_bucket_policy_public_write_access,
)
check = s3_bucket_policy_public_write_access()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"does not allow public write access in the bucket policy",
result[0].status_extended,
)
assert result[0].resource_id == bucket_name_us
assert result[0].region == "us-east-1"
@mock_s3
def test_bucket_public_write_policy(self):
s3_client_us_east_1 = client("s3", region_name="us-east-1")
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(
Bucket=bucket_name_us, ObjectOwnership="BucketOwnerEnforced"
)
public_write_policy = '{"Version": "2012-10-17","Id": "PutObjPolicy","Statement": [{"Sid": "PublicWritePolicy","Effect": "Allow","Principal": "*","Action": "s3:PutObject","Resource": "arn:aws:s3:::bucket_test_us/*"}]}'
s3_client_us_east_1.put_bucket_policy(
Bucket=bucket_name_us,
Policy=public_write_policy,
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.s3.s3_service import S3
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.s3.s3_bucket_policy_public_write_access.s3_bucket_policy_public_write_access.s3_client",
new=S3(current_audit_info),
):
# Test Check
from providers.aws.services.s3.s3_bucket_policy_public_write_access.s3_bucket_policy_public_write_access import (
s3_bucket_policy_public_write_access,
)
check = s3_bucket_policy_public_write_access()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"allows public write access in the bucket policy",
result[0].status_extended,
)
assert result[0].resource_id == bucket_name_us
assert result[0].region == "us-east-1"

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "s3_bucket_public_access",
"CheckTitle": "Ensure there are no S3 buckets open to Everyone or Any AWS user.",
"CheckType": ["Data Protection"],
"ServiceName": "s3",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"Severity": "critical",
"ResourceType": "AwsS3Bucket",
"Description": "Ensure there are no S3 buckets open to Everyone or Any AWS user.",
"Risk": "Even if you enable all possible bucket ACL options available in the Amazon S3 console the ACL alone does not allow everyone to download objects from your bucket. Depending on which option you select any user could perform some actions.",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "aws s3api put-public-access-block --public-access-block-configuration BlockPublicAcls=true,IgnorePublicAcls=true,BlockPublicPolicy=true,RestrictPublicBuckets=true --bucket <bucket_name>",
"NativeIaC": "",
"Other": "https://github.com/cloudmatos/matos/tree/master/remediations/aws/s3/s3/block-public-access",
"Terraform": "https://docs.bridgecrew.io/docs/s3-bucket-should-have-public-access-blocks-defaults-to-false-if-the-public-access-block-is-not-attached#terraform"
},
"Recommendation": {
"Text": "You can enable block public access settings only for access points, buckets and AWS accounts. Amazon S3 does not support block public access settings on a per-object basis. When you apply block public access settings to an account; the settings apply to all AWS Regions globally. The settings might not take effect in all Regions immediately or simultaneously, but they eventually propagate to all Regions.",
"Url": "https://docs.aws.amazon.com/AmazonS3/latest/userguide/access-control-block-public-access.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,65 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.s3.s3_client import s3_client
from providers.aws.services.s3.s3control_client import s3control_client
class s3_bucket_public_access(Check):
def execute(self):
findings = []
# 1. Check if public buckets are restricted at account level
if (
s3control_client.account_public_access_block.ignore_public_acls
and s3control_client.account_public_access_block.restrict_public_buckets
):
report = Check_Report(self.metadata)
report.status = "PASS"
report.status_extended = f"All S3 public access blocked at account level."
report.region = s3control_client.region
report.resource_id = s3_client.audited_account
findings.append(report)
else:
# 2. If public access is not blocked at account level, check it at each bucket level
for bucket in s3_client.buckets:
report = Check_Report(self.metadata)
report.region = bucket.region
report.resource_id = bucket.name
report.status = "PASS"
report.status_extended = f"S3 Bucket {bucket.name} is not public."
if not (
bucket.public_access_block.ignore_public_acls
and bucket.public_access_block.restrict_public_buckets
):
# 3. If bucket has no public block, check bucket ACL
for grantee in bucket.acl_grantees:
if grantee.type in "Group":
if (
"AllUsers" in grantee.URI
or "AuthenticatedUsers" in grantee.URI
):
report.status = "FAIL"
report.status_extended = f"S3 Bucket {bucket.name} has public access due to bucket ACL."
# 4. Check bucket policy
if bucket.policy:
for statement in bucket.policy["Statement"]:
if (
"*" == statement["Principal"]
and statement["Effect"] == "Allow"
):
report.status = "FAIL"
report.status_extended = f"S3 Bucket {bucket.name} has public access due to bucket policy."
else:
if (
"AWS" in statement["Principal"]
and statement["Effect"] == "Allow"
):
if type(statement["Principal"]["AWS"]) == str:
principals = [statement["Principal"]["AWS"]]
else:
principals = statement["Principal"]["AWS"]
for principal_arn in principals:
if principal_arn == "*":
report.status = "FAIL"
report.status_extended = f"S3 Bucket {bucket.name} has public access due to bucket policy."
findings.append(report)
return findings

View File

@@ -0,0 +1,399 @@
from re import search
from unittest import mock
from boto3 import client, session
from moto import mock_s3, mock_s3control
from providers.aws.lib.audit_info.models import AWS_Audit_Info
AWS_ACCOUNT_NUMBER = "123456789012"
AWS_REGION = "us-east-1"
class Test_s3_bucket_public_access:
# Mocked Audit Info
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
region_name=AWS_REGION,
),
audited_account=AWS_ACCOUNT_NUMBER,
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=AWS_REGION,
credentials=None,
assumed_role_info=None,
audited_regions=None,
organizations_metadata=None,
)
return audit_info
@mock_s3
@mock_s3control
def test_no_buckets(self):
from providers.aws.services.s3.s3_service import S3, S3Control
audit_info = self.set_mocked_audit_info()
with mock.patch(
"providers.aws.lib.audit_info.audit_info.current_audit_info", new=audit_info
):
with mock.patch(
"providers.aws.services.s3.s3_bucket_public_access.s3_bucket_public_access.s3_client",
new=S3(audit_info),
):
with mock.patch(
"providers.aws.services.s3.s3_bucket_public_access.s3_bucket_public_access.s3control_client",
new=S3Control(audit_info),
):
# Test Check
from providers.aws.services.s3.s3_bucket_public_access.s3_bucket_public_access import (
s3_bucket_public_access,
)
check = s3_bucket_public_access()
result = check.execute()
assert len(result) == 0
@mock_s3
@mock_s3control
def test_bucket_account_public_block_without_buckets(self):
# Generate S3Control Client
s3control_client = client("s3control", region_name=AWS_REGION)
s3control_client.put_public_access_block(
AccountId=AWS_ACCOUNT_NUMBER,
PublicAccessBlockConfiguration={
"BlockPublicAcls": True,
"IgnorePublicAcls": True,
"BlockPublicPolicy": True,
"RestrictPublicBuckets": True,
},
)
from providers.aws.services.s3.s3_service import S3, S3Control
audit_info = self.set_mocked_audit_info()
with mock.patch(
"providers.aws.lib.audit_info.audit_info.current_audit_info", new=audit_info
):
with mock.patch(
"providers.aws.services.s3.s3_bucket_public_access.s3_bucket_public_access.s3_client",
new=S3(audit_info),
):
with mock.patch(
"providers.aws.services.s3.s3_bucket_public_access.s3_bucket_public_access.s3control_client",
new=S3Control(audit_info),
):
# Test Check
from providers.aws.services.s3.s3_bucket_public_access.s3_bucket_public_access import (
s3_bucket_public_access,
)
check = s3_bucket_public_access()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "All S3 public access blocked at account level."
)
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].region == AWS_REGION
@mock_s3
@mock_s3control
def test_bucket_account_public_block(self):
s3_client = client("s3", region_name=AWS_REGION)
bucket_name_us = "bucket_test_us"
s3_client.create_bucket(Bucket=bucket_name_us)
# Generate S3Control Client
s3control_client = client("s3control", region_name=AWS_REGION)
s3control_client.put_public_access_block(
AccountId=AWS_ACCOUNT_NUMBER,
PublicAccessBlockConfiguration={
"BlockPublicAcls": True,
"IgnorePublicAcls": True,
"BlockPublicPolicy": True,
"RestrictPublicBuckets": True,
},
)
from providers.aws.services.s3.s3_service import S3, S3Control
audit_info = self.set_mocked_audit_info()
with mock.patch(
"providers.aws.lib.audit_info.audit_info.current_audit_info", new=audit_info
):
with mock.patch(
"providers.aws.services.s3.s3_bucket_public_access.s3_bucket_public_access.s3_client",
new=S3(audit_info),
):
with mock.patch(
"providers.aws.services.s3.s3_bucket_public_access.s3_bucket_public_access.s3control_client",
new=S3Control(audit_info),
):
# Test Check
from providers.aws.services.s3.s3_bucket_public_access.s3_bucket_public_access import (
s3_bucket_public_access,
)
check = s3_bucket_public_access()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "All S3 public access blocked at account level."
)
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].region == AWS_REGION
@mock_s3
@mock_s3control
def test_bucket_public_block(self):
s3_client = client("s3", region_name=AWS_REGION)
bucket_name_us = "bucket_test_us"
s3_client.create_bucket(Bucket=bucket_name_us)
# Generate S3Control Client
s3control_client = client("s3control", region_name=AWS_REGION)
s3control_client.put_public_access_block(
AccountId=AWS_ACCOUNT_NUMBER,
PublicAccessBlockConfiguration={
"BlockPublicAcls": False,
"IgnorePublicAcls": False,
"BlockPublicPolicy": False,
"RestrictPublicBuckets": False,
},
)
s3_client.put_public_access_block(
Bucket=bucket_name_us,
PublicAccessBlockConfiguration={
"BlockPublicAcls": True,
"IgnorePublicAcls": True,
"BlockPublicPolicy": True,
"RestrictPublicBuckets": True,
},
)
from providers.aws.services.s3.s3_service import S3, S3Control
audit_info = self.set_mocked_audit_info()
with mock.patch(
"providers.aws.lib.audit_info.audit_info.current_audit_info", new=audit_info
):
with mock.patch(
"providers.aws.services.s3.s3_bucket_public_access.s3_bucket_public_access.s3_client",
new=S3(audit_info),
):
with mock.patch(
"providers.aws.services.s3.s3_bucket_public_access.s3_bucket_public_access.s3control_client",
new=S3Control(audit_info),
):
# Test Check
from providers.aws.services.s3.s3_bucket_public_access.s3_bucket_public_access import (
s3_bucket_public_access,
)
check = s3_bucket_public_access()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"not public",
result[0].status_extended,
)
assert result[0].resource_id == bucket_name_us
assert result[0].region == AWS_REGION
@mock_s3
@mock_s3control
def test_bucket_public_ACL(self):
s3_client = client("s3", region_name=AWS_REGION)
bucket_name_us = "bucket_test_us"
s3_client.create_bucket(Bucket=bucket_name_us)
bucket_owner = s3_client.get_bucket_acl(Bucket=bucket_name_us)["Owner"]
# Generate S3Control Client
s3control_client = client("s3control", region_name=AWS_REGION)
s3control_client.put_public_access_block(
AccountId=AWS_ACCOUNT_NUMBER,
PublicAccessBlockConfiguration={
"BlockPublicAcls": False,
"IgnorePublicAcls": False,
"BlockPublicPolicy": False,
"RestrictPublicBuckets": False,
},
)
s3_client.put_public_access_block(
Bucket=bucket_name_us,
PublicAccessBlockConfiguration={
"BlockPublicAcls": False,
"IgnorePublicAcls": False,
"BlockPublicPolicy": False,
"RestrictPublicBuckets": False,
},
)
s3_client.put_bucket_acl(
Bucket=bucket_name_us,
AccessControlPolicy={
"Grants": [
{
"Grantee": {
"URI": "http://acs.amazonaws.com/groups/global/AllUsers",
"Type": "Group",
},
"Permission": "READ",
},
],
"Owner": bucket_owner,
},
)
from providers.aws.services.s3.s3_service import S3, S3Control
audit_info = self.set_mocked_audit_info()
with mock.patch(
"providers.aws.lib.audit_info.audit_info.current_audit_info", new=audit_info
):
with mock.patch(
"providers.aws.services.s3.s3_bucket_public_access.s3_bucket_public_access.s3_client",
new=S3(audit_info),
):
with mock.patch(
"providers.aws.services.s3.s3_bucket_public_access.s3_bucket_public_access.s3control_client",
new=S3Control(audit_info),
):
# Test Check
from providers.aws.services.s3.s3_bucket_public_access.s3_bucket_public_access import (
s3_bucket_public_access,
)
check = s3_bucket_public_access()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"public access due to bucket ACL",
result[0].status_extended,
)
assert result[0].resource_id == bucket_name_us
assert result[0].region == AWS_REGION
@mock_s3
@mock_s3control
def test_bucket_public_policy(self):
s3_client = client("s3", region_name=AWS_REGION)
bucket_name_us = "bucket_test_us"
s3_client.create_bucket(Bucket=bucket_name_us)
# Generate S3Control Client
s3control_client = client("s3control", region_name=AWS_REGION)
s3control_client.put_public_access_block(
AccountId=AWS_ACCOUNT_NUMBER,
PublicAccessBlockConfiguration={
"BlockPublicAcls": False,
"IgnorePublicAcls": False,
"BlockPublicPolicy": False,
"RestrictPublicBuckets": False,
},
)
s3_client.put_public_access_block(
Bucket=bucket_name_us,
PublicAccessBlockConfiguration={
"BlockPublicAcls": False,
"IgnorePublicAcls": False,
"BlockPublicPolicy": False,
"RestrictPublicBuckets": False,
},
)
public_write_policy = '{"Version": "2012-10-17","Id": "PutObjPolicy","Statement": [{"Sid": "PublicWritePolicy","Effect": "Allow","Principal": "*","Action": "s3:PutObject","Resource": "arn:aws:s3:::bucket_test_us/*"}]}'
s3_client.put_bucket_policy(
Bucket=bucket_name_us,
Policy=public_write_policy,
)
from providers.aws.services.s3.s3_service import S3, S3Control
audit_info = self.set_mocked_audit_info()
with mock.patch(
"providers.aws.lib.audit_info.audit_info.current_audit_info", new=audit_info
):
with mock.patch(
"providers.aws.services.s3.s3_bucket_public_access.s3_bucket_public_access.s3_client",
new=S3(audit_info),
):
with mock.patch(
"providers.aws.services.s3.s3_bucket_public_access.s3_bucket_public_access.s3control_client",
new=S3Control(audit_info),
):
# Test Check
from providers.aws.services.s3.s3_bucket_public_access.s3_bucket_public_access import (
s3_bucket_public_access,
)
check = s3_bucket_public_access()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"public access due to bucket policy",
result[0].status_extended,
)
assert result[0].resource_id == bucket_name_us
assert result[0].region == AWS_REGION
@mock_s3
@mock_s3control
def test_bucket_not_public(self):
s3_client = client("s3", region_name=AWS_REGION)
bucket_name_us = "bucket_test_us"
s3_client.create_bucket(Bucket=bucket_name_us)
s3_client.put_public_access_block(
Bucket=bucket_name_us,
PublicAccessBlockConfiguration={
"BlockPublicAcls": True,
"IgnorePublicAcls": True,
"BlockPublicPolicy": True,
"RestrictPublicBuckets": True,
},
)
from providers.aws.services.s3.s3_service import S3, S3Control
audit_info = self.set_mocked_audit_info()
with mock.patch(
"providers.aws.lib.audit_info.audit_info.current_audit_info", new=audit_info
):
with mock.patch(
"providers.aws.services.s3.s3_bucket_public_access.s3_bucket_public_access.s3_client",
new=S3(audit_info),
):
with mock.patch(
"providers.aws.services.s3.s3_bucket_public_access.s3_bucket_public_access.s3control_client",
new=S3Control(audit_info),
):
# Test Check
from providers.aws.services.s3.s3_bucket_public_access.s3_bucket_public_access import (
s3_bucket_public_access,
)
check = s3_bucket_public_access()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"not public",
result[0].status_extended,
)
assert result[0].resource_id == bucket_name_us
assert result[0].region == AWS_REGION

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "s3_bucket_secure_transport_policy",
"CheckTitle": "Check if S3 buckets have secure transport policy.",
"CheckType": ["Data Protection"],
"ServiceName": "s3",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"Severity": "medium",
"ResourceType": "AwsS3Bucket",
"Description": "Check if S3 buckets have secure transport policy.",
"Risk": "If HTTPS is not enforced on the bucket policy, communication between clients and S3 buckets can use unencrypted HTTP. As a result, sensitive information could be transmitted in clear text over the network or internet.",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "https://docs.bridgecrew.io/docs/s3_15-secure-data-transport#aws-console",
"Terraform": ""
},
"Recommendation": {
"Text": "Ensure that S3 buckets has encryption in transit enabled.",
"Url": "https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-policy-for-config-rule/"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,41 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.s3.s3_client import s3_client
class s3_bucket_secure_transport_policy(Check):
def execute(self):
findings = []
for bucket in s3_client.buckets:
report = Check_Report(self.metadata)
report.region = bucket.region
report.resource_id = bucket.name
# Check if bucket policy enforces SSL
if not bucket.policy:
report.status = "FAIL"
report.status_extended = f"S3 Bucket {bucket.name} does not have a bucket policy, thus it allows HTTP requests."
else:
report.status = "FAIL"
report.status_extended = f"S3 Bucket {bucket.name} allows requests over insecure transport in the bucket policy."
for statement in bucket.policy["Statement"]:
if (
statement["Effect"] == "Deny"
and "Condition" in statement
and (
"s3:PutObject" in statement["Action"]
or "*" in statement["Action"]
or "s3:*" in statement["Action"]
)
):
if "Bool" in statement["Condition"]:
if "aws:SecureTransport" in statement["Condition"]["Bool"]:
if (
statement["Condition"]["Bool"][
"aws:SecureTransport"
]
== "false"
):
report.status = "PASS"
report.status_extended = f"S3 Bucket {bucket.name} has a bucket policy to deny requests over insecure transport."
findings.append(report)
return findings

View File

@@ -0,0 +1,151 @@
from re import search
from unittest import mock
from boto3 import client
from moto import mock_s3
class Test_s3_bucket_secure_transport_policy:
@mock_s3
def test_bucket_no_policy(self):
s3_client_us_east_1 = client("s3", region_name="us-east-1")
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.s3.s3_service import S3
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.s3.s3_bucket_secure_transport_policy.s3_bucket_secure_transport_policy.s3_client",
new=S3(current_audit_info),
):
# Test Check
from providers.aws.services.s3.s3_bucket_secure_transport_policy.s3_bucket_secure_transport_policy import (
s3_bucket_secure_transport_policy,
)
check = s3_bucket_secure_transport_policy()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"does not have a bucket policy",
result[0].status_extended,
)
assert result[0].resource_id == bucket_name_us
assert result[0].region == "us-east-1"
@mock_s3
def test_bucket_comply_policy(self):
s3_client_us_east_1 = client("s3", region_name="us-east-1")
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
ssl_policy = """
{
"Version": "2012-10-17",
"Id": "PutObjPolicy",
"Statement": [
{
"Sid": "s3-bucket-ssl-requests-only",
"Effect": "Deny",
"Principal": "*",
"Action": "s3:PutObject",
"Resource": "arn:aws:s3:::bucket_test_us/*",
"Condition": {
"Bool": {
"aws:SecureTransport": "false"
}
}
}
]
}
"""
s3_client_us_east_1.put_bucket_policy(
Bucket=bucket_name_us,
Policy=ssl_policy,
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.s3.s3_service import S3
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.s3.s3_bucket_secure_transport_policy.s3_bucket_secure_transport_policy.s3_client",
new=S3(current_audit_info),
):
# Test Check
from providers.aws.services.s3.s3_bucket_secure_transport_policy.s3_bucket_secure_transport_policy import (
s3_bucket_secure_transport_policy,
)
check = s3_bucket_secure_transport_policy()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"bucket policy to deny requests over insecure transport",
result[0].status_extended,
)
assert result[0].resource_id == bucket_name_us
assert result[0].region == "us-east-1"
@mock_s3
def test_bucket_uncomply_policy(self):
s3_client_us_east_1 = client("s3", region_name="us-east-1")
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
ssl_policy = """
{
"Version": "2012-10-17",
"Id": "PutObjPolicy",
"Statement": [
{
"Sid": "s3-bucket-ssl-requests-only",
"Effect": "Deny",
"Principal": "*",
"Action": "s3:GetObject",
"Resource": "arn:aws:s3:::bucket_test_us/*",
"Condition": {
"Bool": {
"aws:SecureTransport": "false"
}
}
}
]
}
"""
s3_client_us_east_1.put_bucket_policy(
Bucket=bucket_name_us,
Policy=ssl_policy,
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.s3.s3_service import S3
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.s3.s3_bucket_secure_transport_policy.s3_bucket_secure_transport_policy.s3_client",
new=S3(current_audit_info),
):
# Test Check
from providers.aws.services.s3.s3_bucket_secure_transport_policy.s3_bucket_secure_transport_policy import (
s3_bucket_secure_transport_policy,
)
check = s3_bucket_secure_transport_policy()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"allows requests over insecure transport in the bucket policy",
result[0].status_extended,
)
assert result[0].resource_id == bucket_name_us
assert result[0].region == "us-east-1"

View File

@@ -13,10 +13,10 @@
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"CLI": "https://docs.bridgecrew.io/docs/s3_13-enable-logging#cli-command",
"NativeIaC": "",
"Other": "",
"Terraform": ""
"Other": "https://docs.bridgecrew.io/docs/s3_13-enable-logging#aws-console",
"Terraform": "https://docs.bridgecrew.io/docs/s3_13-enable-logging#terraform"
},
"Recommendation": {
"Text": "Ensure that S3 buckets have Logging enabled. CloudTrail data events can be used in place of S3 bucket logging. If that is the case, this finding can be considered a false positive.",

View File

@@ -0,0 +1,125 @@
from re import search
from unittest import mock
from boto3 import client
from moto import mock_s3
ACCOUNT_ID = "123456789012"
class Test_s3_bucket_server_access_logging_enabled:
@mock_s3
def test_bucket_no_logging(self):
s3_client_us_east_1 = client("s3", region_name="us-east-1")
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.s3.s3_service import S3
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.s3.s3_bucket_server_access_logging_enabled.s3_bucket_server_access_logging_enabled.s3_client",
new=S3(current_audit_info),
):
# Test Check
from providers.aws.services.s3.s3_bucket_server_access_logging_enabled.s3_bucket_server_access_logging_enabled import (
s3_bucket_server_access_logging_enabled,
)
check = s3_bucket_server_access_logging_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"server access logging disabled",
result[0].status_extended,
)
assert result[0].resource_id == bucket_name_us
@mock_s3
def test_bucket_with_logging(self):
s3_client_us_east_1 = client("s3", region_name="us-east-1")
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
bucket_owner = s3_client_us_east_1.get_bucket_acl(Bucket=bucket_name_us)[
"Owner"
]
s3_client_us_east_1.put_bucket_acl(
Bucket=bucket_name_us,
AccessControlPolicy={
"Grants": [
{
"Grantee": {
"URI": "http://acs.amazonaws.com/groups/s3/LogDelivery",
"Type": "Group",
},
"Permission": "WRITE",
},
{
"Grantee": {
"URI": "http://acs.amazonaws.com/groups/s3/LogDelivery",
"Type": "Group",
},
"Permission": "READ_ACP",
},
{
"Grantee": {"Type": "CanonicalUser", "ID": bucket_owner["ID"]},
"Permission": "FULL_CONTROL",
},
],
"Owner": bucket_owner,
},
)
s3_client_us_east_1.put_bucket_logging(
Bucket=bucket_name_us,
BucketLoggingStatus={
"LoggingEnabled": {
"TargetBucket": bucket_name_us,
"TargetPrefix": "{}/".format(bucket_name_us),
"TargetGrants": [
{
"Grantee": {
"ID": "SOMEIDSTRINGHERE9238748923734823917498237489237409123840983274",
"Type": "CanonicalUser",
},
"Permission": "READ",
},
{
"Grantee": {
"ID": "SOMEIDSTRINGHERE9238748923734823917498237489237409123840983274",
"Type": "CanonicalUser",
},
"Permission": "WRITE",
},
],
}
},
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.s3.s3_service import S3
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.s3.s3_bucket_server_access_logging_enabled.s3_bucket_server_access_logging_enabled.s3_client",
new=S3(current_audit_info),
):
# Test Check
from providers.aws.services.s3.s3_bucket_server_access_logging_enabled.s3_bucket_server_access_logging_enabled import (
s3_bucket_server_access_logging_enabled,
)
check = s3_bucket_server_access_logging_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"server access logging enabled",
result[0].status_extended,
)
assert result[0].resource_id == bucket_name_us

View File

@@ -1,8 +1,12 @@
import json
import threading
from dataclasses import dataclass
from lib.logger import logger
from providers.aws.aws_provider import current_audit_info, generate_regional_clients
from providers.aws.aws_provider import (
generate_regional_clients,
get_region_global_service,
)
################## S3
@@ -13,10 +17,14 @@ class S3:
self.client = self.session.client(self.service)
self.audited_account = audit_info.audited_account
self.regional_clients = generate_regional_clients(self.service, audit_info)
self.buckets = self.__list_buckets__()
self.buckets = self.__list_buckets__(audit_info)
self.__threading_call__(self.__get_bucket_versioning__)
self.__threading_call__(self.__get_bucket_logging__)
self.__threading_call__(self.__get_bucket_policy__)
self.__threading_call__(self.__get_bucket_acl__)
self.__threading_call__(self.__get_public_access_block__)
self.__threading_call__(self.__get_bucket_encryption__)
self.__threading_call__(self.__get_bucket_ownership_controls__)
def __get_session__(self):
return self.session
@@ -30,27 +38,32 @@ class S3:
for t in threads:
t.join()
def __list_buckets__(self):
def __list_buckets__(self, audit_info):
logger.info("S3 - Listing buckets...")
try:
buckets = []
list_buckets = self.client.list_buckets()
for bucket in list_buckets["Buckets"]:
bucket_region = self.client.get_bucket_location(Bucket=bucket["Name"])[
"LocationConstraint"
]
if not bucket_region: # If us-east-1, bucket_region is none
bucket_region = "us-east-1"
# Check if there are filter regions
if current_audit_info.audited_regions:
if bucket_region in current_audit_info.audited_regions:
try:
bucket_region = self.client.get_bucket_location(
Bucket=bucket["Name"]
)["LocationConstraint"]
if not bucket_region: # If us-east-1, bucket_region is none
bucket_region = "us-east-1"
# Check if there are filter regions
if audit_info.audited_regions:
if bucket_region in audit_info.audited_regions:
buckets.append(Bucket(bucket["Name"], bucket_region))
else:
buckets.append(Bucket(bucket["Name"], bucket_region))
else:
buckets.append(Bucket(bucket["Name"], bucket_region))
except Exception as error:
logger.error(
f"{bucket_region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return buckets
except Exception as error:
logger.error(
f"{bucket_region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __get_bucket_versioning__(self, bucket):
@@ -68,6 +81,22 @@ class S3:
f"{bucket.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __get_bucket_encryption__(self, bucket):
logger.info("S3 - Get buckets encryption...")
try:
regional_client = self.regional_clients[bucket.region]
bucket.encryption = regional_client.get_bucket_encryption(
Bucket=bucket.name
)["ServerSideEncryptionConfiguration"]["Rules"][0][
"ApplyServerSideEncryptionByDefault"
][
"SSEAlgorithm"
]
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __get_bucket_logging__(self, bucket):
logger.info("S3 - Get buckets logging...")
try:
@@ -83,6 +112,20 @@ class S3:
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __get_public_access_block__(self, bucket):
logger.info("S3 - Get buckets public access block...")
try:
regional_client = self.regional_clients[bucket.region]
bucket.public_access_block = PublicAccessBlock(
regional_client.get_public_access_block(Bucket=bucket.name)[
"PublicAccessBlockConfiguration"
]
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __get_bucket_acl__(self, bucket):
logger.info("S3 - Get buckets acl...")
try:
@@ -90,36 +133,112 @@ class S3:
regional_client = self.regional_clients[bucket.region]
acl_grants = regional_client.get_bucket_acl(Bucket=bucket.name)["Grants"]
for grant in acl_grants:
grantee = ACL_Grantee(grantee_type=grant["Grantee"])
grantee = ACL_Grantee(type=grant["Grantee"]["Type"])
if "DisplayName" in grant["Grantee"]:
grantee.display_name = grant["Grantee"]["DisplayName"]
if "Type" in grant["Grantee"]:
grantee.grantee_type = grant["Grantee"]["Type"]
if "ID" in grant["Grantee"]:
grantee.ID = grant["Grantee"]["ID"]
if "URI" in grant["Grantee"]:
grantee.URI = grant["Grantee"]["URI"]
if "Permission" in grant:
grantee.permission = grant["Permission"]
grantees.append(grantee)
bucket.acl_grantee = grantees
bucket.acl_grantees = grantees
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __get_bucket_policy__(self, bucket):
logger.info("S3 - Get buckets policy...")
try:
regional_client = self.regional_clients[bucket.region]
bucket.policy = json.loads(
regional_client.get_bucket_policy(Bucket=bucket.name)["Policy"]
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __get_bucket_ownership_controls__(self, bucket):
logger.info("S3 - Get buckets ownership controls...")
try:
regional_client = self.regional_clients[bucket.region]
bucket.ownership = regional_client.get_bucket_ownership_controls(
Bucket=bucket.name
)["OwnershipControls"]["Rules"][0]["ObjectOwnership"]
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
################## S3Control
class S3Control:
def __init__(self, audit_info):
self.service = "s3control"
self.session = audit_info.audit_session
self.client = self.session.client(self.service)
self.audited_account = audit_info.audited_account
self.region = get_region_global_service(audit_info)
self.account_public_access_block = self.__get_public_access_block__()
def __get_session__(self):
return self.session
def __get_public_access_block__(self):
logger.info("S3 - Get account public access block...")
try:
return PublicAccessBlock(
self.client.get_public_access_block(AccountId=self.audited_account)[
"PublicAccessBlockConfiguration"
]
)
except Exception as error:
if "NoSuchPublicAccessBlockConfiguration" in str(error):
# Set all block as False
return PublicAccessBlock(
{
"BlockPublicAcls": False,
"IgnorePublicAcls": False,
"BlockPublicPolicy": False,
"RestrictPublicBuckets": False,
}
)
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
@dataclass
class ACL_Grantee:
display_name: str
ID: str
grantee_type: str
type: str
URI: str
permission: str
def __init__(self, grantee_type):
def __init__(self, type):
self.display_name = None
self.ID = None
self.grantee_type = grantee_type
self.type = type
self.URI = None
self.permission = None
@dataclass
class PublicAccessBlock:
block_public_acls: bool
ignore_public_acls: bool
block_public_policy: bool
restrict_public_buckets: bool
def __init__(self, configuration):
self.block_public_acls = configuration["BlockPublicAcls"]
self.ignore_public_acls = configuration["IgnorePublicAcls"]
self.block_public_policy = configuration["BlockPublicPolicy"]
self.restrict_public_buckets = configuration["RestrictPublicBuckets"]
@dataclass
@@ -127,14 +246,32 @@ class Bucket:
name: str
versioning: bool
logging: bool
public_access_block: PublicAccessBlock
acl_grantees: list[ACL_Grantee]
policy: dict
encryption: str
region: str
acl_grantee: list[ACL_Grantee]
logging_target_bucket: str
ownership: str
def __init__(self, name, region):
self.name = name
self.versioning = False
self.logging = False
# Set all block as False
self.public_access_block = PublicAccessBlock(
{
"BlockPublicAcls": False,
"IgnorePublicAcls": False,
"BlockPublicPolicy": False,
"RestrictPublicBuckets": False,
}
)
self.acl_grantees = []
self.policy = {}
self.encryption = None
self.region = region
self.acl_grantee = None
self.logging_target_bucket = None
self.ownership = None

View File

@@ -1,10 +1,13 @@
import json
from boto3 import client, session
from moto import mock_s3
from moto import mock_s3, mock_s3control
from providers.aws.lib.audit_info.models import AWS_Audit_Info
from providers.aws.services.s3.s3_service import S3
from providers.aws.services.s3.s3_service import S3, S3Control
AWS_ACCOUNT_NUMBER = 123456789012
AWS_ACCOUNT_NUMBER = "123456789012"
AWS_REGION = "us-east-1"
class Test_S3_Service:
@@ -15,6 +18,7 @@ class Test_S3_Service:
audit_session=session.Session(
profile_name=None,
botocore_session=None,
region_name=AWS_REGION,
),
audited_account=AWS_ACCOUNT_NUMBER,
audited_user_id=None,
@@ -53,14 +57,6 @@ class Test_S3_Service:
s3 = S3(audit_info)
assert s3.session.__class__.__name__ == "Session"
# Test S3 Regional Clients
# @mock_s3
# def test_regional_clients(self):
# # S3 client for this test class
# audit_info = self.set_mocked_audit_info()
# s3 = S3(audit_info)
# print(s3.regional_clients.keys())
# Test S3 Session
@mock_s3
def test_audited_account(self):
@@ -105,7 +101,7 @@ class Test_S3_Service:
assert s3.buckets[0].name == bucket_name
assert s3.buckets[0].versioning == True
# Test S3 Get Bucket Versioning
# Test S3 Get Bucket ACL
@mock_s3
def test__get_bucket_acl__(self):
s3_client = client("s3")
@@ -132,55 +128,213 @@ class Test_S3_Service:
s3 = S3(audit_info)
assert len(s3.buckets) == 1
assert s3.buckets[0].name == bucket_name
assert s3.buckets[0].acl_grantee[0].display_name == "test"
assert s3.buckets[0].acl_grantee[0].ID == "test_ID"
assert s3.buckets[0].acl_grantee[0].grantee_type == "Group"
assert s3.buckets[0].acl_grantees[0].display_name == "test"
assert s3.buckets[0].acl_grantees[0].ID == "test_ID"
assert s3.buckets[0].acl_grantees[0].type == "Group"
assert (
s3.buckets[0].acl_grantee[0].URI
s3.buckets[0].acl_grantees[0].URI
== "http://acs.amazonaws.com/groups/global/AllUsers"
)
# Test S3 Get Bucket Versioning
# @mock_s3
# def test__get_bucket_logging__(self):
# # Generate S3 Client
# s3_client = client("s3")
# # Create S3 Bucket
# bucket_name = "test-bucket"
# s3_client.create_bucket(
# Bucket=bucket_name,
# ACL='private'
# )
# # Set Bucket Logging
# s3_client.put_bucket_logging(
# Bucket=bucket_name,
# BucketLoggingStatus={
# 'LoggingEnabled': {
# 'TargetBucket': bucket_name,
# 'TargetGrants': [
# {
# 'Grantee': {
# 'Type': 'Group',
# 'URI': 'http://acs.amazonaws.com/groups/s3/LogDelivery'
# },
# 'Permission': 'READ_ACP'
# },
# {
# 'Grantee': {
# 'Type': 'Group',
# 'URI': 'http://acs.amazonaws.com/groups/s3/LogDelivery'
# },
# 'Permission': 'WRITE'
# }
# ],
# 'TargetPrefix': 'test-prefix'
# }
# }
# )
# # S3 client for this test class
# audit_info = self.set_mocked_audit_info()
# s3 = S3(audit_info)
# print(s3.buckets)
# assert len(s3.buckets) == 1
# assert s3.buckets[0].name == bucket_name
# assert s3.buckets[0].versioning == True
# Test S3 Get Bucket Logging
@mock_s3
def test__get_bucket_logging__(self):
# Generate S3 Client
s3_client = client("s3")
# Create S3 Bucket
bucket_name = "test-bucket"
s3_client.create_bucket(
Bucket=bucket_name,
)
bucket_owner = s3_client.get_bucket_acl(Bucket=bucket_name)["Owner"]
s3_client.put_bucket_acl(
Bucket=bucket_name,
AccessControlPolicy={
"Grants": [
{
"Grantee": {
"URI": "http://acs.amazonaws.com/groups/s3/LogDelivery",
"Type": "Group",
},
"Permission": "WRITE",
},
{
"Grantee": {
"URI": "http://acs.amazonaws.com/groups/s3/LogDelivery",
"Type": "Group",
},
"Permission": "READ_ACP",
},
{
"Grantee": {"Type": "CanonicalUser", "ID": bucket_owner["ID"]},
"Permission": "FULL_CONTROL",
},
],
"Owner": bucket_owner,
},
)
s3_client.put_bucket_logging(
Bucket=bucket_name,
BucketLoggingStatus={
"LoggingEnabled": {
"TargetBucket": bucket_name,
"TargetPrefix": "{}/".format(bucket_name),
"TargetGrants": [
{
"Grantee": {
"ID": "SOMEIDSTRINGHERE9238748923734823917498237489237409123840983274",
"Type": "CanonicalUser",
},
"Permission": "READ",
},
{
"Grantee": {
"ID": "SOMEIDSTRINGHERE9238748923734823917498237489237409123840983274",
"Type": "CanonicalUser",
},
"Permission": "WRITE",
},
],
}
},
)
# S3 client for this test class
audit_info = self.set_mocked_audit_info()
s3 = S3(audit_info)
assert len(s3.buckets) == 1
assert s3.buckets[0].name == bucket_name
assert s3.buckets[0].logging == True
# Test S3 Get Bucket Policy
@mock_s3
def test__get_bucket_policy__(self):
s3_client = client("s3")
bucket_name = "test-bucket"
s3_client.create_bucket(Bucket=bucket_name)
ssl_policy = '{"Version": "2012-10-17","Id": "PutObjPolicy","Statement": [{"Sid": "s3-bucket-ssl-requests-only","Effect": "Deny","Principal": "*","Action": "s3:GetObject","Resource": "arn:aws:s3:::bucket_test_us/*","Condition": {"Bool": {"aws:SecureTransport": "false"}}}]}'
s3_client.put_bucket_policy(
Bucket=bucket_name,
Policy=ssl_policy,
)
audit_info = self.set_mocked_audit_info()
s3 = S3(audit_info)
assert len(s3.buckets) == 1
assert s3.buckets[0].name == bucket_name
assert s3.buckets[0].policy == json.loads(ssl_policy)
# Test S3 Get Bucket Encryption
@mock_s3
def test__get_bucket_encryption__(self):
# Generate S3 Client
s3_client = client("s3")
# Create S3 Bucket
bucket_name = "test-bucket"
s3_client.create_bucket(Bucket=bucket_name)
sse_config = {
"Rules": [
{
"ApplyServerSideEncryptionByDefault": {
"SSEAlgorithm": "aws:kms",
"KMSMasterKeyID": "12345678",
}
}
]
}
s3_client.put_bucket_encryption(
Bucket=bucket_name, ServerSideEncryptionConfiguration=sse_config
)
# S3 client for this test class
audit_info = self.set_mocked_audit_info()
s3 = S3(audit_info)
assert len(s3.buckets) == 1
assert s3.buckets[0].name == bucket_name
assert s3.buckets[0].encryption == "aws:kms"
# Test S3 Get Bucket Ownership Controls
@mock_s3
def test__get_bucket_ownership_controls__(self):
# Generate S3 Client
s3_client = client("s3")
# Create S3 Bucket
bucket_name = "test-bucket"
s3_client.create_bucket(
Bucket=bucket_name, ObjectOwnership="BucketOwnerEnforced"
)
# S3 client for this test class
audit_info = self.set_mocked_audit_info()
s3 = S3(audit_info)
assert len(s3.buckets) == 1
assert s3.buckets[0].name == bucket_name
assert s3.buckets[0].ownership == "BucketOwnerEnforced"
# Test S3 Get Bucket Ownership Controls
@mock_s3
def test__get_bucket_ownership_controls__(self):
# Generate S3 Client
s3_client = client("s3")
# Create S3 Bucket
bucket_name = "test-bucket"
s3_client.create_bucket(
Bucket=bucket_name, ObjectOwnership="BucketOwnerEnforced"
)
# S3 client for this test class
audit_info = self.set_mocked_audit_info()
s3 = S3(audit_info)
assert len(s3.buckets) == 1
assert s3.buckets[0].name == bucket_name
assert s3.buckets[0].ownership == "BucketOwnerEnforced"
# Test S3 Get Public Access Block
@mock_s3
def test__get_public_access_block__(self):
# Generate S3 Client
s3_client = client("s3")
# Create S3 Bucket
bucket_name = "test-bucket"
s3_client.create_bucket(
Bucket=bucket_name, ObjectOwnership="BucketOwnerEnforced"
)
s3_client.put_public_access_block(
Bucket=bucket_name,
PublicAccessBlockConfiguration={
"BlockPublicAcls": True,
"IgnorePublicAcls": True,
"BlockPublicPolicy": True,
"RestrictPublicBuckets": True,
},
)
# S3 client for this test class
audit_info = self.set_mocked_audit_info()
s3 = S3(audit_info)
assert len(s3.buckets) == 1
assert s3.buckets[0].name == bucket_name
assert s3.buckets[0].public_access_block.block_public_acls
assert s3.buckets[0].public_access_block.ignore_public_acls
assert s3.buckets[0].public_access_block.block_public_policy
assert s3.buckets[0].public_access_block.restrict_public_buckets
# Test S3 Control Account Get Public Access Block
@mock_s3control
def test__get_public_access_block__(self):
# Generate S3Control Client
s3control_client = client("s3control", region_name=AWS_REGION)
s3control_client.put_public_access_block(
AccountId=AWS_ACCOUNT_NUMBER,
PublicAccessBlockConfiguration={
"BlockPublicAcls": True,
"IgnorePublicAcls": True,
"BlockPublicPolicy": True,
"RestrictPublicBuckets": True,
},
)
# S3 client for this test class
audit_info = self.set_mocked_audit_info()
s3control = S3Control(audit_info)
assert s3control.account_public_access_block.block_public_acls
assert s3control.account_public_access_block.ignore_public_acls
assert s3control.account_public_access_block.block_public_policy
assert s3control.account_public_access_block.restrict_public_buckets

View File

@@ -0,0 +1,4 @@
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.s3.s3_service import S3Control
s3control_client = S3Control(current_audit_info)