From 299ece19a8ff2c9c625f9a0ab28c51181d72fd35 Mon Sep 17 00:00:00 2001 From: Nacho Rivera Date: Wed, 8 Nov 2023 10:05:24 +0100 Subject: [PATCH] fix(clean local output dirs): clean dirs when output to s3 (#2997) --- prowler/__main__.py | 4 ++ prowler/providers/common/clean.py | 32 ++++++++++ tests/providers/common/clean_test.py | 87 ++++++++++++++++++++++++++++ 3 files changed, 123 insertions(+) create mode 100644 prowler/providers/common/clean.py create mode 100644 tests/providers/common/clean_test.py diff --git a/prowler/__main__.py b/prowler/__main__.py index 7cb07e36..e7d2569a 100644 --- a/prowler/__main__.py +++ b/prowler/__main__.py @@ -47,6 +47,7 @@ from prowler.providers.common.audit_info import ( set_provider_audit_info, set_provider_execution_parameters, ) +from prowler.providers.common.clean import clean_provider_local_output_directories from prowler.providers.common.outputs import set_provider_output_options from prowler.providers.common.quick_inventory import run_provider_quick_inventory @@ -301,6 +302,9 @@ def prowler(): if checks_folder: remove_custom_checks_module(checks_folder, provider) + # clean local directories + clean_provider_local_output_directories(args) + # If there are failed findings exit code 3, except if -z is input if not args.ignore_exit_code_3 and stats["total_fail"] > 0: sys.exit(3) diff --git a/prowler/providers/common/clean.py b/prowler/providers/common/clean.py new file mode 100644 index 00000000..d06e1e69 --- /dev/null +++ b/prowler/providers/common/clean.py @@ -0,0 +1,32 @@ +import importlib +import sys +from shutil import rmtree + +from prowler.config.config import default_output_directory +from prowler.lib.logger import logger + + +def clean_provider_local_output_directories(args): + """ + clean_provider_local_output_directories cleans deletes local custom dirs when output is sent to remote provider storage + """ + try: + # import provider cleaning function + provider_clean_function = f"clean_{args.provider}_local_output_directories" + getattr(importlib.import_module(__name__), provider_clean_function)(args) + except AttributeError as attribute_exception: + logger.info( + f"Cleaning local output directories not initialized for provider {args.provider}: {attribute_exception}" + ) + except Exception as error: + logger.critical( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + sys.exit(1) + + +def clean_aws_local_output_directories(args): + """clean_aws_provider_local_output_directories deletes local custom dirs when output is sent to remote provider storage for aws provider""" + if args.output_bucket or args.output_bucket_no_assume: + if args.output_directory != default_output_directory: + rmtree(args.output_directory) diff --git a/tests/providers/common/clean_test.py b/tests/providers/common/clean_test.py new file mode 100644 index 00000000..7b58217a --- /dev/null +++ b/tests/providers/common/clean_test.py @@ -0,0 +1,87 @@ +import importlib +import logging +import tempfile +from argparse import Namespace +from os import path + +from mock import patch + +from prowler.providers.common.clean import clean_provider_local_output_directories + + +class Test_Common_Clean: + def set_provider_input_args(self, provider): + set_args_function = f"set_{provider}_input_args" + args = getattr( + getattr(importlib.import_module(__name__), __class__.__name__), + set_args_function, + )(self) + return args + + def set_aws_input_args(self): + args = Namespace() + args.provider = "aws" + args.output_bucket = "test-bucket" + args.output_bucket_no_assume = None + return args + + def set_azure_input_args(self): + args = Namespace() + args.provider = "azure" + return args + + def test_clean_provider_local_output_directories_non_initialized(self, caplog): + provider = "azure" + input_args = self.set_provider_input_args(provider) + caplog.set_level(logging.INFO) + clean_provider_local_output_directories(input_args) + assert ( + f"Cleaning local output directories not initialized for provider {provider}:" + in caplog.text + ) + + def test_clean_aws_local_output_directories_non_default_dir_output_bucket(self): + provider = "aws" + input_args = self.set_provider_input_args(provider) + with tempfile.TemporaryDirectory() as temp_dir: + input_args.output_directory = temp_dir + clean_provider_local_output_directories(input_args) + assert not path.exists(input_args.output_directory) + + def test_clean_aws_local_output_directories_non_default_dir_output_bucket_no_assume( + self, + ): + provider = "aws" + input_args = self.set_provider_input_args(provider) + input_args.output_bucket = None + input_args.output_bucket_no_assume = "test" + with tempfile.TemporaryDirectory() as temp_dir: + input_args.output_directory = temp_dir + clean_provider_local_output_directories(input_args) + assert not path.exists(input_args.output_directory) + + def test_clean_aws_local_output_directories_default_dir_output_bucket(self): + provider = "aws" + input_args = self.set_provider_input_args(provider) + with tempfile.TemporaryDirectory() as temp_dir: + with patch( + "prowler.providers.common.clean.default_output_directory", new=temp_dir + ): + input_args.output_directory = temp_dir + clean_provider_local_output_directories(input_args) + assert path.exists(input_args.output_directory) + + def test_clean_aws_local_output_directories_default_dir_output_bucket_no_assume( + self, + ): + provider = "aws" + input_args = self.set_provider_input_args(provider) + input_args.output_bucket_no_assume = "test" + input_args.ouput_bucket = None + with tempfile.TemporaryDirectory() as temp_dir: + with patch( + "prowler.providers.common.clean.default_output_directory", new=temp_dir + ): + input_args.output_directory = temp_dir + clean_provider_local_output_directories(input_args) + assert path.exists(input_args.output_directory)