diff --git a/prowler/providers/aws/services/ecr/ecr_registry_scan_images_on_push_enabled/ecr_registry_scan_images_on_push_enabled.py b/prowler/providers/aws/services/ecr/ecr_registry_scan_images_on_push_enabled/ecr_registry_scan_images_on_push_enabled.py index 768558db..5cc8127e 100644 --- a/prowler/providers/aws/services/ecr/ecr_registry_scan_images_on_push_enabled/ecr_registry_scan_images_on_push_enabled.py +++ b/prowler/providers/aws/services/ecr/ecr_registry_scan_images_on_push_enabled/ecr_registry_scan_images_on_push_enabled.py @@ -5,24 +5,27 @@ from prowler.providers.aws.services.ecr.ecr_client import ecr_client class ecr_registry_scan_images_on_push_enabled(Check): def execute(self): findings = [] - for registry in ecr_client.registries: - report = Check_Report_AWS(self.metadata()) - report.region = registry.region - report.resource_id = registry.id - report.resource_tags = registry.tags - report.status = "FAIL" - report.status_extended = f"ECR registry {registry.id} has {registry.scan_type} scanning without scan on push" - if registry.rules: - report.status = "PASS" - report.status_extended = f"ECR registry {registry.id} has {registry.scan_type} scan with scan on push" - filters = True - for rule in registry.rules: - if not rule.scan_filters or "'*'" in str(rule.scan_filters): - filters = False - if filters: - report.status = "FAIL" - report.status_extended = f"ECR registry {registry.id} has {registry.scan_type} scanning with scan on push but with repository filters" + for registry in ecr_client.registries.values(): + # We want to check the registry if it is in use, hence there are repositories + if len(registry.repositories) != 0: + report = Check_Report_AWS(self.metadata()) + report.region = registry.region + report.resource_id = registry.id + # A registry cannot have tags + report.resource_tags = [] + report.status = "FAIL" + report.status_extended = f"ECR registry {registry.id} has {registry.scan_type} scanning without scan on push enabled" + if registry.rules: + report.status = "PASS" + report.status_extended = f"ECR registry {registry.id} has {registry.scan_type} scan with scan on push enabled" + filters = True + for rule in registry.rules: + if not rule.scan_filters or "'*'" in str(rule.scan_filters): + filters = False + if filters: + report.status = "FAIL" + report.status_extended = f"ECR registry {registry.id} has {registry.scan_type} scanning with scan on push but with repository filters" - findings.append(report) + findings.append(report) return findings diff --git a/prowler/providers/aws/services/ecr/ecr_repositories_lifecycle_policy_enabled/ecr_repositories_lifecycle_policy_enabled.py b/prowler/providers/aws/services/ecr/ecr_repositories_lifecycle_policy_enabled/ecr_repositories_lifecycle_policy_enabled.py index 82542e4a..da4d329f 100644 --- a/prowler/providers/aws/services/ecr/ecr_repositories_lifecycle_policy_enabled/ecr_repositories_lifecycle_policy_enabled.py +++ b/prowler/providers/aws/services/ecr/ecr_repositories_lifecycle_policy_enabled/ecr_repositories_lifecycle_policy_enabled.py @@ -5,22 +5,19 @@ from prowler.providers.aws.services.ecr.ecr_client import ecr_client class ecr_repositories_lifecycle_policy_enabled(Check): def execute(self): findings = [] - for repository in ecr_client.repositories: - report = Check_Report_AWS(self.metadata()) - report.region = repository.region - report.resource_id = repository.name - report.resource_arn = repository.arn - report.resource_tags = repository.tags - report.status = "FAIL" - report.status_extended = ( - f"Repository {repository.name} has no lifecycle policy" - ) - if repository.lyfecicle_policy: - report.status = "PASS" - report.status_extended = ( - f"Repository {repository.name} has lifecycle policy" - ) + for registry in ecr_client.registries.values(): + for repository in registry.repositories: + report = Check_Report_AWS(self.metadata()) + report.region = repository.region + report.resource_id = repository.name + report.resource_arn = repository.arn + report.resource_tags = repository.tags + report.status = "FAIL" + report.status_extended = f"Repository {repository.name} has not a lifecycle policy configured" + if repository.lifecycle_policy: + report.status = "PASS" + report.status_extended = f"Repository {repository.name} has a lifecycle policy configured" - findings.append(report) + findings.append(report) return findings diff --git a/prowler/providers/aws/services/ecr/ecr_repositories_not_publicly_accessible/ecr_repositories_not_publicly_accessible.py b/prowler/providers/aws/services/ecr/ecr_repositories_not_publicly_accessible/ecr_repositories_not_publicly_accessible.py index e92bc6e2..3db731ec 100644 --- a/prowler/providers/aws/services/ecr/ecr_repositories_not_publicly_accessible/ecr_repositories_not_publicly_accessible.py +++ b/prowler/providers/aws/services/ecr/ecr_repositories_not_publicly_accessible/ecr_repositories_not_publicly_accessible.py @@ -5,25 +5,28 @@ from prowler.providers.aws.services.ecr.ecr_client import ecr_client class ecr_repositories_not_publicly_accessible(Check): def execute(self): findings = [] - for repository in ecr_client.repositories: - report = Check_Report_AWS(self.metadata()) - report.region = repository.region - report.resource_id = repository.name - report.resource_arn = repository.arn - report.resource_tags = repository.tags - report.status = "PASS" - report.status_extended = f"Repository {repository.name} is not open" - if repository.policy: - for statement in repository.policy["Statement"]: - if statement["Effect"] == "Allow": - if "*" in statement["Principal"] or ( - "AWS" in statement["Principal"] - and "*" in statement["Principal"]["AWS"] - ): - report.status = "FAIL" - report.status_extended = f"Repository {repository.name} policy may allow anonymous users to perform actions (Principal: '*')" - break + for registry in ecr_client.registries.values(): + for repository in registry.repositories: + report = Check_Report_AWS(self.metadata()) + report.region = repository.region + report.resource_id = repository.name + report.resource_arn = repository.arn + report.resource_tags = repository.tags + report.status = "PASS" + report.status_extended = ( + f"Repository {repository.name} is not publicly accesible" + ) + if repository.policy: + for statement in repository.policy["Statement"]: + if statement["Effect"] == "Allow": + if "*" in statement["Principal"] or ( + "AWS" in statement["Principal"] + and "*" in statement["Principal"]["AWS"] + ): + report.status = "FAIL" + report.status_extended = f"Repository {repository.name} policy may allow anonymous users to perform actions (Principal: '*')" + break - findings.append(report) + findings.append(report) return findings diff --git a/prowler/providers/aws/services/ecr/ecr_repositories_scan_images_on_push_enabled/ecr_repositories_scan_images_on_push_enabled.py b/prowler/providers/aws/services/ecr/ecr_repositories_scan_images_on_push_enabled/ecr_repositories_scan_images_on_push_enabled.py index f35b91f2..c46e9912 100644 --- a/prowler/providers/aws/services/ecr/ecr_repositories_scan_images_on_push_enabled/ecr_repositories_scan_images_on_push_enabled.py +++ b/prowler/providers/aws/services/ecr/ecr_repositories_scan_images_on_push_enabled/ecr_repositories_scan_images_on_push_enabled.py @@ -5,22 +5,23 @@ from prowler.providers.aws.services.ecr.ecr_client import ecr_client class ecr_repositories_scan_images_on_push_enabled(Check): def execute(self): findings = [] - for repository in ecr_client.repositories: - report = Check_Report_AWS(self.metadata()) - report.region = repository.region - report.resource_id = repository.name - report.resource_arn = repository.arn - report.resource_tags = repository.tags - report.status = "PASS" - report.status_extended = ( - f"ECR repository {repository.name} has scan on push enabled" - ) - if not repository.scan_on_push: - report.status = "FAIL" + for registry in ecr_client.registries.values(): + for repository in registry.repositories: + report = Check_Report_AWS(self.metadata()) + report.region = repository.region + report.resource_id = repository.name + report.resource_arn = repository.arn + report.resource_tags = repository.tags + report.status = "PASS" report.status_extended = ( - f"ECR repository {repository.name} has scan on push disabled" + f"ECR repository {repository.name} has scan on push enabled" ) + if not repository.scan_on_push: + report.status = "FAIL" + report.status_extended = ( + f"ECR repository {repository.name} has scan on push disabled" + ) - findings.append(report) + findings.append(report) return findings diff --git a/prowler/providers/aws/services/ecr/ecr_repositories_scan_vulnerabilities_in_latest_image/ecr_repositories_scan_vulnerabilities_in_latest_image.py b/prowler/providers/aws/services/ecr/ecr_repositories_scan_vulnerabilities_in_latest_image/ecr_repositories_scan_vulnerabilities_in_latest_image.py index 8cd62167..92b74f84 100644 --- a/prowler/providers/aws/services/ecr/ecr_repositories_scan_vulnerabilities_in_latest_image/ecr_repositories_scan_vulnerabilities_in_latest_image.py +++ b/prowler/providers/aws/services/ecr/ecr_repositories_scan_vulnerabilities_in_latest_image/ecr_repositories_scan_vulnerabilities_in_latest_image.py @@ -5,32 +5,37 @@ from prowler.providers.aws.services.ecr.ecr_client import ecr_client class ecr_repositories_scan_vulnerabilities_in_latest_image(Check): def execute(self): findings = [] - for repository in ecr_client.repositories: - for image in repository.images_details: - report = Check_Report_AWS(self.metadata()) - report.region = repository.region - report.resource_id = repository.name - report.resource_arn = repository.arn - report.resource_tags = repository.tags - report.status = "PASS" - report.status_extended = f"ECR repository {repository.name} has imageTag {image.latest_tag} scanned without findings" - if not image.scan_findings_status: - report.status = "FAIL" - report.status_extended = f"ECR repository {repository.name} has imageTag {image.latest_tag} without a scan" - elif image.scan_findings_status == "FAILED": - report.status = "FAIL" - report.status_extended = ( - f"ECR repository {repository.name} with scan status FAILED" - ) - elif image.scan_findings_status != "FAILED": - if image.scan_findings_severity_count and ( - image.scan_findings_severity_count.critical - or image.scan_findings_severity_count.high - or image.scan_findings_severity_count.medium - ): - report.status = "FAIL" - report.status_extended = f"ECR repository {repository.name} has imageTag {image.latest_tag} scanned with findings: CRITICAL->{image.scan_findings_severity_count.critical}, HIGH->{image.scan_findings_severity_count.high}, MEDIUM->{image.scan_findings_severity_count.medium} " + for registry in ecr_client.registries.values(): + for repository in registry.repositories: + # First check if the repository has images + if len(repository.images_details) > 0: + # We only want to check the latest image pushed + image = repository.images_details[-1] - findings.append(report) + report = Check_Report_AWS(self.metadata()) + report.region = repository.region + report.resource_id = repository.name + report.resource_arn = repository.arn + report.resource_tags = repository.tags + report.status = "PASS" + report.status_extended = f"ECR repository {repository.name} has imageTag {image.latest_tag} scanned without findings" + if not image.scan_findings_status: + report.status = "FAIL" + report.status_extended = f"ECR repository {repository.name} has imageTag {image.latest_tag} without a scan" + elif image.scan_findings_status == "FAILED": + report.status = "FAIL" + report.status_extended = ( + f"ECR repository {repository.name} with scan status FAILED" + ) + elif image.scan_findings_status != "FAILED": + if image.scan_findings_severity_count and ( + image.scan_findings_severity_count.critical + or image.scan_findings_severity_count.high + or image.scan_findings_severity_count.medium + ): + report.status = "FAIL" + report.status_extended = f"ECR repository {repository.name} has imageTag {image.latest_tag} scanned with findings: CRITICAL->{image.scan_findings_severity_count.critical}, HIGH->{image.scan_findings_severity_count.high}, MEDIUM->{image.scan_findings_severity_count.medium} " + + findings.append(report) return findings diff --git a/prowler/providers/aws/services/ecr/ecr_service.py b/prowler/providers/aws/services/ecr/ecr_service.py index ab6ff0f8..ba15ce8e 100644 --- a/prowler/providers/aws/services/ecr/ecr_service.py +++ b/prowler/providers/aws/services/ecr/ecr_service.py @@ -1,4 +1,5 @@ import threading +from datetime import datetime from json import loads from typing import Optional @@ -17,14 +18,14 @@ class ECR: self.session = audit_info.audit_session self.audit_resources = audit_info.audit_resources self.regional_clients = generate_regional_clients(self.service, audit_info) - self.repositories = [] - self.registries = [] - self.__threading_call__(self.__describe_repositories__) - self.__describe_repository_policies__() - self.__get_image_details__() - self.__get_repository_lifecycle_policy__() + self.registry_id = audit_info.audited_account + self.registries = {} + self.__threading_call__(self.__describe_registries_and_repositories__) + self.__threading_call__(self.__describe_repository_policies__) + self.__threading_call__(self.__get_image_details__) + self.__threading_call__(self.__get_repository_lifecycle_policy__) self.__threading_call__(self.__get_registry_scanning_configuration__) - self.__list_tags_for_resource__() + self.__threading_call__(self.__list_tags_for_resource__) def __get_session__(self): return self.session @@ -38,8 +39,9 @@ class ECR: for t in threads: t.join() - def __describe_repositories__(self, regional_client): - logger.info("ECR - Describing repositories...") + def __describe_registries_and_repositories__(self, regional_client): + logger.info("ECR - Describing registries and repositories...") + regional_registry_repositories = [] try: describe_ecr_paginator = regional_client.get_paginator( "describe_repositories" @@ -51,126 +53,157 @@ class ECR: repository["repositoryArn"], self.audit_resources ) ): - self.repositories.append( + regional_registry_repositories.append( Repository( name=repository["repositoryName"], arn=repository["repositoryArn"], + registry_id=repository["registryId"], region=regional_client.region, scan_on_push=repository["imageScanningConfiguration"][ "scanOnPush" ], policy=None, images_details=[], - lyfecicle_policy=None, + lifecycle_policy=None, ) ) + # The default ECR registry is assumed + self.registries[regional_client.region] = Registry( + id=self.registry_id, + region=regional_client.region, + repositories=regional_registry_repositories, + ) + except Exception as error: logger.error( f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" ) - def __describe_repository_policies__(self): + def __describe_repository_policies__(self, regional_client): logger.info("ECR - Describing repository policies...") try: - for repository in self.repositories: - client = self.regional_clients[repository.region] - policy = client.get_repository_policy(repositoryName=repository.name) - if "policyText" in policy: - repository.policy = loads(policy["policyText"]) + if regional_client.region in self.registries: + for repository in self.registries[regional_client.region].repositories: + client = self.regional_clients[repository.region] + policy = client.get_repository_policy( + repositoryName=repository.name + ) + if "policyText" in policy: + repository.policy = loads(policy["policyText"]) except Exception as error: if "RepositoryPolicyNotFoundException" not in str(error): logger.error( - f"-- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" ) - def __get_repository_lifecycle_policy__(self): + def __get_repository_lifecycle_policy__(self, regional_client): logger.info("ECR - Getting repository lifecycle policy...") try: - for repository in self.repositories: - client = self.regional_clients[repository.region] - policy = client.get_lifecycle_policy(repositoryName=repository.name) - if "lifecyclePolicyText" in policy: - repository.lyfecicle_policy = policy["lifecyclePolicyText"] + if regional_client.region in self.registries: + for repository in self.registries[regional_client.region].repositories: + client = self.regional_clients[repository.region] + policy = client.get_lifecycle_policy(repositoryName=repository.name) + if "lifecyclePolicyText" in policy: + repository.lifecycle_policy = policy["lifecyclePolicyText"] except Exception as error: if "LifecyclePolicyNotFoundException" not in str(error): logger.error( - f"-- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" ) - def __get_image_details__(self): + def __get_image_details__(self, regional_client): logger.info("ECR - Getting images details...") try: - for repository in self.repositories: - # if the repo is not scanning pushed images there is nothing to do - if repository.scan_on_push: - client = self.regional_clients[repository.region] - describe_images_paginator = client.get_paginator("describe_images") - for page in describe_images_paginator.paginate( - repositoryName=repository.name - ): - for image in page["imageDetails"]: - severity_counts = None - last_scan_status = None - if "imageScanStatus" in image: - last_scan_status = image["imageScanStatus"]["status"] + if regional_client.region in self.registries: + for repository in self.registries[regional_client.region].repositories: + # There is nothing to do if the repository is not scanning pushed images + if repository.scan_on_push: + client = self.regional_clients[repository.region] + describe_images_paginator = client.get_paginator( + "describe_images" + ) + for page in describe_images_paginator.paginate( + registryId=self.registries[regional_client.region].id, + repositoryName=repository.name, + PaginationConfig={"PageSize": 1000}, + ): + for image in page["imageDetails"]: + # The following condition is required since sometimes + # the AWS ECR API returns None using the iterator + if image is not None: + severity_counts = None + last_scan_status = None + if "imageScanStatus" in image: + last_scan_status = image["imageScanStatus"][ + "status" + ] - if "imageScanFindingsSummary" in image: - severity_counts = FindingSeverityCounts( - critical=0, high=0, medium=0 - ) - finding_severity_counts = image[ - "imageScanFindingsSummary" - ]["findingSeverityCounts"] - if "CRITICAL" in finding_severity_counts: - severity_counts.critical = finding_severity_counts[ - "CRITICAL" - ] - if "HIGH" in finding_severity_counts: - severity_counts.high = finding_severity_counts[ - "HIGH" - ] - if "MEDIUM" in finding_severity_counts: - severity_counts.medium = finding_severity_counts[ - "MEDIUM" - ] - latest_tag = "None" - if image.get("imageTags"): - latest_tag = image["imageTags"][0] - repository.images_details.append( - ImageDetails( - latest_tag=latest_tag, - latest_digest=image["imageDigest"], - scan_findings_status=last_scan_status, - scan_findings_severity_count=severity_counts, - ) - ) + if "imageScanFindingsSummary" in image: + severity_counts = FindingSeverityCounts( + critical=0, high=0, medium=0 + ) + finding_severity_counts = image[ + "imageScanFindingsSummary" + ]["findingSeverityCounts"] + if "CRITICAL" in finding_severity_counts: + severity_counts.critical = ( + finding_severity_counts["CRITICAL"] + ) + if "HIGH" in finding_severity_counts: + severity_counts.high = ( + finding_severity_counts["HIGH"] + ) + if "MEDIUM" in finding_severity_counts: + severity_counts.medium = ( + finding_severity_counts["MEDIUM"] + ) + latest_tag = "None" + if image.get("imageTags"): + latest_tag = image["imageTags"][0] + repository.images_details.append( + ImageDetails( + latest_tag=latest_tag, + image_pushed_at=image["imagePushedAt"], + latest_digest=image["imageDigest"], + scan_findings_status=last_scan_status, + scan_findings_severity_count=severity_counts, + ) + ) + # Sort the repository images by date pushed + repository.images_details.sort( + key=lambda image: image.image_pushed_at + ) except Exception as error: logger.error( - f"-- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" ) - def __list_tags_for_resource__(self): + def __list_tags_for_resource__(self, regional_client): logger.info("ECR - List Tags...") try: - for repository in self.repositories: - try: - regional_client = self.regional_clients[repository.region] - response = regional_client.list_tags_for_resource( - resourceArn=repository.arn - )["tags"] - repository.tags = response + if regional_client.region in self.registries: + for repository in self.registries[regional_client.region].repositories: + try: + regional_client = self.regional_clients[repository.region] + response = regional_client.list_tags_for_resource( + resourceArn=repository.arn + )["tags"] + repository.tags = response - except ClientError as error: - if error.response["Error"]["Code"] == "RepositoryNotFoundException": - logger.warning( - f"{regional_client.region} --" - f" {error.__class__.__name__}[{error.__traceback__.tb_lineno}]:" - f" {error}" - ) - continue + except ClientError as error: + if ( + error.response["Error"]["Code"] + == "RepositoryNotFoundException" + ): + logger.warning( + f"{regional_client.region} --" + f" {error.__class__.__name__}[{error.__traceback__.tb_lineno}]:" + f" {error}" + ) + continue except Exception as error: logger.error( f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" @@ -179,25 +212,34 @@ class ECR: def __get_registry_scanning_configuration__(self, regional_client): logger.info("ECR - Getting Registry Scanning Configuration...") try: - response = regional_client.get_registry_scanning_configuration() - rules = [] - for rule in response.get("scanningConfiguration").get("rules", []): - rules.append( - ScanningRule( - scan_frequency=rule.get("scanFrequency"), - scan_filters=rule.get("repositoryFilters"), + if regional_client.region in self.registries: + response = regional_client.get_registry_scanning_configuration() + rules = [] + for rule in response.get("scanningConfiguration").get("rules", []): + rules.append( + ScanningRule( + scan_frequency=rule.get("scanFrequency"), + scan_filters=rule.get("repositoryFilters", []), + ) ) + + self.registries[regional_client.region].scan_type = response.get( + "scanningConfiguration" + ).get("scanType", "BASIC") + self.registries[regional_client.region].rules = rules + except ClientError as error: + if error.response["Error"][ + "Code" + ] == "ValidationException" and "GetRegistryScanningConfiguration operation: This feature is disabled" in str( + error + ): + self.registries[regional_client.region].scan_type = "BASIC" + self.registries[regional_client.region].rules = [] + else: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" ) - self.registries.append( - Registry( - id=response.get("registryId", ""), - scan_type=response.get("scanningConfiguration").get( - "scanType", "BASIC" - ), - region=regional_client.region, - rules=rules, - ) - ) + except Exception as error: logger.error( f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" @@ -213,6 +255,7 @@ class FindingSeverityCounts(BaseModel): class ImageDetails(BaseModel): latest_tag: str latest_digest: str + image_pushed_at: datetime scan_findings_status: Optional[str] scan_findings_severity_count: Optional[FindingSeverityCounts] @@ -221,10 +264,11 @@ class Repository(BaseModel): name: str arn: str region: str + registry_id = str scan_on_push: bool policy: Optional[dict] images_details: Optional[list[ImageDetails]] - lyfecicle_policy: Optional[str] + lifecycle_policy: Optional[str] tags: Optional[list] = [] @@ -236,6 +280,6 @@ class ScanningRule(BaseModel): class Registry(BaseModel): id: str region: str - scan_type: str - rules: list[ScanningRule] - tags: Optional[list] = [] + repositories: list[Repository] + scan_type: Optional[str] + rules: Optional[list[ScanningRule]] diff --git a/tests/providers/aws/services/ecr/ecr_registry_scan_images_on_push_enabled/ecr_registry_scan_images_on_push_enabled_test.py b/tests/providers/aws/services/ecr/ecr_registry_scan_images_on_push_enabled/ecr_registry_scan_images_on_push_enabled_test.py index d9e149d3..10d9de36 100644 --- a/tests/providers/aws/services/ecr/ecr_registry_scan_images_on_push_enabled/ecr_registry_scan_images_on_push_enabled_test.py +++ b/tests/providers/aws/services/ecr/ecr_registry_scan_images_on_push_enabled/ecr_registry_scan_images_on_push_enabled_test.py @@ -1,17 +1,25 @@ from re import search from unittest import mock -from prowler.providers.aws.services.ecr.ecr_service import Registry, ScanningRule +from prowler.providers.aws.services.ecr.ecr_service import ( + Registry, + Repository, + ScanningRule, +) # Mock Test Region AWS_REGION = "eu-west-1" AWS_ACCOUNT_NUMBER = "123456789012" +repository_name = "test_repo" +repository_arn = ( + f"arn:aws:ecr:eu-west-1:{AWS_ACCOUNT_NUMBER}:repository/{repository_name}" +) class Test_ecr_registry_scan_images_on_push_enabled: def test_no_registries(self): ecr_client = mock.MagicMock - ecr_client.registries = [] + ecr_client.registries = {} with mock.patch( "prowler.providers.aws.services.ecr.ecr_service.ECR", @@ -25,21 +33,53 @@ class Test_ecr_registry_scan_images_on_push_enabled: result = check.execute() assert len(result) == 0 - def test_scan_on_push_enabled(self): + def test_registry_no_repositories(self): ecr_client = mock.MagicMock - ecr_client.registries = [] - ecr_client.registries.append( - Registry( - id=AWS_ACCOUNT_NUMBER, - region=AWS_REGION, - scan_type="BASIC", - rules=[ - ScanningRule( - scan_frequency="SCAN_ON_PUSH", - scan_filters=[{"filter": "*", "filterType": "WILDCARD"}], - ) - ], + ecr_client.registries = {} + ecr_client.registries[AWS_REGION] = Registry( + id=AWS_ACCOUNT_NUMBER, + region=AWS_REGION, + scan_type="BASIC", + repositories=[], + rules=[], + ) + + with mock.patch( + "prowler.providers.aws.services.ecr.ecr_service.ECR", + ecr_client, + ): + from prowler.providers.aws.services.ecr.ecr_registry_scan_images_on_push_enabled.ecr_registry_scan_images_on_push_enabled import ( + ecr_registry_scan_images_on_push_enabled, ) + + check = ecr_registry_scan_images_on_push_enabled() + result = check.execute() + assert len(result) == 0 + + def test_registry_scan_on_push_enabled(self): + ecr_client = mock.MagicMock + ecr_client.registries = {} + ecr_client.registries[AWS_REGION] = Registry( + id=AWS_ACCOUNT_NUMBER, + region=AWS_REGION, + scan_type="BASIC", + repositories=[ + Repository( + name=repository_name, + arn=repository_arn, + region=AWS_REGION, + scan_on_push=True, + policy="", + images_details=None, + lifecycle_policy="", + ) + ], + rules=[ + ScanningRule( + scan_frequency="SCAN_ON_PUSH", + scan_filters=[{"filter": "*", "filterType": "WILDCARD"}], + ) + ], ) with mock.patch( @@ -60,19 +100,28 @@ class Test_ecr_registry_scan_images_on_push_enabled: def test_scan_on_push_enabled_with_filters(self): ecr_client = mock.MagicMock - ecr_client.registries = [] - ecr_client.registries.append( - Registry( - id=AWS_ACCOUNT_NUMBER, - region=AWS_REGION, - scan_type="BASIC", - rules=[ - ScanningRule( - scan_frequency="SCAN_ON_PUSH", - scan_filters=[{"filter": "test", "filterType": "WILDCARD"}], - ) - ], - ) + ecr_client.registries = {} + ecr_client.registries[AWS_REGION] = Registry( + id=AWS_ACCOUNT_NUMBER, + region=AWS_REGION, + scan_type="BASIC", + repositories=[ + Repository( + name=repository_name, + arn=repository_arn, + region=AWS_REGION, + scan_on_push=True, + policy="", + images_details=None, + lifecycle_policy="", + ) + ], + rules=[ + ScanningRule( + scan_frequency="SCAN_ON_PUSH", + scan_filters=[{"filter": "test", "filterType": "WILDCARD"}], + ) + ], ) with mock.patch( @@ -96,14 +145,23 @@ class Test_ecr_registry_scan_images_on_push_enabled: def test_scan_on_push_disabled(self): ecr_client = mock.MagicMock - ecr_client.registries = [] - ecr_client.registries.append( - Registry( - id=AWS_ACCOUNT_NUMBER, - region=AWS_REGION, - scan_type="BASIC", - rules=[], - ) + ecr_client.registries = {} + ecr_client.registries[AWS_REGION] = Registry( + id=AWS_ACCOUNT_NUMBER, + region=AWS_REGION, + scan_type="BASIC", + repositories=[ + Repository( + name=repository_name, + arn=repository_arn, + region=AWS_REGION, + scan_on_push=True, + policy="", + images_details=None, + lifecycle_policy="", + ) + ], + rules=[], ) with mock.patch( diff --git a/tests/providers/aws/services/ecr/ecr_repositories_lifecycle_policy_enabled/ecr_repositories_lifecycle_policy_enabled_test.py b/tests/providers/aws/services/ecr/ecr_repositories_lifecycle_policy_enabled/ecr_repositories_lifecycle_policy_enabled_test.py index 50d914fd..87e4f070 100644 --- a/tests/providers/aws/services/ecr/ecr_repositories_lifecycle_policy_enabled/ecr_repositories_lifecycle_policy_enabled_test.py +++ b/tests/providers/aws/services/ecr/ecr_repositories_lifecycle_policy_enabled/ecr_repositories_lifecycle_policy_enabled_test.py @@ -1,7 +1,6 @@ -from re import search from unittest import mock -from prowler.providers.aws.services.ecr.ecr_service import Repository +from prowler.providers.aws.services.ecr.ecr_service import Registry, Repository # Mock Test Region AWS_REGION = "eu-west-1" @@ -24,19 +23,64 @@ repo_policy_public = { class Test_ecr_repositories_lifecycle_policy_enabled: - def test_no_lyfecicle_policy(self): + def test_no_registries(self): ecr_client = mock.MagicMock - ecr_client.repositories = [] - ecr_client.repositories.append( - Repository( - name=repository_name, - arn=repository_arn, - region=AWS_REGION, - scan_on_push=True, - policy=repo_policy_public, - images_details=None, - lyfecicle_policy="test-policy", + ecr_client.registries = {} + + with mock.patch( + "prowler.providers.aws.services.ecr.ecr_service.ECR", + ecr_client, + ): + from prowler.providers.aws.services.ecr.ecr_repositories_lifecycle_policy_enabled.ecr_repositories_lifecycle_policy_enabled import ( + ecr_repositories_lifecycle_policy_enabled, ) + + check = ecr_repositories_lifecycle_policy_enabled() + result = check.execute() + assert len(result) == 0 + + def test_registry_no_repositories(self): + ecr_client = mock.MagicMock + ecr_client.registries = {} + ecr_client.registries[AWS_REGION] = Registry( + id=AWS_ACCOUNT_NUMBER, + region=AWS_REGION, + scan_type="BASIC", + repositories=[], + rules=[], + ) + + with mock.patch( + "prowler.providers.aws.services.ecr.ecr_service.ECR", + ecr_client, + ): + from prowler.providers.aws.services.ecr.ecr_repositories_lifecycle_policy_enabled.ecr_repositories_lifecycle_policy_enabled import ( + ecr_repositories_lifecycle_policy_enabled, + ) + + check = ecr_repositories_lifecycle_policy_enabled() + result = check.execute() + assert len(result) == 0 + + def test_lifecycle_policy(self): + ecr_client = mock.MagicMock + ecr_client.registries = {} + ecr_client.registries[AWS_REGION] = Registry( + id=AWS_ACCOUNT_NUMBER, + region=AWS_REGION, + scan_type="BASIC", + rules=[], + repositories=[ + Repository( + name=repository_name, + arn=repository_arn, + region=AWS_REGION, + scan_on_push=True, + policy=repo_policy_public, + images_details=None, + lifecycle_policy="test-policy", + ) + ], ) with mock.patch( @@ -51,23 +95,33 @@ class Test_ecr_repositories_lifecycle_policy_enabled: result = check.execute() assert len(result) == 1 assert result[0].status == "PASS" - assert search("has lifecycle policy", result[0].status_extended) + assert ( + result[0].status_extended + == f"Repository {repository_name} has a lifecycle policy configured" + ) assert result[0].resource_id == repository_name assert result[0].resource_arn == repository_arn + assert result[0].resource_tags == [] - def test_lifecycle_policy(self): + def test_no_lifecycle_policy(self): ecr_client = mock.MagicMock - ecr_client.repositories = [] - ecr_client.repositories.append( - Repository( - name=repository_name, - arn=repository_arn, - region=AWS_REGION, - scan_on_push=False, - policy=repo_policy_public, - images_details=None, - lyfecicle_policy=None, - ) + ecr_client.registries = {} + ecr_client.registries[AWS_REGION] = Registry( + id=AWS_ACCOUNT_NUMBER, + region=AWS_REGION, + scan_type="BASIC", + rules=[], + repositories=[ + Repository( + name=repository_name, + arn=repository_arn, + region=AWS_REGION, + scan_on_push=False, + policy=repo_policy_public, + images_details=None, + lifecycle_policy=None, + ) + ], ) with mock.patch( @@ -82,6 +136,10 @@ class Test_ecr_repositories_lifecycle_policy_enabled: result = check.execute() assert len(result) == 1 assert result[0].status == "FAIL" - assert search("has no lifecycle policy", result[0].status_extended) + assert ( + result[0].status_extended + == f"Repository {repository_name} has not a lifecycle policy configured" + ) assert result[0].resource_id == repository_name assert result[0].resource_arn == repository_arn + assert result[0].resource_tags == [] diff --git a/tests/providers/aws/services/ecr/ecr_repositories_not_publicly_accessible/ecr_repositories_not_publicly_accessible_test.py b/tests/providers/aws/services/ecr/ecr_repositories_not_publicly_accessible/ecr_repositories_not_publicly_accessible_test.py index a9f0636c..86eebc4c 100644 --- a/tests/providers/aws/services/ecr/ecr_repositories_not_publicly_accessible/ecr_repositories_not_publicly_accessible_test.py +++ b/tests/providers/aws/services/ecr/ecr_repositories_not_publicly_accessible/ecr_repositories_not_publicly_accessible_test.py @@ -1,7 +1,6 @@ -from re import search from unittest import mock -from prowler.providers.aws.services.ecr.ecr_service import Repository +from prowler.providers.aws.services.ecr.ecr_service import Registry, Repository # Mock Test Region AWS_REGION = "eu-west-1" @@ -36,19 +35,64 @@ repo_policy_public = { class Test_ecr_repositories_not_publicly_accessible: + def test_no_registries(self): + ecr_client = mock.MagicMock + ecr_client.registries = {} + + with mock.patch( + "prowler.providers.aws.services.ecr.ecr_service.ECR", + ecr_client, + ): + from prowler.providers.aws.services.ecr.ecr_repositories_not_publicly_accessible.ecr_repositories_not_publicly_accessible import ( + ecr_repositories_not_publicly_accessible, + ) + + check = ecr_repositories_not_publicly_accessible() + result = check.execute() + assert len(result) == 0 + + def test_registry_no_repositories(self): + ecr_client = mock.MagicMock + ecr_client.registries = {} + ecr_client.registries[AWS_REGION] = Registry( + id=AWS_ACCOUNT_NUMBER, + region=AWS_REGION, + scan_type="BASIC", + repositories=[], + rules=[], + ) + + with mock.patch( + "prowler.providers.aws.services.ecr.ecr_service.ECR", + ecr_client, + ): + from prowler.providers.aws.services.ecr.ecr_repositories_not_publicly_accessible.ecr_repositories_not_publicly_accessible import ( + ecr_repositories_not_publicly_accessible, + ) + + check = ecr_repositories_not_publicly_accessible() + result = check.execute() + assert len(result) == 0 + def test_repository_not_public(self): ecr_client = mock.MagicMock - ecr_client.repositories = [] - ecr_client.repositories.append( - Repository( - name=repository_name, - arn=repository_arn, - region=AWS_REGION, - scan_on_push=True, - policy=repo_policy_not_public, - images_details=None, - lyfecicle_policy=None, - ) + ecr_client.registries = {} + ecr_client.registries[AWS_REGION] = Registry( + id=AWS_ACCOUNT_NUMBER, + region=AWS_REGION, + scan_type="BASIC", + repositories=[ + Repository( + name=repository_name, + arn=repository_arn, + region=AWS_REGION, + scan_on_push=True, + policy=repo_policy_not_public, + images_details=None, + lifecycle_policy=None, + ) + ], + rules=[], ) with mock.patch( @@ -63,23 +107,32 @@ class Test_ecr_repositories_not_publicly_accessible: result = check.execute() assert len(result) == 1 assert result[0].status == "PASS" - assert search("is not open", result[0].status_extended) + assert ( + result[0].status_extended + == f"Repository {repository_name} is not publicly accesible" + ) assert result[0].resource_id == repository_name assert result[0].resource_arn == repository_arn def test_repository_public(self): ecr_client = mock.MagicMock - ecr_client.repositories = [] - ecr_client.repositories.append( - Repository( - name=repository_name, - arn=repository_arn, - region=AWS_REGION, - scan_on_push=True, - policy=repo_policy_public, - images_details=None, - lyfecicle_policy=None, - ) + ecr_client.registries = {} + ecr_client.registries[AWS_REGION] = Registry( + id=AWS_ACCOUNT_NUMBER, + region=AWS_REGION, + scan_type="BASIC", + repositories=[ + Repository( + name=repository_name, + arn=repository_arn, + region=AWS_REGION, + scan_on_push=True, + policy=repo_policy_public, + images_details=None, + lifecycle_policy=None, + ) + ], + rules=[], ) with mock.patch( @@ -94,8 +147,9 @@ class Test_ecr_repositories_not_publicly_accessible: result = check.execute() assert len(result) == 1 assert result[0].status == "FAIL" - assert search( - "policy may allow anonymous users to", result[0].status_extended + assert ( + result[0].status_extended + == f"Repository {repository_name} policy may allow anonymous users to perform actions (Principal: '*')" ) assert result[0].resource_id == repository_name assert result[0].resource_arn == repository_arn diff --git a/tests/providers/aws/services/ecr/ecr_repositories_scan_images_on_push_enabled/ecr_repositories_scan_images_on_push_enabled_test.py b/tests/providers/aws/services/ecr/ecr_repositories_scan_images_on_push_enabled/ecr_repositories_scan_images_on_push_enabled_test.py index b6f63f65..16055c66 100644 --- a/tests/providers/aws/services/ecr/ecr_repositories_scan_images_on_push_enabled/ecr_repositories_scan_images_on_push_enabled_test.py +++ b/tests/providers/aws/services/ecr/ecr_repositories_scan_images_on_push_enabled/ecr_repositories_scan_images_on_push_enabled_test.py @@ -1,7 +1,6 @@ -from re import search from unittest import mock -from prowler.providers.aws.services.ecr.ecr_service import Repository +from prowler.providers.aws.services.ecr.ecr_service import Registry, Repository # Mock Test Region AWS_REGION = "eu-west-1" @@ -24,19 +23,64 @@ repo_policy_public = { class Test_ecr_repositories_scan_images_on_push_enabled: + def test_no_registries(self): + ecr_client = mock.MagicMock + ecr_client.registries = {} + + with mock.patch( + "prowler.providers.aws.services.ecr.ecr_service.ECR", + ecr_client, + ): + from prowler.providers.aws.services.ecr.ecr_repositories_scan_images_on_push_enabled.ecr_repositories_scan_images_on_push_enabled import ( + ecr_repositories_scan_images_on_push_enabled, + ) + + check = ecr_repositories_scan_images_on_push_enabled() + result = check.execute() + assert len(result) == 0 + + def test_registry_no_repositories(self): + ecr_client = mock.MagicMock + ecr_client.registries = {} + ecr_client.registries[AWS_REGION] = Registry( + id=AWS_ACCOUNT_NUMBER, + region=AWS_REGION, + scan_type="BASIC", + repositories=[], + rules=[], + ) + + with mock.patch( + "prowler.providers.aws.services.ecr.ecr_service.ECR", + ecr_client, + ): + from prowler.providers.aws.services.ecr.ecr_repositories_scan_images_on_push_enabled.ecr_repositories_scan_images_on_push_enabled import ( + ecr_repositories_scan_images_on_push_enabled, + ) + + check = ecr_repositories_scan_images_on_push_enabled() + result = check.execute() + assert len(result) == 0 + def test_scan_on_push_disabled(self): ecr_client = mock.MagicMock - ecr_client.repositories = [] - ecr_client.repositories.append( - Repository( - name=repository_name, - arn=repository_arn, - region=AWS_REGION, - scan_on_push=True, - policy=repo_policy_public, - images_details=None, - lyfecicle_policy=None, - ) + ecr_client.registries = {} + ecr_client.registries[AWS_REGION] = Registry( + id=AWS_ACCOUNT_NUMBER, + region=AWS_REGION, + scan_type="BASIC", + repositories=[ + Repository( + name=repository_name, + arn=repository_arn, + region=AWS_REGION, + scan_on_push=True, + policy=repo_policy_public, + images_details=None, + lifecycle_policy=None, + ) + ], + rules=[], ) with mock.patch( @@ -51,23 +95,32 @@ class Test_ecr_repositories_scan_images_on_push_enabled: result = check.execute() assert len(result) == 1 assert result[0].status == "PASS" - assert search("has scan on push enabled", result[0].status_extended) + assert ( + result[0].status_extended + == f"ECR repository {repository_name} has scan on push enabled" + ) assert result[0].resource_id == repository_name assert result[0].resource_arn == repository_arn def test_scan_on_push_enabled(self): ecr_client = mock.MagicMock - ecr_client.repositories = [] - ecr_client.repositories.append( - Repository( - name=repository_name, - arn=repository_arn, - region=AWS_REGION, - scan_on_push=False, - policy=repo_policy_public, - images_details=None, - lyfecicle_policy=None, - ) + ecr_client.registries = {} + ecr_client.registries[AWS_REGION] = Registry( + id=AWS_ACCOUNT_NUMBER, + region=AWS_REGION, + scan_type="BASIC", + repositories=[ + Repository( + name=repository_name, + arn=repository_arn, + region=AWS_REGION, + scan_on_push=False, + policy=repo_policy_public, + images_details=None, + lifecycle_policy=None, + ) + ], + rules=[], ) with mock.patch( @@ -82,6 +135,9 @@ class Test_ecr_repositories_scan_images_on_push_enabled: result = check.execute() assert len(result) == 1 assert result[0].status == "FAIL" - assert search("has scan on push disabled", result[0].status_extended) + assert ( + result[0].status_extended + == f"ECR repository {repository_name} has scan on push disabled" + ) assert result[0].resource_id == repository_name assert result[0].resource_arn == repository_arn diff --git a/tests/providers/aws/services/ecr/ecr_repositories_scan_vulnerabilities_in_latest_image/ecr_repositories_scan_vulnerabilities_in_latest_image_test.py b/tests/providers/aws/services/ecr/ecr_repositories_scan_vulnerabilities_in_latest_image/ecr_repositories_scan_vulnerabilities_in_latest_image_test.py index bfe3f88e..ecda0482 100644 --- a/tests/providers/aws/services/ecr/ecr_repositories_scan_vulnerabilities_in_latest_image/ecr_repositories_scan_vulnerabilities_in_latest_image_test.py +++ b/tests/providers/aws/services/ecr/ecr_repositories_scan_vulnerabilities_in_latest_image/ecr_repositories_scan_vulnerabilities_in_latest_image_test.py @@ -1,9 +1,11 @@ +from datetime import datetime from re import search from unittest import mock from prowler.providers.aws.services.ecr.ecr_service import ( FindingSeverityCounts, ImageDetails, + Registry, Repository, ) @@ -28,20 +30,66 @@ repo_policy_public = { class Test_ecr_repositories_scan_vulnerabilities_in_latest_image: + def test_no_registries(self): + ecr_client = mock.MagicMock + ecr_client.registries = {} + + with mock.patch( + "prowler.providers.aws.services.ecr.ecr_service.ECR", + ecr_client, + ): + from prowler.providers.aws.services.ecr.ecr_repositories_scan_vulnerabilities_in_latest_image.ecr_repositories_scan_vulnerabilities_in_latest_image import ( + ecr_repositories_scan_vulnerabilities_in_latest_image, + ) + + check = ecr_repositories_scan_vulnerabilities_in_latest_image() + result = check.execute() + assert len(result) == 0 + + def test_registry_no_repositories(self): + ecr_client = mock.MagicMock + ecr_client.registries = {} + ecr_client.registries[AWS_REGION] = Registry( + id=AWS_ACCOUNT_NUMBER, + region=AWS_REGION, + scan_type="BASIC", + repositories=[], + rules=[], + ) + + with mock.patch( + "prowler.providers.aws.services.ecr.ecr_service.ECR", + ecr_client, + ): + from prowler.providers.aws.services.ecr.ecr_repositories_scan_vulnerabilities_in_latest_image.ecr_repositories_scan_vulnerabilities_in_latest_image import ( + ecr_repositories_scan_vulnerabilities_in_latest_image, + ) + + check = ecr_repositories_scan_vulnerabilities_in_latest_image() + result = check.execute() + assert len(result) == 0 + def test_empty_repository(self): ecr_client = mock.MagicMock - ecr_client.repositories = [] - ecr_client.repositories.append( - Repository( - name=repository_name, - arn=repository_arn, - region=AWS_REGION, - scan_on_push=True, - policy=repo_policy_public, - images_details=[], - lyfecicle_policy=None, - ) + ecr_client.registries = {} + ecr_client.registries[AWS_REGION] = Registry( + id=AWS_ACCOUNT_NUMBER, + region=AWS_REGION, + scan_type="BASIC", + repositories=[ + Repository( + name=repository_name, + arn=repository_arn, + region=AWS_REGION, + scan_on_push=True, + policy=repo_policy_public, + images_details=[], + lifecycle_policy=None, + ) + ], + rules=[], ) + with mock.patch( "prowler.providers.aws.services.ecr.ecr_service.ECR", ecr_client, @@ -56,28 +104,35 @@ class Test_ecr_repositories_scan_vulnerabilities_in_latest_image: def test_image_scaned_without_findings(self): ecr_client = mock.MagicMock - ecr_client.repositories = [] - ecr_client.repositories.append( - Repository( - name=repository_name, - arn=repository_arn, - region=AWS_REGION, - scan_on_push=True, - policy=repo_policy_public, - images_details=[], - lyfecicle_policy=None, - ) + ecr_client.registries = {} + ecr_client.registries[AWS_REGION] = Registry( + id=AWS_ACCOUNT_NUMBER, + region=AWS_REGION, + scan_type="BASIC", + repositories=[ + Repository( + name=repository_name, + arn=repository_arn, + region=AWS_REGION, + scan_on_push=True, + policy=repo_policy_public, + images_details=[ + ImageDetails( + latest_tag="test-tag", + latest_digest="test-digest", + image_pushed_at=datetime(2023, 1, 1), + scan_findings_status="COMPLETE", + scan_findings_severity_count=FindingSeverityCounts( + critical=0, high=0, medium=0 + ), + ), + ], + lifecycle_policy=None, + ) + ], + rules=[], ) - ecr_client.repositories[0].images_details.append( - ImageDetails( - latest_tag="test-tag", - latest_digest="test-digest", - scan_findings_status="COMPLETE", - scan_findings_severity_count=FindingSeverityCounts( - critical=0, high=0, medium=0 - ), - ), - ), + with mock.patch( "prowler.providers.aws.services.ecr.ecr_service.ECR", ecr_client, @@ -96,28 +151,35 @@ class Test_ecr_repositories_scan_vulnerabilities_in_latest_image: def test_image_scanned_with_findings(self): ecr_client = mock.MagicMock - ecr_client.repositories = [] - ecr_client.repositories.append( - Repository( - name=repository_name, - arn=repository_arn, - region=AWS_REGION, - scan_on_push=True, - policy=repo_policy_public, - images_details=[], - lyfecicle_policy=None, - ) + ecr_client.registries = {} + ecr_client.registries[AWS_REGION] = Registry( + id=AWS_ACCOUNT_NUMBER, + region=AWS_REGION, + scan_type="BASIC", + repositories=[ + Repository( + name=repository_name, + arn=repository_arn, + region=AWS_REGION, + scan_on_push=True, + policy=repo_policy_public, + images_details=[ + ImageDetails( + latest_tag="test-tag", + latest_digest="test-digest", + image_pushed_at=datetime(2023, 1, 1), + scan_findings_status="COMPLETE", + scan_findings_severity_count=FindingSeverityCounts( + critical=12, high=34, medium=7 + ), + ) + ], + lifecycle_policy=None, + ) + ], + rules=[], ) - ecr_client.repositories[0].images_details.append( - ImageDetails( - latest_tag="test-tag", - latest_digest="test-digest", - scan_findings_status="COMPLETE", - scan_findings_severity_count=FindingSeverityCounts( - critical=12, high=34, medium=7 - ), - ), - ), + with mock.patch( "prowler.providers.aws.services.ecr.ecr_service.ECR", ecr_client, @@ -136,28 +198,35 @@ class Test_ecr_repositories_scan_vulnerabilities_in_latest_image: def test_image_scanned_fail_scan(self): ecr_client = mock.MagicMock - ecr_client.repositories = [] - ecr_client.repositories.append( - Repository( - name=repository_name, - arn=repository_arn, - region=AWS_REGION, - scan_on_push=True, - policy=repo_policy_public, - images_details=[], - lyfecicle_policy=None, - ) + ecr_client.registries = {} + ecr_client.registries[AWS_REGION] = Registry( + id=AWS_ACCOUNT_NUMBER, + region=AWS_REGION, + scan_type="BASIC", + repositories=[ + Repository( + name=repository_name, + arn=repository_arn, + region=AWS_REGION, + scan_on_push=True, + policy=repo_policy_public, + images_details=[ + ImageDetails( + latest_tag="test-tag", + latest_digest="test-digest", + image_pushed_at=datetime(2023, 1, 1), + scan_findings_status="FAILED", + scan_findings_severity_count=FindingSeverityCounts( + critical=0, high=0, medium=0 + ), + ) + ], + lifecycle_policy=None, + ) + ], + rules=[], ) - ecr_client.repositories[0].images_details.append( - ImageDetails( - latest_tag="test-tag", - latest_digest="test-digest", - scan_findings_status="FAILED", - scan_findings_severity_count=FindingSeverityCounts( - critical=0, high=0, medium=0 - ), - ), - ), + with mock.patch( "prowler.providers.aws.services.ecr.ecr_service.ECR", ecr_client, @@ -176,28 +245,35 @@ class Test_ecr_repositories_scan_vulnerabilities_in_latest_image: def test_image_not_scanned(self): ecr_client = mock.MagicMock - ecr_client.repositories = [] - ecr_client.repositories.append( - Repository( - name=repository_name, - arn=repository_arn, - region=AWS_REGION, - scan_on_push=True, - policy=repo_policy_public, - images_details=[], - lyfecicle_policy=None, - ) + ecr_client.registries = {} + ecr_client.registries[AWS_REGION] = Registry( + id=AWS_ACCOUNT_NUMBER, + region=AWS_REGION, + scan_type="BASIC", + repositories=[ + Repository( + name=repository_name, + arn=repository_arn, + region=AWS_REGION, + scan_on_push=True, + policy=repo_policy_public, + images_details=[ + ImageDetails( + latest_tag="test-tag", + latest_digest="test-digest", + image_pushed_at=datetime(2023, 1, 1), + scan_findings_status="", + scan_findings_severity_count=FindingSeverityCounts( + critical=0, high=0, medium=0 + ), + ) + ], + lifecycle_policy=None, + ) + ], + rules=[], ) - ecr_client.repositories[0].images_details.append( - ImageDetails( - latest_tag="test-tag", - latest_digest="test-digest", - scan_findings_status="", - scan_findings_severity_count=FindingSeverityCounts( - critical=0, high=0, medium=0 - ), - ), - ), + with mock.patch( "prowler.providers.aws.services.ecr.ecr_service.ECR", ecr_client, diff --git a/tests/providers/aws/services/ecr/ecr_service_test.py b/tests/providers/aws/services/ecr/ecr_service_test.py index e9b0d9f3..2a4b98b4 100644 --- a/tests/providers/aws/services/ecr/ecr_service_test.py +++ b/tests/providers/aws/services/ecr/ecr_service_test.py @@ -1,3 +1,4 @@ +from datetime import datetime from unittest.mock import patch import botocore @@ -24,8 +25,9 @@ def mock_make_api_call(self, operation_name, kwarg): { "imageDigest": "sha256:d8868e50ac4c7104d2200d42f432b661b2da8c1e417ccfae217e6a1e04bb9295", "imageTags": [ - "test-tag", + "test-tag1", ], + "imagePushedAt": datetime(2023, 1, 1), "imageScanStatus": { "status": "COMPLETE", }, @@ -38,6 +40,13 @@ def mock_make_api_call(self, operation_name, kwarg): "imageTags": [ "test-tag2", ], + "imagePushedAt": datetime(2023, 1, 2), + "imageScanStatus": { + "status": "COMPLETE", + }, + "imageScanFindingsSummary": { + "findingSeverityCounts": {"CRITICAL": 1, "HIGH": 2, "MEDIUM": 3} + }, }, ], } @@ -68,6 +77,7 @@ def mock_make_api_call(self, operation_name, kwarg): ], }, } + return make_api_call(self, operation_name, kwarg) @@ -128,7 +138,7 @@ class Test_ECR_Service: # Test describe ECR repositories @mock_ecr - def test__describe_repositories__(self): + def test__describe_registries_and_repositories__(self): ecr_client = client("ecr", region_name=AWS_REGION) ecr_client.create_repository( repositoryName=repo_name, @@ -139,11 +149,16 @@ class Test_ECR_Service: ) audit_info = self.set_mocked_audit_info() ecr = ECR(audit_info) - assert len(ecr.repositories) == 1 - assert ecr.repositories[0].name == repo_name - assert ecr.repositories[0].arn == repo_arn - assert ecr.repositories[0].scan_on_push - assert ecr.repositories[0].tags == [ + + assert len(ecr.registries) == 1 + assert ecr.registries[AWS_REGION].id == AWS_ACCOUNT_NUMBER + assert ecr.registries[AWS_REGION].region == AWS_REGION + assert len(ecr.registries[AWS_REGION].repositories) == 1 + + assert ecr.registries[AWS_REGION].repositories[0].name == repo_name + assert ecr.registries[AWS_REGION].repositories[0].arn == repo_arn + assert ecr.registries[AWS_REGION].repositories[0].scan_on_push + assert ecr.registries[AWS_REGION].repositories[0].tags == [ {"Key": "test", "Value": "test"}, ] @@ -157,28 +172,39 @@ class Test_ECR_Service: ) audit_info = self.set_mocked_audit_info() ecr = ECR(audit_info) - assert len(ecr.repositories) == 1 - assert ecr.repositories[0].name == repo_name - assert ecr.repositories[0].arn == repo_arn - assert ecr.repositories[0].scan_on_push + assert len(ecr.registries) == 1 + assert len(ecr.registries[AWS_REGION].repositories) == 1 + assert ecr.registries[AWS_REGION].repositories[0].name == repo_name + assert ecr.registries[AWS_REGION].repositories[0].arn == repo_arn + assert ecr.registries[AWS_REGION].repositories[0].scan_on_push assert ( - ecr.repositories[0].policy["Statement"][0]["Sid"] == "Allow Describe Images" + ecr.registries[AWS_REGION].repositories[0].policy["Statement"][0]["Sid"] + == "Allow Describe Images" ) - assert ecr.repositories[0].policy["Statement"][0]["Effect"] == "Allow" assert ( - ecr.repositories[0].policy["Statement"][0]["Principal"]["AWS"][0] + ecr.registries[AWS_REGION].repositories[0].policy["Statement"][0]["Effect"] + == "Allow" + ) + assert ( + ecr.registries[AWS_REGION] + .repositories[0] + .policy["Statement"][0]["Principal"]["AWS"][0] == f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root" ) assert ( - ecr.repositories[0].policy["Statement"][0]["Action"][0] + ecr.registries[AWS_REGION] + .repositories[0] + .policy["Statement"][0]["Action"][0] == "ecr:DescribeImages" ) assert ( - ecr.repositories[0].policy["Statement"][0]["Action"][1] + ecr.registries[AWS_REGION] + .repositories[0] + .policy["Statement"][0]["Action"][1] == "ecr:DescribeRepositories" ) - # Test describe ECR repository policies + # Test describe ECR repository lifecycle policies @mock_ecr def test__get_lifecycle_policies__(self): ecr_client = client("ecr", region_name=AWS_REGION) @@ -188,11 +214,12 @@ class Test_ECR_Service: ) audit_info = self.set_mocked_audit_info() ecr = ECR(audit_info) - assert len(ecr.repositories) == 1 - assert ecr.repositories[0].name == repo_name - assert ecr.repositories[0].arn == repo_arn - assert ecr.repositories[0].scan_on_push - assert ecr.repositories[0].lyfecicle_policy + assert len(ecr.registries) == 1 + assert len(ecr.registries[AWS_REGION].repositories) == 1 + assert ecr.registries[AWS_REGION].repositories[0].name == repo_name + assert ecr.registries[AWS_REGION].repositories[0].arn == repo_arn + assert ecr.registries[AWS_REGION].repositories[0].scan_on_push + assert ecr.registries[AWS_REGION].repositories[0].lifecycle_policy # Test get image details @mock_ecr @@ -204,45 +231,103 @@ class Test_ECR_Service: ) audit_info = self.set_mocked_audit_info() ecr = ECR(audit_info) - assert len(ecr.repositories) == 1 - assert ecr.repositories[0].name == repo_name - assert ecr.repositories[0].arn == repo_arn - assert ecr.repositories[0].scan_on_push - assert len(ecr.repositories[0].images_details) == 2 - assert ecr.repositories[0].images_details[0].latest_tag == "test-tag" + assert len(ecr.registries) == 1 + assert len(ecr.registries[AWS_REGION].repositories) == 1 + assert ecr.registries[AWS_REGION].repositories[0].name == repo_name + assert ecr.registries[AWS_REGION].repositories[0].arn == repo_arn + assert ecr.registries[AWS_REGION].repositories[0].scan_on_push + assert len(ecr.registries[AWS_REGION].repositories[0].images_details) == 2 + # First image pushed + assert ecr.registries[AWS_REGION].repositories[0].images_details[ + 0 + ].image_pushed_at == datetime(2023, 1, 1) assert ( - ecr.repositories[0].images_details[0].latest_digest + ecr.registries[AWS_REGION].repositories[0].images_details[0].latest_tag + == "test-tag1" + ) + assert ( + ecr.registries[AWS_REGION].repositories[0].images_details[0].latest_digest == "sha256:d8868e50ac4c7104d2200d42f432b661b2da8c1e417ccfae217e6a1e04bb9295" ) - assert ecr.repositories[0].images_details[0].scan_findings_status == "COMPLETE" assert ( - ecr.repositories[0].images_details[0].scan_findings_severity_count.critical + ecr.registries[AWS_REGION] + .repositories[0] + .images_details[0] + .scan_findings_status + == "COMPLETE" + ) + assert ( + ecr.registries[AWS_REGION] + .repositories[0] + .images_details[0] + .scan_findings_severity_count.critical == 1 ) assert ( - ecr.repositories[0].images_details[0].scan_findings_severity_count.high == 2 + ecr.registries[AWS_REGION] + .repositories[0] + .images_details[0] + .scan_findings_severity_count.high + == 2 ) assert ( - ecr.repositories[0].images_details[0].scan_findings_severity_count.medium + ecr.registries[AWS_REGION] + .repositories[0] + .images_details[0] + .scan_findings_severity_count.medium == 3 ) - assert ecr.repositories[0].images_details[1].latest_tag == "test-tag2" + + # Second image pushed + assert ecr.registries[AWS_REGION].repositories[0].images_details[ + 1 + ].image_pushed_at == datetime(2023, 1, 2) assert ( - ecr.repositories[0].images_details[1].latest_digest + ecr.registries[AWS_REGION].repositories[0].images_details[1].latest_tag + == "test-tag2" + ) + assert ( + ecr.registries[AWS_REGION].repositories[0].images_details[1].latest_digest == "sha256:83251ac64627fc331584f6c498b3aba5badc01574e2c70b2499af3af16630eed" ) - assert not ecr.repositories[0].images_details[1].scan_findings_status - assert not ecr.repositories[0].images_details[1].scan_findings_severity_count + assert ( + ecr.registries[AWS_REGION] + .repositories[0] + .images_details[1] + .scan_findings_status + == "COMPLETE" + ) + assert ( + ecr.registries[AWS_REGION] + .repositories[0] + .images_details[1] + .scan_findings_severity_count.critical + == 1 + ) + assert ( + ecr.registries[AWS_REGION] + .repositories[0] + .images_details[1] + .scan_findings_severity_count.high + == 2 + ) + assert ( + ecr.registries[AWS_REGION] + .repositories[0] + .images_details[1] + .scan_findings_severity_count.medium + == 3 + ) - # Test get ECR Registries + # Test get ECR Registries Scanning Configuration @mock_ecr def test__get_registry_scanning_configuration__(self): audit_info = self.set_mocked_audit_info() ecr = ECR(audit_info) assert len(ecr.registries) == 1 - assert ecr.registries[0].id == AWS_ACCOUNT_NUMBER - assert ecr.registries[0].scan_type == "BASIC" - assert ecr.registries[0].rules == [ + assert ecr.registries[AWS_REGION].id == AWS_ACCOUNT_NUMBER + assert ecr.registries[AWS_REGION].scan_type == "BASIC" + assert ecr.registries[AWS_REGION].rules == [ ScanningRule( scan_frequency="SCAN_ON_PUSH", scan_filters=[{"filter": "*", "filterType": "WILDCARD"}], diff --git a/tests/providers/aws/services/iam/iam_role_cross_account_readonlyaccess_policy/iam_role_cross_account_readonlyaccess_policy_test.py b/tests/providers/aws/services/iam/iam_role_cross_account_readonlyaccess_policy/iam_role_cross_account_readonlyaccess_policy_test.py index 5c24f6fa..53cf3403 100644 --- a/tests/providers/aws/services/iam/iam_role_cross_account_readonlyaccess_policy/iam_role_cross_account_readonlyaccess_policy_test.py +++ b/tests/providers/aws/services/iam/iam_role_cross_account_readonlyaccess_policy/iam_role_cross_account_readonlyaccess_policy_test.py @@ -271,8 +271,8 @@ class Test_iam_role_cross_account_readonlyaccess_policy: ) with mock.patch( - "prowler.providers.aws.services.iam.iam_service.IAM", - iam_client, + "prowler.providers.aws.services.iam.iam_role_cross_account_readonlyaccess_policy.iam_role_cross_account_readonlyaccess_policy.iam_client", + new=iam_client, ): # Test Check from prowler.providers.aws.services.iam.iam_role_cross_account_readonlyaccess_policy.iam_role_cross_account_readonlyaccess_policy import (