fix(output_bucket): Use full path for -o option with output to S3 bucket (#1854)

Co-authored-by: sergargar <sergargar@users.noreply.github.com>
Co-authored-by: Pepe Fagoaga <pepe@verica.io>
This commit is contained in:
Sergio Garcia
2023-02-07 17:28:25 +01:00
committed by GitHub
parent 0298ff9478
commit 231bc0605f
3 changed files with 61 additions and 9 deletions

View File

@@ -197,7 +197,9 @@ def send_to_s3_bucket(
elif output_mode == "html": elif output_mode == "html":
filename = f"{output_filename}{html_file_suffix}" filename = f"{output_filename}{html_file_suffix}"
logger.info(f"Sending outputs to S3 bucket {output_bucket}") logger.info(f"Sending outputs to S3 bucket {output_bucket}")
bucket_remote_dir = output_directory.split("/")[-1] bucket_remote_dir = output_directory
while "prowler/" in bucket_remote_dir: # Check if it is not a custom directory
bucket_remote_dir = bucket_remote_dir.partition("prowler/")[-1]
file_name = output_directory + "/" + filename file_name = output_directory + "/" + filename
bucket_name = output_bucket bucket_name = output_bucket
object_name = bucket_remote_dir + "/" + output_mode + "/" + filename object_name = bucket_remote_dir + "/" + output_mode + "/" + filename

View File

@@ -1,7 +1,7 @@
import importlib import importlib
import sys import sys
from dataclasses import dataclass from dataclasses import dataclass
from os import mkdir from os import makedirs
from os.path import isdir from os.path import isdir
from prowler.config.config import change_config_var, output_file_timestamp from prowler.config.config import change_config_var, output_file_timestamp
@@ -52,7 +52,7 @@ class Provider_Output_Options:
if arguments.output_directory: if arguments.output_directory:
if not isdir(arguments.output_directory): if not isdir(arguments.output_directory):
if arguments.output_modes: if arguments.output_modes:
mkdir(arguments.output_directory) makedirs(arguments.output_directory)
class Azure_Output_Options(Provider_Output_Options): class Azure_Output_Options(Provider_Output_Options):

View File

@@ -1,6 +1,5 @@
import os import os
import pathlib from os import getcwd, path, remove
from os import path, remove
from unittest import mock from unittest import mock
import boto3 import boto3
@@ -336,10 +335,8 @@ class Test_Outputs:
client = boto3.client("s3") client = boto3.client("s3")
client.create_bucket(Bucket=bucket_name) client.create_bucket(Bucket=bucket_name)
# Create mock csv output file # Create mock csv output file
fixtures_dir = "fixtures" fixtures_dir = "tests/lib/outputs/fixtures"
output_directory = ( output_directory = getcwd() + "/" + fixtures_dir
f"{pathlib.Path().absolute()}/tests/lib/outputs/{fixtures_dir}"
)
output_mode = "csv" output_mode = "csv"
filename = f"prowler-output-{input_audit_info.audited_account}" filename = f"prowler-output-{input_audit_info.audited_account}"
# Send mock csv file to mock S3 Bucket # Send mock csv file to mock S3 Bucket
@@ -359,6 +356,59 @@ class Test_Outputs:
== "binary/octet-stream" == "binary/octet-stream"
) )
@mock_s3
def test_send_to_s3_bucket_custom_directory(self):
# Create mock session
session = boto3.session.Session(
region_name="us-east-1",
)
# Create mock audit_info
input_audit_info = AWS_Audit_Info(
original_session=None,
audit_session=session,
audited_account=AWS_ACCOUNT_ID,
audited_identity_arn="test-arn",
audited_user_id="test",
audited_partition="aws",
profile="default",
profile_region="eu-west-1",
credentials=None,
assumed_role_info=None,
audited_regions=["eu-west-2", "eu-west-1"],
organizations_metadata=None,
audit_resources=None,
)
# Creat mock bucket
bucket_name = "test_bucket"
client = boto3.client("s3")
client.create_bucket(Bucket=bucket_name)
# Create mock csv output file
fixtures_dir = "fixtures"
output_directory = f"tests/lib/outputs/{fixtures_dir}"
output_mode = "csv"
filename = f"prowler-output-{input_audit_info.audited_account}"
# Send mock csv file to mock S3 Bucket
send_to_s3_bucket(
filename,
output_directory,
output_mode,
bucket_name,
input_audit_info.audit_session,
)
# Check if the file has been sent by checking its content type
assert (
client.get_object(
Bucket=bucket_name,
Key=output_directory
+ "/"
+ output_mode
+ "/"
+ filename
+ csv_file_suffix,
)["ContentType"]
== "binary/octet-stream"
)
def test_extract_findings_statistics_different_resources(self): def test_extract_findings_statistics_different_resources(self):
finding_1 = mock.MagicMock() finding_1 = mock.MagicMock()
finding_1.status = "PASS" finding_1.status = "PASS"