feat(aws): New CloudTrail, DLM, DocumentDB, EC2, Account and Support checks (#2675)

Co-authored-by: Pepe Fagoaga <pepe@verica.io>
Co-authored-by: Sergio Garcia <sergargar1@gmail.com>
This commit is contained in:
Jit
2023-10-17 18:00:37 +01:00
committed by GitHub
parent f3b7f841fb
commit 85e12e9479
44 changed files with 2164 additions and 36 deletions

View File

@@ -67,6 +67,10 @@ aws:
# MEDIUM
ecr_repository_vulnerability_minimum_severity: "MEDIUM"
# AWS Trusted Advisor
# trustedadvisor_premium_support_plan_subscribed
verify_premium_support_plans: True
# Azure Configuration
azure:

View File

@@ -0,0 +1,32 @@
{
"Provider": "aws",
"CheckID": "account_maintain_different_contact_details_to_security_billing_and_operations",
"CheckTitle": "Maintain different contact details to security, billing and operations.",
"CheckType": [
"IAM"
],
"ServiceName": "account",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:access-recorder:region:account-id:recorder/resource-id",
"Severity": "medium",
"ResourceType": "Other",
"Description": "Maintain different contact details to security, billing and operations.",
"Risk": "Ensure contact email and telephone details for AWS accounts are current and map to more than one individual in your organization. An AWS account supports a number of contact details; and AWS will use these to contact the account owner if activity judged to be in breach of Acceptable Use Policy. If an AWS account is observed to be behaving in a prohibited or suspicious manner; AWS will attempt to contact the account owner by email and phone using the contact details listed. If this is unsuccessful and the account behavior needs urgent mitigation; proactive measures may be taken; including throttling of traffic between the account exhibiting suspicious behavior and the AWS API endpoints and the Internet. This will result in impaired service to and from the account in question.",
"RelatedUrl": "https://docs.aws.amazon.com/accounts/latest/reference/manage-acct-update-contact.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "https://docs.bridgecrew.io/docs/iam_18-maintain-contact-details#aws-console",
"Terraform": ""
},
"Recommendation": {
"Text": "Using the Billing and Cost Management console complete contact details.",
"Url": "https://docs.aws.amazon.com/accounts/latest/reference/manage-acct-update-contact.html"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,27 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.account.account_client import account_client
class account_maintain_different_contact_details_to_security_billing_and_operations(
Check
):
def execute(self):
report = Check_Report_AWS(self.metadata())
report.region = account_client.region
report.resource_id = account_client.audited_account
report.resource_arn = account_client.audited_account_arn
if (
len(account_client.contact_phone_numbers)
== account_client.number_of_contacts
and len(account_client.contact_names) == account_client.number_of_contacts
# This is because the primary contact has no email field
and len(account_client.contact_emails)
== account_client.number_of_contacts - 1
):
report.status = "PASS"
report.status_extended = "SECURITY, BILLING and OPERATIONS contacts found and they are different between each other and between ROOT contact."
else:
report.status = "FAIL"
report.status_extended = "SECURITY, BILLING and OPERATIONS contacts not found or they are not different between each other and between ROOT contact."
return [report]

View File

@@ -1,4 +1,10 @@
################## Account
from typing import Optional
from venv import logger
from botocore.client import ClientError
from pydantic import BaseModel
from prowler.providers.aws.lib.service.service import AWSService
@@ -6,6 +12,89 @@ class Account(AWSService):
def __init__(self, audit_info):
# Call AWSService's __init__
super().__init__(__class__.__name__, audit_info)
self.number_of_contacts = 4
self.contact_base = self.__get_contact_information__()
self.contacts_billing = self.__get_alternate_contact__("BILLING")
self.contacts_security = self.__get_alternate_contact__("SECURITY")
self.contacts_operations = self.__get_alternate_contact__("OPERATIONS")
# Set of contact phone numbers
self.contact_phone_numbers = {
self.contact_base.phone_number,
self.contacts_billing.phone_number,
self.contacts_security.phone_number,
self.contacts_operations.phone_number,
}
# Set of contact names
self.contact_names = {
self.contact_base.name,
self.contacts_billing.name,
self.contacts_security.name,
self.contacts_operations.name,
}
# Set of contact emails
self.contact_emails = {
self.contacts_billing.email,
self.contacts_security.email,
self.contacts_operations.email,
}
def __get_contact_information__(self):
try:
primary_account_contact = self.client.get_contact_information()[
"ContactInformation"
]
return Contact(
type="PRIMARY",
name=primary_account_contact.get("FullName"),
phone_number=primary_account_contact.get("PhoneNumber"),
)
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return Contact(type="PRIMARY")
def __get_alternate_contact__(self, contact_type: str):
try:
account_contact = self.client.get_alternate_contact(
AlternateContactType=contact_type
)["AlternateContact"]
return Contact(
type=contact_type,
email=account_contact.get("EmailAddress"),
name=account_contact.get("Name"),
phone_number=account_contact.get("PhoneNumber"),
)
except ClientError as error:
if (
error.response["Error"]["Code"] == "ResourceNotFoundException"
and error.response["Error"]["Message"]
== "No contact of the inputted alternate contact type found."
):
logger.warning(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return Contact(
type=contact_type,
)
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return Contact(
type=contact_type,
)
### This service don't need boto3 calls
class Contact(BaseModel):
type: str
email: Optional[str]
name: Optional[str]
phone_number: Optional[str]

View File

@@ -23,7 +23,7 @@ class cloudtrail_multi_region_enabled(Check):
)
else:
report.status_extended = f"Trail {trail.name} is not multiregion and it is logging."
# Since there exists a logging trail in that region there is no point in checking the reamaining trails
# Since there exists a logging trail in that region there is no point in checking the remaining trails
# Store the finding and exit the loop
findings.append(report)
break

View File

@@ -0,0 +1,34 @@
{
"Provider": "aws",
"CheckID": "cloudtrail_multi_region_enabled_logging_management_events",
"CheckTitle": "Ensure CloudTrail logging management events in All Regions",
"CheckType": [
"CIS AWS Foundations Benchmark"
],
"ServiceName": "cloudtrail",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"Severity": "low",
"ResourceType": "AwsCloudTrailTrail",
"Description": "Ensure CloudTrail logging management events in All Regions",
"Risk": "AWS CloudTrail enables governance, compliance, operational auditing, and risk auditing of your AWS account. To meet FTR requirements, you must have management events enabled for all AWS accounts and in all regions and aggregate these logs into an Amazon Simple Storage Service (Amazon S3) bucket owned by a separate AWS account.",
"RelatedUrl": "https://docs.bridgecrew.io/docs/logging_14",
"Remediation": {
"Code": {
"CLI": "aws cloudtrail update-trail --name <trail_name> --is-multi-region-trail",
"NativeIaC": "",
"Other": "https://docs.bridgecrew.io/docs/logging_14",
"Terraform": "https://docs.bridgecrew.io/docs/logging_14#terraform"
},
"Recommendation": {
"Text": "Enable CloudTrail logging management events in All Regions",
"Url": "https://docs.bridgecrew.io/docs/logging_14"
}
},
"Categories": [
"forensics-ready"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,54 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.cloudtrail.cloudtrail_client import (
cloudtrail_client,
)
class cloudtrail_multi_region_enabled_logging_management_events(Check):
def execute(self):
findings = []
report = Check_Report_AWS(self.metadata())
report.status = "FAIL"
report.status_extended = (
"No trail found with multi-region enabled and logging management events."
)
report.region = cloudtrail_client.region
report.resource_id = cloudtrail_client.audited_account
report.resource_arn = cloudtrail_client.audited_account_arn
for trail in cloudtrail_client.trails:
if trail.is_logging:
if trail.is_multiregion:
for event in trail.data_events:
# Classic event selectors
if not event.is_advanced:
# Check if trail has IncludeManagementEvents and ReadWriteType is All
if (
event.event_selector["ReadWriteType"] == "All"
and event.event_selector["IncludeManagementEvents"]
):
report.region = trail.region
report.resource_id = trail.name
report.resource_arn = trail.arn
report.resource_tags = trail.tags
report.status = "PASS"
report.status_extended = f"Trail {trail.name} from home region {trail.home_region} is multi-region, is logging and have management events enabled."
# Advanced event selectors
elif event.is_advanced:
if event.event_selector.get(
"Name"
) == "Management events selector" and all(
[
field["Field"] != "readOnly"
for field in event.event_selector["FieldSelectors"]
]
):
report.region = trail.region
report.resource_id = trail.name
report.resource_arn = trail.arn
report.resource_tags = trail.tags
report.status = "PASS"
report.status_extended = f"Trail {trail.name} from home region {trail.home_region} is multi-region, is logging and have management events enabled."
findings.append(report)
return findings

View File

@@ -15,7 +15,7 @@ class cloudtrail_s3_dataevents_write_enabled(Check):
report.status_extended = "No CloudTrail trails have a data event to record all S3 object-level API operations."
for trail in cloudtrail_client.trails:
for data_event in trail.data_events:
# classic event selectors
# Classic event selectors
if not data_event.is_advanced:
# Check if trail has a data event for all S3 Buckets for write
if (
@@ -37,7 +37,7 @@ class cloudtrail_s3_dataevents_write_enabled(Check):
report.resource_tags = trail.tags
report.status = "PASS"
report.status_extended = f"Trail {trail.name} from home region {trail.home_region} has a classic data event selector to record all S3 object-level API operations."
# advanced event selectors
# Advanced event selectors
elif data_event.is_advanced:
for field_selector in data_event.event_selector["FieldSelectors"]:
if (

View File

@@ -93,7 +93,7 @@ class Cloudtrail(AWSService):
for region, client in self.regional_clients.items():
if trail.region == region and trail.name:
data_events = client.get_event_selectors(TrailName=trail.arn)
# check if key exists and array associated to that key is not empty
# EventSelectors
if (
"EventSelectors" in data_events
and data_events["EventSelectors"]
@@ -103,7 +103,7 @@ class Cloudtrail(AWSService):
is_advanced=False, event_selector=event
)
trail.data_events.append(event_selector)
# check if key exists and array associated to that key is not empty
# AdvancedEventSelectors
elif (
"AdvancedEventSelectors" in data_events
and data_events["AdvancedEventSelectors"]

View File

@@ -0,0 +1,4 @@
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.dlm.dlm_service import DLM
dlm_client = DLM(current_audit_info)

View File

@@ -0,0 +1,34 @@
{
"Provider": "aws",
"CheckID": "dlm_ebs_snapshot_lifecycle_policy_exists",
"CheckTitle": "Ensure EBS Snapshot lifecycle policies are defined.",
"CheckType": [
"Data Protection"
],
"ServiceName": "dlm",
"SubServiceName": "ebs",
"ResourceIdTemplate": "arn:aws:iam::account-id:resource-id",
"Severity": "medium",
"ResourceType": "Other",
"Description": "Ensure EBS Snapshot lifecycle policies are defined.",
"Risk": "With AWS DLM service, you can manage the lifecycle of your EBS volume snapshots. By automating the EBS volume backup management using lifecycle policies, you can protect your EBS data by enforcing a regular backup schedule, retain backups as required by auditors or internal compliance.",
"RelatedUrl": "https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/snapshot-lifecycle.html#dlm-elements",
"Remediation": {
"Code": {
"CLI": "aws dlm create-lifecycle-policy --region <region> --execution-role-arn <execution-role-arn> --description <description> --state ENABLED --policy-details file://lifecycle-policy-config.json",
"NativeIaC": "",
"Other": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/aws/DLM/ebs-snapshot-automation.html",
"Terraform": ""
},
"Recommendation": {
"Text": "To use Amazon Data Lifecycle Manager (DLM) service to manage the lifecycle of your EBS volume snapshots, you have to tag your AWS EBS volumes and create data lifecycle policies via Amazon DLM.",
"Url": "https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/snapshot-lifecycle.html#dlm-elements"
}
},
"Categories": [
"forensics-ready"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,24 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.dlm.dlm_client import dlm_client
from prowler.providers.aws.services.ec2.ec2_client import ec2_client
class dlm_ebs_snapshot_lifecycle_policy_exists(Check):
def execute(self):
findings = []
for region in dlm_client.lifecycle_policies:
if (
region in ec2_client.regions_with_snapshots
and ec2_client.regions_with_snapshots[region]
):
report = Check_Report_AWS(self.metadata())
report.status = "FAIL"
report.status_extended = "No EBS Snapshot lifecycle policies found."
report.region = region
report.resource_id = dlm_client.audited_account
report.resource_arn = dlm_client.audited_account_arn
if dlm_client.lifecycle_policies[region]:
report.status = "PASS"
report.status_extended = "EBS snapshot lifecycle policies found."
findings.append(report)
return findings

View File

@@ -0,0 +1,39 @@
from pydantic import BaseModel
from prowler.lib.logger import logger
from prowler.providers.aws.lib.service.service import AWSService
################## Data Lifecycle Manager
class DLM(AWSService):
def __init__(self, audit_info):
# Call AWSService's __init__
super().__init__(__class__.__name__, audit_info)
self.lifecycle_policies = {}
self.__threading_call__(self.__get_lifecycle_policies__)
def __get_lifecycle_policies__(self, regional_client):
logger.info("DLM - Getting EBS Snapshots Lifecycle Policies...")
try:
lifecycle_policies = regional_client.get_lifecycle_policies()
policies = {}
for policy in lifecycle_policies["Policies"]:
policy_id = policy.get("PolicyId")
policies[policy_id] = LifecyclePolicy(
id=policy_id,
state=policy.get("State"),
tags=policy.get("Tags"),
type=policy.get("PolicyType"),
)
self.lifecycle_policies[regional_client.region] = policies
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class LifecyclePolicy(BaseModel):
id: str
state: str
tags: dict
type: str

View File

@@ -0,0 +1,4 @@
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.documentdb.documentdb_service import DocumentDB
documentdb_client = DocumentDB(current_audit_info)

View File

@@ -0,0 +1,32 @@
{
"Provider": "aws",
"CheckID": "documentdb_instance_storage_encrypted",
"CheckTitle": "Check if DocumentDB instances storage is encrypted.",
"CheckType": [
"Data Protection"
],
"ServiceName": "documentdb",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"Severity": "medium",
"ResourceType": "AwsDocDbInstance",
"Description": "Check if DocumentDB instances storage is encrypted.",
"Risk": "If not enabled sensitive information at rest is not protected.",
"RelatedUrl": "https://www.trendmicro.com/cloudoneconformity-staging/knowledge-base/aws/DocumentDB/encryption-enabled.html",
"Remediation": {
"Code": {
"CLI": "aws docdb create-db-cluster --db-cluster-identifier <db_cluster_id> --port 27017 --engine docdb --master-username <yourMasterUsername> --master-user-password <yourMasterPassword> --storage-encrypted",
"NativeIaC": "",
"Other": "https://www.trendmicro.com/cloudoneconformity-staging/knowledge-base/aws/DocumentDB/encryption-enabled.html",
"Terraform": "https://docs.bridgecrew.io/docs/bc_aws_general_28#terraform"
},
"Recommendation": {
"Text": "Enable Encryption. Use a CMK where possible. It will provide additional management and privacy benefits.",
"Url": "https://www.trendmicro.com/cloudoneconformity-staging/knowledge-base/aws/DocumentDB/encryption-enabled.html"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,29 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.documentdb.documentdb_client import (
documentdb_client,
)
class documentdb_instance_storage_encrypted(Check):
def execute(self):
findings = []
for db_instance in documentdb_client.db_instances.values():
report = Check_Report_AWS(self.metadata())
report.region = db_instance.region
report.resource_id = db_instance.id
report.resource_arn = db_instance.arn
report.resource_tags = db_instance.tags
if db_instance.encrypted:
report.status = "PASS"
report.status_extended = (
f"DocumentDB Instance {db_instance.id} is encrypted."
)
else:
report.status = "FAIL"
report.status_extended = (
f"DocumentDB Instance {db_instance.id} is not encrypted."
)
findings.append(report)
return findings

View File

@@ -0,0 +1,85 @@
from typing import Optional
from pydantic import BaseModel
from prowler.lib.logger import logger
from prowler.providers.aws.lib.service.service import AWSService
################## DocumentDB
class DocumentDB(AWSService):
def __init__(self, audit_info):
# Call AWSService's __init__
self.service_name = "docdb"
super().__init__(self.service_name, audit_info)
self.db_instances = {}
self.__threading_call__(self.__describe_db_instances__)
self.__list_tags_for_resource__()
def __describe_db_instances__(self, regional_client):
logger.info("DocumentDB - Describe Instances...")
try:
describe_db_instances_paginator = regional_client.get_paginator(
"describe_db_instances"
)
for page in describe_db_instances_paginator.paginate(
Filters=[
{
"Name": "engine",
"Values": [
self.service_name,
],
},
],
):
for instance in page["DBInstances"]:
instance_arn = instance["DBInstanceArn"]
self.db_instances[instance_arn] = Instance(
id=instance["DBInstanceIdentifier"],
arn=instance["DBInstanceArn"],
engine=instance["Engine"],
engine_version=instance["EngineVersion"],
status=instance["DBInstanceStatus"],
public=instance["PubliclyAccessible"],
encrypted=instance["StorageEncrypted"],
cluster_id=instance.get("DBClusterIdentifier"),
region=regional_client.region,
tags=instance.get("TagList", []),
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __list_tags_for_resource__(self):
logger.info("DocumentDB - List Tags...")
try:
for instance_arn, instance in self.db_instances.items():
try:
regional_client = self.regional_clients[instance.region]
response = regional_client.list_tags_for_resource(
ResourceName=instance_arn
)["TagList"]
instance.tags = response
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class Instance(BaseModel):
id: str
arn: str
engine: str
engine_version: str
status: str
public: bool
encrypted: bool
cluster_id: Optional[str]
region: str
tags: Optional[list]

View File

@@ -0,0 +1,34 @@
{
"Provider": "aws",
"CheckID": "ec2_ebs_volume_snapshots_exists",
"CheckTitle": "Check if EBS snapshots exists.",
"CheckType": [
"Data Protection"
],
"ServiceName": "ec2",
"SubServiceName": "snapshot",
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"Severity": "medium",
"ResourceType": "AwsEc2Snapshot",
"Description": "Check if EBS snapshots exists.",
"Risk": "Ensure that your EBS volumes (available or in-use) have recent snapshots (taken weekly) available for point-in-time recovery for a better, more reliable data backup strategy.",
"RelatedUrl": "https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/EBSSnapshots.html",
"Remediation": {
"Code": {
"CLI": "aws ec2 --region <REGION> create-snapshot --volume-id <VOLUME-ID>",
"NativeIaC": "",
"Other": "https://www.trendmicro.com/cloudoneconformity-staging/knowledge-base/aws/EBS/ebs-volumes-recent-snapshots.html",
"Terraform": ""
},
"Recommendation": {
"Text": "Creating point-in-time EBS snapshots periodically will allow you to handle efficiently your data recovery process in the event of a failure, to save your data before shutting down an EC2 instance, to back up data for geographical expansion and to maintain your disaster recovery stack up to date.",
"Url": "https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/EBSSnapshots.html"
}
},
"Categories": [
"forensics-ready"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,24 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.ec2.ec2_client import ec2_client
class ec2_ebs_volume_snapshots_exists(Check):
def execute(self):
findings = []
for volume in ec2_client.volumes:
report = Check_Report_AWS(self.metadata())
report.status = "FAIL"
report.region = volume.region
report.resource_id = volume.id
report.resource_arn = volume.arn
report.resource_tags = volume.tags
report.status_extended = (
f"Snapshots not found for the EBS volume {volume.id}."
)
if ec2_client.volumes_with_snapshots.get(volume.id, False):
report.status = "PASS"
report.status_extended = (
f"Snapshots found for the EBS volume {volume.id}."
)
findings.append(report)
return findings

View File

@@ -23,6 +23,8 @@ class EC2(AWSService):
self.network_acls = []
self.__threading_call__(self.__describe_network_acls__)
self.snapshots = []
self.volumes_with_snapshots = {}
self.regions_with_snapshots = {}
self.__threading_call__(self.__describe_snapshots__)
self.__get_snapshot_public__()
self.network_interfaces = []
@@ -172,6 +174,7 @@ class EC2(AWSService):
def __describe_snapshots__(self, regional_client):
logger.info("EC2 - Describing Snapshots...")
try:
snapshots_in_region = False
describe_snapshots_paginator = regional_client.get_paginator(
"describe_snapshots"
)
@@ -181,6 +184,8 @@ class EC2(AWSService):
if not self.audit_resources or (
is_resource_filtered(arn, self.audit_resources)
):
if snapshots_in_region is False:
snapshots_in_region = True
self.snapshots.append(
Snapshot(
id=snapshot["SnapshotId"],
@@ -188,8 +193,13 @@ class EC2(AWSService):
region=regional_client.region,
encrypted=snapshot.get("Encrypted", False),
tags=snapshot.get("Tags"),
volume=snapshot["VolumeId"],
)
)
# Store that the volume has at least one snapshot
self.volumes_with_snapshots[snapshot["VolumeId"]] = True
# Store that the region has at least one snapshot
self.regions_with_snapshots[regional_client.region] = snapshots_in_region
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
@@ -422,6 +432,7 @@ class Snapshot(BaseModel):
encrypted: bool
public: bool = False
tags: Optional[list] = []
volume: Optional[str]
class Volume(BaseModel):

View File

@@ -7,7 +7,7 @@ from prowler.providers.aws.services.trustedadvisor.trustedadvisor_client import
class trustedadvisor_errors_and_warnings(Check):
def execute(self):
findings = []
if trustedadvisor_client.enabled:
if trustedadvisor_client.premium_support.enabled:
if trustedadvisor_client.checks:
for check in trustedadvisor_client.checks:
if (

View File

@@ -0,0 +1,30 @@
{
"Provider": "aws",
"CheckID": "trustedadvisor_premium_support_plan_subscribed",
"CheckTitle": "Check if a Premium support plan is subscribed",
"CheckType": [],
"ServiceName": "support",
"SubServiceName": "",
"ResourceIdTemplate": "arn:aws:iam::AWS_ACCOUNT_NUMBER:root",
"Severity": "low",
"ResourceType": "",
"Description": "Check if a Premium support plan is subscribed.",
"Risk": "Ensure that the appropriate support level is enabled for the necessary AWS accounts. For example, if an AWS account is being used to host production systems and environments, it is highly recommended that the minimum AWS Support Plan should be Business.",
"RelatedUrl": "https://aws.amazon.com/premiumsupport/plans/",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "https://www.trendmicro.com/cloudoneconformity-staging/knowledge-base/aws/Support/support-plan.html",
"Terraform": ""
},
"Recommendation": {
"Text": "It is recommended that you subscribe to the AWS Business Support tier or higher for all of your AWS production accounts. If you don't have premium support, you must have an action plan to handle issues which require help from AWS Support. AWS Support provides a mix of tools and technology, people, and programs designed to proactively help you optimize performance, lower costs, and innovate faster.",
"Url": "https://www.trendmicro.com/cloudoneconformity-staging/knowledge-base/aws/Support/support-plan.html"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,25 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.trustedadvisor.trustedadvisor_client import (
trustedadvisor_client,
)
class trustedadvisor_premium_support_plan_subscribed(Check):
def execute(self):
findings = []
if trustedadvisor_client.audit_config.get("verify_premium_support_plans", True):
report = Check_Report_AWS(self.metadata())
report.status = "FAIL"
report.status_extended = (
"Amazon Web Services Premium Support Plan isn't subscribed."
)
report.region = trustedadvisor_client.region
report.resource_id = trustedadvisor_client.audited_account
report.resource_arn = trustedadvisor_client.audited_account_arn
if trustedadvisor_client.premium_support.enabled:
report.status = "PASS"
report.status_extended = (
"Amazon Web Services Premium Support Plan is subscribed."
)
findings.append(report)
return findings

View File

@@ -13,7 +13,7 @@ class TrustedAdvisor(AWSService):
# Call AWSService's __init__
super().__init__("support", audit_info)
self.checks = []
self.enabled = True
self.premium_support = PremiumSupport(enabled=False)
# Support API is not available in China Partition
# But only in us-east-1 or us-gov-west-1 https://docs.aws.amazon.com/general/latest/gr/awssupport.html
if audit_info.audited_partition != "aws-cn":
@@ -26,8 +26,10 @@ class TrustedAdvisor(AWSService):
self.service, region_name=support_region
)
self.client.region = support_region
self.__describe_trusted_advisor_checks__()
self.__describe_trusted_advisor_check_result__()
self.__describe_services__()
if self.premium_support.enabled:
self.__describe_trusted_advisor_checks__()
self.__describe_trusted_advisor_check_result__()
def __describe_trusted_advisor_checks__(self):
logger.info("TrustedAdvisor - Describing Checks...")
@@ -43,8 +45,14 @@ class TrustedAdvisor(AWSService):
)
)
except ClientError as error:
if error.response["Error"]["Code"] == "SubscriptionRequiredException":
self.enabled = False
if (
error.response["Error"]["Code"] == "SubscriptionRequiredException"
and error.response["Error"]["Message"]
== "Amazon Web Services Premium Support Subscription is required to use this service."
):
logger.warning(
f"{self.client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
else:
logger.error(
f"{self.client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
@@ -78,9 +86,40 @@ class TrustedAdvisor(AWSService):
f"{self.client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __describe_services__(self):
logger.info("Support - Describing Services...")
try:
self.client.describe_services()
# If the above call succeeds the account has a Business,
# Enterprise On-Ramp, or Enterprise Support plan.
self.premium_support.enabled = True
except ClientError as error:
if (
error.response["Error"]["Code"] == "SubscriptionRequiredException"
and error.response["Error"]["Message"]
== "Amazon Web Services Premium Support Subscription is required to use this service."
):
logger.warning(
f"{self.region} --"
f" {error.__class__.__name__}[{error.__traceback__.tb_lineno}]:"
f" {error}"
)
except Exception as error:
logger.error(
f"{self.region} --"
f" {error.__class__.__name__}[{error.__traceback__.tb_lineno}]:"
f" {error}"
)
class Check(BaseModel):
id: str
name: str
status: Optional[str]
region: str
class PremiumSupport(BaseModel):
enabled: bool

View File

@@ -0,0 +1,103 @@
from unittest import mock
from prowler.providers.aws.services.account.account_service import Contact
AWS_ACCOUNT_NUMBER = "123456789012"
AWS_ACCOUNT_ARN = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
AWS_REGION = "us-east-1"
class Test_account_maintain_different_contact_details_to_security_billing_and_operations:
def test_contacts_not_configured_or_equal(self):
account_client = mock.MagicMock
account_client.region = AWS_REGION
account_client.audited_account = AWS_ACCOUNT_NUMBER
account_client.audited_account_arn = AWS_ACCOUNT_ARN
# Account Contacts
account_client.contact_base = Contact(type="PRIMARY")
account_client.contacts_billing = Contact(type="BILLING")
account_client.contacts_security = Contact(type="SECURITY")
account_client.contacts_operations = Contact(type="OPERATIONS")
# Account Sets
account_client.number_of_contacts = 4
account_client.contact_phone_numbers = {}
account_client.contact_names = {}
account_client.contact_emails = {}
with mock.patch(
"prowler.providers.aws.services.account.account_service.Account",
new=account_client,
), mock.patch(
"prowler.providers.aws.services.account.account_client.account_client",
new=account_client,
):
# Test Check
from prowler.providers.aws.services.account.account_maintain_different_contact_details_to_security_billing_and_operations.account_maintain_different_contact_details_to_security_billing_and_operations import (
account_maintain_different_contact_details_to_security_billing_and_operations,
)
check = (
account_maintain_different_contact_details_to_security_billing_and_operations()
)
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "SECURITY, BILLING and OPERATIONS contacts not found or they are not different between each other and between ROOT contact."
)
assert result[0].region == AWS_REGION
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == AWS_ACCOUNT_ARN
def test_contacts_diffent(self):
account_client = mock.MagicMock
account_client.region = AWS_REGION
account_client.audited_account = AWS_ACCOUNT_NUMBER
account_client.audited_account_arn = AWS_ACCOUNT_ARN
# Account Contacts
account_client.contact_base = Contact(type="PRIMARY")
account_client.contacts_billing = Contact(type="BILLING")
account_client.contacts_security = Contact(type="SECURITY")
account_client.contacts_operations = Contact(type="OPERATIONS")
# Account Sets
account_client.number_of_contacts = 4
account_client.contact_phone_numbers = {"666", "777", "888", "999"}
account_client.contact_names = {"A", "B", "C", "D"}
account_client.contact_emails = {
"test1@test.com",
"test2@test.com",
"test3@test.com",
}
with mock.patch(
"prowler.providers.aws.services.account.account_service.Account",
new=account_client,
), mock.patch(
"prowler.providers.aws.services.account.account_client.account_client",
new=account_client,
):
# Test Check
from prowler.providers.aws.services.account.account_maintain_different_contact_details_to_security_billing_and_operations.account_maintain_different_contact_details_to_security_billing_and_operations import (
account_maintain_different_contact_details_to_security_billing_and_operations,
)
check = (
account_maintain_different_contact_details_to_security_billing_and_operations()
)
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "SECURITY, BILLING and OPERATIONS contacts found and they are different between each other and between ROOT contact."
)
assert result[0].region == AWS_REGION
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == AWS_ACCOUNT_ARN

View File

@@ -0,0 +1,142 @@
import botocore
from boto3 import session
from mock import patch
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
from prowler.providers.aws.services.account.account_service import Account, Contact
from prowler.providers.common.models import Audit_Metadata
AWS_ACCOUNT_NUMBER = "123456789012"
AWS_ACCOUNT_ARN = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
AWS_REGION = "us-east-1"
# Mocking Access Analyzer Calls
make_api_call = botocore.client.BaseClient._make_api_call
def mock_make_api_call(self, operation_name, kwargs):
"""
As you can see the operation_name has the list_analyzers snake_case form but
we are using the ListAnalyzers form.
Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816
We have to mock every AWS API call using Boto3
"""
if operation_name == "GetContactInformation":
return {
"ContactInformation": {
"AddressLine1": "AddressLine1",
"AddressLine2": "AddressLine2",
"AddressLine3": "AddressLine3",
"City": "City",
"CompanyName": "Prowler",
"CountryCode": "CountryCode",
"DistrictOrCounty": "DistrictOrCounty",
"FullName": "Prowler",
"PhoneNumber": "666666666",
"PostalCode": "PostalCode",
"StateOrRegion": "StateOrRegion",
"WebsiteUrl": "WebsiteUrl",
}
}
if operation_name == "GetAlternateContact":
return {
"AlternateContact": {
"AlternateContactType": "SECURITY",
"EmailAddress": "test@test.com",
"Name": "Prowler",
"PhoneNumber": "666666666",
"Title": "Title",
}
}
return make_api_call(self, operation_name, kwargs)
# Patch every AWS call using Boto3
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
class Test_Account_Service:
# Mocked Audit Info
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
session_config=None,
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
),
audited_account=AWS_ACCOUNT_NUMBER,
audited_account_arn=AWS_ACCOUNT_ARN,
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=None,
credentials=None,
assumed_role_info=None,
audited_regions=None,
organizations_metadata=None,
audit_resources=None,
mfa_enabled=False,
audit_metadata=Audit_Metadata(
services_scanned=0,
expected_checks=[],
completed_checks=0,
audit_progress=0,
),
)
return audit_info
# Test Account Service
def test_service(self):
audit_info = self.set_mocked_audit_info()
account = Account(audit_info)
assert account.service == "account"
# Test Account Client
def test_client(self):
audit_info = self.set_mocked_audit_info()
account = Account(audit_info)
assert account.client.__class__.__name__ == "Account"
# Test Account Session
def test__get_session__(self):
audit_info = self.set_mocked_audit_info()
account = Account(audit_info)
assert account.session.__class__.__name__ == "Session"
# Test Account Session
def test_audited_account(self):
audit_info = self.set_mocked_audit_info()
account = Account(audit_info)
assert account.audited_account == AWS_ACCOUNT_NUMBER
# Test Account Get Account Contacts
def test_get_account_contacts(self):
# Account client for this test class
audit_info = self.set_mocked_audit_info()
account = Account(audit_info)
assert account.number_of_contacts == 4
assert account.contact_base == Contact(
type="PRIMARY",
name="Prowler",
phone_number="666666666",
)
assert account.contacts_billing == Contact(
type="BILLING",
email="test@test.com",
name="Prowler",
phone_number="666666666",
)
assert account.contacts_security == Contact(
type="SECURITY",
email="test@test.com",
name="Prowler",
phone_number="666666666",
)
assert account.contacts_operations == Contact(
type="OPERATIONS",
email="test@test.com",
name="Prowler",
phone_number="666666666",
)

View File

@@ -0,0 +1,293 @@
from unittest import mock
from boto3 import client, session
from moto import mock_cloudtrail, mock_s3
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
from prowler.providers.common.models import Audit_Metadata
AWS_ACCOUNT_NUMBER = "123456789012"
AWS_ACCOUNT_ARN = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
AWS_REGION = "us-east-1"
class Test_cloudtrail_multi_region_enabled_logging_management_events:
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
session_config=None,
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
),
audited_account=AWS_ACCOUNT_NUMBER,
audited_account_arn=AWS_ACCOUNT_ARN,
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=None,
credentials=None,
assumed_role_info=None,
audited_regions=[AWS_REGION],
organizations_metadata=None,
audit_resources=None,
mfa_enabled=False,
audit_metadata=Audit_Metadata(
services_scanned=0,
expected_checks=[],
completed_checks=0,
audit_progress=0,
),
)
return audit_info
@mock_cloudtrail
def test_no_trails(self):
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
Cloudtrail,
)
current_audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
):
with mock.patch(
"prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled_logging_management_events.cloudtrail_multi_region_enabled_logging_management_events.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled_logging_management_events.cloudtrail_multi_region_enabled_logging_management_events import (
cloudtrail_multi_region_enabled_logging_management_events,
)
check = cloudtrail_multi_region_enabled_logging_management_events()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == AWS_ACCOUNT_ARN
assert result[0].region == AWS_REGION
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "No trail found with multi-region enabled and logging management events."
)
@mock_cloudtrail
@mock_s3
def test_compliant_trail_advanced_event_selector(self):
cloudtrail_client_us_east_1 = client("cloudtrail", region_name=AWS_REGION)
s3_client_us_east_1 = client("s3", region_name=AWS_REGION)
trail_name_us = "trail_test_us"
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
trail_us = cloudtrail_client_us_east_1.create_trail(
Name=trail_name_us, S3BucketName=bucket_name_us, IsMultiRegionTrail=True
)
_ = cloudtrail_client_us_east_1.start_logging(Name=trail_name_us)
_ = cloudtrail_client_us_east_1.put_event_selectors(
TrailName=trail_name_us,
AdvancedEventSelectors=[
{
"Name": "Management events selector",
"FieldSelectors": [
{"Field": "eventCategory", "Equals": ["Management"]}
],
}
],
)
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
Cloudtrail,
)
current_audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
):
with mock.patch(
"prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled_logging_management_events.cloudtrail_multi_region_enabled_logging_management_events.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled_logging_management_events.cloudtrail_multi_region_enabled_logging_management_events import (
cloudtrail_multi_region_enabled_logging_management_events,
)
check = cloudtrail_multi_region_enabled_logging_management_events()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == trail_name_us
assert result[0].resource_arn == trail_us["TrailARN"]
assert result[0].region == AWS_REGION
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Trail {trail_name_us} from home region {AWS_REGION} is multi-region, is logging and have management events enabled."
)
@mock_cloudtrail
@mock_s3
def test_non_compliant_trail_advanced_event_selector(self):
cloudtrail_client_us_east_1 = client("cloudtrail", region_name=AWS_REGION)
s3_client_us_east_1 = client("s3", region_name=AWS_REGION)
trail_name_us = "trail_test_us"
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
_ = cloudtrail_client_us_east_1.create_trail(
Name=trail_name_us, S3BucketName=bucket_name_us, IsMultiRegionTrail=True
)
_ = cloudtrail_client_us_east_1.start_logging(Name=trail_name_us)
_ = cloudtrail_client_us_east_1.put_event_selectors(
TrailName=trail_name_us,
AdvancedEventSelectors=[
{
"Name": "Management events selector",
"FieldSelectors": [
{"Field": "eventCategory", "Equals": ["Managment"]},
{"Field": "readOnly", "Equals": ["true"]},
],
}
],
)
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
Cloudtrail,
)
current_audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
):
with mock.patch(
"prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled_logging_management_events.cloudtrail_multi_region_enabled_logging_management_events.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled_logging_management_events.cloudtrail_multi_region_enabled_logging_management_events import (
cloudtrail_multi_region_enabled_logging_management_events,
)
check = cloudtrail_multi_region_enabled_logging_management_events()
result = check.execute()
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == AWS_ACCOUNT_ARN
assert result[0].region == AWS_REGION
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "No trail found with multi-region enabled and logging management events."
)
@mock_cloudtrail
@mock_s3
def test_compliant_trail_classic_event_selector(self):
cloudtrail_client_us_east_1 = client("cloudtrail", region_name=AWS_REGION)
s3_client_us_east_1 = client("s3", region_name=AWS_REGION)
trail_name_us = "trail_test_us"
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
trail_us = cloudtrail_client_us_east_1.create_trail(
Name=trail_name_us, S3BucketName=bucket_name_us, IsMultiRegionTrail=True
)
_ = cloudtrail_client_us_east_1.start_logging(Name=trail_name_us)
_ = cloudtrail_client_us_east_1.put_event_selectors(
TrailName=trail_name_us,
EventSelectors=[
{
"ReadWriteType": "All",
"IncludeManagementEvents": True,
"DataResources": [],
}
],
)
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
Cloudtrail,
)
current_audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
):
with mock.patch(
"prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled_logging_management_events.cloudtrail_multi_region_enabled_logging_management_events.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled_logging_management_events.cloudtrail_multi_region_enabled_logging_management_events import (
cloudtrail_multi_region_enabled_logging_management_events,
)
check = cloudtrail_multi_region_enabled_logging_management_events()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == trail_name_us
assert result[0].resource_arn == trail_us["TrailARN"]
assert result[0].region == AWS_REGION
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Trail {trail_name_us} from home region {AWS_REGION} is multi-region, is logging and have management events enabled."
)
@mock_cloudtrail
@mock_s3
def test_non_compliant_trail_classic_event_selector(self):
cloudtrail_client_us_east_1 = client("cloudtrail", region_name=AWS_REGION)
s3_client_us_east_1 = client("s3", region_name=AWS_REGION)
trail_name_us = "trail_test_us"
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
_ = cloudtrail_client_us_east_1.create_trail(
Name=trail_name_us, S3BucketName=bucket_name_us, IsMultiRegionTrail=True
)
_ = cloudtrail_client_us_east_1.start_logging(Name=trail_name_us)
_ = cloudtrail_client_us_east_1.put_event_selectors(
TrailName=trail_name_us,
EventSelectors=[
{
"ReadWriteType": "ReadOnly",
"IncludeManagementEvents": False,
"DataResources": [],
}
],
)
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
Cloudtrail,
)
current_audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
):
with mock.patch(
"prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled_logging_management_events.cloudtrail_multi_region_enabled_logging_management_events.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled_logging_management_events.cloudtrail_multi_region_enabled_logging_management_events import (
cloudtrail_multi_region_enabled_logging_management_events,
)
check = cloudtrail_multi_region_enabled_logging_management_events()
result = check.execute()
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == AWS_ACCOUNT_ARN
assert result[0].region == AWS_REGION
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "No trail found with multi-region enabled and logging management events."
)

