fix(apigw_restapi_auth check): add method auth testing (#3183)

This commit is contained in:
Nacho Rivera
2023-12-13 16:20:09 +01:00
committed by GitHub
parent 8b5c995486
commit c937b193d0
5 changed files with 461 additions and 10 deletions

View File

@@ -1,7 +1,7 @@
{
"Provider": "aws",
"CheckID": "apigateway_restapi_authorizers_enabled",
"CheckTitle": "Check if API Gateway has configured authorizers.",
"CheckTitle": "Check if API Gateway has configured authorizers at api or method level.",
"CheckAliases": [
"apigateway_authorizers_enabled"
],
@@ -13,7 +13,7 @@
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"Severity": "medium",
"ResourceType": "AwsApiGatewayRestApi",
"Description": "Check if API Gateway has configured authorizers.",
"Description": "Check if API Gateway has configured authorizers at api or method level.",
"Risk": "If no authorizer is enabled anyone can use the service.",
"RelatedUrl": "",
"Remediation": {

View File

@@ -13,12 +13,41 @@ class apigateway_restapi_authorizers_enabled(Check):
report.resource_id = rest_api.name
report.resource_arn = rest_api.arn
report.resource_tags = rest_api.tags
# it there are not authorizers at api level and resources without methods (default case) ->
report.status = "FAIL"
report.status_extended = f"API Gateway {rest_api.name} ID {rest_api.id} does not have an authorizer configured at api level."
if rest_api.authorizer:
report.status = "PASS"
report.status_extended = f"API Gateway {rest_api.name} ID {rest_api.id} has an authorizer configured."
report.status_extended = f"API Gateway {rest_api.name} ID {rest_api.id} has an authorizer configured at api level"
else:
report.status = "FAIL"
report.status_extended = f"API Gateway {rest_api.name} ID {rest_api.id} does not have an authorizer configured."
# we want to know if api has not authorizers and all the resources don't have methods configured
resources_have_methods = False
all_methods_authorized = True
resource_paths_with_unathorized_methods = []
for resource in rest_api.resources:
# if the resource has methods test if they have all configured authorizer
if resource.resource_methods:
resources_have_methods = True
for (
http_method,
authorization_method,
) in resource.resource_methods.items():
if authorization_method == "NONE":
all_methods_authorized = False
unauthorized_method = (
resource.path + " -> " + http_method
)
resource_paths_with_unathorized_methods.append(
unauthorized_method
)
# if there are methods in at least one resource and are all authorized
if all_methods_authorized and resources_have_methods:
report.status = "PASS"
report.status_extended = f"API Gateway {rest_api.name} ID {rest_api.id} has all methods authorized"
# if there are methods in at least one result but some of then are not authorized-> list it
elif not all_methods_authorized:
report.status_extended = f"API Gateway {rest_api.name} ID {rest_api.id} does not have authorizers at api level and the following paths and methods are unauthorized: {'; '.join(resource_paths_with_unathorized_methods)}."
findings.append(report)
return findings

View File

@@ -17,6 +17,7 @@ class APIGateway(AWSService):
self.__get_authorizers__()
self.__get_rest_api__()
self.__get_stages__()
self.__get_resources__()
def __get_rest_apis__(self, regional_client):
logger.info("APIGateway - Getting Rest APIs...")
@@ -53,7 +54,9 @@ class APIGateway(AWSService):
if authorizers:
rest_api.authorizer = True
except Exception as error:
logger.error(f"{error.__class__.__name__}: {error}")
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __get_rest_api__(self):
logger.info("APIGateway - Describing Rest API...")
@@ -64,7 +67,9 @@ class APIGateway(AWSService):
if rest_api_info["endpointConfiguration"]["types"] == ["PRIVATE"]:
rest_api.public_endpoint = False
except Exception as error:
logger.error(f"{error.__class__.__name__}: {error}")
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __get_stages__(self):
logger.info("APIGateway - Getting stages for Rest APIs...")
@@ -95,7 +100,46 @@ class APIGateway(AWSService):
)
)
except Exception as error:
logger.error(f"{error.__class__.__name__}: {error}")
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __get_resources__(self):
logger.info("APIGateway - Getting API resources...")
try:
for rest_api in self.rest_apis:
regional_client = self.regional_clients[rest_api.region]
get_resources_paginator = regional_client.get_paginator("get_resources")
for page in get_resources_paginator.paginate(restApiId=rest_api.id):
for resource in page["items"]:
id = resource["id"]
resource_methods = []
methods_auth = {}
for resource_method in resource.get(
"resourceMethods", {}
).keys():
resource_methods.append(resource_method)
for resource_method in resource_methods:
if resource_method != "OPTIONS":
method_config = regional_client.get_method(
restApiId=rest_api.id,
resourceId=id,
httpMethod=resource_method,
)
auth_type = method_config["authorizationType"]
methods_auth.update({resource_method: auth_type})
rest_api.resources.append(
PathResourceMethods(
path=resource["path"], resource_methods=methods_auth
)
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class Stage(BaseModel):
@@ -107,6 +151,11 @@ class Stage(BaseModel):
tags: Optional[list] = []
class PathResourceMethods(BaseModel):
path: str
resource_methods: dict
class RestAPI(BaseModel):
id: str
arn: str
@@ -116,3 +165,4 @@ class RestAPI(BaseModel):
public_endpoint: bool = True
stages: list[Stage] = []
tags: Optional[list] = []
resources: list[PathResourceMethods] = []

View File

@@ -97,7 +97,7 @@ class Test_apigateway_restapi_authorizers_enabled:
assert len(result) == 1
assert (
result[0].status_extended
== f"API Gateway test-rest-api ID {rest_api['id']} has an authorizer configured."
== f"API Gateway test-rest-api ID {rest_api['id']} has an authorizer configured at api level"
)
assert result[0].resource_id == "test-rest-api"
assert (
@@ -142,7 +142,337 @@ class Test_apigateway_restapi_authorizers_enabled:
assert len(result) == 1
assert (
result[0].status_extended
== f"API Gateway test-rest-api ID {rest_api['id']} does not have an authorizer configured."
== f"API Gateway test-rest-api ID {rest_api['id']} does not have an authorizer configured at api level."
)
assert result[0].resource_id == "test-rest-api"
assert (
result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:apigateway:{AWS_REGION_US_EAST_1}::/restapis/{rest_api['id']}"
)
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_tags == [{}]
@mock_apigateway
@mock_iam
@mock_lambda
def test_apigateway_one_rest_api_without_api_or_methods_authorizer(self):
# Create APIGateway Mocked Resources
apigateway_client = client("apigateway", region_name=AWS_REGION_US_EAST_1)
rest_api = apigateway_client.create_rest_api(
name="test-rest-api",
)
default_resource_id = apigateway_client.get_resources(restApiId=rest_api["id"])[
"items"
][0]["id"]
api_resource = apigateway_client.create_resource(
restApiId=rest_api["id"], parentId=default_resource_id, pathPart="test"
)
apigateway_client.put_method(
restApiId=rest_api["id"],
resourceId=api_resource["id"],
httpMethod="GET",
authorizationType="NONE",
)
from prowler.providers.aws.services.apigateway.apigateway_service import (
APIGateway,
)
current_audit_info = current_audit_info = set_mocked_aws_audit_info(
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1]
)
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
), mock.patch(
"prowler.providers.aws.services.apigateway.apigateway_restapi_authorizers_enabled.apigateway_restapi_authorizers_enabled.apigateway_client",
new=APIGateway(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.apigateway.apigateway_restapi_authorizers_enabled.apigateway_restapi_authorizers_enabled import (
apigateway_restapi_authorizers_enabled,
)
check = apigateway_restapi_authorizers_enabled()
result = check.execute()
assert result[0].status == "FAIL"
assert len(result) == 1
assert (
result[0].status_extended
== f"API Gateway test-rest-api ID {rest_api['id']} does not have authorizers at api level and the following paths and methods are unauthorized: /test -> GET."
)
assert result[0].resource_id == "test-rest-api"
assert (
result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:apigateway:{AWS_REGION_US_EAST_1}::/restapis/{rest_api['id']}"
)
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_tags == [{}]
@mock_apigateway
@mock_iam
@mock_lambda
def test_apigateway_one_rest_api_without_api_auth_but_one_method_auth(self):
# Create APIGateway Mocked Resources
apigateway_client = client("apigateway", region_name=AWS_REGION_US_EAST_1)
rest_api = apigateway_client.create_rest_api(
name="test-rest-api",
)
default_resource_id = apigateway_client.get_resources(restApiId=rest_api["id"])[
"items"
][0]["id"]
api_resource = apigateway_client.create_resource(
restApiId=rest_api["id"], parentId=default_resource_id, pathPart="test"
)
apigateway_client.put_method(
restApiId=rest_api["id"],
resourceId=api_resource["id"],
httpMethod="GET",
authorizationType="AWS_IAM",
)
from prowler.providers.aws.services.apigateway.apigateway_service import (
APIGateway,
)
current_audit_info = current_audit_info = set_mocked_aws_audit_info(
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1]
)
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
), mock.patch(
"prowler.providers.aws.services.apigateway.apigateway_restapi_authorizers_enabled.apigateway_restapi_authorizers_enabled.apigateway_client",
new=APIGateway(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.apigateway.apigateway_restapi_authorizers_enabled.apigateway_restapi_authorizers_enabled import (
apigateway_restapi_authorizers_enabled,
)
check = apigateway_restapi_authorizers_enabled()
result = check.execute()
assert result[0].status == "PASS"
assert len(result) == 1
assert (
result[0].status_extended
== f"API Gateway test-rest-api ID {rest_api['id']} has all methods authorized"
)
assert result[0].resource_id == "test-rest-api"
assert (
result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:apigateway:{AWS_REGION_US_EAST_1}::/restapis/{rest_api['id']}"
)
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_tags == [{}]
@mock_apigateway
@mock_iam
@mock_lambda
def test_apigateway_one_rest_api_without_api_auth_but_methods_auth_and_not(self):
# Create APIGateway Mocked Resources
apigateway_client = client("apigateway", region_name=AWS_REGION_US_EAST_1)
rest_api = apigateway_client.create_rest_api(
name="test-rest-api",
)
default_resource_id = apigateway_client.get_resources(restApiId=rest_api["id"])[
"items"
][0]["id"]
api_resource = apigateway_client.create_resource(
restApiId=rest_api["id"], parentId=default_resource_id, pathPart="test"
)
apigateway_client.put_method(
restApiId=rest_api["id"],
resourceId=api_resource["id"],
httpMethod="POST",
authorizationType="AWS_IAM",
)
apigateway_client.put_method(
restApiId=rest_api["id"],
resourceId=api_resource["id"],
httpMethod="GET",
authorizationType="NONE",
)
from prowler.providers.aws.services.apigateway.apigateway_service import (
APIGateway,
)
current_audit_info = current_audit_info = set_mocked_aws_audit_info(
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1]
)
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
), mock.patch(
"prowler.providers.aws.services.apigateway.apigateway_restapi_authorizers_enabled.apigateway_restapi_authorizers_enabled.apigateway_client",
new=APIGateway(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.apigateway.apigateway_restapi_authorizers_enabled.apigateway_restapi_authorizers_enabled import (
apigateway_restapi_authorizers_enabled,
)
check = apigateway_restapi_authorizers_enabled()
result = check.execute()
assert result[0].status == "FAIL"
assert len(result) == 1
assert (
result[0].status_extended
== f"API Gateway test-rest-api ID {rest_api['id']} does not have authorizers at api level and the following paths and methods are unauthorized: /test -> GET."
)
assert result[0].resource_id == "test-rest-api"
assert (
result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:apigateway:{AWS_REGION_US_EAST_1}::/restapis/{rest_api['id']}"
)
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_tags == [{}]
@mock_apigateway
@mock_iam
@mock_lambda
def test_apigateway_one_rest_api_without_api_auth_but_methods_not_auth_and_auth(
self,
):
# Create APIGateway Mocked Resources
apigateway_client = client("apigateway", region_name=AWS_REGION_US_EAST_1)
rest_api = apigateway_client.create_rest_api(
name="test-rest-api",
)
default_resource_id = apigateway_client.get_resources(restApiId=rest_api["id"])[
"items"
][0]["id"]
api_resource = apigateway_client.create_resource(
restApiId=rest_api["id"], parentId=default_resource_id, pathPart="test"
)
apigateway_client.put_method(
restApiId=rest_api["id"],
resourceId=api_resource["id"],
httpMethod="GET",
authorizationType="NONE",
)
apigateway_client.put_method(
restApiId=rest_api["id"],
resourceId=api_resource["id"],
httpMethod="POST",
authorizationType="AWS_IAM",
)
from prowler.providers.aws.services.apigateway.apigateway_service import (
APIGateway,
)
current_audit_info = current_audit_info = set_mocked_aws_audit_info(
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1]
)
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
), mock.patch(
"prowler.providers.aws.services.apigateway.apigateway_restapi_authorizers_enabled.apigateway_restapi_authorizers_enabled.apigateway_client",
new=APIGateway(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.apigateway.apigateway_restapi_authorizers_enabled.apigateway_restapi_authorizers_enabled import (
apigateway_restapi_authorizers_enabled,
)
check = apigateway_restapi_authorizers_enabled()
result = check.execute()
assert result[0].status == "FAIL"
assert len(result) == 1
assert (
result[0].status_extended
== f"API Gateway test-rest-api ID {rest_api['id']} does not have authorizers at api level and the following paths and methods are unauthorized: /test -> GET."
)
assert result[0].resource_id == "test-rest-api"
assert (
result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:apigateway:{AWS_REGION_US_EAST_1}::/restapis/{rest_api['id']}"
)
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_tags == [{}]
@mock_apigateway
@mock_iam
@mock_lambda
def test_apigateway_one_rest_api_without_authorizers_with_various_resources_without_endpoints(
self,
):
# Create APIGateway Mocked Resources
apigateway_client = client("apigateway", region_name=AWS_REGION_US_EAST_1)
rest_api = apigateway_client.create_rest_api(
name="test-rest-api",
)
default_resource_id = apigateway_client.get_resources(restApiId=rest_api["id"])[
"items"
][0]["id"]
apigateway_client.create_resource(
restApiId=rest_api["id"], parentId=default_resource_id, pathPart="test"
)
apigateway_client.create_resource(
restApiId=rest_api["id"], parentId=default_resource_id, pathPart="test2"
)
from prowler.providers.aws.services.apigateway.apigateway_service import (
APIGateway,
)
current_audit_info = current_audit_info = set_mocked_aws_audit_info(
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1]
)
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
), mock.patch(
"prowler.providers.aws.services.apigateway.apigateway_restapi_authorizers_enabled.apigateway_restapi_authorizers_enabled.apigateway_client",
new=APIGateway(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.apigateway.apigateway_restapi_authorizers_enabled.apigateway_restapi_authorizers_enabled import (
apigateway_restapi_authorizers_enabled,
)
check = apigateway_restapi_authorizers_enabled()
result = check.execute()
assert result[0].status == "FAIL"
assert len(result) == 1
assert (
result[0].status_extended
== f"API Gateway test-rest-api ID {rest_api['id']} does not have an authorizer configured at api level."
)
assert result[0].resource_id == "test-rest-api"
assert (

View File

@@ -146,3 +146,45 @@ class Test_APIGateway_Service:
audit_info = set_mocked_aws_audit_info([AWS_REGION_US_EAST_1])
apigateway = APIGateway(audit_info)
assert apigateway.rest_apis[0].stages[0].logging is True
# Test APIGateway __get_resources__
@mock_apigateway
def test__get_resources__(self):
apigateway_client = client("apigateway", region_name=AWS_REGION_US_EAST_1)
rest_api = apigateway_client.create_rest_api(
name="test-rest-api",
)
default_resource_id = apigateway_client.get_resources(restApiId=rest_api["id"])[
"items"
][0]["id"]
api_resource = apigateway_client.create_resource(
restApiId=rest_api["id"], parentId=default_resource_id, pathPart="test"
)
apigateway_client.put_method(
restApiId=rest_api["id"],
resourceId=api_resource["id"],
httpMethod="GET",
authorizationType="AWS_IAM",
)
apigateway_client.put_method(
restApiId=rest_api["id"],
resourceId=api_resource["id"],
httpMethod="OPTIONS",
authorizationType="AWS_IAM",
)
audit_info = set_mocked_aws_audit_info([AWS_REGION_US_EAST_1])
apigateway = APIGateway(audit_info)
# we skip OPTIONS methods
assert list(apigateway.rest_apis[0].resources[1].resource_methods.keys()) == [
"GET"
]
assert list(apigateway.rest_apis[0].resources[1].resource_methods.values()) == [
"AWS_IAM"
]