mirror of
https://github.com/ghndrx/prowler.git
synced 2026-02-10 14:55:00 +00:00
feat(): ECS service and checks (#1476)
Co-authored-by: Sergio Garcia <38561120+sergargar@users.noreply.github.com> Co-authored-by: Pepe Fagoaga <pepe@verica.io> Co-authored-by: sergargar <sergio@verica.io>
This commit is contained in:
0
providers/aws/services/ecs/__init__.py
Normal file
0
providers/aws/services/ecs/__init__.py
Normal file
@@ -1,71 +0,0 @@
|
|||||||
#!/usr/bin/env bash
|
|
||||||
|
|
||||||
# Prowler - the handy cloud security tool (copyright 2019) by Toni de la Fuente
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
|
|
||||||
# use this file except in compliance with the License. You may obtain a copy
|
|
||||||
# of the License at http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software distributed
|
|
||||||
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
|
|
||||||
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
|
||||||
# specific language governing permissions and limitations under the License.
|
|
||||||
CHECK_ID_extra768="7.68"
|
|
||||||
CHECK_TITLE_extra768="[extra768] Find secrets in ECS task definitions environment variables "
|
|
||||||
CHECK_SCORED_extra768="NOT_SCORED"
|
|
||||||
CHECK_CIS_LEVEL_extra768="EXTRA"
|
|
||||||
CHECK_SEVERITY_extra768="Critical"
|
|
||||||
CHECK_ASFF_RESOURCE_TYPE_extra768="AwsEcsTaskDefinition"
|
|
||||||
CHECK_ALTERNATE_check768="extra768"
|
|
||||||
CHECK_SERVICENAME_extra768="ecs"
|
|
||||||
CHECK_RISK_extra768='The use of a hard-coded password increases the possibility of password guessing. If hard-coded passwords are used; it is possible that malicious users gain access through the account in question.'
|
|
||||||
CHECK_REMEDIATION_extra768='Use Secrets Manager or Parameter Store to securely provide credentials to containers without hardcoding the secrets in code or passing them through environment variables.'
|
|
||||||
CHECK_DOC_extra768='https://docs.aws.amazon.com/AmazonECS/latest/developerguide/specifying-sensitive-data.html'
|
|
||||||
CHECK_CAF_EPIC_extra768='Logging and Monitoring'
|
|
||||||
|
|
||||||
extra768(){
|
|
||||||
SECRETS_TEMP_FOLDER="$PROWLER_DIR/secrets-$ACCOUNT_NUM-$PROWLER_START_TIME"
|
|
||||||
if [[ ! -d $SECRETS_TEMP_FOLDER ]]; then
|
|
||||||
mkdir $SECRETS_TEMP_FOLDER
|
|
||||||
fi
|
|
||||||
for regx in $REGIONS; do
|
|
||||||
CHECK_DETECT_SECRETS_INSTALLATION=$(secretsDetector)
|
|
||||||
if [[ $? -eq 241 ]]; then
|
|
||||||
textInfo "$regx: python library detect-secrets not found. Make sure it is installed correctly." "$regx"
|
|
||||||
else
|
|
||||||
# Get a list of all task definition families first:
|
|
||||||
FAMILIES=$($AWSCLI ecs list-task-definition-families $PROFILE_OPT --region $regx --status ACTIVE 2>&1 )
|
|
||||||
if [[ $(echo "$FAMILIES" | grep -E 'AccessDenied|UnauthorizedOperation|AuthorizationError') ]]; then
|
|
||||||
textInfo "$regx: Access Denied trying to list task definition families" "$regx"
|
|
||||||
continue
|
|
||||||
fi
|
|
||||||
if [[ $(echo $FAMILIES | jq -r .families[]) ]]; then
|
|
||||||
for FAMILY in $(echo $FAMILIES | jq -r .families[]);do
|
|
||||||
# Get the full task definition arn:
|
|
||||||
TASK_DEFINITION_TEMP=$($AWSCLI ecs list-task-definitions $PROFILE_OPT --region $regx --family-prefix $FAMILY --sort DESC --max-items 1 | jq -r .taskDefinitionArns[0])
|
|
||||||
# We only care about the task definition name:
|
|
||||||
IFS='/' read -r -a splitArn <<< "$TASK_DEFINITION_TEMP"
|
|
||||||
TASK_DEFINITION=${splitArn[1]}
|
|
||||||
TASK_DEFINITION_ENV_VARIABLES_FILE="$SECRETS_TEMP_FOLDER/extra768-$TASK_DEFINITION-$regx-variables.txt"
|
|
||||||
TASK_DEFINITION_ENV_VARIABLES=$($AWSCLI ecs $PROFILE_OPT --region $regx describe-task-definition --task-definition $TASK_DEFINITION --query 'taskDefinition.containerDefinitions[*].environment' --output text > $TASK_DEFINITION_ENV_VARIABLES_FILE)
|
|
||||||
if [ -s $TASK_DEFINITION_ENV_VARIABLES_FILE ];then
|
|
||||||
# Implementation using https://github.com/Yelp/detect-secrets
|
|
||||||
FINDINGS=$(secretsDetector file $TASK_DEFINITION_ENV_VARIABLES_FILE)
|
|
||||||
if [[ $FINDINGS -eq 0 ]]; then
|
|
||||||
textPass "$regx: No secrets found in ECS task definition $TASK_DEFINITION variables" "$regx" "$TASK_DEFINITION"
|
|
||||||
# delete file if nothing interesting is there
|
|
||||||
rm -f $TASK_DEFINITION_ENV_VARIABLES_FILE
|
|
||||||
else
|
|
||||||
textFail "$regx: Potential secret found in ECS task definition $TASK_DEFINITION variables" "$regx" "$TASK_DEFINITION"
|
|
||||||
fi
|
|
||||||
else
|
|
||||||
textInfo "$regx: ECS task definition $TASK_DEFINITION has no variables" "$regx" "$TASK_DEFINITION"
|
|
||||||
rm -f $TASK_DEFINITION_ENV_VARIABLES_FILE
|
|
||||||
fi
|
|
||||||
done
|
|
||||||
else
|
|
||||||
textInfo "$regx: No ECS task definitions found" "$regx"
|
|
||||||
fi
|
|
||||||
fi
|
|
||||||
done
|
|
||||||
}
|
|
||||||
4
providers/aws/services/ecs/ecs_client.py
Normal file
4
providers/aws/services/ecs/ecs_client.py
Normal file
@@ -0,0 +1,4 @@
|
|||||||
|
from providers.aws.lib.audit_info.audit_info import current_audit_info
|
||||||
|
from providers.aws.services.ecs.ecs_service import ECS
|
||||||
|
|
||||||
|
ecs_client = ECS(current_audit_info)
|
||||||
84
providers/aws/services/ecs/ecs_service.py
Normal file
84
providers/aws/services/ecs/ecs_service.py
Normal file
@@ -0,0 +1,84 @@
|
|||||||
|
import threading
|
||||||
|
from re import sub
|
||||||
|
|
||||||
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
from lib.logger import logger
|
||||||
|
from providers.aws.aws_provider import generate_regional_clients
|
||||||
|
|
||||||
|
|
||||||
|
################################ ECS
|
||||||
|
class ECS:
|
||||||
|
def __init__(self, audit_info):
|
||||||
|
self.service = "ecs"
|
||||||
|
self.session = audit_info.audit_session
|
||||||
|
self.regional_clients = generate_regional_clients(self.service, audit_info)
|
||||||
|
self.task_definitions = []
|
||||||
|
self.__threading_call__(self.__list_task_definitions__)
|
||||||
|
self.__describe_task_definition__()
|
||||||
|
|
||||||
|
def __get_session__(self):
|
||||||
|
return self.session
|
||||||
|
|
||||||
|
def __threading_call__(self, call):
|
||||||
|
threads = []
|
||||||
|
for regional_client in self.regional_clients.values():
|
||||||
|
threads.append(threading.Thread(target=call, args=(regional_client,)))
|
||||||
|
for t in threads:
|
||||||
|
t.start()
|
||||||
|
for t in threads:
|
||||||
|
t.join()
|
||||||
|
|
||||||
|
def __list_task_definitions__(self, regional_client):
|
||||||
|
logger.info("ECS - Listing Task Definitions...")
|
||||||
|
try:
|
||||||
|
|
||||||
|
list_ecs_paginator = regional_client.get_paginator("list_task_definitions")
|
||||||
|
for page in list_ecs_paginator.paginate():
|
||||||
|
for task_definition in page["taskDefinitionArns"]:
|
||||||
|
self.task_definitions.append(
|
||||||
|
TaskDefinition(
|
||||||
|
# we want the family name without the revision
|
||||||
|
name=sub(":.*", "", task_definition.split("/")[1]),
|
||||||
|
arn=task_definition,
|
||||||
|
region=regional_client.region,
|
||||||
|
environment_variables=[],
|
||||||
|
)
|
||||||
|
)
|
||||||
|
except Exception as error:
|
||||||
|
logger.error(
|
||||||
|
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||||
|
)
|
||||||
|
|
||||||
|
def __describe_task_definition__(self):
|
||||||
|
logger.info("ECS - Describing Task Definitions...")
|
||||||
|
try:
|
||||||
|
for task_definition in self.task_definitions:
|
||||||
|
client = self.regional_clients[task_definition.region]
|
||||||
|
container_definitions = client.describe_task_definition(
|
||||||
|
taskDefinition=task_definition.arn
|
||||||
|
)["taskDefinition"]["containerDefinitions"]
|
||||||
|
for container in container_definitions:
|
||||||
|
if "environment" in container:
|
||||||
|
for env_var in container["environment"]:
|
||||||
|
task_definition.environment_variables.append(
|
||||||
|
ContainerEnvVariable(
|
||||||
|
name=env_var["name"], value=env_var["value"]
|
||||||
|
)
|
||||||
|
)
|
||||||
|
except Exception as error:
|
||||||
|
logger.error(
|
||||||
|
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class ContainerEnvVariable(BaseModel):
|
||||||
|
name: str
|
||||||
|
value: str
|
||||||
|
|
||||||
|
|
||||||
|
class TaskDefinition(BaseModel):
|
||||||
|
name: str
|
||||||
|
arn: str
|
||||||
|
region: str
|
||||||
|
environment_variables: list[ContainerEnvVariable]
|
||||||
149
providers/aws/services/ecs/ecs_service_test.py
Normal file
149
providers/aws/services/ecs/ecs_service_test.py
Normal file
@@ -0,0 +1,149 @@
|
|||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
from boto3 import client, session
|
||||||
|
from moto import mock_ecs
|
||||||
|
|
||||||
|
from providers.aws.lib.audit_info.models import AWS_Audit_Info
|
||||||
|
from providers.aws.services.ecs.ecs_service import ECS
|
||||||
|
|
||||||
|
AWS_ACCOUNT_NUMBER = 123456789012
|
||||||
|
AWS_REGION = "eu-west-1"
|
||||||
|
|
||||||
|
|
||||||
|
def mock_generate_regional_clients(service, audit_info):
|
||||||
|
regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION)
|
||||||
|
regional_client.region = AWS_REGION
|
||||||
|
return {AWS_REGION: regional_client}
|
||||||
|
|
||||||
|
|
||||||
|
@patch(
|
||||||
|
"providers.aws.services.ecs.ecs_service.generate_regional_clients",
|
||||||
|
new=mock_generate_regional_clients,
|
||||||
|
)
|
||||||
|
class Test_ECS_Service:
|
||||||
|
# Mocked Audit Info
|
||||||
|
def set_mocked_audit_info(self):
|
||||||
|
audit_info = AWS_Audit_Info(
|
||||||
|
original_session=None,
|
||||||
|
audit_session=session.Session(
|
||||||
|
profile_name=None,
|
||||||
|
botocore_session=None,
|
||||||
|
),
|
||||||
|
audited_account=AWS_ACCOUNT_NUMBER,
|
||||||
|
audited_user_id=None,
|
||||||
|
audited_partition="aws",
|
||||||
|
audited_identity_arn=None,
|
||||||
|
profile=None,
|
||||||
|
profile_region=None,
|
||||||
|
credentials=None,
|
||||||
|
assumed_role_info=None,
|
||||||
|
audited_regions=None,
|
||||||
|
organizations_metadata=None,
|
||||||
|
)
|
||||||
|
return audit_info
|
||||||
|
|
||||||
|
# Test ECS Service
|
||||||
|
def test_service(self):
|
||||||
|
audit_info = self.set_mocked_audit_info()
|
||||||
|
ecs = ECS(audit_info)
|
||||||
|
assert ecs.service == "ecs"
|
||||||
|
|
||||||
|
# Test ECS client
|
||||||
|
def test_client(self):
|
||||||
|
audit_info = self.set_mocked_audit_info()
|
||||||
|
ecs = ECS(audit_info)
|
||||||
|
for reg_client in ecs.regional_clients.values():
|
||||||
|
assert reg_client.__class__.__name__ == "ECS"
|
||||||
|
|
||||||
|
# Test ECS session
|
||||||
|
def test__get_session__(self):
|
||||||
|
audit_info = self.set_mocked_audit_info()
|
||||||
|
ecs = ECS(audit_info)
|
||||||
|
assert ecs.session.__class__.__name__ == "Session"
|
||||||
|
|
||||||
|
# Test list ECS task definitions
|
||||||
|
@mock_ecs
|
||||||
|
def test__list_task_definitions__(self):
|
||||||
|
ecs_client = client("ecs", region_name=AWS_REGION)
|
||||||
|
|
||||||
|
definition = dict(
|
||||||
|
family="test_ecs_task",
|
||||||
|
containerDefinitions=[
|
||||||
|
{
|
||||||
|
"name": "hello_world",
|
||||||
|
"image": "hello-world:latest",
|
||||||
|
"memory": 400,
|
||||||
|
}
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
task_definition = ecs_client.register_task_definition(**definition)
|
||||||
|
audit_info = self.set_mocked_audit_info()
|
||||||
|
ecs = ECS(audit_info)
|
||||||
|
|
||||||
|
assert len(ecs.task_definitions) == 1
|
||||||
|
assert (
|
||||||
|
ecs.task_definitions[0].name == task_definition["taskDefinition"]["family"]
|
||||||
|
)
|
||||||
|
assert (
|
||||||
|
ecs.task_definitions[0].arn
|
||||||
|
== task_definition["taskDefinition"]["taskDefinitionArn"]
|
||||||
|
)
|
||||||
|
assert ecs.task_definitions[0].environment_variables == []
|
||||||
|
|
||||||
|
@mock_ecs
|
||||||
|
# Test describe ECS task definitions
|
||||||
|
def test__describe_task_definitions__(self):
|
||||||
|
ecs_client = client("ecs", region_name=AWS_REGION)
|
||||||
|
|
||||||
|
definition = dict(
|
||||||
|
family="test_ecs_task",
|
||||||
|
containerDefinitions=[
|
||||||
|
{
|
||||||
|
"name": "hello_world",
|
||||||
|
"image": "hello-world:latest",
|
||||||
|
"memory": 400,
|
||||||
|
"environment": [
|
||||||
|
{"name": "test-env", "value": "test-env-value"},
|
||||||
|
{"name": "test-env2", "value": "test-env-value2"},
|
||||||
|
],
|
||||||
|
}
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
task_definition = ecs_client.register_task_definition(**definition)
|
||||||
|
audit_info = self.set_mocked_audit_info()
|
||||||
|
ecs = ECS(audit_info)
|
||||||
|
|
||||||
|
assert len(ecs.task_definitions) == 1
|
||||||
|
assert (
|
||||||
|
ecs.task_definitions[0].name == task_definition["taskDefinition"]["family"]
|
||||||
|
)
|
||||||
|
assert (
|
||||||
|
ecs.task_definitions[0].arn
|
||||||
|
== task_definition["taskDefinition"]["taskDefinitionArn"]
|
||||||
|
)
|
||||||
|
assert (
|
||||||
|
ecs.task_definitions[0].environment_variables[0].name
|
||||||
|
== task_definition["taskDefinition"]["containerDefinitions"][0][
|
||||||
|
"environment"
|
||||||
|
][0]["name"]
|
||||||
|
)
|
||||||
|
assert (
|
||||||
|
ecs.task_definitions[0].environment_variables[0].value
|
||||||
|
== task_definition["taskDefinition"]["containerDefinitions"][0][
|
||||||
|
"environment"
|
||||||
|
][0]["value"]
|
||||||
|
)
|
||||||
|
assert (
|
||||||
|
ecs.task_definitions[0].environment_variables[1].name
|
||||||
|
== task_definition["taskDefinition"]["containerDefinitions"][0][
|
||||||
|
"environment"
|
||||||
|
][1]["name"]
|
||||||
|
)
|
||||||
|
assert (
|
||||||
|
ecs.task_definitions[0].environment_variables[1].value
|
||||||
|
== task_definition["taskDefinition"]["containerDefinitions"][0][
|
||||||
|
"environment"
|
||||||
|
][1]["value"]
|
||||||
|
)
|
||||||
@@ -0,0 +1,35 @@
|
|||||||
|
{
|
||||||
|
"Provider": "aws",
|
||||||
|
"CheckID": "ecs_task_definitions_no_environment_secrets",
|
||||||
|
"CheckTitle": "Check if secrets exists in ECS task definitions environment variables",
|
||||||
|
"CheckType": ["Protect", "Secure development", "Credentials not hard-coded"],
|
||||||
|
"ServiceName": "ecs",
|
||||||
|
"SubServiceName": "",
|
||||||
|
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
|
||||||
|
"Severity": "critical",
|
||||||
|
"ResourceType": "AwsEcsTaskDefinition",
|
||||||
|
"Description": "Check if secrets exists in ECS task definitions environment variables",
|
||||||
|
"Risk": "The use of a hard-coded password increases the possibility of password guessing. If hard-coded passwords are used; it is possible that malicious users gain access through the account in question.",
|
||||||
|
"RelatedUrl": "",
|
||||||
|
"Remediation": {
|
||||||
|
"Code": {
|
||||||
|
"CLI": "",
|
||||||
|
"NativeIaC": "",
|
||||||
|
"Other": "",
|
||||||
|
"Terraform": ""
|
||||||
|
},
|
||||||
|
"Recommendation": {
|
||||||
|
"Text": "Use Secrets Manager or Parameter Store to securely provide credentials to containers without hardcoding the secrets in code or passing them through environment variables.",
|
||||||
|
"Url": "https://docs.aws.amazon.com/AmazonECS/latest/developerguide/specifying-sensitive-data.html"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"Categories": [],
|
||||||
|
"Tags": {
|
||||||
|
"Tag1Key": "value",
|
||||||
|
"Tag2Key": "value"
|
||||||
|
},
|
||||||
|
"DependsOn": [],
|
||||||
|
"RelatedTo": [],
|
||||||
|
"Notes": "",
|
||||||
|
"Compliance": []
|
||||||
|
}
|
||||||
@@ -0,0 +1,45 @@
|
|||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
from json import dumps
|
||||||
|
|
||||||
|
from detect_secrets import SecretsCollection
|
||||||
|
from detect_secrets.settings import default_settings
|
||||||
|
|
||||||
|
from lib.check.models import Check, Check_Report
|
||||||
|
from providers.aws.services.ecs.ecs_client import ecs_client
|
||||||
|
|
||||||
|
|
||||||
|
class ecs_task_definitions_no_environment_secrets(Check):
|
||||||
|
def execute(self):
|
||||||
|
findings = []
|
||||||
|
for task_definition in ecs_client.task_definitions:
|
||||||
|
report = Check_Report(self.metadata)
|
||||||
|
report.region = task_definition.region
|
||||||
|
report.resource_id = task_definition.name
|
||||||
|
report.resource_arn = task_definition.arn
|
||||||
|
report.status = "PASS"
|
||||||
|
report.status_extended = f"No secrets found in ECS task definition {task_definition.name} variables"
|
||||||
|
if task_definition.environment_variables:
|
||||||
|
for env_var in task_definition.environment_variables:
|
||||||
|
dump_env_vars = {}
|
||||||
|
dump_env_vars.update({env_var.name: env_var.value})
|
||||||
|
|
||||||
|
temp_env_data_file = tempfile.NamedTemporaryFile(delete=False)
|
||||||
|
|
||||||
|
env_data = dumps(dump_env_vars)
|
||||||
|
temp_env_data_file.write(bytes(env_data, encoding="raw_unicode_escape"))
|
||||||
|
temp_env_data_file.close()
|
||||||
|
|
||||||
|
secrets = SecretsCollection()
|
||||||
|
with default_settings():
|
||||||
|
secrets.scan_file(temp_env_data_file.name)
|
||||||
|
|
||||||
|
if secrets.json():
|
||||||
|
report.status = "FAIL"
|
||||||
|
report.status_extended = f"Potential secret found in ECS in ECS task definition {task_definition.name} variables"
|
||||||
|
|
||||||
|
os.remove(temp_env_data_file.name)
|
||||||
|
|
||||||
|
findings.append(report)
|
||||||
|
|
||||||
|
return findings
|
||||||
@@ -0,0 +1,105 @@
|
|||||||
|
from re import search
|
||||||
|
from unittest import mock
|
||||||
|
|
||||||
|
from providers.aws.services.ecs.ecs_service import ContainerEnvVariable, TaskDefinition
|
||||||
|
|
||||||
|
AWS_REGION = "eu-west-1"
|
||||||
|
AWS_ACCOUNT_NUMBER = "123456789012"
|
||||||
|
task_name = "test-task"
|
||||||
|
env_var_name_no_secrets = "host"
|
||||||
|
env_var_value_no_secrets = "localhost:1234"
|
||||||
|
env_var_name_with_secrets = "DB_PASSWORD"
|
||||||
|
env_var_value_with_secrets = "pass-12343"
|
||||||
|
|
||||||
|
|
||||||
|
class Test_ecs_task_definitions_no_environment_secrets:
|
||||||
|
def test_no_task_definitions(self):
|
||||||
|
ecs_client = mock.MagicMock
|
||||||
|
ecs_client.task_definitions = []
|
||||||
|
|
||||||
|
with mock.patch(
|
||||||
|
"providers.aws.services.ecs.ecs_service.ECS",
|
||||||
|
ecs_client,
|
||||||
|
):
|
||||||
|
from providers.aws.services.ecs.ecs_task_definitions_no_environment_secrets.ecs_task_definitions_no_environment_secrets import (
|
||||||
|
ecs_task_definitions_no_environment_secrets,
|
||||||
|
)
|
||||||
|
|
||||||
|
check = ecs_task_definitions_no_environment_secrets()
|
||||||
|
result = check.execute()
|
||||||
|
assert len(result) == 0
|
||||||
|
|
||||||
|
def test_container_env_var_no_secrets(self):
|
||||||
|
ecs_client = mock.MagicMock
|
||||||
|
ecs_client.task_definitions = []
|
||||||
|
ecs_client.task_definitions.append(
|
||||||
|
TaskDefinition(
|
||||||
|
name=task_name,
|
||||||
|
arn=f"arn:aws:ecs:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:task-definition/{task_name}:1",
|
||||||
|
region=AWS_REGION,
|
||||||
|
environment_variables=[
|
||||||
|
ContainerEnvVariable(
|
||||||
|
name=env_var_name_no_secrets, value=env_var_value_no_secrets
|
||||||
|
)
|
||||||
|
],
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
with mock.patch(
|
||||||
|
"providers.aws.services.ecs.ecs_service.ECS",
|
||||||
|
ecs_client,
|
||||||
|
):
|
||||||
|
from providers.aws.services.ecs.ecs_task_definitions_no_environment_secrets.ecs_task_definitions_no_environment_secrets import (
|
||||||
|
ecs_task_definitions_no_environment_secrets,
|
||||||
|
)
|
||||||
|
|
||||||
|
check = ecs_task_definitions_no_environment_secrets()
|
||||||
|
result = check.execute()
|
||||||
|
assert len(result) == 1
|
||||||
|
assert result[0].status == "PASS"
|
||||||
|
assert search(
|
||||||
|
"No secrets found in ECS task definition", result[0].status_extended
|
||||||
|
)
|
||||||
|
assert result[0].resource_id == task_name
|
||||||
|
assert (
|
||||||
|
result[0].resource_arn
|
||||||
|
== f"arn:aws:ecs:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:task-definition/{task_name}:1"
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_container_env_var_with_secrets(self):
|
||||||
|
ecs_client = mock.MagicMock
|
||||||
|
ecs_client.task_definitions = []
|
||||||
|
ecs_client.task_definitions.append(
|
||||||
|
TaskDefinition(
|
||||||
|
name=task_name,
|
||||||
|
arn=f"arn:aws:ecs:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:task-definition/{task_name}:1",
|
||||||
|
region=AWS_REGION,
|
||||||
|
environment_variables=[
|
||||||
|
ContainerEnvVariable(
|
||||||
|
name=env_var_name_with_secrets, value=env_var_value_with_secrets
|
||||||
|
)
|
||||||
|
],
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
with mock.patch(
|
||||||
|
"providers.aws.services.ecs.ecs_service.ECS",
|
||||||
|
ecs_client,
|
||||||
|
):
|
||||||
|
from providers.aws.services.ecs.ecs_task_definitions_no_environment_secrets.ecs_task_definitions_no_environment_secrets import (
|
||||||
|
ecs_task_definitions_no_environment_secrets,
|
||||||
|
)
|
||||||
|
|
||||||
|
check = ecs_task_definitions_no_environment_secrets()
|
||||||
|
result = check.execute()
|
||||||
|
assert len(result) == 1
|
||||||
|
assert result[0].status == "FAIL"
|
||||||
|
assert search(
|
||||||
|
"Potential secret found in ECS in ECS task definition",
|
||||||
|
result[0].status_extended,
|
||||||
|
)
|
||||||
|
assert result[0].resource_id == task_name
|
||||||
|
assert (
|
||||||
|
result[0].resource_arn
|
||||||
|
== f"arn:aws:ecs:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:task-definition/{task_name}:1"
|
||||||
|
)
|
||||||
Reference in New Issue
Block a user