feat(CodeArtifact): Service and checks (#1473)

This commit is contained in:
Pepe Fagoaga
2022-11-14 16:28:00 +01:00
committed by GitHub
parent 9d3bff9e54
commit 9b035230ac
7 changed files with 643 additions and 0 deletions

View File

@@ -0,0 +1,4 @@
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.codeartifact.codeartifact_service import CodeArtifact
codeartifact_client = CodeArtifact(current_audit_info)

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "codeartifact_packages_external_public_publishing_disabled",
"CheckTitle": "Ensure CodeArtifact internal packages do not allow external public source publishing.",
"CheckType": [],
"ServiceName": "codeartifact",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:codeartifact:region:account-id:repository/repository-name",
"Severity": "critical",
"ResourceType": "Other",
"Description": "Ensure CodeArtifact internal packages do not allow external public source publishing.",
"Risk": "Allowing package versions of a package to be added both by direct publishing and ingesting from public repositories makes you vulnerable to a dependency substitution attack.",
"RelatedUrl": "https://docs.aws.amazon.com/codeartifact/latest/ug/package-origin-controls.html",
"Remediation": {
"Code": {
"CLI": "aws codeartifact put-package-origin-configuration --package 'MyPackage' --namespace 'MyNamespace' --domain 'MyDomain' --repository 'MyRepository' --domain-owner 'MyOwnerAccount' --format 'MyFormat' --restrictions 'publish=ALLOW,upstream=BLOCK'",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Configure package origin controls on a package in a repository to limit how versions of that package can be added to the repository.",
"Url": "https://docs.aws.amazon.com/codeartifact/latest/ug/package-origin-controls.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,34 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.codeartifact.codeartifact_client import codeartifact_client
from providers.aws.services.codeartifact.codeartifact_service import (
OriginInformationValues,
RestrictionValues,
)
class codeartifact_packages_external_public_publishing_disabled(Check):
def execute(self):
findings = []
for repository in codeartifact_client.repositories.values():
for package in repository.packages:
report = Check_Report(self.metadata)
report.region = repository.region
report.resource_id = package.name
if package.latest_version.origin.origin_type in (
OriginInformationValues.INTERNAL,
OriginInformationValues.UNKNOWN,
):
if (
package.origin_configuration.restrictions.upstream
== RestrictionValues.ALLOW
):
report.status = "FAIL"
report.status_extended = f"Internal package {package.namespace} {package.name} is vulnerable to dependency confusion in repository {repository.arn}"
else:
report.status = "PASS"
report.status_extended = f"Internal package {package.namespace} {package.name} is not vulnerable to dependency confusion in repository {repository.arn}"
findings.append(report)
return findings

View File

@@ -0,0 +1,172 @@
from unittest import mock
from moto.core import DEFAULT_ACCOUNT_ID
from providers.aws.services.codeartifact.codeartifact_service import (
LatestPackageVersion,
LatestPackageVersionStatus,
OriginConfiguration,
OriginInformation,
OriginInformationValues,
Package,
Repository,
Restrictions,
RestrictionValues,
)
AWS_REGION = "eu-west-1"
class Test_accessanalyzer_enabled_without_findings:
def test_no_repositories(self):
codeartifact_client = mock.MagicMock
codeartifact_client.repositories = {}
with mock.patch(
"providers.aws.services.codeartifact.codeartifact_service.CodeArtifact",
new=codeartifact_client,
):
# Test Check
from providers.aws.services.codeartifact.codeartifact_packages_external_public_publishing_disabled.codeartifact_packages_external_public_publishing_disabled import (
codeartifact_packages_external_public_publishing_disabled,
)
check = codeartifact_packages_external_public_publishing_disabled()
result = check.execute()
assert len(result) == 0
def test_repository_without_packages(self):
codeartifact_client = mock.MagicMock
codeartifact_client.repositories = {
"test-repository": Repository(
name="test-repository",
arn="",
domain_name="",
domain_owner="",
region=AWS_REGION,
packages=[],
)
}
with mock.patch(
"providers.aws.services.codeartifact.codeartifact_service.CodeArtifact",
new=codeartifact_client,
):
# Test Check
from providers.aws.services.codeartifact.codeartifact_packages_external_public_publishing_disabled.codeartifact_packages_external_public_publishing_disabled import (
codeartifact_packages_external_public_publishing_disabled,
)
check = codeartifact_packages_external_public_publishing_disabled()
result = check.execute()
assert len(result) == 0
def test_repository_package_public_publishing_origin_internal(self):
codeartifact_client = mock.MagicMock
package_name = "test-package"
package_namespace = "test-namespace"
repository_arn = f"arn:aws:codebuild:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:repository/test-repository"
codeartifact_client.repositories = {
"test-repository": Repository(
name="test-repository",
arn=repository_arn,
domain_name="",
domain_owner="",
region=AWS_REGION,
packages=[
Package(
name=package_name,
namespace=package_namespace,
format="pypi",
origin_configuration=OriginConfiguration(
restrictions=Restrictions(
publish=RestrictionValues.ALLOW,
upstream=RestrictionValues.ALLOW,
)
),
latest_version=LatestPackageVersion(
version="latest",
status=LatestPackageVersionStatus.Published,
origin=OriginInformation(
origin_type=OriginInformationValues.INTERNAL
),
),
)
],
)
}
with mock.patch(
"providers.aws.services.codeartifact.codeartifact_service.CodeArtifact",
new=codeartifact_client,
):
# Test Check
from providers.aws.services.codeartifact.codeartifact_packages_external_public_publishing_disabled.codeartifact_packages_external_public_publishing_disabled import (
codeartifact_packages_external_public_publishing_disabled,
)
check = codeartifact_packages_external_public_publishing_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].region == AWS_REGION
assert result[0].resource_id == "test-package"
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Internal package {package_namespace} {package_name} is vulnerable to dependency confusion in repository {repository_arn}"
)
def test_repository_package_private_publishing_origin_internal(self):
codeartifact_client = mock.MagicMock
package_name = "test-package"
package_namespace = "test-namespace"
repository_arn = f"arn:aws:codebuild:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:repository/test-repository"
codeartifact_client.repositories = {
"test-repository": Repository(
name="test-repository",
arn=repository_arn,
domain_name="",
domain_owner="",
region=AWS_REGION,
packages=[
Package(
name=package_name,
namespace=package_namespace,
format="pypi",
origin_configuration=OriginConfiguration(
restrictions=Restrictions(
publish=RestrictionValues.BLOCK,
upstream=RestrictionValues.BLOCK,
)
),
latest_version=LatestPackageVersion(
version="latest",
status=LatestPackageVersionStatus.Published,
origin=OriginInformation(
origin_type=OriginInformationValues.INTERNAL
),
),
)
],
)
}
with mock.patch(
"providers.aws.services.codeartifact.codeartifact_service.CodeArtifact",
new=codeartifact_client,
):
# Test Check
from providers.aws.services.codeartifact.codeartifact_packages_external_public_publishing_disabled.codeartifact_packages_external_public_publishing_disabled import (
codeartifact_packages_external_public_publishing_disabled,
)
check = codeartifact_packages_external_public_publishing_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].region == AWS_REGION
assert result[0].resource_id == "test-package"
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Internal package {package_namespace} {package_name} is not vulnerable to dependency confusion in repository {repository_arn}"
)

