diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index beb12a38..85d4df74 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -39,3 +39,9 @@ repos: rev: v0.0.5 hooks: - id: check-pipfile-lock + - repo: local + hooks: + - id: pytest-check + name: pytest-check + entry: bash -c 'pytest' + language: system diff --git a/Pipfile b/Pipfile index 86c061e3..ef2fd2cc 100644 --- a/Pipfile +++ b/Pipfile @@ -8,6 +8,7 @@ colorama = "0.4.4" boto3 = "1.24.8" arnparse = "0.0.2" botocore = "1.27.8" +pydantic = "1.9.1" [dev-packages] diff --git a/Pipfile.lock b/Pipfile.lock index 4adf964f..086252a8 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "fd3c94872ea6726d11b810d5eb1e1d23dbf0f94663e2788d38d71d341bad747c" + "sha256": "b532ef32ebcb28be5438c1ef9c717aa6792cfd5098ad81a9ed35520a245bb8f2" }, "pipfile-spec": 6, "requires": { @@ -26,35 +26,76 @@ }, "boto3": { "hashes": [ - "sha256:0821212ff521cb934801b1f655cef3c0e976775324b1018f1751700d0f42dbb4", - "sha256:87d34861727699c795bf8d65703f2435e75f12879bdd483e08b35b7c5510e8c8" + "sha256:490f5e88f5551b33ae3019a37412158b76426d63d1fb910968ade9b6a024e5fe", + "sha256:e284705da36faa668c715ae1f74ebbff4320dbfbe3a733df3a8ab076d1ed1226" ], "index": "pypi", - "version": "==1.24.9" + "version": "==1.24.14" }, "botocore": { "hashes": [ - "sha256:5669b982b0583e73daef1fe0a4df311055e6287326f857dbb1dcc2de1d8412ad", - "sha256:7a7588b0170e571317496ac4104803329d5bc792bc008e8a757ffd440f1b6fa6" + "sha256:bb56fa77b8fa1ec367c2e16dee62d60000451aac5140dcce3ebddc167fd5c593", + "sha256:df1e9b208ff93daac7c645b0b04fb6dccd7f20262eae24d87941727025cbeece" ], "index": "pypi", - "version": "==1.27.9" + "version": "==1.27.14" }, "colorama": { "hashes": [ - "sha256:5941b2b48a20143d2267e95b1c2a7603ce057ee39fd88e7329b0c292aa16869b", - "sha256:9f47eda37229f68eee03b24b9748937c7dc3868f906e8ba69fbcbdd3bc5dc3e2" + "sha256:854bf444933e37f5824ae7bfc1e98d5bce2ebe4160d46b5edf346a89358e99da", + "sha256:e6c6b4334fc50988a639d9b98aa429a0b57da6e17b9a44f0451f930b6967b7a4" ], "index": "pypi", - "version": "==0.4.4" + "version": "==0.4.5" }, "jmespath": { "hashes": [ - "sha256:a490e280edd1f57d6de88636992d05b71e97d69a26a19f058ecf7d304474bf5e", - "sha256:e8dcd576ed616f14ec02eed0005c85973b5890083313860136657e24784e4c04" + "sha256:02e2e4cc71b5bcab88332eebf907519190dd9e6e82107fa7f83b1003a6252980", + "sha256:90261b206d6defd58fdd5e85f478bf633a2901798906be2ad389150c5c60edbe" ], "markers": "python_version >= '3.7'", - "version": "==1.0.0" + "version": "==1.0.1" + }, + "pydantic": { + "hashes": [ + "sha256:02eefd7087268b711a3ff4db528e9916ac9aa18616da7bca69c1871d0b7a091f", + "sha256:059b6c1795170809103a1538255883e1983e5b831faea6558ef873d4955b4a74", + "sha256:0bf07cab5b279859c253d26a9194a8906e6f4a210063b84b433cf90a569de0c1", + "sha256:1542636a39c4892c4f4fa6270696902acb186a9aaeac6f6cf92ce6ae2e88564b", + "sha256:177071dfc0df6248fd22b43036f936cfe2508077a72af0933d0c1fa269b18537", + "sha256:18f3e912f9ad1bdec27fb06b8198a2ccc32f201e24174cec1b3424dda605a310", + "sha256:1dd8fecbad028cd89d04a46688d2fcc14423e8a196d5b0a5c65105664901f810", + "sha256:1ed987c3ff29fff7fd8c3ea3a3ea877ad310aae2ef9889a119e22d3f2db0691a", + "sha256:447d5521575f18e18240906beadc58551e97ec98142266e521c34968c76c8761", + "sha256:494f7c8537f0c02b740c229af4cb47c0d39840b829ecdcfc93d91dcbb0779892", + "sha256:4988c0f13c42bfa9ddd2fe2f569c9d54646ce84adc5de84228cfe83396f3bd58", + "sha256:4ce9ae9e91f46c344bec3b03d6ee9612802682c1551aaf627ad24045ce090761", + "sha256:5d93d4e95eacd313d2c765ebe40d49ca9dd2ed90e5b37d0d421c597af830c195", + "sha256:61b6760b08b7c395975d893e0b814a11cf011ebb24f7d869e7118f5a339a82e1", + "sha256:72ccb318bf0c9ab97fc04c10c37683d9eea952ed526707fabf9ac5ae59b701fd", + "sha256:79b485767c13788ee314669008d01f9ef3bc05db9ea3298f6a50d3ef596a154b", + "sha256:7eb57ba90929bac0b6cc2af2373893d80ac559adda6933e562dcfb375029acee", + "sha256:8bc541a405423ce0e51c19f637050acdbdf8feca34150e0d17f675e72d119580", + "sha256:969dd06110cb780da01336b281f53e2e7eb3a482831df441fb65dd30403f4608", + "sha256:985ceb5d0a86fcaa61e45781e567a59baa0da292d5ed2e490d612d0de5796918", + "sha256:9bcf8b6e011be08fb729d110f3e22e654a50f8a826b0575c7196616780683380", + "sha256:9ce157d979f742a915b75f792dbd6aa63b8eccaf46a1005ba03aa8a986bde34a", + "sha256:9f659a5ee95c8baa2436d392267988fd0f43eb774e5eb8739252e5a7e9cf07e0", + "sha256:a4a88dcd6ff8fd47c18b3a3709a89adb39a6373f4482e04c1b765045c7e282fd", + "sha256:a955260d47f03df08acf45689bd163ed9df82c0e0124beb4251b1290fa7ae728", + "sha256:a9af62e9b5b9bc67b2a195ebc2c2662fdf498a822d62f902bf27cccb52dbbf49", + "sha256:ae72f8098acb368d877b210ebe02ba12585e77bd0db78ac04a1ee9b9f5dd2166", + "sha256:b83ba3825bc91dfa989d4eed76865e71aea3a6ca1388b59fc801ee04c4d8d0d6", + "sha256:c11951b404e08b01b151222a1cb1a9f0a860a8153ce8334149ab9199cd198131", + "sha256:c320c64dd876e45254bdd350f0179da737463eea41c43bacbee9d8c9d1021f11", + "sha256:c8098a724c2784bf03e8070993f6d46aa2eeca031f8d8a048dff277703e6e193", + "sha256:d12f96b5b64bec3f43c8e82b4aab7599d0157f11c798c9f9c528a72b9e0b339a", + "sha256:e565a785233c2d03724c4dc55464559639b1ba9ecf091288dd47ad9c629433bd", + "sha256:f0f047e11febe5c3198ed346b507e1d010330d56ad615a7e0a89fae604065a0e", + "sha256:fe4670cb32ea98ffbf5a1262f14c3e102cccd92b1869df3bb09538158ba90fe6" + ], + "index": "pypi", + "version": "==1.9.1" }, "python-dateutil": { "hashes": [ @@ -80,6 +121,14 @@ "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'", "version": "==1.16.0" }, + "typing-extensions": { + "hashes": [ + "sha256:6657594ee297170d19f67d55c05852a874e7eb634f4f753dbd667855e07c1708", + "sha256:f1c24655a0da0d1b67f07e17a5e6b2a105894e6824b92096378bb3668ef02376" + ], + "markers": "python_version >= '3.7'", + "version": "==4.2.0" + }, "urllib3": { "hashes": [ "sha256:44ece4d53fb1706f667c9bd1c648f5469a2ec925fcf3a776667042d645472c14", diff --git a/checklist.txt b/checklist.txt deleted file mode 100644 index 7f31c817..00000000 --- a/checklist.txt +++ /dev/null @@ -1,6 +0,0 @@ -# You can add a comma seperated list of checks like this: -check11,check12 -extra72 # You can also use newlines for each check -check13 # This way allows you to add inline comments -# Both of these can be combined if you have a standard list and want to add -# inline comments for other checks. diff --git a/lib/check/check.py b/lib/check/check.py index 7bb05271..e23128f0 100644 --- a/lib/check/check.py +++ b/lib/check/check.py @@ -1,18 +1,35 @@ import importlib -import pkgutil -from abc import ABC, abstractmethod -from dataclasses import dataclass +from pkgutil import walk_packages from types import ModuleType from typing import Any +# import time from colorama import Fore, Style from config.config import groups_file +from lib.check.models import Output_From_Options, load_check_metadata from lib.logger import logger from lib.outputs import report from lib.utils.utils import open_file, parse_json_file +# Load all checks metadata +def bulk_load_checks_metadata(provider: str) -> dict: + bulk_check_metadata = {} + checks = recover_checks_from_provider(provider) + # Build list of check's metadata files + for check_name in checks: + # Build check path name + check_path_name = check_name.replace(".", "/") + # Append metadata file extension + metadata_file = f"{check_path_name}.metadata.json" + # Load metadata + check_metadata = load_check_metadata(metadata_file) + bulk_check_metadata[check_metadata.CheckID] = check_metadata + + return bulk_check_metadata + + # Exclude checks to run def exclude_checks_to_run(checks_to_execute: set, excluded_checks: list) -> set: for check in excluded_checks: @@ -34,12 +51,13 @@ def exclude_groups_to_run( return checks_to_execute +# Exclude services to run def exclude_services_to_run( checks_to_execute: set, excluded_services: list, provider: str ) -> set: # Recover checks from the input services for service in excluded_services: - modules = recover_modules_from_provider(provider, service) + modules = recover_checks_from_provider(provider, service) if not modules: logger.error(f"Service '{service}' was not found for the AWS provider") else: @@ -98,86 +116,33 @@ def load_checks_to_execute_from_groups( return checks_to_execute -# Generate the list of checks to execute -def load_checks_to_execute( - checks_file: str, - check_list: list, - service_list: list, - group_list: list, - provider: str, -) -> set: - - checks_to_execute = set() - - # Handle if there are checks passed using -c/--checks - if check_list: - for check_name in check_list: - checks_to_execute.add(check_name) - - # Handle if there are checks passed using -C/--checks-file - elif checks_file: - try: - checks_to_execute = parse_checks_from_file(checks_file, provider) - except Exception as e: - logger.error(f"{e.__class__.__name__} -- {e}") - - # Handle if there are services passed using -s/--services - elif service_list: - # Loaded dynamically from modules within provider/services - for service in service_list: - modules = recover_modules_from_provider(provider, service) - if not modules: - logger.error(f"Service '{service}' was not found for the AWS provider") - else: - for check_module in modules: - # Recover check name and module name from import path - # Format: "providers.{provider}.services.{service}.{check_name}.{check_name}" - check_name = check_module.split(".")[-1] - # If the service is present in the group list passed as parameters - # if service_name in group_list: checks_to_execute.add(check_name) - checks_to_execute.add(check_name) - - # Handle if there are groups passed using -g/--groups - elif group_list: - try: - available_groups = parse_groups_from_file(groups_file) - checks_to_execute = load_checks_to_execute_from_groups( - available_groups, group_list, provider - ) - except Exception as e: - logger.error(f"{e.__class__.__name__} -- {e}") - - # If there are no checks passed as argument - else: - try: - # Get all check modules to run with the specific provider - modules = recover_modules_from_provider(provider) - except Exception as e: - logger.error(f"{e.__class__.__name__} -- {e}") - else: - for check_module in modules: - # Recover check name from import path (last part) - # Format: "providers.{provider}.services.{service}.{check_name}.{check_name}" - check_name = check_module.split(".")[-1] - checks_to_execute.add(check_name) - - return checks_to_execute +# Recover all checks from the selected provider and service +def recover_checks_from_provider(provider: str, service: str = None) -> list: + checks = [] + modules = list_modules(provider, service) + for module_name in modules: + # Format: "providers.{provider}.services.{service}.{check_name}.{check_name}" + check_name = module_name.name + if check_name.count(".") == 5: + checks.append(check_name) + return checks -def recover_modules_from_provider(provider: str, service: str = None) -> list: - modules = [] +# List all available modules in the selected provider and service +def list_modules(provider: str, service: str): module_path = f"providers.{provider}.services" if service: module_path += f".{service}" - - for module_name in pkgutil.walk_packages( + return walk_packages( importlib.import_module(module_path).__path__, importlib.import_module(module_path).__name__ + ".", - ): - # Format: "providers.{provider}.services.{service}.{check_name}.{check_name}" - if module_name.name.count(".") == 5: - modules.append(module_name.name) - return modules + ) + + +# Import an input check using its path +def import_check(check_path: str) -> ModuleType: + lib = importlib.import_module(f"{check_path}") + return lib def set_output_options(quiet): @@ -191,9 +156,9 @@ def set_output_options(quiet): def run_check(check): print( - f"\nCheck Name: {check.CheckName} - {Fore.MAGENTA}{check.ServiceName}{Fore.YELLOW}[{check.Severity}]{Style.RESET_ALL}" + f"\nCheck Name: {check.checkName} - {Fore.MAGENTA}{check.serviceName}{Fore.YELLOW}[{check.severity}]{Style.RESET_ALL}" ) - logger.debug(f"Executing check: {check.CheckName}") + logger.debug(f"Executing check: {check.checkName}") findings = check.execute() report(findings, output_options) @@ -201,136 +166,3 @@ def run_check(check): def import_check(check_path: str) -> ModuleType: lib = importlib.import_module(f"{check_path}") return lib - - -@dataclass -class Check_Report: - status: str - region: str - result_extended: str - - def __init__(self): - self.status = "" - self.region = "" - self.result_extended = "" - - -@dataclass -class Output_From_Options: - is_quiet: bool - - -class Check(ABC): - def __init__(self): - try: - self.metadata = self.__parse_metadata__( - self.__class__.__module__.replace(".", "/") + ".metadata.json" - ) - self.Provider = self.metadata["Provider"] - self.CheckID = self.metadata["CheckID"] - self.CheckName = self.metadata["CheckName"] - self.CheckTitle = self.metadata["CheckTitle"] - self.CheckAlias = self.metadata["CheckAlias"] - self.CheckType = self.metadata["CheckType"] - self.ServiceName = self.metadata["ServiceName"] - self.SubServiceName = self.metadata["SubServiceName"] - self.ResourceIdTemplate = self.metadata["ResourceIdTemplate"] - self.Severity = self.metadata["Severity"] - self.ResourceType = self.metadata["ResourceType"] - self.Description = self.metadata["Description"] - self.Risk = self.metadata["Risk"] - self.RelatedUrl = self.metadata["RelatedUrl"] - self.Remediation = self.metadata["Remediation"] - self.Categories = self.metadata["Categories"] - self.Tags = self.metadata["Tags"] - self.DependsOn = self.metadata["DependsOn"] - self.RelatedTo = self.metadata["RelatedTo"] - self.Notes = self.metadata["Notes"] - self.Compliance = self.metadata["Compliance"] - except: - print(f"Metadata check from file {self.__class__.__module__} not found") - - @property - def provider(self): - return self.Provider - - @property - def checkID(self): - return self.CheckID - - @property - def checkName(self): - return self.CheckName - - @property - def checkTitle(self): - return self.CheckTitle - - @property - def checkAlias(self): - return self.CheckAlias - - @property - def checkType(self): - return self.CheckType - - @property - def serviceName(self): - return self.ServiceName - - @property - def subServiceName(self): - return self.SubServiceName - - @property - def resourceIdTemplate(self): - return self.ResourceIdTemplate - - @property - def resourceType(self): - return self.ResourceType - - @property - def description(self): - return self.Description - - @property - def relatedUrl(self): - return self.RelatedUrl - - @property - def remediation(self): - return self.Remediation - - @property - def categories(self): - return self.Categories - - @property - def tags(self): - return self.Tags - - @property - def relatedTo(self): - return self.RelatedTo - - @property - def notes(self): - return self.Notes - - @property - def compliance(self): - return self.Compliance - - def __parse_metadata__(self, metadata_file): - # Opening JSON file - f = open_file(metadata_file) - # Parse JSON - check_metadata = parse_json_file(f) - return check_metadata - - # Validate metadata - - @abstractmethod - def execute(self): - pass diff --git a/lib/check/check_test.py b/lib/check/check_test.py index 856b7f42..a3f8038b 100644 --- a/lib/check/check_test.py +++ b/lib/check/check_test.py @@ -8,6 +8,7 @@ from lib.check.check import ( parse_checks_from_file, parse_groups_from_file, ) +from lib.check.models import load_check_metadata class Test_Check: @@ -50,6 +51,28 @@ class Test_Check: check_file = test["input"]["path"] assert parse_groups_from_file(check_file) == test["expected"] + def test_load_check_metadata(self): + test_cases = [ + { + "input": { + "metadata_path": f"{os.path.dirname(os.path.realpath(__file__))}/fixtures/metadata.json", + }, + "expected": { + "CheckID": "iam_disable_30_days_credentials", + "CheckTitle": "Ensure credentials unused for 30 days or greater are disabled", + "ServiceName": "iam", + "Severity": "low", + }, + } + ] + for test in test_cases: + metadata_path = test["input"]["metadata_path"] + check_metadata = load_check_metadata(metadata_path) + assert check_metadata.CheckID == test["expected"]["CheckID"] + assert check_metadata.CheckTitle == test["expected"]["CheckTitle"] + assert check_metadata.ServiceName == test["expected"]["ServiceName"] + assert check_metadata.Severity == test["expected"]["Severity"] + def test_parse_checks_from_file(self): test_cases = [ { diff --git a/lib/check/checks_loader.py b/lib/check/checks_loader.py new file mode 100644 index 00000000..80c2e342 --- /dev/null +++ b/lib/check/checks_loader.py @@ -0,0 +1,78 @@ +from config.config import groups_file +from lib.check.check import ( + parse_checks_from_file, + parse_groups_from_file, + recover_checks_from_provider, +) +from lib.logger import logger + + +# Generate the list of checks to execute +# test this function +def load_checks_to_execute( + bulk_checks_metadata: dict, + checks_file: str, + check_list: list, + service_list: list, + group_list: list, + provider: str, +) -> set: + + checks_to_execute = set() + + # Handle if there are checks passed using -c/--checks + if check_list: + for check_name in check_list: + checks_to_execute.add(check_name) + + # elif severity_list: + # using bulk_checks_metadata + # elif compliance_list: + # using bulk_checks_metadata + # Handle if there are checks passed using -C/--checks-file + elif checks_file: + try: + checks_to_execute = parse_checks_from_file(checks_file, provider) + except Exception as e: + logger.error(f"{e.__class__.__name__} -- {e}") + + # Handle if there are services passed using -s/--services + elif service_list: + # Loaded dynamically from modules within provider/services + for service in service_list: + modules = recover_checks_from_provider(provider, service) + if not modules: + logger.error(f"Service '{service}' was not found for the AWS provider") + else: + for check_module in modules: + # Recover check name and module name from import path + # Format: "providers.{provider}.services.{service}.{check_name}.{check_name}" + check_name = check_module.split(".")[-1] + # If the service is present in the group list passed as parameters + # if service_name in group_list: checks_to_execute.add(check_name) + checks_to_execute.add(check_name) + + # Handle if there are groups passed using -g/--groups + elif group_list: + try: + checks_to_execute = parse_groups_from_file( + groups_file, group_list, provider + ) + except Exception as e: + logger.error(f"{e.__class__.__name__} -- {e}") + + # If there are no checks passed as argument + else: + try: + # Get all check modules to run with the specific provider + checks = recover_checks_from_provider(provider) + except Exception as e: + logger.error(f"{e.__class__.__name__} -- {e}") + else: + for check_name in checks: + # Recover check name from import path (last part) + # Format: "providers.{provider}.services.{service}.{check_name}.{check_name}" + check_name = check_name.split(".")[-1] + checks_to_execute.add(check_name) + + return checks_to_execute diff --git a/lib/check/fixtures/metadata.json b/lib/check/fixtures/metadata.json new file mode 100644 index 00000000..5b821948 --- /dev/null +++ b/lib/check/fixtures/metadata.json @@ -0,0 +1,58 @@ +{ + "Categories": [ + "cat1", + "cat2" + ], + "CheckAlias": "extra764", + "CheckID": "iam_disable_30_days_credentials", + "CheckName": "iam_disable_30_days_credentials", + "CheckTitle": "Ensure credentials unused for 30 days or greater are disabled", + "CheckType": "Software and Configuration Checks", + "Compliance": [ + { + "Control": [ + "4.4" + ], + "Framework": "CIS-AWS", + "Group": [ + "level1", + "level2" + ], + "Version": "1.4" + } + ], + "DependsOn": [ + "othercheck1", + "othercheck2" + ], + "Description": "Ensure credentials unused for 30 days or greater are disabled", + "Notes": "additional information", + "Provider": "aws", + "RelatedTo": [ + "othercheck3", + "othercheck4" + ], + "RelatedUrl": "https://serviceofficialsiteorpageforthissubject", + "Remediation": { + "Code": { + "NativeIaC": "code or URL to the code location.", + "Terraform": "code or URL to the code location.", + "cli": "cli command or URL to the cli command location.", + "other": "cli command or URL to the cli command location." + }, + "Recommendation": { + "Text": "Run sudo yum update and cross your fingers and toes.", + "Url": "https://myfp.com/recommendations/dangerous_things_and_how_to_fix_them.html" + } + }, + "ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id", + "ResourceType": "AwsIamAccessAnalyzer", + "Risk": "Risk associated.", + "ServiceName": "iam", + "Severity": "low", + "SubServiceName": "accessanalyzer", + "Tags": { + "Tag1Key": "value", + "Tag2Key": "value" + } +} diff --git a/lib/check/models.py b/lib/check/models.py new file mode 100644 index 00000000..5b661c50 --- /dev/null +++ b/lib/check/models.py @@ -0,0 +1,178 @@ +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import List + +from pydantic import BaseModel, ValidationError + +from lib.logger import logger + + +@dataclass +class Check_Report: + status: str + region: str + result_extended: str + + def __init__(self): + self.status = "" + self.region = "" + self.result_extended = "" + + +@dataclass +class Output_From_Options: + is_quiet: bool + + +# Testing Pending +def load_check_metadata(metadata_file: str) -> dict: + try: + check_metadata = Check_Metadata_Model.parse_file(metadata_file) + except ValidationError as error: + logger.critical(f"Metadata from {metadata_file} is not valid: {error}") + quit() + else: + return check_metadata + + +# Check all values +class Check_Metadata_Model(BaseModel): + Provider: str + CheckID: str + CheckName: str + CheckTitle: str + # CheckAlias: str + CheckType: str + ServiceName: str + SubServiceName: str + ResourceIdTemplate: str + Severity: str + ResourceType: str + Description: str + Risk: str + RelatedUrl: str + Remediation: dict + Categories: List[str] + Tags: dict + DependsOn: List[str] + RelatedTo: List[str] + Notes: str + Compliance: List + + +class Check(ABC): + def __init__(self): + # Load metadata from check + check_path_name = self.__class__.__module__.replace(".", "/") + metadata_file = f"{check_path_name}.metadata.json" + self.__check_metadata__ = load_check_metadata(metadata_file) + # Assign metadata values + self.__Provider__ = self.__check_metadata__.Provider + self.__CheckID__ = self.__check_metadata__.CheckID + self.__CheckName__ = self.__check_metadata__.CheckName + self.__CheckTitle__ = self.__check_metadata__.CheckTitle + # self.__CheckAlias__ = self.__check_metadata__.CheckAlias + self.__CheckType__ = self.__check_metadata__.CheckType + self.__ServiceName__ = self.__check_metadata__.ServiceName + self.__SubServiceName__ = self.__check_metadata__.SubServiceName + self.__ResourceIdTemplate__ = self.__check_metadata__.ResourceIdTemplate + self.__Severity__ = self.__check_metadata__.Severity + self.__ResourceType__ = self.__check_metadata__.ResourceType + self.__Description__ = self.__check_metadata__.Description + self.__Risk__ = self.__check_metadata__.Risk + self.__RelatedUrl__ = self.__check_metadata__.RelatedUrl + self.__Remediation__ = self.__check_metadata__.Remediation + self.__Categories__ = self.__check_metadata__.Categories + self.__Tags__ = self.__check_metadata__.Tags + self.__DependsOn__ = self.__check_metadata__.DependsOn + self.__RelatedTo__ = self.__check_metadata__.RelatedTo + self.__Notes__ = self.__check_metadata__.Notes + self.__Compliance__ = self.__check_metadata__.Compliance + + @property + def provider(self): + return self.__Provider__ + + @property + def checkID(self): + return self.__CheckID__ + + @property + def checkName(self): + return self.__CheckName__ + + @property + def checkTitle(self): + return self.__CheckTitle__ + + # @property + # def checkAlias(self): + # return self.__CheckAlias__ + + @property + def checkType(self): + return self.__CheckType__ + + @property + def serviceName(self): + return self.__ServiceName__ + + @property + def subServiceName(self): + return self.__SubServiceName__ + + @property + def resourceIdTemplate(self): + return self.__ResourceIdTemplate__ + + @property + def severity(self): + return self.__Severity__ + + @property + def resourceType(self): + return self.__ResourceType__ + + @property + def description(self): + return self.__Description__ + + @property + def relatedUrl(self): + return self.__RelatedUrl__ + + @property + def risk(self): + return self.__Risk__ + + @property + def remediation(self): + return self.__Remediation__ + + @property + def categories(self): + return self.__Categories__ + + @property + def tags(self): + return self.__Tags__ + + @property + def dependsOn(self): + return self.__DependsOn__ + + @property + def relatedTo(self): + return self.__RelatedTo__ + + @property + def notes(self): + return self.__Notes__ + + @property + def compliance(self): + return self.__Compliance__ + + @abstractmethod + def execute(self): + pass diff --git a/providers/aws/services/ec2/ec2_ebs_snapshots_encrypted/ec2_ebs_snapshots_encrypted.py b/providers/aws/services/ec2/ec2_ebs_snapshots_encrypted/ec2_ebs_snapshots_encrypted.py index 4977d45e..345db830 100644 --- a/providers/aws/services/ec2/ec2_ebs_snapshots_encrypted/ec2_ebs_snapshots_encrypted.py +++ b/providers/aws/services/ec2/ec2_ebs_snapshots_encrypted/ec2_ebs_snapshots_encrypted.py @@ -1,4 +1,4 @@ -from lib.check.check import Check, Check_Report +from lib.check.models import Check, Check_Report from providers.aws.services.ec2.ec2_service import ec2_client diff --git a/providers/aws/services/iam/iam_disable_30_days_credentials/iam_disable_30_days_credentials.metadata.json b/providers/aws/services/iam/iam_disable_30_days_credentials/iam_disable_30_days_credentials.metadata.json index 6fc6b503..5b821948 100644 --- a/providers/aws/services/iam/iam_disable_30_days_credentials/iam_disable_30_days_credentials.metadata.json +++ b/providers/aws/services/iam/iam_disable_30_days_credentials/iam_disable_30_days_credentials.metadata.json @@ -4,9 +4,9 @@ "cat2" ], "CheckAlias": "extra764", - "CheckID": "iam-check-credentials-expiration-30-days", - "CheckName": "iam-check-credentials-expiration-30-days", - "CheckTitle": "IAM Access Analyzer Enabled", + "CheckID": "iam_disable_30_days_credentials", + "CheckName": "iam_disable_30_days_credentials", + "CheckTitle": "Ensure credentials unused for 30 days or greater are disabled", "CheckType": "Software and Configuration Checks", "Compliance": [ { @@ -25,7 +25,7 @@ "othercheck1", "othercheck2" ], - "Description": "If Security groups are not properly configured the attack surface is increased.", + "Description": "Ensure credentials unused for 30 days or greater are disabled", "Notes": "additional information", "Provider": "aws", "RelatedTo": [ diff --git a/providers/aws/services/iam/iam_disable_30_days_credentials/iam_disable_30_days_credentials.py b/providers/aws/services/iam/iam_disable_30_days_credentials/iam_disable_30_days_credentials.py index c0e3a1fb..c2c9516a 100644 --- a/providers/aws/services/iam/iam_disable_30_days_credentials/iam_disable_30_days_credentials.py +++ b/providers/aws/services/iam/iam_disable_30_days_credentials/iam_disable_30_days_credentials.py @@ -1,6 +1,6 @@ from datetime import datetime -from lib.check.check import Check, Check_Report +from lib.check.models import Check, Check_Report from providers.aws.services.iam.iam_service import iam_client maximum_expiration_days = 30 @@ -32,9 +32,7 @@ class iam_disable_30_days_credentials(Check): pass else: report.status = "PASS" - report.result_extended = ( - f"User {user['UserName']} has not a console password or is unused." - ) + report.result_extended = f"User {user['UserName']} has not a console password or is unused." report.region = "us-east-1" # Append report @@ -46,4 +44,4 @@ class iam_disable_30_days_credentials(Check): report.region = "us-east-1" findings.append(report) - return findings \ No newline at end of file + return findings diff --git a/providers/aws/services/iam/iam_disable_90_days_credentials/iam_disable_90_days_credentials.metadata.json b/providers/aws/services/iam/iam_disable_90_days_credentials/iam_disable_90_days_credentials.metadata.json index 72ad5b46..fbe39cd7 100644 --- a/providers/aws/services/iam/iam_disable_90_days_credentials/iam_disable_90_days_credentials.metadata.json +++ b/providers/aws/services/iam/iam_disable_90_days_credentials/iam_disable_90_days_credentials.metadata.json @@ -4,9 +4,9 @@ "cat2" ], "CheckAlias": "extra764", - "CheckID": "iam-check-credentials-expiration-90-days", - "CheckName": "iam-check-credentials-expiration-90-days", - "CheckTitle": "IAM Access Analyzer Enabled", + "CheckID": "iam_disable_90_days_credentials", + "CheckName": "iam_disable_90_days_credentials", + "CheckTitle": "Ensure credentials unused for 90 days or greater are disabled", "CheckType": "Software and Configuration Checks", "Compliance": [ { @@ -25,7 +25,7 @@ "othercheck1", "othercheck2" ], - "Description": "If Security groups are not properly configured the attack surface is increased.", + "Description": "Ensure credentials unused for 90 days or greater are disabled", "Notes": "additional information", "Provider": "aws", "RelatedTo": [ diff --git a/providers/aws/services/iam/iam_disable_90_days_credentials/iam_disable_90_days_credentials.py b/providers/aws/services/iam/iam_disable_90_days_credentials/iam_disable_90_days_credentials.py index cf18ed67..d0dff782 100644 --- a/providers/aws/services/iam/iam_disable_90_days_credentials/iam_disable_90_days_credentials.py +++ b/providers/aws/services/iam/iam_disable_90_days_credentials/iam_disable_90_days_credentials.py @@ -1,6 +1,6 @@ from datetime import datetime -from lib.check.check import Check, Check_Report +from lib.check.models import Check, Check_Report from providers.aws.services.iam.iam_service import iam_client maximum_expiration_days = 90 @@ -32,9 +32,7 @@ class iam_disable_90_days_credentials(Check): pass else: report.status = "PASS" - report.result_extended = ( - f"User {user['UserName']} has not a console password or is unused." - ) + report.result_extended = f"User {user['UserName']} has not a console password or is unused." report.region = "us-east-1" # Append report @@ -45,4 +43,4 @@ class iam_disable_90_days_credentials(Check): report.result_extended = "There is no IAM users" report.region = "us-east-1" - return findings \ No newline at end of file + return findings diff --git a/prowler.py b/prowler old mode 100644 new mode 100755 similarity index 83% rename from prowler.py rename to prowler index a5278653..765ddd1e --- a/prowler.py +++ b/prowler @@ -3,17 +3,20 @@ import argparse +from colorama import Fore, Style + from lib.banner import print_banner, print_version from lib.check.check import ( + bulk_load_checks_metadata, exclude_checks_to_run, exclude_groups_to_run, exclude_services_to_run, import_check, list_groups, - load_checks_to_execute, run_check, set_output_options, ) +from lib.check.checks_loader import load_checks_to_execute from lib.logger import logger, logging_levels from providers.aws.aws_provider import provider_set_session @@ -36,6 +39,8 @@ if __name__ == "__main__": "-S", "--excluded-services", nargs="+", help="Services to exclude" ) + parser.add_argument("-l", "--list-checks", action="store_true", help="List checks") + parser.add_argument( "-b", "--no-banner", action="store_false", help="Hide Prowler Banner" ) @@ -109,14 +114,7 @@ if __name__ == "__main__": if not args.role: logger.critical("To use -I/-T options -R option is needed") quit() - - if args.version: - print_version() - quit() - if args.no_banner: - print_banner() - if args.version: print_version() quit() @@ -128,22 +126,13 @@ if __name__ == "__main__": list_groups(provider) quit() - # Setting output options - set_output_options(args.quiet) - - # Set global session - provider_set_session( - args.profile, - args.role, - args.session_duration, - args.external_id, - args.filter_region - ) + # Load checks metadata + logger.debug("Loading checks metadata from .metadata.json files") + bulk_checks_metadata = bulk_load_checks_metadata(provider) # Load checks to execute - logger.debug("Loading checks") checks_to_execute = load_checks_to_execute( - checks_file, checks, services, groups, provider + bulk_checks_metadata, checks_file, checks, services, groups, provider ) # Exclude checks if -e/--excluded-checks if excluded_checks: @@ -161,6 +150,32 @@ if __name__ == "__main__": checks_to_execute, excluded_services, provider ) + # If -l/--list-checks passed as argument, print checks to execute and quit + if args.list_checks: + for check in checks_to_execute: + try: + print( + f"[{bulk_checks_metadata[check].CheckID}] {bulk_checks_metadata[check].CheckTitle} - {Fore.MAGENTA}{bulk_checks_metadata[check].ServiceName} {Fore.YELLOW}[{bulk_checks_metadata[check].Severity}]{Style.RESET_ALL}" + ) + except KeyError as error: + logger.error( + f"Check {error} was not found for the {provider.upper()} provider" + ) + quit() + + # Setting output options + set_output_options(args.quiet) + + # Set global session + provider_set_session( + args.profile, + args.role, + args.session_duration, + args.external_id, + args.filter_region, + ) + + # Execute checks if len(checks_to_execute): for check_name in checks_to_execute: