feat(validate-metadata): Validate Check's metadata and list checks (#1215)

This commit is contained in:
Pepe Fagoaga
2022-06-22 10:12:55 +02:00
committed by GitHub
parent b07b7f3f26
commit 6ac6ef359f
15 changed files with 501 additions and 271 deletions

View File

@@ -1,18 +1,35 @@
import importlib
import pkgutil
from abc import ABC, abstractmethod
from dataclasses import dataclass
from pkgutil import walk_packages
from types import ModuleType
from typing import Any
# import time
from colorama import Fore, Style
from config.config import groups_file
from lib.check.models import Output_From_Options, load_check_metadata
from lib.logger import logger
from lib.outputs import report
from lib.utils.utils import open_file, parse_json_file
# Load all checks metadata
def bulk_load_checks_metadata(provider: str) -> dict:
bulk_check_metadata = {}
checks = recover_checks_from_provider(provider)
# Build list of check's metadata files
for check_name in checks:
# Build check path name
check_path_name = check_name.replace(".", "/")
# Append metadata file extension
metadata_file = f"{check_path_name}.metadata.json"
# Load metadata
check_metadata = load_check_metadata(metadata_file)
bulk_check_metadata[check_metadata.CheckID] = check_metadata
return bulk_check_metadata
# Exclude checks to run
def exclude_checks_to_run(checks_to_execute: set, excluded_checks: list) -> set:
for check in excluded_checks:
@@ -34,12 +51,13 @@ def exclude_groups_to_run(
return checks_to_execute
# Exclude services to run
def exclude_services_to_run(
checks_to_execute: set, excluded_services: list, provider: str
) -> set:
# Recover checks from the input services
for service in excluded_services:
modules = recover_modules_from_provider(provider, service)
modules = recover_checks_from_provider(provider, service)
if not modules:
logger.error(f"Service '{service}' was not found for the AWS provider")
else:
@@ -98,86 +116,33 @@ def load_checks_to_execute_from_groups(
return checks_to_execute
# Generate the list of checks to execute
def load_checks_to_execute(
checks_file: str,
check_list: list,
service_list: list,
group_list: list,
provider: str,
) -> set:
checks_to_execute = set()
# Handle if there are checks passed using -c/--checks
if check_list:
for check_name in check_list:
checks_to_execute.add(check_name)
# Handle if there are checks passed using -C/--checks-file
elif checks_file:
try:
checks_to_execute = parse_checks_from_file(checks_file, provider)
except Exception as e:
logger.error(f"{e.__class__.__name__} -- {e}")
# Handle if there are services passed using -s/--services
elif service_list:
# Loaded dynamically from modules within provider/services
for service in service_list:
modules = recover_modules_from_provider(provider, service)
if not modules:
logger.error(f"Service '{service}' was not found for the AWS provider")
else:
for check_module in modules:
# Recover check name and module name from import path
# Format: "providers.{provider}.services.{service}.{check_name}.{check_name}"
check_name = check_module.split(".")[-1]
# If the service is present in the group list passed as parameters
# if service_name in group_list: checks_to_execute.add(check_name)
checks_to_execute.add(check_name)
# Handle if there are groups passed using -g/--groups
elif group_list:
try:
available_groups = parse_groups_from_file(groups_file)
checks_to_execute = load_checks_to_execute_from_groups(
available_groups, group_list, provider
)
except Exception as e:
logger.error(f"{e.__class__.__name__} -- {e}")
# If there are no checks passed as argument
else:
try:
# Get all check modules to run with the specific provider
modules = recover_modules_from_provider(provider)
except Exception as e:
logger.error(f"{e.__class__.__name__} -- {e}")
else:
for check_module in modules:
# Recover check name from import path (last part)
# Format: "providers.{provider}.services.{service}.{check_name}.{check_name}"
check_name = check_module.split(".")[-1]
checks_to_execute.add(check_name)
return checks_to_execute
# Recover all checks from the selected provider and service
def recover_checks_from_provider(provider: str, service: str = None) -> list:
checks = []
modules = list_modules(provider, service)
for module_name in modules:
# Format: "providers.{provider}.services.{service}.{check_name}.{check_name}"
check_name = module_name.name
if check_name.count(".") == 5:
checks.append(check_name)
return checks
def recover_modules_from_provider(provider: str, service: str = None) -> list:
modules = []
# List all available modules in the selected provider and service
def list_modules(provider: str, service: str):
module_path = f"providers.{provider}.services"
if service:
module_path += f".{service}"
for module_name in pkgutil.walk_packages(
return walk_packages(
importlib.import_module(module_path).__path__,
importlib.import_module(module_path).__name__ + ".",
):
# Format: "providers.{provider}.services.{service}.{check_name}.{check_name}"
if module_name.name.count(".") == 5:
modules.append(module_name.name)
return modules
)
# Import an input check using its path
def import_check(check_path: str) -> ModuleType:
lib = importlib.import_module(f"{check_path}")
return lib
def set_output_options(quiet):
@@ -191,9 +156,9 @@ def set_output_options(quiet):
def run_check(check):
print(
f"\nCheck Name: {check.CheckName} - {Fore.MAGENTA}{check.ServiceName}{Fore.YELLOW}[{check.Severity}]{Style.RESET_ALL}"
f"\nCheck Name: {check.checkName} - {Fore.MAGENTA}{check.serviceName}{Fore.YELLOW}[{check.severity}]{Style.RESET_ALL}"
)
logger.debug(f"Executing check: {check.CheckName}")
logger.debug(f"Executing check: {check.checkName}")
findings = check.execute()
report(findings, output_options)
@@ -201,136 +166,3 @@ def run_check(check):
def import_check(check_path: str) -> ModuleType:
lib = importlib.import_module(f"{check_path}")
return lib
@dataclass
class Check_Report:
status: str
region: str
result_extended: str
def __init__(self):
self.status = ""
self.region = ""
self.result_extended = ""
@dataclass
class Output_From_Options:
is_quiet: bool
class Check(ABC):
def __init__(self):
try:
self.metadata = self.__parse_metadata__(
self.__class__.__module__.replace(".", "/") + ".metadata.json"
)
self.Provider = self.metadata["Provider"]
self.CheckID = self.metadata["CheckID"]
self.CheckName = self.metadata["CheckName"]
self.CheckTitle = self.metadata["CheckTitle"]
self.CheckAlias = self.metadata["CheckAlias"]
self.CheckType = self.metadata["CheckType"]
self.ServiceName = self.metadata["ServiceName"]
self.SubServiceName = self.metadata["SubServiceName"]
self.ResourceIdTemplate = self.metadata["ResourceIdTemplate"]
self.Severity = self.metadata["Severity"]
self.ResourceType = self.metadata["ResourceType"]
self.Description = self.metadata["Description"]
self.Risk = self.metadata["Risk"]
self.RelatedUrl = self.metadata["RelatedUrl"]
self.Remediation = self.metadata["Remediation"]
self.Categories = self.metadata["Categories"]
self.Tags = self.metadata["Tags"]
self.DependsOn = self.metadata["DependsOn"]
self.RelatedTo = self.metadata["RelatedTo"]
self.Notes = self.metadata["Notes"]
self.Compliance = self.metadata["Compliance"]
except:
print(f"Metadata check from file {self.__class__.__module__} not found")
@property
def provider(self):
return self.Provider
@property
def checkID(self):
return self.CheckID
@property
def checkName(self):
return self.CheckName
@property
def checkTitle(self):
return self.CheckTitle
@property
def checkAlias(self):
return self.CheckAlias
@property
def checkType(self):
return self.CheckType
@property
def serviceName(self):
return self.ServiceName
@property
def subServiceName(self):
return self.SubServiceName
@property
def resourceIdTemplate(self):
return self.ResourceIdTemplate
@property
def resourceType(self):
return self.ResourceType
@property
def description(self):
return self.Description
@property
def relatedUrl(self):
return self.RelatedUrl
@property
def remediation(self):
return self.Remediation
@property
def categories(self):
return self.Categories
@property
def tags(self):
return self.Tags
@property
def relatedTo(self):
return self.RelatedTo
@property
def notes(self):
return self.Notes
@property
def compliance(self):
return self.Compliance
def __parse_metadata__(self, metadata_file):
# Opening JSON file
f = open_file(metadata_file)
# Parse JSON
check_metadata = parse_json_file(f)
return check_metadata
# Validate metadata
@abstractmethod
def execute(self):
pass

View File

@@ -8,6 +8,7 @@ from lib.check.check import (
parse_checks_from_file,
parse_groups_from_file,
)
from lib.check.models import load_check_metadata
class Test_Check:
@@ -50,6 +51,28 @@ class Test_Check:
check_file = test["input"]["path"]
assert parse_groups_from_file(check_file) == test["expected"]
def test_load_check_metadata(self):
test_cases = [
{
"input": {
"metadata_path": f"{os.path.dirname(os.path.realpath(__file__))}/fixtures/metadata.json",
},
"expected": {
"CheckID": "iam_disable_30_days_credentials",
"CheckTitle": "Ensure credentials unused for 30 days or greater are disabled",
"ServiceName": "iam",
"Severity": "low",
},
}
]
for test in test_cases:
metadata_path = test["input"]["metadata_path"]
check_metadata = load_check_metadata(metadata_path)
assert check_metadata.CheckID == test["expected"]["CheckID"]
assert check_metadata.CheckTitle == test["expected"]["CheckTitle"]
assert check_metadata.ServiceName == test["expected"]["ServiceName"]
assert check_metadata.Severity == test["expected"]["Severity"]
def test_parse_checks_from_file(self):
test_cases = [
{

View File

@@ -0,0 +1,78 @@
from config.config import groups_file
from lib.check.check import (
parse_checks_from_file,
parse_groups_from_file,
recover_checks_from_provider,
)
from lib.logger import logger
# Generate the list of checks to execute
# test this function
def load_checks_to_execute(
bulk_checks_metadata: dict,
checks_file: str,
check_list: list,
service_list: list,
group_list: list,
provider: str,
) -> set:
checks_to_execute = set()
# Handle if there are checks passed using -c/--checks
if check_list:
for check_name in check_list:
checks_to_execute.add(check_name)
# elif severity_list:
# using bulk_checks_metadata
# elif compliance_list:
# using bulk_checks_metadata
# Handle if there are checks passed using -C/--checks-file
elif checks_file:
try:
checks_to_execute = parse_checks_from_file(checks_file, provider)
except Exception as e:
logger.error(f"{e.__class__.__name__} -- {e}")
# Handle if there are services passed using -s/--services
elif service_list:
# Loaded dynamically from modules within provider/services
for service in service_list:
modules = recover_checks_from_provider(provider, service)
if not modules:
logger.error(f"Service '{service}' was not found for the AWS provider")
else:
for check_module in modules:
# Recover check name and module name from import path
# Format: "providers.{provider}.services.{service}.{check_name}.{check_name}"
check_name = check_module.split(".")[-1]
# If the service is present in the group list passed as parameters
# if service_name in group_list: checks_to_execute.add(check_name)
checks_to_execute.add(check_name)
# Handle if there are groups passed using -g/--groups
elif group_list:
try:
checks_to_execute = parse_groups_from_file(
groups_file, group_list, provider
)
except Exception as e:
logger.error(f"{e.__class__.__name__} -- {e}")
# If there are no checks passed as argument
else:
try:
# Get all check modules to run with the specific provider
checks = recover_checks_from_provider(provider)
except Exception as e:
logger.error(f"{e.__class__.__name__} -- {e}")
else:
for check_name in checks:
# Recover check name from import path (last part)
# Format: "providers.{provider}.services.{service}.{check_name}.{check_name}"
check_name = check_name.split(".")[-1]
checks_to_execute.add(check_name)
return checks_to_execute

View File

@@ -0,0 +1,58 @@
{
"Categories": [
"cat1",
"cat2"
],
"CheckAlias": "extra764",
"CheckID": "iam_disable_30_days_credentials",
"CheckName": "iam_disable_30_days_credentials",
"CheckTitle": "Ensure credentials unused for 30 days or greater are disabled",
"CheckType": "Software and Configuration Checks",
"Compliance": [
{
"Control": [
"4.4"
],
"Framework": "CIS-AWS",
"Group": [
"level1",
"level2"
],
"Version": "1.4"
}
],
"DependsOn": [
"othercheck1",
"othercheck2"
],
"Description": "Ensure credentials unused for 30 days or greater are disabled",
"Notes": "additional information",
"Provider": "aws",
"RelatedTo": [
"othercheck3",
"othercheck4"
],
"RelatedUrl": "https://serviceofficialsiteorpageforthissubject",
"Remediation": {
"Code": {
"NativeIaC": "code or URL to the code location.",
"Terraform": "code or URL to the code location.",
"cli": "cli command or URL to the cli command location.",
"other": "cli command or URL to the cli command location."
},
"Recommendation": {
"Text": "Run sudo yum update and cross your fingers and toes.",
"Url": "https://myfp.com/recommendations/dangerous_things_and_how_to_fix_them.html"
}
},
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"ResourceType": "AwsIamAccessAnalyzer",
"Risk": "Risk associated.",
"ServiceName": "iam",
"Severity": "low",
"SubServiceName": "accessanalyzer",
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
}
}

178
lib/check/models.py Normal file
View File

@@ -0,0 +1,178 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List
from pydantic import BaseModel, ValidationError
from lib.logger import logger
@dataclass
class Check_Report:
status: str
region: str
result_extended: str
def __init__(self):
self.status = ""
self.region = ""
self.result_extended = ""
@dataclass
class Output_From_Options:
is_quiet: bool
# Testing Pending
def load_check_metadata(metadata_file: str) -> dict:
try:
check_metadata = Check_Metadata_Model.parse_file(metadata_file)
except ValidationError as error:
logger.critical(f"Metadata from {metadata_file} is not valid: {error}")
quit()
else:
return check_metadata
# Check all values
class Check_Metadata_Model(BaseModel):
Provider: str
CheckID: str
CheckName: str
CheckTitle: str
# CheckAlias: str
CheckType: str
ServiceName: str
SubServiceName: str
ResourceIdTemplate: str
Severity: str
ResourceType: str
Description: str
Risk: str
RelatedUrl: str
Remediation: dict
Categories: List[str]
Tags: dict
DependsOn: List[str]
RelatedTo: List[str]
Notes: str
Compliance: List
class Check(ABC):
def __init__(self):
# Load metadata from check
check_path_name = self.__class__.__module__.replace(".", "/")
metadata_file = f"{check_path_name}.metadata.json"
self.__check_metadata__ = load_check_metadata(metadata_file)
# Assign metadata values
self.__Provider__ = self.__check_metadata__.Provider
self.__CheckID__ = self.__check_metadata__.CheckID
self.__CheckName__ = self.__check_metadata__.CheckName
self.__CheckTitle__ = self.__check_metadata__.CheckTitle
# self.__CheckAlias__ = self.__check_metadata__.CheckAlias
self.__CheckType__ = self.__check_metadata__.CheckType
self.__ServiceName__ = self.__check_metadata__.ServiceName
self.__SubServiceName__ = self.__check_metadata__.SubServiceName
self.__ResourceIdTemplate__ = self.__check_metadata__.ResourceIdTemplate
self.__Severity__ = self.__check_metadata__.Severity
self.__ResourceType__ = self.__check_metadata__.ResourceType
self.__Description__ = self.__check_metadata__.Description
self.__Risk__ = self.__check_metadata__.Risk
self.__RelatedUrl__ = self.__check_metadata__.RelatedUrl
self.__Remediation__ = self.__check_metadata__.Remediation
self.__Categories__ = self.__check_metadata__.Categories
self.__Tags__ = self.__check_metadata__.Tags
self.__DependsOn__ = self.__check_metadata__.DependsOn
self.__RelatedTo__ = self.__check_metadata__.RelatedTo
self.__Notes__ = self.__check_metadata__.Notes
self.__Compliance__ = self.__check_metadata__.Compliance
@property
def provider(self):
return self.__Provider__
@property
def checkID(self):
return self.__CheckID__
@property
def checkName(self):
return self.__CheckName__
@property
def checkTitle(self):
return self.__CheckTitle__
# @property
# def checkAlias(self):
# return self.__CheckAlias__
@property
def checkType(self):
return self.__CheckType__
@property
def serviceName(self):
return self.__ServiceName__
@property
def subServiceName(self):
return self.__SubServiceName__
@property
def resourceIdTemplate(self):
return self.__ResourceIdTemplate__
@property
def severity(self):
return self.__Severity__
@property
def resourceType(self):
return self.__ResourceType__
@property
def description(self):
return self.__Description__
@property
def relatedUrl(self):
return self.__RelatedUrl__
@property
def risk(self):
return self.__Risk__
@property
def remediation(self):
return self.__Remediation__
@property
def categories(self):
return self.__Categories__
@property
def tags(self):
return self.__Tags__
@property
def dependsOn(self):
return self.__DependsOn__
@property
def relatedTo(self):
return self.__RelatedTo__
@property
def notes(self):
return self.__Notes__
@property
def compliance(self):
return self.__Compliance__
@abstractmethod
def execute(self):
pass