feat(autoscaling): Add AutoScaling service, check and test (#1426)

Author: Sergio Garcia
Date: 2022-10-28 09:33:29 +02:00
Committed by: GitHub
Parent: 8487777f96
Commit: 7e1b0d13c7

13 changed files with 1258 additions and 812 deletions

View File

@@ -18,6 +18,7 @@ coverage = "6.4.1"
pytest = "7.1.2"
pytest-xdist = "2.5.0"
shodan = "1.28.0"
detect-secrets = "1.4.0"
[dev-packages]

Pipfile.lock (generated): 1489 changes

File diff suppressed because it is too large.

View File

@@ -0,0 +1,4 @@
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.autoscaling.autoscaling_service import AutoScaling

# Single shared AutoScaling service instance, imported by every autoscaling check
autoscaling_client = AutoScaling(current_audit_info)

View File

@@ -0,0 +1,36 @@
{
  "Provider": "aws",
  "CheckID": "autoscaling_find_secrets_ec2_launch_configuration",
  "CheckTitle": "Find secrets in EC2 Auto Scaling Launch Configuration",
  "CheckType": ["IAM"],
  "ServiceName": "autoscaling",
  "SubServiceName": "",
  "ResourceIdTemplate": "arn:partition:access-analyzer:region:account-id:analyzer/resource-id",
  "Severity": "critical",
  "ResourceType": "Other",
  "Description": "Find secrets in EC2 Auto Scaling Launch Configuration",
  "Risk": "The use of a hard-coded password increases the possibility of password guessing. If hard-coded passwords are used, it is possible that malicious users gain access through the account in question.",
  "RelatedUrl": "",
  "Remediation": {
    "Code": {
      "CLI": "",
      "NativeIaC": "",
      "Other": "",
      "Terraform": ""
    },
    "Recommendation": {
      "Text": "Do not include sensitive information in user data within the launch configuration; use Secrets Manager instead.",
      "Url": "https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html"
    }
  },
  "Categories": [],
  "Tags": {
    "Tag1Key": "value",
    "Tag2Key": "value"
  },
  "DependsOn": [],
  "RelatedTo": [],
  "Notes": "",
  "Compliance": []
}
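
The remediation above points to Secrets Manager as the place to keep credentials instead of launch-configuration user data. As a rough illustration of that pattern (not part of this commit; the secret name, region, and JSON key are placeholders, and the instance profile would still need secretsmanager:GetSecretValue permission), an instance can fetch the credential at boot rather than embedding it:

# Hedged sketch: read a database password from Secrets Manager at runtime
# instead of hard-coding it in user data. SecretId and key are hypothetical.
import json

import boto3

secrets_client = boto3.client("secretsmanager", region_name="us-east-1")
response = secrets_client.get_secret_value(SecretId="prod/db/credentials")
credentials = json.loads(response["SecretString"])

db_password = credentials["password"]  # key depends on how the secret was stored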

View File

@@ -0,0 +1,47 @@
import os
import tempfile
from base64 import b64decode

from detect_secrets import SecretsCollection
from detect_secrets.settings import default_settings

from lib.check.models import Check, Check_Report
from providers.aws.services.autoscaling.autoscaling_client import autoscaling_client


class autoscaling_find_secrets_ec2_launch_configuration(Check):
    def execute(self):
        findings = []
        for configuration in autoscaling_client.launch_configurations:
            report = Check_Report(self.metadata)
            report.region = configuration.region
            report.resource_id = configuration.name
            report.resource_arn = configuration.arn
            if configuration.user_data:
                # Decode the Base64 User Data and write it to a temporary file
                # so detect-secrets can scan it
                temp_user_data_file = tempfile.NamedTemporaryFile(delete=False)
                user_data = b64decode(configuration.user_data).decode("utf-8")
                temp_user_data_file.write(
                    bytes(user_data, encoding="raw_unicode_escape")
                )
                temp_user_data_file.close()

                secrets = SecretsCollection()
                with default_settings():
                    secrets.scan_file(temp_user_data_file.name)

                if secrets.json():
                    report.status = "FAIL"
                    report.status_extended = f"Potential secret found in autoscaling {configuration.name} User Data."
                else:
                    report.status = "PASS"
                    report.status_extended = f"No secrets found in autoscaling {configuration.name} User Data."

                os.remove(temp_user_data_file.name)
            else:
                report.status = "PASS"
                report.status_extended = f"No secrets found in autoscaling {configuration.name} since User Data is empty."

            findings.append(report)

        return findings
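
The check decodes the Base64 user data, writes it to a temporary file, and hands that file to detect-secrets. For reference, here is a minimal standalone sketch of the same detect-secrets 1.4.0 API in isolation (the sample user-data string and file handling are illustrative, not part of the commit):

# Standalone sketch of the detect-secrets workflow used by the check above.
import os
import tempfile

from detect_secrets import SecretsCollection
from detect_secrets.settings import default_settings

sample_user_data = "export DB_PASSWORD=foobar123\n"  # hypothetical payload

with tempfile.NamedTemporaryFile(mode="w", delete=False) as tmp:
    tmp.write(sample_user_data)

secrets = SecretsCollection()
with default_settings():
    secrets.scan_file(tmp.name)

# .json() returns a dict keyed by file name; an empty dict means nothing was flagged
print(secrets.json())
os.remove(tmp.name)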

View File

@@ -0,0 +1,178 @@
from unittest import mock

from boto3 import client
from moto import mock_autoscaling

AWS_REGION = "us-east-1"


class Test_autoscaling_find_secrets_ec2_launch_configuration:
    @mock_autoscaling
    def test_no_autoscaling(self):
        autoscaling_client = client("autoscaling", region_name=AWS_REGION)
        autoscaling_client.launch_configurations = []

        from providers.aws.lib.audit_info.audit_info import current_audit_info
        from providers.aws.services.autoscaling.autoscaling_service import AutoScaling

        current_audit_info.audited_partition = "aws"

        with mock.patch(
            "providers.aws.services.autoscaling.autoscaling_find_secrets_ec2_launch_configuration.autoscaling_find_secrets_ec2_launch_configuration.autoscaling_client",
            new=AutoScaling(current_audit_info),
        ):
            # Test Check
            from providers.aws.services.autoscaling.autoscaling_find_secrets_ec2_launch_configuration.autoscaling_find_secrets_ec2_launch_configuration import (
                autoscaling_find_secrets_ec2_launch_configuration,
            )

            check = autoscaling_find_secrets_ec2_launch_configuration()
            result = check.execute()

            assert len(result) == 0

    @mock_autoscaling
    def test_one_autoscaling_with_no_secrets(self):
        # Include launch_configurations to check
        autoscaling_client = client("autoscaling", region_name=AWS_REGION)
        autoscaling_client.create_launch_configuration(
            LaunchConfigurationName="tester",
            ImageId="ami-12c6146b",
            InstanceType="t1.micro",
            KeyName="the_keys",
            SecurityGroups=["default", "default2"],
            UserData="This is some user_data",
        )

        from providers.aws.lib.audit_info.audit_info import current_audit_info
        from providers.aws.services.autoscaling.autoscaling_service import AutoScaling

        current_audit_info.audited_partition = "aws"

        with mock.patch(
            "providers.aws.services.autoscaling.autoscaling_find_secrets_ec2_launch_configuration.autoscaling_find_secrets_ec2_launch_configuration.autoscaling_client",
            new=AutoScaling(current_audit_info),
        ):
            from providers.aws.services.autoscaling.autoscaling_find_secrets_ec2_launch_configuration.autoscaling_find_secrets_ec2_launch_configuration import (
                autoscaling_find_secrets_ec2_launch_configuration,
            )

            check = autoscaling_find_secrets_ec2_launch_configuration()
            result = check.execute()

            assert len(result) == 1
            assert result[0].status == "PASS"
            assert (
                result[0].status_extended
                == "No secrets found in autoscaling tester User Data."
            )
            assert result[0].resource_id == "tester"

    @mock_autoscaling
    def test_one_autoscaling_with_secrets(self):
        # Include launch_configurations to check
        autoscaling_client = client("autoscaling", region_name=AWS_REGION)
        autoscaling_client.create_launch_configuration(
            LaunchConfigurationName="tester",
            ImageId="ami-12c6146b",
            InstanceType="t1.micro",
            KeyName="the_keys",
            SecurityGroups=["default", "default2"],
            UserData="DB_PASSWORD=foobar123",
        )

        from providers.aws.lib.audit_info.audit_info import current_audit_info
        from providers.aws.services.autoscaling.autoscaling_service import AutoScaling

        with mock.patch(
            "providers.aws.services.autoscaling.autoscaling_find_secrets_ec2_launch_configuration.autoscaling_find_secrets_ec2_launch_configuration.autoscaling_client",
            new=AutoScaling(current_audit_info),
        ):
            from providers.aws.services.autoscaling.autoscaling_find_secrets_ec2_launch_configuration.autoscaling_find_secrets_ec2_launch_configuration import (
                autoscaling_find_secrets_ec2_launch_configuration,
            )

            check = autoscaling_find_secrets_ec2_launch_configuration()
            result = check.execute()

            assert len(result) == 1
            assert result[0].status == "FAIL"
            assert (
                result[0].status_extended
                == "Potential secret found in autoscaling tester User Data."
            )
            assert result[0].resource_id == "tester"

    @mock_autoscaling
    def test_one_autoscaling_file_with_secrets(self):
        # Include launch_configurations to check
        with open(
            "providers/aws/services/autoscaling/autoscaling_find_secrets_ec2_launch_configuration/fixtures/fixture",
            "r",
        ) as f:
            secrets = f.read()

        autoscaling_client = client("autoscaling", region_name=AWS_REGION)
        autoscaling_client.create_launch_configuration(
            LaunchConfigurationName="tester",
            ImageId="ami-12c6146b",
            InstanceType="t1.micro",
            KeyName="the_keys",
            SecurityGroups=["default", "default2"],
            UserData=secrets,
        )

        from providers.aws.lib.audit_info.audit_info import current_audit_info
        from providers.aws.services.autoscaling.autoscaling_service import AutoScaling

        with mock.patch(
            "providers.aws.services.autoscaling.autoscaling_find_secrets_ec2_launch_configuration.autoscaling_find_secrets_ec2_launch_configuration.autoscaling_client",
            new=AutoScaling(current_audit_info),
        ):
            from providers.aws.services.autoscaling.autoscaling_find_secrets_ec2_launch_configuration.autoscaling_find_secrets_ec2_launch_configuration import (
                autoscaling_find_secrets_ec2_launch_configuration,
            )

            check = autoscaling_find_secrets_ec2_launch_configuration()
            result = check.execute()

            assert len(result) == 1
            assert result[0].status == "FAIL"
            assert (
                result[0].status_extended
                == "Potential secret found in autoscaling tester User Data."
            )
            assert result[0].resource_id == "tester"

    @mock_autoscaling
    def test_one_launch_configurations_without_user_data(self):
        # Include launch_configurations to check
        autoscaling_client = client("autoscaling", region_name=AWS_REGION)
        autoscaling_client.create_launch_configuration(
            LaunchConfigurationName="tester",
            ImageId="ami-12c6146b",
            InstanceType="t1.micro",
            KeyName="the_keys",
            SecurityGroups=["default", "default2"],
        )

        from providers.aws.lib.audit_info.audit_info import current_audit_info
        from providers.aws.services.autoscaling.autoscaling_service import AutoScaling

        with mock.patch(
            "providers.aws.services.autoscaling.autoscaling_find_secrets_ec2_launch_configuration.autoscaling_find_secrets_ec2_launch_configuration.autoscaling_client",
            new=AutoScaling(current_audit_info),
        ):
            from providers.aws.services.autoscaling.autoscaling_find_secrets_ec2_launch_configuration.autoscaling_find_secrets_ec2_launch_configuration import (
                autoscaling_find_secrets_ec2_launch_configuration,
            )

            check = autoscaling_find_secrets_ec2_launch_configuration()
            result = check.execute()

            assert len(result) == 1
            assert result[0].status == "PASS"
            assert (
                result[0].status_extended
                == "No secrets found in autoscaling tester since User Data is empty."
            )
            assert result[0].resource_id == "tester"

View File

@@ -0,0 +1,4 @@
DB_PASSWORD=foobar123
DB_USER=foo
API_KEY=12345abcd
SERVICE_PASSWORD=bbaabb45

View File

@@ -0,0 +1,74 @@
import threading
from dataclasses import dataclass

from lib.logger import logger
from providers.aws.aws_provider import generate_regional_clients


################## AutoScaling
class AutoScaling:
    def __init__(self, audit_info):
        self.service = "autoscaling"
        self.session = audit_info.audit_session
        self.audited_account = audit_info.audited_account
        self.regional_clients = generate_regional_clients(self.service, audit_info)
        self.launch_configurations = []
        self.__threading_call__(self.__describe_launch_configurations__)

    def __get_session__(self):
        return self.session

    def __threading_call__(self, call):
        # Run the given API call once per regional client, in parallel
        threads = []
        for regional_client in self.regional_clients.values():
            threads.append(threading.Thread(target=call, args=(regional_client,)))
        for t in threads:
            t.start()
        for t in threads:
            t.join()

    def __describe_launch_configurations__(self, regional_client):
        logger.info("AutoScaling - Describing Launch Configurations...")
        try:
            describe_launch_configurations_paginator = regional_client.get_paginator(
                "describe_launch_configurations"
            )
            for page in describe_launch_configurations_paginator.paginate():
                for configuration in page["LaunchConfigurations"]:
                    self.launch_configurations.append(
                        LaunchConfiguration(
                            configuration["LaunchConfigurationARN"],
                            configuration["LaunchConfigurationName"],
                            configuration["UserData"],
                            configuration["ImageId"],
                            regional_client.region,
                        )
                    )
        except Exception as error:
            logger.error(
                f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
            )


@dataclass
class LaunchConfiguration:
    arn: str
    name: str
    user_data: str
    image_id: str  # AMI IDs are strings, e.g. "ami-12c6146b"
    region: str

    def __init__(
        self,
        arn,
        name,
        user_data,
        image_id,
        region,
    ):
        self.arn = arn
        self.name = name
        self.image_id = image_id
        self.user_data = user_data
        self.region = region
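
For orientation, a brief usage sketch of the service class: instantiating AutoScaling fans out describe_launch_configurations across every regional client in parallel and collects LaunchConfiguration records, whose user_data stays Base64-encoded until a check decodes it. The audit_info variable here is assumed to come from Prowler's audit setup (or a mocked AWS_Audit_Info such as the one in the tests below):

# Illustrative only; audit_info is not defined in this snippet.
from base64 import b64decode

from providers.aws.services.autoscaling.autoscaling_service import AutoScaling

autoscaling = AutoScaling(audit_info)
for launch_configuration in autoscaling.launch_configurations:
    print(launch_configuration.region, launch_configuration.name)
    if launch_configuration.user_data:
        # User data is stored Base64-encoded, exactly as returned by the API
        print(b64decode(launch_configuration.user_data).decode("utf-8"))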

View File

@@ -0,0 +1,100 @@
from base64 import b64decode

from boto3 import client, session
from moto import mock_autoscaling

from providers.aws.lib.audit_info.models import AWS_Audit_Info
from providers.aws.services.autoscaling.autoscaling_service import AutoScaling

AWS_ACCOUNT_NUMBER = 123456789012
AWS_REGION = "us-east-1"


class Test_AutoScaling_Service:
    # Mocked Audit Info
    def set_mocked_audit_info(self):
        audit_info = AWS_Audit_Info(
            original_session=None,
            audit_session=session.Session(
                profile_name=None,
                botocore_session=None,
            ),
            audited_account=AWS_ACCOUNT_NUMBER,
            audited_user_id=None,
            audited_partition="aws",
            audited_identity_arn=None,
            profile=None,
            profile_region=None,
            credentials=None,
            assumed_role_info=None,
            audited_regions=None,
            organizations_metadata=None,
        )
        return audit_info

    # Test AutoScaling Service
    @mock_autoscaling
    def test_service(self):
        # AutoScaling client for this test class
        audit_info = self.set_mocked_audit_info()
        autoscaling = AutoScaling(audit_info)
        assert autoscaling.service == "autoscaling"

    # Test AutoScaling Client
    @mock_autoscaling
    def test_client(self):
        # AutoScaling client for this test class
        audit_info = self.set_mocked_audit_info()
        autoscaling = AutoScaling(audit_info)
        for regional_client in autoscaling.regional_clients.values():
            assert regional_client.__class__.__name__ == "AutoScaling"

    # Test AutoScaling Session
    @mock_autoscaling
    def test__get_session__(self):
        # AutoScaling client for this test class
        audit_info = self.set_mocked_audit_info()
        autoscaling = AutoScaling(audit_info)
        assert autoscaling.session.__class__.__name__ == "Session"

    # Test AutoScaling Audited Account
    @mock_autoscaling
    def test_audited_account(self):
        # AutoScaling client for this test class
        audit_info = self.set_mocked_audit_info()
        autoscaling = AutoScaling(audit_info)
        assert autoscaling.audited_account == AWS_ACCOUNT_NUMBER

    # Test AutoScaling Get APIs
    @mock_autoscaling
    def test__describe_launch_configurations__(self):
        # Generate AutoScaling Client
        autoscaling_client = client("autoscaling", region_name=AWS_REGION)
        # Create two launch configurations, one with User Data and one without
        autoscaling_client.create_launch_configuration(
            LaunchConfigurationName="tester1",
            ImageId="ami-12c6146b",
            InstanceType="t1.micro",
            KeyName="the_keys",
            SecurityGroups=["default", "default2"],
            UserData="DB_PASSWORD=foobar123",
        )
        autoscaling_client.create_launch_configuration(
            LaunchConfigurationName="tester2",
            ImageId="ami-12c6146b",
            InstanceType="t1.micro",
            KeyName="the_keys",
            SecurityGroups=["default", "default2"],
        )
        # AutoScaling client for this test class
        audit_info = self.set_mocked_audit_info()
        autoscaling = AutoScaling(audit_info)

        assert len(autoscaling.launch_configurations) == 2
        assert autoscaling.launch_configurations[0].name == "tester1"
        assert (
            b64decode(autoscaling.launch_configurations[0].user_data).decode("utf-8")
            == "DB_PASSWORD=foobar123"
        )
        assert autoscaling.launch_configurations[0].image_id == "ami-12c6146b"
        assert autoscaling.launch_configurations[1].image_id == "ami-12c6146b"
        assert autoscaling.launch_configurations[1].name == "tester2"

View File

@@ -1,82 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2019) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_extra775="7.75"
CHECK_TITLE_extra775="[extra775] Find secrets in EC2 Auto Scaling Launch Configuration "
CHECK_SCORED_extra775="NOT_SCORED"
CHECK_CIS_LEVEL_extra775="EXTRA"
CHECK_SEVERITY_extra775="Critical"
CHECK_ALTERNATE_check775="extra775"
CHECK_SERVICENAME_extra775="autoscaling"
CHECK_RISK_extra775='The use of a hard-coded password increases the possibility of password guessing. If hard-coded passwords are used; it is possible that malicious users gain access through the account in question.'
CHECK_REMEDIATION_extra775='Use Secrets Manager to securely provide database credentials to Lambda functions and secure the databases as well as use the credentials to connect and query them without hardcoding the secrets in code or passing them through environmental variables. '
CHECK_DOC_extra775='https://docs.aws.amazon.com/secretsmanager/latest/userguide/lambda-functions.html'
CHECK_CAF_EPIC_extra775='IAM'
extra775(){
  SECRETS_TEMP_FOLDER="$PROWLER_DIR/secrets-$ACCOUNT_NUM-$PROWLER_START_TIME"
  if [[ ! -d $SECRETS_TEMP_FOLDER ]]; then
    # this folder is deleted once this check is finished
    mkdir $SECRETS_TEMP_FOLDER
  fi
  for regx in $REGIONS; do
    CHECK_DETECT_SECRETS_INSTALLATION=$(secretsDetector)
    if [[ $? -eq 241 ]]; then
      textInfo "$regx: python library detect-secrets not found. Make sure it is installed correctly." "$regx"
    else
      LIST_OF_EC2_AUTOSCALING=$($AWSCLI autoscaling describe-launch-configurations $PROFILE_OPT --region $regx --query LaunchConfigurations[*].LaunchConfigurationName --output text --max-items $MAXITEMS 2>&1 | grep -v None )
      if [[ $(echo "$LIST_OF_EC2_AUTOSCALING" | grep -E 'AccessDenied|UnauthorizedOperation|AuthorizationError') ]]; then
        textInfo "$regx: Access Denied trying to describe launch configurations" "$regx"
        continue
      fi
      if [[ $LIST_OF_EC2_AUTOSCALING ]];then
        for autoscaling_configuration in $LIST_OF_EC2_AUTOSCALING; do
          EC2_AUTOSCALING_USERDATA_FILE="$SECRETS_TEMP_FOLDER/extra775-$autoscaling_configuration-userData.decoded"
          EC2_AUTOSCALING_USERDATA=$($AWSCLI autoscaling describe-launch-configurations $PROFILE_OPT --launch-configuration-names $autoscaling_configuration --region $regx --query LaunchConfigurations[*].UserData --output text| grep -v ^None | decode_report > $EC2_AUTOSCALING_USERDATA_FILE)
          if [ -s $EC2_AUTOSCALING_USERDATA_FILE ];then
            FILE_FORMAT_ASCII=$(file -b $EC2_AUTOSCALING_USERDATA_FILE | grep ASCII)
            # This finds ftp or http URLs with credentials and common keywords
            # FINDINGS=$(egrep -i '[[:alpha:]]*://[[:alnum:]]*:[[:alnum:]]*@.*/|key|secret|token|pass' $EC2_AUTOSCALING_USERDATA_FILE |wc -l|tr -d '\ ')
            # New implementation using https://github.com/Yelp/detect-secrets
            if [[ $FILE_FORMAT_ASCII ]]; then
              FINDINGS=$(secretsDetector file $EC2_AUTOSCALING_USERDATA_FILE)
              if [[ $FINDINGS -eq 0 ]]; then
                textPass "$regx: No secrets found in $autoscaling_configuration" "$regx" "$autoscaling_configuration"
                # delete file if nothing interesting is there
                rm -f $EC2_AUTOSCALING_USERDATA_FILE
              else
                textFail "$regx: Potential secret found in $autoscaling_configuration" "$regx" "$autoscaling_configuration"
                # delete file to not leave trace, user must look at the autoscaling_configuration User Data
                rm -f $EC2_AUTOSCALING_USERDATA_FILE
              fi
            else
              mv $EC2_AUTOSCALING_USERDATA_FILE $EC2_AUTOSCALING_USERDATA_FILE.gz ; gunzip $EC2_AUTOSCALING_USERDATA_FILE.gz
              FINDINGS=$(secretsDetector file $EC2_AUTOSCALING_USERDATA_FILE)
              if [[ $FINDINGS -eq 0 ]]; then
                textPass "$regx: No secrets found in $autoscaling_configuration User Data" "$regx" "$autoscaling_configuration"
                rm -f $EC2_AUTOSCALING_USERDATA_FILE
              else
                textFail "$regx: Potential secret found in $autoscaling_configuration" "$regx" "$autoscaling_configuration"
              fi
            fi
          else
            textPass "$regx: No secrets found in $autoscaling_configuration User Data or it is empty" "$regx" "$autoscaling_configuration"
          fi
        done
      else
        textInfo "$regx: No EC2 autoscaling_configurations found" "$regx"
      fi
    fi
  done
  rm -rf $SECRETS_TEMP_FOLDER
}

View File

@@ -0,0 +1,55 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2019) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
# Remediation:
#
# https://d1.awsstatic.com/whitepapers/compliance/AWS_CIS_Foundations_Benchmark.pdf
#
# aws logs put-metric-filter \
# --region us-east-1 \
# --log-group-name CloudTrail/CloudWatchLogGroup \
# --filter-name S3BucketConfigChanges \
# --filter-pattern '{ ($.eventSource = s3.amazonaws.com) && (($.eventName = PutBucketAcl) || ($.eventName = PutBucketPolicy) || ($.eventName = PutBucketCors) || ($.eventName = PutBucketLifecycle) || ($.eventName = PutBucketReplication) || ($.eventName = DeleteBucketPolicy) || ($.eventName = DeleteBucketCors) || ($.eventName = DeleteBucketLifecycle) || ($.eventName = DeleteBucketReplication)) }' \
# --metric-transformations metricName=S3BucketEventCount,metricNamespace=CloudTrailMetrics,metricValue=1
#
# aws cloudwatch put-metric-alarm \
# --region us-east-1 \
# --alarm-name S3BucketConfigChangesAlarm \
# --alarm-description "Triggered by AWS S3 Bucket config changes." \
# --metric-name S3BucketEventCount \
# --namespace CloudTrailMetrics \
# --statistic Sum \
# --comparison-operator GreaterThanOrEqualToThreshold \
# --evaluation-periods 1 \
# --period 300 \
# --threshold 1 \
# --actions-enabled \
# --alarm-actions arn:aws:sns:us-east-1:123456789012:CloudWatchAlarmTopic
CHECK_ID_check38="3.8"
CHECK_TITLE_check38="[check38] Ensure a log metric filter and alarm exist for S3 bucket policy changes"
CHECK_SCORED_check38="SCORED"
CHECK_CIS_LEVEL_check38="LEVEL1"
CHECK_SEVERITY_check38="Medium"
CHECK_ASFF_TYPE_check38="Software and Configuration Checks/Industry and Regulatory Standards/CIS AWS Foundations Benchmark"
CHECK_ASFF_RESOURCE_TYPE_check38="AwsCloudTrailTrail"
CHECK_ALTERNATE_check308="check38"
CHECK_SERVICENAME_check38="s3"
CHECK_RISK_check38='Monitoring unauthorized API calls will help reveal application errors and may reduce time to detect malicious activity.'
CHECK_REMEDIATION_check38='It is recommended that a metric filter and alarm be established for unauthorized requests.'
CHECK_DOC_check38='https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudwatch-alarms-for-cloudtrail.html'
CHECK_CAF_EPIC_check38='Logging and Monitoring'
check38(){
  check3x '\$\.eventSource\s*=\s*s3.amazonaws.com.+\$\.eventName\s*=\s*PutBucketAcl.+\$\.eventName\s*=\s*PutBucketPolicy.+\$\.eventName\s*=\s*PutBucketCors.+\$\.eventName\s*=\s*PutBucketLifecycle.+\$\.eventName\s*=\s*PutBucketReplication.+\$\.eventName\s*=\s*DeleteBucketPolicy.+\$\.eventName\s*=\s*DeleteBucketCors.+\$\.eventName\s*=\s*DeleteBucketLifecycle.+\$\.eventName\s*=\s*DeleteBucketReplication'
}