View File

@@ -0,0 +1,224 @@
import threading
from enum import Enum
from pydantic import BaseModel
from lib.logger import logger
from providers.aws.aws_provider import generate_regional_clients
################## CodeArtifact
class CodeArtifact:
def __init__(self, audit_info):
self.service = "codeartifact"
self.session = audit_info.audit_session
self.audited_account = audit_info.audited_account
self.regional_clients = generate_regional_clients(self.service, audit_info)
# repositories is a dictionary containing all the codeartifact service information
self.repositories = {}
self.__threading_call__(self.__list_repositories__)
self.__threading_call__(self.__list_packages__)
def __get_session__(self):
return self.session
def __threading_call__(self, call):
threads = []
for regional_client in self.regional_clients.values():
threads.append(threading.Thread(target=call, args=(regional_client,)))
for t in threads:
t.start()
for t in threads:
t.join()
def __list_repositories__(self, regional_client):
logger.info("CodeArtifact - Listing Repositories...")
try:
list_repositories_paginator = regional_client.get_paginator(
"list_repositories"
)
for page in list_repositories_paginator.paginate():
for repository in page["repositories"]:
package_name = repository["name"]
package_domain_name = repository["domainName"]
package_domain_owner = repository["domainOwner"]
package_arn = repository["arn"]
# Save Repository
self.repositories[package_name] = Repository(
name=package_name,
arn=package_arn,
domain_name=package_domain_name,
domain_owner=package_domain_owner,
region=regional_client.region,
)
except Exception as error:
logger.error(
f"{regional_client.region} --"
f" {error.__class__.__name__}[{error.__traceback__.tb_lineno}]:"
f" {error}"
)
def __list_packages__(self, regional_client):
logger.info("CodeArtifact - Listing Packages and retrieving information...")
try:
for repository in self.repositories:
if self.repositories[repository].region == regional_client.region:
list_packages_paginator = regional_client.get_paginator(
"list_packages"
)
list_packages_parameters = {
"domain": self.repositories[repository].domain_name,
"domainOwner": self.repositories[repository].domain_owner,
"repository": repository,
}
packages = []
for page in list_packages_paginator.paginate(
**list_packages_parameters
):
for package in page["packages"]:
# Package information
package_format = package["format"]
package_namespace = package["namespace"]
package_name = package["package"]
package_origin_configuration_restrictions_publish = package[
"originConfiguration"
]["restrictions"]["publish"]
package_origin_configuration_restrictions_upstream = (
package["originConfiguration"]["restrictions"][
"upstream"
]
)
# Get Latest Package Version
latest_version_information = (
regional_client.list_package_versions(
domain=self.repositories[repository].domain_name,
domainOwner=self.repositories[
repository
].domain_owner,
repository=repository,
format=package_format,
namespace=package_namespace,
package=package_name,
short_by="PUBLISHED_TIME",
)
)
latest_version = latest_version_information["versions"][0][
"version"
]
latest_origin_type = latest_version_information["versions"][
0
]["origin"]["originType"]
latest_status = latest_version_information["versions"][0][
"status"
]
packages.append(
Package(
name=package_name,
namespace=package_namespace,
format=package_format,
origin_configuration=OriginConfiguration(
restrictions=Restrictions(
publish=package_origin_configuration_restrictions_publish,
upstream=package_origin_configuration_restrictions_upstream,
)
),
latest_version=LatestPackageVersion(
version=latest_version,
status=latest_status,
origin=OriginInformation(
origin_type=latest_origin_type
),
),
)
)
# Save all the packages information
self.repositories[repository].packages = packages
except Exception as error:
logger.error(
f"{regional_client.region} --"
f" {error.__class__.__name__}[{error.__traceback__.tb_lineno}]:"
f" {error}"
)
class RestrictionValues(Enum):
"""Possible values for the package origin restriction"""
ALLOW = "ALLOW"
BLOCK = "BLOCK"
class Restrictions(BaseModel):
"""Information about the upstream and publish package origin restrictions"""
publish: RestrictionValues
upstream: RestrictionValues
class OriginConfiguration(BaseModel):
"""Details about the package origin configuration of a package"""
restrictions: Restrictions
class OriginInformationValues(Enum):
"""Possible values for the OriginInformation"""
INTERNAL = "INTERNAL"
EXTERNAL = "EXTERNAL"
UNKNOWN = "UNKNOWN"
class OriginInformation(BaseModel):
"""
Describes how the package version was originally added to the domain.
An INTERNAL origin type means the package version was published directly to a repository in the domain.
An EXTERNAL origin type means the package version was ingested from an external connection.
"""
origin_type: OriginInformationValues
class LatestPackageVersionStatus(Enum):
"""Possibel values for the package status"""
Published = "Published"
Unfinished = "Unfinished"
Unlisted = "Unlisted"
Archived = "Archived"
Disposed = "Disposed"
Deleted = "Deleted"
class LatestPackageVersion(BaseModel):
"""Details of the latest package version"""
version: str
status: LatestPackageVersionStatus
origin: OriginInformation
class Package(BaseModel):
"""Details of a package"""
name: str
namespace: str
format: str
origin_configuration: OriginConfiguration
latest_version: LatestPackageVersion
class Repository(BaseModel):
"""Information about a Repository"""
name: str
arn: str
domain_name: str
domain_owner: str
packages: list[Package] = []
region: str

