mirror of
https://github.com/ghndrx/prowler.git
synced 2026-02-10 14:55:00 +00:00
test(utils): Include missing tests (#2884)
This commit is contained in:
@@ -8,7 +8,6 @@ from io import TextIOWrapper
|
|||||||
from ipaddress import ip_address
|
from ipaddress import ip_address
|
||||||
from os.path import exists
|
from os.path import exists
|
||||||
from time import mktime
|
from time import mktime
|
||||||
from typing import Any
|
|
||||||
|
|
||||||
from detect_secrets import SecretsCollection
|
from detect_secrets import SecretsCollection
|
||||||
from detect_secrets.settings import default_settings
|
from detect_secrets.settings import default_settings
|
||||||
@@ -17,15 +16,18 @@ from prowler.lib.logger import logger
|
|||||||
|
|
||||||
|
|
||||||
def open_file(input_file: str, mode: str = "r") -> TextIOWrapper:
|
def open_file(input_file: str, mode: str = "r") -> TextIOWrapper:
|
||||||
|
"""open_file returns a handler to the file using the specified mode."""
|
||||||
try:
|
try:
|
||||||
f = open(input_file, mode)
|
f = open(input_file, mode)
|
||||||
except OSError as ose:
|
except OSError as os_error:
|
||||||
if ose.strerror == "Too many open files":
|
if os_error.strerror == "Too many open files":
|
||||||
logger.critical(
|
logger.critical(
|
||||||
"Ooops! You reached your user session maximum open files. To solve this issue, increase the shell session limit by running this command `ulimit -n 4096`. For more info visit https://docs.prowler.cloud/en/latest/troubleshooting/"
|
"Ooops! You reached your user session maximum open files. To solve this issue, increase the shell session limit by running this command `ulimit -n 4096`. For more info visit https://docs.prowler.cloud/en/latest/troubleshooting/"
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
logger.critical(f"{input_file}: OSError[{ose.errno}] {ose.strerror}")
|
logger.critical(
|
||||||
|
f"{input_file}: OSError[{os_error.errno}] {os_error.strerror}"
|
||||||
|
)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.critical(
|
logger.critical(
|
||||||
@@ -36,8 +38,8 @@ def open_file(input_file: str, mode: str = "r") -> TextIOWrapper:
|
|||||||
return f
|
return f
|
||||||
|
|
||||||
|
|
||||||
# Parse checks from file
|
def parse_json_file(input_file: TextIOWrapper) -> dict:
|
||||||
def parse_json_file(input_file: TextIOWrapper) -> Any:
|
"""parse_json_file loads a JSON file and returns a dictionary with the JSON content."""
|
||||||
try:
|
try:
|
||||||
json_file = json.load(input_file)
|
json_file = json.load(input_file)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -49,21 +51,21 @@ def parse_json_file(input_file: TextIOWrapper) -> Any:
|
|||||||
return json_file
|
return json_file
|
||||||
|
|
||||||
|
|
||||||
# check if file exists
|
|
||||||
def file_exists(filename: str):
|
def file_exists(filename: str):
|
||||||
|
"""file_exists returns True if the given file exists, otherwise returns False."""
|
||||||
try:
|
try:
|
||||||
exists_filename = exists(filename)
|
exists_filename = exists(filename)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.critical(
|
logger.critical(
|
||||||
f"{exists_filename.name}: {e.__class__.__name__}[{e.__traceback__.tb_lineno}]"
|
f"{filename}: {e.__class__.__name__}[{e.__traceback__.tb_lineno}]"
|
||||||
)
|
)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
else:
|
else:
|
||||||
return exists_filename
|
return exists_filename
|
||||||
|
|
||||||
|
|
||||||
# create sha512 hash for string
|
|
||||||
def hash_sha512(string: str) -> str:
|
def hash_sha512(string: str) -> str:
|
||||||
|
"""hash_sha512 returns the first 9 bytes of the SHA512 representation for the given string."""
|
||||||
return sha512(string.encode("utf-8")).hexdigest()[0:9]
|
return sha512(string.encode("utf-8")).hexdigest()[0:9]
|
||||||
|
|
||||||
|
|
||||||
@@ -85,6 +87,7 @@ def detect_secrets_scan(data):
|
|||||||
|
|
||||||
|
|
||||||
def validate_ip_address(ip_string):
|
def validate_ip_address(ip_string):
|
||||||
|
"""validate_ip_address return True if the IP is valid, otherwise returns False."""
|
||||||
try:
|
try:
|
||||||
ip_address(ip_string)
|
ip_address(ip_string)
|
||||||
return True
|
return True
|
||||||
@@ -93,6 +96,7 @@ def validate_ip_address(ip_string):
|
|||||||
|
|
||||||
|
|
||||||
def outputs_unix_timestamp(is_unix_timestamp: bool, timestamp: datetime):
|
def outputs_unix_timestamp(is_unix_timestamp: bool, timestamp: datetime):
|
||||||
|
"""outputs_unix_timestamp returns the epoch representation of the timestamp if the is_unix_timestamp is True, otherwise returns the ISO representation."""
|
||||||
if is_unix_timestamp:
|
if is_unix_timestamp:
|
||||||
timestamp = int(mktime(timestamp.timetuple()))
|
timestamp = int(mktime(timestamp.timetuple()))
|
||||||
else:
|
else:
|
||||||
|
|||||||
@@ -1,14 +1,133 @@
|
|||||||
|
import os
|
||||||
|
import tempfile
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from time import mktime
|
from time import mktime
|
||||||
|
|
||||||
from prowler.lib.utils.utils import outputs_unix_timestamp, validate_ip_address
|
import pytest
|
||||||
|
from mock import patch
|
||||||
|
|
||||||
|
from prowler.lib.utils.utils import (
|
||||||
|
detect_secrets_scan,
|
||||||
|
file_exists,
|
||||||
|
hash_sha512,
|
||||||
|
open_file,
|
||||||
|
outputs_unix_timestamp,
|
||||||
|
parse_json_file,
|
||||||
|
validate_ip_address,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class Test_utils:
|
class Test_utils_open_file:
|
||||||
|
def test_open_read_file(self):
|
||||||
|
temp_data_file = tempfile.NamedTemporaryFile(delete=False)
|
||||||
|
mode = "r"
|
||||||
|
f = open_file(temp_data_file.name, mode)
|
||||||
|
assert f.__class__.__name__ == "TextIOWrapper"
|
||||||
|
os.remove(temp_data_file.name)
|
||||||
|
|
||||||
|
def test_open_raise_too_many_open_files(self):
|
||||||
|
temp_data_file = tempfile.NamedTemporaryFile(delete=False)
|
||||||
|
mode = "r"
|
||||||
|
with patch("prowler.lib.utils.utils.open") as mock_open:
|
||||||
|
mock_open.side_effect = OSError(1, "Too many open files")
|
||||||
|
with pytest.raises(SystemExit) as exception:
|
||||||
|
open_file(temp_data_file.name, mode)
|
||||||
|
assert exception.type == SystemExit
|
||||||
|
assert exception.value.code == 1
|
||||||
|
os.remove(temp_data_file.name)
|
||||||
|
|
||||||
|
def test_open_raise_os_error(self):
|
||||||
|
temp_data_file = tempfile.NamedTemporaryFile(delete=False)
|
||||||
|
mode = "r"
|
||||||
|
with patch("prowler.lib.utils.utils.open") as mock_open:
|
||||||
|
mock_open.side_effect = OSError(1, "Another OS error")
|
||||||
|
with pytest.raises(SystemExit) as exception:
|
||||||
|
open_file(temp_data_file.name, mode)
|
||||||
|
assert exception.type == SystemExit
|
||||||
|
assert exception.value.code == 1
|
||||||
|
os.remove(temp_data_file.name)
|
||||||
|
|
||||||
|
def test_open_raise_exception(self):
|
||||||
|
temp_data_file = tempfile.NamedTemporaryFile(delete=False)
|
||||||
|
mode = "r"
|
||||||
|
with patch("prowler.lib.utils.utils.open") as mock_open:
|
||||||
|
mock_open.side_effect = Exception()
|
||||||
|
with pytest.raises(SystemExit) as exception:
|
||||||
|
open_file(temp_data_file.name, mode)
|
||||||
|
assert exception.type == SystemExit
|
||||||
|
assert exception.value.code == 1
|
||||||
|
os.remove(temp_data_file.name)
|
||||||
|
|
||||||
|
|
||||||
|
class Test_parse_json_file:
|
||||||
|
def test_parse_json_file_invalid(self):
|
||||||
|
temp_data_file = tempfile.NamedTemporaryFile(delete=False)
|
||||||
|
with pytest.raises(SystemExit) as exception:
|
||||||
|
parse_json_file(temp_data_file)
|
||||||
|
|
||||||
|
assert exception.type == SystemExit
|
||||||
|
assert exception.value.code == 1
|
||||||
|
os.remove(temp_data_file.name)
|
||||||
|
|
||||||
|
def test_parse_json_file_valid(self):
|
||||||
|
temp_data_file = tempfile.NamedTemporaryFile(delete=False)
|
||||||
|
temp_data_file.write(b"{}")
|
||||||
|
temp_data_file.seek(0)
|
||||||
|
f = parse_json_file(temp_data_file)
|
||||||
|
assert f == {}
|
||||||
|
|
||||||
|
|
||||||
|
class Test_file_exists:
|
||||||
|
def test_file_exists_false(self):
|
||||||
|
assert not file_exists("not_existing.txt")
|
||||||
|
|
||||||
|
def test_file_exists(self):
|
||||||
|
temp_data_file = tempfile.NamedTemporaryFile(delete=False)
|
||||||
|
assert file_exists(temp_data_file.name)
|
||||||
|
os.remove(temp_data_file.name)
|
||||||
|
|
||||||
|
def test_file_exists_raised_exception(self):
|
||||||
|
temp_data_file = tempfile.NamedTemporaryFile(delete=False)
|
||||||
|
with patch("prowler.lib.utils.utils.exists") as mock_exists:
|
||||||
|
mock_exists.side_effect = Exception()
|
||||||
|
with pytest.raises(SystemExit) as exception:
|
||||||
|
file_exists(temp_data_file.name)
|
||||||
|
|
||||||
|
assert exception.type == SystemExit
|
||||||
|
assert exception.value.code == 1
|
||||||
|
|
||||||
|
os.remove(temp_data_file.name)
|
||||||
|
|
||||||
|
|
||||||
|
class Test_utils_validate_ip_address:
|
||||||
def test_validate_ip_address(self):
|
def test_validate_ip_address(self):
|
||||||
assert validate_ip_address("88.26.151.198")
|
assert validate_ip_address("88.26.151.198")
|
||||||
assert not validate_ip_address("Not an IP")
|
assert not validate_ip_address("Not an IP")
|
||||||
|
|
||||||
|
|
||||||
|
class Test_detect_secrets_scan:
|
||||||
|
def test_detect_secrets_scan(self):
|
||||||
|
data = "password=password"
|
||||||
|
secrets_detected = detect_secrets_scan(data)
|
||||||
|
assert type(secrets_detected) is list
|
||||||
|
assert len(secrets_detected) == 1
|
||||||
|
assert "filename" in secrets_detected[0]
|
||||||
|
assert "hashed_secret" in secrets_detected[0]
|
||||||
|
assert "is_verified" in secrets_detected[0]
|
||||||
|
assert secrets_detected[0]["line_number"] == 1
|
||||||
|
assert secrets_detected[0]["type"] == "Secret Keyword"
|
||||||
|
|
||||||
|
def test_detect_secrets_scan_no_secrets(self):
|
||||||
|
data = ""
|
||||||
|
assert detect_secrets_scan(data) is None
|
||||||
|
|
||||||
|
|
||||||
|
class Test_hash_sha512:
|
||||||
|
def test_hash_sha512(self):
|
||||||
|
assert hash_sha512("test") == "ee26b0dd4"
|
||||||
|
|
||||||
|
|
||||||
|
class Test_outputs_unix_timestamp:
|
||||||
def test_outputs_unix_timestamp_false(self):
|
def test_outputs_unix_timestamp_false(self):
|
||||||
time = datetime.now()
|
time = datetime.now()
|
||||||
assert outputs_unix_timestamp(False, time) == time.isoformat()
|
assert outputs_unix_timestamp(False, time) == time.isoformat()
|
||||||
|
|||||||
Reference in New Issue
Block a user