diff --git a/README.md b/README.md index c0f3b53c..3dd8df5d 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,7 @@ It contains hundreds of controls covering CIS, NIST 800, NIST CSF, CISA, RBI, Fe | Provider | Checks | Services | [Compliance Frameworks](https://docs.prowler.cloud/en/latest/tutorials/compliance/) | [Categories](https://docs.prowler.cloud/en/latest/tutorials/misc/#categories) | |---|---|---|---|---| -| AWS | 290 | 56 -> `prowler aws --list-services` | 25 -> `prowler aws --list-compliance` | 5 -> `prowler aws --list-categories` | +| AWS | 301 | 61 -> `prowler aws --list-services` | 25 -> `prowler aws --list-compliance` | 5 -> `prowler aws --list-categories` | | GCP | 73 | 11 -> `prowler gcp --list-services` | 1 -> `prowler gcp --list-compliance` | 2 -> `prowler gcp --list-categories`| | Azure | 23 | 4 -> `prowler azure --list-services` | CIS soon | 1 -> `prowler azure --list-categories` | | Kubernetes | Planned | - | - | - | diff --git a/prowler/providers/aws/services/apigateway/apigateway_endpoint_public_without_authorizer/__init__.py b/prowler/providers/aws/services/apigateway/apigateway_endpoint_public_without_authorizer/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/prowler/providers/aws/services/apigateway/apigateway_endpoint_public_without_authorizer/apigateway_endpoint_public_without_authorizer.metadata.json b/prowler/providers/aws/services/apigateway/apigateway_endpoint_public_without_authorizer/apigateway_endpoint_public_without_authorizer.metadata.json new file mode 100644 index 00000000..4bc916d4 --- /dev/null +++ b/prowler/providers/aws/services/apigateway/apigateway_endpoint_public_without_authorizer/apigateway_endpoint_public_without_authorizer.metadata.json @@ -0,0 +1,34 @@ +{ + "Provider": "aws", + "CheckID": "apigateway_endpoint_public_without_authorizer", + "CheckTitle": "Check if API Gateway public endpoint has an authorizer configured.", + "CheckType": [ + "Infrastructure Security" + ], + "ServiceName": "apigateway", + "SubServiceName": "rest_api", + "ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id", + "Severity": "medium", + "ResourceType": "AwsApiGatewayRestApi", + "Description": "Check if API Gateway public endpoint has an authorizer configured.", + "Risk": "If accessible from internet without restrictions opens up attack / abuse surface for any malicious user.", + "RelatedUrl": "https://docs.aws.amazon.com/apigateway/latest/developerguide/api-gateway-api-endpoint-types.html", + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "", + "Terraform": "" + }, + "Recommendation": { + "Text": "Verify that any public API Gateway is protected and audited. Detective controls for common risks should be implemented.", + "Url": "https://docs.aws.amazon.com/apigateway/latest/developerguide/api-gateway-api-endpoint-types.html" + } + }, + "Categories": [ + "internet-exposed" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/aws/services/apigateway/apigateway_endpoint_public_without_authorizer/apigateway_endpoint_public_without_authorizer.py b/prowler/providers/aws/services/apigateway/apigateway_endpoint_public_without_authorizer/apigateway_endpoint_public_without_authorizer.py new file mode 100644 index 00000000..84902182 --- /dev/null +++ b/prowler/providers/aws/services/apigateway/apigateway_endpoint_public_without_authorizer/apigateway_endpoint_public_without_authorizer.py @@ -0,0 +1,27 @@ +from prowler.lib.check.models import Check, Check_Report_AWS +from prowler.providers.aws.services.apigateway.apigateway_client import ( + apigateway_client, +) + + +class apigateway_endpoint_public_without_authorizer(Check): + def execute(self): + findings = [] + for rest_api in apigateway_client.rest_apis: + if rest_api.public_endpoint: + report = Check_Report_AWS(self.metadata()) + report.region = rest_api.region + report.resource_id = rest_api.name + report.resource_arn = rest_api.arn + report.resource_tags = rest_api.tags + + report.status = "PASS" + report.status_extended = f"API Gateway REST API {rest_api.name} with ID {rest_api.id} has a public endpoint with an authorizer." + + if not rest_api.authorizer: + report.status = "FAIL" + report.status_extended = f"API Gateway REST API {rest_api.name} with ID {rest_api.id} has a public endpoint without an authorizer." + + findings.append(report) + + return findings diff --git a/prowler/providers/aws/services/ecs/ecs_service.py b/prowler/providers/aws/services/ecs/ecs_service.py index 43c4905b..1dad4752 100644 --- a/prowler/providers/aws/services/ecs/ecs_service.py +++ b/prowler/providers/aws/services/ecs/ecs_service.py @@ -64,6 +64,9 @@ class ECS(AWSService): ) ) task_definition.tags = response.get("tags") + task_definition.network_mode = response["taskDefinition"].get( + "networkMode" + ) except Exception as error: logger.error( f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" @@ -82,3 +85,4 @@ class TaskDefinition(BaseModel): region: str environment_variables: list[ContainerEnvVariable] tags: Optional[list] = [] + network_mode: Optional[str] diff --git a/prowler/providers/aws/services/elasticache/__init__.py b/prowler/providers/aws/services/elasticache/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/prowler/providers/aws/services/elasticache/elasticache_client.py b/prowler/providers/aws/services/elasticache/elasticache_client.py new file mode 100644 index 00000000..5b6669bd --- /dev/null +++ b/prowler/providers/aws/services/elasticache/elasticache_client.py @@ -0,0 +1,4 @@ +from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info +from prowler.providers.aws.services.elasticache.elasticache_service import ElastiCache + +elasticache_client = ElastiCache(current_audit_info) diff --git a/prowler/providers/aws/services/elasticache/elasticache_cluster_uses_public_subnet/__init__.py b/prowler/providers/aws/services/elasticache/elasticache_cluster_uses_public_subnet/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/prowler/providers/aws/services/elasticache/elasticache_cluster_uses_public_subnet/elasticache_cluster_uses_public_subnet.metadata.json b/prowler/providers/aws/services/elasticache/elasticache_cluster_uses_public_subnet/elasticache_cluster_uses_public_subnet.metadata.json new file mode 100644 index 00000000..e6b0e497 --- /dev/null +++ b/prowler/providers/aws/services/elasticache/elasticache_cluster_uses_public_subnet/elasticache_cluster_uses_public_subnet.metadata.json @@ -0,0 +1,32 @@ +{ + "Provider": "aws", + "CheckID": "elasticache_cluster_uses_public_subnet", + "CheckTitle": "Ensure Elasticache Cluster is not using a public subnet", + "CheckType": [], + "ServiceName": "elasticache", + "SubServiceName": "", + "ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id", + "Severity": "medium", + "ResourceType": "AWSElastiCacheCacheCluster", + "Description": "Ensure Elasticache Cluster is not using a public subnet", + "Risk": "There is a risk of exposing sensitive data if Elasticache Cluster uses a public subnet.", + "RelatedUrl": "https://docs.aws.amazon.com/AmazonElastiCache/latest/red-ug/VPCs.html", + "Remediation": { + "Code": { + "CLI": "aws elasticache modify-cache-cluster --cache-cluster-id my-elasticache-cluster --cache-subnet-group-name my-private-subnet-group", + "NativeIaC": "", + "Other": "", + "Terraform": "" + }, + "Recommendation": { + "Text": "To ensure your Elasticache cluster is not using a public subnet, follow the recommended remediation steps based on your preferred method.", + "Url": "https://docs.aws.amazon.com/AmazonElastiCache/latest/red-ug/VPCs.html" + } + }, + "Categories": [ + "internet-exposed" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/aws/services/elasticache/elasticache_cluster_uses_public_subnet/elasticache_cluster_uses_public_subnet.py b/prowler/providers/aws/services/elasticache/elasticache_cluster_uses_public_subnet/elasticache_cluster_uses_public_subnet.py new file mode 100644 index 00000000..43e733c7 --- /dev/null +++ b/prowler/providers/aws/services/elasticache/elasticache_cluster_uses_public_subnet/elasticache_cluster_uses_public_subnet.py @@ -0,0 +1,34 @@ +from prowler.lib.check.models import Check, Check_Report_AWS +from prowler.providers.aws.services.elasticache.elasticache_client import ( + elasticache_client, +) +from prowler.providers.aws.services.vpc.vpc_client import vpc_client + + +class elasticache_cluster_uses_public_subnet(Check): + def execute(self): + findings = [] + for cluster in elasticache_client.clusters.values(): + report = Check_Report_AWS(self.metadata()) + report.resource_id = cluster.id + report.resource_arn = cluster.arn + report.resource_tags = cluster.tags + report.region = cluster.region + + report.status = "PASS" + report.status_extended = ( + f"Cluster {cluster.id} is not using public subnets." + ) + + public_subnets = [] + for subnet in cluster.subnets: + if vpc_client.vpc_subnets[subnet].public: + public_subnets.append(vpc_client.vpc_subnets[subnet].id) + + if len(public_subnets) > 0: + report.status = "FAIL" + report.status_extended = f"Cluster {cluster.id} is using {', '.join(public_subnets)} public subnets." + + findings.append(report) + + return findings diff --git a/prowler/providers/aws/services/elasticache/elasticache_service.py b/prowler/providers/aws/services/elasticache/elasticache_service.py new file mode 100644 index 00000000..605e0cfa --- /dev/null +++ b/prowler/providers/aws/services/elasticache/elasticache_service.py @@ -0,0 +1,89 @@ +from typing import Optional + +from pydantic import BaseModel + +from prowler.lib.logger import logger +from prowler.providers.aws.lib.service.service import AWSService + + +################################ Elasticache +class ElastiCache(AWSService): + def __init__(self, audit_info): + # Call AWSService's __init__ + super().__init__(__class__.__name__, audit_info) + self.clusters = {} + self.__threading_call__(self.__describe_cache_clusters__) + self.__threading_call__(self.__describe_cache_subnet_groups__) + self.__list_tags_for_resource__() + + def __describe_cache_clusters__(self, regional_client): + logger.info("Elasticache - Describing Cache Clusters...") + try: + for cache_cluster in regional_client.describe_cache_clusters()[ + "CacheClusters" + ]: + cluster_arn = cache_cluster["ARN"] + self.clusters[cluster_arn] = Cluster( + id=cache_cluster["CacheClusterId"], + arn=cluster_arn, + region=regional_client.region, + cache_subnet_group_id=cache_cluster["CacheSubnetGroupName"], + ) + except Exception as error: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + def __describe_cache_subnet_groups__(self, regional_client): + logger.info("Elasticache - Describing Cache Subnet Groups...") + try: + for cluster in self.clusters.values(): + if cluster.region == regional_client.region: + try: + subnets = [] + cache_subnet_groups = ( + regional_client.describe_cache_subnet_groups( + CacheSubnetGroupName=cluster.cache_subnet_group_id + )["CacheSubnetGroups"] + ) + for subnet_group in cache_subnet_groups: + for subnet in subnet_group["Subnets"]: + subnets.append(subnet["SubnetIdentifier"]) + + cluster.subnets = subnets + except Exception as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + except Exception as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + def __list_tags_for_resource__(self): + logger.info("Elasticache - Listing Tags...") + try: + for cluster in self.clusters.values(): + try: + regional_client = self.regional_clients[cluster.region] + cluster.tags = regional_client.list_tags_for_resource( + ResourceName=cluster.arn + )["TagList"] + + except Exception as error: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + except Exception as error: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + +class Cluster(BaseModel): + id: str + arn: str + region: str + cache_subnet_group_id: str + subnets: Optional[list] + tags: Optional[list] diff --git a/prowler/providers/aws/services/iam/iam_service.py b/prowler/providers/aws/services/iam/iam_service.py index 18629f5e..7bb79988 100644 --- a/prowler/providers/aws/services/iam/iam_service.py +++ b/prowler/providers/aws/services/iam/iam_service.py @@ -84,6 +84,12 @@ class IAM(AWSService): self.saml_providers = self.__list_saml_providers__() self.server_certificates = self.__list_server_certificates__() self.__list_tags_for_resource__() + self.access_keys_metadata = {} + self.__get_access_keys_metadata__() + self.last_accessed_services = {} + self.__get_last_accessed_services__() + self.user_temporary_credentials_usage = {} + self.__get_user_temporary_credentials_usage__() def __get_client__(self): return self.client @@ -671,6 +677,87 @@ class IAM(AWSService): f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" ) + def __get_last_accessed_services__(self): + logger.info("IAM - Getting Last Accessed Services ...") + try: + for user in self.users: + try: + details = self.client.generate_service_last_accessed_details( + Arn=user.arn + ) + response = self.client.get_service_last_accessed_details( + JobId=details["JobId"] + ) + while response["JobStatus"] == "IN_PROGRESS": + response = self.client.get_service_last_accessed_details( + JobId=details["JobId"] + ) + self.last_accessed_services[(user.name, user.arn)] = response[ + "ServicesLastAccessed" + ] + + except ClientError as error: + logger.error( + f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + except Exception as error: + logger.error( + f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + def __get_access_keys_metadata__(self): + logger.info("IAM - Getting Access Keys Metadata ...") + try: + for user in self.users: + try: + paginator = self.client.get_paginator("list_access_keys") + self.access_keys_metadata[(user.name, user.arn)] = [] + for response in paginator.paginate(UserName=user.name): + self.access_keys_metadata[(user.name, user.arn)] = response[ + "AccessKeyMetadata" + ] + except ClientError as error: + logger.error( + f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + except Exception as error: + logger.error( + f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + def __get_user_temporary_credentials_usage__(self): + logger.info("IAM - Getting User Temporary Credentials Usage ...") + try: + temporary_credentials_usage = False + for ( + user_data, + last_accessed_services, + ) in self.last_accessed_services.items(): + # Get AWS services number used more than IAM and STS + services_accessed = len( + [ + service + for service in last_accessed_services + if service["ServiceNamespace"] not in ["iam", "sts"] + ] + ) + # Get IAM user access keys number + access_keys_number = len(self.access_keys_metadata[user_data]) + + # If the user has access keys and uses more services than IAM and STS store True, otherwise False + temporary_credentials_usage = ( + services_accessed > 0 and access_keys_number > 0 + ) + + self.user_temporary_credentials_usage[ + user_data + ] = temporary_credentials_usage + + except Exception as error: + logger.error( + f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + class MFADevice(BaseModel): serial_number: str diff --git a/prowler/providers/aws/services/iam/iam_user_with_temporary_credentials/__init__.py b/prowler/providers/aws/services/iam/iam_user_with_temporary_credentials/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/prowler/providers/aws/services/iam/iam_user_with_temporary_credentials/iam_user_with_temporary_credentials.metadata.json b/prowler/providers/aws/services/iam/iam_user_with_temporary_credentials/iam_user_with_temporary_credentials.metadata.json new file mode 100644 index 00000000..35b77c91 --- /dev/null +++ b/prowler/providers/aws/services/iam/iam_user_with_temporary_credentials/iam_user_with_temporary_credentials.metadata.json @@ -0,0 +1,32 @@ +{ + "Provider": "aws", + "CheckID": "iam_user_with_temporary_credentials", + "CheckTitle": "Ensure users make use of temporary credentials assuming IAM roles", + "CheckType": [ + "Infrastructure Security" + ], + "ServiceName": "iam", + "SubServiceName": "", + "ResourceIdTemplate": "arn:partition:iam::account-id:user/user-name", + "Severity": "medium", + "ResourceType": "AwsIamUser", + "Description": "Ensure users make use of temporary credentials assuming IAM roles", + "Risk": "As a best practice, use temporary security credentials (IAM roles) instead of creating long-term credentials like access keys, and don't create AWS account root user access keys.", + "RelatedUrl": "https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.html", + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "", + "Terraform": "" + }, + "Recommendation": { + "Text": "As a best practice, use temporary security credentials (IAM roles) instead of creating long-term credentials like access keys, and don't create AWS account root user access keys.", + "Url": "https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.html" + } + }, + "Categories": [], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/aws/services/iam/iam_user_with_temporary_credentials/iam_user_with_temporary_credentials.py b/prowler/providers/aws/services/iam/iam_user_with_temporary_credentials/iam_user_with_temporary_credentials.py new file mode 100644 index 00000000..d3657d53 --- /dev/null +++ b/prowler/providers/aws/services/iam/iam_user_with_temporary_credentials/iam_user_with_temporary_credentials.py @@ -0,0 +1,30 @@ +from prowler.lib.check.models import Check, Check_Report_AWS +from prowler.providers.aws.services.iam.iam_client import iam_client + + +class iam_user_with_temporary_credentials(Check): + def execute(self) -> Check_Report_AWS: + findings = [] + + for ( + user_data, + last_accessed_services, + ) in iam_client.user_temporary_credentials_usage.items(): + user_name = user_data[0] + user_arn = user_data[1] + + report = Check_Report_AWS(self.metadata()) + report.resource_id = user_name + report.resource_arn = user_arn + report.region = iam_client.region + + report.status = "PASS" + report.status_extended = f"User {user_name} doesn't have long lived credentials with access to other services than IAM or STS." + + if last_accessed_services: + report.status = "FAIL" + report.status_extended = f"User {user_name} has long lived credentials with access to other services than IAM or STS." + + findings.append(report) + + return findings diff --git a/prowler/providers/aws/services/neptune/__init__.py b/prowler/providers/aws/services/neptune/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/prowler/providers/aws/services/neptune/neptune_client.py b/prowler/providers/aws/services/neptune/neptune_client.py new file mode 100644 index 00000000..1708be45 --- /dev/null +++ b/prowler/providers/aws/services/neptune/neptune_client.py @@ -0,0 +1,6 @@ +from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info +from prowler.providers.aws.services.neptune.neptune_service import ( + Neptune, +) + +neptune_client = Neptune(current_audit_info) diff --git a/prowler/providers/aws/services/neptune/neptune_cluster_uses_public_subnet/__init__.py b/prowler/providers/aws/services/neptune/neptune_cluster_uses_public_subnet/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/prowler/providers/aws/services/neptune/neptune_cluster_uses_public_subnet/neptune_cluster_uses_public_subnet.metadata.json b/prowler/providers/aws/services/neptune/neptune_cluster_uses_public_subnet/neptune_cluster_uses_public_subnet.metadata.json new file mode 100644 index 00000000..86f9cc0b --- /dev/null +++ b/prowler/providers/aws/services/neptune/neptune_cluster_uses_public_subnet/neptune_cluster_uses_public_subnet.metadata.json @@ -0,0 +1,32 @@ +{ + "Provider": "aws", + "CheckID": "neptune_cluster_uses_public_subnet", + "CheckTitle": "Ensure Neptune Cluster is not using a public subnet", + "CheckType": [], + "ServiceName": "neptune", + "SubServiceName": "", + "ResourceIdTemplate": "arn:aws:rds:::cluster:", + "Severity": "medium", + "ResourceType": "AWSNeptuneDBCluster", + "Description": "Ensure Neptune Cluster is not using a public subnet", + "Risk": "There is a risk of exposing sensitive data if Neptune Cluster uses a public subnet.", + "RelatedUrl": "https://docs.aws.amazon.com/neptune/latest/userguide/get-started-vpc.html", + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "", + "Terraform": "" + }, + "Recommendation": { + "Text": "To ensure your Neptune cluster is not using a public subnet, follow the recommended remediation steps based on your preferred method.", + "Url": "https://docs.aws.amazon.com/neptune/latest/userguide/get-started-vpc.html" + } + }, + "Categories": [ + "internet-exposed" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/aws/services/neptune/neptune_cluster_uses_public_subnet/neptune_cluster_uses_public_subnet.py b/prowler/providers/aws/services/neptune/neptune_cluster_uses_public_subnet/neptune_cluster_uses_public_subnet.py new file mode 100644 index 00000000..f1545116 --- /dev/null +++ b/prowler/providers/aws/services/neptune/neptune_cluster_uses_public_subnet/neptune_cluster_uses_public_subnet.py @@ -0,0 +1,30 @@ +from prowler.lib.check.models import Check, Check_Report_AWS +from prowler.providers.aws.services.neptune.neptune_client import neptune_client +from prowler.providers.aws.services.vpc.vpc_client import vpc_client + + +class neptune_cluster_uses_public_subnet(Check): + def execute(self): + findings = [] + for cluster in neptune_client.clusters.values(): + report = Check_Report_AWS(self.metadata()) + report.resource_id = cluster.id + report.resource_arn = cluster.arn + report.resource_tags = cluster.tags + report.region = cluster.region + report.status = "PASS" + report.status_extended = ( + f"Cluster {cluster.id} is not using public subnets." + ) + + public_subnets = [] + for subnet in cluster.subnets: + if vpc_client.vpc_subnets[subnet].public: + public_subnets.append(vpc_client.vpc_subnets[subnet].id) + + if len(public_subnets) > 0: + report.status = "FAIL" + report.status_extended = f"Cluster {cluster.id} is using {', '.join(public_subnets)} public subnets." + findings.append(report) + + return findings diff --git a/prowler/providers/aws/services/neptune/neptune_service.py b/prowler/providers/aws/services/neptune/neptune_service.py new file mode 100644 index 00000000..8201ece1 --- /dev/null +++ b/prowler/providers/aws/services/neptune/neptune_service.py @@ -0,0 +1,98 @@ +from typing import Optional + +from pydantic import BaseModel + +from prowler.lib.logger import logger +from prowler.providers.aws.lib.service.service import AWSService + + +################## Neptune +class Neptune(AWSService): + def __init__(self, audit_info): + # Call AWSService's __init__ + self.service_name = "neptune" + super().__init__(self.service_name, audit_info) + self.clusters = {} + self.__threading_call__(self.__describe_clusters__) + self.__threading_call__(self.__describe_db_subnet_groups__) + self.__list_tags_for_resource__() + + def __describe_clusters__(self, regional_client): + logger.info("Neptune - Describing DB Clusters...") + try: + for cluster in regional_client.describe_db_clusters( + Filters=[ + { + "Name": "engine", + "Values": [ + self.service_name, + ], + }, + ], + )["DBClusters"]: + cluster_arn = cluster["DBClusterArn"] + self.clusters[cluster_arn] = Cluster( + arn=cluster_arn, + name=cluster["DBClusterIdentifier"], + id=cluster["DbClusterResourceId"], + db_subnet_group_id=cluster["DBSubnetGroup"], + region=regional_client.region, + ) + + except Exception as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + def __describe_db_subnet_groups__(self, regional_client): + logger.info("Neptune - Describing DB Subnet Groups...") + try: + for cluster in self.clusters.values(): + if cluster.region == regional_client.region: + try: + subnets = [] + db_subnet_groups = regional_client.describe_db_subnet_groups( + DBSubnetGroupName=cluster.db_subnet_group_id + )["DBSubnetGroups"] + for subnet_group in db_subnet_groups: + for subnet in subnet_group["Subnets"]: + subnets.append(subnet["SubnetIdentifier"]) + + cluster.subnets = subnets + except Exception as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + except Exception as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + def __list_tags_for_resource__(self): + logger.info("Neptune - Listing Tags...") + try: + for cluster in self.clusters.values(): + try: + regional_client = self.regional_clients[cluster.region] + cluster.tags = regional_client.list_tags_for_resource( + ResourceName=cluster.arn + )["TagList"] + except Exception as error: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + except Exception as error: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + +class Cluster(BaseModel): + arn: str + name: str + id: str + region: str + db_subnet_group_id: str + subnets: Optional[list] + tags: Optional[list] diff --git a/tests/providers/aws/services/apigateway/apigateway_endpoint_public_without_authorizer/apigateway_endpoint_public_without_authorizer_test.py b/tests/providers/aws/services/apigateway/apigateway_endpoint_public_without_authorizer/apigateway_endpoint_public_without_authorizer_test.py new file mode 100644 index 00000000..fb13c1c5 --- /dev/null +++ b/tests/providers/aws/services/apigateway/apigateway_endpoint_public_without_authorizer/apigateway_endpoint_public_without_authorizer_test.py @@ -0,0 +1,169 @@ +from unittest import mock + +from boto3 import client, session +from moto import mock_apigateway + +from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info +from prowler.providers.common.models import Audit_Metadata + +AWS_REGION = "us-east-1" +AWS_ACCOUNT_NUMBER = "123456789012" + +API_GW_NAME = "test-rest-api" + + +class Test_apigateway_endpoint_public_without_authorizer: + def set_mocked_audit_info(self): + audit_info = AWS_Audit_Info( + session_config=None, + original_session=None, + audit_session=session.Session( + profile_name=None, + botocore_session=None, + ), + audited_account=AWS_ACCOUNT_NUMBER, + audited_account_arn=f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root", + audited_user_id=None, + audited_partition="aws", + audited_identity_arn=None, + profile=None, + profile_region=None, + credentials=None, + assumed_role_info=None, + audited_regions=["us-east-1", "eu-west-1"], + organizations_metadata=None, + audit_resources=None, + mfa_enabled=False, + audit_metadata=Audit_Metadata( + services_scanned=0, + expected_checks=[], + completed_checks=0, + audit_progress=0, + ), + ) + + return audit_info + + @mock_apigateway + def test_apigateway_no_rest_apis(self): + from prowler.providers.aws.services.apigateway.apigateway_service import ( + APIGateway, + ) + + current_audit_info = self.set_mocked_audit_info() + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ), mock.patch( + "prowler.providers.aws.services.apigateway.apigateway_endpoint_public_without_authorizer.apigateway_endpoint_public_without_authorizer.apigateway_client", + new=APIGateway(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.apigateway.apigateway_endpoint_public_without_authorizer.apigateway_endpoint_public_without_authorizer import ( + apigateway_endpoint_public_without_authorizer, + ) + + check = apigateway_endpoint_public_without_authorizer() + result = check.execute() + + assert len(result) == 0 + + @mock_apigateway + def test_apigateway_one_public_rest_api_without_authorizer(self): + # Create APIGateway Mocked Resources + apigateway_client = client("apigateway", region_name=AWS_REGION) + # Create APIGateway Deployment Stage + rest_api = apigateway_client.create_rest_api( + name=API_GW_NAME, + endpointConfiguration={ + "types": [ + "EDGE", + ] + }, + ) + from prowler.providers.aws.services.apigateway.apigateway_service import ( + APIGateway, + ) + + current_audit_info = self.set_mocked_audit_info() + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ), mock.patch( + "prowler.providers.aws.services.apigateway.apigateway_endpoint_public_without_authorizer.apigateway_endpoint_public_without_authorizer.apigateway_client", + new=APIGateway(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.apigateway.apigateway_endpoint_public_without_authorizer.apigateway_endpoint_public_without_authorizer import ( + apigateway_endpoint_public_without_authorizer, + ) + + check = apigateway_endpoint_public_without_authorizer() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"API Gateway REST API {API_GW_NAME} with ID {rest_api['id']} has a public endpoint without an authorizer." + ) + assert result[0].resource_id == API_GW_NAME + assert ( + result[0].resource_arn + == f"arn:{current_audit_info.audited_partition}:apigateway:{AWS_REGION}::/restapis/{rest_api['id']}" + ) + assert result[0].region == AWS_REGION + assert result[0].resource_tags == [{}] + + @mock_apigateway + def test_apigateway_one_public_rest_api_with_authorizer(self): + # Create APIGateway Mocked Resources + apigateway_client = client("apigateway", region_name=AWS_REGION) + # Create APIGateway Deployment Stage + rest_api = apigateway_client.create_rest_api( + name="test-rest-api", + endpointConfiguration={ + "types": [ + "EDGE", + ] + }, + ) + apigateway_client.create_authorizer( + restApiId=rest_api["id"], name="test-rest-api-with-authorizer", type="TOKEN" + ) + from prowler.providers.aws.services.apigateway.apigateway_service import ( + APIGateway, + ) + + current_audit_info = self.set_mocked_audit_info() + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ), mock.patch( + "prowler.providers.aws.services.apigateway.apigateway_endpoint_public_without_authorizer.apigateway_endpoint_public_without_authorizer.apigateway_client", + new=APIGateway(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.apigateway.apigateway_endpoint_public_without_authorizer.apigateway_endpoint_public_without_authorizer import ( + apigateway_endpoint_public_without_authorizer, + ) + + check = apigateway_endpoint_public_without_authorizer() + result = check.execute() + + assert result[0].status == "PASS" + assert len(result) == 1 + assert ( + result[0].status_extended + == f"API Gateway REST API {API_GW_NAME} with ID {rest_api['id']} has a public endpoint with an authorizer." + ) + assert result[0].resource_id == API_GW_NAME + assert ( + result[0].resource_arn + == f"arn:{current_audit_info.audited_partition}:apigateway:{AWS_REGION}::/restapis/{rest_api['id']}" + ) + assert result[0].region == AWS_REGION + assert result[0].resource_tags == [{}] diff --git a/tests/providers/aws/services/elasticache/elasticache_cluster_uses_public_subnet/elasticache_cluster_uses_public_subnet_test.py b/tests/providers/aws/services/elasticache/elasticache_cluster_uses_public_subnet/elasticache_cluster_uses_public_subnet_test.py new file mode 100644 index 00000000..ae917ba8 --- /dev/null +++ b/tests/providers/aws/services/elasticache/elasticache_cluster_uses_public_subnet/elasticache_cluster_uses_public_subnet_test.py @@ -0,0 +1,246 @@ +from unittest import mock + +from boto3 import session +from mock import MagicMock, patch +from moto import mock_ec2 + +from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info +from prowler.providers.aws.services.elasticache.elasticache_service import Cluster +from prowler.providers.aws.services.vpc.vpc_service import VpcSubnet +from prowler.providers.common.models import Audit_Metadata +from tests.providers.aws.services.elasticache.elasticache_service_test import ( + AWS_REGION_AZ1, + AWS_REGION_AZ2, + ELASTICACHE_CLUSTER_ARN, + ELASTICACHE_CLUSTER_NAME, + ELASTICACHE_CLUSTER_TAGS, + SUBNET_1, + SUBNET_2, + SUBNET_GROUP_NAME, + mock_make_api_call, +) + +AWS_ACCOUNT_NUMBER = "123456789012" +AWS_ACCOUNT_ARN = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root" +AWS_REGION = "us-east-1" + +VPC_ID = "vpc-12345678901234567" + + +# Patch every AWS call using Boto3 +@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call) +class Test_elasticache_cluster_uses_public_subnet: + def set_mocked_audit_info(self): + audit_info = AWS_Audit_Info( + session_config=None, + original_session=None, + audit_session=session.Session( + profile_name=None, + botocore_session=None, + ), + audited_account=AWS_ACCOUNT_NUMBER, + audited_account_arn=AWS_ACCOUNT_ARN, + audited_user_id=None, + audited_partition="aws", + audited_identity_arn=None, + profile=None, + profile_region=None, + credentials=None, + assumed_role_info=None, + audited_regions=[AWS_REGION], + organizations_metadata=None, + audit_resources=None, + mfa_enabled=False, + audit_metadata=Audit_Metadata( + services_scanned=0, + expected_checks=[], + completed_checks=0, + audit_progress=0, + ), + ) + return audit_info + + @mock_ec2 + def test_elasticache_no_clusters(self): + # Mock VPC Service + vpc_client = MagicMock + vpc_client.vpc_subnets = {} + + # Mock ElastiCache Service + elasticache_service = MagicMock + elasticache_service.clusters = {} + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=self.set_mocked_audit_info(), + ), mock.patch( + "prowler.providers.aws.services.elasticache.elasticache_service.ElastiCache", + new=elasticache_service, + ), mock.patch( + "prowler.providers.aws.services.vpc.vpc_service.VPC", + new=vpc_client, + ), mock.patch( + "prowler.providers.aws.services.vpc.vpc_client.vpc_client", + new=vpc_client, + ): + from prowler.providers.aws.services.elasticache.elasticache_cluster_uses_public_subnet.elasticache_cluster_uses_public_subnet import ( + elasticache_cluster_uses_public_subnet, + ) + + check = elasticache_cluster_uses_public_subnet() + result = check.execute() + assert len(result) == 0 + + def test_elasticache_clusters_using_private_subnets(self): + # Mock ElastiCache Service + elasticache_service = MagicMock + elasticache_service.clusters = {} + + elasticache_service.clusters[ELASTICACHE_CLUSTER_ARN] = Cluster( + arn=ELASTICACHE_CLUSTER_ARN, + name=ELASTICACHE_CLUSTER_NAME, + id=ELASTICACHE_CLUSTER_NAME, + region=AWS_REGION, + cache_subnet_group_id=SUBNET_GROUP_NAME, + subnets=[SUBNET_1, SUBNET_2], + tags=ELASTICACHE_CLUSTER_TAGS, + ) + + # Mock VPC Service + vpc_client = MagicMock + vpc_client.vpc_subnets = {} + vpc_client.vpc_subnets[SUBNET_1] = VpcSubnet( + id=SUBNET_1, + name=SUBNET_1, + arn="arn_test", + default=False, + vpc_id=VPC_ID, + cidr_block="192.168.0.0/24", + availability_zone=AWS_REGION_AZ1, + public=False, + nat_gateway=False, + region=AWS_REGION, + tags=[], + mapPublicIpOnLaunch=False, + ) + vpc_client.vpc_subnets[SUBNET_2] = VpcSubnet( + id=SUBNET_2, + name=SUBNET_2, + arn="arn_test", + default=False, + vpc_id=VPC_ID, + cidr_block="192.168.0.1/24", + availability_zone=AWS_REGION_AZ2, + public=False, + nat_gateway=False, + region=AWS_REGION, + tags=[], + mapPublicIpOnLaunch=False, + ) + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=self.set_mocked_audit_info(), + ), mock.patch( + "prowler.providers.aws.services.elasticache.elasticache_service.ElastiCache", + new=elasticache_service, + ), mock.patch( + "prowler.providers.aws.services.vpc.vpc_service.VPC", + new=vpc_client, + ), mock.patch( + "prowler.providers.aws.services.vpc.vpc_client.vpc_client", + new=vpc_client, + ): + from prowler.providers.aws.services.elasticache.elasticache_cluster_uses_public_subnet.elasticache_cluster_uses_public_subnet import ( + elasticache_cluster_uses_public_subnet, + ) + + check = elasticache_cluster_uses_public_subnet() + result = check.execute() + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"Cluster {ELASTICACHE_CLUSTER_NAME} is not using public subnets." + ) + assert result[0].region == AWS_REGION + assert result[0].resource_id == ELASTICACHE_CLUSTER_NAME + assert result[0].resource_arn == ELASTICACHE_CLUSTER_ARN + assert result[0].resource_tags == ELASTICACHE_CLUSTER_TAGS + + def test_elasticache_clusters_using_public_subnets(self): + # Mock ElastiCache Service + elasticache_service = MagicMock + elasticache_service.clusters = {} + + elasticache_service.clusters[ELASTICACHE_CLUSTER_ARN] = Cluster( + arn=ELASTICACHE_CLUSTER_ARN, + name=ELASTICACHE_CLUSTER_NAME, + id=ELASTICACHE_CLUSTER_NAME, + region=AWS_REGION, + cache_subnet_group_id=SUBNET_GROUP_NAME, + subnets=[SUBNET_1, SUBNET_2], + tags=ELASTICACHE_CLUSTER_TAGS, + ) + + # Mock VPC Service + vpc_client = MagicMock + vpc_client.vpc_subnets = {} + vpc_client.vpc_subnets[SUBNET_1] = VpcSubnet( + id=SUBNET_1, + name=SUBNET_1, + arn="arn_test", + default=False, + vpc_id=VPC_ID, + cidr_block="192.168.0.0/24", + availability_zone=AWS_REGION_AZ1, + public=True, + nat_gateway=False, + region=AWS_REGION, + tags=[], + mapPublicIpOnLaunch=False, + ) + vpc_client.vpc_subnets[SUBNET_2] = VpcSubnet( + id=SUBNET_2, + name=SUBNET_2, + arn="arn_test", + default=False, + vpc_id=VPC_ID, + cidr_block="192.168.0.1/24", + availability_zone=AWS_REGION_AZ2, + public=True, + nat_gateway=False, + region=AWS_REGION, + tags=[], + mapPublicIpOnLaunch=False, + ) + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=self.set_mocked_audit_info(), + ), mock.patch( + "prowler.providers.aws.services.elasticache.elasticache_service.ElastiCache", + new=elasticache_service, + ), mock.patch( + "prowler.providers.aws.services.vpc.vpc_service.VPC", + new=vpc_client, + ), mock.patch( + "prowler.providers.aws.services.vpc.vpc_client.vpc_client", + new=vpc_client, + ): + from prowler.providers.aws.services.elasticache.elasticache_cluster_uses_public_subnet.elasticache_cluster_uses_public_subnet import ( + elasticache_cluster_uses_public_subnet, + ) + + check = elasticache_cluster_uses_public_subnet() + result = check.execute() + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"Cluster {ELASTICACHE_CLUSTER_NAME} is using subnet-1, subnet-2 public subnets." + ) + assert result[0].region == AWS_REGION + assert result[0].resource_id == ELASTICACHE_CLUSTER_NAME + assert result[0].resource_arn == ELASTICACHE_CLUSTER_ARN + assert result[0].resource_tags == ELASTICACHE_CLUSTER_TAGS diff --git a/tests/providers/aws/services/elasticache/elasticache_service_test.py b/tests/providers/aws/services/elasticache/elasticache_service_test.py new file mode 100644 index 00000000..e41853da --- /dev/null +++ b/tests/providers/aws/services/elasticache/elasticache_service_test.py @@ -0,0 +1,168 @@ +import botocore +from boto3 import session +from mock import patch + +from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info +from prowler.providers.aws.services.elasticache.elasticache_service import ( + Cluster, + ElastiCache, +) +from prowler.providers.common.models import Audit_Metadata + +AWS_ACCOUNT_NUMBER = "123456789012" +AWS_ACCOUNT_ARN = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root" + +AWS_REGION = "us-east-1" +AWS_REGION_AZ1 = "us-east-1a" +AWS_REGION_AZ2 = "us-east-b" + +SUBNET_GROUP_NAME = "default" +SUBNET_1 = "subnet-1" +SUBNET_2 = "subnet-2" + +ELASTICACHE_CLUSTER_NAME = "test-cluster" +ELASTICACHE_CLUSTER_ARN = ( + f"arn:aws:elasticache:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:{ELASTICACHE_CLUSTER_NAME}" +) +ELASTICACHE_ENGINE = "redis" + +ELASTICACHE_CLUSTER_TAGS = [ + {"Key": "environment", "Value": "test"}, +] + +# Mocking Access Analyzer Calls +make_api_call = botocore.client.BaseClient._make_api_call + + +def mock_make_api_call(self, operation_name, kwargs): + """ + As you can see the operation_name has the list_analyzers snake_case form but + we are using the ListAnalyzers form. + Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816 + + We have to mock every AWS API call using Boto3 + """ + if operation_name == "DescribeCacheClusters": + return { + "CacheClusters": [ + { + "CacheClusterId": ELASTICACHE_CLUSTER_NAME, + "CacheSubnetGroupName": SUBNET_GROUP_NAME, + "ARN": ELASTICACHE_CLUSTER_ARN, + }, + ] + } + if operation_name == "DescribeCacheSubnetGroups": + return { + "CacheSubnetGroups": [ + { + "CacheSubnetGroupName": SUBNET_GROUP_NAME, + "CacheSubnetGroupDescription": "Subnet Group", + "VpcId": "vpc-1", + "SubnetGroupStatus": "Complete", + "Subnets": [ + { + "SubnetIdentifier": "subnet-1", + "SubnetAvailabilityZone": {"Name": AWS_REGION_AZ1}, + "SubnetStatus": "Active", + }, + { + "SubnetIdentifier": "subnet-2", + "SubnetAvailabilityZone": {"Name": AWS_REGION_AZ2}, + "SubnetStatus": "Active", + }, + ], + "DBSubnetGroupArn": f"arn:aws:rds:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:subgrp:{SUBNET_GROUP_NAME}", + } + ] + } + if operation_name == "ListTagsForResource": + return {"TagList": ELASTICACHE_CLUSTER_TAGS} + + return make_api_call(self, operation_name, kwargs) + + +def mock_generate_regional_clients(service, audit_info, _): + regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION) + regional_client.region = AWS_REGION + return {AWS_REGION: regional_client} + + +@patch( + "prowler.providers.aws.lib.service.service.generate_regional_clients", + new=mock_generate_regional_clients, +) +# Patch every AWS call using Boto3 +@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call) +class Test_ElastiCache_Service: + # Mocked Audit Info + def set_mocked_audit_info(self): + audit_info = AWS_Audit_Info( + session_config=None, + original_session=None, + audit_session=session.Session( + profile_name=None, + botocore_session=None, + ), + audited_account=AWS_ACCOUNT_NUMBER, + audited_account_arn=AWS_ACCOUNT_ARN, + audited_user_id=None, + audited_partition="aws", + audited_identity_arn=None, + profile=None, + profile_region=None, + credentials=None, + assumed_role_info=None, + audited_regions=None, + organizations_metadata=None, + audit_resources=None, + mfa_enabled=False, + audit_metadata=Audit_Metadata( + services_scanned=0, + expected_checks=[], + completed_checks=0, + audit_progress=0, + ), + ) + return audit_info + + # Test ElastiCache Service + def test_service(self): + audit_info = self.set_mocked_audit_info() + elasticache = ElastiCache(audit_info) + assert elasticache.service == "elasticache" + + # Test ElastiCache Client] + def test_client(self): + audit_info = self.set_mocked_audit_info() + elasticache = ElastiCache(audit_info) + assert elasticache.client.__class__.__name__ == "ElastiCache" + + # Test ElastiCache Session + def test__get_session__(self): + audit_info = self.set_mocked_audit_info() + elasticache = ElastiCache(audit_info) + assert elasticache.session.__class__.__name__ == "Session" + + # Test ElastiCache Session + def test_audited_account(self): + audit_info = self.set_mocked_audit_info() + elasticache = ElastiCache(audit_info) + assert elasticache.audited_account == AWS_ACCOUNT_NUMBER + + # Test ElastiCache Clusters + def test_describe_cache_clusters(self): + audit_info = self.set_mocked_audit_info() + elasticache = ElastiCache(audit_info) + + assert len(elasticache.clusters) == 1 + assert elasticache.clusters[ELASTICACHE_CLUSTER_ARN] + assert elasticache.clusters[ELASTICACHE_CLUSTER_ARN] == Cluster( + arn=ELASTICACHE_CLUSTER_ARN, + name=ELASTICACHE_CLUSTER_NAME, + id=ELASTICACHE_CLUSTER_NAME, + region=AWS_REGION, + cache_subnet_group_id=SUBNET_GROUP_NAME, + subnets=[SUBNET_1, SUBNET_2], + tags=ELASTICACHE_CLUSTER_TAGS, + ) diff --git a/tests/providers/aws/services/iam/iam_service_test.py b/tests/providers/aws/services/iam/iam_service_test.py index 44f58122..c8fbe0df 100644 --- a/tests/providers/aws/services/iam/iam_service_test.py +++ b/tests/providers/aws/services/iam/iam_service_test.py @@ -1,7 +1,10 @@ from json import dumps +from uuid import uuid4 +import botocore from boto3 import client, session from freezegun import freeze_time +from mock import patch from moto import mock_iam from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info @@ -33,7 +36,46 @@ SUPPORT_SERVICE_ROLE_POLICY_ARN = ( ) ADMINISTRATOR_ACCESS_POLICY_ARN = "arn:aws:iam::aws:policy/AdministratorAccess" +# Mocking Access Analyzer Calls +make_api_call = botocore.client.BaseClient._make_api_call + +IAM_LAST_ACCESSED_SERVICES = [ + { + "ServiceName": "AWS EC2", + "ServiceNamespace": "ec2", + "TotalAuthenticatedEntities": 1, + }, + { + "ServiceName": "AWS Identity and Access Management", + "ServiceNamespace": "iam", + "TotalAuthenticatedEntities": 0, + }, +] + + +def mock_make_api_call(self, operation_name, kwargs): + """ + As you can see the operation_name has the list_analyzers snake_case form but + we are using the ListAnalyzers form. + Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816 + We have to mock every AWS API call using Boto3 + """ + if operation_name == "GenerateServiceLastAccessedDetails": + return {"JobId": str(uuid4())} + if operation_name == "GetServiceLastAccessedDetails": + return { + "JobStatus": "COMPLETED", + "JobType": "SERVICE_LEVEL", + "JobCreationDate": "2023-10-19T06:11:11.449000+00:00", + "ServicesLastAccessed": IAM_LAST_ACCESSED_SERVICES, + } + + return make_api_call(self, operation_name, kwargs) + + +# Patch every AWS call using Boto3 +@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call) class Test_IAM_Service: # Mocked Audit Info def set_mocked_audit_info(self): @@ -783,9 +825,7 @@ nTTxU4a7x1naFxzYXK1iQ1vMARKMjDb19QEJIEJKZlDK4uS7yMlf1nFS iam_client = client("iam") # Create IAM User user_name = "test_user" - user_arn = iam_client.create_user(UserName=user_name,)[ - "User" - ]["Arn"] + user_arn = iam_client.create_user(UserName=user_name)["User"]["Arn"] # Put User Policy policy_name = "test_not_admin_inline_policy" @@ -828,9 +868,7 @@ nTTxU4a7x1naFxzYXK1iQ1vMARKMjDb19QEJIEJKZlDK4uS7yMlf1nFS iam_client = client("iam") # Create IAM Group group_name = "test_group" - group_arn = iam_client.create_group(GroupName=group_name,)[ - "Group" - ]["Arn"] + group_arn = iam_client.create_group(GroupName=group_name)["Group"]["Arn"] # Put User Policy policy_name = "test_not_admin_inline_policy" @@ -910,3 +948,41 @@ nTTxU4a7x1naFxzYXK1iQ1vMARKMjDb19QEJIEJKZlDK4uS7yMlf1nFS document=INLINE_POLICY_NOT_ADMIN, entity=role_name, ) + + # Test IAM List Attached Group Policies + @mock_iam + def test__get_user_temporary_credentials_usage__(self): + # Generate IAM Client + iam_client = client("iam") + # Generate IAM user + username = "test-user" + user = iam_client.create_user( + UserName=username, + ) + user_arn = user["User"]["Arn"] + # Create Access Key + access_key = iam_client.create_access_key(UserName="test-user") + access_key_id = access_key["AccessKey"]["AccessKeyId"] + # IAM client for this test class + audit_info = self.set_mocked_audit_info() + iam = IAM(audit_info) + + assert len(iam.users) == 1 + + assert len(iam.access_keys_metadata) == 1 + assert iam.access_keys_metadata[(username, user_arn)] + + assert iam.access_keys_metadata[(username, user_arn)][0]["UserName"] == username + assert ( + iam.access_keys_metadata[(username, user_arn)][0]["AccessKeyId"] + == access_key_id + ) + assert iam.access_keys_metadata[(username, user_arn)][0]["Status"] == "Active" + assert iam.access_keys_metadata[(username, user_arn)][0]["CreateDate"] + + assert ( + iam.last_accessed_services[(username, user_arn)] + == IAM_LAST_ACCESSED_SERVICES + ) + + assert iam.user_temporary_credentials_usage[(username, user_arn)] diff --git a/tests/providers/aws/services/iam/iam_user_with_temporary_credentials/iam_user_with_temporary_credentials_test.py b/tests/providers/aws/services/iam/iam_user_with_temporary_credentials/iam_user_with_temporary_credentials_test.py new file mode 100644 index 00000000..b1fe34cb --- /dev/null +++ b/tests/providers/aws/services/iam/iam_user_with_temporary_credentials/iam_user_with_temporary_credentials_test.py @@ -0,0 +1,230 @@ +from unittest import mock + +from prowler.providers.aws.services.iam.iam_service import IAM + +AWS_REGION = "us-east-1" +AWS_ACCOUNT_NUMBER = "123456789012" + +IAM_USER_NAME = "test-user" +IAM_USER_ARN = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:user/{IAM_USER_NAME}" +USER_DATA = (IAM_USER_NAME, IAM_USER_ARN) + + +class Test_iam_user_with_temporary_credentials: + def test_no_users(self): + iam_client = mock.MagicMock + iam_client.region = AWS_REGION + + iam_client.access_keys_metadata = {} + iam_client.last_accessed_services = {} + + # Generate temporary credentials usage + iam_client.user_temporary_credentials_usage = {} + iam_client.__get_user_temporary_credentials_usage__ = ( + IAM.__get_user_temporary_credentials_usage__ + ) + iam_client.__get_user_temporary_credentials_usage__(iam_client) + + with mock.patch( + "prowler.providers.aws.services.iam.iam_service.IAM", + new=iam_client, + ) as iam_service, mock.patch( + "prowler.providers.aws.services.iam.iam_client.iam_client", + new=iam_service, + ): + from prowler.providers.aws.services.iam.iam_user_with_temporary_credentials.iam_user_with_temporary_credentials import ( + iam_user_with_temporary_credentials, + ) + + check = iam_user_with_temporary_credentials() + result = check.execute() + assert len(result) == 0 + + def test_user_no_access_keys_no_accesed_services(self): + iam_client = mock.MagicMock + iam_client.region = AWS_REGION + + iam_client.access_keys_metadata = {USER_DATA: []} + iam_client.last_accessed_services = {USER_DATA: []} + + # Generate temporary credentials usage + iam_client.user_temporary_credentials_usage = {} + iam_client.__get_user_temporary_credentials_usage__ = ( + IAM.__get_user_temporary_credentials_usage__ + ) + iam_client.__get_user_temporary_credentials_usage__(iam_client) + + with mock.patch( + "prowler.providers.aws.services.iam.iam_service.IAM", + new=iam_client, + ) as iam_service, mock.patch( + "prowler.providers.aws.services.iam.iam_client.iam_client", + new=iam_service, + ): + from prowler.providers.aws.services.iam.iam_user_with_temporary_credentials.iam_user_with_temporary_credentials import ( + iam_user_with_temporary_credentials, + ) + + check = iam_user_with_temporary_credentials() + result = check.execute() + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"User {IAM_USER_NAME} doesn't have long lived credentials with access to other services than IAM or STS." + ) + assert result[0].resource_id == IAM_USER_NAME + assert result[0].resource_arn == IAM_USER_ARN + assert result[0].region == AWS_REGION + + def test_user_access_keys_no_accesed_services(self): + iam_client = mock.MagicMock + iam_client.region = AWS_REGION + + iam_client.access_keys_metadata = {USER_DATA: [{"AccessKeyId": 1}]} + iam_client.last_accessed_services = {USER_DATA: []} + + # Generate temporary credentials usage + iam_client.user_temporary_credentials_usage = {} + iam_client.__get_user_temporary_credentials_usage__ = ( + IAM.__get_user_temporary_credentials_usage__ + ) + iam_client.__get_user_temporary_credentials_usage__(iam_client) + + with mock.patch( + "prowler.providers.aws.services.iam.iam_service.IAM", + new=iam_client, + ) as iam_service, mock.patch( + "prowler.providers.aws.services.iam.iam_client.iam_client", + new=iam_service, + ): + from prowler.providers.aws.services.iam.iam_user_with_temporary_credentials.iam_user_with_temporary_credentials import ( + iam_user_with_temporary_credentials, + ) + + check = iam_user_with_temporary_credentials() + result = check.execute() + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"User {IAM_USER_NAME} doesn't have long lived credentials with access to other services than IAM or STS." + ) + assert result[0].resource_id == IAM_USER_NAME + assert result[0].resource_arn == IAM_USER_ARN + assert result[0].region == AWS_REGION + + def test_user_access_keys_accesed_services_sts(self): + iam_client = mock.MagicMock + iam_client.region = AWS_REGION + + iam_client.access_keys_metadata = {USER_DATA: [{"AccessKeyId": 1}]} + iam_client.last_accessed_services = {USER_DATA: [{"ServiceNamespace": "sts"}]} + + # Generate temporary credentials usage + iam_client.user_temporary_credentials_usage = {} + iam_client.__get_user_temporary_credentials_usage__ = ( + IAM.__get_user_temporary_credentials_usage__ + ) + iam_client.__get_user_temporary_credentials_usage__(iam_client) + + with mock.patch( + "prowler.providers.aws.services.iam.iam_service.IAM", + new=iam_client, + ) as iam_service, mock.patch( + "prowler.providers.aws.services.iam.iam_client.iam_client", + new=iam_service, + ): + from prowler.providers.aws.services.iam.iam_user_with_temporary_credentials.iam_user_with_temporary_credentials import ( + iam_user_with_temporary_credentials, + ) + + check = iam_user_with_temporary_credentials() + result = check.execute() + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"User {IAM_USER_NAME} doesn't have long lived credentials with access to other services than IAM or STS." + ) + assert result[0].resource_id == IAM_USER_NAME + assert result[0].resource_arn == IAM_USER_ARN + assert result[0].region == AWS_REGION + + def test_access_keys_with_iam_and_sts(self): + iam_client = mock.MagicMock + iam_client.region = AWS_REGION + + iam_client.access_keys_metadata = {USER_DATA: [{"AccessKeyId": 1}]} + iam_client.last_accessed_services = { + USER_DATA: [{"ServiceNamespace": "sts"}, {"ServiceNamespace": "iam"}] + } + + # Generate temporary credentials usage + iam_client.user_temporary_credentials_usage = {} + iam_client.__get_user_temporary_credentials_usage__ = ( + IAM.__get_user_temporary_credentials_usage__ + ) + iam_client.__get_user_temporary_credentials_usage__(iam_client) + + with mock.patch( + "prowler.providers.aws.services.iam.iam_service.IAM", + new=iam_client, + ) as iam_service, mock.patch( + "prowler.providers.aws.services.iam.iam_client.iam_client", + new=iam_service, + ): + from prowler.providers.aws.services.iam.iam_user_with_temporary_credentials.iam_user_with_temporary_credentials import ( + iam_user_with_temporary_credentials, + ) + + check = iam_user_with_temporary_credentials() + result = check.execute() + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"User {IAM_USER_NAME} doesn't have long lived credentials with access to other services than IAM or STS." + ) + assert result[0].resource_id == IAM_USER_NAME + assert result[0].resource_arn == IAM_USER_ARN + assert result[0].region == AWS_REGION + + def test_access_keys_with_iam_and_ec2(self): + iam_client = mock.MagicMock + iam_client.region = AWS_REGION + + iam_client.access_keys_metadata = {USER_DATA: [{"AccessKeyId": 1}]} + iam_client.last_accessed_services = { + USER_DATA: [{"ServiceNamespace": "iam"}, {"ServiceNamespace": "ec2"}] + } + + # Generate temporary credentials usage + iam_client.user_temporary_credentials_usage = {} + iam_client.__get_user_temporary_credentials_usage__ = ( + IAM.__get_user_temporary_credentials_usage__ + ) + iam_client.__get_user_temporary_credentials_usage__(iam_client) + + with mock.patch( + "prowler.providers.aws.services.iam.iam_service.IAM", + new=iam_client, + ) as iam_service, mock.patch( + "prowler.providers.aws.services.iam.iam_client.iam_client", + new=iam_service, + ): + from prowler.providers.aws.services.iam.iam_user_with_temporary_credentials.iam_user_with_temporary_credentials import ( + iam_user_with_temporary_credentials, + ) + + check = iam_user_with_temporary_credentials() + result = check.execute() + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"User {IAM_USER_NAME} has long lived credentials with access to other services than IAM or STS." + ) + assert result[0].resource_id == IAM_USER_NAME + assert result[0].resource_arn == IAM_USER_ARN + assert result[0].region == AWS_REGION diff --git a/tests/providers/aws/services/neptune/neptune_cluster_uses_public_subnet/neptune_cluster_uses_public_subnet_test.py b/tests/providers/aws/services/neptune/neptune_cluster_uses_public_subnet/neptune_cluster_uses_public_subnet_test.py new file mode 100644 index 00000000..c7ccbf7c --- /dev/null +++ b/tests/providers/aws/services/neptune/neptune_cluster_uses_public_subnet/neptune_cluster_uses_public_subnet_test.py @@ -0,0 +1,249 @@ +from unittest import mock + +from boto3 import client, session +from mock import MagicMock, patch +from moto import mock_ec2, mock_neptune + +from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info +from prowler.providers.aws.services.neptune.neptune_service import Neptune +from prowler.providers.aws.services.vpc.vpc_service import VpcSubnet +from prowler.providers.common.models import Audit_Metadata +from tests.providers.aws.services.neptune.neptune_service_test import ( + AWS_REGION_AZ1, + AWS_REGION_AZ2, + NEPTUNE_CLUSTER_NAME, + NEPTUNE_CLUSTER_TAGS, + NEPTUNE_ENGINE, + SUBNET_1, + SUBNET_2, + mock_make_api_call, +) + +AWS_ACCOUNT_NUMBER = "123456789012" +AWS_ACCOUNT_ARN = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root" +AWS_REGION = "us-east-1" + +VPC_ID = "vpc-12345678901234567" + + +# Patch every AWS call using Boto3 +@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call) +class Test_neptune_cluster_uses_public_subnet: + def set_mocked_audit_info(self): + audit_info = AWS_Audit_Info( + session_config=None, + original_session=None, + audit_session=session.Session( + profile_name=None, + botocore_session=None, + ), + audited_account=AWS_ACCOUNT_NUMBER, + audited_account_arn=AWS_ACCOUNT_ARN, + audited_user_id=None, + audited_partition="aws", + audited_identity_arn=None, + profile=None, + profile_region=None, + credentials=None, + assumed_role_info=None, + audited_regions=[AWS_REGION], + organizations_metadata=None, + audit_resources=None, + mfa_enabled=False, + audit_metadata=Audit_Metadata( + services_scanned=0, + expected_checks=[], + completed_checks=0, + audit_progress=0, + ), + ) + return audit_info + + @mock_neptune + @mock_ec2 + def test_neptune_no_clusters(self): + # Mock VPC Service + vpc_client = MagicMock + vpc_client.vpc_subnets = {} + + audit_info = self.set_mocked_audit_info() + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, + ), mock.patch( + "prowler.providers.aws.services.neptune.neptune_cluster_uses_public_subnet.neptune_cluster_uses_public_subnet.neptune_client", + new=Neptune(audit_info), + ), mock.patch( + "prowler.providers.aws.services.neptune.neptune_cluster_uses_public_subnet.neptune_cluster_uses_public_subnet.vpc_client", + new=vpc_client, + ): + from prowler.providers.aws.services.neptune.neptune_cluster_uses_public_subnet.neptune_cluster_uses_public_subnet import ( + neptune_cluster_uses_public_subnet, + ) + + check = neptune_cluster_uses_public_subnet() + result = check.execute() + assert len(result) == 0 + + @mock_neptune + def test_neptune_clusters_using_private_subnets(self): + # Mock VPC Service + vpc_client = MagicMock + vpc_client.vpc_subnets = {} + vpc_client.vpc_subnets[SUBNET_1] = VpcSubnet( + id=SUBNET_1, + arn="arn_test", + name=SUBNET_1, + default=False, + vpc_id=VPC_ID, + cidr_block="192.168.0.0/24", + availability_zone=AWS_REGION_AZ1, + public=False, + nat_gateway=False, + region=AWS_REGION, + tags=[], + mapPublicIpOnLaunch=False, + ) + vpc_client.vpc_subnets[SUBNET_2] = VpcSubnet( + id=SUBNET_2, + arn="arn_test", + name=SUBNET_2, + default=False, + vpc_id=VPC_ID, + cidr_block="192.168.0.1/24", + availability_zone=AWS_REGION_AZ2, + public=False, + nat_gateway=False, + region=AWS_REGION, + tags=[], + mapPublicIpOnLaunch=False, + ) + + # Neptune client + neptune_client = client("neptune", region_name=AWS_REGION) + # Create Neptune Cluster + cluster = neptune_client.create_db_cluster( + AvailabilityZones=[AWS_REGION_AZ1, AWS_REGION_AZ2], + BackupRetentionPeriod=1, + CopyTagsToSnapshot=True, + Engine=NEPTUNE_ENGINE, + DatabaseName=NEPTUNE_CLUSTER_NAME, + DBClusterIdentifier=NEPTUNE_CLUSTER_NAME, + Port=123, + Tags=NEPTUNE_CLUSTER_TAGS, + StorageEncrypted=False, + DeletionProtection=True | False, + )["DBCluster"] + + cluster_arn = cluster["DBClusterArn"] + cluster_id = cluster["DbClusterResourceId"] + + audit_info = self.set_mocked_audit_info() + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, + ), mock.patch( + "prowler.providers.aws.services.neptune.neptune_cluster_uses_public_subnet.neptune_cluster_uses_public_subnet.neptune_client", + new=Neptune(audit_info), + ), mock.patch( + "prowler.providers.aws.services.neptune.neptune_cluster_uses_public_subnet.neptune_cluster_uses_public_subnet.vpc_client", + new=vpc_client, + ): + from prowler.providers.aws.services.neptune.neptune_cluster_uses_public_subnet.neptune_cluster_uses_public_subnet import ( + neptune_cluster_uses_public_subnet, + ) + + check = neptune_cluster_uses_public_subnet() + result = check.execute() + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"Cluster {cluster_id} is not using public subnets." + ) + assert result[0].region == AWS_REGION + assert result[0].resource_id == cluster_id + assert result[0].resource_arn == cluster_arn + assert result[0].resource_tags == NEPTUNE_CLUSTER_TAGS + + @mock_neptune + def test_neptune_clusters_using_public_subnets(self): + # Mock VPC Service + vpc_client = MagicMock + vpc_client.vpc_subnets = {} + vpc_client.vpc_subnets[SUBNET_1] = VpcSubnet( + id=SUBNET_1, + arn="arn_test", + name=SUBNET_1, + default=False, + vpc_id=VPC_ID, + cidr_block="192.168.0.0/24", + availability_zone=AWS_REGION_AZ1, + public=True, + nat_gateway=False, + region=AWS_REGION, + tags=[], + mapPublicIpOnLaunch=False, + ) + vpc_client.vpc_subnets[SUBNET_2] = VpcSubnet( + id=SUBNET_2, + arn="arn_test", + name=SUBNET_2, + default=False, + vpc_id=VPC_ID, + cidr_block="192.168.0.1/24", + availability_zone=AWS_REGION_AZ2, + public=True, + nat_gateway=False, + region=AWS_REGION, + tags=[], + mapPublicIpOnLaunch=False, + ) + + # Neptune client + neptune_client = client("neptune", region_name=AWS_REGION) + # Create Neptune Cluster + cluster = neptune_client.create_db_cluster( + AvailabilityZones=[AWS_REGION_AZ1, AWS_REGION_AZ2], + BackupRetentionPeriod=1, + CopyTagsToSnapshot=True, + Engine=NEPTUNE_ENGINE, + DatabaseName=NEPTUNE_CLUSTER_NAME, + DBClusterIdentifier=NEPTUNE_CLUSTER_NAME, + Port=123, + Tags=NEPTUNE_CLUSTER_TAGS, + StorageEncrypted=False, + DeletionProtection=True | False, + )["DBCluster"] + + cluster_arn = cluster["DBClusterArn"] + cluster_id = cluster["DbClusterResourceId"] + + audit_info = self.set_mocked_audit_info() + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, + ), mock.patch( + "prowler.providers.aws.services.neptune.neptune_cluster_uses_public_subnet.neptune_cluster_uses_public_subnet.neptune_client", + new=Neptune(audit_info), + ), mock.patch( + "prowler.providers.aws.services.neptune.neptune_cluster_uses_public_subnet.neptune_cluster_uses_public_subnet.vpc_client", + new=vpc_client, + ): + from prowler.providers.aws.services.neptune.neptune_cluster_uses_public_subnet.neptune_cluster_uses_public_subnet import ( + neptune_cluster_uses_public_subnet, + ) + + check = neptune_cluster_uses_public_subnet() + result = check.execute() + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"Cluster {cluster_id} is using subnet-1, subnet-2 public subnets." + ) + assert result[0].region == AWS_REGION + assert result[0].resource_id == cluster_id + assert result[0].resource_arn == cluster_arn + assert result[0].resource_tags == NEPTUNE_CLUSTER_TAGS diff --git a/tests/providers/aws/services/neptune/neptune_service_test.py b/tests/providers/aws/services/neptune/neptune_service_test.py new file mode 100644 index 00000000..33a9e92e --- /dev/null +++ b/tests/providers/aws/services/neptune/neptune_service_test.py @@ -0,0 +1,177 @@ +import botocore +from boto3 import client, session +from mock import patch +from moto import mock_neptune + +from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info +from prowler.providers.aws.services.neptune.neptune_service import Cluster, Neptune +from prowler.providers.common.models import Audit_Metadata + +AWS_ACCOUNT_NUMBER = "123456789012" +AWS_ACCOUNT_ARN = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root" + +AWS_REGION = "us-east-1" +AWS_REGION_AZ1 = "us-east-1a" +AWS_REGION_AZ2 = "us-east-b" + +SUBNET_GROUP_NAME = "default" +SUBNET_1 = "subnet-1" +SUBNET_2 = "subnet-2" + +NEPTUNE_CLUSTER_NAME = "test-cluster" +NEPTUNE_ENGINE = "neptune" + +NEPTUNE_CLUSTER_TAGS = [ + {"Key": "environment", "Value": "test"}, +] + +# Mocking Access Analyzer Calls +make_api_call = botocore.client.BaseClient._make_api_call + + +def mock_make_api_call(self, operation_name, kwargs): + """ + As you can see the operation_name has the list_analyzers snake_case form but + we are using the ListAnalyzers form. + Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816 + + We have to mock every AWS API call using Boto3 + """ + if operation_name == "DescribeDBSubnetGroups": + return { + "DBSubnetGroups": [ + { + "DBSubnetGroupName": SUBNET_GROUP_NAME, + "DBSubnetGroupDescription": "Subnet Group", + "VpcId": "vpc-1", + "SubnetGroupStatus": "Complete", + "Subnets": [ + { + "SubnetIdentifier": "subnet-1", + "SubnetAvailabilityZone": {"Name": AWS_REGION_AZ1}, + "SubnetStatus": "Active", + }, + { + "SubnetIdentifier": "subnet-2", + "SubnetAvailabilityZone": {"Name": AWS_REGION_AZ2}, + "SubnetStatus": "Active", + }, + ], + "DBSubnetGroupArn": f"arn:aws:rds:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:subgrp:{SUBNET_GROUP_NAME}", + } + ] + } + if operation_name == "ListTagsForResource": + return {"TagList": NEPTUNE_CLUSTER_TAGS} + + return make_api_call(self, operation_name, kwargs) + + +def mock_generate_regional_clients(service, audit_info, _): + regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION) + regional_client.region = AWS_REGION + return {AWS_REGION: regional_client} + + +@patch( + "prowler.providers.aws.lib.service.service.generate_regional_clients", + new=mock_generate_regional_clients, +) +# Patch every AWS call using Boto3 +@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call) +class Test_Neptune_Service: + # Mocked Audit Info + def set_mocked_audit_info(self): + audit_info = AWS_Audit_Info( + session_config=None, + original_session=None, + audit_session=session.Session( + profile_name=None, + botocore_session=None, + ), + audited_account=AWS_ACCOUNT_NUMBER, + audited_account_arn=AWS_ACCOUNT_ARN, + audited_user_id=None, + audited_partition="aws", + audited_identity_arn=None, + profile=None, + profile_region=None, + credentials=None, + assumed_role_info=None, + audited_regions=None, + organizations_metadata=None, + audit_resources=None, + mfa_enabled=False, + audit_metadata=Audit_Metadata( + services_scanned=0, + expected_checks=[], + completed_checks=0, + audit_progress=0, + ), + ) + return audit_info + + # Test Neptune Service + @mock_neptune + def test_service(self): + audit_info = self.set_mocked_audit_info() + neptune = Neptune(audit_info) + assert neptune.service == "neptune" + + # Test Neptune Client] + @mock_neptune + def test_client(self): + audit_info = self.set_mocked_audit_info() + neptune = Neptune(audit_info) + assert neptune.client.__class__.__name__ == "Neptune" + + # Test Neptune Session + @mock_neptune + def test__get_session__(self): + audit_info = self.set_mocked_audit_info() + neptune = Neptune(audit_info) + assert neptune.session.__class__.__name__ == "Session" + + # Test Neptune Session + @mock_neptune + def test_audited_account(self): + audit_info = self.set_mocked_audit_info() + neptune = Neptune(audit_info) + assert neptune.audited_account == AWS_ACCOUNT_NUMBER + + # Test Neptune Get Neptune Contacts + @mock_neptune + def test_describe_db_clusters(self): + # Neptune client + neptune_client = client("neptune", region_name=AWS_REGION) + # Create Neptune Cluster + cluster = neptune_client.create_db_cluster( + AvailabilityZones=[AWS_REGION_AZ1, AWS_REGION_AZ2], + BackupRetentionPeriod=1, + CopyTagsToSnapshot=True, + Engine=NEPTUNE_ENGINE, + DatabaseName=NEPTUNE_CLUSTER_NAME, + DBClusterIdentifier=NEPTUNE_CLUSTER_NAME, + Port=123, + Tags=NEPTUNE_CLUSTER_TAGS, + StorageEncrypted=False, + DeletionProtection=True | False, + )["DBCluster"] + + cluster_arn = cluster["DBClusterArn"] + cluster_id = cluster["DbClusterResourceId"] + + audit_info = self.set_mocked_audit_info() + neptune = Neptune(audit_info) + + assert len(neptune.clusters) == 1 + assert neptune.clusters[cluster_arn] + assert neptune.clusters[cluster_arn] == Cluster( + arn=cluster_arn, + name=NEPTUNE_CLUSTER_NAME, + id=cluster_id, + region=AWS_REGION, + db_subnet_group_id=SUBNET_GROUP_NAME, + subnets=[SUBNET_1, SUBNET_2], + tags=NEPTUNE_CLUSTER_TAGS, + )