fix(errors): solve several services errors (AccessAnalyzer, AppStream, KMS, S3, SQS, R53, IAM, CodeArtifact and EC2) (#1879)

This commit is contained in:
Sergio Garcia
2023-02-10 12:26:00 +01:00
committed by GitHub
parent 57a2fca3a4
commit d88640fd20
12 changed files with 69 additions and 81 deletions

View File

@@ -75,7 +75,7 @@ class AccessAnalyzer:
logger.info("AccessAnalyzer - Get Finding status...")
try:
for analyzer in self.analyzers:
if analyzer.status != "NOT_AVAILABLE":
if analyzer.status == "ACTIVE":
regional_client = self.regional_clients[analyzer.region]
for finding in analyzer.findings:
finding_information = regional_client.get_finding(
@@ -92,7 +92,7 @@ class AccessAnalyzer:
logger.info("AccessAnalyzer - Listing Findings per Analyzer...")
try:
for analyzer in self.analyzers:
if analyzer.status != "NOT_AVAILABLE":
if analyzer.status == "ACTIVE":
regional_client = self.regional_clients[analyzer.region]
list_findings_paginator = regional_client.get_paginator(
"list_findings"

View File

@@ -22,6 +22,7 @@ class appstream_fleet_session_idle_disconnect_timeout(Check):
if (
fleet.idle_disconnect_timeout_in_seconds
and fleet.idle_disconnect_timeout_in_seconds
<= max_idle_disconnect_timeout_in_seconds
):
report.status = "PASS"

View File

@@ -1,5 +1,7 @@
import threading
from dataclasses import dataclass
from typing import Optional
from pydantic import BaseModel
from prowler.lib.logger import logger
from prowler.lib.scan_filters.scan_filters import is_resource_filtered
@@ -48,9 +50,9 @@ class AppStream:
disconnect_timeout_in_seconds=fleet[
"DisconnectTimeoutInSeconds"
],
idle_disconnect_timeout_in_seconds=fleet[
idle_disconnect_timeout_in_seconds=fleet.get(
"IdleDisconnectTimeoutInSeconds"
],
),
enable_default_internet_access=fleet[
"EnableDefaultInternetAccess"
],
@@ -64,29 +66,11 @@ class AppStream:
)
@dataclass
class Fleet:
class Fleet(BaseModel):
arn: str
name: str
max_user_duration_in_seconds: int
disconnect_timeout_in_seconds: int
idle_disconnect_timeout_in_seconds: int
idle_disconnect_timeout_in_seconds: Optional[int]
enable_default_internet_access: bool
def __init__(
self,
arn,
name,
max_user_duration_in_seconds,
disconnect_timeout_in_seconds,
idle_disconnect_timeout_in_seconds,
enable_default_internet_access,
region,
):
self.arn = arn
self.name = name
self.max_user_duration_in_seconds = max_user_duration_in_seconds
self.disconnect_timeout_in_seconds = disconnect_timeout_in_seconds
self.idle_disconnect_timeout_in_seconds = idle_disconnect_timeout_in_seconds
self.enable_default_internet_access = enable_default_internet_access
self.region = region
region: str

View File

@@ -1,5 +1,6 @@
import os
import tempfile
import zlib
from base64 import b64decode
from detect_secrets import SecretsCollection
@@ -20,7 +21,13 @@ class ec2_instance_secrets_user_data(Check):
if instance.user_data:
temp_user_data_file = tempfile.NamedTemporaryFile(delete=False)
user_data = b64decode(instance.user_data).decode("utf-8")
user_data = b64decode(instance.user_data)
if user_data[0:2] == b"\x1f\x8b": # GZIP magic number
user_data = zlib.decompress(user_data, zlib.MAX_WBITS | 32).decode(
"utf-8"
)
else:
user_data = user_data.decode("utf-8")
temp_user_data_file.write(
bytes(user_data, encoding="raw_unicode_escape")

View File

@@ -52,7 +52,7 @@ class iam_role_cross_service_confused_deputy_prevention(Check):
and "aws:SourceArn" in statement["Condition"]["ArnLike"]
and iam_client.account
in str(
statement["Condition"]["ArnEquals"]["aws:SourceArn"]
statement["Condition"]["ArnLike"]["aws:SourceArn"]
)
)
)

View File

@@ -16,22 +16,29 @@ class kms_key_not_publicly_accessible(Check):
report.resource_arn = key.arn
report.region = key.region
# If the "Principal" element value is set to { "AWS": "*" } and the policy statement is not using any Condition clauses to filter the access, the selected AWS KMS master key is publicly accessible.
for statement in key.policy["Statement"]:
if "*" == statement["Principal"] and "Condition" not in statement:
report.status = "FAIL"
report.status_extended = (
f"KMS key {key.id} may be publicly accessible!"
)
elif "AWS" in statement["Principal"]:
if type(statement["Principal"]["AWS"]) == str:
principals = [statement["Principal"]["AWS"]]
else:
principals = statement["Principal"]["AWS"]
for principal_arn in principals:
if principal_arn == "*" and "Condition" not in statement:
report.status = "FAIL"
report.status_extended = (
f"KMS key {key.id} may be publicly accessible!"
)
if key.policy and "Statement" in key.policy:
for statement in key.policy["Statement"]:
if (
"*" == statement["Principal"]
and "Condition" not in statement
):
report.status = "FAIL"
report.status_extended = (
f"KMS key {key.id} may be publicly accessible!"
)
elif "AWS" in statement["Principal"]:
if type(statement["Principal"]["AWS"]) == str:
principals = [statement["Principal"]["AWS"]]
else:
principals = statement["Principal"]["AWS"]
for principal_arn in principals:
if (
principal_arn == "*"
and "Condition" not in statement
):
report.status = "FAIL"
report.status_extended = (
f"KMS key {key.id} may be publicly accessible!"
)
findings.append(report)
return findings

View File

@@ -1,6 +1,8 @@
import json
import threading
from dataclasses import dataclass
from typing import Optional
from pydantic import BaseModel
from prowler.lib.logger import logger
from prowler.lib.scan_filters.scan_filters import is_resource_filtered
@@ -17,9 +19,10 @@ class KMS:
self.regional_clients = generate_regional_clients(self.service, audit_info)
self.keys = []
self.__threading_call__(self.__list_keys__)
self.__describe_key__()
self.__get_key_rotation_status__()
self.__get_key_policy__()
if self.keys:
self.__describe_key__()
self.__get_key_rotation_status__()
self.__get_key_policy__()
def __get_session__(self):
return self.session
@@ -44,9 +47,9 @@ class KMS:
):
self.keys.append(
Key(
key["KeyId"],
key["KeyArn"],
regional_client.region,
id=key["KeyId"],
arn=key["KeyArn"],
region=regional_client.region,
)
)
except Exception as error:
@@ -100,29 +103,13 @@ class KMS:
)
@dataclass
class Key:
class Key(BaseModel):
id: str
arn: str
state: str
origin: str
manager: str
rotation_enabled: bool
policy: dict
spec: str
state: Optional[str]
origin: Optional[str]
manager: Optional[str]
rotation_enabled: Optional[bool]
policy: Optional[dict]
spec: Optional[str]
region: str
def __init__(
self,
id,
arn,
region,
):
self.id = id
self.arn = arn
self.state = None
self.origin = None
self.manager = None
self.rotation_enabled = False
self.policy = {}
self.region = region

