Files
prowler/providers/aws/services/codeartifact/codeartifact_service.py
2022-11-14 16:28:00 +01:00

225 lines
8.4 KiB
Python

import threading
from enum import Enum
from pydantic import BaseModel
from lib.logger import logger
from providers.aws.aws_provider import generate_regional_clients
################## CodeArtifact
class CodeArtifact:
def __init__(self, audit_info):
self.service = "codeartifact"
self.session = audit_info.audit_session
self.audited_account = audit_info.audited_account
self.regional_clients = generate_regional_clients(self.service, audit_info)
# repositories is a dictionary containing all the codeartifact service information
self.repositories = {}
self.__threading_call__(self.__list_repositories__)
self.__threading_call__(self.__list_packages__)
def __get_session__(self):
return self.session
def __threading_call__(self, call):
threads = []
for regional_client in self.regional_clients.values():
threads.append(threading.Thread(target=call, args=(regional_client,)))
for t in threads:
t.start()
for t in threads:
t.join()
def __list_repositories__(self, regional_client):
logger.info("CodeArtifact - Listing Repositories...")
try:
list_repositories_paginator = regional_client.get_paginator(
"list_repositories"
)
for page in list_repositories_paginator.paginate():
for repository in page["repositories"]:
package_name = repository["name"]
package_domain_name = repository["domainName"]
package_domain_owner = repository["domainOwner"]
package_arn = repository["arn"]
# Save Repository
self.repositories[package_name] = Repository(
name=package_name,
arn=package_arn,
domain_name=package_domain_name,
domain_owner=package_domain_owner,
region=regional_client.region,
)
except Exception as error:
logger.error(
f"{regional_client.region} --"
f" {error.__class__.__name__}[{error.__traceback__.tb_lineno}]:"
f" {error}"
)
def __list_packages__(self, regional_client):
logger.info("CodeArtifact - Listing Packages and retrieving information...")
try:
for repository in self.repositories:
if self.repositories[repository].region == regional_client.region:
list_packages_paginator = regional_client.get_paginator(
"list_packages"
)
list_packages_parameters = {
"domain": self.repositories[repository].domain_name,
"domainOwner": self.repositories[repository].domain_owner,
"repository": repository,
}
packages = []
for page in list_packages_paginator.paginate(
**list_packages_parameters
):
for package in page["packages"]:
# Package information
package_format = package["format"]
package_namespace = package["namespace"]
package_name = package["package"]
package_origin_configuration_restrictions_publish = package[
"originConfiguration"
]["restrictions"]["publish"]
package_origin_configuration_restrictions_upstream = (
package["originConfiguration"]["restrictions"][
"upstream"
]
)
# Get Latest Package Version
latest_version_information = (
regional_client.list_package_versions(
domain=self.repositories[repository].domain_name,
domainOwner=self.repositories[
repository
].domain_owner,
repository=repository,
format=package_format,
namespace=package_namespace,
package=package_name,
short_by="PUBLISHED_TIME",
)
)
latest_version = latest_version_information["versions"][0][
"version"
]
latest_origin_type = latest_version_information["versions"][
0
]["origin"]["originType"]
latest_status = latest_version_information["versions"][0][
"status"
]
packages.append(
Package(
name=package_name,
namespace=package_namespace,
format=package_format,
origin_configuration=OriginConfiguration(
restrictions=Restrictions(
publish=package_origin_configuration_restrictions_publish,
upstream=package_origin_configuration_restrictions_upstream,
)
),
latest_version=LatestPackageVersion(
version=latest_version,
status=latest_status,
origin=OriginInformation(
origin_type=latest_origin_type
),
),
)
)
# Save all the packages information
self.repositories[repository].packages = packages
except Exception as error:
logger.error(
f"{regional_client.region} --"
f" {error.__class__.__name__}[{error.__traceback__.tb_lineno}]:"
f" {error}"
)
class RestrictionValues(Enum):
"""Possible values for the package origin restriction"""
ALLOW = "ALLOW"
BLOCK = "BLOCK"
class Restrictions(BaseModel):
"""Information about the upstream and publish package origin restrictions"""
publish: RestrictionValues
upstream: RestrictionValues
class OriginConfiguration(BaseModel):
"""Details about the package origin configuration of a package"""
restrictions: Restrictions
class OriginInformationValues(Enum):
"""Possible values for the OriginInformation"""
INTERNAL = "INTERNAL"
EXTERNAL = "EXTERNAL"
UNKNOWN = "UNKNOWN"
class OriginInformation(BaseModel):
"""
Describes how the package version was originally added to the domain.
An INTERNAL origin type means the package version was published directly to a repository in the domain.
An EXTERNAL origin type means the package version was ingested from an external connection.
"""
origin_type: OriginInformationValues
class LatestPackageVersionStatus(Enum):
"""Possibel values for the package status"""
Published = "Published"
Unfinished = "Unfinished"
Unlisted = "Unlisted"
Archived = "Archived"
Disposed = "Disposed"
Deleted = "Deleted"
class LatestPackageVersion(BaseModel):
"""Details of the latest package version"""
version: str
status: LatestPackageVersionStatus
origin: OriginInformation
class Package(BaseModel):
"""Details of a package"""
name: str
namespace: str
format: str
origin_configuration: OriginConfiguration
latest_version: LatestPackageVersion
class Repository(BaseModel):
"""Information about a Repository"""
name: str
arn: str
domain_name: str
domain_owner: str
packages: list[Package] = []
region: str