From 583194085c88354ba9a93be021612f087bfaa9b5 Mon Sep 17 00:00:00 2001 From: Pepe Fagoaga Date: Mon, 2 Oct 2023 11:29:09 +0200 Subject: [PATCH] test(utils): Include missing tests (#2884) --- prowler/lib/utils/utils.py | 22 +++--- tests/lib/utils/utils_test.py | 123 +++++++++++++++++++++++++++++++++- 2 files changed, 134 insertions(+), 11 deletions(-) diff --git a/prowler/lib/utils/utils.py b/prowler/lib/utils/utils.py index fc3a5b30..687f6b20 100644 --- a/prowler/lib/utils/utils.py +++ b/prowler/lib/utils/utils.py @@ -8,7 +8,6 @@ from io import TextIOWrapper from ipaddress import ip_address from os.path import exists from time import mktime -from typing import Any from detect_secrets import SecretsCollection from detect_secrets.settings import default_settings @@ -17,15 +16,18 @@ from prowler.lib.logger import logger def open_file(input_file: str, mode: str = "r") -> TextIOWrapper: + """open_file returns a handler to the file using the specified mode.""" try: f = open(input_file, mode) - except OSError as ose: - if ose.strerror == "Too many open files": + except OSError as os_error: + if os_error.strerror == "Too many open files": logger.critical( "Ooops! You reached your user session maximum open files. To solve this issue, increase the shell session limit by running this command `ulimit -n 4096`. For more info visit https://docs.prowler.cloud/en/latest/troubleshooting/" ) else: - logger.critical(f"{input_file}: OSError[{ose.errno}] {ose.strerror}") + logger.critical( + f"{input_file}: OSError[{os_error.errno}] {os_error.strerror}" + ) sys.exit(1) except Exception as e: logger.critical( @@ -36,8 +38,8 @@ def open_file(input_file: str, mode: str = "r") -> TextIOWrapper: return f -# Parse checks from file -def parse_json_file(input_file: TextIOWrapper) -> Any: +def parse_json_file(input_file: TextIOWrapper) -> dict: + """parse_json_file loads a JSON file and returns a dictionary with the JSON content.""" try: json_file = json.load(input_file) except Exception as e: @@ -49,21 +51,21 @@ def parse_json_file(input_file: TextIOWrapper) -> Any: return json_file -# check if file exists def file_exists(filename: str): + """file_exists returns True if the given file exists, otherwise returns False.""" try: exists_filename = exists(filename) except Exception as e: logger.critical( - f"{exists_filename.name}: {e.__class__.__name__}[{e.__traceback__.tb_lineno}]" + f"{filename}: {e.__class__.__name__}[{e.__traceback__.tb_lineno}]" ) sys.exit(1) else: return exists_filename -# create sha512 hash for string def hash_sha512(string: str) -> str: + """hash_sha512 returns the first 9 bytes of the SHA512 representation for the given string.""" return sha512(string.encode("utf-8")).hexdigest()[0:9] @@ -85,6 +87,7 @@ def detect_secrets_scan(data): def validate_ip_address(ip_string): + """validate_ip_address return True if the IP is valid, otherwise returns False.""" try: ip_address(ip_string) return True @@ -93,6 +96,7 @@ def validate_ip_address(ip_string): def outputs_unix_timestamp(is_unix_timestamp: bool, timestamp: datetime): + """outputs_unix_timestamp returns the epoch representation of the timestamp if the is_unix_timestamp is True, otherwise returns the ISO representation.""" if is_unix_timestamp: timestamp = int(mktime(timestamp.timetuple())) else: diff --git a/tests/lib/utils/utils_test.py b/tests/lib/utils/utils_test.py index 4d5dfe63..ec7946a0 100644 --- a/tests/lib/utils/utils_test.py +++ b/tests/lib/utils/utils_test.py @@ -1,14 +1,133 @@ +import os +import tempfile from datetime import datetime from time import mktime -from prowler.lib.utils.utils import outputs_unix_timestamp, validate_ip_address +import pytest +from mock import patch + +from prowler.lib.utils.utils import ( + detect_secrets_scan, + file_exists, + hash_sha512, + open_file, + outputs_unix_timestamp, + parse_json_file, + validate_ip_address, +) -class Test_utils: +class Test_utils_open_file: + def test_open_read_file(self): + temp_data_file = tempfile.NamedTemporaryFile(delete=False) + mode = "r" + f = open_file(temp_data_file.name, mode) + assert f.__class__.__name__ == "TextIOWrapper" + os.remove(temp_data_file.name) + + def test_open_raise_too_many_open_files(self): + temp_data_file = tempfile.NamedTemporaryFile(delete=False) + mode = "r" + with patch("prowler.lib.utils.utils.open") as mock_open: + mock_open.side_effect = OSError(1, "Too many open files") + with pytest.raises(SystemExit) as exception: + open_file(temp_data_file.name, mode) + assert exception.type == SystemExit + assert exception.value.code == 1 + os.remove(temp_data_file.name) + + def test_open_raise_os_error(self): + temp_data_file = tempfile.NamedTemporaryFile(delete=False) + mode = "r" + with patch("prowler.lib.utils.utils.open") as mock_open: + mock_open.side_effect = OSError(1, "Another OS error") + with pytest.raises(SystemExit) as exception: + open_file(temp_data_file.name, mode) + assert exception.type == SystemExit + assert exception.value.code == 1 + os.remove(temp_data_file.name) + + def test_open_raise_exception(self): + temp_data_file = tempfile.NamedTemporaryFile(delete=False) + mode = "r" + with patch("prowler.lib.utils.utils.open") as mock_open: + mock_open.side_effect = Exception() + with pytest.raises(SystemExit) as exception: + open_file(temp_data_file.name, mode) + assert exception.type == SystemExit + assert exception.value.code == 1 + os.remove(temp_data_file.name) + + +class Test_parse_json_file: + def test_parse_json_file_invalid(self): + temp_data_file = tempfile.NamedTemporaryFile(delete=False) + with pytest.raises(SystemExit) as exception: + parse_json_file(temp_data_file) + + assert exception.type == SystemExit + assert exception.value.code == 1 + os.remove(temp_data_file.name) + + def test_parse_json_file_valid(self): + temp_data_file = tempfile.NamedTemporaryFile(delete=False) + temp_data_file.write(b"{}") + temp_data_file.seek(0) + f = parse_json_file(temp_data_file) + assert f == {} + + +class Test_file_exists: + def test_file_exists_false(self): + assert not file_exists("not_existing.txt") + + def test_file_exists(self): + temp_data_file = tempfile.NamedTemporaryFile(delete=False) + assert file_exists(temp_data_file.name) + os.remove(temp_data_file.name) + + def test_file_exists_raised_exception(self): + temp_data_file = tempfile.NamedTemporaryFile(delete=False) + with patch("prowler.lib.utils.utils.exists") as mock_exists: + mock_exists.side_effect = Exception() + with pytest.raises(SystemExit) as exception: + file_exists(temp_data_file.name) + + assert exception.type == SystemExit + assert exception.value.code == 1 + + os.remove(temp_data_file.name) + + +class Test_utils_validate_ip_address: def test_validate_ip_address(self): assert validate_ip_address("88.26.151.198") assert not validate_ip_address("Not an IP") + +class Test_detect_secrets_scan: + def test_detect_secrets_scan(self): + data = "password=password" + secrets_detected = detect_secrets_scan(data) + assert type(secrets_detected) is list + assert len(secrets_detected) == 1 + assert "filename" in secrets_detected[0] + assert "hashed_secret" in secrets_detected[0] + assert "is_verified" in secrets_detected[0] + assert secrets_detected[0]["line_number"] == 1 + assert secrets_detected[0]["type"] == "Secret Keyword" + + def test_detect_secrets_scan_no_secrets(self): + data = "" + assert detect_secrets_scan(data) is None + + +class Test_hash_sha512: + def test_hash_sha512(self): + assert hash_sha512("test") == "ee26b0dd4" + + +class Test_outputs_unix_timestamp: def test_outputs_unix_timestamp_false(self): time = datetime.now() assert outputs_unix_timestamp(False, time) == time.isoformat()