View File

@@ -13,7 +13,7 @@ class route53_domains_transferlock_enabled(Check):
report.resource_id = domain.name
report.region = domain.region
if "clientTransferProhibited" in domain.status_list:
if domain.status_list and "clientTransferProhibited" in domain.status_list:
report.status = "PASS"
report.status_extended = (
f"Transfer Lock is enabled for the {domain.name} domain"

View File

@@ -129,7 +129,7 @@ class Route53Domains:
for domain in self.domains.values():
domain_detail = self.client.get_domain_detail(DomainName=domain.name)
self.domains[domain.name].admin_privacy = domain_detail["AdminPrivacy"]
self.domains[domain.name].status_list = domain_detail["StatusList"]
self.domains[domain.name].status_list = domain_detail.get("StatusList")
except Exception as error:
logger.error(

View File

@@ -48,7 +48,9 @@ class S3:
bucket_region = self.client.get_bucket_location(
Bucket=bucket["Name"]
)["LocationConstraint"]
if not bucket_region: # If us-east-1, bucket_region is none
if bucket_region == "EU": # If EU, bucket_region is eu-west-1
bucket_region = "eu-west-1"
if not bucket_region: # If Nonce, bucket_region is us-east-1
bucket_region = "us-east-1"
# Arn
arn = f"arn:{self.audited_partition}:s3:::{bucket['Name']}"

View File

@@ -16,7 +16,7 @@ class sqs_queues_not_publicly_accessible(Check):
for statement in queue.policy["Statement"]:
# Only check allow statements
if statement["Effect"] == "Allow":
if (
if "Principal" in statement and (
"*" in statement["Principal"]
or (
"AWS" in statement["Principal"]

View File

@@ -28,7 +28,7 @@ def mock_make_api_call(self, operation_name, kwarg):
{
"arn": "ARN",
"name": "Test Analyzer",
"status": "Enabled",
"status": "ACTIVE",
"findings": 0,
"tags": "",
"type": "ACCOUNT",
@@ -91,7 +91,7 @@ class Test_AccessAnalyzer_Service:
assert len(access_analyzer.analyzers) == 1
assert access_analyzer.analyzers[0].arn == "ARN"
assert access_analyzer.analyzers[0].name == "Test Analyzer"
assert access_analyzer.analyzers[0].status == "Enabled"
assert access_analyzer.analyzers[0].status == "ACTIVE"
assert access_analyzer.analyzers[0].tags == ""
assert access_analyzer.analyzers[0].type == "ACCOUNT"
assert access_analyzer.analyzers[0].region == AWS_REGION