feat(CIS checks): Complete CIS checks (#1461)

Co-authored-by: sergargar <sergio@verica.io>
Co-authored-by: Nacho Rivera <59198746+n4ch04@users.noreply.github.com>
Co-authored-by: Pepe Fagoaga <pepe@verica.io>
Sergio Garcia
2022-11-14 17:50:26 +01:00
committed by GitHub
parent 6497f7bfe8
commit 8c8763a620
57 changed files with 1817 additions and 222 deletions

View File

@@ -1,7 +1,7 @@
{
"Provider": "aws",
"CheckID": "cloudtrail_s3_dataevents_enabled",
"CheckTitle": "Check if S3 buckets have Object-level logging enabled in CloudTrail.",
"CheckID": "cloudtrail_s3_dataevents_read_enabled",
"CheckTitle": "Check if S3 buckets have Object-level logging for read events is enabled in CloudTrail.",
"CheckType": ["Logging and Monitoring"],
"ServiceName": "s3",
"SubServiceName": "",
@@ -13,7 +13,7 @@
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "aws cloudtrail put-event-selectors --trail-name <YOUR_TRAIL_NAME_HERE> --event-selectors '[{ 'ReadWriteType': 'All', 'IncludeManagementEvents':true, 'DataResources': [{ 'Type': 'AWS::S3::Object', 'Values': ['arn:aws:s3'] }] }]'",
"CLI": "aws cloudtrail put-event-selectors --trail-name <YOUR_TRAIL_NAME_HERE> --event-selectors '[{ 'ReadWriteType': 'ReadOnly', 'IncludeManagementEvents':true, 'DataResources': [{ 'Type': 'AWS::S3::Object', 'Values': ['arn:aws:s3'] }] }]'",
"NativeIaC": "",
"Other": "",
"Terraform": ""

View File

@@ -2,7 +2,7 @@ from lib.check.models import Check, Check_Report
from providers.aws.services.cloudtrail.cloudtrail_client import cloudtrail_client
class cloudtrail_s3_dataevents_enabled(Check):
class cloudtrail_s3_dataevents_read_enabled(Check):
def execute(self):
findings = []
report = Check_Report(self.metadata)
@@ -13,8 +13,11 @@ class cloudtrail_s3_dataevents_enabled(Check):
report.status_extended = "No CloudTrail trails have a data event to record all S3 object-level API operations."
for trail in cloudtrail_client.trails:
for data_event in trail.data_events:
# Check if trail has a data event for all S3 Buckets for write/read
if data_event["ReadWriteType"] == "All":
# Check if trail has a data event for all S3 Buckets for read
if (
data_event["ReadWriteType"] == "ReadOnly"
or data_event["ReadWriteType"] == "All"
):
for resource in data_event["DataResources"]:
if "AWS::S3::Object" == resource["Type"] and (
"arn:aws:s3" in resource["Values"]

View File

@@ -5,7 +5,7 @@ from boto3 import client
from moto import mock_cloudtrail, mock_s3
class Test_cloudtrail_s3_dataevents_enabled:
class Test_cloudtrail_s3_dataevents_read_enabled:
@mock_cloudtrail
@mock_s3
def test_trail_without_data_events(self):
@@ -24,15 +24,15 @@ class Test_cloudtrail_s3_dataevents_enabled:
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_enabled.cloudtrail_s3_dataevents_enabled.cloudtrail_client",
"providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_enabled.cloudtrail_s3_dataevents_enabled import (
cloudtrail_s3_dataevents_enabled,
from providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled import (
cloudtrail_s3_dataevents_read_enabled,
)
check = cloudtrail_s3_dataevents_enabled()
check = cloudtrail_s3_dataevents_read_enabled()
result = check.execute()
assert len(result) == 1
@@ -73,15 +73,15 @@ class Test_cloudtrail_s3_dataevents_enabled:
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_enabled.cloudtrail_s3_dataevents_enabled.cloudtrail_client",
"providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_enabled.cloudtrail_s3_dataevents_enabled import (
cloudtrail_s3_dataevents_enabled,
from providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled import (
cloudtrail_s3_dataevents_read_enabled,
)
check = cloudtrail_s3_dataevents_enabled()
check = cloudtrail_s3_dataevents_read_enabled()
result = check.execute()
assert len(result) == 1
@@ -122,15 +122,15 @@ class Test_cloudtrail_s3_dataevents_enabled:
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_enabled.cloudtrail_s3_dataevents_enabled.cloudtrail_client",
"providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_enabled.cloudtrail_s3_dataevents_enabled import (
cloudtrail_s3_dataevents_enabled,
from providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled import (
cloudtrail_s3_dataevents_read_enabled,
)
check = cloudtrail_s3_dataevents_enabled()
check = cloudtrail_s3_dataevents_read_enabled()
result = check.execute()
assert len(result) == 1

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "cloudtrail_s3_dataevents_write_enabled",
"CheckTitle": "Check if S3 buckets have Object-level logging for write events is enabled in CloudTrail.",
"CheckType": ["Logging and Monitoring"],
"ServiceName": "s3",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"Severity": "low",
"ResourceType": "AwsS3Bucket",
"Description": "Ensure that all your AWS CloudTrail trails are configured to log Data events in order to record S3 object-level API operations, such as GetObject, DeleteObject and PutObject, for individual S3 buckets or for all current and future S3 buckets provisioned in your AWS account.",
"Risk": "If logs are not enabled, monitoring of service use and threat analysis is not possible.",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "aws cloudtrail put-event-selectors --trail-name <YOUR_TRAIL_NAME_HERE> --event-selectors '[{ 'ReadWriteType': 'WriteOnly', 'IncludeManagementEvents':true, 'DataResources': [{ 'Type': 'AWS::S3::Object', 'Values': ['arn:aws:s3'] }] }]'",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable logs. Create an S3 lifecycle policy. Define use cases, metrics and automated responses where applicable.",
"Url": "https://docs.aws.amazon.com/AmazonS3/latest/userguide/enable-cloudtrail-logging-for-s3.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}
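
The CLI remediation above has a direct boto3 equivalent, which is also the call the tests below issue through moto. A minimal sketch; the trail name is a placeholder:

from boto3 import client

cloudtrail = client("cloudtrail", region_name="us-east-1")
cloudtrail.put_event_selectors(
    TrailName="<YOUR_TRAIL_NAME_HERE>",
    EventSelectors=[
        {
            "ReadWriteType": "WriteOnly",
            "IncludeManagementEvents": True,
            "DataResources": [
                {"Type": "AWS::S3::Object", "Values": ["arn:aws:s3"]}
            ],
        }
    ],
)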

View File

@@ -0,0 +1,33 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.cloudtrail.cloudtrail_client import cloudtrail_client
class cloudtrail_s3_dataevents_write_enabled(Check):
def execute(self):
findings = []
report = Check_Report(self.metadata)
report.region = cloudtrail_client.region
report.resource_id = "No trails"
report.resource_arn = "No trails"
report.status = "FAIL"
report.status_extended = "No CloudTrail trails have a data event to record all S3 object-level API operations."
for trail in cloudtrail_client.trails:
for data_event in trail.data_events:
# Check if trail has a data event for all S3 Buckets for write
if (
data_event["ReadWriteType"] == "All"
or data_event["ReadWriteType"] == "WriteOnly"
):
for resource in data_event["DataResources"]:
if "AWS::S3::Object" == resource["Type"] and (
"arn:aws:s3" in resource["Values"]
or "arn:aws:s3:::*/*" in resource["Values"]
):
report.region = trail.region
report.resource_id = trail.name
report.resource_arn = trail.arn
report.status = "PASS"
report.status_extended = f"Trail {trail.name} have a data event to record all S3 object-level API operations."
findings.append(report)
return findings

View File

@@ -0,0 +1,143 @@
from re import search
from unittest import mock
from boto3 import client
from moto import mock_cloudtrail, mock_s3
class Test_cloudtrail_s3_dataevents_write_enabled:
@mock_cloudtrail
@mock_s3
def test_trail_without_data_events(self):
cloudtrail_client_us_east_1 = client("cloudtrail", region_name="us-east-1")
s3_client_us_east_1 = client("s3", region_name="us-east-1")
trail_name_us = "trail_test_us"
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
cloudtrail_client_us_east_1.create_trail(
Name=trail_name_us, S3BucketName=bucket_name_us, IsMultiRegionTrail=False
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.cloudtrail.cloudtrail_service import Cloudtrail
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled import (
cloudtrail_s3_dataevents_write_enabled,
)
check = cloudtrail_s3_dataevents_write_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"No CloudTrail trails have a data event to record all S3 object-level API operations.",
result[0].status_extended,
)
assert result[0].resource_id == "No trails"
assert result[0].resource_arn == "No trails"
@mock_cloudtrail
@mock_s3
def test_trail_without_s3_data_events(self):
cloudtrail_client_us_east_1 = client("cloudtrail", region_name="us-east-1")
s3_client_us_east_1 = client("s3", region_name="us-east-1")
trail_name_us = "trail_test_us"
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
cloudtrail_client_us_east_1.create_trail(
Name=trail_name_us, S3BucketName=bucket_name_us, IsMultiRegionTrail=False
)
_ = cloudtrail_client_us_east_1.put_event_selectors(
TrailName=trail_name_us,
EventSelectors=[
{
"ReadWriteType": "All",
"IncludeManagementEvents": True,
"DataResources": [
{"Type": "AWS::Lambda::Function", "Values": ["arn:aws:lambda"]}
],
}
],
)["EventSelectors"]
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.cloudtrail.cloudtrail_service import Cloudtrail
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled import (
cloudtrail_s3_dataevents_write_enabled,
)
check = cloudtrail_s3_dataevents_write_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"No CloudTrail trails have a data event to record all S3 object-level API operations.",
result[0].status_extended,
)
assert result[0].resource_id == "No trails"
assert result[0].resource_arn == "No trails"
@mock_cloudtrail
@mock_s3
def test_trail_with_s3_data_events(self):
cloudtrail_client_us_east_1 = client("cloudtrail", region_name="us-east-1")
s3_client_us_east_1 = client("s3", region_name="us-east-1")
trail_name_us = "trail_test_us"
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
trail_us = cloudtrail_client_us_east_1.create_trail(
Name=trail_name_us, S3BucketName=bucket_name_us, IsMultiRegionTrail=False
)
_ = cloudtrail_client_us_east_1.put_event_selectors(
TrailName=trail_name_us,
EventSelectors=[
{
"ReadWriteType": "All",
"IncludeManagementEvents": True,
"DataResources": [
{"Type": "AWS::S3::Object", "Values": ["arn:aws:s3:::*/*"]}
],
}
],
)["EventSelectors"]
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.cloudtrail.cloudtrail_service import Cloudtrail
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled import (
cloudtrail_s3_dataevents_write_enabled,
)
check = cloudtrail_s3_dataevents_write_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"have a data event to record all S3 object-level API operations.",
result[0].status_extended,
)
assert result[0].resource_id == trail_name_us
assert result[0].resource_arn == trail_us["TrailARN"]

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "cloudwatch_log_metric_filter_aws_organizations_changes",
"CheckTitle": "Ensure a log metric filter and alarm exist for AWS Organizations changes.",
"CheckType": ["Software and Configuration Checks/Industry and Regulatory Standards/CIS AWS Foundations Benchmark"],
"ServiceName": "cloudwatch",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:cloudwatch:region:account-id:certificate/resource-id",
"Severity": "medium",
"ResourceType": "AwsCloudTrailTrail",
"Description": "Ensure a log metric filter and alarm exist for AWS Organizations changes.",
"Risk": "Monitoring unauthorized API calls will help reveal application errors and may reduce time to detect malicious activity.",
"RelatedUrl": "https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudwatch-alarms-for-cloudtrail.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/aws/CloudWatchLogs/organizations-changes-alarm.html",
"Terraform": ""
},
"Recommendation": {
"Text": "It is recommended that a metric filter and alarm be established for unauthorized requests.",
"Url": "https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudwatch-alarms-for-cloudtrail.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "Logging and Monitoring",
"Compliance": []
}

View File

@@ -0,0 +1,41 @@
import re
from lib.check.models import Check, Check_Report
from providers.aws.services.cloudtrail.cloudtrail_client import cloudtrail_client
from providers.aws.services.cloudwatch.cloudwatch_client import cloudwatch_client
from providers.aws.services.cloudwatch.logs_client import logs_client
class cloudwatch_log_metric_filter_aws_organizations_changes(Check):
def execute(self):
pattern = r"\$\.eventSource\s*=\s*organizations\.amazonaws\.com.+\$\.eventName\s*=\s*AcceptHandshake.+\$\.eventName\s*=\s*AttachPolicy.+\$\.eventName\s*=\s*CancelHandshake.+\$\.eventName\s*=\s*CreateAccount.+\$\.eventName\s*=\s*CreateOrganization.+\$\.eventName\s*=\s*CreateOrganizationalUnit.+\$\.eventName\s*=\s*CreatePolicy.+\$\.eventName\s*=\s*DeclineHandshake.+\$\.eventName\s*=\s*DeleteOrganization.+\$\.eventName\s*=\s*DeleteOrganizationalUnit.+\$\.eventName\s*=\s*DeletePolicy.+\$\.eventName\s*=\s*EnableAllFeatures.+\$\.eventName\s*=\s*EnablePolicyType.+\$\.eventName\s*=\s*InviteAccountToOrganization.+\$\.eventName\s*=\s*LeaveOrganization.+\$\.eventName\s*=\s*DetachPolicy.+\$\.eventName\s*=\s*DisablePolicyType.+\$\.eventName\s*=\s*MoveAccount.+\$\.eventName\s*=\s*RemoveAccountFromOrganization.+\$\.eventName\s*=\s*UpdateOrganizationalUnit.+\$\.eventName\s*=\s*UpdatePolicy"
findings = []
report = Check_Report(self.metadata)
report.status = "FAIL"
report.status_extended = (
"No CloudWatch log groups found with metric filters or alarms associated."
)
report.region = "us-east-1"
report.resource_id = ""
# 1. Iterate for CloudWatch Log Group in CloudTrail trails
log_groups = []
for trail in cloudtrail_client.trails:
if trail.log_group_arn:
log_groups.append(trail.log_group_arn.split(":")[6])
# 2. Describe metric filters for previous log groups
for metric_filter in logs_client.metric_filters:
if metric_filter.log_group in log_groups:
if re.search(pattern, metric_filter.pattern):
report.resource_id = metric_filter.log_group
report.region = metric_filter.region
report.status = "FAIL"
report.status_extended = f"CloudWatch log group {metric_filter.log_group} found with metric filter {metric_filter.name} but no alarms associated."
# 3. Check if there is an alarm for the metric
for alarm in cloudwatch_client.metric_alarms:
if alarm.metric == metric_filter.metric:
report.status = "PASS"
report.status_extended = f"CloudWatch log group {metric_filter.log_group} found with metric filter {metric_filter.name} and alarms set."
break
findings.append(report)
return findings
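
The split(":")[6] above relies on the fixed shape of CloudWatch Logs ARNs, arn:partition:logs:region:account-id:log-group:NAME:*. A worked example with a hypothetical account id:

log_group_arn = "arn:aws:logs:us-east-1:123456789012:log-group:/log-group/test:*"
# Index 6 is the log group name
assert log_group_arn.split(":")[6] == "/log-group/test"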

View File

@@ -0,0 +1,282 @@
from unittest import mock
from boto3 import client
from moto import mock_cloudtrail, mock_cloudwatch, mock_logs, mock_s3
from moto.core import DEFAULT_ACCOUNT_ID
AWS_REGION = "us-east-1"
class Test_cloudwatch_log_metric_filter_aws_organizations_changes:
@mock_logs
@mock_cloudtrail
@mock_cloudwatch
def test_cloudwatch_no_log_groups(self):
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.cloudwatch.cloudwatch_service import (
CloudWatch,
Logs,
)
current_audit_info.audited_partition = "aws"
from providers.aws.services.cloudtrail.cloudtrail_client import Cloudtrail
with mock.patch(
"providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_aws_organizations_changes.cloudwatch_log_metric_filter_aws_organizations_changes.logs_client",
new=Logs(current_audit_info),
), mock.patch(
"providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_aws_organizations_changes.cloudwatch_log_metric_filter_aws_organizations_changes.cloudwatch_client",
new=CloudWatch(current_audit_info),
), mock.patch(
"providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_aws_organizations_changes.cloudwatch_log_metric_filter_aws_organizations_changes.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_aws_organizations_changes.cloudwatch_log_metric_filter_aws_organizations_changes import (
cloudwatch_log_metric_filter_aws_organizations_changes,
)
check = cloudwatch_log_metric_filter_aws_organizations_changes()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "No CloudWatch log groups found with metric filters or alarms associated."
)
assert result[0].resource_id == ""
@mock_logs
@mock_cloudtrail
@mock_cloudwatch
@mock_s3
def test_cloudwatch_trail_no_log_group(self):
cloudtrail_client = client("cloudtrail", region_name=AWS_REGION)
s3_client = client("s3", region_name=AWS_REGION)
s3_client.create_bucket(Bucket="test")
cloudtrail_client.create_trail(Name="test_trail", S3BucketName="test")
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.cloudwatch.cloudwatch_service import (
CloudWatch,
Logs,
)
current_audit_info.audited_partition = "aws"
from providers.aws.services.cloudtrail.cloudtrail_client import Cloudtrail
with mock.patch(
"providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_aws_organizations_changes.cloudwatch_log_metric_filter_aws_organizations_changes.logs_client",
new=Logs(current_audit_info),
), mock.patch(
"providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_aws_organizations_changes.cloudwatch_log_metric_filter_aws_organizations_changes.cloudwatch_client",
new=CloudWatch(current_audit_info),
), mock.patch(
"providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_aws_organizations_changes.cloudwatch_log_metric_filter_aws_organizations_changes.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_aws_organizations_changes.cloudwatch_log_metric_filter_aws_organizations_changes import (
cloudwatch_log_metric_filter_aws_organizations_changes,
)
check = cloudwatch_log_metric_filter_aws_organizations_changes()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "No CloudWatch log groups found with metric filters or alarms associated."
)
assert result[0].resource_id == ""
@mock_logs
@mock_cloudtrail
@mock_cloudwatch
@mock_s3
def test_cloudwatch_trail_with_log_group(self):
cloudtrail_client = client("cloudtrail", region_name=AWS_REGION)
logs_client = client("logs", region_name=AWS_REGION)
s3_client = client("s3", region_name=AWS_REGION)
s3_client.create_bucket(Bucket="test")
logs_client.create_log_group(logGroupName="/log-group/test")
cloudtrail_client.create_trail(
Name="test_trail",
S3BucketName="test",
CloudWatchLogsLogGroupArn=f"arn:aws:logs:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:log-group:/log-group/test:*",
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.cloudwatch.cloudwatch_service import (
CloudWatch,
Logs,
)
current_audit_info.audited_partition = "aws"
from providers.aws.services.cloudtrail.cloudtrail_client import Cloudtrail
with mock.patch(
"providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_aws_organizations_changes.cloudwatch_log_metric_filter_aws_organizations_changes.logs_client",
new=Logs(current_audit_info),
), mock.patch(
"providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_aws_organizations_changes.cloudwatch_log_metric_filter_aws_organizations_changes.cloudwatch_client",
new=CloudWatch(current_audit_info),
), mock.patch(
"providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_aws_organizations_changes.cloudwatch_log_metric_filter_aws_organizations_changes.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_aws_organizations_changes.cloudwatch_log_metric_filter_aws_organizations_changes import (
cloudwatch_log_metric_filter_aws_organizations_changes,
)
check = cloudwatch_log_metric_filter_aws_organizations_changes()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "No CloudWatch log groups found with metric filters or alarms associated."
)
assert result[0].resource_id == ""
@mock_logs
@mock_cloudtrail
@mock_cloudwatch
@mock_s3
def test_cloudwatch_trail_with_log_group_with_metric(self):
cloudtrail_client = client("cloudtrail", region_name=AWS_REGION)
logs_client = client("logs", region_name=AWS_REGION)
s3_client = client("s3", region_name=AWS_REGION)
s3_client.create_bucket(Bucket="test")
logs_client.create_log_group(logGroupName="/log-group/test")
cloudtrail_client.create_trail(
Name="test_trail",
S3BucketName="test",
CloudWatchLogsLogGroupArn=f"arn:aws:logs:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:log-group:/log-group/test:*",
)
logs_client.put_metric_filter(
logGroupName="/log-group/test",
filterName="test-filter",
filterPattern="{ ($.eventSource = organizations.amazonaws.com) && ($.eventName = AcceptHandshake) || ($.eventName = AttachPolicy) || ($.eventName = CancelHandshake) || ($.eventName = CreateAccount) || ($.eventName = CreateOrganization) || ($.eventName = CreateOrganizationalUnit) || ($.eventName = CreatePolicy) || ($.eventName = DeclineHandshake) || ($.eventName = DeleteOrganization) || ($.eventName = DeleteOrganizationalUnit) || ($.eventName = DeletePolicy) || ($.eventName = EnableAllFeatures) || ($.eventName = EnablePolicyType) || ($.eventName = InviteAccountToOrganization) || ($.eventName = LeaveOrganization) || ($.eventName = DetachPolicy) || ($.eventName = DisablePolicyType) || ($.eventName = MoveAccount) || ($.eventName = RemoveAccountFromOrganization) || ($.eventName = UpdateOrganizationalUnit) || ($.eventName = UpdatePolicy) }",
metricTransformations=[
{
"metricName": "my-metric",
"metricNamespace": "my-namespace",
"metricValue": "$.value",
}
],
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.cloudwatch.cloudwatch_service import (
CloudWatch,
Logs,
)
current_audit_info.audited_partition = "aws"
from providers.aws.services.cloudtrail.cloudtrail_client import Cloudtrail
with mock.patch(
"providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_aws_organizations_changes.cloudwatch_log_metric_filter_aws_organizations_changes.logs_client",
new=Logs(current_audit_info),
), mock.patch(
"providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_aws_organizations_changes.cloudwatch_log_metric_filter_aws_organizations_changes.cloudwatch_client",
new=CloudWatch(current_audit_info),
), mock.patch(
"providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_aws_organizations_changes.cloudwatch_log_metric_filter_aws_organizations_changes.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_aws_organizations_changes.cloudwatch_log_metric_filter_aws_organizations_changes import (
cloudwatch_log_metric_filter_aws_organizations_changes,
)
check = cloudwatch_log_metric_filter_aws_organizations_changes()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "CloudWatch log group /log-group/test found with metric filter test-filter but no alarms associated."
)
assert result[0].resource_id == "/log-group/test"
@mock_logs
@mock_cloudtrail
@mock_cloudwatch
@mock_s3
def test_cloudwatch_trail_with_log_group_with_metric_and_alarm(self):
cloudtrail_client = client("cloudtrail", region_name=AWS_REGION)
cloudwatch_client = client("cloudwatch", region_name=AWS_REGION)
logs_client = client("logs", region_name=AWS_REGION)
s3_client = client("s3", region_name=AWS_REGION)
s3_client.create_bucket(Bucket="test")
logs_client.create_log_group(logGroupName="/log-group/test")
cloudtrail_client.create_trail(
Name="test_trail",
S3BucketName="test",
CloudWatchLogsLogGroupArn=f"arn:aws:logs:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:log-group:/log-group/test:*",
)
logs_client.put_metric_filter(
logGroupName="/log-group/test",
filterName="test-filter",
filterPattern="{ ($.eventSource = organizations.amazonaws.com) && ($.eventName = AcceptHandshake) || ($.eventName = AttachPolicy) || ($.eventName = CancelHandshake) || ($.eventName = CreateAccount) || ($.eventName = CreateOrganization) || ($.eventName = CreateOrganizationalUnit) || ($.eventName = CreatePolicy) || ($.eventName = DeclineHandshake) || ($.eventName = DeleteOrganization) || ($.eventName = DeleteOrganizationalUnit) || ($.eventName = DeletePolicy) || ($.eventName = EnableAllFeatures) || ($.eventName = EnablePolicyType) || ($.eventName = InviteAccountToOrganization) || ($.eventName = LeaveOrganization) || ($.eventName = DetachPolicy) || ($.eventName = DisablePolicyType) || ($.eventName = MoveAccount) || ($.eventName = RemoveAccountFromOrganization) || ($.eventName = UpdateOrganizationalUnit) || ($.eventName = UpdatePolicy) }",
metricTransformations=[
{
"metricName": "my-metric",
"metricNamespace": "my-namespace",
"metricValue": "$.value",
}
],
)
cloudwatch_client.put_metric_alarm(
AlarmName="test-alarm",
MetricName="my-metric",
Namespace="my-namespace",
Period=10,
EvaluationPeriods=5,
Statistic="Average",
Threshold=2,
ComparisonOperator="GreaterThanThreshold",
ActionsEnabled=True,
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.cloudwatch.cloudwatch_service import (
CloudWatch,
Logs,
)
current_audit_info.audited_partition = "aws"
from providers.aws.services.cloudtrail.cloudtrail_client import Cloudtrail
with mock.patch(
"providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_aws_organizations_changes.cloudwatch_log_metric_filter_aws_organizations_changes.logs_client",
new=Logs(current_audit_info),
), mock.patch(
"providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_aws_organizations_changes.cloudwatch_log_metric_filter_aws_organizations_changes.cloudwatch_client",
new=CloudWatch(current_audit_info),
), mock.patch(
"providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_aws_organizations_changes.cloudwatch_log_metric_filter_aws_organizations_changes.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_aws_organizations_changes.cloudwatch_log_metric_filter_aws_organizations_changes import (
cloudwatch_log_metric_filter_aws_organizations_changes,
)
check = cloudwatch_log_metric_filter_aws_organizations_changes()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -1,42 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2019) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_extra7138="7.138"
CHECK_TITLE_extra7138="[extra7138] Ensure no Network ACLs allow ingress from 0.0.0.0/0 to any port"
CHECK_SCORED_extra7138="NOT SCORED"
CHECK_CIS_LEVEL_extra7138="LEVEL2"
CHECK_SEVERITY_extra7138="High"
CHECK_ASFF_TYPE_extra7138="Software and Configuration Checks/Industry and Regulatory Standards/CIS AWS Foundations Benchmark"
CHECK_ASFF_RESOURCE_TYPE_extra7138="AwsEc2NetworkAcl"
CHECK_ALTERNATE_check7138="extra7138"
CHECK_SERVICENAME_extra7138="ec2"
CHECK_RISK_extra7138='Even having a perimeter firewall; having network acls open allows any user or malware with vpc access to scan for well known and sensitive ports and gain access to instance.'
CHECK_REMEDIATION_extra7138='Apply Zero Trust approach. Implement a process to scan and remediate unrestricted or overly permissive network acls. Recommended best practices is to narrow the definition for the minimum ports required.'
CHECK_DOC_extra7138='https://docs.aws.amazon.com/vpc/latest/userguide/vpc-network-acls.html'
CHECK_CAF_EPIC_extra7138='Infrastructure Security'
extra7138(){
for regx in $REGIONS; do
NACL_LIST=$($AWSCLI ec2 describe-network-acls --query 'NetworkAcls[?Entries[?((!PortRange) && (CidrBlock == `0.0.0.0/0`) && (Egress == `false`) && (RuleAction == `allow`))]].{NetworkAclId:NetworkAclId}' $PROFILE_OPT --region $regx --output text 2>&1)
if [[ $(echo "$NACL_LIST" | grep -E 'AccessDenied|UnauthorizedOperation|AuthorizationError') ]]; then
textInfo "$regx: Access Denied trying to describe network acls" "$regx"
continue
fi
if [[ $NACL_LIST ]];then
for NACL in $NACL_LIST;do
textInfo "$regx: Found Network ACL: $NACL open to 0.0.0.0/0 for any port" "$regx" "$NACL"
done
else
textPass "$regx: No Network ACL found with any port open to 0.0.0.0/0" "$regx" "$NACL"
fi
done
}

View File

@@ -1,48 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2018) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_extra729="7.29"
CHECK_TITLE_extra729="[extra729] Ensure there are no EBS Volumes unencrypted"
CHECK_SCORED_extra729="NOT_SCORED"
CHECK_CIS_LEVEL_extra729="EXTRA"
CHECK_SEVERITY_extra729="Medium"
CHECK_ASFF_RESOURCE_TYPE_extra729="AwsEc2Volume"
CHECK_ALTERNATE_check729="extra729"
CHECK_ASFF_COMPLIANCE_TYPE_extra729="ens-mp.info.3.aws.ebs.1"
CHECK_SERVICENAME_extra729="ec2"
CHECK_RISK_extra729='Data encryption at rest prevents data visibility in the event of its unauthorized access or theft.'
CHECK_REMEDIATION_extra729='Encrypt all EBS volumes and Enable Encryption by default You can configure your AWS account to enforce the encryption of the new EBS volumes and snapshot copies that you create. For example; Amazon EBS encrypts the EBS volumes created when you launch an instance and the snapshots that you copy from an unencrypted snapshot.'
CHECK_DOC_extra729='https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/EBSEncryption.html'
CHECK_CAF_EPIC_extra729='Data Protection'
extra729(){
# "Ensure there are no EBS Volumes unencrypted "
for regx in $REGIONS; do
LIST_OF_EBS_NON_ENC_VOLUMES=$($AWSCLI ec2 describe-volumes $PROFILE_OPT --region $regx --query 'Volumes[?Encrypted==`false`].VolumeId' --output text 2>&1)
if [[ $(echo "$LIST_OF_EBS_NON_ENC_VOLUMES" | grep -E 'AccessDenied|UnauthorizedOperation') ]]; then
textInfo "$regx: Access Denied trying to describe volumes" "$regx"
continue
fi
if [[ $LIST_OF_EBS_NON_ENC_VOLUMES ]];then
for volume in $LIST_OF_EBS_NON_ENC_VOLUMES; do
textFail "$regx: $volume is not encrypted!" "$regx" "$volume"
done
fi
LIST_OF_EBS_ENC_VOLUMES=$($AWSCLI ec2 describe-volumes $PROFILE_OPT --region $regx --query 'Volumes[?Encrypted==`true`].VolumeId' --output text)
if [[ $LIST_OF_EBS_ENC_VOLUMES ]];then
for volume in $LIST_OF_EBS_ENC_VOLUMES; do
textPass "$regx: $volume is encrypted" "$regx" "$volume"
done
fi
done
}

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "ec2_ebs_volume_encryption",
"CheckTitle": "Ensure there are no EBS Volumes unencrypted.",
"CheckType": ["Data Protection"],
"ServiceName": "ec2",
"SubServiceName": "volume",
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"Severity": "medium",
"ResourceType": "AwsEc2Volume",
"Description": "Ensure there are no EBS Volumes unencrypted.",
"Risk": "Data encryption at rest prevents data visibility in the event of its unauthorized access or theft.",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Encrypt all EBS volumes and Enable Encryption by default You can configure your AWS account to enforce the encryption of the new EBS volumes and snapshot copies that you create. For example; Amazon EBS encrypts the EBS volumes created when you launch an instance and the snapshots that you copy from an unencrypted snapshot.",
"Url": "https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/EBSEncryption.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}
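
As the recommendation says, encryption of new volumes can be enforced per region at the account level. A minimal boto3 sketch, assuming default credentials; enable_ebs_encryption_by_default only affects volumes created after the call, so existing unencrypted volumes must still be migrated separately.

from boto3 import client

ec2 = client("ec2", region_name="us-east-1")
# Enforce encryption for every EBS volume created in this region from now on
ec2.enable_ebs_encryption_by_default()
assert ec2.get_ebs_encryption_by_default()["EbsEncryptionByDefault"]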

View File

@@ -0,0 +1,20 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.ec2.ec2_client import ec2_client
class ec2_ebs_volume_encryption(Check):
def execute(self):
findings = []
for volume in ec2_client.volumes:
report = Check_Report(self.metadata)
report.region = volume.region
report.resource_id = volume.id
if volume.encrypted:
report.status = "PASS"
report.status_extended = f"EBS volume {volume.id} is encrypted."
else:
report.status = "FAIL"
report.status_extended = f"EBS volume {volume.id} is unencrypted."
findings.append(report)
return findings

View File

@@ -0,0 +1,92 @@
from unittest import mock
from boto3 import resource
from moto import mock_ec2
AWS_REGION = "us-east-1"
class Test_ec2_ebs_volume_encryption:
@mock_ec2
def test_ec2_no_volumes(self):
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.ec2.ec2_service import EC2
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.ec2.ec2_ebs_volume_encryption.ec2_ebs_volume_encryption.ec2_client",
new=EC2(current_audit_info),
):
# Test Check
from providers.aws.services.ec2.ec2_ebs_volume_encryption.ec2_ebs_volume_encryption import (
ec2_ebs_volume_encryption,
)
check = ec2_ebs_volume_encryption()
result = check.execute()
assert len(result) == 0
@mock_ec2
def test_ec2_unencrypted_volume(self):
# Create EC2 Mocked Resources
ec2 = resource("ec2", region_name=AWS_REGION)
volume = ec2.create_volume(Size=80, AvailabilityZone=f"{AWS_REGION}a")
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.ec2.ec2_service import EC2
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.ec2.ec2_ebs_volume_encryption.ec2_ebs_volume_encryption.ec2_client",
new=EC2(current_audit_info),
):
# Test Check
from providers.aws.services.ec2.ec2_ebs_volume_encryption.ec2_ebs_volume_encryption import (
ec2_ebs_volume_encryption,
)
check = ec2_ebs_volume_encryption()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended == f"EBS Snapshot {volume.id} is unencrypted."
)
@mock_ec2
def test_ec2_encrypted_volume(self):
# Create EC2 Mocked Resources
ec2 = resource("ec2", region_name=AWS_REGION)
volume = ec2.create_volume(
Size=80, AvailabilityZone=f"{AWS_REGION}a", Encrypted=True
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.ec2.ec2_service import EC2
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.ec2.ec2_ebs_volume_encryption.ec2_ebs_volume_encryption.ec2_client",
new=EC2(current_audit_info),
):
# Test Check
from providers.aws.services.ec2.ec2_ebs_volume_encryption.ec2_ebs_volume_encryption import (
ec2_ebs_volume_encryption,
)
check = ec2_ebs_volume_encryption()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended == f"EBS Snapshot {volume.id} is encrypted."
)

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "ec2_network_acls_allow_ingress_any_port",
"CheckTitle": "Ensure no Network ACLs allow ingress from 0.0.0.0/0 to any port.",
"CheckType": ["Software and Configuration Checks", "Industry and Regulatory Standards", "CIS AWS Foundations Benchmark"],
"ServiceName": "ec2",
"SubServiceName": "networkacl",
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"Severity": "high",
"ResourceType": "AwsEc2NetworkAcl",
"Description": "Ensure no Network ACLs allow ingress from 0.0.0.0/0 to any port.",
"Risk": "Even having a perimeter firewall, having network acls open allows any user or malware with vpc access to scan for well known and sensitive ports and gain access to instance.",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Apply Zero Trust approach. Implement a process to scan and remediate unrestricted or overly permissive network acls. Recommended best practices is to narrow the definition for the minimum ports required.",
"Url": "https://docs.aws.amazon.com/vpc/latest/userguide/vpc-network-acls.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "Infrastructure Security",
"Compliance": []
}

View File

@@ -0,0 +1,26 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.ec2.ec2_client import ec2_client
from providers.aws.services.ec2.lib.network_acls import check_network_acl
class ec2_network_acls_allow_ingress_any_port(Check):
def execute(self):
findings = []
tcp_protocol = "-1"
check_port = 0
for network_acl in ec2_client.network_acls:
report = Check_Report(self.metadata)
report.region = network_acl.region
report.resource_id = network_acl.id
# If some entry allows it, that ACL is not securely configured
if not check_network_acl(network_acl.entries, any_protocol, check_port):
report.status = "PASS"
report.status_extended = f"Network ACL {network_acl.id} has not every port open to the Internet."
else:
report.status = "FAIL"
report.status_extended = (
f"Network ACL {network_acl.id} has every port open to the Internet."
)
findings.append(report)
return findings
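
check_network_acl lives in the repo's lib and is outside this diff. A hypothetical sketch of such a predicate, modelled on the condition the retired extra7138 bash check above queried (ingress allow rule from 0.0.0.0/0 with no PortRange), not the actual implementation:

def check_network_acl(entries, protocol, port):
    # Flag an ACL with an ingress allow rule open to the world for the given
    # protocol; with protocol "-1" and a missing PortRange every port is open,
    # so the port argument is not consulted in that case.
    for entry in entries:
        if (
            not entry.get("Egress")
            and entry.get("RuleAction") == "allow"
            and entry.get("CidrBlock") == "0.0.0.0/0"
            and entry.get("Protocol") == protocol
            and "PortRange" not in entry
        ):
            return True
    return False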

View File

@@ -0,0 +1,153 @@
from unittest import mock
from boto3 import client
from moto import mock_ec2
AWS_REGION = "us-east-1"
class Test_ec2_network_acls_allow_ingress_any_port:
@mock_ec2
def test_ec2_default_nacls(self):
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.ec2.ec2_service import EC2
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.ec2.ec2_network_acls_allow_ingress_any_port.ec2_network_acls_allow_ingress_any_port.ec2_client",
new=EC2(current_audit_info),
):
# Test Check
from providers.aws.services.ec2.ec2_network_acls_allow_ingress_any_port.ec2_network_acls_allow_ingress_any_port import (
ec2_network_acls_allow_ingress_any_port,
)
check = ec2_network_acls_allow_ingress_any_port()
result = check.execute()
# One default nacl per region
assert len(result) == 23
@mock_ec2
def test_ec2_non_default_compliant_nacl(self):
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.ec2.ec2_service import EC2
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.ec2.ec2_network_acls_allow_ingress_any_port.ec2_network_acls_allow_ingress_any_port.ec2_client",
new=EC2(current_audit_info),
):
# Test Check
from providers.aws.services.ec2.ec2_network_acls_allow_ingress_any_port.ec2_network_acls_allow_ingress_any_port import (
ec2_network_acls_allow_ingress_any_port,
)
check = ec2_network_acls_allow_ingress_any_port()
result = check.execute()
# One default NACL per region
assert len(result) == 23
# By default, NACLs allow all traffic, so they are flagged as public
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Network ACL {result[0].resource_id} has every port open to the Internet."
)
@mock_ec2
def test_ec2_non_compliant_nacl(self):
# Create EC2 Mocked Resources
ec2_client = client("ec2", region_name=AWS_REGION)
vpc_id = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]["VpcId"]
nacl_id = ec2_client.create_network_acl(VpcId=vpc_id)["NetworkAcl"][
"NetworkAclId"
]
ec2_client.create_network_acl_entry(
NetworkAclId=nacl_id,
RuleNumber=100,
Protocol="-1",
RuleAction="allow",
Egress=False,
CidrBlock="0.0.0.0/0",
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.ec2.ec2_service import EC2
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.ec2.ec2_network_acls_allow_ingress_any_port.ec2_network_acls_allow_ingress_any_port.ec2_client",
new=EC2(current_audit_info),
):
# Test Check
from providers.aws.services.ec2.ec2_network_acls_allow_ingress_any_port.ec2_network_acls_allow_ingress_any_port import (
ec2_network_acls_allow_ingress_any_port,
)
check = ec2_network_acls_allow_ingress_any_port()
result = check.execute()
# One default NACL per region + default NACL of the new VPC + the new NACL
assert len(result) == 25
# Search the created NACL
for nacl in result:
if nacl.resource_id == nacl_id:
assert nacl.status == "FAIL"
assert (
nacl.status_extended
== f"Network ACL {nacl_id} has every port open to the Internet."
)
@mock_ec2
def test_ec2_compliant_nacl(self):
# Create EC2 Mocked Resources
ec2_client = client("ec2", region_name=AWS_REGION)
vpc_id = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]["VpcId"]
nacl_id = ec2_client.create_network_acl(VpcId=vpc_id)["NetworkAcl"][
"NetworkAclId"
]
ec2_client.create_network_acl_entry(
NetworkAclId=nacl_id,
RuleNumber=100,
Protocol="-1",
RuleAction="allow",
Egress=False,
CidrBlock="10.0.0.2/32",
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.ec2.ec2_service import EC2
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.ec2.ec2_network_acls_allow_ingress_any_port.ec2_network_acls_allow_ingress_any_port.ec2_client",
new=EC2(current_audit_info),
):
# Test Check
from providers.aws.services.ec2.ec2_network_acls_allow_ingress_any_port.ec2_network_acls_allow_ingress_any_port import (
ec2_network_acls_allow_ingress_any_port,
)
check = ec2_network_acls_allow_ingress_any_port()
result = check.execute()
# One default NACL per region + default NACL of the new VPC + the new NACL
assert len(result) == 25
# Search the created NACL
for nacl in result:
if nacl.resource_id == nacl_id:
assert nacl.status == "PASS"
assert (
nacl.status_extended
== f"Network ACL {nacl_id} has not every port open to the Internet."
)

View File

@@ -14,9 +14,9 @@
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"NativeIaC": "https://docs.bridgecrew.io/docs/ensure-aws-nacl-does-not-allow-ingress-from-00000-to-port-22#cloudformation",
"Other": "",
"Terraform": ""
"Terraform": "https://docs.bridgecrew.io/docs/ensure-aws-nacl-does-not-allow-ingress-from-00000-to-port-22#terraform"
},
"Recommendation": {
"Text": "Apply Zero Trust approach. Implement a process to scan and remediate unrestricted or overly permissive network acls. Recommended best practices is to narrow the definition for the minimum ports required.",

View File

@@ -14,9 +14,9 @@
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"NativeIaC": "https://docs.bridgecrew.io/docs/ensure-aws-nacl-does-not-allow-ingress-from-00000-to-port-3389#cloudformation",
"Other": "",
"Terraform": ""
"Terraform": "https://docs.bridgecrew.io/docs/ensure-aws-nacl-does-not-allow-ingress-from-00000-to-port-3389#terraform"
},
"Recommendation": {
"Text": "Apply Zero Trust approach. Implement a process to scan and remediate unrestricted or overly permissive network acls. Recommended best practices is to narrow the definition for the minimum ports required.",

View File

@@ -7,21 +7,17 @@ class ec2_securitygroup_allow_ingress_from_internet_to_any_port(Check):
def execute(self):
findings = []
for security_group in ec2_client.security_groups:
public = False
report = Check_Report(self.metadata)
report.region = security_group.region
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not all ports open to the Internet."
report.resource_id = security_group.id
# Loop through every security group's ingress rule and check it
for ingress_rule in security_group.ingress_rules:
public = check_security_group(ingress_rule, "-1")
# Check
if public:
if check_security_group(ingress_rule, "-1"):
report.status = "FAIL"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has all ports open to the Internet."
report.resource_id = security_group.id
else:
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not all ports open to the Internet."
report.resource_id = security_group.id
findings.append(report)
break
findings.append(report)
return findings
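
check_security_group is likewise provided by the repo's lib and not shown in this diff. A hypothetical sketch, assuming ingress_rule is the IpPermissions dict boto3 returns for a security group; the real helper may differ:

def check_security_group(ingress_rule, protocol, ports=None):
    # A rule is flagged when it is open to 0.0.0.0/0 and covers the protocol
    # and, when given, at least one of the ports of interest.
    open_to_world = any(
        ip_range.get("CidrIp") == "0.0.0.0/0"
        for ip_range in ingress_rule.get("IpRanges", [])
    )
    if not open_to_world:
        return False
    if ingress_rule.get("IpProtocol") == "-1":  # all traffic, every port
        return True
    if ingress_rule.get("IpProtocol") != protocol:
        return False
    if ports is None:
        return True
    return any(
        ingress_rule.get("FromPort", 0) <= port <= ingress_rule.get("ToPort", 65535)
        for port in ports
    )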

View File

@@ -1,5 +1,4 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.ec2.ec2_client import ec2_client
from providers.aws.services.ec2.lib.security_groups import check_security_group
@@ -9,21 +8,17 @@ class ec2_securitygroup_allow_ingress_from_internet_to_tcp_ftp_port_20_21(Check)
findings = []
check_ports = [20, 21]
for security_group in ec2_client.security_groups:
public = False
report = Check_Report(self.metadata)
report.region = security_group.region
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not FTP ports 20 and 21 open to the Internet."
report.resource_id = security_group.id
# Loop through every security group's ingress rule and check it
for ingress_rule in security_group.ingress_rules:
public = check_security_group(ingress_rule, "tcp", check_ports)
# Check
if public:
if check_security_group(ingress_rule, "tcp", check_ports):
report.status = "FAIL"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has FTP ports 20 and 21 open to the Internet."
report.resource_id = security_group.id
else:
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not FTP ports 20 and 21 open to the Internet."
report.resource_id = security_group.id
findings.append(report)
break
findings.append(report)
return findings

View File

@@ -8,21 +8,17 @@ class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_22(Check):
findings = []
check_ports = [22]
for security_group in ec2_client.security_groups:
public = False
report = Check_Report(self.metadata)
report.region = security_group.region
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not SSH port 22 open to the Internet."
report.resource_id = security_group.id
# Loop through every security group's ingress rule and check it
for ingress_rule in security_group.ingress_rules:
public = check_security_group(ingress_rule, "tcp", check_ports)
# Check
if public:
if check_security_group(ingress_rule, "tcp", check_ports):
report.status = "FAIL"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has the SSH port 22 open to the Internet."
report.resource_id = security_group.id
else:
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not SSH port 22 open to the Internet."
report.resource_id = security_group.id
findings.append(report)
break
findings.append(report)
return findings

View File

@@ -8,21 +8,17 @@ class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_3389(Check):
findings = []
check_ports = [3389]
for security_group in ec2_client.security_groups:
public = False
report = Check_Report(self.metadata)
report.region = security_group.region
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not Microsoft RDP port 3389 open to the Internet."
report.resource_id = security_group.id
# Loop through every security group's ingress rule and check it
for ingress_rule in security_group.ingress_rules:
public = check_security_group(ingress_rule, "tcp", check_ports)
# Check
if public:
if check_security_group(ingress_rule, "tcp", check_ports):
report.status = "FAIL"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has Microsoft RDP port 3389 open to the Internet."
report.resource_id = security_group.id
else:
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not Microsoft RDP port 3389 open to the Internet."
report.resource_id = security_group.id
findings.append(report)
break
findings.append(report)
return findings

View File

@@ -8,21 +8,18 @@ class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_mysql_3306(Check
findings = []
check_ports = [3306]
for security_group in ec2_client.security_groups:
public = False
report = Check_Report(self.metadata)
report.region = security_group.region
report.resource_id = security_group.id
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not MySQL port 3306 open to the Internet."
# Loop through every security group's ingress rule and check it
for ingress_rule in security_group.ingress_rules:
public = check_security_group(ingress_rule, "tcp", check_ports)
# Check
if public:
if check_security_group(ingress_rule, "tcp", check_ports):
report.status = "FAIL"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has the MySQL port 3306 open to the Internet."
report.resource_id = security_group.id
else:
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not MySQL port 3306 open to the Internet."
report.resource_id = security_group.id
findings.append(report)
break
findings.append(report)
return findings

View File

@@ -8,21 +8,17 @@ class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_oracle_1521_2483
findings = []
check_ports = [1521, 2483]
for security_group in ec2_client.security_groups:
public = False
report = Check_Report(self.metadata)
report.region = security_group.region
report.resource_id = security_group.id
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not Oracle ports 1521 and 2483 open to the Internet."
# Loop through every security group's ingress rule and check it
for ingress_rule in security_group.ingress_rules:
public = check_security_group(ingress_rule, "tcp", check_ports)
# Check
if public:
if check_security_group(ingress_rule, "tcp", check_ports):
report.status = "FAIL"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has Oracle ports 1521 and 2483 open to the Internet."
report.resource_id = security_group.id
else:
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not Oracle ports 1521 and 2483 open to the Internet."
report.resource_id = security_group.id
findings.append(report)
break
findings.append(report)
return findings

View File

@@ -7,21 +7,18 @@ class ec2_securitygroup_default_restrict_traffic(Check):
def execute(self):
findings = []
for security_group in ec2_client.security_groups:
public = False
report = Check_Report(self.metadata)
report.region = security_group.region
report.resource_id = security_group.id
# Find default security group
if security_group.name == "default":
report.status = "PASS"
report.status_extended = f"Default Security Group ({security_group.id}) is not open to the Internet."
for ingress_rule in security_group.ingress_rules:
public = check_security_group(ingress_rule, "-1")
if public:
report.status = "FAIL"
report.status_extended = f"Default Security Group ({security_group.id}) is open to the Internet."
report.resource_id = security_group.id
else:
report.status = "PASS"
report.status_extended = f"Default Security Group ({security_group.id}) is not open to the Internet."
report.resource_id = security_group.id
if check_security_group(ingress_rule, "-1"):
report.status = "FAIL"
report.status_extended = f"Default Security Group ({security_group.id}) is open to the Internet."
break
findings.append(report)
return findings

View File

@@ -23,6 +23,8 @@ class EC2:
self.__get_snapshot_public__()
self.elastic_ips = []
self.__threading_call__(self.__describe_elastic_ips__)
self.volumes = []
self.__threading_call__(self.__describe_volumes__)
def __get_session__(self):
return self.session
@@ -164,7 +166,7 @@ class EC2:
)
def __describe_elastic_ips__(self, regional_client):
logger.info("EC2 - Describing Security Groups...")
logger.info("EC2 - Describing Network Interfaces...")
try:
describe_network_interfaces_paginator = regional_client.get_paginator(
"describe_network_interfaces"
@@ -186,6 +188,26 @@ class EC2:
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __describe_volumes__(self, regional_client):
logger.info("EC2 - Describing Volumes...")
try:
describe_volumes_paginator = regional_client.get_paginator(
"describe_volumes"
)
for page in describe_volumes_paginator.paginate():
for volume in page["Volumes"]:
self.volumes.append(
Volume(
volume["VolumeId"],
regional_client.region,
volume["Encrypted"],
)
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
@dataclass
class Instance:
@@ -236,6 +258,18 @@ class Snapshot:
self.public = False
@dataclass
class Volume:
id: str
region: str
encrypted: bool
def __init__(self, id, region, encrypted):
self.id = id
self.region = region
self.encrypted = encrypted
@dataclass
class SecurityGroup:
name: str

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "iam_disable_45_days_credentials",
"CheckTitle": "Ensure credentials unused for 45 days or greater are disabled",
"CheckType": ["Software and Configuration Checks"],
"ServiceName": "iam",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"Severity": "medium",
"ResourceType": "AwsIamUser",
"Description": "Ensure credentials unused for 45 days or greater are disabled",
"Risk": "To increase the security of your AWS account; remove IAM user credentials (that is; passwords and access keys) that are not needed. For example; when users leave your organization or no longer need AWS access.",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Find the credentials that they were using and ensure that they are no longer operational. Ideally; you delete credentials if they are no longer needed. You can always recreate them at a later date if the need arises. At the very least; you should change the password or deactivate the access keys so that the former users no longer have access.",
"Url": "https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_finding-unused.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,41 @@
import datetime
from lib.check.models import Check, Check_Report
from providers.aws.services.iam.iam_client import iam_client
maximum_expiration_days = 45
class iam_disable_45_days_credentials(Check):
def execute(self) -> Check_Report:
findings = []
response = iam_client.users
for user in response:
report = Check_Report(self.metadata)
report.resource_id = user.name
report.resource_arn = user.arn
report.region = iam_client.region
if user.password_last_used:
time_since_insertion = (
datetime.datetime.now()
- datetime.datetime.strptime(
str(user.password_last_used), "%Y-%m-%d %H:%M:%S+00:00"
)
)
if time_since_insertion.days > maximum_expiration_days:
report.status = "FAIL"
report.status_extended = f"User {user.name} has not logged into the console in the past 45 days."
else:
report.status = "PASS"
report.status_extended = f"User {user.name} has logged into the console in the past 45 days."
else:
report.status = "PASS"
report.status_extended = (
    f"User {user.name} does not have a console password or it is unused."
)
# Append report
findings.append(report)
return findings
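The age computation hinges on parsing password_last_used with an explicit format string. A standalone sketch of the same arithmetic, with an example timestamp assumed:

import datetime

maximum_expiration_days = 45
password_last_used = "2022-09-01 10:00:00+00:00"  # example value in the stored format
age = datetime.datetime.now() - datetime.datetime.strptime(
    password_last_used, "%Y-%m-%d %H:%M:%S+00:00"
)
print(age.days > maximum_expiration_days)  # True once the last login is older than 45 days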

View File

@@ -0,0 +1,97 @@
import datetime
from re import search
from unittest import mock
from boto3 import client
from moto import mock_iam
class Test_iam_disable_45_days_credentials_test:
@mock_iam
def test_iam_user_logged_45_days(self):
password_last_used = (
datetime.datetime.now() - datetime.timedelta(days=2)
).strftime("%Y-%m-%d %H:%M:%S+00:00")
iam_client = client("iam")
user = "test-user"
arn = iam_client.create_user(UserName=user)["User"]["Arn"]
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.iam.iam_service import IAM
with mock.patch(
"providers.aws.services.iam.iam_disable_45_days_credentials.iam_disable_45_days_credentials.iam_client",
new=IAM(current_audit_info),
) as service_client:
from providers.aws.services.iam.iam_disable_45_days_credentials.iam_disable_45_days_credentials import (
iam_disable_45_days_credentials,
)
service_client.users[0].password_last_used = password_last_used
check = iam_disable_45_days_credentials()
result = check.execute()
assert result[0].status == "PASS"
assert search(
f"User {user} has logged into the console in the past 45 days.",
result[0].status_extended,
)
assert result[0].resource_id == user
assert result[0].resource_arn == arn
@mock_iam
def test_iam_user_not_logged_45_days(self):
password_last_used = (
datetime.datetime.now() - datetime.timedelta(days=60)
).strftime("%Y-%m-%d %H:%M:%S+00:00")
iam_client = client("iam")
user = "test-user"
arn = iam_client.create_user(UserName=user)["User"]["Arn"]
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.iam.iam_service import IAM
with mock.patch(
"providers.aws.services.iam.iam_disable_45_days_credentials.iam_disable_45_days_credentials.iam_client",
new=IAM(current_audit_info),
) as service_client:
from providers.aws.services.iam.iam_disable_45_days_credentials.iam_disable_45_days_credentials import (
iam_disable_45_days_credentials,
)
service_client.users[0].password_last_used = password_last_used
check = iam_disable_45_days_credentials()
result = check.execute()
assert result[0].status == "FAIL"
assert search(
f"User {user} has not logged into the console in the past 45 days.",
result[0].status_extended,
)
assert result[0].resource_id == user
assert result[0].resource_arn == arn
@mock_iam
def test_iam_user_not_logged(self):
iam_client = client("iam")
user = "test-user"
arn = iam_client.create_user(UserName=user)["User"]["Arn"]
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.iam.iam_service import IAM
with mock.patch(
"providers.aws.services.iam.iam_disable_45_days_credentials.iam_disable_45_days_credentials.iam_client",
new=IAM(current_audit_info),
) as service_client:
from providers.aws.services.iam.iam_disable_45_days_credentials.iam_disable_45_days_credentials import (
iam_disable_45_days_credentials,
)
service_client.users[0].password_last_used = ""
check = iam_disable_45_days_credentials()
result = check.execute()
assert result[0].status == "PASS"
assert search(
f"User {user} has not a console password or is unused.",
result[0].status_extended,
)
assert result[0].resource_id == user
assert result[0].resource_arn == arn

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "iam_no_expired_server_certificates_stored",
"CheckTitle": "Ensure that all the expired SSL/TLS certificates stored in AWS IAM are removed.",
"CheckType": ["Software and Configuration Checks", "Industry and Regulatory Standards" ,"CIS AWS Foundations Benchmark"],
"ServiceName": "iam",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"Severity": "critical",
"ResourceType": "AwsIamUser",
"Description": "Ensure that all the expired SSL/TLS certificates stored in AWS IAM are removed.",
"Risk": "Removing expired SSL/TLS certificates eliminates the risk that an invalid certificate will be deployed accidentally to a resource such as AWS Elastic Load Balancer (ELB), which can damage the credibility of the application/website behind the ELB.",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "aws iam delete-server-certificate --server-certificate-name <CERTIFICATE_NAME",
"NativeIaC": "",
"Other": "Removing expired certificates via AWS Management Console is not currently supported.",
"Terraform": ""
},
"Recommendation": {
"Text": "Deleting the certificate could have implications for your application if you are using an expired server certificate with Elastic Load Balancing, CloudFront, etc. One has to make configurations at respective services to ensure there is no interruption in application functionality.",
"Url": "https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_server-certs.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "Data Protection",
"Compliance": []
}

View File

@@ -0,0 +1,28 @@
from datetime import datetime, timezone
from lib.check.models import Check, Check_Report
from providers.aws.services.iam.iam_client import iam_client
class iam_no_expired_server_certificates_stored(Check):
def execute(self) -> Check_Report:
findings = []
for certificate in iam_client.server_certificates:
report = Check_Report(self.metadata)
report.region = iam_client.region
report.resource_id = certificate.id
report.resource_arn = certificate.arn
expiration_days = (datetime.now(timezone.utc) - certificate.expiration).days
if expiration_days >= 0:
    report.status = "FAIL"
    report.status_extended = f"IAM Certificate {certificate.name} expired {expiration_days} days ago."
else:
report.status = "PASS"
report.status_extended = (
f"IAM Certificate {certificate.name} is not expired."
)
findings.append(report)
return findings
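Since certificate.expiration comes back timezone-aware from boto3, the subtraction needs an aware now(). A standalone sketch of the same comparison, with an example expiry assumed:

from datetime import datetime, timezone

expiration = datetime(2022, 1, 1, tzinfo=timezone.utc)  # example expiry date
expiration_days = (datetime.now(timezone.utc) - expiration).days
print(expiration_days >= 0)  # True means the certificate has already expired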

View File

@@ -0,0 +1,57 @@
from re import search
from unittest import mock
from boto3 import client
from moto import mock_iam
class Test_iam_no_expired_server_certificates_stored_test:
@mock_iam
def test_no_certificates(self):
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.iam.iam_service import IAM
with mock.patch(
"providers.aws.services.iam.iam_no_expired_server_certificates_stored.iam_no_expired_server_certificates_stored.iam_client",
new=IAM(current_audit_info),
):
from providers.aws.services.iam.iam_no_expired_server_certificates_stored.iam_no_expired_server_certificates_stored import (
iam_no_expired_server_certificates_stored,
)
check = iam_no_expired_server_certificates_stored()
result = check.execute()
assert len(result) == 0
@mock_iam
def test_expired_certificate(self):
iam_client = client("iam")
# moto creates an expired certificate by default
cert = iam_client.upload_server_certificate(
ServerCertificateName="certname",
CertificateBody="certbody",
PrivateKey="privatekey",
)["ServerCertificateMetadata"]
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.iam.iam_service import IAM
with mock.patch(
"providers.aws.services.iam.iam_no_expired_server_certificates_stored.iam_no_expired_server_certificates_stored.iam_client",
new=IAM(current_audit_info),
):
from providers.aws.services.iam.iam_no_expired_server_certificates_stored.iam_no_expired_server_certificates_stored import (
iam_no_expired_server_certificates_stored,
)
check = iam_no_expired_server_certificates_stored()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"IAM Certificate certname has expired", result[0].status_extended
)
assert result[0].resource_id == cert["ServerCertificateId"]
assert result[0].resource_arn == cert["Arn"]

View File

@@ -13,10 +13,10 @@
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"CLI": "https://docs.bridgecrew.io/docs/iam_47#cli-command",
"NativeIaC": "",
"Other": "",
"Terraform": ""
"Other": "https://docs.bridgecrew.io/docs/iam_47#aws-console",
"Terraform": "https://docs.bridgecrew.io/docs/iam_47#terraform"
},
"Recommendation": {
"Text": "It is more secure to start with a minimum set of permissions and grant additional permissions as necessary; rather than starting with permissions that are too lenient and then trying to tighten them later. List policies an analyze if permissions are the least possible to conduct business activities.",

View File

@@ -1,5 +1,6 @@
import csv
from dataclasses import dataclass
from datetime import datetime
from lib.logger import logger
from providers.aws.aws_provider import get_region_global_service
@@ -33,6 +34,7 @@ class IAM:
self.policies = self.__list_policies__()
self.list_policies_version = self.__list_policies_version__(self.policies)
self.saml_providers = self.__list_saml_providers__()
self.server_certificates = self.__list_server_certificates__()
def __get_client__(self):
return self.client
@@ -367,6 +369,28 @@ class IAM:
finally:
return saml_providers
def __list_server_certificates__(self):
try:
server_certificates = []
for certificate in self.client.list_server_certificates()[
"ServerCertificateMetadataList"
]:
server_certificates.append(
Certificate(
certificate["ServerCertificateName"],
certificate["ServerCertificateId"],
certificate["Arn"],
certificate["Expiration"],
)
)
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
finally:
return server_certificates
@dataclass
class MFADevice:
@@ -446,3 +470,17 @@ class PasswordPolicy:
self.max_age = max_age
self.reuse_prevention = reuse_prevention
self.hard_expiry = hard_expiry
@dataclass
class Certificate:
name: str
id: str
arn: str
expiration: datetime
def __init__(self, name, id, arn, expiration):
self.name = name
self.id = id
self.arn = arn
self.expiration = expiration
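For reference, a standalone sketch of the API shape __list_server_certificates__ consumes, using plain boto3; the real service class wraps this in its own error handling:

import boto3

iam = boto3.client("iam")
for cert in iam.list_server_certificates()["ServerCertificateMetadataList"]:
    # Each entry carries the four fields the Certificate dataclass stores
    print(
        cert["ServerCertificateName"],
        cert["ServerCertificateId"],
        cert["Arn"],
        cert["Expiration"],
    )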

View File

@@ -1,35 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2018) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_extra712="7.12"
CHECK_TITLE_extra712="[extra712] Check if Amazon Macie is enabled"
CHECK_SCORED_extra712="NOT_SCORED"
CHECK_CIS_LEVEL_extra712="EXTRA"
CHECK_SEVERITY_extra712="Low"
CHECK_ALTERNATE_check712="extra712"
CHECK_ASFF_RESOURCE_TYPE_extra712="AwsMacieSession"
CHECK_SERVICENAME_extra712="macie"
CHECK_RISK_extra712='Amazon Macie is a fully managed data security and data privacy service that uses machine learning and pattern matching to help you discover; monitor; and protect your sensitive data in AWS.'
CHECK_REMEDIATION_extra712='Enable Amazon Macie and create appropriate jobs to discover sensitive data.'
CHECK_DOC_extra712='https://docs.aws.amazon.com/macie/latest/user/getting-started.html'
CHECK_CAF_EPIC_extra712='Data Protection'
extra712(){
# "No API commands available to check if Macie is enabled,"
# "just looking if IAM Macie related permissions exist. "
MACIE_IAM_ROLES_CREATED=$($AWSCLI iam list-roles $PROFILE_OPT --query 'Roles[*].Arn'|grep AWSMacieServiceCustomer|wc -l)
if [[ $MACIE_IAM_ROLES_CREATED -eq 2 ]];then
textPass "$REGION: Macie related IAM roles exist so it might be enabled. Check it out manually" "$REGION"
else
textFail "$REGION: No Macie related IAM roles found. It is most likely not to be enabled" "$REGION"
fi
}

View File

@@ -0,0 +1,4 @@
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.macie.macie_service import Macie
macie_client = Macie(current_audit_info)

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "macie_is_enabled",
"CheckTitle": "Check if Amazon Macie is enabled.",
"CheckType": ["Data Protection"],
"ServiceName": "macie",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:access-analyzer:region:account-id:analyzer/resource-id",
"Severity": "low",
"ResourceType": "AwsMacieSession",
"Description": "Check if Amazon Macie is enabled.",
"Risk": "Amazon Macie is a fully managed data security and data privacy service that uses machine learning and pattern matching to help you discover, monitor and protect your sensitive data in AWS.",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "aws macie2 enable-macie",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable Amazon Macie and create appropriate jobs to discover sensitive data.",
"Url": "https://aws.amazon.com/macie/getting-started/"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,23 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.macie.macie_client import macie_client
class macie_is_enabled(Check):
def execute(self):
findings = []
for session in macie_client.sessions:
report = Check_Report(self.metadata)
report.region = session.region
report.resource_id = "Macie"
if session.status == "ENABLED":
report.status = "PASS"
report.status_extended = "Macie is enabled."
elif session.status == "PAUSED":
report.status = "FAIL"
report.status_extended = "Macie is currently in a SUSPENDED state."
else:
report.status = "FAIL"
report.status_extended = "Macie is not enabled."
findings.append(report)
return findings
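The status values the check branches on come straight from the Macie2 GetMacieSession API. A minimal standalone probe of the same call, relying on the same error-string match the service class below uses:

import boto3

client = boto3.client("macie2", region_name="eu-west-1")  # example region
try:
    status = client.get_macie_session()["status"]  # "ENABLED" or "PAUSED"
except Exception as error:
    if "Macie is not enabled" in str(error):
        status = "DISABLED"
    else:
        raise
print(status)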

View File

@@ -0,0 +1,82 @@
from unittest import mock
from providers.aws.services.macie.macie_service import Session
class Test_macie_is_enabled:
def test_macie_disabled(self):
macie_client = mock.MagicMock()
macie_client.sessions = [
Session(
"DISABLED",
"eu-west-1",
)
]
with mock.patch(
"providers.aws.services.macie.macie_service.Macie",
new=macie_client,
):
# Test Check
from providers.aws.services.macie.macie_is_enabled.macie_is_enabled import (
macie_is_enabled,
)
check = macie_is_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].status_extended == "Macie is not enabled."
assert result[0].resource_id == "Macie"
def test_macie_enabled(self):
macie_client = mock.MagicMock()
macie_client.sessions = [
Session(
"ENABLED",
"eu-west-1",
)
]
with mock.patch(
"providers.aws.services.macie.macie_service.Macie",
new=macie_client,
):
# Test Check
from providers.aws.services.macie.macie_is_enabled.macie_is_enabled import (
macie_is_enabled,
)
check = macie_is_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].status_extended == "Macie is enabled."
assert result[0].resource_id == "Macie"
def test_macie_suspended(self):
macie_client = mock.MagicMock()
macie_client.sessions = [
Session(
"PAUSED",
"eu-west-1",
)
]
with mock.patch(
"providers.aws.services.macie.macie_service.Macie",
new=macie_client,
):
# Test Check
from providers.aws.services.macie.macie_is_enabled.macie_is_enabled import (
macie_is_enabled,
)
check = macie_is_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended == "Macie is currently in a SUSPENDED state."
)
assert result[0].resource_id == "Macie"

View File

@@ -0,0 +1,65 @@
import threading
from dataclasses import dataclass
from lib.logger import logger
from providers.aws.aws_provider import generate_regional_clients
################## Macie
class Macie:
def __init__(self, audit_info):
self.service = "macie2"
self.session = audit_info.audit_session
self.audited_account = audit_info.audited_account
self.regional_clients = generate_regional_clients(self.service, audit_info)
self.sessions = []
self.__threading_call__(self.__get_macie_session__)
def __get_session__(self):
return self.session
def __threading_call__(self, call):
threads = []
for regional_client in self.regional_clients.values():
threads.append(threading.Thread(target=call, args=(regional_client,)))
for t in threads:
t.start()
for t in threads:
t.join()
def __get_macie_session__(self, regional_client):
logger.info("Macie - Get Macie Session...")
try:
self.sessions.append(
Session(
regional_client.get_macie_session()["status"],
regional_client.region,
)
)
except Exception as error:
if "Macie is not enabled" in str(error):
self.sessions.append(
Session(
"DISABLED",
regional_client.region,
)
)
else:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
@dataclass
class Session:
status: str
region: str
def __init__(
self,
status,
region,
):
self.status = status
self.region = region

View File

@@ -0,0 +1,75 @@
from datetime import datetime
from unittest.mock import patch
import botocore
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.macie.macie_service import Macie, Session
# Mock Test Region
AWS_REGION = "eu-west-1"
# Mocking Macie2 Calls
make_api_call = botocore.client.BaseClient._make_api_call
# As you can see, the operation_name has the get_macie_session snake_case form but
# we are using the GetMacieSession form.
# Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816
#
# We have to mock every AWS API call using Boto3
def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "GetMacieSession":
return {
"createdAt": datetime(2015, 1, 1),
"findingPublishingFrequency": "SIX_HOURS",
"serviceRole": "string",
"status": "ENABLED",
"updatedAt": datetime(2015, 1, 1),
}
return make_api_call(self, operation_name, kwarg)
# Mock generate_regional_clients()
def mock_generate_regional_clients(service, audit_info):
regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION)
regional_client.region = AWS_REGION
return {AWS_REGION: regional_client}
# Patch every AWS call using Boto3 and generate_regional_clients to have 1 client
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
@patch(
"providers.aws.services.macie.macie_service.generate_regional_clients",
new=mock_generate_regional_clients,
)
class Test_Macie_Service:
# Test Macie Client
def test__get_client__(self):
macie = Macie(current_audit_info)
assert macie.regional_clients[AWS_REGION].__class__.__name__ == "Macie2"
# Test Macie Session
def test__get_session__(self):
macie = Macie(current_audit_info)
assert macie.session.__class__.__name__ == "Session"
# Test Macie Service
def test__get_service__(self):
macie = Macie(current_audit_info)
assert macie.service == "macie2"
def test__get_macie_session__(self):
# Set partition for the service
current_audit_info.audited_partition = "aws"
macie = Macie(current_audit_info)
macie.sessions = [
Session(
"ENABLED",
"eu-west-1",
)
]
assert len(macie.sessions) == 1
assert macie.sessions[0].status == "ENABLED"
assert macie.sessions[0].region == AWS_REGION

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "s3_bucket_no_mfa_delete",
"CheckTitle": "Check if S3 bucket MFA Delete is not enabled.",
"CheckType": ["Logging and Monitoring"],
"ServiceName": "s3",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"Severity": "medium",
"ResourceType": "AwsS3Bucket",
"Description": "Check if S3 bucket MFA Delete is not enabled.",
"Risk": "Your security credentials are compromised or unauthorized access is granted.",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "aws s3api put-bucket-versioning --profile my-root-profile --bucket my-bucket-name --versioning-configuration Status=Enabled,MFADelete=Enabled --mfa 'arn:aws:iam::00000000:mfa/root-account-mfa-device 123456'",
"NativeIaC": "",
"Other": "",
"Terraform": "https://docs.bridgecrew.io/docs/bc_aws_s3_24#terraform"
},
"Recommendation": {
"Text": "Adding MFA delete to an S3 bucket, requires additional authentication when you change the version state of your bucket or you delete and object version adding another layer of security in the event your security credentials are compromised or unauthorized access is granted.",
"Url": "https://docs.aws.amazon.com/AmazonS3/latest/userguide/MultiFactorAuthenticationDelete.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,24 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.s3.s3_client import s3_client
class s3_bucket_no_mfa_delete(Check):
def execute(self):
findings = []
for bucket in s3_client.buckets:
report = Check_Report(self.metadata)
report.region = bucket.region
report.resource_id = bucket.name
if bucket.mfa_delete:
report.status = "PASS"
report.status_extended = (
f"S3 Bucket {bucket.name} has MFA Delete enabled."
)
else:
report.status = "FAIL"
report.status_extended = (
f"S3 Bucket {bucket.name} has MFA Delete disabled."
)
findings.append(report)
return findings
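The mfa_delete flag mirrors the MFADelete field of GetBucketVersioning. A standalone sketch of the same probe with plain boto3, bucket name assumed:

import boto3

s3 = boto3.client("s3")
versioning = s3.get_bucket_versioning(Bucket="my-bucket-name")  # example bucket
# MFADelete only appears once versioning has been configured on the bucket
mfa_delete = versioning.get("MFADelete") == "Enabled"
print(mfa_delete)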

View File

@@ -0,0 +1,97 @@
from re import search
from unittest import mock
from boto3 import client
from moto import mock_s3
ACCOUNT_ID = "123456789012"
class Test_s3_bucket_no_mfa_delete:
@mock_s3
def test_no_buckets(self):
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.s3.s3_service import S3
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.s3.s3_bucket_no_mfa_delete.s3_bucket_no_mfa_delete.s3_client",
new=S3(current_audit_info),
):
# Test Check
from providers.aws.services.s3.s3_bucket_no_mfa_delete.s3_bucket_no_mfa_delete import (
s3_bucket_no_mfa_delete,
)
check = s3_bucket_no_mfa_delete()
result = check.execute()
assert len(result) == 0
@mock_s3
def test_bucket_without_mfa(self):
s3_client_us_east_1 = client("s3", region_name="us-east-1")
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.s3.s3_service import S3
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.s3.s3_bucket_no_mfa_delete.s3_bucket_no_mfa_delete.s3_client",
new=S3(current_audit_info),
):
# Test Check
from providers.aws.services.s3.s3_bucket_no_mfa_delete.s3_bucket_no_mfa_delete import (
s3_bucket_no_mfa_delete,
)
check = s3_bucket_no_mfa_delete()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"MFA Delete disabled",
result[0].status_extended,
)
assert result[0].resource_id == bucket_name_us
@mock_s3
def test_bucket_with_mfa(self):
s3_client_us_east_1 = client("s3", region_name="us-east-1")
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
s3_client_us_east_1.put_bucket_versioning(
Bucket=bucket_name_us,
VersioningConfiguration={"MFADelete": "Enabled", "Status": "Enabled"},
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.s3.s3_service import S3
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.s3.s3_bucket_no_mfa_delete.s3_bucket_no_mfa_delete.s3_client",
new=S3(current_audit_info),
) as service_client:
# Test Check
from providers.aws.services.s3.s3_bucket_no_mfa_delete.s3_bucket_no_mfa_delete import (
s3_bucket_no_mfa_delete,
)
service_client.buckets[0].mfa_delete = True
check = s3_bucket_no_mfa_delete()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"MFA Delete enabled",
result[0].status_extended,
)
assert result[0].resource_id == bucket_name_us

View File

@@ -76,6 +76,9 @@ class S3:
if "Status" in bucket_versioning:
if "Enabled" == bucket_versioning["Status"]:
bucket.versioning = True
if "MFADelete" in bucket_versioning:
if "Enabled" == bucket_versioning["MFADelete"]:
bucket.mfa_delete = True
except Exception as error:
logger.error(
f"{bucket.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
@@ -253,6 +256,7 @@ class Bucket:
region: str
logging_target_bucket: str
ownership: str
mfa_delete: bool
def __init__(self, name, region):
self.name = name
@@ -273,3 +277,4 @@ class Bucket:
self.region = region
self.logging_target_bucket = None
self.ownership = None
self.mfa_delete = False