chore(sqs_...not_publicly_accessible): less restrictive condition test (#3211)

Co-authored-by: Pepe Fagoaga <pepe@verica.io>
This commit is contained in:
Nacho Rivera
2023-12-19 16:53:19 +01:00
committed by GitHub
parent f8d77d9a30
commit 78505cb0a8
7 changed files with 224 additions and 152 deletions

View File

@@ -1,8 +1,11 @@
def is_account_only_allowed_in_condition(
condition_statement: dict, source_account: str
def is_condition_block_restrictive(
condition_statement: dict, source_account: str, is_cross_account_allowed=False
):
"""
is_account_only_allowed_in_condition parses the IAM Condition policy block and returns True if the source_account passed as argument is within, False if not.
is_condition_block_restrictive parses the IAM Condition policy block and, by default, returns True if the source_account passed as argument is within, False if not.
If argument is_cross_account_allowed is True it tests if the Condition block includes any of the operators allowlisted returning True if does, False if not.
@param condition_statement: dict with an IAM Condition block, e.g.:
{
@@ -54,13 +57,16 @@ def is_account_only_allowed_in_condition(
condition_statement[condition_operator][value],
list,
):
# if there is an arn/account without the source account -> we do not consider it safe
# here by default we assume is true and look for false entries
is_condition_key_restrictive = True
for item in condition_statement[condition_operator][value]:
if source_account not in item:
is_condition_key_restrictive = False
break
# if cross account is not allowed check for each condition block looking for accounts
# different than default
if not is_cross_account_allowed:
# if there is an arn/account without the source account -> we do not consider it safe
# here by default we assume is true and look for false entries
for item in condition_statement[condition_operator][value]:
if source_account not in item:
is_condition_key_restrictive = False
break
if is_condition_key_restrictive:
is_condition_valid = True
@@ -70,10 +76,13 @@ def is_account_only_allowed_in_condition(
condition_statement[condition_operator][value],
str,
):
if (
source_account
in condition_statement[condition_operator][value]
):
if is_cross_account_allowed:
is_condition_valid = True
else:
if (
source_account
in condition_statement[condition_operator][value]
):
is_condition_valid = True
return is_condition_valid

View File

@@ -1,6 +1,6 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.lib.policy_condition_parser.policy_condition_parser import (
is_account_only_allowed_in_condition,
is_condition_block_restrictive,
)
from prowler.providers.aws.services.iam.iam_client import iam_client
@@ -30,7 +30,7 @@ class iam_role_cross_service_confused_deputy_prevention(Check):
and "Service" in statement["Principal"]
# Check to see if the appropriate condition statements have been implemented
and "Condition" in statement
and is_account_only_allowed_in_condition(
and is_condition_block_restrictive(
statement["Condition"], iam_client.audited_account
)
):

View File

@@ -1,6 +1,6 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.lib.policy_condition_parser.policy_condition_parser import (
is_account_only_allowed_in_condition,
is_condition_block_restrictive,
)
from prowler.providers.aws.services.sns.sns_client import sns_client
@@ -35,7 +35,7 @@ class sns_topics_not_publicly_accessible(Check):
):
if (
"Condition" in statement
and is_account_only_allowed_in_condition(
and is_condition_block_restrictive(
statement["Condition"], sns_client.audited_account
)
):

View File

@@ -1,6 +1,6 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.lib.policy_condition_parser.policy_condition_parser import (
is_account_only_allowed_in_condition,
is_condition_block_restrictive,
)
from prowler.providers.aws.services.sqs.sqs_client import sqs_client
@@ -32,8 +32,10 @@ class sqs_queues_not_publicly_accessible(Check):
)
):
if "Condition" in statement:
if is_account_only_allowed_in_condition(
statement["Condition"], sqs_client.audited_account
if is_condition_block_restrictive(
statement["Condition"],
sqs_client.audited_account,
True,
):
report.status_extended = f"SQS queue {queue.id} is not public because its policy only allows access from the same account."
else:

View File

@@ -2,7 +2,7 @@ from re import compile
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.lib.policy_condition_parser.policy_condition_parser import (
is_account_only_allowed_in_condition,
is_condition_block_restrictive,
)
from prowler.providers.aws.services.vpc.vpc_client import vpc_client
@@ -35,7 +35,7 @@ class vpc_endpoint_connections_trust_boundaries(Check):
if "Condition" in statement:
for account_id in trusted_account_ids:
if is_account_only_allowed_in_condition(
if is_condition_block_restrictive(
statement["Condition"], account_id
):
access_from_trusted_accounts = True
@@ -70,7 +70,7 @@ class vpc_endpoint_connections_trust_boundaries(Check):
access_from_trusted_accounts = False
if "Condition" in statement:
for account_id in trusted_account_ids:
if is_account_only_allowed_in_condition(
if is_condition_block_restrictive(
statement["Condition"], account_id
):
access_from_trusted_accounts = True
@@ -102,7 +102,7 @@ class vpc_endpoint_connections_trust_boundaries(Check):
if "Condition" in statement:
for account_id in trusted_account_ids:
if is_account_only_allowed_in_condition(
if is_condition_block_restrictive(
statement["Condition"], account_id
):
access_from_trusted_accounts = True

View File

@@ -92,6 +92,21 @@ test_public_policy_with_condition_diff_account = {
],
}
test_public_policy_with_invalid_condition_block = {
"Version": "2012-10-17",
"Id": "Queue1_Policy_UUID",
"Statement": [
{
"Sid": "Queue1_AnonymousAccess_ReceiveMessage",
"Effect": "Allow",
"Principal": "*",
"Action": "sqs:ReceiveMessage",
"Resource": test_queue_arn,
"Condition": {"DateGreaterThan": {"aws:CurrentTime": "2009-01-31T12:00Z"}},
}
],
}
class Test_sqs_queues_not_publicly_accessible:
def test_no_queues(self):
@@ -240,7 +255,7 @@ class Test_sqs_queues_not_publicly_accessible:
assert result[0].resource_tags == []
assert result[0].region == AWS_REGION_EU_WEST_1
def test_queues_public_with_condition_invalid_other_account(self):
def test_queues_public_with_condition_valid_with_other_account(self):
sqs_client = mock.MagicMock
sqs_client.queues = []
sqs_client.audited_account = AWS_ACCOUNT_NUMBER
@@ -261,6 +276,40 @@ class Test_sqs_queues_not_publicly_accessible:
sqs_queues_not_publicly_accessible,
)
check = sqs_queues_not_publicly_accessible()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"SQS queue {test_queue_url} is not public because its policy only allows access from the same account."
)
assert result[0].resource_id == test_queue_url
assert result[0].resource_arn == test_queue_arn
assert result[0].resource_tags == []
assert result[0].region == AWS_REGION_EU_WEST_1
def test_queues_public_with_condition_with_invalid_block(self):
sqs_client = mock.MagicMock
sqs_client.queues = []
sqs_client.audited_account = AWS_ACCOUNT_NUMBER
sqs_client.queues.append(
Queue(
id=test_queue_url,
name=test_queue_name,
region=AWS_REGION_EU_WEST_1,
policy=test_public_policy_with_invalid_condition_block,
arn=test_queue_arn,
)
)
with mock.patch(
"prowler.providers.aws.services.sqs.sqs_service.SQS",
sqs_client,
):
from prowler.providers.aws.services.sqs.sqs_queues_not_publicly_accessible.sqs_queues_not_publicly_accessible import (
sqs_queues_not_publicly_accessible,
)
check = sqs_queues_not_publicly_accessible()
result = check.execute()
assert len(result) == 1