feat(config): add config service and checks and check43 (#1441)

Co-authored-by: Pepe Fagoaga <pepe@verica.io>
Author: Sergio Garcia
Date: 2022-10-31 14:37:59 +01:00
Committed by: GitHub
Parent: adf04ba632
Commit: 3e749dd652

34 changed files with 563 additions and 424 deletions

View File

@@ -84,6 +84,8 @@ class Test_autoscaling_find_secrets_ec2_launch_configuration:
        from providers.aws.lib.audit_info.audit_info import current_audit_info
        from providers.aws.services.autoscaling.autoscaling_service import AutoScaling

        current_audit_info.audited_partition = "aws"

        with mock.patch(
            "providers.aws.services.autoscaling.autoscaling_find_secrets_ec2_launch_configuration.autoscaling_find_secrets_ec2_launch_configuration.autoscaling_client",
            new=AutoScaling(current_audit_info),
@@ -124,6 +126,8 @@ class Test_autoscaling_find_secrets_ec2_launch_configuration:
        from providers.aws.lib.audit_info.audit_info import current_audit_info
        from providers.aws.services.autoscaling.autoscaling_service import AutoScaling

        current_audit_info.audited_partition = "aws"

        with mock.patch(
            "providers.aws.services.autoscaling.autoscaling_find_secrets_ec2_launch_configuration.autoscaling_find_secrets_ec2_launch_configuration.autoscaling_client",
            new=AutoScaling(current_audit_info),
@@ -158,6 +162,8 @@ class Test_autoscaling_find_secrets_ec2_launch_configuration:
        from providers.aws.lib.audit_info.audit_info import current_audit_info
        from providers.aws.services.autoscaling.autoscaling_service import AutoScaling

        current_audit_info.audited_partition = "aws"

        with mock.patch(
            "providers.aws.services.autoscaling.autoscaling_find_secrets_ec2_launch_configuration.autoscaling_find_secrets_ec2_launch_configuration.autoscaling_client",
            new=AutoScaling(current_audit_info),

View File

@@ -1,55 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2019) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
# Remediation:
#
# https://d1.awsstatic.com/whitepapers/compliance/AWS_CIS_Foundations_Benchmark.pdf
#
# aws logs put-metric-filter \
# --region us-east-1 \
# --log-group-name CloudTrail/CloudWatchLogGroup \
# --filter-name S3BucketConfigChanges \
# --filter-pattern '{ ($.eventSource = s3.amazonaws.com) && (($.eventName = PutBucketAcl) || ($.eventName = PutBucketPolicy) || ($.eventName = PutBucketCors) || ($.eventName = PutBucketLifecycle) || ($.eventName = PutBucketReplication) || ($.eventName = DeleteBucketPolicy) || ($.eventName = DeleteBucketCors) || ($.eventName = DeleteBucketLifecycle) || ($.eventName = DeleteBucketReplication)) }' \
# --metric-transformations metricName=S3BucketEventCount,metricNamespace=CloudTrailMetrics,metricValue=1
#
# aws cloudwatch put-metric-alarm \
# --region us-east-1 \
# --alarm-name S3BucketConfigChangesAlarm \
# --alarm-description "Triggered by AWS S3 Bucket config changes." \
# --metric-name S3BucketEventCount \
# --namespace CloudTrailMetrics \
# --statistic Sum \
# --comparison-operator GreaterThanOrEqualToThreshold \
# --evaluation-periods 1 \
# --period 300 \
# --threshold 1 \
# --actions-enabled \
# --alarm-actions arn:aws:sns:us-east-1:123456789012:CloudWatchAlarmTopic
CHECK_ID_check38="3.8"
CHECK_TITLE_check38="[check38] Ensure a log metric filter and alarm exist for S3 bucket policy changes"
CHECK_SCORED_check38="SCORED"
CHECK_CIS_LEVEL_check38="LEVEL1"
CHECK_SEVERITY_check38="Medium"
CHECK_ASFF_TYPE_check38="Software and Configuration Checks/Industry and Regulatory Standards/CIS AWS Foundations Benchmark"
CHECK_ASFF_RESOURCE_TYPE_check38="AwsCloudTrailTrail"
CHECK_ALTERNATE_check308="check38"
CHECK_SERVICENAME_check38="s3"
CHECK_RISK_check38='Monitoring unauthorized API calls will help reveal application errors and may reduce time to detect malicious activity.'
CHECK_REMEDIATION_check38='It is recommended that a metric filter and alarm be established for unauthorized requests.'
CHECK_DOC_check38='https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudwatch-alarms-for-cloudtrail.html'
CHECK_CAF_EPIC_check38='Logging and Monitoring'
check38(){
check3x '\$\.eventSource\s*=\s*s3.amazonaws.com.+\$\.eventName\s*=\s*PutBucketAcl.+\$\.eventName\s*=\s*PutBucketPolicy.+\$\.eventName\s*=\s*PutBucketCors.+\$\.eventName\s*=\s*PutBucketLifecycle.+\$\.eventName\s*=\s*PutBucketReplication.+\$\.eventName\s*=\s*DeleteBucketPolicy.+\$\.eventName\s*=\s*DeleteBucketCors.+\$\.eventName\s*=\s*DeleteBucketLifecycle.+\$\.eventName\s*=\s*DeleteBucketReplication'
}

View File

@@ -1,47 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2019) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_check25="2.5"
CHECK_TITLE_check25="[check25] Ensure AWS Config is enabled in all regions"
CHECK_SCORED_check25="SCORED"
CHECK_CIS_LEVEL_check25="LEVEL1"
CHECK_SEVERITY_check25="Medium"
CHECK_ASFF_TYPE_check25="Software and Configuration Checks/Industry and Regulatory Standards/CIS AWS Foundations Benchmark"
CHECK_ALTERNATE_check205="check25"
CHECK_ASFF_COMPLIANCE_TYPE_check25="ens-op.exp.1.aws.cfg.1"
CHECK_SERVICENAME_check25="config"
CHECK_RISK_check25='The AWS configuration item history captured by AWS Config enables security analysis; resource change tracking; and compliance auditing.'
CHECK_REMEDIATION_check25='It is recommended that AWS Config be enabled in all regions.'
CHECK_DOC_check25='https://aws.amazon.com/blogs/mt/aws-config-best-practices/'
CHECK_CAF_EPIC_check25='Logging and Monitoring'
check25(){
  # "Ensure AWS Config is enabled in all regions (Scored)"
  for regx in $REGIONS; do
    CHECK_AWSCONFIG_RECORDING=$($AWSCLI configservice describe-configuration-recorder-status $PROFILE_OPT --region $regx --query 'ConfigurationRecordersStatus[*].recording' --output text 2>&1)
    CHECK_AWSCONFIG_STATUS=$($AWSCLI configservice describe-configuration-recorder-status $PROFILE_OPT --region $regx --query 'ConfigurationRecordersStatus[*].lastStatus' --output text 2>&1)
    if [[ $(echo "$CHECK_AWSCONFIG_STATUS" | grep AccessDenied) ]]; then
      textInfo "$regx: Access Denied trying to describe configuration recorder status" "$regx" "recorder"
      continue
    fi
    if [[ $CHECK_AWSCONFIG_RECORDING == "True" ]]; then
      if [[ $CHECK_AWSCONFIG_STATUS == "SUCCESS" ]]; then
        textPass "$regx: AWS Config recorder enabled" "$regx" "recorder"
      else
        textFail "$regx: AWS Config recorder in failure state" "$regx" "recorder"
      fi
    else
      textFail "$regx: AWS Config recorder disabled" "$regx" "recorder"
    fi
  done
}

View File

@@ -0,0 +1,4 @@
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.config.config_service import Config
config_client = Config(current_audit_info)

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "config_recorder_all_regions_enabled",
"CheckTitle": "Ensure AWS Config is enabled in all regions.",
"CheckType": ["Logging and Monitoring"],
"ServiceName": "config",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:access-recorder:region:account-id:recorder/resource-id",
"Severity": "medium",
"ResourceType": "Other",
"Description": "Ensure AWS Config is enabled in all regions.",
"Risk": "The AWS configuration item history captured by AWS Config enables security analysis, resource change tracking and compliance auditing.",
"RelatedUrl": "https://aws.amazon.com/blogs/mt/aws-config-best-practices/",
"Remediation": {
"Code": {
"CLI": "https://docs.bridgecrew.io/docs/logging_5-enable-aws-config-regions#cli-command",
"NativeIaC": "",
"Other": "https://docs.bridgecrew.io/docs/logging_5-enable-aws-config-regions#aws-console",
"Terraform": "https://docs.bridgecrew.io/docs/logging_5-enable-aws-config-regions#terraform"
},
"Recommendation": {
"Text": "It is recommended to enable AWS Config be enabled in all regions.",
"Url": "https://aws.amazon.com/blogs/mt/aws-config-best-practices/"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}
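The remediation block above only links out to external guides; as a rough, hypothetical sketch (not part of this PR), enabling an AWS Config recorder in a single region with boto3 could look like the following, where the role ARN and bucket name are placeholders:

# Hypothetical remediation sketch: enable an AWS Config recorder in one region.
# The role ARN and bucket name are placeholders, not values from this change.
import boto3

config = boto3.client("config", region_name="us-east-1")
config.put_configuration_recorder(
    ConfigurationRecorder={
        "name": "default",
        "roleARN": "arn:aws:iam::123456789012:role/aws-config-role",
        "recordingGroup": {"allSupported": True, "includeGlobalResourceTypes": True},
    }
)
config.put_delivery_channel(
    DeliveryChannel={"name": "default", "s3BucketName": "my-config-bucket"}
)
config.start_configuration_recorder(ConfigurationRecorderName="default")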

View File

@@ -0,0 +1,35 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.config.config_client import config_client


class config_recorder_all_regions_enabled(Check):
    def execute(self):
        findings = []
        for recorder in config_client.recorders:
            report = Check_Report(self.metadata)
            report.region = recorder.region
            report.resource_id = recorder.name
            # Check if Config is enabled in region
            if not recorder.name:
                report.status = "FAIL"
                report.status_extended = "No AWS Config recorders in region."
            else:
                if recorder.recording:
                    if recorder.last_status == "Failure":
                        report.status = "FAIL"
                        report.status_extended = (
                            f"AWS Config recorder {recorder.name} in failure state."
                        )
                    else:
                        report.status = "PASS"
                        report.status_extended = (
                            f"AWS Config recorder {recorder.name} is enabled."
                        )
                else:
                    report.status = "FAIL"
                    report.status_extended = (
                        f"AWS Config recorder {recorder.name} is disabled."
                    )
            findings.append(report)

        return findings
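For reference, a minimal sketch of running this new check outside the test suite, assuming the prowler package layout from this PR is importable and an audit session with credentials is already configured (the loop and print are illustrative only):

# Illustrative only: execute the new check directly and print its findings.
from providers.aws.services.config.config_recorder_all_regions_enabled.config_recorder_all_regions_enabled import (
    config_recorder_all_regions_enabled,
)

for finding in config_recorder_all_regions_enabled().execute():
    print(finding.region, finding.status, finding.status_extended)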

View File

@@ -0,0 +1,107 @@
from unittest import mock

from boto3 import client
from moto import mock_config

AWS_REGION = "us-east-1"


class Test_config_recorder_all_regions_enabled:
    @mock_config
    def test_config_no_recorders(self):
        from providers.aws.lib.audit_info.audit_info import current_audit_info
        from providers.aws.services.config.config_service import Config

        current_audit_info.audited_partition = "aws"

        with mock.patch(
            "providers.aws.services.config.config_recorder_all_regions_enabled.config_recorder_all_regions_enabled.config_client",
            new=Config(current_audit_info),
        ):
            # Test Check
            from providers.aws.services.config.config_recorder_all_regions_enabled.config_recorder_all_regions_enabled import (
                config_recorder_all_regions_enabled,
            )

            check = config_recorder_all_regions_enabled()
            result = check.execute()
            assert (
                len(result) == 23
            )  # One fail result per region, since there are no recorders
            assert result[0].status == "FAIL"

    @mock_config
    def test_config_one_recorder_disabled(self):
        # Create Config Mocked Resources
        config_client = client("config", region_name=AWS_REGION)
        # Create Config Recorder
        config_client.put_configuration_recorder(
            ConfigurationRecorder={"name": "default", "roleARN": "somearn"}
        )

        from providers.aws.lib.audit_info.audit_info import current_audit_info
        from providers.aws.services.config.config_service import Config

        current_audit_info.audited_partition = "aws"

        with mock.patch(
            "providers.aws.services.config.config_recorder_all_regions_enabled.config_recorder_all_regions_enabled.config_client",
            new=Config(current_audit_info),
        ):
            # Test Check
            from providers.aws.services.config.config_recorder_all_regions_enabled.config_recorder_all_regions_enabled import (
                config_recorder_all_regions_enabled,
            )

            check = config_recorder_all_regions_enabled()
            result = check.execute()
            assert len(result) == 23
            # Search for the recorder just created
            for recorder in result:
                if recorder.resource_id:
                    assert recorder.status == "FAIL"
                    assert (
                        recorder.status_extended
                        == "AWS Config recorder default is disabled."
                    )
                    assert recorder.resource_id == "default"

    @mock_config
    def test_config_one_recorder_enabled(self):
        # Create Config Mocked Resources
        config_client = client("config", region_name=AWS_REGION)
        # Create Config Recorder and start it
        config_client.put_configuration_recorder(
            ConfigurationRecorder={"name": "default", "roleARN": "somearn"}
        )
        # Make the delivery channel
        config_client.put_delivery_channel(
            DeliveryChannel={"name": "testchannel", "s3BucketName": "somebucket"}
        )
        config_client.start_configuration_recorder(ConfigurationRecorderName="default")

        from providers.aws.lib.audit_info.audit_info import current_audit_info
        from providers.aws.services.config.config_service import Config

        current_audit_info.audited_partition = "aws"

        with mock.patch(
            "providers.aws.services.config.config_recorder_all_regions_enabled.config_recorder_all_regions_enabled.config_client",
            new=Config(current_audit_info),
        ):
            # Test Check
            from providers.aws.services.config.config_recorder_all_regions_enabled.config_recorder_all_regions_enabled import (
                config_recorder_all_regions_enabled,
            )

            check = config_recorder_all_regions_enabled()
            result = check.execute()
            assert len(result) == 23
            # Search for the recorder just created
            for recorder in result:
                if recorder.resource_id:
                    assert recorder.status == "PASS"
                    assert (
                        recorder.status_extended
                        == "AWS Config recorder default is enabled."
                    )
                    assert recorder.resource_id == "default"

View File

@@ -0,0 +1,90 @@
import threading
from dataclasses import dataclass

from lib.logger import logger
from providers.aws.aws_provider import generate_regional_clients


################## Config
class Config:
    def __init__(self, audit_info):
        self.service = "config"
        self.session = audit_info.audit_session
        self.audited_account = audit_info.audited_account
        self.regional_clients = generate_regional_clients(self.service, audit_info)
        self.recorders = []
        self.__threading_call__(self.__describe_configuration_recorder_status__)

    def __get_session__(self):
        return self.session

    def __threading_call__(self, call):
        threads = []
        for regional_client in self.regional_clients.values():
            threads.append(threading.Thread(target=call, args=(regional_client,)))
        for t in threads:
            t.start()
        for t in threads:
            t.join()

    def __describe_configuration_recorder_status__(self, regional_client):
        logger.info("Config - Listing Recorders...")
        try:
            recorders = regional_client.describe_configuration_recorder_status()[
                "ConfigurationRecordersStatus"
            ]
            if recorders:
                for recorder in recorders:
                    if "lastStatus" in recorder:
                        self.recorders.append(
                            Recorder(
                                recorder["name"],
                                recorder["recording"],
                                recorder["lastStatus"],
                                regional_client.region,
                            )
                        )
                    else:
                        self.recorders.append(
                            Recorder(
                                recorder["name"],
                                recorder["recording"],
                                None,
                                regional_client.region,
                            )
                        )
            # No config recorders in region
            else:
                self.recorders.append(
                    Recorder(
                        None,
                        None,
                        None,
                        regional_client.region,
                    )
                )
        except Exception as error:
            logger.error(
                f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
            )


@dataclass
class Recorder:
    name: str
    recording: bool
    last_status: str
    region: str

    def __init__(
        self,
        name,
        recording,
        last_status,
        region,
    ):
        self.name = name
        self.recording = recording
        self.last_status = last_status
        self.region = region
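The service fans the describe call out across every regional client with one thread per region and joins them before returning. A minimal, self-contained sketch of that pattern, using illustrative names that are not part of the PR:

# Standalone illustration of the per-region threading fan-out used above.
import threading
from typing import Callable, Iterable


def fan_out(regional_clients: Iterable, call: Callable) -> None:
    # One thread per regional client; wait for all of them before returning.
    threads = [threading.Thread(target=call, args=(c,)) for c in regional_clients]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


# Usage sketch: fan_out(config.regional_clients.values(), describe_recorder_status)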

View File

@@ -0,0 +1,89 @@
from boto3 import client, session
from moto import mock_config

from providers.aws.lib.audit_info.models import AWS_Audit_Info
from providers.aws.services.config.config_service import Config

AWS_ACCOUNT_NUMBER = 123456789012
AWS_REGION = "us-east-1"


class Test_Config_Service:
    # Mocked Audit Info
    def set_mocked_audit_info(self):
        audit_info = AWS_Audit_Info(
            original_session=None,
            audit_session=session.Session(
                profile_name=None,
                botocore_session=None,
            ),
            audited_account=AWS_ACCOUNT_NUMBER,
            audited_user_id=None,
            audited_partition="aws",
            audited_identity_arn=None,
            profile=None,
            profile_region=None,
            credentials=None,
            assumed_role_info=None,
            audited_regions=None,
            organizations_metadata=None,
        )
        return audit_info

    # Test Config Service
    @mock_config
    def test_service(self):
        # Config client for this test class
        audit_info = self.set_mocked_audit_info()
        config = Config(audit_info)
        assert config.service == "config"

    # Test Config Client
    @mock_config
    def test_client(self):
        # Config client for this test class
        audit_info = self.set_mocked_audit_info()
        config = Config(audit_info)
        for client in config.regional_clients.values():
            assert client.__class__.__name__ == "ConfigService"

    # Test Config Session
    @mock_config
    def test__get_session__(self):
        # Config client for this test class
        audit_info = self.set_mocked_audit_info()
        config = Config(audit_info)
        assert config.session.__class__.__name__ == "Session"

    # Test Config Audited Account
    @mock_config
    def test_audited_account(self):
        # Config client for this test class
        audit_info = self.set_mocked_audit_info()
        config = Config(audit_info)
        assert config.audited_account == AWS_ACCOUNT_NUMBER

    # Test Config Describe Configuration Recorder Status
    @mock_config
    def test__describe_configuration_recorder_status__(self):
        # Generate Config Client
        config_client = client("config", region_name=AWS_REGION)
        # Create Config Recorder and start it
        config_client.put_configuration_recorder(
            ConfigurationRecorder={"name": "default", "roleARN": "somearn"}
        )
        # Make the delivery channel
        config_client.put_delivery_channel(
            DeliveryChannel={"name": "testchannel", "s3BucketName": "somebucket"}
        )
        config_client.start_configuration_recorder(ConfigurationRecorderName="default")

        # Config client for this test class
        audit_info = self.set_mocked_audit_info()
        config = Config(audit_info)
        # One recorder per region
        assert len(config.recorders) == 23
        # Check the active one
        # Search for the recorder just created
        for recorder in config.recorders:
            if recorder.name == "default":
                assert recorder.recording == True

View File

@@ -1,54 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2019) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_check119="1.19"
CHECK_TITLE_check119="[check119] Ensure IAM instance roles are used for AWS resource access from instances"
CHECK_SCORED_check119="NOT_SCORED"
CHECK_CIS_LEVEL_check119="LEVEL2"
CHECK_SEVERITY_check119="Medium"
CHECK_ASFF_TYPE_check119="Software and Configuration Checks/Industry and Regulatory Standards/CIS AWS Foundations Benchmark"
CHECK_ASFF_RESOURCE_TYPE_check119="AwsEc2Instance"
CHECK_ALTERNATE_check119="check119"
CHECK_SERVICENAME_check119="ec2"
CHECK_RISK_check119='AWS access from within AWS instances can be done by either encoding AWS keys into AWS API calls or by assigning the instance to a role which has an appropriate permissions policy for the required access. AWS IAM roles reduce the risks associated with sharing and rotating credentials that can be used outside of AWS itself. If credentials are compromised; they can be used from outside of the AWS account.'
CHECK_REMEDIATION_check119='IAM roles can only be associated at the launch of an instance. To remediate an instance to add it to a role you must create or re-launch a new instance. (Check for external dependencies on its current private ip or public addresses).'
CHECK_DOC_check119='http://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_use_switch-role-ec2.html'
CHECK_CAF_EPIC_check119='IAM'
check119(){
  for regx in $REGIONS; do
    EC2_DATA=$($AWSCLI ec2 describe-instances $PROFILE_OPT --region $regx --query 'Reservations[].Instances[].[InstanceId, IamInstanceProfile.Arn, State.Name]' --output json 2>&1)
    if [[ $(echo "$EC2_DATA" | grep UnauthorizedOperation) ]]; then
      textInfo "$regx: Unauthorized Operation error trying to describe instances" "$regx"
      continue
    else
      EC2_DATA=$(echo $EC2_DATA | jq '.[]|{InstanceId: .[0], ProfileArn: .[1], StateName: .[2]}')
      INSTANCE_LIST=$(echo $EC2_DATA | jq -r '.InstanceId')
    fi
    if [[ $INSTANCE_LIST ]]; then
      for instance in $INSTANCE_LIST; do
        STATE_NAME=$(echo $EC2_DATA | jq -r --arg i "$instance" 'select(.InstanceId==$i)|.StateName')
        if [[ $STATE_NAME != "terminated" && $STATE_NAME != "shutting-down" ]]; then
          PROFILEARN=$(echo $EC2_DATA | jq -r --arg i "$instance" 'select(.InstanceId==$i)|.ProfileArn')
          if [[ $PROFILEARN == "null" ]]; then
            textFail "$regx: Instance $instance not associated with an instance role" "$regx" "$instance"
          else
            textPass "$regx: Instance $instance associated with role ${PROFILEARN##*/}" "$regx" "$instance"
          fi
        fi
      done
    else
      textInfo "$regx: No EC2 instances found" "$regx" "$instance"
    fi
  done
}

View File

@@ -1,46 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2019) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_check43="4.3"
CHECK_TITLE_check43="[check43] Ensure the default security group of every VPC restricts all traffic"
CHECK_SCORED_check43="SCORED"
CHECK_CIS_LEVEL_check43="LEVEL2"
CHECK_SEVERITY_check43="High"
CHECK_ASFF_TYPE_check43="Software and Configuration Checks/Industry and Regulatory Standards/CIS AWS Foundations Benchmark"
CHECK_ASFF_RESOURCE_TYPE_check43="AwsEc2SecurityGroup"
CHECK_ALTERNATE_check403="check43"
CHECK_ASFF_COMPLIANCE_TYPE_check43="ens-mp.com.4.aws.sg.1"
CHECK_SERVICENAME_check43="ec2"
CHECK_RISK_check43='Even having a perimeter firewall; having security groups open allows any user or malware with vpc access to scan for well known and sensitive ports and gain access to instance.'
CHECK_REMEDIATION_check43='Apply a Zero Trust approach. Implement a process to scan and remediate unrestricted or overly permissive security groups. The recommended best practice is to narrow the definition to the minimum ports required.'
CHECK_DOC_check43='https://docs.aws.amazon.com/eks/latest/userguide/sec-group-reqs.html'
CHECK_CAF_EPIC_check43='Infrastructure Security'
check43(){
  # "Ensure the default security group of every VPC restricts all traffic (Scored)"
  for regx in $REGIONS; do
    CHECK_SGDEFAULT_IDS=$($AWSCLI ec2 describe-security-groups $PROFILE_OPT --region $regx --filters Name=group-name,Values='default' --query 'SecurityGroups[*].GroupId[]' --output text 2>&1)
    if [[ $(echo "$CHECK_SGDEFAULT_IDS" | grep -E 'AccessDenied|UnauthorizedOperation') ]]; then
      textInfo "$regx: Access Denied trying to describe security groups" "$regx"
      continue
    fi
    for CHECK_SGDEFAULT_ID in $CHECK_SGDEFAULT_IDS; do
      CHECK_SGDEFAULT_ID_OPEN=$($AWSCLI ec2 describe-security-groups $PROFILE_OPT --region $regx --group-ids $CHECK_SGDEFAULT_ID --query 'SecurityGroups[*].{IpPermissions:IpPermissions,IpPermissionsEgress:IpPermissionsEgress,GroupId:GroupId}' --output text |egrep '\s0.0.0.0|\:\:\/0')
      if [[ $CHECK_SGDEFAULT_ID_OPEN ]];then
        textFail "$regx: Default Security Groups ($CHECK_SGDEFAULT_ID) found that allow 0.0.0.0 IN or OUT traffic" "$regx" "$CHECK_SGDEFAULT_ID"
      else
        textPass "$regx: No Default Security Groups ($CHECK_SGDEFAULT_ID) open to 0.0.0.0 found" "$regx" "$CHECK_SGDEFAULT_ID"
      fi
    done
  done
}

View File

@@ -4,11 +4,8 @@ from boto3 import client
from moto import mock_ec2
from config.config import get_config_var
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.ec2.ec2_service import EC2
EXAMPLE_AMI_ID = "ami-12c6146b"
current_audit_info.audited_partition = "aws"
shodan_api_key = get_config_var("shodan_api_key")
@@ -22,6 +19,11 @@ class Test_ec2_elastic_ip_shodan:
        # Create EC2 Instance
        ec2_client.run_instances(ImageId=EXAMPLE_AMI_ID, MinCount=1, MaxCount=1)

        from providers.aws.lib.audit_info.audit_info import current_audit_info
        from providers.aws.services.ec2.ec2_service import EC2

        current_audit_info.audited_partition = "aws"

        with mock.patch(
            "providers.aws.services.ec2.ec2_elastic_ip_shodan.ec2_elastic_ip_shodan.ec2_client",
            new=EC2(current_audit_info),
@@ -43,6 +45,11 @@ class Test_ec2_elastic_ip_shodan:
        # Create EC2 Instance
        ec2_client.allocate_address(Domain="vpc")

        from providers.aws.lib.audit_info.audit_info import current_audit_info
        from providers.aws.services.ec2.ec2_service import EC2

        current_audit_info.audited_partition = "aws"

        with mock.patch(
            "providers.aws.services.ec2.ec2_elastic_ip_shodan.ec2_elastic_ip_shodan.ec2_client",
            new=EC2(current_audit_info),
@@ -71,6 +78,11 @@ class Test_ec2_elastic_ip_shodan:
            InstanceId=instance["Instances"][0]["InstanceId"],
        )

        from providers.aws.lib.audit_info.audit_info import current_audit_info
        from providers.aws.services.ec2.ec2_service import EC2

        current_audit_info.audited_partition = "aws"

        with mock.patch(
            "providers.aws.services.ec2.ec2_elastic_ip_shodan.ec2_elastic_ip_shodan.ec2_client",
            new=EC2(current_audit_info),

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "ec2_securitygroup_default_restrict_traffic",
"CheckTitle": "Ensure the default security group of every VPC restricts all traffic.",
"CheckType": ["Infrastructure Security"],
"ServiceName": "ec2",
"SubServiceName": "securitygroup",
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"Severity": "high",
"ResourceType": "AwsEc2SecurityGroup",
"Description": "Ensure the default security group of every VPC restricts all traffic.",
"Risk": "Even having a perimeter firewall, having security groups open allows any user or malware with vpc access to scan for well known and sensitive ports and gain access to instance.",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "https://docs.bridgecrew.io/docs/networking_4#aws-console",
"Terraform": "https://docs.bridgecrew.io/docs/networking_4#terraform"
},
"Recommendation": {
"Text": "Apply Zero Trust approach. Implement a process to scan and remediate unrestricted or overly permissive security groups. Recommended best practices is to narrow the definition for the minimum ports required.",
"Url": "https://docs.aws.amazon.com/eks/latest/userguide/sec-group-reqs.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}
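As with the Config metadata, the remediation here only points at external docs; a hypothetical boto3 sketch (not part of this PR) that strips every rule from a VPC's default security group might look like this, with the region and lookup as placeholders:

# Hypothetical remediation sketch: remove all ingress/egress rules from the
# default security group so it restricts all traffic. Region is a placeholder.
import boto3

ec2 = boto3.client("ec2", region_name="us-east-1")
default_sg = ec2.describe_security_groups(GroupNames=["default"])["SecurityGroups"][0]
if default_sg["IpPermissions"]:
    ec2.revoke_security_group_ingress(
        GroupId=default_sg["GroupId"], IpPermissions=default_sg["IpPermissions"]
    )
if default_sg["IpPermissionsEgress"]:
    ec2.revoke_security_group_egress(
        GroupId=default_sg["GroupId"], IpPermissions=default_sg["IpPermissionsEgress"]
    )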

View File

@@ -0,0 +1,27 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.ec2.ec2_client import ec2_client
from providers.aws.services.ec2.lib.security_groups import check_security_group


class ec2_securitygroup_default_restrict_traffic(Check):
    def execute(self):
        findings = []
        for security_group in ec2_client.security_groups:
            public = False
            report = Check_Report(self.metadata)
            report.region = security_group.region
            # Find default security group
            if security_group.name == "default":
                for ingress_rule in security_group.ingress_rules:
                    public = check_security_group(ingress_rule, "-1")
                if public:
                    report.status = "FAIL"
                    report.status_extended = f"Default Security Group ({security_group.id}) is open to the Internet."
                    report.resource_id = security_group.id
                else:
                    report.status = "PASS"
                    report.status_extended = f"Default Security Group ({security_group.id}) is not open to the Internet."
                    report.resource_id = security_group.id
                findings.append(report)

        return findings
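The check_security_group helper imported above lives under providers/aws/services/ec2/lib and is not included in this diff. The sketch below only illustrates the behaviour the check appears to rely on (an ingress rule open to the world for the given protocol); it is an assumption, not the helper's actual implementation:

# Assumed behaviour of check_security_group, for illustration only.
def check_security_group_sketch(ingress_rule: dict, protocol: str) -> bool:
    if protocol != "-1" and ingress_rule.get("IpProtocol") not in (protocol, "-1"):
        return False
    open_v4 = any(
        ip_range.get("CidrIp") == "0.0.0.0/0"
        for ip_range in ingress_rule.get("IpRanges", [])
    )
    open_v6 = any(
        ip_range.get("CidrIpv6") == "::/0"
        for ip_range in ingress_rule.get("Ipv6Ranges", [])
    )
    return open_v4 or open_v6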

View File

@@ -0,0 +1,120 @@
from unittest import mock

from boto3 import client
from moto import mock_ec2

AWS_REGION = "us-east-1"


class Test_ec2_securitygroup_default_restrict_traffic:
    @mock_ec2
    def test_ec2_default_sgs(self):
        # Create EC2 Mocked Resources
        ec2_client = client("ec2", region_name=AWS_REGION)
        ec2_client.create_vpc(CidrBlock="10.0.0.0/16")

        from providers.aws.lib.audit_info.audit_info import current_audit_info
        from providers.aws.services.ec2.ec2_service import EC2

        current_audit_info.audited_partition = "aws"

        with mock.patch(
            "providers.aws.services.ec2.ec2_securitygroup_default_restrict_traffic.ec2_securitygroup_default_restrict_traffic.ec2_client",
            new=EC2(current_audit_info),
        ):
            # Test Check
            from providers.aws.services.ec2.ec2_securitygroup_default_restrict_traffic.ec2_securitygroup_default_restrict_traffic import (
                ec2_securitygroup_default_restrict_traffic,
            )

            check = ec2_securitygroup_default_restrict_traffic()
            result = check.execute()
            # One default sg per region
            assert len(result) == 24
            # All are compliant by default
            assert result[0].status == "PASS"

    @mock_ec2
    def test_ec2_non_compliant_default_sg(self):
        # Create EC2 Mocked Resources
        ec2_client = client("ec2", region_name=AWS_REGION)
        ec2_client.create_vpc(CidrBlock="10.0.0.0/16")
        default_sg_id = ec2_client.describe_security_groups(GroupNames=["default"])[
            "SecurityGroups"
        ][0]["GroupId"]
        ec2_client.authorize_security_group_ingress(
            GroupId=default_sg_id,
            IpPermissions=[{"IpProtocol": "-1", "IpRanges": [{"CidrIp": "0.0.0.0/0"}]}],
        )

        from providers.aws.lib.audit_info.audit_info import current_audit_info
        from providers.aws.services.ec2.ec2_service import EC2

        current_audit_info.audited_partition = "aws"

        with mock.patch(
            "providers.aws.services.ec2.ec2_securitygroup_default_restrict_traffic.ec2_securitygroup_default_restrict_traffic.ec2_client",
            new=EC2(current_audit_info),
        ):
            # Test Check
            from providers.aws.services.ec2.ec2_securitygroup_default_restrict_traffic.ec2_securitygroup_default_restrict_traffic import (
                ec2_securitygroup_default_restrict_traffic,
            )

            check = ec2_securitygroup_default_restrict_traffic()
            result = check.execute()
            # One default sg per region
            assert len(result) == 24
            # Search changed sg
            for sg in result:
                if sg.resource_id == default_sg_id:
                    assert sg.status == "FAIL"
                    assert (
                        sg.status_extended
                        == f"Default Security Group ({default_sg_id}) is open to the Internet."
                    )

    @mock_ec2
    def test_ec2_compliant_default_sg(self):
        # Create EC2 Mocked Resources
        ec2_client = client("ec2", region_name=AWS_REGION)
        ec2_client.create_vpc(CidrBlock="10.0.0.0/16")
        default_sg_id = ec2_client.describe_security_groups(GroupNames=["default"])[
            "SecurityGroups"
        ][0]["GroupId"]
        ec2_client.authorize_security_group_ingress(
            GroupId=default_sg_id,
            IpPermissions=[
                {"IpProtocol": "-1", "IpRanges": [{"CidrIp": "10.11.12.13/32"}]}
            ],
        )

        from providers.aws.lib.audit_info.audit_info import current_audit_info
        from providers.aws.services.ec2.ec2_service import EC2

        current_audit_info.audited_partition = "aws"

        with mock.patch(
            "providers.aws.services.ec2.ec2_securitygroup_default_restrict_traffic.ec2_securitygroup_default_restrict_traffic.ec2_client",
            new=EC2(current_audit_info),
        ):
            # Test Check
            from providers.aws.services.ec2.ec2_securitygroup_default_restrict_traffic.ec2_securitygroup_default_restrict_traffic import (
                ec2_securitygroup_default_restrict_traffic,
            )

            check = ec2_securitygroup_default_restrict_traffic()
            result = check.execute()
            # One default sg per region
            assert len(result) == 24
            # Search changed sg
            for sg in result:
                if sg.resource_id == default_sg_id:
                    assert sg.status == "PASS"
                    assert (
                        sg.status_extended
                        == f"Default Security Group ({default_sg_id}) is not open to the Internet."
                    )

View File

@@ -122,21 +122,3 @@ class Test_kms_cmk_are_used:
            )
            assert result[0].resource_id == key["KeyId"]
            assert result[0].resource_arn == key["Arn"]

    @mock_kms
    def test_bad_response(self):
        mock_client = mock.MagicMock()

        with mock.patch(
            "providers.aws.services.kms.kms_cmk_are_used.kms_cmk_are_used.kms_client",
            new=mock_client,
        ):
            # Test Check
            from providers.aws.services.kms.kms_cmk_are_used.kms_cmk_are_used import (
                kms_cmk_are_used,
            )

            check = kms_cmk_are_used()
            result = check.execute()
            assert len(result) == 0

View File

@@ -92,21 +92,3 @@ class Test_kms_cmk_rotation_enabled:
            )
            assert result[0].resource_id == key["KeyId"]
            assert result[0].resource_arn == key["Arn"]

    @mock_kms
    def test_bad_response(self):
        mock_client = mock.MagicMock()

        with mock.patch(
            "providers.aws.services.kms.kms_cmk_rotation_enabled.kms_cmk_rotation_enabled.kms_client",
            new=mock_client,
        ):
            # Test Check
            from providers.aws.services.kms.kms_cmk_rotation_enabled.kms_cmk_rotation_enabled import (
                kms_cmk_rotation_enabled,
            )

            check = kms_cmk_rotation_enabled()
            result = check.execute()
            assert len(result) == 0

View File

@@ -108,21 +108,3 @@ class Test_kms_key_not_publicly_accessible:
            )
            assert result[0].resource_id == key["KeyId"]
            assert result[0].resource_arn == key["Arn"]

    @mock_kms
    def test_bad_response(self):
        mock_client = mock.MagicMock()

        with mock.patch(
            "providers.aws.services.kms.kms_key_not_publicly_accessible.kms_key_not_publicly_accessible.kms_client",
            new=mock_client,
        ):
            # Test Check
            from providers.aws.services.kms.kms_key_not_publicly_accessible.kms_key_not_publicly_accessible import (
                kms_key_not_publicly_accessible,
            )

            check = kms_key_not_publicly_accessible()
            result = check.execute()
            assert len(result) == 0

View File

@@ -1,93 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2019) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_check26="2.6"
CHECK_TITLE_check26="[check26] Ensure S3 bucket access logging is enabled on the CloudTrail S3 bucket"
CHECK_SCORED_check26="SCORED"
CHECK_CIS_LEVEL_check26="LEVEL1"
CHECK_SEVERITY_check26="Medium"
CHECK_ASFF_TYPE_check26="Software and Configuration Checks/Industry and Regulatory Standards/CIS AWS Foundations Benchmark"
CHECK_ASFF_RESOURCE_TYPE_check26="AwsS3Bucket"
CHECK_ALTERNATE_check206="check26"
CHECK_SERVICENAME_check26="s3"
CHECK_RISK_check26='Server access logs can assist you in security and access audits; help you learn about your customer base; and understand your Amazon S3 bill.'
CHECK_REMEDIATION_check26='Ensure that S3 buckets have Logging enabled. CloudTrail data events can be used in place of S3 bucket logging. If that is the case; this finding can be considered a false positive.'
CHECK_DOC_check26='https://docs.aws.amazon.com/AmazonS3/latest/dev/security-best-practices.html'
CHECK_CAF_EPIC_check26='Logging and Monitoring'
check26(){
  trail_count=0
  # "Ensure S3 bucket access logging is enabled on the CloudTrail S3 bucket (Scored)"
  for regx in $REGIONS; do
    TRAILS_AND_REGIONS=$($AWSCLI cloudtrail describe-trails $PROFILE_OPT --region $regx --query 'trailList[*].{Name:TrailARN, HomeRegion:HomeRegion}' --output text 2>&1 | tr " " ',')
    if [[ $(echo "$TRAILS_AND_REGIONS" | grep AccessDenied) ]]; then
      textInfo "$regx: Access Denied trying to describe trails" "$regx" "$trail"
      continue
    fi
    if [[ $TRAILS_AND_REGIONS ]]; then
      for reg_trail in $TRAILS_AND_REGIONS; do
        TRAIL_REGION=$(echo $reg_trail | cut -d',' -f1)
        if [ $TRAIL_REGION != $regx ]; then # Only report trails once in home region
          continue
        fi
        trail=$(echo $reg_trail | cut -d',' -f2)
        trail_count=$((trail_count + 1))
        CLOUDTRAILBUCKET=$($AWSCLI cloudtrail describe-trails $PROFILE_OPT --region $TRAIL_REGION --query 'trailList[*].[S3BucketName]' --output text --trail-name-list $trail)
        if [[ -z $CLOUDTRAILBUCKET ]]; then
          textFail "$regx: Trail $trail does not publish to S3" "$TRAIL_REGION" "$trail"
          continue
        fi
        CLOUDTRAIL_ACCOUNT_ID=$(echo $trail | awk -F: '{ print $5 }')
        if [ "$CLOUDTRAIL_ACCOUNT_ID" != "$ACCOUNT_NUM" ]; then
          textInfo "$regx: Trail $trail S3 logging bucket $CLOUDTRAILBUCKET is not in current account" "$TRAIL_REGION" "$trail"
          continue
        fi
        #
        # LOCATION - requests referencing buckets created after March 20, 2019
        # must be made to S3 endpoints in the same region as the bucket was
        # created.
        #
        BUCKET_LOCATION=$($AWSCLI s3api get-bucket-location $PROFILE_OPT --region $regx --bucket $CLOUDTRAILBUCKET --output text 2>&1)
        if [[ $(echo "$BUCKET_LOCATION" | grep AccessDenied) ]]; then
          textInfo "$regx: Trail $trail Access Denied getting bucket location for $CLOUDTRAILBUCKET" "$TRAIL_REGION" "$trail"
          continue
        fi
        if [[ $BUCKET_LOCATION == "None" ]]; then
          BUCKET_LOCATION="us-east-1"
        fi
        if [[ $BUCKET_LOCATION == "EU" ]]; then
          BUCKET_LOCATION="eu-west-1"
        fi
        CLOUDTRAILBUCKET_LOGENABLED=$($AWSCLI s3api get-bucket-logging --bucket $CLOUDTRAILBUCKET $PROFILE_OPT --region $BUCKET_LOCATION --query 'LoggingEnabled.TargetBucket' --output text 2>&1)
        if [[ $(echo "$CLOUDTRAILBUCKET_LOGENABLED" | grep AccessDenied) ]]; then
          textInfo "$regx: Trail $trail Access Denied getting bucket logging for $CLOUDTRAILBUCKET" "$TRAIL_REGION" "$trail"
          continue
        fi
        if [[ $CLOUDTRAILBUCKET_LOGENABLED != "None" ]]; then
          textPass "$regx: Trail $trail S3 bucket access logging is enabled for $CLOUDTRAILBUCKET" "$TRAIL_REGION" "$trail"
        else
          textFail "$regx: Trail $trail S3 bucket access logging is not enabled for $CLOUDTRAILBUCKET" "$TRAIL_REGION" "$trail"
        fi
      done
    fi
  done
  if [[ $trail_count == 0 ]]; then
    textFail "$REGION: No CloudTrail trails were found in the account" "$REGION" "$trail"
  fi
}

View File

@@ -252,21 +252,3 @@ class Test_vpc_endpoint_connections_trust_boundaries:
                == vpc_endpoint["VpcEndpoint"]["VpcEndpointId"]
            )
            assert result[0].region == AWS_REGION

    @mock_ec2
    def test_bad_response(self):
        mock_client = mock.MagicMock()

        with mock.patch(
            "providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client",
            new=mock_client,
        ):
            # Test Check
            from providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import (
                vpc_endpoint_connections_trust_boundaries,
            )

            check = vpc_endpoint_connections_trust_boundaries()
            result = check.execute()
            assert len(result) == 0

View File

@@ -108,21 +108,3 @@ class Test_vpc_endpoint_services_allowed_principals_trust_boundaries:
                    "ServiceId"
                ]
            )

    @mock_ec2
    def test_bad_response(self):
        mock_client = mock.MagicMock()

        with mock.patch(
            "providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_client",
            new=mock_client,
        ):
            # Test Check
            from providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries import (
                vpc_endpoint_services_allowed_principals_trust_boundaries,
            )

            check = vpc_endpoint_services_allowed_principals_trust_boundaries()
            result = check.execute()
            assert len(result) == 0

View File

@@ -107,21 +107,3 @@ class Test_vpc_flow_logs_enabled:
                == f"VPC {vpc['VpcId']} Flow logs are disabled."
            )
            assert result.resource_id == vpc["VpcId"]

    @mock_ec2
    def test_bad_response(self):
        mock_client = mock.MagicMock()

        with mock.patch(
            "providers.aws.services.vpc.vpc_flow_logs_enabled.vpc_flow_logs_enabled.vpc_client",
            new=mock_client,
        ):
            # Test Check
            from providers.aws.services.vpc.vpc_flow_logs_enabled.vpc_flow_logs_enabled import (
                vpc_flow_logs_enabled,
            )

            check = vpc_flow_logs_enabled()
            result = check.execute()
            assert len(result) == 0

View File

@@ -152,21 +152,3 @@ class Test_vpc_peering_routing_tables_with_least_privilege:
            )
            assert result[0].resource_id == vpc_pcx_id
            assert result[0].region == AWS_REGION

    @mock_ec2
    def test_bad_response(self):
        mock_client = mock.MagicMock()

        with mock.patch(
            "providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege.vpc_client",
            new=mock_client,
        ):
            # Test Check
            from providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege import (
                vpc_peering_routing_tables_with_least_privilege,
            )

            check = vpc_peering_routing_tables_with_least_privilege()
            result = check.execute()
            assert len(result) == 0