View File

@@ -0,0 +1,241 @@
from unittest import mock
from boto3 import client, resource, session
from moto import mock_ec2
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
from prowler.providers.aws.services.dlm.dlm_service import LifecyclePolicy
from prowler.providers.common.models import Audit_Metadata
AWS_ACCOUNT_NUMBER = "123456789012"
AWS_ACCOUNT_ARN = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
AWS_REGION = "us-east-1"
LIFECYCLE_POLICY_ID = "policy-XXXXXXXXXXXX"
class Test_dlm_ebs_snapshot_lifecycle_policy_exists:
def set_mocked_audit_info(self):
return AWS_Audit_Info(
session_config=None,
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
),
audit_config=None,
audited_account=AWS_ACCOUNT_NUMBER,
audited_account_arn=f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root",
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=None,
credentials=None,
assumed_role_info=None,
audited_regions=[AWS_REGION],
organizations_metadata=None,
audit_resources=None,
mfa_enabled=False,
audit_metadata=Audit_Metadata(
services_scanned=0,
expected_checks=[],
completed_checks=0,
audit_progress=0,
),
)
@mock_ec2
def test_no_ebs_snapshot_no_lifecycle_policies(self):
# DLM Mock Client
dlm_client = mock.MagicMock
dlm_client.audited_account = AWS_ACCOUNT_NUMBER
dlm_client.audited_account_arn = AWS_ACCOUNT_ARN
dlm_client.lifecycle_policies = {}
audit_info = self.set_mocked_audit_info()
from prowler.providers.aws.services.ec2.ec2_service import EC2
with mock.patch(
"prowler.providers.aws.services.dlm.dlm_service.DLM",
new=dlm_client,
), mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=audit_info,
), mock.patch(
"prowler.providers.aws.services.ec2.ec2_service.EC2",
return_value=EC2(audit_info),
) as ec2_client, mock.patch(
"prowler.providers.aws.services.ec2.ec2_client.ec2_client",
new=ec2_client,
):
from prowler.providers.aws.services.dlm.dlm_ebs_snapshot_lifecycle_policy_exists.dlm_ebs_snapshot_lifecycle_policy_exists import (
dlm_ebs_snapshot_lifecycle_policy_exists,
)
check = dlm_ebs_snapshot_lifecycle_policy_exists()
result = check.execute()
assert len(result) == 0
@mock_ec2
def test_one_ebs_snapshot_and_dlm_lifecycle_policy(self):
# Generate EC2 Client
ec2_client = client("ec2", region_name=AWS_REGION)
ec2_resource = resource("ec2", region_name=AWS_REGION)
# Create EC2 Volume and Snapshot
volume_id = ec2_resource.create_volume(
AvailabilityZone="us-east-1a",
Size=80,
VolumeType="gp2",
).id
_ = ec2_client.create_snapshot(
VolumeId=volume_id,
TagSpecifications=[
{
"ResourceType": "snapshot",
"Tags": [
{"Key": "test", "Value": "test"},
],
},
],
)["SnapshotId"]
# DLM Mock Client
dlm_client = mock.MagicMock
dlm_client.audited_account = AWS_ACCOUNT_NUMBER
dlm_client.audited_account_arn = AWS_ACCOUNT_ARN
dlm_client.lifecycle_policies = {
AWS_REGION: {
LIFECYCLE_POLICY_ID: LifecyclePolicy(
id=LIFECYCLE_POLICY_ID,
state="ENABLED",
tags={},
type="EBS_SNAPSHOT_MANAGEMENT",
)
}
}
audit_info = self.set_mocked_audit_info()
from prowler.providers.aws.services.ec2.ec2_service import EC2
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=audit_info,
), mock.patch(
"prowler.providers.aws.services.dlm.dlm_ebs_snapshot_lifecycle_policy_exists.dlm_ebs_snapshot_lifecycle_policy_exists.ec2_client",
new=EC2(audit_info),
), mock.patch(
"prowler.providers.aws.services.dlm.dlm_ebs_snapshot_lifecycle_policy_exists.dlm_ebs_snapshot_lifecycle_policy_exists.dlm_client",
new=dlm_client,
):
from prowler.providers.aws.services.dlm.dlm_ebs_snapshot_lifecycle_policy_exists.dlm_ebs_snapshot_lifecycle_policy_exists import (
dlm_ebs_snapshot_lifecycle_policy_exists,
)
check = dlm_ebs_snapshot_lifecycle_policy_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].status_extended == "EBS snapshot lifecycle policies found."
assert result[0].region == AWS_REGION
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == AWS_ACCOUNT_ARN
@mock_ec2
def test_one_ebs_snapshot_and_no_dlm_lifecycle_policy(self):
# Generate EC2 Client
ec2_client = client("ec2", region_name=AWS_REGION)
ec2_resource = resource("ec2", region_name=AWS_REGION)
# Create EC2 Volume and Snapshot
volume_id = ec2_resource.create_volume(
AvailabilityZone="us-east-1a",
Size=80,
VolumeType="gp2",
).id
_ = ec2_client.create_snapshot(
VolumeId=volume_id,
TagSpecifications=[
{
"ResourceType": "snapshot",
"Tags": [
{"Key": "test", "Value": "test"},
],
},
],
)["SnapshotId"]
# DLM Mock Client
dlm_client = mock.MagicMock
dlm_client.audited_account = AWS_ACCOUNT_NUMBER
dlm_client.audited_account_arn = AWS_ACCOUNT_ARN
dlm_client.lifecycle_policies = {}
# from prowler.providers.aws.services.ec2.ec2_service import EC2
audit_info = self.set_mocked_audit_info()
from prowler.providers.aws.services.ec2.ec2_service import EC2
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=audit_info,
), mock.patch(
"prowler.providers.aws.services.dlm.dlm_ebs_snapshot_lifecycle_policy_exists.dlm_ebs_snapshot_lifecycle_policy_exists.ec2_client",
new=EC2(audit_info),
), mock.patch(
"prowler.providers.aws.services.dlm.dlm_ebs_snapshot_lifecycle_policy_exists.dlm_ebs_snapshot_lifecycle_policy_exists.dlm_client",
new=dlm_client,
):
from prowler.providers.aws.services.dlm.dlm_ebs_snapshot_lifecycle_policy_exists.dlm_ebs_snapshot_lifecycle_policy_exists import (
dlm_ebs_snapshot_lifecycle_policy_exists,
)
check = dlm_ebs_snapshot_lifecycle_policy_exists()
result = check.execute()
assert len(result) == 0
@mock_ec2
def test_no_ebs_snapshot_and_dlm_lifecycle_policy(self):
# DLM Mock Client
dlm_client = mock.MagicMock
dlm_client.audited_account = AWS_ACCOUNT_NUMBER
dlm_client.audited_account_arn = AWS_ACCOUNT_ARN
dlm_client.lifecycle_policies = {
AWS_REGION: {
LIFECYCLE_POLICY_ID: LifecyclePolicy(
id=LIFECYCLE_POLICY_ID,
state="ENABLED",
tags={},
type="EBS_SNAPSHOT_MANAGEMENT",
)
}
}
# from prowler.providers.aws.services.ec2.ec2_service import EC2
audit_info = self.set_mocked_audit_info()
from prowler.providers.aws.services.ec2.ec2_service import EC2
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=audit_info,
), mock.patch(
"prowler.providers.aws.services.dlm.dlm_ebs_snapshot_lifecycle_policy_exists.dlm_ebs_snapshot_lifecycle_policy_exists.ec2_client",
new=EC2(audit_info),
) as ec2_client, mock.patch(
"prowler.providers.aws.services.dlm.dlm_ebs_snapshot_lifecycle_policy_exists.dlm_ebs_snapshot_lifecycle_policy_exists.dlm_client",
new=dlm_client,
):
# Remove all snapshots
ec2_client.regions_with_snapshots = {}
from prowler.providers.aws.services.dlm.dlm_ebs_snapshot_lifecycle_policy_exists.dlm_ebs_snapshot_lifecycle_policy_exists import (
dlm_ebs_snapshot_lifecycle_policy_exists,
)
check = dlm_ebs_snapshot_lifecycle_policy_exists()
result = check.execute()
assert len(result) == 0

