mirror of
https://github.com/ghndrx/prowler.git
synced 2026-02-12 15:55:09 +00:00
chore(s3): Move lib to the AWS provider and include tests (#2664)
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
import os
|
||||
from os import getcwd, path, remove
|
||||
from os import path, remove
|
||||
from unittest import mock
|
||||
|
||||
import boto3
|
||||
@@ -7,7 +7,6 @@ import botocore
|
||||
import pytest
|
||||
from colorama import Fore
|
||||
from mock import patch
|
||||
from moto import mock_s3
|
||||
|
||||
from prowler.config.config import (
|
||||
csv_file_suffix,
|
||||
@@ -62,11 +61,7 @@ from prowler.lib.outputs.models import (
|
||||
unroll_list,
|
||||
unroll_tags,
|
||||
)
|
||||
from prowler.lib.outputs.outputs import (
|
||||
extract_findings_statistics,
|
||||
send_to_s3_bucket,
|
||||
set_report_color,
|
||||
)
|
||||
from prowler.lib.outputs.outputs import extract_findings_statistics, set_report_color
|
||||
from prowler.lib.utils.utils import hash_sha512, open_file
|
||||
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
|
||||
from prowler.providers.aws.lib.security_hub.security_hub import send_to_security_hub
|
||||
@@ -1105,189 +1100,6 @@ class Test_Outputs:
|
||||
output_options = mock.MagicMock()
|
||||
assert fill_json_ocsf(input_audit_info, finding, output_options) == expected
|
||||
|
||||
@mock_s3
|
||||
def test_send_to_s3_bucket(self):
|
||||
# Create mock session
|
||||
session = boto3.session.Session(
|
||||
region_name="us-east-1",
|
||||
)
|
||||
# Create mock audit_info
|
||||
input_audit_info = AWS_Audit_Info(
|
||||
session_config=None,
|
||||
original_session=None,
|
||||
audit_session=session,
|
||||
audited_account=AWS_ACCOUNT_ID,
|
||||
audited_account_arn=f"arn:aws:iam::{AWS_ACCOUNT_ID}:root",
|
||||
audited_identity_arn="test-arn",
|
||||
audited_user_id="test",
|
||||
audited_partition="aws",
|
||||
profile="default",
|
||||
profile_region="eu-west-1",
|
||||
credentials=None,
|
||||
assumed_role_info=None,
|
||||
audited_regions=["eu-west-2", "eu-west-1"],
|
||||
organizations_metadata=None,
|
||||
audit_resources=None,
|
||||
mfa_enabled=False,
|
||||
audit_metadata=Audit_Metadata(
|
||||
services_scanned=0,
|
||||
expected_checks=[],
|
||||
completed_checks=0,
|
||||
audit_progress=0,
|
||||
),
|
||||
)
|
||||
# Creat mock bucket
|
||||
bucket_name = "test_bucket"
|
||||
client = boto3.client("s3")
|
||||
client.create_bucket(Bucket=bucket_name)
|
||||
# Create mock csv output file
|
||||
fixtures_dir = "tests/lib/outputs/fixtures"
|
||||
output_directory = getcwd() + "/" + fixtures_dir
|
||||
output_mode = "csv"
|
||||
filename = f"prowler-output-{input_audit_info.audited_account}"
|
||||
# Send mock csv file to mock S3 Bucket
|
||||
send_to_s3_bucket(
|
||||
filename,
|
||||
output_directory,
|
||||
output_mode,
|
||||
bucket_name,
|
||||
input_audit_info.audit_session,
|
||||
)
|
||||
# Check if the file has been sent by checking its content type
|
||||
assert (
|
||||
client.get_object(
|
||||
Bucket=bucket_name,
|
||||
Key=fixtures_dir + "/" + output_mode + "/" + filename + csv_file_suffix,
|
||||
)["ContentType"]
|
||||
== "binary/octet-stream"
|
||||
)
|
||||
|
||||
@mock_s3
|
||||
def test_send_to_s3_bucket_compliance(self):
|
||||
# Create mock session
|
||||
session = boto3.session.Session(
|
||||
region_name="us-east-1",
|
||||
)
|
||||
# Create mock audit_info
|
||||
input_audit_info = AWS_Audit_Info(
|
||||
session_config=None,
|
||||
original_session=None,
|
||||
audit_session=session,
|
||||
audited_account=AWS_ACCOUNT_ID,
|
||||
audited_account_arn=f"arn:aws:iam::{AWS_ACCOUNT_ID}:root",
|
||||
audited_identity_arn="test-arn",
|
||||
audited_user_id="test",
|
||||
audited_partition="aws",
|
||||
profile="default",
|
||||
profile_region="eu-west-1",
|
||||
credentials=None,
|
||||
assumed_role_info=None,
|
||||
audited_regions=["eu-west-2", "eu-west-1"],
|
||||
organizations_metadata=None,
|
||||
audit_resources=None,
|
||||
mfa_enabled=False,
|
||||
audit_metadata=Audit_Metadata(
|
||||
services_scanned=0,
|
||||
expected_checks=[],
|
||||
completed_checks=0,
|
||||
audit_progress=0,
|
||||
),
|
||||
)
|
||||
# Creat mock bucket
|
||||
bucket_name = "test_bucket"
|
||||
client = boto3.client("s3")
|
||||
client.create_bucket(Bucket=bucket_name)
|
||||
# Create mock csv output file
|
||||
fixtures_dir = "tests/lib/outputs/fixtures"
|
||||
output_directory = getcwd() + "/" + fixtures_dir
|
||||
output_mode = "cis_1.4_aws"
|
||||
filename = f"prowler-output-{input_audit_info.audited_account}"
|
||||
# Send mock csv file to mock S3 Bucket
|
||||
send_to_s3_bucket(
|
||||
filename,
|
||||
output_directory,
|
||||
output_mode,
|
||||
bucket_name,
|
||||
input_audit_info.audit_session,
|
||||
)
|
||||
# Check if the file has been sent by checking its content type
|
||||
assert (
|
||||
client.get_object(
|
||||
Bucket=bucket_name,
|
||||
Key=fixtures_dir
|
||||
+ "/"
|
||||
+ output_mode
|
||||
+ "/"
|
||||
+ filename
|
||||
+ "_"
|
||||
+ output_mode
|
||||
+ csv_file_suffix,
|
||||
)["ContentType"]
|
||||
== "binary/octet-stream"
|
||||
)
|
||||
|
||||
@mock_s3
|
||||
def test_send_to_s3_bucket_custom_directory(self):
|
||||
# Create mock session
|
||||
session = boto3.session.Session(
|
||||
region_name="us-east-1",
|
||||
)
|
||||
# Create mock audit_info
|
||||
input_audit_info = AWS_Audit_Info(
|
||||
session_config=None,
|
||||
original_session=None,
|
||||
audit_session=session,
|
||||
audited_account=AWS_ACCOUNT_ID,
|
||||
audited_account_arn=f"arn:aws:iam::{AWS_ACCOUNT_ID}:root",
|
||||
audited_identity_arn="test-arn",
|
||||
audited_user_id="test",
|
||||
audited_partition="aws",
|
||||
profile="default",
|
||||
profile_region="eu-west-1",
|
||||
credentials=None,
|
||||
assumed_role_info=None,
|
||||
audited_regions=["eu-west-2", "eu-west-1"],
|
||||
organizations_metadata=None,
|
||||
audit_resources=None,
|
||||
mfa_enabled=False,
|
||||
audit_metadata=Audit_Metadata(
|
||||
services_scanned=0,
|
||||
expected_checks=[],
|
||||
completed_checks=0,
|
||||
audit_progress=0,
|
||||
),
|
||||
)
|
||||
# Creat mock bucket
|
||||
bucket_name = "test_bucket"
|
||||
client = boto3.client("s3")
|
||||
client.create_bucket(Bucket=bucket_name)
|
||||
# Create mock csv output file
|
||||
fixtures_dir = "fixtures"
|
||||
output_directory = f"tests/lib/outputs/{fixtures_dir}"
|
||||
output_mode = "csv"
|
||||
filename = f"prowler-output-{input_audit_info.audited_account}"
|
||||
# Send mock csv file to mock S3 Bucket
|
||||
send_to_s3_bucket(
|
||||
filename,
|
||||
output_directory,
|
||||
output_mode,
|
||||
bucket_name,
|
||||
input_audit_info.audit_session,
|
||||
)
|
||||
# Check if the file has been sent by checking its content type
|
||||
assert (
|
||||
client.get_object(
|
||||
Bucket=bucket_name,
|
||||
Key=output_directory
|
||||
+ "/"
|
||||
+ output_mode
|
||||
+ "/"
|
||||
+ filename
|
||||
+ csv_file_suffix,
|
||||
)["ContentType"]
|
||||
== "binary/octet-stream"
|
||||
)
|
||||
|
||||
def test_extract_findings_statistics_different_resources(self):
|
||||
finding_1 = mock.MagicMock()
|
||||
finding_1.status = "PASS"
|
||||
|
||||
109
tests/providers/aws/lib/s3/s3_test.py
Normal file
109
tests/providers/aws/lib/s3/s3_test.py
Normal file
@@ -0,0 +1,109 @@
|
||||
from os import path
|
||||
from pathlib import Path
|
||||
|
||||
import boto3
|
||||
from mock import MagicMock
|
||||
from moto import mock_s3
|
||||
|
||||
from prowler.config.config import csv_file_suffix
|
||||
from prowler.providers.aws.lib.s3.s3 import get_s3_object_path, send_to_s3_bucket
|
||||
|
||||
AWS_ACCOUNT_ID = "123456789012"
|
||||
AWS_REGION = "us-east-1"
|
||||
|
||||
ACTUAL_DIRECTORY = Path(path.dirname(path.realpath(__file__)))
|
||||
FIXTURES_DIR_NAME = "fixtures"
|
||||
|
||||
S3_BUCKET_NAME = "test_bucket"
|
||||
|
||||
OUTPUT_MODE_CSV = "csv"
|
||||
OUTPUT_MODE_CIS_1_4_AWS = "cis_1.4_aws"
|
||||
|
||||
|
||||
class TestS3:
|
||||
@mock_s3
|
||||
def test_send_to_s3_bucket(self):
|
||||
# Mock Audit Info
|
||||
audit_info = MagicMock()
|
||||
|
||||
# Create mock session
|
||||
audit_info.audit_session = boto3.session.Session(region_name=AWS_REGION)
|
||||
audit_info.audited_account = AWS_ACCOUNT_ID
|
||||
|
||||
# Create mock bucket
|
||||
client = audit_info.audit_session.client("s3")
|
||||
client.create_bucket(Bucket=S3_BUCKET_NAME)
|
||||
|
||||
# Mocked CSV output file
|
||||
output_directory = f"{ACTUAL_DIRECTORY}/{FIXTURES_DIR_NAME}"
|
||||
filename = f"prowler-output-{audit_info.audited_account}"
|
||||
|
||||
# Send mock CSV file to mock S3 Bucket
|
||||
send_to_s3_bucket(
|
||||
filename,
|
||||
output_directory,
|
||||
OUTPUT_MODE_CSV,
|
||||
S3_BUCKET_NAME,
|
||||
audit_info.audit_session,
|
||||
)
|
||||
|
||||
bucket_directory = get_s3_object_path(output_directory)
|
||||
object_name = (
|
||||
f"{bucket_directory}/{OUTPUT_MODE_CSV}/{filename}{csv_file_suffix}"
|
||||
)
|
||||
|
||||
assert (
|
||||
client.get_object(
|
||||
Bucket=S3_BUCKET_NAME,
|
||||
Key=object_name,
|
||||
)["ContentType"]
|
||||
== "binary/octet-stream"
|
||||
)
|
||||
|
||||
@mock_s3
|
||||
def test_send_to_s3_bucket_compliance(self):
|
||||
# Mock Audit Info
|
||||
audit_info = MagicMock()
|
||||
|
||||
# Create mock session
|
||||
audit_info.audit_session = boto3.session.Session(region_name=AWS_REGION)
|
||||
audit_info.audited_account = AWS_ACCOUNT_ID
|
||||
|
||||
# Create mock bucket
|
||||
client = audit_info.audit_session.client("s3")
|
||||
client.create_bucket(Bucket=S3_BUCKET_NAME)
|
||||
|
||||
# Mocked CSV output file
|
||||
output_directory = f"{ACTUAL_DIRECTORY}/{FIXTURES_DIR_NAME}"
|
||||
filename = f"prowler-output-{audit_info.audited_account}"
|
||||
|
||||
# Send mock CSV file to mock S3 Bucket
|
||||
send_to_s3_bucket(
|
||||
filename,
|
||||
output_directory,
|
||||
OUTPUT_MODE_CIS_1_4_AWS,
|
||||
S3_BUCKET_NAME,
|
||||
audit_info.audit_session,
|
||||
)
|
||||
|
||||
bucket_directory = get_s3_object_path(output_directory)
|
||||
object_name = f"{bucket_directory}/{OUTPUT_MODE_CIS_1_4_AWS}/{filename}_{OUTPUT_MODE_CIS_1_4_AWS}{csv_file_suffix}"
|
||||
|
||||
assert (
|
||||
client.get_object(
|
||||
Bucket=S3_BUCKET_NAME,
|
||||
Key=object_name,
|
||||
)["ContentType"]
|
||||
== "binary/octet-stream"
|
||||
)
|
||||
|
||||
def test_get_s3_object_path_with_prowler(self):
|
||||
output_directory = "/Users/admin/prowler/"
|
||||
assert (
|
||||
get_s3_object_path(output_directory)
|
||||
== output_directory.partition("prowler/")[-1]
|
||||
)
|
||||
|
||||
def test_get_s3_object_path_without_prowler(self):
|
||||
output_directory = "/Users/admin/"
|
||||
assert get_s3_object_path(output_directory) == output_directory
|
||||
Reference in New Issue
Block a user