From f4f4093466f30c8e79504f24f08e817a8f4f8ca2 Mon Sep 17 00:00:00 2001 From: Gabriel Soltz Date: Tue, 11 Apr 2023 07:43:40 +0200 Subject: [PATCH] feat(backup): New backup service and checks (#2172) Co-authored-by: Nacho Rivera --- .../providers/aws/services/backup/__init__.py | 0 .../aws/services/backup/backup_client.py | 4 + .../backup/backup_plans_exist/__init__.py | 0 .../backup_plans_exist.metadata.json | 34 ++++ .../backup_plans_exist/backup_plans_exist.py | 22 +++ .../backup_reportplans_exist/__init__.py | 0 .../backup_reportplans_exist.metadata.json | 34 ++++ .../backup_reportplans_exist.py | 22 +++ .../aws/services/backup/backup_service.py | 169 ++++++++++++++++++ .../backup_vaults_encrypted/__init__.py | 0 .../backup_vaults_encrypted.metadata.json | 35 ++++ .../backup_vaults_encrypted.py | 28 +++ .../backup/backup_vaults_exist/__init__.py | 0 .../backup_vaults_exist.metadata.json | 34 ++++ .../backup_vaults_exist.py | 22 +++ .../backup_plans_exist_test.py | 67 +++++++ .../backup_reportplans_exist_test.py | 66 +++++++ .../services/backup/backup_service_test.py | 157 ++++++++++++++++ .../backup_vaults_encrypted_test.py | 98 ++++++++++ .../backup_vaults_exist_test.py | 68 +++++++ 20 files changed, 860 insertions(+) create mode 100644 prowler/providers/aws/services/backup/__init__.py create mode 100644 prowler/providers/aws/services/backup/backup_client.py create mode 100644 prowler/providers/aws/services/backup/backup_plans_exist/__init__.py create mode 100644 prowler/providers/aws/services/backup/backup_plans_exist/backup_plans_exist.metadata.json create mode 100644 prowler/providers/aws/services/backup/backup_plans_exist/backup_plans_exist.py create mode 100644 prowler/providers/aws/services/backup/backup_reportplans_exist/__init__.py create mode 100644 prowler/providers/aws/services/backup/backup_reportplans_exist/backup_reportplans_exist.metadata.json create mode 100644 prowler/providers/aws/services/backup/backup_reportplans_exist/backup_reportplans_exist.py create mode 100644 prowler/providers/aws/services/backup/backup_service.py create mode 100644 prowler/providers/aws/services/backup/backup_vaults_encrypted/__init__.py create mode 100644 prowler/providers/aws/services/backup/backup_vaults_encrypted/backup_vaults_encrypted.metadata.json create mode 100644 prowler/providers/aws/services/backup/backup_vaults_encrypted/backup_vaults_encrypted.py create mode 100644 prowler/providers/aws/services/backup/backup_vaults_exist/__init__.py create mode 100644 prowler/providers/aws/services/backup/backup_vaults_exist/backup_vaults_exist.metadata.json create mode 100644 prowler/providers/aws/services/backup/backup_vaults_exist/backup_vaults_exist.py create mode 100644 tests/providers/aws/services/backup/backup_plans_exist/backup_plans_exist_test.py create mode 100644 tests/providers/aws/services/backup/backup_reportplans_exist/backup_reportplans_exist_test.py create mode 100644 tests/providers/aws/services/backup/backup_service_test.py create mode 100644 tests/providers/aws/services/backup/backup_vaults_encrypted/backup_vaults_encrypted_test.py create mode 100644 tests/providers/aws/services/backup/backup_vaults_exist/backup_vaults_exist_test.py diff --git a/prowler/providers/aws/services/backup/__init__.py b/prowler/providers/aws/services/backup/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/prowler/providers/aws/services/backup/backup_client.py b/prowler/providers/aws/services/backup/backup_client.py new file mode 100644 index 00000000..db6fdfe6 --- /dev/null +++ b/prowler/providers/aws/services/backup/backup_client.py @@ -0,0 +1,4 @@ +from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info +from prowler.providers.aws.services.backup.backup_service import Backup + +backup_client = Backup(current_audit_info) diff --git a/prowler/providers/aws/services/backup/backup_plans_exist/__init__.py b/prowler/providers/aws/services/backup/backup_plans_exist/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/prowler/providers/aws/services/backup/backup_plans_exist/backup_plans_exist.metadata.json b/prowler/providers/aws/services/backup/backup_plans_exist/backup_plans_exist.metadata.json new file mode 100644 index 00000000..f0d3b790 --- /dev/null +++ b/prowler/providers/aws/services/backup/backup_plans_exist/backup_plans_exist.metadata.json @@ -0,0 +1,34 @@ +{ + "Provider": "aws", + "CheckID": "backup_plans_exist", + "CheckTitle": "Ensure that there is at least one AWS Backup plan", + "CheckType": [ + "Recover", + "Resilience", + "Backup" + ], + "ServiceName": "backup", + "SubServiceName": "", + "ResourceIdTemplate": "arn:partition:service:region:account-id:backup-plan:backup-plan-id", + "Severity": "medium", + "ResourceType": "AwsBackupBackupPlan", + "Description": "This check ensures that there is at least one backup plan in place.", + "Risk": "Without a backup plan, an organization may be at risk of losing important data due to accidental deletion, system failures, or natural disasters. This can result in significant financial and reputational damage for the organization.", + "RelatedUrl": "", + "Remediation": { + "Code": { + "CLI": "aws backup create-backup-plan --backup-plan --backup-plan-rule ", + "NativeIaC": "", + "Other": "", + "Terraform": "" + }, + "Recommendation": { + "Text": "Use AWS Backup to create backup plans for your critical data and services.", + "Url": "" + } + }, + "Categories": [], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/aws/services/backup/backup_plans_exist/backup_plans_exist.py b/prowler/providers/aws/services/backup/backup_plans_exist/backup_plans_exist.py new file mode 100644 index 00000000..d82268c7 --- /dev/null +++ b/prowler/providers/aws/services/backup/backup_plans_exist/backup_plans_exist.py @@ -0,0 +1,22 @@ +from prowler.lib.check.models import Check, Check_Report_AWS +from prowler.providers.aws.services.backup.backup_client import backup_client + + +class backup_plans_exist(Check): + def execute(self): + findings = [] + report = Check_Report_AWS(self.metadata()) + report.status = "FAIL" + report.status_extended = "No Backup Plan Exist" + report.resource_arn = "" + report.resource_id = "No Backups" + report.region = backup_client.general_region + if backup_client.backup_plans: + report.status = "PASS" + report.status_extended = f"At least one backup plan exists: { backup_client.backup_plans[0].name}" + report.resource_arn = backup_client.backup_plans[0].arn + report.resource_id = backup_client.backup_plans[0].name + report.region = backup_client.backup_plans[0].region + + findings.append(report) + return findings diff --git a/prowler/providers/aws/services/backup/backup_reportplans_exist/__init__.py b/prowler/providers/aws/services/backup/backup_reportplans_exist/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/prowler/providers/aws/services/backup/backup_reportplans_exist/backup_reportplans_exist.metadata.json b/prowler/providers/aws/services/backup/backup_reportplans_exist/backup_reportplans_exist.metadata.json new file mode 100644 index 00000000..3f83b9a5 --- /dev/null +++ b/prowler/providers/aws/services/backup/backup_reportplans_exist/backup_reportplans_exist.metadata.json @@ -0,0 +1,34 @@ +{ + "Provider": "aws", + "CheckID": "backup_reportplans_exist", + "CheckTitle": "Ensure that there is at least one AWS Backup report plan", + "CheckType": [ + "Recover", + "Resilience", + "Backup" + ], + "ServiceName": "backup", + "SubServiceName": "", + "ResourceIdTemplate": "arn:partition:service:region:account-id:backup-report-plan:backup-report-plan-id", + "Severity": "low", + "ResourceType": "Other", + "Description": "This check ensures that there is at least one backup report plan in place.", + "Risk": "Without a backup report plan, an organization may lack visibility into the success or failure of backup operations.", + "RelatedUrl": "", + "Remediation": { + "Code": { + "CLI": "aws backup create-report-plan --report-plan-name --report-delivery-channel --report-setting ", + "NativeIaC": "", + "Other": "", + "Terraform": "" + }, + "Recommendation": { + "Text": "Use AWS Backup to create backup report plans that provide visibility into the success or failure of backup operations.", + "Url": "" + } + }, + "Categories": [], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/aws/services/backup/backup_reportplans_exist/backup_reportplans_exist.py b/prowler/providers/aws/services/backup/backup_reportplans_exist/backup_reportplans_exist.py new file mode 100644 index 00000000..ff5e9044 --- /dev/null +++ b/prowler/providers/aws/services/backup/backup_reportplans_exist/backup_reportplans_exist.py @@ -0,0 +1,22 @@ +from prowler.lib.check.models import Check, Check_Report_AWS +from prowler.providers.aws.services.backup.backup_client import backup_client + + +class backup_reportplans_exist(Check): + def execute(self): + findings = [] + report = Check_Report_AWS(self.metadata()) + report.status = "FAIL" + report.status_extended = "No Backup Report Plan Exist" + report.resource_arn = "" + report.resource_id = "No Backups" + report.region = backup_client.general_region + if backup_client.backup_report_plans: + report.status = "PASS" + report.status_extended = f"At least one backup report plan exists: { backup_client.backup_report_plans[0].name}" + report.resource_arn = backup_client.backup_report_plans[0].arn + report.resource_id = backup_client.backup_report_plans[0].name + report.region = backup_client.backup_report_plans[0].region + + findings.append(report) + return findings diff --git a/prowler/providers/aws/services/backup/backup_service.py b/prowler/providers/aws/services/backup/backup_service.py new file mode 100644 index 00000000..0a6b4caa --- /dev/null +++ b/prowler/providers/aws/services/backup/backup_service.py @@ -0,0 +1,169 @@ +import threading +from datetime import datetime + +from pydantic import BaseModel + +from prowler.lib.logger import logger +from prowler.lib.scan_filters.scan_filters import is_resource_filtered +from prowler.providers.aws.aws_provider import generate_regional_clients + + +################## Backup +class Backup: + def __init__(self, audit_info): + self.service = "backup" + self.session = audit_info.audit_session + self.audited_account = audit_info.audited_account + self.audit_resources = audit_info.audit_resources + self.regional_clients = generate_regional_clients(self.service, audit_info) + self.general_region = audit_info.profile_region + self.backup_vaults = [] + self.__threading_call__(self.__list_backup_vaults__) + self.backup_plans = [] + self.__threading_call__(self.__list_backup_plans__) + self.backup_report_plans = [] + self.__threading_call__(self.__list_backup_report_plans__) + + def __threading_call__(self, call): + threads = [] + for regional_client in self.regional_clients.values(): + threads.append(threading.Thread(target=call, args=(regional_client,))) + for t in threads: + t.start() + for t in threads: + t.join() + + def __list_backup_vaults__(self, regional_client): + logger.info("Backup - Listing Backup Vaults...") + try: + list_backup_vaults_paginator = regional_client.get_paginator( + "list_backup_vaults" + ) + for page in list_backup_vaults_paginator.paginate(): + for configuration in page.get("BackupVaultList"): + if not self.audit_resources or ( + is_resource_filtered( + configuration.get("BackupVaultArn"), + self.audit_resources, + ) + ): + self.backup_vaults.append( + BackupVault( + arn=configuration.get("BackupVaultArn"), + name=configuration.get("BackupVaultName"), + region=regional_client.region, + encryption=configuration.get("EncryptionKeyArn"), + recovery_points=configuration.get( + "NumberOfRecoveryPoints" + ), + locked=configuration.get("Locked"), + min_retention_days=configuration.get( + "MinRetentionDays" + ), + max_retention_days=configuration.get( + "MaxRetentionDays" + ), + ) + ) + + except Exception as error: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + def __list_backup_plans__(self, regional_client): + logger.info("Backup - Listing Backup Plans...") + try: + list_backup_plans_paginator = regional_client.get_paginator( + "list_backup_plans" + ) + for page in list_backup_plans_paginator.paginate(): + for configuration in page.get("BackupPlansList"): + if not self.audit_resources or ( + is_resource_filtered( + configuration.get("BackupPlanArn"), + self.audit_resources, + ) + ): + self.backup_plans.append( + BackupPlan( + arn=configuration.get("BackupPlanArn"), + id=configuration.get("BackupPlanId"), + region=regional_client.region, + name=configuration.get("BackupPlanName"), + version_id=configuration.get("VersionId"), + last_execution_date=configuration.get( + "LastExecutionDate" + ), + advanced_settings=configuration.get( + "AdvancedBackupSettings" + ), + ) + ) + + except Exception as error: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + def __list_backup_report_plans__(self, regional_client): + logger.info("Backup - Listing Backup Report Plans...") + + try: + list_backup_report_plans = regional_client.list_report_plans()[ + "ReportPlans" + ] + for backup_report_plan in list_backup_report_plans: + if not self.audit_resources or ( + is_resource_filtered( + backup_report_plan.get("ReportPlanArn"), + self.audit_resources, + ) + ): + self.backup_report_plans.append( + BackupReportPlan( + arn=backup_report_plan.get("ReportPlanArn"), + region=regional_client.region, + name=backup_report_plan.get("ReportPlanName"), + last_attempted_execution_date=backup_report_plan.get( + "LastAttemptedExecutionTime" + ), + last_successful_execution_date=backup_report_plan.get( + "LastSuccessfulExecutionTime" + ), + ) + ) + + except Exception as error: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + +class BackupVault(BaseModel): + arn: str + name: str + region: str + encryption: str + recovery_points: int + locked: bool + min_retention_days: int = None + max_retention_days: int = None + + +class BackupPlan(BaseModel): + arn: str + id: str + region: str + name: str + version_id: str + last_execution_date: datetime + advanced_settings: list + + +class BackupReportPlan(BaseModel): + arn: str + region: str + name: str + last_attempted_execution_date: datetime + last_successful_execution_date: datetime diff --git a/prowler/providers/aws/services/backup/backup_vaults_encrypted/__init__.py b/prowler/providers/aws/services/backup/backup_vaults_encrypted/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/prowler/providers/aws/services/backup/backup_vaults_encrypted/backup_vaults_encrypted.metadata.json b/prowler/providers/aws/services/backup/backup_vaults_encrypted/backup_vaults_encrypted.metadata.json new file mode 100644 index 00000000..9bafabf2 --- /dev/null +++ b/prowler/providers/aws/services/backup/backup_vaults_encrypted/backup_vaults_encrypted.metadata.json @@ -0,0 +1,35 @@ +{ + "Provider": "aws", + "CheckID": "backup_vaults_encrypted", + "CheckTitle": "Ensure that AWS Backup vaults are encrypted with AWS KMS", + "CheckType": [ + "Recover", + "Resilience", + "Backup", + "Data Protection" + ], + "ServiceName": "backup", + "SubServiceName": "", + "ResourceIdTemplate": "arn:partition:service:region:account-id:backup-vault:backup-vault-id", + "Severity": "medium", + "ResourceType": "AwsBackupBackupVault", + "Description": "This check ensures that AWS Backup vaults are encrypted with AWS KMS.", + "Risk": "Without encryption using AWS KMS, an organization's backup data may be at risk of unauthorized access, which can lead to data breaches and other security incidents.", + "RelatedUrl": "", + "Remediation": { + "Code": { + "CLI": "aws backup update-backup-vault --backup-vault-name --encryption-key-arn ", + "NativeIaC": "", + "Other": "", + "Terraform": "" + }, + "Recommendation": { + "Text": "Use AWS KMS to encrypt your AWS Backup vaults and backup data.", + "Url": "" + } + }, + "Categories": [], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/aws/services/backup/backup_vaults_encrypted/backup_vaults_encrypted.py b/prowler/providers/aws/services/backup/backup_vaults_encrypted/backup_vaults_encrypted.py new file mode 100644 index 00000000..c2ca3e69 --- /dev/null +++ b/prowler/providers/aws/services/backup/backup_vaults_encrypted/backup_vaults_encrypted.py @@ -0,0 +1,28 @@ +from prowler.lib.check.models import Check, Check_Report_AWS +from prowler.providers.aws.services.backup.backup_client import backup_client + + +class backup_vaults_encrypted(Check): + def execute(self): + findings = [] + + for backup_vault in backup_client.backup_vaults: + # By default we assume that the result is fail + report = Check_Report_AWS(self.metadata()) + report.status = "FAIL" + report.status_extended = ( + f"Backup Vault {backup_vault.name} is not encrypted" + ) + report.resource_arn = backup_vault.arn + report.resource_id = backup_vault.name + report.region = backup_vault.region + # if it is encrypted we only change the status and the status extended + if backup_vault.encryption: + report.status = "PASS" + report.status_extended = ( + f"Backup Vault {backup_vault.name} is encrypted" + ) + # then we store the finding + findings.append(report) + + return findings diff --git a/prowler/providers/aws/services/backup/backup_vaults_exist/__init__.py b/prowler/providers/aws/services/backup/backup_vaults_exist/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/prowler/providers/aws/services/backup/backup_vaults_exist/backup_vaults_exist.metadata.json b/prowler/providers/aws/services/backup/backup_vaults_exist/backup_vaults_exist.metadata.json new file mode 100644 index 00000000..c25d2d0c --- /dev/null +++ b/prowler/providers/aws/services/backup/backup_vaults_exist/backup_vaults_exist.metadata.json @@ -0,0 +1,34 @@ +{ + "Provider": "aws", + "CheckID": "backup_vaults_exist", + "CheckTitle": "Esure AWS Backup vaults exist", + "CheckType": [ + "Recover", + "Resilience", + "Backup" + ], + "ServiceName": "backup", + "SubServiceName": "", + "ResourceIdTemplate": "arn:partition:service:region:account-id:backup-vault:backup-vault-id", + "Severity": "medium", + "ResourceType": "AwsBackupBackupVault", + "Description": "This check ensures that AWS Backup vaults exist to provide a secure and durable storage location for backup data.", + "Risk": "Without an AWS Backup vault, an organization's critical data may be at risk of being lost in the event of an accidental deletion, system failures, or natural disasters.", + "RelatedUrl": "", + "Remediation": { + "Code": { + "CLI": "aws backup create-backup-vault --backup-vault-name ", + "NativeIaC": "", + "Other": "", + "Terraform": "" + }, + "Recommendation": { + "Text": "Use AWS Backup to create backup vaults for your critical data and services.", + "Url": "" + } + }, + "Categories": [], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/aws/services/backup/backup_vaults_exist/backup_vaults_exist.py b/prowler/providers/aws/services/backup/backup_vaults_exist/backup_vaults_exist.py new file mode 100644 index 00000000..5b299a4f --- /dev/null +++ b/prowler/providers/aws/services/backup/backup_vaults_exist/backup_vaults_exist.py @@ -0,0 +1,22 @@ +from prowler.lib.check.models import Check, Check_Report_AWS +from prowler.providers.aws.services.backup.backup_client import backup_client + + +class backup_vaults_exist(Check): + def execute(self): + findings = [] + report = Check_Report_AWS(self.metadata()) + report.status = "FAIL" + report.status_extended = "No Backup Vault Exist" + report.resource_arn = "" + report.resource_id = "No Backups" + report.region = backup_client.general_region + if backup_client.backup_vaults: + report.status = "PASS" + report.status_extended = f"At least one backup vault exists: { backup_client.backup_vaults[0].name}" + report.resource_arn = backup_client.backup_vaults[0].arn + report.resource_id = backup_client.backup_vaults[0].name + report.region = backup_client.backup_vaults[0].region + + findings.append(report) + return findings diff --git a/tests/providers/aws/services/backup/backup_plans_exist/backup_plans_exist_test.py b/tests/providers/aws/services/backup/backup_plans_exist/backup_plans_exist_test.py new file mode 100644 index 00000000..18fa5924 --- /dev/null +++ b/tests/providers/aws/services/backup/backup_plans_exist/backup_plans_exist_test.py @@ -0,0 +1,67 @@ +from datetime import datetime +from unittest import mock + +from prowler.providers.aws.services.backup.backup_service import BackupPlan + +AWS_REGION = "eu-west-1" + + +class Test_backup_plans_exist: + def test_no_backup_plans(self): + backup_client = mock.MagicMock + backup_client.general_region = AWS_REGION + backup_client.backup_plans = [] + with mock.patch( + "prowler.providers.aws.services.backup.backup_service.Backup", + new=backup_client, + ): + # Test Check + from prowler.providers.aws.services.backup.backup_plans_exist.backup_plans_exist import ( + backup_plans_exist, + ) + + check = backup_plans_exist() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert result[0].status_extended == "No Backup Plan Exist" + assert result[0].resource_id == "No Backups" + assert result[0].resource_arn == "" + assert result[0].region == AWS_REGION + + def test_one_backup_plan(self): + backup_client = mock.MagicMock + backup_client.general_region = AWS_REGION + backup_client.backup_plans = [ + BackupPlan( + arn="ARN", + id="MyBackupPlan", + region=AWS_REGION, + name="MyBackupPlan", + version_id="version_id", + last_execution_date=datetime(2015, 1, 1), + advanced_settings=[], + ) + ] + with mock.patch( + "prowler.providers.aws.services.backup.backup_service.Backup", + new=backup_client, + ): + # Test Check + from prowler.providers.aws.services.backup.backup_plans_exist.backup_plans_exist import ( + backup_plans_exist, + ) + + check = backup_plans_exist() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == "At least one backup plan exists: " + result[0].resource_id + ) + assert result[0].resource_id == "MyBackupPlan" + assert result[0].resource_arn == "ARN" + assert result[0].region == AWS_REGION diff --git a/tests/providers/aws/services/backup/backup_reportplans_exist/backup_reportplans_exist_test.py b/tests/providers/aws/services/backup/backup_reportplans_exist/backup_reportplans_exist_test.py new file mode 100644 index 00000000..071628d9 --- /dev/null +++ b/tests/providers/aws/services/backup/backup_reportplans_exist/backup_reportplans_exist_test.py @@ -0,0 +1,66 @@ +from datetime import datetime +from unittest import mock + +from prowler.providers.aws.services.backup.backup_service import BackupReportPlan + +AWS_REGION = "eu-west-1" + + +class Test_backup_reportplans_exist: + def test_no_backup_report_plans(self): + backup_client = mock.MagicMock + backup_client.general_region = AWS_REGION + backup_client.backup_report_plans = [] + with mock.patch( + "prowler.providers.aws.services.backup.backup_service.Backup", + new=backup_client, + ): + # Test Check + from prowler.providers.aws.services.backup.backup_reportplans_exist.backup_reportplans_exist import ( + backup_reportplans_exist, + ) + + check = backup_reportplans_exist() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert result[0].status_extended == "No Backup Report Plan Exist" + assert result[0].resource_id == "No Backups" + assert result[0].resource_arn == "" + assert result[0].region == AWS_REGION + + def test_one_backup_report_plan(self): + backup_client = mock.MagicMock + backup_client.general_region = AWS_REGION + backup_client.backup_report_plans = [ + BackupReportPlan( + arn="ARN", + region=AWS_REGION, + name="MyBackupReportPlan", + last_attempted_execution_date=datetime(2015, 1, 1), + last_successful_execution_date=datetime(2015, 1, 1), + ) + ] + + with mock.patch( + "prowler.providers.aws.services.backup.backup_service.Backup", + new=backup_client, + ): + # Test Check + from prowler.providers.aws.services.backup.backup_reportplans_exist.backup_reportplans_exist import ( + backup_reportplans_exist, + ) + + check = backup_reportplans_exist() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == "At least one backup report plan exists: " + result[0].resource_id + ) + assert result[0].resource_id == "MyBackupReportPlan" + assert result[0].resource_arn == "ARN" + assert result[0].region == AWS_REGION diff --git a/tests/providers/aws/services/backup/backup_service_test.py b/tests/providers/aws/services/backup/backup_service_test.py new file mode 100644 index 00000000..9d1396c8 --- /dev/null +++ b/tests/providers/aws/services/backup/backup_service_test.py @@ -0,0 +1,157 @@ +from datetime import datetime +from unittest.mock import patch + +import botocore +from boto3 import session + +from prowler.providers.aws.lib.audit_info.audit_info import AWS_Audit_Info +from prowler.providers.aws.services.backup.backup_service import Backup + +# Mock Test Region +AWS_REGION = "eu-west-1" + +# Mocking Backup Calls +make_api_call = botocore.client.BaseClient._make_api_call + + +def mock_make_api_call(self, operation_name, kwarg): + """ + Mock every AWS API call + """ + if operation_name == "ListBackupVaults": + return { + "BackupVaultList": [ + { + "BackupVaultArn": "ARN", + "BackupVaultName": "Test Vault", + "EncryptionKeyArn": "", + "NumberOfRecoveryPoints": 0, + "Locked": True, + "MinRetentionDays": 1, + "MaxRetentionDays": 2, + } + ] + } + if operation_name == "ListBackupPlans": + return { + "BackupPlansList": [ + { + "BackupPlanArn": "ARN", + "BackupPlanId": "ID", + "BackupPlanName": "Test Plan", + "VersionId": "test_version_id", + "LastExecutionDate": datetime(2015, 1, 1), + "AdvancedBackupSettings": [], + } + ] + } + if operation_name == "ListReportPlans": + return { + "ReportPlans": [ + { + "ReportPlanArn": "ARN", + "ReportPlanName": "Test Report Plan", + "LastAttemptedExecutionTime": datetime(2015, 1, 1), + "LastSuccessfulExecutionTime": datetime(2015, 1, 1), + } + ] + } + return make_api_call(self, operation_name, kwarg) + + +def mock_generate_regional_clients(service, audit_info): + regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION) + regional_client.region = AWS_REGION + return {AWS_REGION: regional_client} + + +# Patch every AWS call using Boto3 and generate_regional_clients to have 1 client +@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call) +@patch( + "prowler.providers.aws.services.backup.backup_service.generate_regional_clients", + new=mock_generate_regional_clients, +) +class Test_Backup_Service: + + # Mocked Audit Info + def set_mocked_audit_info(self): + audit_info = AWS_Audit_Info( + session_config=None, + original_session=None, + audit_session=session.Session( + profile_name=None, + botocore_session=None, + ), + audited_account=None, + audited_user_id=None, + audited_partition="aws", + audited_identity_arn=None, + profile=None, + profile_region=None, + credentials=None, + assumed_role_info=None, + audited_regions=None, + organizations_metadata=None, + audit_resources=None, + ) + return audit_info + + # Test Backup Client + def test__get_client__(self): + audit_info = self.set_mocked_audit_info() + backup = Backup(audit_info) + assert backup.regional_clients[AWS_REGION].__class__.__name__ == "Backup" + + # Test Backup Session + def test__get_session__(self): + audit_info = self.set_mocked_audit_info() + access_analyzer = Backup(audit_info) + assert access_analyzer.session.__class__.__name__ == "Session" + + # Test Backup Service + def test__get_service__(self): + audit_info = self.set_mocked_audit_info() + access_analyzer = Backup(audit_info) + assert access_analyzer.service == "backup" + + # Test Backup List Backup Vaults + def test__list_backup_vaults__(self): + audit_info = self.set_mocked_audit_info() + backup = Backup(audit_info) + assert len(backup.backup_vaults) == 1 + assert backup.backup_vaults[0].arn == "ARN" + assert backup.backup_vaults[0].name == "Test Vault" + assert backup.backup_vaults[0].region == AWS_REGION + assert backup.backup_vaults[0].encryption == "" + assert backup.backup_vaults[0].recovery_points == 0 + assert backup.backup_vaults[0].locked is True + assert backup.backup_vaults[0].min_retention_days == 1 + assert backup.backup_vaults[0].max_retention_days == 2 + + # Test Backup List Backup Plans + def test__list_backup_plans__(self): + audit_info = self.set_mocked_audit_info() + backup = Backup(audit_info) + assert len(backup.backup_plans) == 1 + assert backup.backup_plans[0].arn == "ARN" + assert backup.backup_plans[0].id == "ID" + assert backup.backup_plans[0].region == AWS_REGION + assert backup.backup_plans[0].name == "Test Plan" + assert backup.backup_plans[0].version_id == "test_version_id" + assert backup.backup_plans[0].last_execution_date == datetime(2015, 1, 1) + assert backup.backup_plans[0].advanced_settings == [] + + # Test Backup List Report Plans + def test__list_backup_report_plans__(self): + audit_info = self.set_mocked_audit_info() + backup = Backup(audit_info) + assert len(backup.backup_report_plans) == 1 + assert backup.backup_report_plans[0].arn == "ARN" + assert backup.backup_report_plans[0].region == AWS_REGION + assert backup.backup_report_plans[0].name == "Test Report Plan" + assert backup.backup_report_plans[0].last_attempted_execution_date == datetime( + 2015, 1, 1 + ) + assert backup.backup_report_plans[0].last_successful_execution_date == datetime( + 2015, 1, 1 + ) diff --git a/tests/providers/aws/services/backup/backup_vaults_encrypted/backup_vaults_encrypted_test.py b/tests/providers/aws/services/backup/backup_vaults_encrypted/backup_vaults_encrypted_test.py new file mode 100644 index 00000000..fb319664 --- /dev/null +++ b/tests/providers/aws/services/backup/backup_vaults_encrypted/backup_vaults_encrypted_test.py @@ -0,0 +1,98 @@ +from unittest import mock + +from prowler.providers.aws.services.backup.backup_service import BackupVault + +AWS_REGION = "eu-west-1" + + +class Test_backup_vaults_encrypted: + def test_no_backup_vaults(self): + backup_client = mock.MagicMock + backup_client.backup_vaults = [] + with mock.patch( + "prowler.providers.aws.services.backup.backup_service.Backup", + new=backup_client, + ): + # Test Check + from prowler.providers.aws.services.backup.backup_vaults_encrypted.backup_vaults_encrypted import ( + backup_vaults_encrypted, + ) + + check = backup_vaults_encrypted() + result = check.execute() + + assert len(result) == 0 + + def test_one_backup_vault_unencrypted(self): + backup_client = mock.MagicMock + backup_client.backup_vaults = [ + BackupVault( + arn="ARN", + name="MyBackupVault", + region=AWS_REGION, + encryption="", + recovery_points=1, + locked=True, + min_retention_days=1, + max_retention_days=2, + ) + ] + + with mock.patch( + "prowler.providers.aws.services.backup.backup_service.Backup", + new=backup_client, + ): + # Test Check + from prowler.providers.aws.services.backup.backup_vaults_encrypted.backup_vaults_encrypted import ( + backup_vaults_encrypted, + ) + + check = backup_vaults_encrypted() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == "Backup Vault " + result[0].resource_id + " is not encrypted" + ) + assert result[0].resource_id == "MyBackupVault" + assert result[0].resource_arn == "ARN" + assert result[0].region == AWS_REGION + + def test_one_backup_vault_encrypted(self): + backup_client = mock.MagicMock + backup_client.backup_vaults = [ + BackupVault( + arn="ARN", + name="MyBackupVault", + region=AWS_REGION, + encryption="test", + recovery_points=1, + locked=True, + min_retention_days=1, + max_retention_days=2, + ) + ] + + with mock.patch( + "prowler.providers.aws.services.backup.backup_service.Backup", + new=backup_client, + ): + # Test Check + from prowler.providers.aws.services.backup.backup_vaults_encrypted.backup_vaults_encrypted import ( + backup_vaults_encrypted, + ) + + check = backup_vaults_encrypted() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == "Backup Vault " + result[0].resource_id + " is encrypted" + ) + assert result[0].resource_id == "MyBackupVault" + assert result[0].resource_arn == "ARN" + assert result[0].region == AWS_REGION diff --git a/tests/providers/aws/services/backup/backup_vaults_exist/backup_vaults_exist_test.py b/tests/providers/aws/services/backup/backup_vaults_exist/backup_vaults_exist_test.py new file mode 100644 index 00000000..4dcba779 --- /dev/null +++ b/tests/providers/aws/services/backup/backup_vaults_exist/backup_vaults_exist_test.py @@ -0,0 +1,68 @@ +from unittest import mock + +from prowler.providers.aws.services.backup.backup_service import BackupVault + +AWS_REGION = "eu-west-1" + + +class Test_backup_vaults_exist: + def test_no_backup_vaults(self): + backup_client = mock.MagicMock + backup_client.general_region = AWS_REGION + backup_client.backup_vaults = [] + with mock.patch( + "prowler.providers.aws.services.backup.backup_service.Backup", + new=backup_client, + ): + # Test Check + from prowler.providers.aws.services.backup.backup_vaults_exist.backup_vaults_exist import ( + backup_vaults_exist, + ) + + check = backup_vaults_exist() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert result[0].status_extended == "No Backup Vault Exist" + assert result[0].resource_id == "No Backups" + assert result[0].resource_arn == "" + assert result[0].region == AWS_REGION + + def test_one_backup_vault(self): + backup_client = mock.MagicMock + backup_client.general_region = AWS_REGION + backup_client.backup_vaults = [ + BackupVault( + arn="ARN", + name="MyBackupVault", + region=AWS_REGION, + encryption="", + recovery_points=1, + locked=True, + min_retention_days=1, + max_retention_days=2, + ) + ] + + with mock.patch( + "prowler.providers.aws.services.backup.backup_service.Backup", + new=backup_client, + ): + # Test Check + from prowler.providers.aws.services.backup.backup_vaults_exist.backup_vaults_exist import ( + backup_vaults_exist, + ) + + check = backup_vaults_exist() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == "At least one backup vault exists: " + result[0].resource_id + ) + assert result[0].resource_id == "MyBackupVault" + assert result[0].resource_arn == "ARN" + assert result[0].region == AWS_REGION