fix(ecr): Refactor service (#2302)

Co-authored-by: Gabriel Soltz <thegaby@gmail.com>
Co-authored-by: Kay Agahd <kagahd@users.noreply.github.com>
Co-authored-by: Nacho Rivera <nachor1992@gmail.com>
Co-authored-by: Kevin Pullin <kevin.pullin@gmail.com>
Co-authored-by: Sergio Garcia <sergargar1@gmail.com>
This commit is contained in:
Pepe Fagoaga
2023-05-09 17:04:21 +02:00
committed by GitHub
parent d344318dd4
commit 6f48012234
13 changed files with 894 additions and 454 deletions

View File

@@ -5,24 +5,27 @@ from prowler.providers.aws.services.ecr.ecr_client import ecr_client
class ecr_registry_scan_images_on_push_enabled(Check):
def execute(self):
findings = []
for registry in ecr_client.registries:
report = Check_Report_AWS(self.metadata())
report.region = registry.region
report.resource_id = registry.id
report.resource_tags = registry.tags
report.status = "FAIL"
report.status_extended = f"ECR registry {registry.id} has {registry.scan_type} scanning without scan on push"
if registry.rules:
report.status = "PASS"
report.status_extended = f"ECR registry {registry.id} has {registry.scan_type} scan with scan on push"
filters = True
for rule in registry.rules:
if not rule.scan_filters or "'*'" in str(rule.scan_filters):
filters = False
if filters:
report.status = "FAIL"
report.status_extended = f"ECR registry {registry.id} has {registry.scan_type} scanning with scan on push but with repository filters"
for registry in ecr_client.registries.values():
# We want to check the registry if it is in use, hence there are repositories
if len(registry.repositories) != 0:
report = Check_Report_AWS(self.metadata())
report.region = registry.region
report.resource_id = registry.id
# A registry cannot have tags
report.resource_tags = []
report.status = "FAIL"
report.status_extended = f"ECR registry {registry.id} has {registry.scan_type} scanning without scan on push enabled"
if registry.rules:
report.status = "PASS"
report.status_extended = f"ECR registry {registry.id} has {registry.scan_type} scan with scan on push enabled"
filters = True
for rule in registry.rules:
if not rule.scan_filters or "'*'" in str(rule.scan_filters):
filters = False
if filters:
report.status = "FAIL"
report.status_extended = f"ECR registry {registry.id} has {registry.scan_type} scanning with scan on push but with repository filters"
findings.append(report)
findings.append(report)
return findings

View File

@@ -5,22 +5,19 @@ from prowler.providers.aws.services.ecr.ecr_client import ecr_client
class ecr_repositories_lifecycle_policy_enabled(Check):
def execute(self):
findings = []
for repository in ecr_client.repositories:
report = Check_Report_AWS(self.metadata())
report.region = repository.region
report.resource_id = repository.name
report.resource_arn = repository.arn
report.resource_tags = repository.tags
report.status = "FAIL"
report.status_extended = (
f"Repository {repository.name} has no lifecycle policy"
)
if repository.lyfecicle_policy:
report.status = "PASS"
report.status_extended = (
f"Repository {repository.name} has lifecycle policy"
)
for registry in ecr_client.registries.values():
for repository in registry.repositories:
report = Check_Report_AWS(self.metadata())
report.region = repository.region
report.resource_id = repository.name
report.resource_arn = repository.arn
report.resource_tags = repository.tags
report.status = "FAIL"
report.status_extended = f"Repository {repository.name} has not a lifecycle policy configured"
if repository.lifecycle_policy:
report.status = "PASS"
report.status_extended = f"Repository {repository.name} has a lifecycle policy configured"
findings.append(report)
findings.append(report)
return findings

View File

@@ -5,25 +5,28 @@ from prowler.providers.aws.services.ecr.ecr_client import ecr_client
class ecr_repositories_not_publicly_accessible(Check):
def execute(self):
findings = []
for repository in ecr_client.repositories:
report = Check_Report_AWS(self.metadata())
report.region = repository.region
report.resource_id = repository.name
report.resource_arn = repository.arn
report.resource_tags = repository.tags
report.status = "PASS"
report.status_extended = f"Repository {repository.name} is not open"
if repository.policy:
for statement in repository.policy["Statement"]:
if statement["Effect"] == "Allow":
if "*" in statement["Principal"] or (
"AWS" in statement["Principal"]
and "*" in statement["Principal"]["AWS"]
):
report.status = "FAIL"
report.status_extended = f"Repository {repository.name} policy may allow anonymous users to perform actions (Principal: '*')"
break
for registry in ecr_client.registries.values():
for repository in registry.repositories:
report = Check_Report_AWS(self.metadata())
report.region = repository.region
report.resource_id = repository.name
report.resource_arn = repository.arn
report.resource_tags = repository.tags
report.status = "PASS"
report.status_extended = (
f"Repository {repository.name} is not publicly accesible"
)
if repository.policy:
for statement in repository.policy["Statement"]:
if statement["Effect"] == "Allow":
if "*" in statement["Principal"] or (
"AWS" in statement["Principal"]
and "*" in statement["Principal"]["AWS"]
):
report.status = "FAIL"
report.status_extended = f"Repository {repository.name} policy may allow anonymous users to perform actions (Principal: '*')"
break
findings.append(report)
findings.append(report)
return findings

View File

@@ -5,22 +5,23 @@ from prowler.providers.aws.services.ecr.ecr_client import ecr_client
class ecr_repositories_scan_images_on_push_enabled(Check):
def execute(self):
findings = []
for repository in ecr_client.repositories:
report = Check_Report_AWS(self.metadata())
report.region = repository.region
report.resource_id = repository.name
report.resource_arn = repository.arn
report.resource_tags = repository.tags
report.status = "PASS"
report.status_extended = (
f"ECR repository {repository.name} has scan on push enabled"
)
if not repository.scan_on_push:
report.status = "FAIL"
for registry in ecr_client.registries.values():
for repository in registry.repositories:
report = Check_Report_AWS(self.metadata())
report.region = repository.region
report.resource_id = repository.name
report.resource_arn = repository.arn
report.resource_tags = repository.tags
report.status = "PASS"
report.status_extended = (
f"ECR repository {repository.name} has scan on push disabled"
f"ECR repository {repository.name} has scan on push enabled"
)
if not repository.scan_on_push:
report.status = "FAIL"
report.status_extended = (
f"ECR repository {repository.name} has scan on push disabled"
)
findings.append(report)
findings.append(report)
return findings

View File

@@ -5,32 +5,37 @@ from prowler.providers.aws.services.ecr.ecr_client import ecr_client
class ecr_repositories_scan_vulnerabilities_in_latest_image(Check):
def execute(self):
findings = []
for repository in ecr_client.repositories:
for image in repository.images_details:
report = Check_Report_AWS(self.metadata())
report.region = repository.region
report.resource_id = repository.name
report.resource_arn = repository.arn
report.resource_tags = repository.tags
report.status = "PASS"
report.status_extended = f"ECR repository {repository.name} has imageTag {image.latest_tag} scanned without findings"
if not image.scan_findings_status:
report.status = "FAIL"
report.status_extended = f"ECR repository {repository.name} has imageTag {image.latest_tag} without a scan"
elif image.scan_findings_status == "FAILED":
report.status = "FAIL"
report.status_extended = (
f"ECR repository {repository.name} with scan status FAILED"
)
elif image.scan_findings_status != "FAILED":
if image.scan_findings_severity_count and (
image.scan_findings_severity_count.critical
or image.scan_findings_severity_count.high
or image.scan_findings_severity_count.medium
):
report.status = "FAIL"
report.status_extended = f"ECR repository {repository.name} has imageTag {image.latest_tag} scanned with findings: CRITICAL->{image.scan_findings_severity_count.critical}, HIGH->{image.scan_findings_severity_count.high}, MEDIUM->{image.scan_findings_severity_count.medium} "
for registry in ecr_client.registries.values():
for repository in registry.repositories:
# First check if the repository has images
if len(repository.images_details) > 0:
# We only want to check the latest image pushed
image = repository.images_details[-1]
findings.append(report)
report = Check_Report_AWS(self.metadata())
report.region = repository.region
report.resource_id = repository.name
report.resource_arn = repository.arn
report.resource_tags = repository.tags
report.status = "PASS"
report.status_extended = f"ECR repository {repository.name} has imageTag {image.latest_tag} scanned without findings"
if not image.scan_findings_status:
report.status = "FAIL"
report.status_extended = f"ECR repository {repository.name} has imageTag {image.latest_tag} without a scan"
elif image.scan_findings_status == "FAILED":
report.status = "FAIL"
report.status_extended = (
f"ECR repository {repository.name} with scan status FAILED"
)
elif image.scan_findings_status != "FAILED":
if image.scan_findings_severity_count and (
image.scan_findings_severity_count.critical
or image.scan_findings_severity_count.high
or image.scan_findings_severity_count.medium
):
report.status = "FAIL"
report.status_extended = f"ECR repository {repository.name} has imageTag {image.latest_tag} scanned with findings: CRITICAL->{image.scan_findings_severity_count.critical}, HIGH->{image.scan_findings_severity_count.high}, MEDIUM->{image.scan_findings_severity_count.medium} "
findings.append(report)
return findings

View File

@@ -1,4 +1,5 @@
import threading
from datetime import datetime
from json import loads
from typing import Optional
@@ -17,14 +18,14 @@ class ECR:
self.session = audit_info.audit_session
self.audit_resources = audit_info.audit_resources
self.regional_clients = generate_regional_clients(self.service, audit_info)
self.repositories = []
self.registries = []
self.__threading_call__(self.__describe_repositories__)
self.__describe_repository_policies__()
self.__get_image_details__()
self.__get_repository_lifecycle_policy__()
self.registry_id = audit_info.audited_account
self.registries = {}
self.__threading_call__(self.__describe_registries_and_repositories__)
self.__threading_call__(self.__describe_repository_policies__)
self.__threading_call__(self.__get_image_details__)
self.__threading_call__(self.__get_repository_lifecycle_policy__)
self.__threading_call__(self.__get_registry_scanning_configuration__)
self.__list_tags_for_resource__()
self.__threading_call__(self.__list_tags_for_resource__)
def __get_session__(self):
return self.session
@@ -38,8 +39,9 @@ class ECR:
for t in threads:
t.join()
def __describe_repositories__(self, regional_client):
logger.info("ECR - Describing repositories...")
def __describe_registries_and_repositories__(self, regional_client):
logger.info("ECR - Describing registries and repositories...")
regional_registry_repositories = []
try:
describe_ecr_paginator = regional_client.get_paginator(
"describe_repositories"
@@ -51,126 +53,157 @@ class ECR:
repository["repositoryArn"], self.audit_resources
)
):
self.repositories.append(
regional_registry_repositories.append(
Repository(
name=repository["repositoryName"],
arn=repository["repositoryArn"],
registry_id=repository["registryId"],
region=regional_client.region,
scan_on_push=repository["imageScanningConfiguration"][
"scanOnPush"
],
policy=None,
images_details=[],
lyfecicle_policy=None,
lifecycle_policy=None,
)
)
# The default ECR registry is assumed
self.registries[regional_client.region] = Registry(
id=self.registry_id,
region=regional_client.region,
repositories=regional_registry_repositories,
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __describe_repository_policies__(self):
def __describe_repository_policies__(self, regional_client):
logger.info("ECR - Describing repository policies...")
try:
for repository in self.repositories:
client = self.regional_clients[repository.region]
policy = client.get_repository_policy(repositoryName=repository.name)
if "policyText" in policy:
repository.policy = loads(policy["policyText"])
if regional_client.region in self.registries:
for repository in self.registries[regional_client.region].repositories:
client = self.regional_clients[repository.region]
policy = client.get_repository_policy(
repositoryName=repository.name
)
if "policyText" in policy:
repository.policy = loads(policy["policyText"])
except Exception as error:
if "RepositoryPolicyNotFoundException" not in str(error):
logger.error(
f"-- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __get_repository_lifecycle_policy__(self):
def __get_repository_lifecycle_policy__(self, regional_client):
logger.info("ECR - Getting repository lifecycle policy...")
try:
for repository in self.repositories:
client = self.regional_clients[repository.region]
policy = client.get_lifecycle_policy(repositoryName=repository.name)
if "lifecyclePolicyText" in policy:
repository.lyfecicle_policy = policy["lifecyclePolicyText"]
if regional_client.region in self.registries:
for repository in self.registries[regional_client.region].repositories:
client = self.regional_clients[repository.region]
policy = client.get_lifecycle_policy(repositoryName=repository.name)
if "lifecyclePolicyText" in policy:
repository.lifecycle_policy = policy["lifecyclePolicyText"]
except Exception as error:
if "LifecyclePolicyNotFoundException" not in str(error):
logger.error(
f"-- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __get_image_details__(self):
def __get_image_details__(self, regional_client):
logger.info("ECR - Getting images details...")
try:
for repository in self.repositories:
# if the repo is not scanning pushed images there is nothing to do
if repository.scan_on_push:
client = self.regional_clients[repository.region]
describe_images_paginator = client.get_paginator("describe_images")
for page in describe_images_paginator.paginate(
repositoryName=repository.name
):
for image in page["imageDetails"]:
severity_counts = None
last_scan_status = None
if "imageScanStatus" in image:
last_scan_status = image["imageScanStatus"]["status"]
if regional_client.region in self.registries:
for repository in self.registries[regional_client.region].repositories:
# There is nothing to do if the repository is not scanning pushed images
if repository.scan_on_push:
client = self.regional_clients[repository.region]
describe_images_paginator = client.get_paginator(
"describe_images"
)
for page in describe_images_paginator.paginate(
registryId=self.registries[regional_client.region].id,
repositoryName=repository.name,
PaginationConfig={"PageSize": 1000},
):
for image in page["imageDetails"]:
# The following condition is required since sometimes
# the AWS ECR API returns None using the iterator
if image is not None:
severity_counts = None
last_scan_status = None
if "imageScanStatus" in image:
last_scan_status = image["imageScanStatus"][
"status"
]
if "imageScanFindingsSummary" in image:
severity_counts = FindingSeverityCounts(
critical=0, high=0, medium=0
)
finding_severity_counts = image[
"imageScanFindingsSummary"
]["findingSeverityCounts"]
if "CRITICAL" in finding_severity_counts:
severity_counts.critical = finding_severity_counts[
"CRITICAL"
]
if "HIGH" in finding_severity_counts:
severity_counts.high = finding_severity_counts[
"HIGH"
]
if "MEDIUM" in finding_severity_counts:
severity_counts.medium = finding_severity_counts[
"MEDIUM"
]
latest_tag = "None"
if image.get("imageTags"):
latest_tag = image["imageTags"][0]
repository.images_details.append(
ImageDetails(
latest_tag=latest_tag,
latest_digest=image["imageDigest"],
scan_findings_status=last_scan_status,
scan_findings_severity_count=severity_counts,
)
)
if "imageScanFindingsSummary" in image:
severity_counts = FindingSeverityCounts(
critical=0, high=0, medium=0
)
finding_severity_counts = image[
"imageScanFindingsSummary"
]["findingSeverityCounts"]
if "CRITICAL" in finding_severity_counts:
severity_counts.critical = (
finding_severity_counts["CRITICAL"]
)
if "HIGH" in finding_severity_counts:
severity_counts.high = (
finding_severity_counts["HIGH"]
)
if "MEDIUM" in finding_severity_counts:
severity_counts.medium = (
finding_severity_counts["MEDIUM"]
)
latest_tag = "None"
if image.get("imageTags"):
latest_tag = image["imageTags"][0]
repository.images_details.append(
ImageDetails(
latest_tag=latest_tag,
image_pushed_at=image["imagePushedAt"],
latest_digest=image["imageDigest"],
scan_findings_status=last_scan_status,
scan_findings_severity_count=severity_counts,
)
)
# Sort the repository images by date pushed
repository.images_details.sort(
key=lambda image: image.image_pushed_at
)
except Exception as error:
logger.error(
f"-- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __list_tags_for_resource__(self):
def __list_tags_for_resource__(self, regional_client):
logger.info("ECR - List Tags...")
try:
for repository in self.repositories:
try:
regional_client = self.regional_clients[repository.region]
response = regional_client.list_tags_for_resource(
resourceArn=repository.arn
)["tags"]
repository.tags = response
if regional_client.region in self.registries:
for repository in self.registries[regional_client.region].repositories:
try:
regional_client = self.regional_clients[repository.region]
response = regional_client.list_tags_for_resource(
resourceArn=repository.arn
)["tags"]
repository.tags = response
except ClientError as error:
if error.response["Error"]["Code"] == "RepositoryNotFoundException":
logger.warning(
f"{regional_client.region} --"
f" {error.__class__.__name__}[{error.__traceback__.tb_lineno}]:"
f" {error}"
)
continue
except ClientError as error:
if (
error.response["Error"]["Code"]
== "RepositoryNotFoundException"
):
logger.warning(
f"{regional_client.region} --"
f" {error.__class__.__name__}[{error.__traceback__.tb_lineno}]:"
f" {error}"
)
continue
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
@@ -179,25 +212,34 @@ class ECR:
def __get_registry_scanning_configuration__(self, regional_client):
logger.info("ECR - Getting Registry Scanning Configuration...")
try:
response = regional_client.get_registry_scanning_configuration()
rules = []
for rule in response.get("scanningConfiguration").get("rules", []):
rules.append(
ScanningRule(
scan_frequency=rule.get("scanFrequency"),
scan_filters=rule.get("repositoryFilters"),
if regional_client.region in self.registries:
response = regional_client.get_registry_scanning_configuration()
rules = []
for rule in response.get("scanningConfiguration").get("rules", []):
rules.append(
ScanningRule(
scan_frequency=rule.get("scanFrequency"),
scan_filters=rule.get("repositoryFilters", []),
)
)
self.registries[regional_client.region].scan_type = response.get(
"scanningConfiguration"
).get("scanType", "BASIC")
self.registries[regional_client.region].rules = rules
except ClientError as error:
if error.response["Error"][
"Code"
] == "ValidationException" and "GetRegistryScanningConfiguration operation: This feature is disabled" in str(
error
):
self.registries[regional_client.region].scan_type = "BASIC"
self.registries[regional_client.region].rules = []
else:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
self.registries.append(
Registry(
id=response.get("registryId", ""),
scan_type=response.get("scanningConfiguration").get(
"scanType", "BASIC"
),
region=regional_client.region,
rules=rules,
)
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
@@ -213,6 +255,7 @@ class FindingSeverityCounts(BaseModel):
class ImageDetails(BaseModel):
latest_tag: str
latest_digest: str
image_pushed_at: datetime
scan_findings_status: Optional[str]
scan_findings_severity_count: Optional[FindingSeverityCounts]
@@ -221,10 +264,11 @@ class Repository(BaseModel):
name: str
arn: str
region: str
registry_id = str
scan_on_push: bool
policy: Optional[dict]
images_details: Optional[list[ImageDetails]]
lyfecicle_policy: Optional[str]
lifecycle_policy: Optional[str]
tags: Optional[list] = []
@@ -236,6 +280,6 @@ class ScanningRule(BaseModel):
class Registry(BaseModel):
id: str
region: str
scan_type: str
rules: list[ScanningRule]
tags: Optional[list] = []
repositories: list[Repository]
scan_type: Optional[str]
rules: Optional[list[ScanningRule]]

View File

@@ -1,17 +1,25 @@
from re import search
from unittest import mock
from prowler.providers.aws.services.ecr.ecr_service import Registry, ScanningRule
from prowler.providers.aws.services.ecr.ecr_service import (
Registry,
Repository,
ScanningRule,
)
# Mock Test Region
AWS_REGION = "eu-west-1"
AWS_ACCOUNT_NUMBER = "123456789012"
repository_name = "test_repo"
repository_arn = (
f"arn:aws:ecr:eu-west-1:{AWS_ACCOUNT_NUMBER}:repository/{repository_name}"
)
class Test_ecr_registry_scan_images_on_push_enabled:
def test_no_registries(self):
ecr_client = mock.MagicMock
ecr_client.registries = []
ecr_client.registries = {}
with mock.patch(
"prowler.providers.aws.services.ecr.ecr_service.ECR",
@@ -25,21 +33,53 @@ class Test_ecr_registry_scan_images_on_push_enabled:
result = check.execute()
assert len(result) == 0
def test_scan_on_push_enabled(self):
def test_registry_no_repositories(self):
ecr_client = mock.MagicMock
ecr_client.registries = []
ecr_client.registries.append(
Registry(
id=AWS_ACCOUNT_NUMBER,
region=AWS_REGION,
scan_type="BASIC",
rules=[
ScanningRule(
scan_frequency="SCAN_ON_PUSH",
scan_filters=[{"filter": "*", "filterType": "WILDCARD"}],
)
],
ecr_client.registries = {}
ecr_client.registries[AWS_REGION] = Registry(
id=AWS_ACCOUNT_NUMBER,
region=AWS_REGION,
scan_type="BASIC",
repositories=[],
rules=[],
)
with mock.patch(
"prowler.providers.aws.services.ecr.ecr_service.ECR",
ecr_client,
):
from prowler.providers.aws.services.ecr.ecr_registry_scan_images_on_push_enabled.ecr_registry_scan_images_on_push_enabled import (
ecr_registry_scan_images_on_push_enabled,
)
check = ecr_registry_scan_images_on_push_enabled()
result = check.execute()
assert len(result) == 0
def test_registry_scan_on_push_enabled(self):
ecr_client = mock.MagicMock
ecr_client.registries = {}
ecr_client.registries[AWS_REGION] = Registry(
id=AWS_ACCOUNT_NUMBER,
region=AWS_REGION,
scan_type="BASIC",
repositories=[
Repository(
name=repository_name,
arn=repository_arn,
region=AWS_REGION,
scan_on_push=True,
policy="",
images_details=None,
lifecycle_policy="",
)
],
rules=[
ScanningRule(
scan_frequency="SCAN_ON_PUSH",
scan_filters=[{"filter": "*", "filterType": "WILDCARD"}],
)
],
)
with mock.patch(
@@ -60,19 +100,28 @@ class Test_ecr_registry_scan_images_on_push_enabled:
def test_scan_on_push_enabled_with_filters(self):
ecr_client = mock.MagicMock
ecr_client.registries = []
ecr_client.registries.append(
Registry(
id=AWS_ACCOUNT_NUMBER,
region=AWS_REGION,
scan_type="BASIC",
rules=[
ScanningRule(
scan_frequency="SCAN_ON_PUSH",
scan_filters=[{"filter": "test", "filterType": "WILDCARD"}],
)
],
)
ecr_client.registries = {}
ecr_client.registries[AWS_REGION] = Registry(
id=AWS_ACCOUNT_NUMBER,
region=AWS_REGION,
scan_type="BASIC",
repositories=[
Repository(
name=repository_name,
arn=repository_arn,
region=AWS_REGION,
scan_on_push=True,
policy="",
images_details=None,
lifecycle_policy="",
)
],
rules=[
ScanningRule(
scan_frequency="SCAN_ON_PUSH",
scan_filters=[{"filter": "test", "filterType": "WILDCARD"}],
)
],
)
with mock.patch(
@@ -96,14 +145,23 @@ class Test_ecr_registry_scan_images_on_push_enabled:
def test_scan_on_push_disabled(self):
ecr_client = mock.MagicMock
ecr_client.registries = []
ecr_client.registries.append(
Registry(
id=AWS_ACCOUNT_NUMBER,
region=AWS_REGION,
scan_type="BASIC",
rules=[],
)
ecr_client.registries = {}
ecr_client.registries[AWS_REGION] = Registry(
id=AWS_ACCOUNT_NUMBER,
region=AWS_REGION,
scan_type="BASIC",
repositories=[
Repository(
name=repository_name,
arn=repository_arn,
region=AWS_REGION,
scan_on_push=True,
policy="",
images_details=None,
lifecycle_policy="",
)
],
rules=[],
)
with mock.patch(

View File

@@ -1,7 +1,6 @@
from re import search
from unittest import mock
from prowler.providers.aws.services.ecr.ecr_service import Repository
from prowler.providers.aws.services.ecr.ecr_service import Registry, Repository
# Mock Test Region
AWS_REGION = "eu-west-1"
@@ -24,19 +23,64 @@ repo_policy_public = {
class Test_ecr_repositories_lifecycle_policy_enabled:
def test_no_lyfecicle_policy(self):
def test_no_registries(self):
ecr_client = mock.MagicMock
ecr_client.repositories = []
ecr_client.repositories.append(
Repository(
name=repository_name,
arn=repository_arn,
region=AWS_REGION,
scan_on_push=True,
policy=repo_policy_public,
images_details=None,
lyfecicle_policy="test-policy",
ecr_client.registries = {}
with mock.patch(
"prowler.providers.aws.services.ecr.ecr_service.ECR",
ecr_client,
):
from prowler.providers.aws.services.ecr.ecr_repositories_lifecycle_policy_enabled.ecr_repositories_lifecycle_policy_enabled import (
ecr_repositories_lifecycle_policy_enabled,
)
check = ecr_repositories_lifecycle_policy_enabled()
result = check.execute()
assert len(result) == 0
def test_registry_no_repositories(self):
ecr_client = mock.MagicMock
ecr_client.registries = {}
ecr_client.registries[AWS_REGION] = Registry(
id=AWS_ACCOUNT_NUMBER,
region=AWS_REGION,
scan_type="BASIC",
repositories=[],
rules=[],
)
with mock.patch(
"prowler.providers.aws.services.ecr.ecr_service.ECR",
ecr_client,
):
from prowler.providers.aws.services.ecr.ecr_repositories_lifecycle_policy_enabled.ecr_repositories_lifecycle_policy_enabled import (
ecr_repositories_lifecycle_policy_enabled,
)
check = ecr_repositories_lifecycle_policy_enabled()
result = check.execute()
assert len(result) == 0
def test_lifecycle_policy(self):
ecr_client = mock.MagicMock
ecr_client.registries = {}
ecr_client.registries[AWS_REGION] = Registry(
id=AWS_ACCOUNT_NUMBER,
region=AWS_REGION,
scan_type="BASIC",
rules=[],
repositories=[
Repository(
name=repository_name,
arn=repository_arn,
region=AWS_REGION,
scan_on_push=True,
policy=repo_policy_public,
images_details=None,
lifecycle_policy="test-policy",
)
],
)
with mock.patch(
@@ -51,23 +95,33 @@ class Test_ecr_repositories_lifecycle_policy_enabled:
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search("has lifecycle policy", result[0].status_extended)
assert (
result[0].status_extended
== f"Repository {repository_name} has a lifecycle policy configured"
)
assert result[0].resource_id == repository_name
assert result[0].resource_arn == repository_arn
assert result[0].resource_tags == []
def test_lifecycle_policy(self):
def test_no_lifecycle_policy(self):
ecr_client = mock.MagicMock
ecr_client.repositories = []
ecr_client.repositories.append(
Repository(
name=repository_name,
arn=repository_arn,
region=AWS_REGION,
scan_on_push=False,
policy=repo_policy_public,
images_details=None,
lyfecicle_policy=None,
)
ecr_client.registries = {}
ecr_client.registries[AWS_REGION] = Registry(
id=AWS_ACCOUNT_NUMBER,
region=AWS_REGION,
scan_type="BASIC",
rules=[],
repositories=[
Repository(
name=repository_name,
arn=repository_arn,
region=AWS_REGION,
scan_on_push=False,
policy=repo_policy_public,
images_details=None,
lifecycle_policy=None,
)
],
)
with mock.patch(
@@ -82,6 +136,10 @@ class Test_ecr_repositories_lifecycle_policy_enabled:
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search("has no lifecycle policy", result[0].status_extended)
assert (
result[0].status_extended
== f"Repository {repository_name} has not a lifecycle policy configured"
)
assert result[0].resource_id == repository_name
assert result[0].resource_arn == repository_arn
assert result[0].resource_tags == []

View File

@@ -1,7 +1,6 @@
from re import search
from unittest import mock
from prowler.providers.aws.services.ecr.ecr_service import Repository
from prowler.providers.aws.services.ecr.ecr_service import Registry, Repository
# Mock Test Region
AWS_REGION = "eu-west-1"
@@ -36,19 +35,64 @@ repo_policy_public = {
class Test_ecr_repositories_not_publicly_accessible:
def test_no_registries(self):
ecr_client = mock.MagicMock
ecr_client.registries = {}
with mock.patch(
"prowler.providers.aws.services.ecr.ecr_service.ECR",
ecr_client,
):
from prowler.providers.aws.services.ecr.ecr_repositories_not_publicly_accessible.ecr_repositories_not_publicly_accessible import (
ecr_repositories_not_publicly_accessible,
)
check = ecr_repositories_not_publicly_accessible()
result = check.execute()
assert len(result) == 0
def test_registry_no_repositories(self):
ecr_client = mock.MagicMock
ecr_client.registries = {}
ecr_client.registries[AWS_REGION] = Registry(
id=AWS_ACCOUNT_NUMBER,
region=AWS_REGION,
scan_type="BASIC",
repositories=[],
rules=[],
)
with mock.patch(
"prowler.providers.aws.services.ecr.ecr_service.ECR",
ecr_client,
):
from prowler.providers.aws.services.ecr.ecr_repositories_not_publicly_accessible.ecr_repositories_not_publicly_accessible import (
ecr_repositories_not_publicly_accessible,
)
check = ecr_repositories_not_publicly_accessible()
result = check.execute()
assert len(result) == 0
def test_repository_not_public(self):
ecr_client = mock.MagicMock
ecr_client.repositories = []
ecr_client.repositories.append(
Repository(
name=repository_name,
arn=repository_arn,
region=AWS_REGION,
scan_on_push=True,
policy=repo_policy_not_public,
images_details=None,
lyfecicle_policy=None,
)
ecr_client.registries = {}
ecr_client.registries[AWS_REGION] = Registry(
id=AWS_ACCOUNT_NUMBER,
region=AWS_REGION,
scan_type="BASIC",
repositories=[
Repository(
name=repository_name,
arn=repository_arn,
region=AWS_REGION,
scan_on_push=True,
policy=repo_policy_not_public,
images_details=None,
lifecycle_policy=None,
)
],
rules=[],
)
with mock.patch(
@@ -63,23 +107,32 @@ class Test_ecr_repositories_not_publicly_accessible:
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search("is not open", result[0].status_extended)
assert (
result[0].status_extended
== f"Repository {repository_name} is not publicly accesible"
)
assert result[0].resource_id == repository_name
assert result[0].resource_arn == repository_arn
def test_repository_public(self):
ecr_client = mock.MagicMock
ecr_client.repositories = []
ecr_client.repositories.append(
Repository(
name=repository_name,
arn=repository_arn,
region=AWS_REGION,
scan_on_push=True,
policy=repo_policy_public,
images_details=None,
lyfecicle_policy=None,
)
ecr_client.registries = {}
ecr_client.registries[AWS_REGION] = Registry(
id=AWS_ACCOUNT_NUMBER,
region=AWS_REGION,
scan_type="BASIC",
repositories=[
Repository(
name=repository_name,
arn=repository_arn,
region=AWS_REGION,
scan_on_push=True,
policy=repo_policy_public,
images_details=None,
lifecycle_policy=None,
)
],
rules=[],
)
with mock.patch(
@@ -94,8 +147,9 @@ class Test_ecr_repositories_not_publicly_accessible:
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"policy may allow anonymous users to", result[0].status_extended
assert (
result[0].status_extended
== f"Repository {repository_name} policy may allow anonymous users to perform actions (Principal: '*')"
)
assert result[0].resource_id == repository_name
assert result[0].resource_arn == repository_arn

View File

@@ -1,7 +1,6 @@
from re import search
from unittest import mock
from prowler.providers.aws.services.ecr.ecr_service import Repository
from prowler.providers.aws.services.ecr.ecr_service import Registry, Repository
# Mock Test Region
AWS_REGION = "eu-west-1"
@@ -24,19 +23,64 @@ repo_policy_public = {
class Test_ecr_repositories_scan_images_on_push_enabled:
def test_no_registries(self):
ecr_client = mock.MagicMock
ecr_client.registries = {}
with mock.patch(
"prowler.providers.aws.services.ecr.ecr_service.ECR",
ecr_client,
):
from prowler.providers.aws.services.ecr.ecr_repositories_scan_images_on_push_enabled.ecr_repositories_scan_images_on_push_enabled import (
ecr_repositories_scan_images_on_push_enabled,
)
check = ecr_repositories_scan_images_on_push_enabled()
result = check.execute()
assert len(result) == 0
def test_registry_no_repositories(self):
ecr_client = mock.MagicMock
ecr_client.registries = {}
ecr_client.registries[AWS_REGION] = Registry(
id=AWS_ACCOUNT_NUMBER,
region=AWS_REGION,
scan_type="BASIC",
repositories=[],
rules=[],
)
with mock.patch(
"prowler.providers.aws.services.ecr.ecr_service.ECR",
ecr_client,
):
from prowler.providers.aws.services.ecr.ecr_repositories_scan_images_on_push_enabled.ecr_repositories_scan_images_on_push_enabled import (
ecr_repositories_scan_images_on_push_enabled,
)
check = ecr_repositories_scan_images_on_push_enabled()
result = check.execute()
assert len(result) == 0
def test_scan_on_push_disabled(self):
ecr_client = mock.MagicMock
ecr_client.repositories = []
ecr_client.repositories.append(
Repository(
name=repository_name,
arn=repository_arn,
region=AWS_REGION,
scan_on_push=True,
policy=repo_policy_public,
images_details=None,
lyfecicle_policy=None,
)
ecr_client.registries = {}
ecr_client.registries[AWS_REGION] = Registry(
id=AWS_ACCOUNT_NUMBER,
region=AWS_REGION,
scan_type="BASIC",
repositories=[
Repository(
name=repository_name,
arn=repository_arn,
region=AWS_REGION,
scan_on_push=True,
policy=repo_policy_public,
images_details=None,
lifecycle_policy=None,
)
],
rules=[],
)
with mock.patch(
@@ -51,23 +95,32 @@ class Test_ecr_repositories_scan_images_on_push_enabled:
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search("has scan on push enabled", result[0].status_extended)
assert (
result[0].status_extended
== f"ECR repository {repository_name} has scan on push enabled"
)
assert result[0].resource_id == repository_name
assert result[0].resource_arn == repository_arn
def test_scan_on_push_enabled(self):
ecr_client = mock.MagicMock
ecr_client.repositories = []
ecr_client.repositories.append(
Repository(
name=repository_name,
arn=repository_arn,
region=AWS_REGION,
scan_on_push=False,
policy=repo_policy_public,
images_details=None,
lyfecicle_policy=None,
)
ecr_client.registries = {}
ecr_client.registries[AWS_REGION] = Registry(
id=AWS_ACCOUNT_NUMBER,
region=AWS_REGION,
scan_type="BASIC",
repositories=[
Repository(
name=repository_name,
arn=repository_arn,
region=AWS_REGION,
scan_on_push=False,
policy=repo_policy_public,
images_details=None,
lifecycle_policy=None,
)
],
rules=[],
)
with mock.patch(
@@ -82,6 +135,9 @@ class Test_ecr_repositories_scan_images_on_push_enabled:
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search("has scan on push disabled", result[0].status_extended)
assert (
result[0].status_extended
== f"ECR repository {repository_name} has scan on push disabled"
)
assert result[0].resource_id == repository_name
assert result[0].resource_arn == repository_arn

View File

@@ -1,9 +1,11 @@
from datetime import datetime
from re import search
from unittest import mock
from prowler.providers.aws.services.ecr.ecr_service import (
FindingSeverityCounts,
ImageDetails,
Registry,
Repository,
)
@@ -28,20 +30,66 @@ repo_policy_public = {
class Test_ecr_repositories_scan_vulnerabilities_in_latest_image:
def test_no_registries(self):
ecr_client = mock.MagicMock
ecr_client.registries = {}
with mock.patch(
"prowler.providers.aws.services.ecr.ecr_service.ECR",
ecr_client,
):
from prowler.providers.aws.services.ecr.ecr_repositories_scan_vulnerabilities_in_latest_image.ecr_repositories_scan_vulnerabilities_in_latest_image import (
ecr_repositories_scan_vulnerabilities_in_latest_image,
)
check = ecr_repositories_scan_vulnerabilities_in_latest_image()
result = check.execute()
assert len(result) == 0
def test_registry_no_repositories(self):
ecr_client = mock.MagicMock
ecr_client.registries = {}
ecr_client.registries[AWS_REGION] = Registry(
id=AWS_ACCOUNT_NUMBER,
region=AWS_REGION,
scan_type="BASIC",
repositories=[],
rules=[],
)
with mock.patch(
"prowler.providers.aws.services.ecr.ecr_service.ECR",
ecr_client,
):
from prowler.providers.aws.services.ecr.ecr_repositories_scan_vulnerabilities_in_latest_image.ecr_repositories_scan_vulnerabilities_in_latest_image import (
ecr_repositories_scan_vulnerabilities_in_latest_image,
)
check = ecr_repositories_scan_vulnerabilities_in_latest_image()
result = check.execute()
assert len(result) == 0
def test_empty_repository(self):
ecr_client = mock.MagicMock
ecr_client.repositories = []
ecr_client.repositories.append(
Repository(
name=repository_name,
arn=repository_arn,
region=AWS_REGION,
scan_on_push=True,
policy=repo_policy_public,
images_details=[],
lyfecicle_policy=None,
)
ecr_client.registries = {}
ecr_client.registries[AWS_REGION] = Registry(
id=AWS_ACCOUNT_NUMBER,
region=AWS_REGION,
scan_type="BASIC",
repositories=[
Repository(
name=repository_name,
arn=repository_arn,
region=AWS_REGION,
scan_on_push=True,
policy=repo_policy_public,
images_details=[],
lifecycle_policy=None,
)
],
rules=[],
)
with mock.patch(
"prowler.providers.aws.services.ecr.ecr_service.ECR",
ecr_client,
@@ -56,28 +104,35 @@ class Test_ecr_repositories_scan_vulnerabilities_in_latest_image:
def test_image_scaned_without_findings(self):
ecr_client = mock.MagicMock
ecr_client.repositories = []
ecr_client.repositories.append(
Repository(
name=repository_name,
arn=repository_arn,
region=AWS_REGION,
scan_on_push=True,
policy=repo_policy_public,
images_details=[],
lyfecicle_policy=None,
)
ecr_client.registries = {}
ecr_client.registries[AWS_REGION] = Registry(
id=AWS_ACCOUNT_NUMBER,
region=AWS_REGION,
scan_type="BASIC",
repositories=[
Repository(
name=repository_name,
arn=repository_arn,
region=AWS_REGION,
scan_on_push=True,
policy=repo_policy_public,
images_details=[
ImageDetails(
latest_tag="test-tag",
latest_digest="test-digest",
image_pushed_at=datetime(2023, 1, 1),
scan_findings_status="COMPLETE",
scan_findings_severity_count=FindingSeverityCounts(
critical=0, high=0, medium=0
),
),
],
lifecycle_policy=None,
)
],
rules=[],
)
ecr_client.repositories[0].images_details.append(
ImageDetails(
latest_tag="test-tag",
latest_digest="test-digest",
scan_findings_status="COMPLETE",
scan_findings_severity_count=FindingSeverityCounts(
critical=0, high=0, medium=0
),
),
),
with mock.patch(
"prowler.providers.aws.services.ecr.ecr_service.ECR",
ecr_client,
@@ -96,28 +151,35 @@ class Test_ecr_repositories_scan_vulnerabilities_in_latest_image:
def test_image_scanned_with_findings(self):
ecr_client = mock.MagicMock
ecr_client.repositories = []
ecr_client.repositories.append(
Repository(
name=repository_name,
arn=repository_arn,
region=AWS_REGION,
scan_on_push=True,
policy=repo_policy_public,
images_details=[],
lyfecicle_policy=None,
)
ecr_client.registries = {}
ecr_client.registries[AWS_REGION] = Registry(
id=AWS_ACCOUNT_NUMBER,
region=AWS_REGION,
scan_type="BASIC",
repositories=[
Repository(
name=repository_name,
arn=repository_arn,
region=AWS_REGION,
scan_on_push=True,
policy=repo_policy_public,
images_details=[
ImageDetails(
latest_tag="test-tag",
latest_digest="test-digest",
image_pushed_at=datetime(2023, 1, 1),
scan_findings_status="COMPLETE",
scan_findings_severity_count=FindingSeverityCounts(
critical=12, high=34, medium=7
),
)
],
lifecycle_policy=None,
)
],
rules=[],
)
ecr_client.repositories[0].images_details.append(
ImageDetails(
latest_tag="test-tag",
latest_digest="test-digest",
scan_findings_status="COMPLETE",
scan_findings_severity_count=FindingSeverityCounts(
critical=12, high=34, medium=7
),
),
),
with mock.patch(
"prowler.providers.aws.services.ecr.ecr_service.ECR",
ecr_client,
@@ -136,28 +198,35 @@ class Test_ecr_repositories_scan_vulnerabilities_in_latest_image:
def test_image_scanned_fail_scan(self):
ecr_client = mock.MagicMock
ecr_client.repositories = []
ecr_client.repositories.append(
Repository(
name=repository_name,
arn=repository_arn,
region=AWS_REGION,
scan_on_push=True,
policy=repo_policy_public,
images_details=[],
lyfecicle_policy=None,
)
ecr_client.registries = {}
ecr_client.registries[AWS_REGION] = Registry(
id=AWS_ACCOUNT_NUMBER,
region=AWS_REGION,
scan_type="BASIC",
repositories=[
Repository(
name=repository_name,
arn=repository_arn,
region=AWS_REGION,
scan_on_push=True,
policy=repo_policy_public,
images_details=[
ImageDetails(
latest_tag="test-tag",
latest_digest="test-digest",
image_pushed_at=datetime(2023, 1, 1),
scan_findings_status="FAILED",
scan_findings_severity_count=FindingSeverityCounts(
critical=0, high=0, medium=0
),
)
],
lifecycle_policy=None,
)
],
rules=[],
)
ecr_client.repositories[0].images_details.append(
ImageDetails(
latest_tag="test-tag",
latest_digest="test-digest",
scan_findings_status="FAILED",
scan_findings_severity_count=FindingSeverityCounts(
critical=0, high=0, medium=0
),
),
),
with mock.patch(
"prowler.providers.aws.services.ecr.ecr_service.ECR",
ecr_client,
@@ -176,28 +245,35 @@ class Test_ecr_repositories_scan_vulnerabilities_in_latest_image:
def test_image_not_scanned(self):
ecr_client = mock.MagicMock
ecr_client.repositories = []
ecr_client.repositories.append(
Repository(
name=repository_name,
arn=repository_arn,
region=AWS_REGION,
scan_on_push=True,
policy=repo_policy_public,
images_details=[],
lyfecicle_policy=None,
)
ecr_client.registries = {}
ecr_client.registries[AWS_REGION] = Registry(
id=AWS_ACCOUNT_NUMBER,
region=AWS_REGION,
scan_type="BASIC",
repositories=[
Repository(
name=repository_name,
arn=repository_arn,
region=AWS_REGION,
scan_on_push=True,
policy=repo_policy_public,
images_details=[
ImageDetails(
latest_tag="test-tag",
latest_digest="test-digest",
image_pushed_at=datetime(2023, 1, 1),
scan_findings_status="",
scan_findings_severity_count=FindingSeverityCounts(
critical=0, high=0, medium=0
),
)
],
lifecycle_policy=None,
)
],
rules=[],
)
ecr_client.repositories[0].images_details.append(
ImageDetails(
latest_tag="test-tag",
latest_digest="test-digest",
scan_findings_status="",
scan_findings_severity_count=FindingSeverityCounts(
critical=0, high=0, medium=0
),
),
),
with mock.patch(
"prowler.providers.aws.services.ecr.ecr_service.ECR",
ecr_client,

View File

@@ -1,3 +1,4 @@
from datetime import datetime
from unittest.mock import patch
import botocore
@@ -24,8 +25,9 @@ def mock_make_api_call(self, operation_name, kwarg):
{
"imageDigest": "sha256:d8868e50ac4c7104d2200d42f432b661b2da8c1e417ccfae217e6a1e04bb9295",
"imageTags": [
"test-tag",
"test-tag1",
],
"imagePushedAt": datetime(2023, 1, 1),
"imageScanStatus": {
"status": "COMPLETE",
},
@@ -38,6 +40,13 @@ def mock_make_api_call(self, operation_name, kwarg):
"imageTags": [
"test-tag2",
],
"imagePushedAt": datetime(2023, 1, 2),
"imageScanStatus": {
"status": "COMPLETE",
},
"imageScanFindingsSummary": {
"findingSeverityCounts": {"CRITICAL": 1, "HIGH": 2, "MEDIUM": 3}
},
},
],
}
@@ -68,6 +77,7 @@ def mock_make_api_call(self, operation_name, kwarg):
],
},
}
return make_api_call(self, operation_name, kwarg)
@@ -128,7 +138,7 @@ class Test_ECR_Service:
# Test describe ECR repositories
@mock_ecr
def test__describe_repositories__(self):
def test__describe_registries_and_repositories__(self):
ecr_client = client("ecr", region_name=AWS_REGION)
ecr_client.create_repository(
repositoryName=repo_name,
@@ -139,11 +149,16 @@ class Test_ECR_Service:
)
audit_info = self.set_mocked_audit_info()
ecr = ECR(audit_info)
assert len(ecr.repositories) == 1
assert ecr.repositories[0].name == repo_name
assert ecr.repositories[0].arn == repo_arn
assert ecr.repositories[0].scan_on_push
assert ecr.repositories[0].tags == [
assert len(ecr.registries) == 1
assert ecr.registries[AWS_REGION].id == AWS_ACCOUNT_NUMBER
assert ecr.registries[AWS_REGION].region == AWS_REGION
assert len(ecr.registries[AWS_REGION].repositories) == 1
assert ecr.registries[AWS_REGION].repositories[0].name == repo_name
assert ecr.registries[AWS_REGION].repositories[0].arn == repo_arn
assert ecr.registries[AWS_REGION].repositories[0].scan_on_push
assert ecr.registries[AWS_REGION].repositories[0].tags == [
{"Key": "test", "Value": "test"},
]
@@ -157,28 +172,39 @@ class Test_ECR_Service:
)
audit_info = self.set_mocked_audit_info()
ecr = ECR(audit_info)
assert len(ecr.repositories) == 1
assert ecr.repositories[0].name == repo_name
assert ecr.repositories[0].arn == repo_arn
assert ecr.repositories[0].scan_on_push
assert len(ecr.registries) == 1
assert len(ecr.registries[AWS_REGION].repositories) == 1
assert ecr.registries[AWS_REGION].repositories[0].name == repo_name
assert ecr.registries[AWS_REGION].repositories[0].arn == repo_arn
assert ecr.registries[AWS_REGION].repositories[0].scan_on_push
assert (
ecr.repositories[0].policy["Statement"][0]["Sid"] == "Allow Describe Images"
ecr.registries[AWS_REGION].repositories[0].policy["Statement"][0]["Sid"]
== "Allow Describe Images"
)
assert ecr.repositories[0].policy["Statement"][0]["Effect"] == "Allow"
assert (
ecr.repositories[0].policy["Statement"][0]["Principal"]["AWS"][0]
ecr.registries[AWS_REGION].repositories[0].policy["Statement"][0]["Effect"]
== "Allow"
)
assert (
ecr.registries[AWS_REGION]
.repositories[0]
.policy["Statement"][0]["Principal"]["AWS"][0]
== f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
)
assert (
ecr.repositories[0].policy["Statement"][0]["Action"][0]
ecr.registries[AWS_REGION]
.repositories[0]
.policy["Statement"][0]["Action"][0]
== "ecr:DescribeImages"
)
assert (
ecr.repositories[0].policy["Statement"][0]["Action"][1]
ecr.registries[AWS_REGION]
.repositories[0]
.policy["Statement"][0]["Action"][1]
== "ecr:DescribeRepositories"
)
# Test describe ECR repository policies
# Test describe ECR repository lifecycle policies
@mock_ecr
def test__get_lifecycle_policies__(self):
ecr_client = client("ecr", region_name=AWS_REGION)
@@ -188,11 +214,12 @@ class Test_ECR_Service:
)
audit_info = self.set_mocked_audit_info()
ecr = ECR(audit_info)
assert len(ecr.repositories) == 1
assert ecr.repositories[0].name == repo_name
assert ecr.repositories[0].arn == repo_arn
assert ecr.repositories[0].scan_on_push
assert ecr.repositories[0].lyfecicle_policy
assert len(ecr.registries) == 1
assert len(ecr.registries[AWS_REGION].repositories) == 1
assert ecr.registries[AWS_REGION].repositories[0].name == repo_name
assert ecr.registries[AWS_REGION].repositories[0].arn == repo_arn
assert ecr.registries[AWS_REGION].repositories[0].scan_on_push
assert ecr.registries[AWS_REGION].repositories[0].lifecycle_policy
# Test get image details
@mock_ecr
@@ -204,45 +231,103 @@ class Test_ECR_Service:
)
audit_info = self.set_mocked_audit_info()
ecr = ECR(audit_info)
assert len(ecr.repositories) == 1
assert ecr.repositories[0].name == repo_name
assert ecr.repositories[0].arn == repo_arn
assert ecr.repositories[0].scan_on_push
assert len(ecr.repositories[0].images_details) == 2
assert ecr.repositories[0].images_details[0].latest_tag == "test-tag"
assert len(ecr.registries) == 1
assert len(ecr.registries[AWS_REGION].repositories) == 1
assert ecr.registries[AWS_REGION].repositories[0].name == repo_name
assert ecr.registries[AWS_REGION].repositories[0].arn == repo_arn
assert ecr.registries[AWS_REGION].repositories[0].scan_on_push
assert len(ecr.registries[AWS_REGION].repositories[0].images_details) == 2
# First image pushed
assert ecr.registries[AWS_REGION].repositories[0].images_details[
0
].image_pushed_at == datetime(2023, 1, 1)
assert (
ecr.repositories[0].images_details[0].latest_digest
ecr.registries[AWS_REGION].repositories[0].images_details[0].latest_tag
== "test-tag1"
)
assert (
ecr.registries[AWS_REGION].repositories[0].images_details[0].latest_digest
== "sha256:d8868e50ac4c7104d2200d42f432b661b2da8c1e417ccfae217e6a1e04bb9295"
)
assert ecr.repositories[0].images_details[0].scan_findings_status == "COMPLETE"
assert (
ecr.repositories[0].images_details[0].scan_findings_severity_count.critical
ecr.registries[AWS_REGION]
.repositories[0]
.images_details[0]
.scan_findings_status
== "COMPLETE"
)
assert (
ecr.registries[AWS_REGION]
.repositories[0]
.images_details[0]
.scan_findings_severity_count.critical
== 1
)
assert (
ecr.repositories[0].images_details[0].scan_findings_severity_count.high == 2
ecr.registries[AWS_REGION]
.repositories[0]
.images_details[0]
.scan_findings_severity_count.high
== 2
)
assert (
ecr.repositories[0].images_details[0].scan_findings_severity_count.medium
ecr.registries[AWS_REGION]
.repositories[0]
.images_details[0]
.scan_findings_severity_count.medium
== 3
)
assert ecr.repositories[0].images_details[1].latest_tag == "test-tag2"
# Second image pushed
assert ecr.registries[AWS_REGION].repositories[0].images_details[
1
].image_pushed_at == datetime(2023, 1, 2)
assert (
ecr.repositories[0].images_details[1].latest_digest
ecr.registries[AWS_REGION].repositories[0].images_details[1].latest_tag
== "test-tag2"
)
assert (
ecr.registries[AWS_REGION].repositories[0].images_details[1].latest_digest
== "sha256:83251ac64627fc331584f6c498b3aba5badc01574e2c70b2499af3af16630eed"
)
assert not ecr.repositories[0].images_details[1].scan_findings_status
assert not ecr.repositories[0].images_details[1].scan_findings_severity_count
assert (
ecr.registries[AWS_REGION]
.repositories[0]
.images_details[1]
.scan_findings_status
== "COMPLETE"
)
assert (
ecr.registries[AWS_REGION]
.repositories[0]
.images_details[1]
.scan_findings_severity_count.critical
== 1
)
assert (
ecr.registries[AWS_REGION]
.repositories[0]
.images_details[1]
.scan_findings_severity_count.high
== 2
)
assert (
ecr.registries[AWS_REGION]
.repositories[0]
.images_details[1]
.scan_findings_severity_count.medium
== 3
)
# Test get ECR Registries
# Test get ECR Registries Scanning Configuration
@mock_ecr
def test__get_registry_scanning_configuration__(self):
audit_info = self.set_mocked_audit_info()
ecr = ECR(audit_info)
assert len(ecr.registries) == 1
assert ecr.registries[0].id == AWS_ACCOUNT_NUMBER
assert ecr.registries[0].scan_type == "BASIC"
assert ecr.registries[0].rules == [
assert ecr.registries[AWS_REGION].id == AWS_ACCOUNT_NUMBER
assert ecr.registries[AWS_REGION].scan_type == "BASIC"
assert ecr.registries[AWS_REGION].rules == [
ScanningRule(
scan_frequency="SCAN_ON_PUSH",
scan_filters=[{"filter": "*", "filterType": "WILDCARD"}],

View File

@@ -271,8 +271,8 @@ class Test_iam_role_cross_account_readonlyaccess_policy:
)
with mock.patch(
"prowler.providers.aws.services.iam.iam_service.IAM",
iam_client,
"prowler.providers.aws.services.iam.iam_role_cross_account_readonlyaccess_policy.iam_role_cross_account_readonlyaccess_policy.iam_client",
new=iam_client,
):
# Test Check
from prowler.providers.aws.services.iam.iam_role_cross_account_readonlyaccess_policy.iam_role_cross_account_readonlyaccess_policy import (