fix(output bucket): solve IsADirectoryError using compliance flag (#2121)

This commit is contained in:
Sergio Garcia
2023-03-22 13:38:41 +01:00
committed by GitHub
parent ff9c4c717e
commit 6c3db9646e
3 changed files with 58 additions and 0 deletions

View File

@@ -210,6 +210,8 @@ def send_to_s3_bucket(
filename = f"{output_filename}{json_asff_file_suffix}"
elif output_mode == "html":
filename = f"{output_filename}{html_file_suffix}"
else: # Compliance output mode
filename = f"{output_filename}_{output_mode}{csv_file_suffix}"
logger.info(f"Sending outputs to S3 bucket {output_bucket}")
bucket_remote_dir = output_directory
while "prowler/" in bucket_remote_dir: # Check if it is not a custom directory

View File

@@ -512,6 +512,62 @@ class Test_Outputs:
== "binary/octet-stream"
)
@mock_s3
def test_send_to_s3_bucket_compliance(self):
# Create mock session
session = boto3.session.Session(
region_name="us-east-1",
)
# Create mock audit_info
input_audit_info = AWS_Audit_Info(
session_config=None,
original_session=None,
audit_session=session,
audited_account=AWS_ACCOUNT_ID,
audited_identity_arn="test-arn",
audited_user_id="test",
audited_partition="aws",
profile="default",
profile_region="eu-west-1",
credentials=None,
assumed_role_info=None,
audited_regions=["eu-west-2", "eu-west-1"],
organizations_metadata=None,
audit_resources=None,
)
# Creat mock bucket
bucket_name = "test_bucket"
client = boto3.client("s3")
client.create_bucket(Bucket=bucket_name)
# Create mock csv output file
fixtures_dir = "tests/lib/outputs/fixtures"
output_directory = getcwd() + "/" + fixtures_dir
output_mode = "cis_1.4_aws"
filename = f"prowler-output-{input_audit_info.audited_account}"
# Send mock csv file to mock S3 Bucket
send_to_s3_bucket(
filename,
output_directory,
output_mode,
bucket_name,
input_audit_info.audit_session,
)
# Check if the file has been sent by checking its content type
assert (
client.get_object(
Bucket=bucket_name,
Key=fixtures_dir
+ "/"
+ output_mode
+ "/"
+ filename
+ "_"
+ output_mode
+ csv_file_suffix,
)["ContentType"]
== "binary/octet-stream"
)
@mock_s3
def test_send_to_s3_bucket_custom_directory(self):
# Create mock session