feat(cond parser): add policy cond parser & apply in sqs public check (#2575)

This commit is contained in:
Nacho Rivera
2023-07-12 15:39:01 +02:00
committed by GitHub
parent 66fe101ccd
commit d1c91093e2
7 changed files with 231 additions and 15 deletions

View File

@@ -0,0 +1,123 @@
from prowler.providers.aws.lib.policy_condition_parser.policy_condition_parser import (
is_account_only_allowed_in_condition,
)
AWS_ACCOUNT_NUMBER = "123456789012"
class Test_policy_condition_parser:
def test_condition_parser_string_equals_list(self):
condition_statement = {"StringEquals": {"aws:SourceAccount": ["123456789012"]}}
assert is_account_only_allowed_in_condition(
condition_statement, AWS_ACCOUNT_NUMBER
)
def test_condition_parser_string_equals_str(self):
condition_statement = {"StringEquals": {"aws:SourceAccount": "123456789012"}}
assert is_account_only_allowed_in_condition(
condition_statement, AWS_ACCOUNT_NUMBER
)
def test_condition_parser_string_equals_list_not_valid(self):
condition_statement = {
"StringEquals": {"aws:SourceAccount": ["123456789012", "111222333444"]}
}
assert not is_account_only_allowed_in_condition(
condition_statement, AWS_ACCOUNT_NUMBER
)
def test_condition_parser_string_equals_str_not_valid(self):
condition_statement = {"StringEquals": {"aws:SourceAccount": "111222333444"}}
assert not is_account_only_allowed_in_condition(
condition_statement, AWS_ACCOUNT_NUMBER
)
def test_condition_parser_arnlike_list(self):
condition_statement = {
"ArnLike": {"aws:SourceArn": ["arn:aws:cloudtrail:*:123456789012:trail/*"]}
}
assert is_account_only_allowed_in_condition(
condition_statement, AWS_ACCOUNT_NUMBER
)
def test_condition_parser_arnlike_list_not_valid(self):
condition_statement = {
"ArnLike": {
"aws:SourceArn": [
"arn:aws:cloudtrail:*:123456789012:trail/*",
"arn:aws:cloudtrail:*:111222333444:trail/*",
]
}
}
assert not is_account_only_allowed_in_condition(
condition_statement, AWS_ACCOUNT_NUMBER
)
def test_condition_parser_arnlike_str(self):
condition_statement = {
"ArnLike": {"aws:SourceArn": "arn:aws:cloudtrail:*:123456789012:trail/*"}
}
assert is_account_only_allowed_in_condition(
condition_statement, AWS_ACCOUNT_NUMBER
)
def test_condition_parser_arnlike_str_not_valid(self):
condition_statement = {
"ArnLike": {"aws:SourceArn": "arn:aws:cloudtrail:*:111222333444:trail/*"}
}
assert not is_account_only_allowed_in_condition(
condition_statement, AWS_ACCOUNT_NUMBER
)
def test_condition_parser_arnequals_list(self):
condition_statement = {
"ArnEquals": {
"aws:SourceArn": [
"arn:aws:cloudtrail:eu-west-1:123456789012:trail/test"
]
}
}
assert is_account_only_allowed_in_condition(
condition_statement, AWS_ACCOUNT_NUMBER
)
def test_condition_parser_arnequals_list_not_valid(self):
condition_statement = {
"ArnEquals": {
"aws:SourceArn": [
"arn:aws:cloudtrail:eu-west-1:123456789012:trail/test",
"arn:aws:cloudtrail:eu-west-1:111222333444:trail/test",
]
}
}
assert not is_account_only_allowed_in_condition(
condition_statement, AWS_ACCOUNT_NUMBER
)
def test_condition_parser_arnequals_str(self):
condition_statement = {
"ArnEquals": {
"aws:SourceArn": "arn:aws:cloudtrail:eu-west-1:123456789012:trail/test"
}
}
assert is_account_only_allowed_in_condition(
condition_statement, AWS_ACCOUNT_NUMBER
)
def test_condition_parser_arnequals_str_not_valid(self):
condition_statement = {
"ArnEquals": {
"aws:SourceArn": "arn:aws:cloudtrail:eu-west-1:111222333444:trail/test"
}
}
assert not is_account_only_allowed_in_condition(
condition_statement, AWS_ACCOUNT_NUMBER
)

View File

@@ -38,7 +38,7 @@ test_public_policy = {
],
}
test_public_policy_with_condition = {
test_public_policy_with_condition_not_valid = {
"Version": "2012-10-17",
"Id": "Queue1_Policy_UUID",
"Statement": [
@@ -56,6 +56,21 @@ test_public_policy_with_condition = {
],
}
test_public_policy_with_condition = {
"Version": "2012-10-17",
"Id": "Queue1_Policy_UUID",
"Statement": [
{
"Sid": "Queue1_AnonymousAccess_ReceiveMessage",
"Effect": "Allow",
"Principal": "*",
"Action": "sqs:ReceiveMessage",
"Resource": topic_arn,
"Condition": {"StringEquals": {"aws:SourceAccount": "123456789012"}},
}
],
}
class Test_sqs_queues_not_publicly_accessible:
def test_no_queues(self):
@@ -123,13 +138,48 @@ class Test_sqs_queues_not_publicly_accessible:
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search("policy with public access", result[0].status_extended)
assert search(
"is public because its policy allows public access",
result[0].status_extended,
)
assert result[0].resource_id == queue_id
assert result[0].resource_arn == "arn_test"
def test_queues_public_with_condition(self):
def test_queues_public_with_condition_not_valid(self):
sqs_client = mock.MagicMock
sqs_client.queues = []
sqs_client.audited_account = AWS_ACCOUNT_NUMBER
sqs_client.queues.append(
Queue(
id=queue_id,
region=AWS_REGION,
policy=test_public_policy_with_condition_not_valid,
arn="arn_test",
)
)
with mock.patch(
"prowler.providers.aws.services.sqs.sqs_service.SQS",
sqs_client,
):
from prowler.providers.aws.services.sqs.sqs_queues_not_publicly_accessible.sqs_queues_not_publicly_accessible import (
sqs_queues_not_publicly_accessible,
)
check = sqs_queues_not_publicly_accessible()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"is public because its policy allows public access",
result[0].status_extended,
)
assert result[0].resource_id == queue_id
assert result[0].resource_arn == "arn_test"
def test_queues_public_with_condition_valid(self):
sqs_client = mock.MagicMock
sqs_client.queues = []
sqs_client.audited_account = AWS_ACCOUNT_NUMBER
sqs_client.queues.append(
Queue(
id=queue_id,
@@ -149,10 +199,10 @@ class Test_sqs_queues_not_publicly_accessible:
check = sqs_queues_not_publicly_accessible()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"policy with public access but has a Condition",
result[0].status_extended,
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"SQS queue {queue_id} is not public because its policy only allows access from the same account"
)
assert result[0].resource_id == queue_id
assert result[0].resource_arn == "arn_test"