mirror of
https://github.com/ghndrx/prowler.git
synced 2026-02-10 14:55:00 +00:00
feat(backup): New backup service and checks (#2172)
Co-authored-by: Nacho Rivera <nacho@verica.io>
This commit is contained in:
0
prowler/providers/aws/services/backup/__init__.py
Normal file
0
prowler/providers/aws/services/backup/__init__.py
Normal file
4
prowler/providers/aws/services/backup/backup_client.py
Normal file
4
prowler/providers/aws/services/backup/backup_client.py
Normal file
@@ -0,0 +1,4 @@
|
||||
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
|
||||
from prowler.providers.aws.services.backup.backup_service import Backup
|
||||
|
||||
backup_client = Backup(current_audit_info)
|
||||
@@ -0,0 +1,34 @@
|
||||
{
|
||||
"Provider": "aws",
|
||||
"CheckID": "backup_plans_exist",
|
||||
"CheckTitle": "Ensure that there is at least one AWS Backup plan",
|
||||
"CheckType": [
|
||||
"Recover",
|
||||
"Resilience",
|
||||
"Backup"
|
||||
],
|
||||
"ServiceName": "backup",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "arn:partition:service:region:account-id:backup-plan:backup-plan-id",
|
||||
"Severity": "medium",
|
||||
"ResourceType": "AwsBackupBackupPlan",
|
||||
"Description": "This check ensures that there is at least one backup plan in place.",
|
||||
"Risk": "Without a backup plan, an organization may be at risk of losing important data due to accidental deletion, system failures, or natural disasters. This can result in significant financial and reputational damage for the organization.",
|
||||
"RelatedUrl": "",
|
||||
"Remediation": {
|
||||
"Code": {
|
||||
"CLI": "aws backup create-backup-plan --backup-plan <backup_plan_name> --backup-plan-rule <backup_rule_name>",
|
||||
"NativeIaC": "",
|
||||
"Other": "",
|
||||
"Terraform": ""
|
||||
},
|
||||
"Recommendation": {
|
||||
"Text": "Use AWS Backup to create backup plans for your critical data and services.",
|
||||
"Url": ""
|
||||
}
|
||||
},
|
||||
"Categories": [],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": ""
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
from prowler.lib.check.models import Check, Check_Report_AWS
|
||||
from prowler.providers.aws.services.backup.backup_client import backup_client
|
||||
|
||||
|
||||
class backup_plans_exist(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
report = Check_Report_AWS(self.metadata())
|
||||
report.status = "FAIL"
|
||||
report.status_extended = "No Backup Plan Exist"
|
||||
report.resource_arn = ""
|
||||
report.resource_id = "No Backups"
|
||||
report.region = backup_client.general_region
|
||||
if backup_client.backup_plans:
|
||||
report.status = "PASS"
|
||||
report.status_extended = f"At least one backup plan exists: { backup_client.backup_plans[0].name}"
|
||||
report.resource_arn = backup_client.backup_plans[0].arn
|
||||
report.resource_id = backup_client.backup_plans[0].name
|
||||
report.region = backup_client.backup_plans[0].region
|
||||
|
||||
findings.append(report)
|
||||
return findings
|
||||
@@ -0,0 +1,34 @@
|
||||
{
|
||||
"Provider": "aws",
|
||||
"CheckID": "backup_reportplans_exist",
|
||||
"CheckTitle": "Ensure that there is at least one AWS Backup report plan",
|
||||
"CheckType": [
|
||||
"Recover",
|
||||
"Resilience",
|
||||
"Backup"
|
||||
],
|
||||
"ServiceName": "backup",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "arn:partition:service:region:account-id:backup-report-plan:backup-report-plan-id",
|
||||
"Severity": "low",
|
||||
"ResourceType": "Other",
|
||||
"Description": "This check ensures that there is at least one backup report plan in place.",
|
||||
"Risk": "Without a backup report plan, an organization may lack visibility into the success or failure of backup operations.",
|
||||
"RelatedUrl": "",
|
||||
"Remediation": {
|
||||
"Code": {
|
||||
"CLI": "aws backup create-report-plan --report-plan-name <report-plan-name> --report-delivery-channel <value> --report-setting <value>",
|
||||
"NativeIaC": "",
|
||||
"Other": "",
|
||||
"Terraform": ""
|
||||
},
|
||||
"Recommendation": {
|
||||
"Text": "Use AWS Backup to create backup report plans that provide visibility into the success or failure of backup operations.",
|
||||
"Url": ""
|
||||
}
|
||||
},
|
||||
"Categories": [],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": ""
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
from prowler.lib.check.models import Check, Check_Report_AWS
|
||||
from prowler.providers.aws.services.backup.backup_client import backup_client
|
||||
|
||||
|
||||
class backup_reportplans_exist(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
report = Check_Report_AWS(self.metadata())
|
||||
report.status = "FAIL"
|
||||
report.status_extended = "No Backup Report Plan Exist"
|
||||
report.resource_arn = ""
|
||||
report.resource_id = "No Backups"
|
||||
report.region = backup_client.general_region
|
||||
if backup_client.backup_report_plans:
|
||||
report.status = "PASS"
|
||||
report.status_extended = f"At least one backup report plan exists: { backup_client.backup_report_plans[0].name}"
|
||||
report.resource_arn = backup_client.backup_report_plans[0].arn
|
||||
report.resource_id = backup_client.backup_report_plans[0].name
|
||||
report.region = backup_client.backup_report_plans[0].region
|
||||
|
||||
findings.append(report)
|
||||
return findings
|
||||
169
prowler/providers/aws/services/backup/backup_service.py
Normal file
169
prowler/providers/aws/services/backup/backup_service.py
Normal file
@@ -0,0 +1,169 @@
|
||||
import threading
|
||||
from datetime import datetime
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from prowler.lib.logger import logger
|
||||
from prowler.lib.scan_filters.scan_filters import is_resource_filtered
|
||||
from prowler.providers.aws.aws_provider import generate_regional_clients
|
||||
|
||||
|
||||
################## Backup
|
||||
class Backup:
|
||||
def __init__(self, audit_info):
|
||||
self.service = "backup"
|
||||
self.session = audit_info.audit_session
|
||||
self.audited_account = audit_info.audited_account
|
||||
self.audit_resources = audit_info.audit_resources
|
||||
self.regional_clients = generate_regional_clients(self.service, audit_info)
|
||||
self.general_region = audit_info.profile_region
|
||||
self.backup_vaults = []
|
||||
self.__threading_call__(self.__list_backup_vaults__)
|
||||
self.backup_plans = []
|
||||
self.__threading_call__(self.__list_backup_plans__)
|
||||
self.backup_report_plans = []
|
||||
self.__threading_call__(self.__list_backup_report_plans__)
|
||||
|
||||
def __threading_call__(self, call):
|
||||
threads = []
|
||||
for regional_client in self.regional_clients.values():
|
||||
threads.append(threading.Thread(target=call, args=(regional_client,)))
|
||||
for t in threads:
|
||||
t.start()
|
||||
for t in threads:
|
||||
t.join()
|
||||
|
||||
def __list_backup_vaults__(self, regional_client):
|
||||
logger.info("Backup - Listing Backup Vaults...")
|
||||
try:
|
||||
list_backup_vaults_paginator = regional_client.get_paginator(
|
||||
"list_backup_vaults"
|
||||
)
|
||||
for page in list_backup_vaults_paginator.paginate():
|
||||
for configuration in page.get("BackupVaultList"):
|
||||
if not self.audit_resources or (
|
||||
is_resource_filtered(
|
||||
configuration.get("BackupVaultArn"),
|
||||
self.audit_resources,
|
||||
)
|
||||
):
|
||||
self.backup_vaults.append(
|
||||
BackupVault(
|
||||
arn=configuration.get("BackupVaultArn"),
|
||||
name=configuration.get("BackupVaultName"),
|
||||
region=regional_client.region,
|
||||
encryption=configuration.get("EncryptionKeyArn"),
|
||||
recovery_points=configuration.get(
|
||||
"NumberOfRecoveryPoints"
|
||||
),
|
||||
locked=configuration.get("Locked"),
|
||||
min_retention_days=configuration.get(
|
||||
"MinRetentionDays"
|
||||
),
|
||||
max_retention_days=configuration.get(
|
||||
"MaxRetentionDays"
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
def __list_backup_plans__(self, regional_client):
|
||||
logger.info("Backup - Listing Backup Plans...")
|
||||
try:
|
||||
list_backup_plans_paginator = regional_client.get_paginator(
|
||||
"list_backup_plans"
|
||||
)
|
||||
for page in list_backup_plans_paginator.paginate():
|
||||
for configuration in page.get("BackupPlansList"):
|
||||
if not self.audit_resources or (
|
||||
is_resource_filtered(
|
||||
configuration.get("BackupPlanArn"),
|
||||
self.audit_resources,
|
||||
)
|
||||
):
|
||||
self.backup_plans.append(
|
||||
BackupPlan(
|
||||
arn=configuration.get("BackupPlanArn"),
|
||||
id=configuration.get("BackupPlanId"),
|
||||
region=regional_client.region,
|
||||
name=configuration.get("BackupPlanName"),
|
||||
version_id=configuration.get("VersionId"),
|
||||
last_execution_date=configuration.get(
|
||||
"LastExecutionDate"
|
||||
),
|
||||
advanced_settings=configuration.get(
|
||||
"AdvancedBackupSettings"
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
def __list_backup_report_plans__(self, regional_client):
|
||||
logger.info("Backup - Listing Backup Report Plans...")
|
||||
|
||||
try:
|
||||
list_backup_report_plans = regional_client.list_report_plans()[
|
||||
"ReportPlans"
|
||||
]
|
||||
for backup_report_plan in list_backup_report_plans:
|
||||
if not self.audit_resources or (
|
||||
is_resource_filtered(
|
||||
backup_report_plan.get("ReportPlanArn"),
|
||||
self.audit_resources,
|
||||
)
|
||||
):
|
||||
self.backup_report_plans.append(
|
||||
BackupReportPlan(
|
||||
arn=backup_report_plan.get("ReportPlanArn"),
|
||||
region=regional_client.region,
|
||||
name=backup_report_plan.get("ReportPlanName"),
|
||||
last_attempted_execution_date=backup_report_plan.get(
|
||||
"LastAttemptedExecutionTime"
|
||||
),
|
||||
last_successful_execution_date=backup_report_plan.get(
|
||||
"LastSuccessfulExecutionTime"
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
|
||||
class BackupVault(BaseModel):
|
||||
arn: str
|
||||
name: str
|
||||
region: str
|
||||
encryption: str
|
||||
recovery_points: int
|
||||
locked: bool
|
||||
min_retention_days: int = None
|
||||
max_retention_days: int = None
|
||||
|
||||
|
||||
class BackupPlan(BaseModel):
|
||||
arn: str
|
||||
id: str
|
||||
region: str
|
||||
name: str
|
||||
version_id: str
|
||||
last_execution_date: datetime
|
||||
advanced_settings: list
|
||||
|
||||
|
||||
class BackupReportPlan(BaseModel):
|
||||
arn: str
|
||||
region: str
|
||||
name: str
|
||||
last_attempted_execution_date: datetime
|
||||
last_successful_execution_date: datetime
|
||||
@@ -0,0 +1,35 @@
|
||||
{
|
||||
"Provider": "aws",
|
||||
"CheckID": "backup_vaults_encrypted",
|
||||
"CheckTitle": "Ensure that AWS Backup vaults are encrypted with AWS KMS",
|
||||
"CheckType": [
|
||||
"Recover",
|
||||
"Resilience",
|
||||
"Backup",
|
||||
"Data Protection"
|
||||
],
|
||||
"ServiceName": "backup",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "arn:partition:service:region:account-id:backup-vault:backup-vault-id",
|
||||
"Severity": "medium",
|
||||
"ResourceType": "AwsBackupBackupVault",
|
||||
"Description": "This check ensures that AWS Backup vaults are encrypted with AWS KMS.",
|
||||
"Risk": "Without encryption using AWS KMS, an organization's backup data may be at risk of unauthorized access, which can lead to data breaches and other security incidents.",
|
||||
"RelatedUrl": "",
|
||||
"Remediation": {
|
||||
"Code": {
|
||||
"CLI": "aws backup update-backup-vault --backup-vault-name <backup_vault_name> --encryption-key-arn <kms_key_arn>",
|
||||
"NativeIaC": "",
|
||||
"Other": "",
|
||||
"Terraform": ""
|
||||
},
|
||||
"Recommendation": {
|
||||
"Text": "Use AWS KMS to encrypt your AWS Backup vaults and backup data.",
|
||||
"Url": ""
|
||||
}
|
||||
},
|
||||
"Categories": [],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": ""
|
||||
}
|
||||
@@ -0,0 +1,28 @@
|
||||
from prowler.lib.check.models import Check, Check_Report_AWS
|
||||
from prowler.providers.aws.services.backup.backup_client import backup_client
|
||||
|
||||
|
||||
class backup_vaults_encrypted(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
|
||||
for backup_vault in backup_client.backup_vaults:
|
||||
# By default we assume that the result is fail
|
||||
report = Check_Report_AWS(self.metadata())
|
||||
report.status = "FAIL"
|
||||
report.status_extended = (
|
||||
f"Backup Vault {backup_vault.name} is not encrypted"
|
||||
)
|
||||
report.resource_arn = backup_vault.arn
|
||||
report.resource_id = backup_vault.name
|
||||
report.region = backup_vault.region
|
||||
# if it is encrypted we only change the status and the status extended
|
||||
if backup_vault.encryption:
|
||||
report.status = "PASS"
|
||||
report.status_extended = (
|
||||
f"Backup Vault {backup_vault.name} is encrypted"
|
||||
)
|
||||
# then we store the finding
|
||||
findings.append(report)
|
||||
|
||||
return findings
|
||||
@@ -0,0 +1,34 @@
|
||||
{
|
||||
"Provider": "aws",
|
||||
"CheckID": "backup_vaults_exist",
|
||||
"CheckTitle": "Esure AWS Backup vaults exist",
|
||||
"CheckType": [
|
||||
"Recover",
|
||||
"Resilience",
|
||||
"Backup"
|
||||
],
|
||||
"ServiceName": "backup",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "arn:partition:service:region:account-id:backup-vault:backup-vault-id",
|
||||
"Severity": "medium",
|
||||
"ResourceType": "AwsBackupBackupVault",
|
||||
"Description": "This check ensures that AWS Backup vaults exist to provide a secure and durable storage location for backup data.",
|
||||
"Risk": "Without an AWS Backup vault, an organization's critical data may be at risk of being lost in the event of an accidental deletion, system failures, or natural disasters.",
|
||||
"RelatedUrl": "",
|
||||
"Remediation": {
|
||||
"Code": {
|
||||
"CLI": "aws backup create-backup-vault --backup-vault-name <backup_vault_name>",
|
||||
"NativeIaC": "",
|
||||
"Other": "",
|
||||
"Terraform": ""
|
||||
},
|
||||
"Recommendation": {
|
||||
"Text": "Use AWS Backup to create backup vaults for your critical data and services.",
|
||||
"Url": ""
|
||||
}
|
||||
},
|
||||
"Categories": [],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": ""
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
from prowler.lib.check.models import Check, Check_Report_AWS
|
||||
from prowler.providers.aws.services.backup.backup_client import backup_client
|
||||
|
||||
|
||||
class backup_vaults_exist(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
report = Check_Report_AWS(self.metadata())
|
||||
report.status = "FAIL"
|
||||
report.status_extended = "No Backup Vault Exist"
|
||||
report.resource_arn = ""
|
||||
report.resource_id = "No Backups"
|
||||
report.region = backup_client.general_region
|
||||
if backup_client.backup_vaults:
|
||||
report.status = "PASS"
|
||||
report.status_extended = f"At least one backup vault exists: { backup_client.backup_vaults[0].name}"
|
||||
report.resource_arn = backup_client.backup_vaults[0].arn
|
||||
report.resource_id = backup_client.backup_vaults[0].name
|
||||
report.region = backup_client.backup_vaults[0].region
|
||||
|
||||
findings.append(report)
|
||||
return findings
|
||||
@@ -0,0 +1,67 @@
|
||||
from datetime import datetime
|
||||
from unittest import mock
|
||||
|
||||
from prowler.providers.aws.services.backup.backup_service import BackupPlan
|
||||
|
||||
AWS_REGION = "eu-west-1"
|
||||
|
||||
|
||||
class Test_backup_plans_exist:
|
||||
def test_no_backup_plans(self):
|
||||
backup_client = mock.MagicMock
|
||||
backup_client.general_region = AWS_REGION
|
||||
backup_client.backup_plans = []
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.backup.backup_service.Backup",
|
||||
new=backup_client,
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.backup.backup_plans_exist.backup_plans_exist import (
|
||||
backup_plans_exist,
|
||||
)
|
||||
|
||||
check = backup_plans_exist()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert result[0].status_extended == "No Backup Plan Exist"
|
||||
assert result[0].resource_id == "No Backups"
|
||||
assert result[0].resource_arn == ""
|
||||
assert result[0].region == AWS_REGION
|
||||
|
||||
def test_one_backup_plan(self):
|
||||
backup_client = mock.MagicMock
|
||||
backup_client.general_region = AWS_REGION
|
||||
backup_client.backup_plans = [
|
||||
BackupPlan(
|
||||
arn="ARN",
|
||||
id="MyBackupPlan",
|
||||
region=AWS_REGION,
|
||||
name="MyBackupPlan",
|
||||
version_id="version_id",
|
||||
last_execution_date=datetime(2015, 1, 1),
|
||||
advanced_settings=[],
|
||||
)
|
||||
]
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.backup.backup_service.Backup",
|
||||
new=backup_client,
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.backup.backup_plans_exist.backup_plans_exist import (
|
||||
backup_plans_exist,
|
||||
)
|
||||
|
||||
check = backup_plans_exist()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== "At least one backup plan exists: " + result[0].resource_id
|
||||
)
|
||||
assert result[0].resource_id == "MyBackupPlan"
|
||||
assert result[0].resource_arn == "ARN"
|
||||
assert result[0].region == AWS_REGION
|
||||
@@ -0,0 +1,66 @@
|
||||
from datetime import datetime
|
||||
from unittest import mock
|
||||
|
||||
from prowler.providers.aws.services.backup.backup_service import BackupReportPlan
|
||||
|
||||
AWS_REGION = "eu-west-1"
|
||||
|
||||
|
||||
class Test_backup_reportplans_exist:
|
||||
def test_no_backup_report_plans(self):
|
||||
backup_client = mock.MagicMock
|
||||
backup_client.general_region = AWS_REGION
|
||||
backup_client.backup_report_plans = []
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.backup.backup_service.Backup",
|
||||
new=backup_client,
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.backup.backup_reportplans_exist.backup_reportplans_exist import (
|
||||
backup_reportplans_exist,
|
||||
)
|
||||
|
||||
check = backup_reportplans_exist()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert result[0].status_extended == "No Backup Report Plan Exist"
|
||||
assert result[0].resource_id == "No Backups"
|
||||
assert result[0].resource_arn == ""
|
||||
assert result[0].region == AWS_REGION
|
||||
|
||||
def test_one_backup_report_plan(self):
|
||||
backup_client = mock.MagicMock
|
||||
backup_client.general_region = AWS_REGION
|
||||
backup_client.backup_report_plans = [
|
||||
BackupReportPlan(
|
||||
arn="ARN",
|
||||
region=AWS_REGION,
|
||||
name="MyBackupReportPlan",
|
||||
last_attempted_execution_date=datetime(2015, 1, 1),
|
||||
last_successful_execution_date=datetime(2015, 1, 1),
|
||||
)
|
||||
]
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.backup.backup_service.Backup",
|
||||
new=backup_client,
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.backup.backup_reportplans_exist.backup_reportplans_exist import (
|
||||
backup_reportplans_exist,
|
||||
)
|
||||
|
||||
check = backup_reportplans_exist()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== "At least one backup report plan exists: " + result[0].resource_id
|
||||
)
|
||||
assert result[0].resource_id == "MyBackupReportPlan"
|
||||
assert result[0].resource_arn == "ARN"
|
||||
assert result[0].region == AWS_REGION
|
||||
157
tests/providers/aws/services/backup/backup_service_test.py
Normal file
157
tests/providers/aws/services/backup/backup_service_test.py
Normal file
@@ -0,0 +1,157 @@
|
||||
from datetime import datetime
|
||||
from unittest.mock import patch
|
||||
|
||||
import botocore
|
||||
from boto3 import session
|
||||
|
||||
from prowler.providers.aws.lib.audit_info.audit_info import AWS_Audit_Info
|
||||
from prowler.providers.aws.services.backup.backup_service import Backup
|
||||
|
||||
# Mock Test Region
|
||||
AWS_REGION = "eu-west-1"
|
||||
|
||||
# Mocking Backup Calls
|
||||
make_api_call = botocore.client.BaseClient._make_api_call
|
||||
|
||||
|
||||
def mock_make_api_call(self, operation_name, kwarg):
|
||||
"""
|
||||
Mock every AWS API call
|
||||
"""
|
||||
if operation_name == "ListBackupVaults":
|
||||
return {
|
||||
"BackupVaultList": [
|
||||
{
|
||||
"BackupVaultArn": "ARN",
|
||||
"BackupVaultName": "Test Vault",
|
||||
"EncryptionKeyArn": "",
|
||||
"NumberOfRecoveryPoints": 0,
|
||||
"Locked": True,
|
||||
"MinRetentionDays": 1,
|
||||
"MaxRetentionDays": 2,
|
||||
}
|
||||
]
|
||||
}
|
||||
if operation_name == "ListBackupPlans":
|
||||
return {
|
||||
"BackupPlansList": [
|
||||
{
|
||||
"BackupPlanArn": "ARN",
|
||||
"BackupPlanId": "ID",
|
||||
"BackupPlanName": "Test Plan",
|
||||
"VersionId": "test_version_id",
|
||||
"LastExecutionDate": datetime(2015, 1, 1),
|
||||
"AdvancedBackupSettings": [],
|
||||
}
|
||||
]
|
||||
}
|
||||
if operation_name == "ListReportPlans":
|
||||
return {
|
||||
"ReportPlans": [
|
||||
{
|
||||
"ReportPlanArn": "ARN",
|
||||
"ReportPlanName": "Test Report Plan",
|
||||
"LastAttemptedExecutionTime": datetime(2015, 1, 1),
|
||||
"LastSuccessfulExecutionTime": datetime(2015, 1, 1),
|
||||
}
|
||||
]
|
||||
}
|
||||
return make_api_call(self, operation_name, kwarg)
|
||||
|
||||
|
||||
def mock_generate_regional_clients(service, audit_info):
|
||||
regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION)
|
||||
regional_client.region = AWS_REGION
|
||||
return {AWS_REGION: regional_client}
|
||||
|
||||
|
||||
# Patch every AWS call using Boto3 and generate_regional_clients to have 1 client
|
||||
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
|
||||
@patch(
|
||||
"prowler.providers.aws.services.backup.backup_service.generate_regional_clients",
|
||||
new=mock_generate_regional_clients,
|
||||
)
|
||||
class Test_Backup_Service:
|
||||
|
||||
# Mocked Audit Info
|
||||
def set_mocked_audit_info(self):
|
||||
audit_info = AWS_Audit_Info(
|
||||
session_config=None,
|
||||
original_session=None,
|
||||
audit_session=session.Session(
|
||||
profile_name=None,
|
||||
botocore_session=None,
|
||||
),
|
||||
audited_account=None,
|
||||
audited_user_id=None,
|
||||
audited_partition="aws",
|
||||
audited_identity_arn=None,
|
||||
profile=None,
|
||||
profile_region=None,
|
||||
credentials=None,
|
||||
assumed_role_info=None,
|
||||
audited_regions=None,
|
||||
organizations_metadata=None,
|
||||
audit_resources=None,
|
||||
)
|
||||
return audit_info
|
||||
|
||||
# Test Backup Client
|
||||
def test__get_client__(self):
|
||||
audit_info = self.set_mocked_audit_info()
|
||||
backup = Backup(audit_info)
|
||||
assert backup.regional_clients[AWS_REGION].__class__.__name__ == "Backup"
|
||||
|
||||
# Test Backup Session
|
||||
def test__get_session__(self):
|
||||
audit_info = self.set_mocked_audit_info()
|
||||
access_analyzer = Backup(audit_info)
|
||||
assert access_analyzer.session.__class__.__name__ == "Session"
|
||||
|
||||
# Test Backup Service
|
||||
def test__get_service__(self):
|
||||
audit_info = self.set_mocked_audit_info()
|
||||
access_analyzer = Backup(audit_info)
|
||||
assert access_analyzer.service == "backup"
|
||||
|
||||
# Test Backup List Backup Vaults
|
||||
def test__list_backup_vaults__(self):
|
||||
audit_info = self.set_mocked_audit_info()
|
||||
backup = Backup(audit_info)
|
||||
assert len(backup.backup_vaults) == 1
|
||||
assert backup.backup_vaults[0].arn == "ARN"
|
||||
assert backup.backup_vaults[0].name == "Test Vault"
|
||||
assert backup.backup_vaults[0].region == AWS_REGION
|
||||
assert backup.backup_vaults[0].encryption == ""
|
||||
assert backup.backup_vaults[0].recovery_points == 0
|
||||
assert backup.backup_vaults[0].locked is True
|
||||
assert backup.backup_vaults[0].min_retention_days == 1
|
||||
assert backup.backup_vaults[0].max_retention_days == 2
|
||||
|
||||
# Test Backup List Backup Plans
|
||||
def test__list_backup_plans__(self):
|
||||
audit_info = self.set_mocked_audit_info()
|
||||
backup = Backup(audit_info)
|
||||
assert len(backup.backup_plans) == 1
|
||||
assert backup.backup_plans[0].arn == "ARN"
|
||||
assert backup.backup_plans[0].id == "ID"
|
||||
assert backup.backup_plans[0].region == AWS_REGION
|
||||
assert backup.backup_plans[0].name == "Test Plan"
|
||||
assert backup.backup_plans[0].version_id == "test_version_id"
|
||||
assert backup.backup_plans[0].last_execution_date == datetime(2015, 1, 1)
|
||||
assert backup.backup_plans[0].advanced_settings == []
|
||||
|
||||
# Test Backup List Report Plans
|
||||
def test__list_backup_report_plans__(self):
|
||||
audit_info = self.set_mocked_audit_info()
|
||||
backup = Backup(audit_info)
|
||||
assert len(backup.backup_report_plans) == 1
|
||||
assert backup.backup_report_plans[0].arn == "ARN"
|
||||
assert backup.backup_report_plans[0].region == AWS_REGION
|
||||
assert backup.backup_report_plans[0].name == "Test Report Plan"
|
||||
assert backup.backup_report_plans[0].last_attempted_execution_date == datetime(
|
||||
2015, 1, 1
|
||||
)
|
||||
assert backup.backup_report_plans[0].last_successful_execution_date == datetime(
|
||||
2015, 1, 1
|
||||
)
|
||||
@@ -0,0 +1,98 @@
|
||||
from unittest import mock
|
||||
|
||||
from prowler.providers.aws.services.backup.backup_service import BackupVault
|
||||
|
||||
AWS_REGION = "eu-west-1"
|
||||
|
||||
|
||||
class Test_backup_vaults_encrypted:
|
||||
def test_no_backup_vaults(self):
|
||||
backup_client = mock.MagicMock
|
||||
backup_client.backup_vaults = []
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.backup.backup_service.Backup",
|
||||
new=backup_client,
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.backup.backup_vaults_encrypted.backup_vaults_encrypted import (
|
||||
backup_vaults_encrypted,
|
||||
)
|
||||
|
||||
check = backup_vaults_encrypted()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 0
|
||||
|
||||
def test_one_backup_vault_unencrypted(self):
|
||||
backup_client = mock.MagicMock
|
||||
backup_client.backup_vaults = [
|
||||
BackupVault(
|
||||
arn="ARN",
|
||||
name="MyBackupVault",
|
||||
region=AWS_REGION,
|
||||
encryption="",
|
||||
recovery_points=1,
|
||||
locked=True,
|
||||
min_retention_days=1,
|
||||
max_retention_days=2,
|
||||
)
|
||||
]
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.backup.backup_service.Backup",
|
||||
new=backup_client,
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.backup.backup_vaults_encrypted.backup_vaults_encrypted import (
|
||||
backup_vaults_encrypted,
|
||||
)
|
||||
|
||||
check = backup_vaults_encrypted()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== "Backup Vault " + result[0].resource_id + " is not encrypted"
|
||||
)
|
||||
assert result[0].resource_id == "MyBackupVault"
|
||||
assert result[0].resource_arn == "ARN"
|
||||
assert result[0].region == AWS_REGION
|
||||
|
||||
def test_one_backup_vault_encrypted(self):
|
||||
backup_client = mock.MagicMock
|
||||
backup_client.backup_vaults = [
|
||||
BackupVault(
|
||||
arn="ARN",
|
||||
name="MyBackupVault",
|
||||
region=AWS_REGION,
|
||||
encryption="test",
|
||||
recovery_points=1,
|
||||
locked=True,
|
||||
min_retention_days=1,
|
||||
max_retention_days=2,
|
||||
)
|
||||
]
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.backup.backup_service.Backup",
|
||||
new=backup_client,
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.backup.backup_vaults_encrypted.backup_vaults_encrypted import (
|
||||
backup_vaults_encrypted,
|
||||
)
|
||||
|
||||
check = backup_vaults_encrypted()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== "Backup Vault " + result[0].resource_id + " is encrypted"
|
||||
)
|
||||
assert result[0].resource_id == "MyBackupVault"
|
||||
assert result[0].resource_arn == "ARN"
|
||||
assert result[0].region == AWS_REGION
|
||||
@@ -0,0 +1,68 @@
|
||||
from unittest import mock
|
||||
|
||||
from prowler.providers.aws.services.backup.backup_service import BackupVault
|
||||
|
||||
AWS_REGION = "eu-west-1"
|
||||
|
||||
|
||||
class Test_backup_vaults_exist:
|
||||
def test_no_backup_vaults(self):
|
||||
backup_client = mock.MagicMock
|
||||
backup_client.general_region = AWS_REGION
|
||||
backup_client.backup_vaults = []
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.backup.backup_service.Backup",
|
||||
new=backup_client,
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.backup.backup_vaults_exist.backup_vaults_exist import (
|
||||
backup_vaults_exist,
|
||||
)
|
||||
|
||||
check = backup_vaults_exist()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert result[0].status_extended == "No Backup Vault Exist"
|
||||
assert result[0].resource_id == "No Backups"
|
||||
assert result[0].resource_arn == ""
|
||||
assert result[0].region == AWS_REGION
|
||||
|
||||
def test_one_backup_vault(self):
|
||||
backup_client = mock.MagicMock
|
||||
backup_client.general_region = AWS_REGION
|
||||
backup_client.backup_vaults = [
|
||||
BackupVault(
|
||||
arn="ARN",
|
||||
name="MyBackupVault",
|
||||
region=AWS_REGION,
|
||||
encryption="",
|
||||
recovery_points=1,
|
||||
locked=True,
|
||||
min_retention_days=1,
|
||||
max_retention_days=2,
|
||||
)
|
||||
]
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.backup.backup_service.Backup",
|
||||
new=backup_client,
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.backup.backup_vaults_exist.backup_vaults_exist import (
|
||||
backup_vaults_exist,
|
||||
)
|
||||
|
||||
check = backup_vaults_exist()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== "At least one backup vault exists: " + result[0].resource_id
|
||||
)
|
||||
assert result[0].resource_id == "MyBackupVault"
|
||||
assert result[0].resource_arn == "ARN"
|
||||
assert result[0].region == AWS_REGION
|
||||
Reference in New Issue
Block a user