View File

@@ -0,0 +1,125 @@
import botocore
from boto3 import session
from mock import patch
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
from prowler.providers.aws.services.dlm.dlm_service import DLM, LifecyclePolicy
from prowler.providers.common.models import Audit_Metadata
AWS_ACCOUNT_NUMBER = "123456789012"
AWS_ACCOUNT_ARN = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
AWS_REGION = "us-east-1"
LIFECYCLE_POLICY_ID = "policy-XXXXXXXXXXXX"
# Mocking Access Analyzer Calls
make_api_call = botocore.client.BaseClient._make_api_call
def mock_make_api_call(self, operation_name, kwargs):
"""
As you can see the operation_name has the list_analyzers snake_case form but
we are using the ListAnalyzers form.
Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816
We have to mock every AWS API call using Boto3
"""
if operation_name == "GetLifecyclePolicies":
return {
"Policies": [
{
"PolicyId": "policy-XXXXXXXXXXXX",
"Description": "test",
"State": "ENABLED",
"Tags": {"environment": "dev"},
"PolicyType": "EBS_SNAPSHOT_MANAGEMENT",
}
]
}
return make_api_call(self, operation_name, kwargs)
def mock_generate_regional_clients(service, audit_info, _):
regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION)
regional_client.region = AWS_REGION
return {AWS_REGION: regional_client}
@patch(
"prowler.providers.aws.lib.service.service.generate_regional_clients",
new=mock_generate_regional_clients,
)
# Patch every AWS call using Boto3
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
class Test_DLM_Service:
# Mocked Audit Info
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
session_config=None,
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
),
audited_account=AWS_ACCOUNT_NUMBER,
audited_account_arn=AWS_ACCOUNT_ARN,
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=None,
credentials=None,
assumed_role_info=None,
audited_regions=None,
organizations_metadata=None,
audit_resources=None,
mfa_enabled=False,
audit_metadata=Audit_Metadata(
services_scanned=0,
expected_checks=[],
completed_checks=0,
audit_progress=0,
),
)
return audit_info
# Test DLM Service
def test_service(self):
audit_info = self.set_mocked_audit_info()
dlm = DLM(audit_info)
assert dlm.service == "dlm"
# Test DLM Client
def test_client(self):
audit_info = self.set_mocked_audit_info()
dlm = DLM(audit_info)
assert dlm.client.__class__.__name__ == "DLM"
# Test DLM Session
def test__get_session__(self):
audit_info = self.set_mocked_audit_info()
dlm = DLM(audit_info)
assert dlm.session.__class__.__name__ == "Session"
# Test DLM Session
def test_audited_account(self):
audit_info = self.set_mocked_audit_info()
dlm = DLM(audit_info)
assert dlm.audited_account == AWS_ACCOUNT_NUMBER
# Test DLM Get DLM Contacts
def test_get_lifecycle_policies(self):
# DLM client for this test class
audit_info = self.set_mocked_audit_info()
dlm = DLM(audit_info)
assert dlm.lifecycle_policies == {
AWS_REGION: {
LIFECYCLE_POLICY_ID: LifecyclePolicy(
id=LIFECYCLE_POLICY_ID,
state="ENABLED",
tags={"environment": "dev"},
type="EBS_SNAPSHOT_MANAGEMENT",
)
}
}

View File

@@ -0,0 +1,100 @@
from unittest import mock
from prowler.providers.aws.services.documentdb.documentdb_service import Instance
AWS_ACCOUNT_NUMBER = "123456789012"
AWS_REGION = "us-east-1"
DOC_DB_INSTANCE_NAME = "test-db"
DOC_DB_INSTANCE_ARN = (
f"arn:aws:rds:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:db:{DOC_DB_INSTANCE_NAME}"
)
DOC_DB_ENGINE_VERSION = "5.0.0"
class Test_documentdb_instance_storage_encrypted:
def test_documentdb_no_instances(self):
documentdb_client = mock.MagicMock
documentdb_client.db_instances = {}
with mock.patch(
"prowler.providers.aws.services.documentdb.documentdb_service.DocumentDB",
new=documentdb_client,
):
from prowler.providers.aws.services.documentdb.documentdb_instance_storage_encrypted.documentdb_instance_storage_encrypted import (
documentdb_instance_storage_encrypted,
)
check = documentdb_instance_storage_encrypted()
result = check.execute()
assert len(result) == 0
def test_documentdb_instance_not_encrypted(self):
documentdb_client = mock.MagicMock
documentdb_client.db_instances = {
DOC_DB_INSTANCE_ARN: Instance(
id=DOC_DB_INSTANCE_NAME,
arn=DOC_DB_INSTANCE_ARN,
engine="docdb",
engine_version=DOC_DB_ENGINE_VERSION,
status="available",
public=False,
encrypted=False,
auto_minor_version_upgrade=False,
region=AWS_REGION,
)
}
with mock.patch(
"prowler.providers.aws.services.documentdb.documentdb_service.DocumentDB",
new=documentdb_client,
):
from prowler.providers.aws.services.documentdb.documentdb_instance_storage_encrypted.documentdb_instance_storage_encrypted import (
documentdb_instance_storage_encrypted,
)
check = documentdb_instance_storage_encrypted()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"DocumentDB Instance {DOC_DB_INSTANCE_NAME} is not encrypted."
)
assert result[0].region == AWS_REGION
assert result[0].resource_id == DOC_DB_INSTANCE_NAME
assert result[0].resource_arn == DOC_DB_INSTANCE_ARN
def test_documentdb_instance_with_encryption(self):
documentdb_client = mock.MagicMock
documentdb_client.db_instances = {
DOC_DB_INSTANCE_ARN: Instance(
id=DOC_DB_INSTANCE_NAME,
arn=DOC_DB_INSTANCE_ARN,
engine="docdb",
engine_version=DOC_DB_ENGINE_VERSION,
status="available",
public=False,
encrypted=True,
auto_minor_version_upgrade=False,
region=AWS_REGION,
)
}
with mock.patch(
"prowler.providers.aws.services.documentdb.documentdb_service.DocumentDB",
new=documentdb_client,
):
from prowler.providers.aws.services.documentdb.documentdb_instance_storage_encrypted.documentdb_instance_storage_encrypted import (
documentdb_instance_storage_encrypted,
)
check = documentdb_instance_storage_encrypted()
result = check.execute()
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"DocumentDB Instance {DOC_DB_INSTANCE_NAME} is encrypted."
)
assert result[0].region == AWS_REGION
assert result[0].resource_id == DOC_DB_INSTANCE_NAME
assert result[0].resource_arn == DOC_DB_INSTANCE_ARN

View File

@@ -0,0 +1,148 @@
import botocore
from boto3 import session
from mock import patch
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
from prowler.providers.aws.services.documentdb.documentdb_service import (
DocumentDB,
Instance,
)
from prowler.providers.common.models import Audit_Metadata
AWS_ACCOUNT_NUMBER = "123456789012"
AWS_ACCOUNT_ARN = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
AWS_REGION = "us-east-1"
DOC_DB_CLUSTER_ID = "test-cluster"
DOC_DB_INSTANCE_NAME = "test-db"
DOC_DB_INSTANCE_ARN = (
f"arn:aws:rds:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:db:{DOC_DB_INSTANCE_NAME}"
)
DOC_DB_ENGINE_VERSION = "5.0.0"
# Mocking Access Analyzer Calls
make_api_call = botocore.client.BaseClient._make_api_call
def mock_make_api_call(self, operation_name, kwargs):
"""
As you can see the operation_name has the list_analyzers snake_case form but
we are using the ListAnalyzers form.
Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816
We have to mock every AWS API call using Boto3
"""
if operation_name == "DescribeDBInstances":
return {
"DBInstances": [
{
"DBInstanceIdentifier": DOC_DB_INSTANCE_NAME,
"DBInstanceClass": "string",
"Engine": "docdb",
"DBInstanceStatus": "available",
"BackupRetentionPeriod": 1,
"EngineVersion": "5.0.0",
"AutoMinorVersionUpgrade": False,
"PubliclyAccessible": False,
"DBClusterIdentifier": DOC_DB_CLUSTER_ID,
"StorageEncrypted": False,
"DbiResourceId": "string",
"CACertificateIdentifier": "string",
"CopyTagsToSnapshot": True | False,
"PromotionTier": 123,
"DBInstanceArn": DOC_DB_INSTANCE_ARN,
},
]
}
if operation_name == "ListTagsForResource":
return {"TagList": [{"Key": "environment", "Value": "test"}]}
return make_api_call(self, operation_name, kwargs)
def mock_generate_regional_clients(service, audit_info, _):
regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION)
regional_client.region = AWS_REGION
return {AWS_REGION: regional_client}
@patch(
"prowler.providers.aws.lib.service.service.generate_regional_clients",
new=mock_generate_regional_clients,
)
# Patch every AWS call using Boto3
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
class Test_DocumentDB_Service:
# Mocked Audit Info
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
session_config=None,
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
),
audited_account=AWS_ACCOUNT_NUMBER,
audited_account_arn=AWS_ACCOUNT_ARN,
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=None,
credentials=None,
assumed_role_info=None,
audited_regions=None,
organizations_metadata=None,
audit_resources=None,
mfa_enabled=False,
audit_metadata=Audit_Metadata(
services_scanned=0,
expected_checks=[],
completed_checks=0,
audit_progress=0,
),
)
return audit_info
# Test DocumentDB Service
def test_service(self):
audit_info = self.set_mocked_audit_info()
docdb = DocumentDB(audit_info)
assert docdb.service == "docdb"
# Test DocumentDB Client
def test_client(self):
audit_info = self.set_mocked_audit_info()
docdb = DocumentDB(audit_info)
assert docdb.client.__class__.__name__ == "DocDB"
# Test DocumentDB Session
def test__get_session__(self):
audit_info = self.set_mocked_audit_info()
docdb = DocumentDB(audit_info)
assert docdb.session.__class__.__name__ == "Session"
# Test DocumentDB Session
def test_audited_account(self):
audit_info = self.set_mocked_audit_info()
docdb = DocumentDB(audit_info)
assert docdb.audited_account == AWS_ACCOUNT_NUMBER
# Test DocumentDB Get DocumentDB Contacts
def test_describe_db_instances(self):
audit_info = self.set_mocked_audit_info()
docdb = DocumentDB(audit_info)
assert docdb.db_instances == {
DOC_DB_INSTANCE_ARN: Instance(
id=DOC_DB_INSTANCE_NAME,
arn=DOC_DB_INSTANCE_ARN,
engine="docdb",
engine_version="5.0.0",
status="available",
public=False,
encrypted=False,
cluster_id=DOC_DB_CLUSTER_ID,
region=AWS_REGION,
tags=[{"Key": "environment", "Value": "test"}],
)
}

