mirror of
https://github.com/ghndrx/prowler.git
synced 2026-02-10 14:55:00 +00:00
feat(categories): Remove old groups and use categories from metadata (#1523)
This commit is contained in:
@@ -4,12 +4,11 @@ import os
|
||||
import sys
|
||||
from pkgutil import walk_packages
|
||||
from types import ModuleType
|
||||
from typing import Any
|
||||
|
||||
from alive_progress import alive_bar
|
||||
from colorama import Fore, Style
|
||||
|
||||
from config.config import compliance_specification_dir, groups_file, orange_color
|
||||
from config.config import compliance_specification_dir, orange_color
|
||||
from lib.check.compliance_models import load_compliance_framework
|
||||
from lib.check.models import Check, Output_From_Options, load_check_metadata
|
||||
from lib.logger import logger
|
||||
@@ -65,20 +64,6 @@ def exclude_checks_to_run(checks_to_execute: set, excluded_checks: list) -> set:
|
||||
return checks_to_execute
|
||||
|
||||
|
||||
# Exclude groups to run
|
||||
def exclude_groups_to_run(
|
||||
checks_to_execute: set, excluded_groups: list, provider: str
|
||||
) -> set:
|
||||
# Recover checks from the input groups
|
||||
available_groups = parse_groups_from_file(groups_file)
|
||||
checks_from_groups = load_checks_to_execute_from_groups(
|
||||
available_groups, excluded_groups, provider
|
||||
)
|
||||
for check_name in checks_from_groups:
|
||||
checks_to_execute.discard(check_name)
|
||||
return checks_to_execute
|
||||
|
||||
|
||||
# Exclude services to run
|
||||
def exclude_services_to_run(
|
||||
checks_to_execute: set, excluded_services: list, provider: str
|
||||
@@ -110,7 +95,7 @@ def parse_checks_from_file(input_file: str, provider: str) -> set:
|
||||
return checks_to_execute
|
||||
|
||||
|
||||
def list_services(provider: str) -> set:
|
||||
def list_services(provider: str) -> set():
|
||||
available_services = set()
|
||||
checks = recover_checks_from_provider(provider)
|
||||
for check_name in checks:
|
||||
@@ -120,6 +105,22 @@ def list_services(provider: str) -> set:
|
||||
return sorted(available_services)
|
||||
|
||||
|
||||
def list_categories(provider: str, bulk_checks_metadata: dict) -> set():
|
||||
available_categories = set()
|
||||
for check in bulk_checks_metadata.values():
|
||||
for cat in check.Categories:
|
||||
available_categories.add(cat)
|
||||
return available_categories
|
||||
|
||||
|
||||
def print_categories(categories: set):
|
||||
print(
|
||||
f"There are {Fore.YELLOW}{len(categories)}{Style.RESET_ALL} available categories: \n"
|
||||
)
|
||||
for category in categories:
|
||||
print(f"- {category}")
|
||||
|
||||
|
||||
def print_services(service_list: set):
|
||||
print(
|
||||
f"There are {Fore.YELLOW}{len(service_list)}{Style.RESET_ALL} available services: \n"
|
||||
@@ -181,40 +182,6 @@ def print_checks(
|
||||
)
|
||||
|
||||
|
||||
# List available groups
|
||||
def list_groups(provider: str):
|
||||
groups = parse_groups_from_file(groups_file)
|
||||
print("Available Groups:")
|
||||
|
||||
for group, value in groups[provider].items():
|
||||
group_description = value["description"]
|
||||
print(f"\t - {group_description} -- [{group}] ")
|
||||
|
||||
|
||||
# Parse groups from groups.json
|
||||
def parse_groups_from_file(group_file: str) -> Any:
|
||||
f = open_file(group_file)
|
||||
available_groups = parse_json_file(f)
|
||||
return available_groups
|
||||
|
||||
|
||||
# Parse checks from groups to execute
|
||||
def load_checks_to_execute_from_groups(
|
||||
available_groups: Any, group_list: list, provider: str
|
||||
) -> set:
|
||||
checks_to_execute = set()
|
||||
|
||||
for group in group_list:
|
||||
if group in available_groups[provider]:
|
||||
for check_name in available_groups[provider][group]["checks"]:
|
||||
checks_to_execute.add(check_name)
|
||||
else:
|
||||
logger.error(
|
||||
f"Group '{group}' was not found for the {provider.upper()} provider"
|
||||
)
|
||||
return checks_to_execute
|
||||
|
||||
|
||||
# Parse checks from compliance frameworks specification
|
||||
def parse_checks_from_compliance_framework(
|
||||
compliance_frameworks: list, bulk_compliance_frameworks: dict
|
||||
|
||||
@@ -4,45 +4,14 @@ from unittest import mock
|
||||
from lib.check.check import (
|
||||
bulk_load_compliance_frameworks,
|
||||
exclude_checks_to_run,
|
||||
exclude_groups_to_run,
|
||||
exclude_services_to_run,
|
||||
load_checks_to_execute_from_groups,
|
||||
parse_checks_from_compliance_framework,
|
||||
parse_checks_from_file,
|
||||
parse_groups_from_file,
|
||||
)
|
||||
from lib.check.models import load_check_metadata
|
||||
|
||||
|
||||
class Test_Check:
|
||||
def test_parse_groups_from_file(self):
|
||||
test_cases = [
|
||||
{
|
||||
"input": {
|
||||
"path": f"{os.path.dirname(os.path.realpath(__file__))}/fixtures/groupsA.json",
|
||||
"provider": "aws",
|
||||
},
|
||||
"expected": {
|
||||
"aws": {
|
||||
"gdpr": {
|
||||
"description": "GDPR Readiness",
|
||||
"checks": ["check11", "check12"],
|
||||
},
|
||||
"iam": {
|
||||
"description": "Identity and Access Management",
|
||||
"checks": [
|
||||
"iam_disable_30_days_credentials",
|
||||
"iam_disable_90_days_credentials",
|
||||
],
|
||||
},
|
||||
}
|
||||
},
|
||||
}
|
||||
]
|
||||
for test in test_cases:
|
||||
check_file = test["input"]["path"]
|
||||
assert parse_groups_from_file(check_file) == test["expected"]
|
||||
|
||||
def test_load_check_metadata(self):
|
||||
test_cases = [
|
||||
{
|
||||
@@ -80,42 +49,6 @@ class Test_Check:
|
||||
provider = test["input"]["provider"]
|
||||
assert parse_checks_from_file(check_file, provider) == test["expected"]
|
||||
|
||||
def test_load_checks_to_execute_from_groups(self):
|
||||
test_cases = [
|
||||
{
|
||||
"input": {
|
||||
"groups_json": {
|
||||
"aws": {
|
||||
"gdpr": {
|
||||
"description": "GDPR Readiness",
|
||||
"checks": ["check11", "check12"],
|
||||
},
|
||||
"iam": {
|
||||
"description": "Identity and Access Management",
|
||||
"checks": [
|
||||
"iam_disable_30_days_credentials",
|
||||
"iam_disable_90_days_credentials",
|
||||
],
|
||||
},
|
||||
}
|
||||
},
|
||||
"provider": "aws",
|
||||
"groups": ["gdpr"],
|
||||
},
|
||||
"expected": {"check11", "check12"},
|
||||
}
|
||||
]
|
||||
|
||||
for test in test_cases:
|
||||
provider = test["input"]["provider"]
|
||||
groups = test["input"]["groups"]
|
||||
group_file = test["input"]["groups_json"]
|
||||
|
||||
assert (
|
||||
load_checks_to_execute_from_groups(group_file, groups, provider)
|
||||
== test["expected"]
|
||||
)
|
||||
|
||||
def test_exclude_checks_to_run(self):
|
||||
test_cases = [
|
||||
{
|
||||
@@ -140,44 +73,6 @@ class Test_Check:
|
||||
exclude_checks_to_run(check_list, excluded_checks) == test["expected"]
|
||||
)
|
||||
|
||||
def test_exclude_groups_to_run(self):
|
||||
test_cases = [
|
||||
{
|
||||
"input": {
|
||||
"excluded_group_list": {"gdpr"},
|
||||
"provider": "aws",
|
||||
"checks_to_run": {
|
||||
"iam_disable_30_days_credentials",
|
||||
"iam_disable_90_days_credentials",
|
||||
},
|
||||
},
|
||||
"expected": {
|
||||
"iam_disable_30_days_credentials",
|
||||
},
|
||||
},
|
||||
{
|
||||
"input": {
|
||||
"excluded_group_list": {"pci"},
|
||||
"provider": "aws",
|
||||
"checks_to_run": {
|
||||
"iam_disable_30_days_credentials",
|
||||
"iam_disable_90_days_credentials",
|
||||
},
|
||||
},
|
||||
"expected": {
|
||||
"iam_disable_30_days_credentials",
|
||||
},
|
||||
},
|
||||
]
|
||||
for test in test_cases:
|
||||
excluded_group_list = test["input"]["excluded_group_list"]
|
||||
checks_to_run = test["input"]["checks_to_run"]
|
||||
provider = test["input"]["provider"]
|
||||
assert (
|
||||
exclude_groups_to_run(checks_to_run, excluded_group_list, provider)
|
||||
== test["expected"]
|
||||
)
|
||||
|
||||
def test_exclude_services_to_run(self):
|
||||
test_cases = [
|
||||
{
|
||||
|
||||
@@ -1,9 +1,6 @@
|
||||
from config.config import groups_file
|
||||
from lib.check.check import ( # load_checks_to_execute_from_compliance_framework,
|
||||
load_checks_to_execute_from_groups,
|
||||
from lib.check.check import (
|
||||
parse_checks_from_compliance_framework,
|
||||
parse_checks_from_file,
|
||||
parse_groups_from_file,
|
||||
recover_checks_from_provider,
|
||||
)
|
||||
from lib.logger import logger
|
||||
@@ -17,9 +14,9 @@ def load_checks_to_execute(
|
||||
checks_file: str,
|
||||
check_list: list,
|
||||
service_list: list,
|
||||
group_list: list,
|
||||
severities: list,
|
||||
compliance_frameworks: list,
|
||||
categories: set,
|
||||
provider: str,
|
||||
) -> set:
|
||||
"""Generate the list of checks to execute based on the cloud provider and input arguments specified"""
|
||||
@@ -60,16 +57,6 @@ def load_checks_to_execute(
|
||||
# if service_name in group_list: checks_to_execute.add(check_name)
|
||||
checks_to_execute.add(check_name)
|
||||
|
||||
# Handle if there are groups passed using -g/--groups
|
||||
elif group_list:
|
||||
try:
|
||||
available_groups = parse_groups_from_file(groups_file)
|
||||
checks_to_execute = load_checks_to_execute_from_groups(
|
||||
available_groups, group_list, provider
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"{e.__class__.__name__}[{e.__traceback__.tb_lineno}] -- {e}")
|
||||
|
||||
# Handle if there are compliance frameworks passed using --compliance
|
||||
elif compliance_frameworks:
|
||||
try:
|
||||
@@ -79,6 +66,14 @@ def load_checks_to_execute(
|
||||
except Exception as e:
|
||||
logger.error(f"{e.__class__.__name__}[{e.__traceback__.tb_lineno}] -- {e}")
|
||||
|
||||
# Handle if there are categories passed using --categories
|
||||
elif categories:
|
||||
for cat in categories:
|
||||
for check in bulk_checks_metadata:
|
||||
# Check check's categories
|
||||
if cat in bulk_checks_metadata[check].Categories:
|
||||
checks_to_execute.add(check)
|
||||
|
||||
# If there are no checks passed as argument
|
||||
else:
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user