View File

@@ -0,0 +1,174 @@
from unittest.mock import patch
import botocore
from moto.core import DEFAULT_ACCOUNT_ID
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.codeartifact.codeartifact_service import (
CodeArtifact,
LatestPackageVersionStatus,
OriginInformationValues,
RestrictionValues,
)
# Mock Test Region
AWS_REGION = "eu-west-1"
# Mocking Access Analyzer Calls
make_api_call = botocore.client.BaseClient._make_api_call
def mock_make_api_call(self, operation_name, kwarg):
"""We have to mock every AWS API call using Boto3"""
if operation_name == "ListRepositories":
return {
"repositories": [
{
"name": "test-repository",
"administratorAccount": DEFAULT_ACCOUNT_ID,
"domainName": "test-domain",
"domainOwner": DEFAULT_ACCOUNT_ID,
"arn": f"arn:aws:codebuild:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:repository/test-repository",
"description": "test description",
},
]
}
if operation_name == "ListPackages":
return {
"packages": [
{
"format": "pypi",
"namespace": "test-namespace",
"package": "test-package",
"originConfiguration": {
"restrictions": {
"publish": "ALLOW",
"upstream": "ALLOW",
}
},
},
],
}
if operation_name == "ListPackageVersions":
return {
"defaultDisplayVersion": "latest",
"format": "pypi",
"namespace": "test-namespace",
"package": "test-package",
"versions": [
{
"version": "latest",
"revision": "lates",
"status": "Published",
"origin": {
"domainEntryPoint": {
"repositoryName": "test-repository",
"externalConnectionName": "",
},
"originType": "INTERNAL",
},
},
],
}
return make_api_call(self, operation_name, kwarg)
# Mock generate_regional_clients()
def mock_generate_regional_clients(service, audit_info):
regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION)
regional_client.region = AWS_REGION
return {AWS_REGION: regional_client}
# Patch every AWS call using Boto3 and generate_regional_clients to have 1 client
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
@patch(
"providers.aws.services.codeartifact.codeartifact_service.generate_regional_clients",
new=mock_generate_regional_clients,
)
class Test_CodeArtifact_Service:
# Test CodeArtifact Client
def test__get_client__(self):
codeartifact = CodeArtifact(current_audit_info)
assert (
codeartifact.regional_clients[AWS_REGION].__class__.__name__
== "CodeArtifact"
)
# Test CodeArtifact Session
def test__get_session__(self):
codeartifact = CodeArtifact(current_audit_info)
assert codeartifact.session.__class__.__name__ == "Session"
# Test CodeArtifact Service
def test__get_service__(self):
codeartifact = CodeArtifact(current_audit_info)
assert codeartifact.service == "codeartifact"
def test__list_repositories__(self):
# Set partition for the service
current_audit_info.audited_partition = "aws"
codeartifact = CodeArtifact(current_audit_info)
assert len(codeartifact.repositories) == 1
assert codeartifact.repositories
assert codeartifact.repositories["test-repository"]
assert codeartifact.repositories["test-repository"].name == "test-repository"
assert (
codeartifact.repositories["test-repository"].arn
== f"arn:aws:codebuild:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:repository/test-repository"
)
assert codeartifact.repositories["test-repository"].domain_name == "test-domain"
assert (
codeartifact.repositories["test-repository"].domain_owner
== DEFAULT_ACCOUNT_ID
)
assert codeartifact.repositories["test-repository"].region == AWS_REGION
assert codeartifact.repositories["test-repository"].packages
assert len(codeartifact.repositories["test-repository"].packages) == 1
assert (
codeartifact.repositories["test-repository"].packages[0].name
== "test-package"
)
assert (
codeartifact.repositories["test-repository"].packages[0].namespace
== "test-namespace"
)
assert codeartifact.repositories["test-repository"].packages[0].format == "pypi"
assert (
codeartifact.repositories["test-repository"]
.packages[0]
.origin_configuration.restrictions.publish
== RestrictionValues.ALLOW
)
assert (
codeartifact.repositories["test-repository"]
.packages[0]
.origin_configuration.restrictions.upstream
== RestrictionValues.ALLOW
)
assert (
codeartifact.repositories["test-repository"]
.packages[0]
.latest_version.version
== "latest"
)
assert (
codeartifact.repositories["test-repository"]
.packages[0]
.latest_version.status
== LatestPackageVersionStatus.Published
)
assert (
codeartifact.repositories["test-repository"]
.packages[0]
.latest_version.origin.origin_type
== OriginInformationValues.INTERNAL
)