View File

@@ -0,0 +1,210 @@
from unittest import mock
from boto3 import resource, session
from mock import patch
from moto import mock_ec2
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
from prowler.providers.common.models import Audit_Metadata
AWS_REGION = "us-east-1"
AWS_REGION_AZ = "us-east-1a"
AWS_ACCOUNT_NUMBER = "123456789012"
AWS_ACCOUNT_ARN = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
def mock_generate_regional_clients(service, audit_info, _):
regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION)
regional_client.region = AWS_REGION
return {AWS_REGION: regional_client}
@patch(
"prowler.providers.aws.lib.service.service.generate_regional_clients",
new=mock_generate_regional_clients,
)
class Test_ec2_ebs_volume_snapshots_exists:
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
session_config=None,
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
),
audited_account=AWS_ACCOUNT_NUMBER,
audited_account_arn=AWS_ACCOUNT_ARN,
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=None,
credentials=None,
assumed_role_info=None,
audited_regions=[AWS_REGION],
organizations_metadata=None,
audit_resources=None,
mfa_enabled=False,
audit_metadata=Audit_Metadata(
services_scanned=0,
expected_checks=[],
completed_checks=0,
audit_progress=0,
),
)
return audit_info
@mock_ec2
def test_no_volumes(self):
from prowler.providers.aws.services.ec2.ec2_service import EC2
current_audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
), mock.patch(
"prowler.providers.aws.services.ec2.ec2_ebs_volume_snapshots_exists.ec2_ebs_volume_snapshots_exists.ec2_client",
new=EC2(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.ec2.ec2_ebs_volume_snapshots_exists.ec2_ebs_volume_snapshots_exists import (
ec2_ebs_volume_snapshots_exists,
)
check = ec2_ebs_volume_snapshots_exists()
result = check.execute()
assert len(result) == 0
@mock_ec2
def test_ec2_volume_without_snapshots(self):
ec2 = resource("ec2", region_name=AWS_REGION)
volume = ec2.create_volume(Size=80, AvailabilityZone=AWS_REGION_AZ)
volume_arn = f"arn:aws:ec2:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:volume/{volume.id}"
from prowler.providers.aws.services.ec2.ec2_service import EC2
current_audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
), mock.patch(
"prowler.providers.aws.services.ec2.ec2_ebs_volume_snapshots_exists.ec2_ebs_volume_snapshots_exists.ec2_client",
new=EC2(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.ec2.ec2_ebs_volume_snapshots_exists.ec2_ebs_volume_snapshots_exists import (
ec2_ebs_volume_snapshots_exists,
)
check = ec2_ebs_volume_snapshots_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Snapshots not found for the EBS volume {volume.id}."
)
assert result[0].resource_id == volume.id
assert result[0].resource_arn == volume_arn
assert result[0].resource_tags is None
assert result[0].region == AWS_REGION
@mock_ec2
def test_ec2_volume_with_snapshot(self):
# Create EC2 Mocked Resources
ec2 = resource("ec2", region_name=AWS_REGION)
volume = ec2.create_volume(Size=80, AvailabilityZone=AWS_REGION_AZ)
volume_arn = f"arn:aws:ec2:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:volume/{volume.id}"
_ = volume.create_snapshot(Description="testsnap")
from prowler.providers.aws.services.ec2.ec2_service import EC2
current_audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
), mock.patch(
"prowler.providers.aws.services.ec2.ec2_ebs_volume_snapshots_exists.ec2_ebs_volume_snapshots_exists.ec2_client",
new=EC2(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.ec2.ec2_ebs_volume_snapshots_exists.ec2_ebs_volume_snapshots_exists import (
ec2_ebs_volume_snapshots_exists,
)
check = ec2_ebs_volume_snapshots_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Snapshots found for the EBS volume {result[0].resource_id}."
)
assert result[0].resource_id == volume.id
assert result[0].resource_arn == volume_arn
assert result[0].resource_tags is None
assert result[0].region == AWS_REGION
@mock_ec2
def test_ec2_volume_with_and_without_snapshot(self):
# Create EC2 Mocked Resources
ec2 = resource("ec2", region_name=AWS_REGION)
volume1 = ec2.create_volume(Size=80, AvailabilityZone=AWS_REGION_AZ)
volume1_arn = (
f"arn:aws:ec2:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:volume/{volume1.id}"
)
_ = volume1.create_snapshot(Description="test-snap")
volume2 = ec2.create_volume(Size=80, AvailabilityZone=AWS_REGION_AZ)
volume2_arn = (
f"arn:aws:ec2:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:volume/{volume2.id}"
)
from prowler.providers.aws.services.ec2.ec2_service import EC2
current_audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
), mock.patch(
"prowler.providers.aws.services.ec2.ec2_ebs_volume_snapshots_exists.ec2_ebs_volume_snapshots_exists.ec2_client",
new=EC2(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.ec2.ec2_ebs_volume_snapshots_exists.ec2_ebs_volume_snapshots_exists import (
ec2_ebs_volume_snapshots_exists,
)
check = ec2_ebs_volume_snapshots_exists()
result = check.execute()
assert len(result) == 2
for res in result:
if res.resource_id == volume1.id:
assert res.status == "PASS"
assert (
res.status_extended
== f"Snapshots found for the EBS volume {res.resource_id}."
)
assert res.resource_id == volume1.id
assert res.resource_arn == volume1_arn
assert res.resource_tags is None
assert res.region == AWS_REGION
if res.resource_id == volume2.id:
assert res.status == "FAIL"
assert (
res.status_extended
== f"Snapshots not found for the EBS volume {res.resource_id}."
)
assert res.resource_id == volume2.id
assert res.resource_arn == volume2_arn
assert res.resource_tags is None
assert res.region == AWS_REGION

View File

@@ -253,18 +253,20 @@ class Test_EC2_Service:
},
],
)["SnapshotId"]
snapshot_arn = (
f"arn:aws:ec2:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:snapshot/{snapshot_id}"
)
# EC2 client for this test class
audit_info = self.set_mocked_audit_info()
ec2 = EC2(audit_info)
assert snapshot_id in str(ec2.snapshots)
assert ec2.volumes_with_snapshots[volume_id] is True
for snapshot in ec2.snapshots:
if snapshot.id == snapshot_id:
assert re.match(r"snap-[0-9a-z]{8}", snapshot.id)
assert (
snapshot.arn
== f"arn:{audit_info.audited_partition}:ec2:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:snapshot/{snapshot.id}"
)
assert snapshot.arn == snapshot_arn
assert snapshot.region == AWS_REGION
assert snapshot.tags == [
{"Key": "test", "Value": "test"},

View File

@@ -1,21 +1,24 @@
from re import search
from unittest import mock
from uuid import uuid4
from prowler.providers.aws.services.trustedadvisor.trustedadvisor_service import Check
from prowler.providers.aws.services.trustedadvisor.trustedadvisor_service import (
Check,
PremiumSupport,
)
AWS_REGION = "eu-west-1"
AWS_ACCOUNT_NUMBER = "123456789012"
AWS_ACCOUNT_ARN = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
detector_id = str(uuid4())
CHECK_NAME = "test-check"
class Test_trustedadvisor_errors_and_warnings:
def test_no_detectors(self):
def test_no_detectors_premium_support_disabled(self):
trustedadvisor_client = mock.MagicMock
trustedadvisor_client.checks = []
trustedadvisor_client.enabled = False
trustedadvisor_client.account = AWS_ACCOUNT_NUMBER
trustedadvisor_client.premium_support = PremiumSupport(enabled=False)
trustedadvisor_client.audited_account = AWS_ACCOUNT_NUMBER
trustedadvisor_client.audited_account_arn = AWS_ACCOUNT_ARN
trustedadvisor_client.region = AWS_REGION
with mock.patch(
"prowler.providers.aws.services.trustedadvisor.trustedadvisor_service.TrustedAdvisor",
@@ -28,19 +31,25 @@ class Test_trustedadvisor_errors_and_warnings:
check = trustedadvisor_errors_and_warnings()
result = check.execute()
assert len(result) == 1
assert result[0].status == "INFO"
assert (
result[0].status_extended
== "Amazon Web Services Premium Support Subscription is required to use this service."
)
assert result[0].region == AWS_REGION
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == AWS_ACCOUNT_ARN
def test_trustedadvisor_all_passed_checks(self):
trustedadvisor_client = mock.MagicMock
trustedadvisor_client.checks = []
trustedadvisor_client.enabled = True
trustedadvisor_client.premium_support = PremiumSupport(enabled=True)
trustedadvisor_client.audited_account = AWS_ACCOUNT_NUMBER
trustedadvisor_client.audited_account_arn = AWS_ACCOUNT_ARN
trustedadvisor_client.checks.append(
Check(
id="check1",
name="check1",
id=CHECK_NAME,
name=CHECK_NAME,
region=AWS_REGION,
status="ok",
)
@@ -57,17 +66,23 @@ class Test_trustedadvisor_errors_and_warnings:
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search("ok", result[0].status_extended)
assert result[0].resource_id == "check1"
assert (
result[0].status_extended
== f"Trusted Advisor check {CHECK_NAME} is in state ok."
)
assert result[0].resource_id == CHECK_NAME
assert result[0].region == AWS_REGION
def test_trustedadvisor_error_check(self):
trustedadvisor_client = mock.MagicMock
trustedadvisor_client.checks = []
trustedadvisor_client.enabled = True
trustedadvisor_client.premium_support = PremiumSupport(enabled=True)
trustedadvisor_client.audited_account = AWS_ACCOUNT_NUMBER
trustedadvisor_client.audited_account_arn = AWS_ACCOUNT_ARN
trustedadvisor_client.checks.append(
Check(
id="check1",
name="check1",
id=CHECK_NAME,
name=CHECK_NAME,
region=AWS_REGION,
status="error",
)
@@ -84,17 +99,23 @@ class Test_trustedadvisor_errors_and_warnings:
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search("error", result[0].status_extended)
assert result[0].resource_id == "check1"
assert (
result[0].status_extended
== f"Trusted Advisor check {CHECK_NAME} is in state error."
)
assert result[0].resource_id == CHECK_NAME
assert result[0].region == AWS_REGION
def test_trustedadvisor_not_available_check(self):
trustedadvisor_client = mock.MagicMock
trustedadvisor_client.checks = []
trustedadvisor_client.enabled = True
trustedadvisor_client.premium_support = PremiumSupport(enabled=True)
trustedadvisor_client.audited_account = AWS_ACCOUNT_NUMBER
trustedadvisor_client.audited_account_arn = AWS_ACCOUNT_ARN
trustedadvisor_client.checks.append(
Check(
id="check1",
name="check1",
id=CHECK_NAME,
name=CHECK_NAME,
region=AWS_REGION,
status="not_available",
)

View File

@@ -0,0 +1,73 @@
from unittest import mock
from prowler.providers.aws.services.trustedadvisor.trustedadvisor_service import (
PremiumSupport,
)
AWS_REGION = "eu-west-1"
AWS_ACCOUNT_NUMBER = "123456789012"
AWS_ACCOUNT_ARN = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
class Test_trustedadvisor_premium_support_plan_subscribed:
def test_premium_support_not_susbcribed(self):
trustedadvisor_client = mock.MagicMock
trustedadvisor_client.checks = []
trustedadvisor_client.premium_support = PremiumSupport(enabled=False)
trustedadvisor_client.audited_account = AWS_ACCOUNT_NUMBER
trustedadvisor_client.audited_account_arn = AWS_ACCOUNT_ARN
trustedadvisor_client.region = AWS_REGION
# Set verify_premium_support_plans config
trustedadvisor_client.audit_config = {"verify_premium_support_plans": True}
with mock.patch(
"prowler.providers.aws.services.trustedadvisor.trustedadvisor_service.TrustedAdvisor",
trustedadvisor_client,
):
from prowler.providers.aws.services.trustedadvisor.trustedadvisor_premium_support_plan_subscribed.trustedadvisor_premium_support_plan_subscribed import (
trustedadvisor_premium_support_plan_subscribed,
)
check = trustedadvisor_premium_support_plan_subscribed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Amazon Web Services Premium Support Plan isn't subscribed."
)
assert result[0].region == AWS_REGION
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == AWS_ACCOUNT_ARN
def test_premium_support_susbcribed(self):
trustedadvisor_client = mock.MagicMock
trustedadvisor_client.checks = []
trustedadvisor_client.premium_support = PremiumSupport(enabled=True)
trustedadvisor_client.audited_account = AWS_ACCOUNT_NUMBER
trustedadvisor_client.audited_account_arn = AWS_ACCOUNT_ARN
trustedadvisor_client.region = AWS_REGION
# Set verify_premium_support_plans config
trustedadvisor_client.audit_config = {"verify_premium_support_plans": True}
with mock.patch(
"prowler.providers.aws.services.trustedadvisor.trustedadvisor_service.TrustedAdvisor",
trustedadvisor_client,
):
from prowler.providers.aws.services.trustedadvisor.trustedadvisor_premium_support_plan_subscribed.trustedadvisor_premium_support_plan_subscribed import (
trustedadvisor_premium_support_plan_subscribed,
)
check = trustedadvisor_premium_support_plan_subscribed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Amazon Web Services Premium Support Plan is subscribed."
)
assert result[0].region == AWS_REGION
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == AWS_ACCOUNT_ARN

View File

@@ -19,6 +19,21 @@ make_api_call = botocore.client.BaseClient._make_api_call
def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "DescribeTrustedAdvisorCheckResult":
return {}
if operation_name == "DescribeServices":
return {
"services": [
{
"code": "amazon-marketplace",
"name": "Marketplace",
"categories": [
{
"code": "general-marketplace-seller-inquiry",
"name": "General Marketplace Seller Inquiry",
},
],
}
]
}
return make_api_call(self, operation_name, kwarg)
@@ -78,5 +93,6 @@ class Test_TrustedAdvisor_Service:
def test__describe_trusted_advisor_checks__(self):
audit_info = self.set_mocked_audit_info()
trustedadvisor = TrustedAdvisor(audit_info)
assert trustedadvisor.premium_support.enabled
assert len(trustedadvisor.checks) == 104 # Default checks
assert trustedadvisor.checks[0].region == AWS_REGION