feat(apigateway): Service and checks for APIGW v1 and v2 (#1415)

This commit is contained in:
Sergio Garcia
2022-10-31 14:13:11 +01:00
committed by GitHub
parent f7842fdcdd
commit adf04ba632
49 changed files with 2007 additions and 457 deletions

View File

@@ -18,6 +18,8 @@ coverage = "6.4.1"
pytest = "7.1.2"
pytest-xdist = "2.5.0"
shodan = "1.28.0"
openapi-spec-validator = "0.5.1"
docker = "6.0.0"
detect-secrets = "1.4.0"
[dev-packages]

189
Pipfile.lock generated
View File

@@ -1,7 +1,7 @@
{
"_meta": {
"hash": {
"sha256": "e77cc50f5e7584fdbbee29d2ef6480066993c5fd44ab6388bbef44f8add3e4f0"
"sha256": "02042e0bff2fe10e70314636dd3fe288654c6b0deb47a1270b0d1dbcf3660cd5"
},
"pipfile-spec": 6,
"requires": {
@@ -42,19 +42,18 @@
},
"boto3": {
"hashes": [
"sha256:6b8899542cff82becceb3498a2240bf77c96def0515b0a31f7f6a9d5b92e7a3d",
"sha256:748c055214c629744c34c7f94bfa888733dfac0b92e1daef9c243e1391ea4f53"
"sha256:2284a107d43f73b6007c7c8b946a8fd6f9baa6c97b5c956edc67d9be864def58"
],
"index": "pypi",
"version": "==1.24.96"
"version": "==1.25.3"
},
"botocore": {
"hashes": [
"sha256:e41a81a18511f2f9181b2a9ab302a55c0effecccbef846c55aad0c47bfdbefb9",
"sha256:fc0a13ef6042e890e361cf408759230f8574409bb51f81740d2e5d8ad5d1fbea"
"sha256:2c2604262e5ab35ea83e9d5cf8be267e7fcdab6c815a432cfe15f23d92ce723d",
"sha256:4ea45626d8c5875c12e5767aa637388ce81871162f494eaf7b3888e875de84b7"
],
"index": "pypi",
"version": "==1.27.96"
"version": "==1.28.3"
},
"certifi": {
"hashes": [
@@ -158,11 +157,11 @@
},
"colorama": {
"hashes": [
"sha256:854bf444933e37f5824ae7bfc1e98d5bce2ebe4160d46b5edf346a89358e99da",
"sha256:e6c6b4334fc50988a639d9b98aa429a0b57da6e17b9a44f0451f930b6967b7a4"
"sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44",
"sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"
],
"index": "pypi",
"version": "==0.4.5"
"version": "==0.4.6"
},
"coverage": {
"hashes": [
@@ -260,6 +259,14 @@
"index": "pypi",
"version": "==1.4.0"
},
"docker": {
"hashes": [
"sha256:19e330470af40167d293b0352578c1fa22d74b34d3edf5d4ff90ebc203bbb2f1",
"sha256:6e06ee8eca46cd88733df09b6b80c24a1a556bc5cb1e1ae54b2c239886d245cf"
],
"index": "pypi",
"version": "==6.0.0"
},
"dparse": {
"hashes": [
"sha256:8097076f1dd26c377f30d4745e6ec18fef42f3bf493933b842ac5bafad8c345f",
@@ -268,6 +275,14 @@
"markers": "python_version >= '3.5'",
"version": "==0.6.2"
},
"exceptiongroup": {
"hashes": [
"sha256:2ac84b496be68464a2da60da518af3785fff8b7ec0d090a581604bc870bdee41",
"sha256:affbabf13fb6e98988c38d9c5650e701569fe3c1de3233cfb61c5f33774690ad"
],
"markers": "python_version < '3.11'",
"version": "==1.0.0"
},
"execnet": {
"hashes": [
"sha256:8f694f3ba9cc92cab508b152dcfe322153975c29bda272e2fd7f3f00f36e47c5",
@@ -300,6 +315,14 @@
"markers": "python_version >= '3.5'",
"version": "==3.4"
},
"importlib-resources": {
"hashes": [
"sha256:c01b1b94210d9849f286b86bb51bcea7cd56dde0600d8db721d7b81330711668",
"sha256:ee17ec648f85480d523596ce49eae8ead87d5631ae1551f913c0100b5edd3437"
],
"markers": "python_version >= '3.7'",
"version": "==5.10.0"
},
"iniconfig": {
"hashes": [
"sha256:011e24c64b7f47f6ebd835bb12a743f2fbe9a26d4cecaa7f53bc4f35ee9da8b3",
@@ -323,6 +346,47 @@
"markers": "python_version >= '3.7'",
"version": "==1.0.1"
},
"jsonschema": {
"hashes": [
"sha256:165059f076eff6971bae5b742fc029a7b4ef3f9bcf04c14e4776a7605de14b23",
"sha256:9e74b8f9738d6a946d70705dc692b74b5429cd0960d58e79ffecfc43b2221eb9"
],
"markers": "python_version >= '3.7'",
"version": "==4.16.0"
},
"jsonschema-spec": {
"hashes": [
"sha256:1e525177574c23ae0f55cd62382632a083a0339928f0ca846a975a4da9851cec",
"sha256:780a22d517cdc857d9714a80d8349c546945063f20853ea32ba7f85bc643ec7d"
],
"markers": "python_full_version >= '3.7.0' and python_full_version < '4.0.0'",
"version": "==0.1.2"
},
"lazy-object-proxy": {
"hashes": [
"sha256:0c1c7c0433154bb7c54185714c6929acc0ba04ee1b167314a779b9025517eada",
"sha256:14010b49a2f56ec4943b6cf925f597b534ee2fe1f0738c84b3bce0c1a11ff10d",
"sha256:4e2d9f764f1befd8bdc97673261b8bb888764dfdbd7a4d8f55e4fbcabb8c3fb7",
"sha256:4fd031589121ad46e293629b39604031d354043bb5cdf83da4e93c2d7f3389fe",
"sha256:5b51d6f3bfeb289dfd4e95de2ecd464cd51982fe6f00e2be1d0bf94864d58acd",
"sha256:6850e4aeca6d0df35bb06e05c8b934ff7c533734eb51d0ceb2d63696f1e6030c",
"sha256:6f593f26c470a379cf7f5bc6db6b5f1722353e7bf937b8d0d0b3fba911998858",
"sha256:71d9ae8a82203511a6f60ca5a1b9f8ad201cac0fc75038b2dc5fa519589c9288",
"sha256:7e1561626c49cb394268edd00501b289053a652ed762c58e1081224c8d881cec",
"sha256:8f6ce2118a90efa7f62dd38c7dbfffd42f468b180287b748626293bf12ed468f",
"sha256:ae032743794fba4d171b5b67310d69176287b5bf82a21f588282406a79498891",
"sha256:afcaa24e48bb23b3be31e329deb3f1858f1f1df86aea3d70cb5c8578bfe5261c",
"sha256:b70d6e7a332eb0217e7872a73926ad4fdc14f846e85ad6749ad111084e76df25",
"sha256:c219a00245af0f6fa4e95901ed28044544f50152840c5b6a3e7b2568db34d156",
"sha256:ce58b2b3734c73e68f0e30e4e725264d4d6be95818ec0a0be4bb6bf9a7e79aa8",
"sha256:d176f392dbbdaacccf15919c77f526edf11a34aece58b55ab58539807b85436f",
"sha256:e20bfa6db17a39c706d24f82df8352488d2943a3b7ce7d4c22579cb89ca8896e",
"sha256:eac3a9a5ef13b332c059772fd40b4b1c3d45a3a2b05e33a361dee48e54a4dad0",
"sha256:eb329f8d8145379bf5dbe722182410fe8863d186e51bf034d2075eb8d85ee25b"
],
"markers": "python_version >= '3.7'",
"version": "==1.8.0"
},
"markupsafe": {
"hashes": [
"sha256:0212a68688482dc52b2d45013df70d169f542b7394fc744c02a57374a4207003",
@@ -388,6 +452,22 @@
"index": "pypi",
"version": "==4.0.8"
},
"openapi-schema-validator": {
"hashes": [
"sha256:34fbd14b7501abe25e64d7b4624a9db02cde1a578d285b3da6f34b290cdf0b3a",
"sha256:7cf27585dd7970b7257cefe48e1a3a10d4e34421831bdb472d96967433bc27bd"
],
"markers": "python_full_version >= '3.7.0' and python_full_version < '4.0.0'",
"version": "==0.3.4"
},
"openapi-spec-validator": {
"hashes": [
"sha256:4a8aee1e45b1ac868e07ab25e18828fe9837baddd29a8e20fdb3d3c61c8eea3d",
"sha256:8248634bad1f23cac5d5a34e193ab36e23914057ca69e91a1ede5af75552c465"
],
"index": "pypi",
"version": "==0.5.1"
},
"packaging": {
"hashes": [
"sha256:dd47c42927d89ab911e606518907cc2d3a1f38bbd026385970643f9c5b8ecfeb",
@@ -396,6 +476,14 @@
"markers": "python_version >= '3.6'",
"version": "==21.3"
},
"pathable": {
"hashes": [
"sha256:5c869d315be50776cc8a993f3af43e0c60dc01506b399643f919034ebf4cdcab",
"sha256:cdd7b1f9d7d5c8b8d3315dbf5a86b2596053ae845f056f57d97c0eefff84da14"
],
"markers": "python_full_version >= '3.7.0' and python_full_version < '4.0.0'",
"version": "==0.4.3"
},
"pbr": {
"hashes": [
"sha256:b97bc6695b2aff02144133c2e7399d5885223d42b7912ffaec2ca3898e673bfe",
@@ -412,14 +500,6 @@
"markers": "python_version >= '3.6'",
"version": "==1.0.0"
},
"py": {
"hashes": [
"sha256:51c75c4126074b472f746a24399ad32f6053d1b34b68d2fa41e558e6f4a98719",
"sha256:607c53218732647dff4acdfcd50cb62615cedf612e72d1724fb1a0cc6405b378"
],
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'",
"version": "==1.11.0"
},
"pycparser": {
"hashes": [
"sha256:8ee45429555515e1f6b185e78100aea234072576aa43ab53aefcae078162fca9",
@@ -477,36 +557,55 @@
"markers": "python_full_version >= '3.6.8'",
"version": "==3.0.9"
},
"pyrsistent": {
"hashes": [
"sha256:0e3e1fcc45199df76053026a51cc59ab2ea3fc7c094c6627e93b7b44cdae2c8c",
"sha256:1b34eedd6812bf4d33814fca1b66005805d3640ce53140ab8bbb1e2651b0d9bc",
"sha256:4ed6784ceac462a7d6fcb7e9b663e93b9a6fb373b7f43594f9ff68875788e01e",
"sha256:5d45866ececf4a5fff8742c25722da6d4c9e180daa7b405dc0a2a2790d668c26",
"sha256:636ce2dc235046ccd3d8c56a7ad54e99d5c1cd0ef07d9ae847306c91d11b5fec",
"sha256:6455fc599df93d1f60e1c5c4fe471499f08d190d57eca040c0ea182301321286",
"sha256:6bc66318fb7ee012071b2792024564973ecc80e9522842eb4e17743604b5e045",
"sha256:7bfe2388663fd18bd8ce7db2c91c7400bf3e1a9e8bd7d63bf7e77d39051b85ec",
"sha256:7ec335fc998faa4febe75cc5268a9eac0478b3f681602c1f27befaf2a1abe1d8",
"sha256:914474c9f1d93080338ace89cb2acee74f4f666fb0424896fcfb8d86058bf17c",
"sha256:b568f35ad53a7b07ed9b1b2bae09eb15cdd671a5ba5d2c66caee40dbf91c68ca",
"sha256:cdfd2c361b8a8e5d9499b9082b501c452ade8bbf42aef97ea04854f4a3f43b22",
"sha256:d1b96547410f76078eaf66d282ddca2e4baae8964364abb4f4dcdde855cd123a",
"sha256:d4d61f8b993a7255ba714df3aca52700f8125289f84f704cf80916517c46eb96",
"sha256:d7a096646eab884bf8bed965bad63ea327e0d0c38989fc83c5ea7b8a87037bfc",
"sha256:df46c854f490f81210870e509818b729db4488e1f30f2a1ce1698b2295a878d1",
"sha256:e24a828f57e0c337c8d8bb9f6b12f09dfdf0273da25fda9e314f0b684b415a07",
"sha256:e4f3149fd5eb9b285d6bfb54d2e5173f6a116fe19172686797c056672689daf6",
"sha256:e92a52c166426efbe0d1ec1332ee9119b6d32fc1f0bbfd55d5c1088070e7fc1b",
"sha256:f87cc2863ef33c709e237d4b5f4502a62a00fab450c9e020892e8e2ede5847f5",
"sha256:fd8da6d0124efa2f67d86fa70c851022f87c98e205f0594e1fae044e7119a5a6"
],
"markers": "python_version >= '3.7'",
"version": "==0.18.1"
},
"pytest": {
"hashes": [
"sha256:1377bda3466d70b55e3f5cecfa55bb7cfcf219c7964629b967c37cf0bda818b7",
"sha256:4f365fec2dff9c1162f834d9f18af1ba13062db0c708bf7b946f8a5c76180c39"
"sha256:892f933d339f068883b6fd5a459f03d85bfcb355e4981e146d2c7616c21fef71",
"sha256:c4014eb40e10f11f355ad4e3c2fb2c6c6d1919c73f3b5a433de4708202cade59"
],
"index": "pypi",
"version": "==7.1.3"
},
"pytest-forked": {
"hashes": [
"sha256:8b67587c8f98cbbadfdd804539ed5455b6ed03802203485dd2f53c1422d7440e",
"sha256:bbbb6717efc886b9d64537b41fb1497cfaf3c9601276be8da2cccfea5a3c8ad8"
],
"markers": "python_version >= '3.6'",
"version": "==1.4.0"
"version": "==7.2.0"
},
"pytest-xdist": {
"hashes": [
"sha256:4580deca3ff04ddb2ac53eba39d76cb5dd5edeac050cb6fbc768b0dd712b4edf",
"sha256:6fe5c74fec98906deb8f2d2b616b5c782022744978e7bd4695d39c8f42d0ce65"
"sha256:688da9b814370e891ba5de650c9327d1a9d861721a524eb917e620eec3e90291",
"sha256:9feb9a18e1790696ea23e1434fa73b325ed4998b0e9fcb221f16fd1945e6df1b"
],
"index": "pypi",
"version": "==2.5.0"
"version": "==3.0.2"
},
"python-dateutil": {
"hashes": [
"sha256:0123cacc1627ae19ddf3c27a5de5bd67ee4586fbdd6440d9748f8abb483d3e86",
"sha256:961d03dc3453ebbc59dbdea9e4e11c5651520a876d0f4db161e8674aae935da9"
],
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'",
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2'",
"version": "==2.8.2"
},
"pytz": {
@@ -622,7 +721,7 @@
"sha256:f01da5790e95815eb5a8a138508c01c758e5f5bc0ce4286c4f7028b8dd7ac3d0",
"sha256:f34019dced51047d6f70cb9383b2ae2853b7fc4dce65129a5acd49f4f9256646"
],
"markers": "python_version < '3.11' and platform_python_implementation == 'CPython'",
"markers": "platform_python_implementation == 'CPython' and python_version < '3.11'",
"version": "==0.2.7"
},
"s3transfer": {
@@ -661,7 +760,7 @@
"sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926",
"sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254"
],
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'",
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2'",
"version": "==1.16.0"
},
"smmap": {
@@ -692,7 +791,7 @@
"sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b",
"sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f"
],
"markers": "python_version >= '2.6' and python_version not in '3.0, 3.1, 3.2, 3.3'",
"markers": "python_version >= '2.6' and python_version not in '3.0, 3.1, 3.2'",
"version": "==0.10.2"
},
"tomli": {
@@ -700,7 +799,7 @@
"sha256:939de3e7a6161af0c887ef91b7d41a53e7c5a1ca976325f429cb46ea9bc30ecc",
"sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f"
],
"markers": "python_version >= '3.7'",
"markers": "python_version < '3.11'",
"version": "==2.0.1"
},
"types-toml": {
@@ -734,6 +833,14 @@
"index": "pypi",
"version": "==2.6"
},
"websocket-client": {
"hashes": [
"sha256:398909eb7e261f44b8f4bd474785b6ec5f5b499d4953342fe9755e01ef624090",
"sha256:f9611eb65c8241a67fb373bef040b3cf8ad377a9f6546a12b620b6511e8ea9ef"
],
"markers": "python_version >= '3.7'",
"version": "==1.4.1"
},
"werkzeug": {
"hashes": [
"sha256:7ea2d48322cc7c0f8b3a215ed73eabd7b5d75d0b50e31ab006286ccff9e00b8f",
@@ -757,6 +864,14 @@
],
"markers": "python_version >= '3.4'",
"version": "==0.13.0"
},
"zipp": {
"hashes": [
"sha256:4fcb6f278987a6605757302a6e40e896257570d11c51628968ccb2a47e80c6c1",
"sha256:7a7262fd930bd3e36c50b9a64897aec3fafff3dfdeec9623ae22b40e93f99bb8"
],
"markers": "python_version < '3.10'",
"version": "==3.10.0"
}
},
"develop": {}

View File

@@ -291,10 +291,16 @@ def generate_regional_clients(service: str, audit_info: AWS_Audit_Info) -> dict:
f = open_file(aws_services_json_file)
data = parse_json_file(f)
# Check if it is a subservice
if service == 'accessanalyzer':
if service == "accessanalyzer":
json_regions = data["services"]['iam']["regions"][audit_info.audited_partition]
elif service == "apigatewayv2":
json_regions = data["services"]["apigateway"]["regions"][
audit_info.audited_partition
]
else:
json_regions = data["services"][service]["regions"][audit_info.audited_partition]
json_regions = data["services"][service]["regions"][
audit_info.audited_partition
]
if audit_info.audited_regions: # Check for input aws audit_info.audited_regions
regions = list(
set(json_regions).intersection(audit_info.audited_regions)

View File

@@ -67,21 +67,3 @@ class Test_acm_certificates_expiration_check:
assert result[0].status == "PASS"
assert result[0].resource_id == "test.com"
assert result[0].resource_arn == certificate["CertificateArn"]
@mock_acm
def test_bad_response(self):
mock_client = mock.MagicMock()
with mock.patch(
"providers.aws.services.acm.acm_certificates_expiration_check.acm_certificates_expiration_check.acm_client",
new=mock_client,
):
# Test Check
from providers.aws.services.acm.acm_certificates_expiration_check.acm_certificates_expiration_check import (
acm_certificates_expiration_check,
)
check = acm_certificates_expiration_check()
result = check.execute()
assert len(result) == 0

View File

@@ -111,21 +111,3 @@ class Test_acm_certificates_transparency_logs_enabled:
)
assert result[0].resource_id == "test.com"
assert result[0].resource_arn == certificate["CertificateArn"]
@mock_acm
def test_bad_response(self):
mock_client = mock.MagicMock()
with mock.patch(
"providers.aws.services.acm.acm_certificates_transparency_logs_enabled.acm_certificates_transparency_logs_enabled.acm_client",
new=mock_client,
):
# Test Check
from providers.aws.services.acm.acm_certificates_transparency_logs_enabled.acm_certificates_transparency_logs_enabled import (
acm_certificates_transparency_logs_enabled,
)
check = acm_certificates_transparency_logs_enabled()
result = check.execute()
assert len(result) == 0

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "apigateway_authorizers_enabled",
"CheckTitle": "Check if API Gateway has configured authorizers.",
"CheckType": ["IAM"],
"ServiceName": "apigateway",
"SubServiceName": "rest_api",
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"Severity": "medium",
"ResourceType": "AwsApiGatewayRestApi",
"Description": "Check if API Gateway has configured authorizers.",
"Risk": "If no authorizer is enabled anyone can use the service.",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "https://docs.bridgecrew.io/docs/public_6-api-gateway-authorizer-set#cloudformation",
"Other": "",
"Terraform": "https://docs.bridgecrew.io/docs/public_6-api-gateway-authorizer-set#terraform"
},
"Recommendation": {
"Text": "Implement Amazon Cognito or a Lambda function to control access to your API.",
"Url": "https://docs.aws.amazon.com/apigateway/latest/developerguide/apigateway-use-lambda-authorizer.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,21 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.apigateway.apigateway_client import apigateway_client
class apigateway_authorizers_enabled(Check):
def execute(self):
findings = []
for rest_api in apigateway_client.rest_apis:
report = Check_Report(self.metadata)
report.region = rest_api.region
if rest_api.authorizer:
report.status = "PASS"
report.status_extended = f"API Gateway {rest_api.name} ID {rest_api.id} has authorizer configured."
report.resource_id = rest_api.name
else:
report.status = "FAIL"
report.status_extended = f"API Gateway {rest_api.name} ID {rest_api.id} has not authorizer configured."
report.resource_id = rest_api.name
findings.append(report)
return findings

View File

@@ -0,0 +1,119 @@
from unittest import mock
from boto3 import client
from moto import mock_apigateway, mock_iam, mock_lambda
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
AWS_REGION = "us-east-1"
class Test_apigateway_authorizers_enabled:
@mock_apigateway
def test_apigateway_no_rest_apis(self):
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.apigateway.apigateway_service import APIGateway
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.apigateway.apigateway_authorizers_enabled.apigateway_authorizers_enabled.apigateway_client",
new=APIGateway(current_audit_info),
):
# Test Check
from providers.aws.services.apigateway.apigateway_authorizers_enabled.apigateway_authorizers_enabled import (
apigateway_authorizers_enabled,
)
check = apigateway_authorizers_enabled()
result = check.execute()
assert len(result) == 0
@mock_apigateway
@mock_iam
@mock_lambda
def test_apigateway_one_rest_api_with_lambda_authorizer(self):
# Create APIGateway Mocked Resources
apigateway_client = client("apigateway", region_name=AWS_REGION)
lambda_client = client("lambda", region_name=AWS_REGION)
iam_client = client("iam")
# Create APIGateway Rest API
role_arn = iam_client.create_role(
RoleName="my-role",
AssumeRolePolicyDocument="some policy",
)["Role"]["Arn"]
rest_api = apigateway_client.create_rest_api(
name="test-rest-api",
)
authorizer = lambda_client.create_function(
FunctionName="lambda-authorizer",
Runtime="python3.7",
Role=role_arn,
Handler="lambda_function.lambda_handler",
Code={
"ImageUri": "123456789012.dkr.ecr.us-east-1.amazonaws.com/hello-world:latest"
},
)
apigateway_client.create_authorizer(
name="test",
restApiId=rest_api["id"],
type="TOKEN",
authorizerUri=f"arn:aws:apigateway:{apigateway_client.meta.region_name}:lambda:path/2015-03-31/functions/arn:aws:lambda:{apigateway_client.meta.region_name}:{ACCOUNT_ID}:function:{authorizer['FunctionName']}/invocations",
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.apigateway.apigateway_service import APIGateway
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.apigateway.apigateway_authorizers_enabled.apigateway_authorizers_enabled.apigateway_client",
new=APIGateway(current_audit_info),
):
# Test Check
from providers.aws.services.apigateway.apigateway_authorizers_enabled.apigateway_authorizers_enabled import (
apigateway_authorizers_enabled,
)
check = apigateway_authorizers_enabled()
result = check.execute()
assert result[0].status == "PASS"
assert len(result) == 1
assert (
result[0].status_extended
== f"API Gateway test-rest-api ID {rest_api['id']} has authorizer configured."
)
assert result[0].resource_id == "test-rest-api"
@mock_apigateway
def test_apigateway_one_rest_api_without_lambda_authorizer(self):
# Create APIGateway Mocked Resources
apigateway_client = client("apigateway", region_name=AWS_REGION)
# Create APIGateway Rest API
rest_api = apigateway_client.create_rest_api(
name="test-rest-api",
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.apigateway.apigateway_service import APIGateway
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.apigateway.apigateway_authorizers_enabled.apigateway_authorizers_enabled.apigateway_client",
new=APIGateway(current_audit_info),
):
# Test Check
from providers.aws.services.apigateway.apigateway_authorizers_enabled.apigateway_authorizers_enabled import (
apigateway_authorizers_enabled,
)
check = apigateway_authorizers_enabled()
result = check.execute()
assert result[0].status == "FAIL"
assert len(result) == 1
assert (
result[0].status_extended
== f"API Gateway test-rest-api ID {rest_api['id']} has not authorizer configured."
)
assert result[0].resource_id == "test-rest-api"

View File

@@ -0,0 +1,4 @@
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.apigateway.apigateway_service import APIGateway
apigateway_client = APIGateway(current_audit_info)

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "apigateway_client_certificate_enabled",
"CheckTitle": "Check if API Gateway has client certificate enabled to access your backend endpoint.",
"CheckType": ["Data Protection"],
"ServiceName": "apigateway",
"SubServiceName": "rest_api",
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"Severity": "medium",
"ResourceType": "AwsApiGatewayRestApi",
"Description": "Check if API Gateway has client certificate enabled to access your backend endpoint.",
"Risk": "Possible man in the middle attacks and other similar risks.",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable client certificate. Mutual TLS is recommended and commonly used for business-to-business (B2B) applications. It iss used in standards such as Open Banking. API Gateway now provides integrated mutual TLS authentication at no additional cost.",
"Url": "https://aws.amazon.com/blogs/compute/introducing-mutual-tls-authentication-for-amazon-api-gateway/"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,23 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.apigateway.apigateway_client import apigateway_client
class apigateway_client_certificate_enabled(Check):
def execute(self):
findings = []
for rest_api in apigateway_client.rest_apis:
for stage in rest_api.stages:
report = Check_Report(self.metadata)
if stage.client_certificate:
report.status = "PASS"
report.status_extended = f"API Gateway {rest_api.name} ID {rest_api.id} in stage {stage.name} has client certificate enabled."
report.resource_id = rest_api.name
report.region = rest_api.region
else:
report.status = "FAIL"
report.status_extended = f"API Gateway {rest_api.name} ID {rest_api.id} in stage {stage.name} has not client certificate enabled."
report.resource_id = rest_api.name
report.region = rest_api.region
findings.append(report)
return findings

View File

@@ -0,0 +1,140 @@
from unittest import mock
from boto3 import client
from moto import mock_apigateway
from providers.aws.services.apigateway.apigateway_service import Stage
AWS_REGION = "us-east-1"
class Test_apigateway_client_certificate_enabled:
@mock_apigateway
def test_apigateway_no_stages(self):
# Create APIGateway Mocked Resources
apigateway_client = client("apigateway", region_name=AWS_REGION)
# Create APIGateway Rest API
apigateway_client.create_rest_api(
name="test-rest-api",
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.apigateway.apigateway_service import APIGateway
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.apigateway.apigateway_client_certificate_enabled.apigateway_client_certificate_enabled.apigateway_client",
new=APIGateway(current_audit_info),
):
# Test Check
from providers.aws.services.apigateway.apigateway_client_certificate_enabled.apigateway_client_certificate_enabled import (
apigateway_client_certificate_enabled,
)
check = apigateway_client_certificate_enabled()
result = check.execute()
assert len(result) == 0
@mock_apigateway
def test_apigateway_one_stage_without_certificate(self):
# Create APIGateway Mocked Resources
apigateway_client = client("apigateway", region_name=AWS_REGION)
# Create APIGateway Deployment Stage
rest_api = apigateway_client.create_rest_api(
name="test-rest-api",
)
# Get the rest api's root id
root_resource_id = apigateway_client.get_resources(restApiId=rest_api["id"])[
"items"
][0]["id"]
resource = apigateway_client.create_resource(
restApiId=rest_api["id"],
parentId=root_resource_id,
pathPart="test-path",
)
apigateway_client.put_method(
restApiId=rest_api["id"],
resourceId=resource["id"],
httpMethod="GET",
authorizationType="NONE",
)
apigateway_client.put_integration(
restApiId=rest_api["id"],
resourceId=resource["id"],
httpMethod="GET",
type="HTTP",
integrationHttpMethod="POST",
uri="http://test.com",
)
apigateway_client.create_deployment(
restApiId=rest_api["id"],
stageName="test",
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.apigateway.apigateway_service import APIGateway
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.apigateway.apigateway_client_certificate_enabled.apigateway_client_certificate_enabled.apigateway_client",
new=APIGateway(current_audit_info),
):
# Test Check
from providers.aws.services.apigateway.apigateway_client_certificate_enabled.apigateway_client_certificate_enabled import (
apigateway_client_certificate_enabled,
)
check = apigateway_client_certificate_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"API Gateway test-rest-api ID {rest_api['id']} in stage test has not client certificate enabled."
)
assert result[0].resource_id == "test-rest-api"
@mock_apigateway
def test_apigateway_one_stage_with_certificate(self):
# Create APIGateway Mocked Resources
apigateway_client = client("apigateway", region_name=AWS_REGION)
# Create APIGateway Deployment Stage
rest_api = apigateway_client.create_rest_api(
name="test-rest-api",
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.apigateway.apigateway_service import APIGateway
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.apigateway.apigateway_client_certificate_enabled.apigateway_client_certificate_enabled.apigateway_client",
new=APIGateway(current_audit_info),
) as service_client:
# Test Check
from providers.aws.services.apigateway.apigateway_client_certificate_enabled.apigateway_client_certificate_enabled import (
apigateway_client_certificate_enabled,
)
service_client.rest_apis[0].stages.append(
Stage(
"test",
logging=True,
client_certificate=True,
waf=True,
)
)
check = apigateway_client_certificate_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"API Gateway test-rest-api ID {rest_api['id']} in stage test has client certificate enabled."
)
assert result[0].resource_id == "test-rest-api"

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "apigateway_endpoint_public",
"CheckTitle": "Check if API Gateway endpoint is public or private.",
"CheckType": ["Infrastructure Security"],
"ServiceName": "apigateway",
"SubServiceName": "rest_api",
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"Severity": "medium",
"ResourceType": "AwsApiGatewayRestApi",
"Description": "Check if API Gateway endpoint is public or private.",
"Risk": "If accessible from internet without restrictions opens up attack / abuse surface for any malicious user.",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Verify that any public Api Gateway is protected and audited. Detective controls for common risks should be implemented.",
"Url": "https://d1.awsstatic.com/whitepapers/api-gateway-security.pdf?svrd_sip6"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,23 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.apigateway.apigateway_client import apigateway_client
class apigateway_endpoint_public(Check):
def execute(self):
findings = []
for rest_api in apigateway_client.rest_apis:
report = Check_Report(self.metadata)
report.region = rest_api.region
if rest_api.public_endpoint:
report.status = "FAIL"
report.status_extended = f"API Gateway {rest_api.name} ID {rest_api.id} is internet accesible."
report.resource_id = rest_api.name
else:
report.status = "PASS"
report.status_extended = (
f"API Gateway {rest_api.name} ID {rest_api.id} is private."
)
report.resource_id = rest_api.name
findings.append(report)
return findings

View File

@@ -0,0 +1,105 @@
from unittest import mock
from boto3 import client
from moto import mock_apigateway
AWS_REGION = "us-east-1"
class Test_apigateway_endpoint_public:
@mock_apigateway
def test_apigateway_no_rest_apis(self):
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.apigateway.apigateway_service import APIGateway
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.apigateway.apigateway_endpoint_public.apigateway_endpoint_public.apigateway_client",
new=APIGateway(current_audit_info),
):
# Test Check
from providers.aws.services.apigateway.apigateway_endpoint_public.apigateway_endpoint_public import (
apigateway_endpoint_public,
)
check = apigateway_endpoint_public()
result = check.execute()
assert len(result) == 0
@mock_apigateway
def test_apigateway_one_private_rest_api(self):
# Create APIGateway Mocked Resources
apigateway_client = client("apigateway", region_name=AWS_REGION)
# Create APIGateway Deployment Stage
rest_api = apigateway_client.create_rest_api(
name="test-rest-api",
endpointConfiguration={
"types": [
"PRIVATE",
]
},
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.apigateway.apigateway_service import APIGateway
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.apigateway.apigateway_endpoint_public.apigateway_endpoint_public.apigateway_client",
new=APIGateway(current_audit_info),
):
# Test Check
from providers.aws.services.apigateway.apigateway_endpoint_public.apigateway_endpoint_public import (
apigateway_endpoint_public,
)
check = apigateway_endpoint_public()
result = check.execute()
assert result[0].status == "PASS"
assert len(result) == 1
assert (
result[0].status_extended
== f"API Gateway test-rest-api ID {rest_api['id']} is private."
)
assert result[0].resource_id == "test-rest-api"
@mock_apigateway
def test_apigateway_one_public_rest_api(self):
# Create APIGateway Mocked Resources
apigateway_client = client("apigateway", region_name=AWS_REGION)
# Create APIGateway Deployment Stage
rest_api = apigateway_client.create_rest_api(
name="test-rest-api",
endpointConfiguration={
"types": [
"EDGE",
]
},
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.apigateway.apigateway_service import APIGateway
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.apigateway.apigateway_endpoint_public.apigateway_endpoint_public.apigateway_client",
new=APIGateway(current_audit_info),
):
# Test Check
from providers.aws.services.apigateway.apigateway_endpoint_public.apigateway_endpoint_public import (
apigateway_endpoint_public,
)
check = apigateway_endpoint_public()
result = check.execute()
assert result[0].status == "FAIL"
assert len(result) == 1
assert (
result[0].status_extended
== f"API Gateway test-rest-api ID {rest_api['id']} is internet accesible."
)
assert result[0].resource_id == "test-rest-api"

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "apigateway_logging_enabled",
"CheckTitle": "Check if API Gateway has logging enabled.",
"CheckType": ["Logging and Monitoring"],
"ServiceName": "apigateway",
"SubServiceName": "rest_api",
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"Severity": "medium",
"ResourceType": "AwsApiGatewayRestApi",
"Description": "Check if API Gateway has logging enabled.",
"Risk": "If not enabled, monitoring of service use is not possible. Real-time monitoring of API calls can be achieved by directing CloudTrail Logs to CloudWatch Logs and establishing corresponding metric filters and alarms.",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": "https://docs.bridgecrew.io/docs/ensure-api-gateway-stage-have-logging-level-defined-as-appropiate#terraform"
},
"Recommendation": {
"Text": "Monitoring is an important part of maintaining the reliability, availability and performance of API Gateway and your AWS solutions. You should collect monitoring data from all of the parts of your AWS solution. CloudTrail provides a record of actions taken by a user, role, or an AWS service in API Gateway. Using the information collected by CloudTrail, you can determine the request that was made to API Gateway, the IP address from which the request was made, who made the request, etc.",
"Url": "https://docs.aws.amazon.com/apigateway/latest/developerguide/security-monitoring.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,22 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.apigateway.apigateway_client import apigateway_client
class apigateway_logging_enabled(Check):
def execute(self):
findings = []
for rest_api in apigateway_client.rest_apis:
report = Check_Report(self.metadata)
report.region = rest_api.region
for stage in rest_api.stages:
if stage.logging:
report.status = "PASS"
report.status_extended = f"API Gateway {rest_api.name} ID {rest_api.id} in stage {stage.name} has logging enabled."
report.resource_id = rest_api.name
else:
report.status = "FAIL"
report.status_extended = f"API Gateway {rest_api.name} ID {rest_api.id} in stage {stage.name} has logging disabled."
report.resource_id = rest_api.name
findings.append(report)
return findings

View File

@@ -0,0 +1,160 @@
from unittest import mock
from boto3 import client
from moto import mock_apigateway
AWS_REGION = "us-east-1"
class Test_apigateway_logging_enabled:
@mock_apigateway
def test_apigateway_no_rest_apis(self):
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.apigateway.apigateway_service import APIGateway
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.apigateway.apigateway_logging_enabled.apigateway_logging_enabled.apigateway_client",
new=APIGateway(current_audit_info),
):
# Test Check
from providers.aws.services.apigateway.apigateway_logging_enabled.apigateway_logging_enabled import (
apigateway_logging_enabled,
)
check = apigateway_logging_enabled()
result = check.execute()
assert len(result) == 0
@mock_apigateway
def test_apigateway_one_rest_api_with_logging(self):
# Create APIGateway Mocked Resources
apigateway_client = client("apigateway", region_name=AWS_REGION)
rest_api = apigateway_client.create_rest_api(
name="test-rest-api",
)
# Get the rest api's root id
root_resource_id = apigateway_client.get_resources(restApiId=rest_api["id"])[
"items"
][0]["id"]
resource = apigateway_client.create_resource(
restApiId=rest_api["id"],
parentId=root_resource_id,
pathPart="test-path",
)
apigateway_client.put_method(
restApiId=rest_api["id"],
resourceId=resource["id"],
httpMethod="GET",
authorizationType="NONE",
)
apigateway_client.put_integration(
restApiId=rest_api["id"],
resourceId=resource["id"],
httpMethod="GET",
type="HTTP",
integrationHttpMethod="POST",
uri="http://test.com",
)
apigateway_client.create_deployment(
restApiId=rest_api["id"],
stageName="test",
)
apigateway_client.update_stage(
restApiId=rest_api["id"],
stageName="test",
patchOperations=[
{
"op": "replace",
"path": "/*/*/logging/loglevel",
"value": "INFO",
},
],
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.apigateway.apigateway_service import APIGateway
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.apigateway.apigateway_logging_enabled.apigateway_logging_enabled.apigateway_client",
new=APIGateway(current_audit_info),
):
# Test Check
from providers.aws.services.apigateway.apigateway_logging_enabled.apigateway_logging_enabled import (
apigateway_logging_enabled,
)
check = apigateway_logging_enabled()
result = check.execute()
assert result[0].status == "PASS"
assert len(result) == 1
assert (
result[0].status_extended
== f"API Gateway test-rest-api ID {rest_api['id']} in stage test has logging enabled."
)
assert result[0].resource_id == "test-rest-api"
@mock_apigateway
def test_apigateway_one_rest_api_without_logging(self):
# Create APIGateway Mocked Resources
apigateway_client = client("apigateway", region_name=AWS_REGION)
# Create APIGateway Rest API
rest_api = apigateway_client.create_rest_api(
name="test-rest-api",
)
# Get the rest api's root id
root_resource_id = apigateway_client.get_resources(restApiId=rest_api["id"])[
"items"
][0]["id"]
resource = apigateway_client.create_resource(
restApiId=rest_api["id"],
parentId=root_resource_id,
pathPart="test-path",
)
apigateway_client.put_method(
restApiId=rest_api["id"],
resourceId=resource["id"],
httpMethod="GET",
authorizationType="NONE",
)
apigateway_client.put_integration(
restApiId=rest_api["id"],
resourceId=resource["id"],
httpMethod="GET",
type="HTTP",
integrationHttpMethod="POST",
uri="http://test.com",
)
apigateway_client.create_deployment(
restApiId=rest_api["id"],
stageName="test",
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.apigateway.apigateway_service import APIGateway
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.apigateway.apigateway_logging_enabled.apigateway_logging_enabled.apigateway_client",
new=APIGateway(current_audit_info),
):
# Test Check
from providers.aws.services.apigateway.apigateway_logging_enabled.apigateway_logging_enabled import (
apigateway_logging_enabled,
)
check = apigateway_logging_enabled()
result = check.execute()
assert result[0].status == "FAIL"
assert len(result) == 1
assert (
result[0].status_extended
== f"API Gateway test-rest-api ID {rest_api['id']} in stage test has logging disabled."
)
assert result[0].resource_id == "test-rest-api"

View File

@@ -0,0 +1,144 @@
import threading
from dataclasses import dataclass
from lib.logger import logger
from providers.aws.aws_provider import generate_regional_clients
################## APIGateway
class APIGateway:
def __init__(self, audit_info):
self.service = "apigateway"
self.session = audit_info.audit_session
self.audited_account = audit_info.audited_account
self.regional_clients = generate_regional_clients(self.service, audit_info)
self.rest_apis = []
self.__threading_call__(self.__get_rest_apis__)
self.__get_authorizers__()
self.__get_rest_api__()
self.__get_stages__()
def __get_session__(self):
return self.session
def __threading_call__(self, call):
threads = []
for regional_client in self.regional_clients.values():
threads.append(threading.Thread(target=call, args=(regional_client,)))
for t in threads:
t.start()
for t in threads:
t.join()
def __get_rest_apis__(self, regional_client):
logger.info("APIGateway - Getting Rest APIs...")
try:
get_rest_apis_paginator = regional_client.get_paginator("get_rest_apis")
for page in get_rest_apis_paginator.paginate():
for apigw in page["items"]:
self.rest_apis.append(
RestAPI(
apigw["id"],
regional_client.region,
apigw["name"],
)
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}: {error}"
)
def __get_authorizers__(self):
logger.info("APIGateway - Getting Rest APIs authorizer...")
try:
for rest_api in self.rest_apis:
regional_client = self.regional_clients[rest_api.region]
authorizers = regional_client.get_authorizers(restApiId=rest_api.id)[
"items"
]
if authorizers:
rest_api.authorizer = True
except Exception as error:
logger.error(f"{error.__class__.__name__}: {error}")
def __get_rest_api__(self):
logger.info("APIGateway - Describing Rest API...")
try:
for rest_api in self.rest_apis:
regional_client = self.regional_clients[rest_api.region]
rest_api_info = regional_client.get_rest_api(restApiId=rest_api.id)
if rest_api_info["endpointConfiguration"]["types"] == ["PRIVATE"]:
rest_api.public_endpoint = False
except Exception as error:
logger.error(f"{error.__class__.__name__}: {error}")
def __get_stages__(self):
logger.info("APIGateway - Getting stages for Rest APIs...")
try:
for rest_api in self.rest_apis:
regional_client = self.regional_clients[rest_api.region]
stages = regional_client.get_stages(restApiId=rest_api.id)
for stage in stages["item"]:
waf = None
logging = False
client_certificate = False
if "webAclArn" in stage:
waf = stage["webAclArn"]
if "methodSettings" in stage:
if stage["methodSettings"]:
logging = True
if "clientCertificateId" in stage:
client_certificate = True
rest_api.stages.append(
Stage(
stage["stageName"],
logging,
client_certificate,
waf,
)
)
except Exception as error:
logger.error(f"{error.__class__.__name__}: {error}")
@dataclass
class Stage:
name: str
logging: bool
client_certificate: bool
waf: str
def __init__(
self,
name,
logging,
client_certificate,
waf,
):
self.name = name
self.logging = logging
self.client_certificate = client_certificate
self.waf = waf
@dataclass
class RestAPI:
id: str
region: str
name: str
authorizer: bool
public_endpoint: bool
stages: list[Stage]
def __init__(
self,
id,
region,
name,
):
self.id = id
self.region = region
self.name = name
self.authorizer = False
self.public_endpoint = True
self.stages = []

View File

@@ -0,0 +1,165 @@
from boto3 import client, session
from moto import mock_apigateway
from providers.aws.lib.audit_info.models import AWS_Audit_Info
from providers.aws.services.apigateway.apigateway_service import APIGateway
AWS_ACCOUNT_NUMBER = 123456789012
AWS_REGION = "us-east-1"
class Test_APIGateway_Service:
# Mocked Audit Info
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
),
audited_account=AWS_ACCOUNT_NUMBER,
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=None,
credentials=None,
assumed_role_info=None,
audited_regions=None,
organizations_metadata=None,
)
return audit_info
# Test APIGateway Service
@mock_apigateway
def test_service(self):
# APIGateway client for this test class
audit_info = self.set_mocked_audit_info()
apigateway = APIGateway(audit_info)
assert apigateway.service == "apigateway"
# Test APIGateway Client
@mock_apigateway
def test_client(self):
# APIGateway client for this test class
audit_info = self.set_mocked_audit_info()
apigateway = APIGateway(audit_info)
for client in apigateway.regional_clients.values():
assert client.__class__.__name__ == "APIGateway"
# Test APIGateway Session
@mock_apigateway
def test__get_session__(self):
# APIGateway client for this test class
audit_info = self.set_mocked_audit_info()
apigateway = APIGateway(audit_info)
assert apigateway.session.__class__.__name__ == "Session"
# Test APIGateway Session
@mock_apigateway
def test_audited_account(self):
# APIGateway client for this test class
audit_info = self.set_mocked_audit_info()
apigateway = APIGateway(audit_info)
assert apigateway.audited_account == AWS_ACCOUNT_NUMBER
# Test APIGateway Get Rest APIs
@mock_apigateway
def test__get_rest_apis__(self):
# Generate APIGateway Client
apigateway_client = client("apigateway", region_name=AWS_REGION)
# Create APIGateway Rest API
apigateway_client.create_rest_api(
name="test-rest-api",
)
# APIGateway client for this test class
audit_info = self.set_mocked_audit_info()
apigateway = APIGateway(audit_info)
assert len(apigateway.rest_apis) == len(
apigateway_client.get_rest_apis()["items"]
)
# Test APIGateway Get Authorizers
@mock_apigateway
def test__get_authorizers__(self):
# Generate APIGateway Client
apigateway_client = client("apigateway", region_name=AWS_REGION)
# Create APIGateway Rest API
rest_api = apigateway_client.create_rest_api(
name="test-rest-api",
)
# Create authorizer
apigateway_client.create_authorizer(
name="test-authorizer",
restApiId=rest_api["id"],
type="TOKEN",
)
# APIGateway client for this test class
audit_info = self.set_mocked_audit_info()
apigateway = APIGateway(audit_info)
assert apigateway.rest_apis[0].authorizer == True
# Test APIGateway Get Rest API
@mock_apigateway
def test__get_rest_api__(self):
# Generate APIGateway Client
apigateway_client = client("apigateway", region_name=AWS_REGION)
# Create private APIGateway Rest API
apigateway_client.create_rest_api(
name="test-rest-api", endpointConfiguration={"types": ["PRIVATE"]}
)
# APIGateway client for this test class
audit_info = self.set_mocked_audit_info()
apigateway = APIGateway(audit_info)
assert apigateway.rest_apis[0].public_endpoint == False
# Test APIGateway Get Stages
@mock_apigateway
def test__get_stages__(self):
# Generate APIGateway Client
apigateway_client = client("apigateway", region_name=AWS_REGION)
# Create APIGateway Rest API and a deployment stage
rest_api = apigateway_client.create_rest_api(
name="test-rest-api",
)
# Get the rest api's root id
root_resource_id = apigateway_client.get_resources(restApiId=rest_api["id"])[
"items"
][0]["id"]
resource = apigateway_client.create_resource(
restApiId=rest_api["id"],
parentId=root_resource_id,
pathPart="test-path",
)
apigateway_client.put_method(
restApiId=rest_api["id"],
resourceId=resource["id"],
httpMethod="GET",
authorizationType="NONE",
)
apigateway_client.put_integration(
restApiId=rest_api["id"],
resourceId=resource["id"],
httpMethod="GET",
type="HTTP",
integrationHttpMethod="POST",
uri="http://test.com",
)
apigateway_client.create_deployment(
restApiId=rest_api["id"],
stageName="test",
)
apigateway_client.update_stage(
restApiId=rest_api["id"],
stageName="test",
patchOperations=[
{
"op": "replace",
"path": "/*/*/logging/loglevel",
"value": "INFO",
},
],
)
audit_info = self.set_mocked_audit_info()
apigateway = APIGateway(audit_info)
assert apigateway.rest_apis[0].stages[0].logging == True

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "apigateway_waf_acl_attached",
"CheckTitle": "Check if API Gateway has a WAF ACL attached.",
"CheckType": ["Infrastructure Security"],
"ServiceName": "apigateway",
"SubServiceName": "rest_api",
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"Severity": "medium",
"ResourceType": "AwsApiGatewayRestApi",
"Description": "Check if API Gateway has a WAF ACL attached.",
"Risk": "Potential attacks and / or abuse of service, more even for even for internet reachable services.",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Use AWS WAF to protect your API Gateway API from common web exploits, such as SQL injection and cross-site scripting (XSS) attacks. These could affect API availability and performance, compromise security or consume excessive resources.",
"Url": "https://docs.aws.amazon.com/apigateway/latest/developerguide/security-monitoring.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,22 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.apigateway.apigateway_client import apigateway_client
class apigateway_waf_acl_attached(Check):
def execute(self):
findings = []
for rest_api in apigateway_client.rest_apis:
report = Check_Report(self.metadata)
report.region = rest_api.region
for stage in rest_api.stages:
if stage.waf:
report.status = "PASS"
report.status_extended = f"API Gateway {rest_api.name} ID {rest_api.id} in stage {stage.name} has {stage.waf} WAF ACL attached."
report.resource_id = rest_api.name
else:
report.status = "FAIL"
report.status_extended = f"API Gateway {rest_api.name} ID {rest_api.id} in stage {stage.name} has not WAF ACL attached."
report.resource_id = rest_api.name
findings.append(report)
return findings

View File

@@ -0,0 +1,166 @@
from unittest import mock
from boto3 import client
from moto import mock_apigateway, mock_wafv2
AWS_REGION = "us-east-1"
class Test_apigateway_waf_acl_attached:
@mock_apigateway
def test_apigateway_no_rest_apis(self):
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.apigateway.apigateway_service import APIGateway
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.apigateway.apigateway_waf_acl_attached.apigateway_waf_acl_attached.apigateway_client",
new=APIGateway(current_audit_info),
):
# Test Check
from providers.aws.services.apigateway.apigateway_waf_acl_attached.apigateway_waf_acl_attached import (
apigateway_waf_acl_attached,
)
check = apigateway_waf_acl_attached()
result = check.execute()
assert len(result) == 0
@mock_apigateway
@mock_wafv2
def test_apigateway_one_rest_api_with_waf(self):
# Create APIGateway Mocked Resources
apigateway_client = client("apigateway", region_name=AWS_REGION)
waf_client = client("wafv2", region_name=AWS_REGION)
rest_api = apigateway_client.create_rest_api(
name="test-rest-api",
)
# Get the rest api's root id
root_resource_id = apigateway_client.get_resources(restApiId=rest_api["id"])[
"items"
][0]["id"]
resource = apigateway_client.create_resource(
restApiId=rest_api["id"],
parentId=root_resource_id,
pathPart="test-path",
)
apigateway_client.put_method(
restApiId=rest_api["id"],
resourceId=resource["id"],
httpMethod="GET",
authorizationType="NONE",
)
apigateway_client.put_integration(
restApiId=rest_api["id"],
resourceId=resource["id"],
httpMethod="GET",
type="HTTP",
integrationHttpMethod="POST",
uri="http://test.com",
)
apigateway_client.create_deployment(
restApiId=rest_api["id"],
stageName="test",
)
waf_arn = waf_client.create_web_acl(
Name="test",
Scope="REGIONAL",
DefaultAction={"Allow": {}},
VisibilityConfig={
"SampledRequestsEnabled": False,
"CloudWatchMetricsEnabled": False,
"MetricName": "idk",
},
)["Summary"]["ARN"]
waf_client.associate_web_acl(
WebACLArn=waf_arn,
ResourceArn=f"arn:aws:apigateway:{apigateway_client.meta.region_name}::/restapis/{rest_api['id']}/stages/test",
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.apigateway.apigateway_service import APIGateway
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.apigateway.apigateway_waf_acl_attached.apigateway_waf_acl_attached.apigateway_client",
new=APIGateway(current_audit_info),
):
# Test Check
from providers.aws.services.apigateway.apigateway_waf_acl_attached.apigateway_waf_acl_attached import (
apigateway_waf_acl_attached,
)
check = apigateway_waf_acl_attached()
result = check.execute()
assert result[0].status == "PASS"
assert len(result) == 1
assert (
result[0].status_extended
== f"API Gateway test-rest-api ID {rest_api['id']} in stage test has {waf_arn} WAF ACL attached."
)
assert result[0].resource_id == "test-rest-api"
@mock_apigateway
def test_apigateway_one_rest_api_without_waf(self):
# Create APIGateway Mocked Resources
apigateway_client = client("apigateway", region_name=AWS_REGION)
# Create APIGateway Rest API
rest_api = apigateway_client.create_rest_api(
name="test-rest-api",
)
# Get the rest api's root id
root_resource_id = apigateway_client.get_resources(restApiId=rest_api["id"])[
"items"
][0]["id"]
resource = apigateway_client.create_resource(
restApiId=rest_api["id"],
parentId=root_resource_id,
pathPart="test-path",
)
apigateway_client.put_method(
restApiId=rest_api["id"],
resourceId=resource["id"],
httpMethod="GET",
authorizationType="NONE",
)
apigateway_client.put_integration(
restApiId=rest_api["id"],
resourceId=resource["id"],
httpMethod="GET",
type="HTTP",
integrationHttpMethod="POST",
uri="http://test.com",
)
apigateway_client.create_deployment(
restApiId=rest_api["id"],
stageName="test",
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.apigateway.apigateway_service import APIGateway
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.apigateway.apigateway_waf_acl_attached.apigateway_waf_acl_attached.apigateway_client",
new=APIGateway(current_audit_info),
):
# Test Check
from providers.aws.services.apigateway.apigateway_waf_acl_attached.apigateway_waf_acl_attached import (
apigateway_waf_acl_attached,
)
check = apigateway_waf_acl_attached()
result = check.execute()
assert result[0].status == "FAIL"
assert len(result) == 1
assert (
result[0].status_extended
== f"API Gateway test-rest-api ID {rest_api['id']} in stage test has not WAF ACL attached."
)
assert result[0].resource_id == "test-rest-api"

View File

@@ -1,57 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2018) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_extra7156="7.156"
CHECK_TITLE_extra7156="[extra7156] Checks if API Gateway V2 has Access Logging enabled"
CHECK_SCORED_extra7156="NOT_SCORED"
CHECK_CIS_LEVEL_extra7156="EXTRA"
CHECK_SEVERITY_extra7156="Medium"
CHECK_ASFF_RESOURCE_TYPE_extra7156="AwsApiGatewayV2Api"
CHECK_ALTERNATE_check7156="extra7156"
CHECK_SERVICENAME_extra7156="apigateway"
CHECK_RISK_extra7156="If not enabled the logging of API calls is not possible. This information is important for monitoring API access."
CHECK_REMEDIATION_extra7156="Enable Access Logging in the API stage."
CHECK_DOC_extra7156="https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-apigatewayv2-stage-accesslogsettings.html"
CHECK_CAF_EPIC_extra7156="Logging and Monitoring"
extra7156(){
# "Check if API Gateway V2 has Access Logging enabled "
for regx in $REGIONS; do
LIST_OF_API_GW=$($AWSCLI apigatewayv2 get-apis $PROFILE_OPT --region $regx --query Items[*].ApiId --output text 2>&1)
if [[ $(echo "$LIST_OF_API_GW" | grep -E 'AccessDenied|UnauthorizedOperation|AuthorizationError') ]]; then
textInfo "$regx: Access Denied trying to get APIs" "$regx"
continue
fi
if [[ $LIST_OF_API_GW ]];then
for apigwid in $LIST_OF_API_GW;do
API_GW_NAME=$($AWSCLI apigatewayv2 get-apis $PROFILE_OPT --region $regx --query "Items[?ApiId==\`$apigwid\`].Name" --output text)
CHECK_STAGES_NAME=$($AWSCLI apigatewayv2 get-stages $PROFILE_OPT --region $regx --api-id $apigwid --query "Items[*].StageName" --output text)
if [[ $CHECK_STAGES_NAME ]];then
for stagename in $CHECK_STAGES_NAME;do
CHECK_STAGE_METHOD_LOGGING=$($AWSCLI apigatewayv2 get-stages $PROFILE_OPT --region $regx --api-id $apigwid --query "Items[?StageName == \`$stagename\` ].AccessLogSettings.DestinationArn" --output text)
if [[ $CHECK_STAGE_METHOD_LOGGING ]];then
textPass "$regx: API Gateway V2 $API_GW_NAME ID: $apigwid with stage: $stagename has access logging enabled to $CHECK_STAGE_METHOD_LOGGING" "$regx" "$API_GW_NAME"
else
textFail "$regx: API Gateway V2 $API_GW_NAME ID: $apigwid with stage: $stagename has access logging disabled" "$regx" "$API_GW_NAME"
fi
done
else
textFail "$regx: No Stage name found for $API_GW_NAME" "$regx" "$API_GW_NAME"
fi
done
else
textInfo "$regx: No API Gateway found" "$regx"
fi
done
}

View File

@@ -1,47 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2019) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_extra7157="7.157"
CHECK_TITLE_extra7157="[extra7157] Check if API Gateway V2 has configured authorizers"
CHECK_SCORED_extra7157="NOT_SCORED"
CHECK_CIS_LEVEL_extra7157="EXTRA"
CHECK_SEVERITY_extra7157="Medium"
CHECK_ASFF_RESOURCE_TYPE_extra7157="AwsApiGatewayV2Api"
CHECK_ALTERNATE_check746="extra7157"
CHECK_SERVICENAME_extra7157="apigateway"
CHECK_RISK_extra7157='If no authorizer is enabled anyone can use the service.'
CHECK_REMEDIATION_extra7157='Implement JWT or Lambda Function to control access to your API.'
CHECK_DOC_extra7157='https://docs.aws.amazon.com/apigatewayv2/latest/api-reference/apis-apiid-authorizers.html'
CHECK_CAF_EPIC_extra7157='IAM'
extra7157(){
for regx in $REGIONS; do
LIST_OF_API_GW=$($AWSCLI apigatewayv2 get-apis $PROFILE_OPT --region $regx --query "Items[*].ApiId" --output text 2>&1)
if [[ $(echo "$LIST_OF_API_GW" | grep -E 'AccessDenied|UnauthorizedOperation|AuthorizationError') ]]; then
textInfo "$regx: Access Denied trying to get APIs" "$regx"
continue
fi
if [[ $LIST_OF_API_GW ]];then
for api in $LIST_OF_API_GW; do
API_GW_NAME=$($AWSCLI apigatewayv2 get-apis $PROFILE_OPT --region $regx --query "Items[?ApiId==\`$api\`].Name" --output text)
AUTHORIZER_CONFIGURED=$($AWSCLI apigatewayv2 --region $regx get-authorizers --api-id $api --query "Items[*].AuthorizerType" --output text)
if [[ $AUTHORIZER_CONFIGURED ]]; then
textPass "$regx: API Gateway V2 $API_GW_NAME ID $api has authorizer configured" "$regx" "$API_GW_NAME"
else
textFail "$regx: API Gateway V2 $API_GW_NAME ID $api has no authorizer configured" "$regx" "$API_GW_NAME"
fi
done
else
textInfo "$regx: No API Gateways found" "$regx"
fi
done
}

View File

@@ -1,55 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2018) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_extra722="7.22"
CHECK_TITLE_extra722="[extra722] Check if API Gateway has logging enabled"
CHECK_SCORED_extra722="NOT_SCORED"
CHECK_CIS_LEVEL_extra722="EXTRA"
CHECK_SEVERITY_extra722="Medium"
CHECK_ASFF_RESOURCE_TYPE_extra722="AwsApiGatewayRestApi"
CHECK_ALTERNATE_check722="extra722"
CHECK_SERVICENAME_extra722="apigateway"
CHECK_RISK_extra722='If not enabled; monitoring of service use is not possible. Real-time monitoring of API calls can be achieved by directing CloudTrail Logs to CloudWatch Logs and establishing corresponding metric filters and alarms.'
CHECK_REMEDIATION_extra722='Monitoring is an important part of maintaining the reliability; availability; and performance of API Gateway and your AWS solutions. You should collect monitoring data from all of the parts of your AWS solution. CloudTrail provides a record of actions taken by a user; role; or an AWS service in API Gateway. Using the information collected by CloudTrail; you can determine the request that was made to API Gateway; the IP address from which the request was made; who made the request; etc.'
CHECK_DOC_extra722='https://docs.aws.amazon.com/apigateway/latest/developerguide/security-monitoring.html'
CHECK_CAF_EPIC_extra722='Logging and Monitoring'
extra722(){
# "Check if API Gateway has logging enabled "
for regx in $REGIONS; do
LIST_OF_API_GW=$($AWSCLI apigateway get-rest-apis $PROFILE_OPT --region $regx --query items[*].id --output text 2>&1)
if [[ $(echo "$LIST_OF_API_GW" | grep -E 'AccessDenied|UnauthorizedOperation') ]]; then
textInfo "$regx: Access Denied trying to get rest APIs" "$regx"
continue
fi
if [[ $LIST_OF_API_GW ]];then
for apigwid in $LIST_OF_API_GW;do
API_GW_NAME=$($AWSCLI apigateway get-rest-apis $PROFILE_OPT --region $regx --query "items[?id==\`$apigwid\`].name" --output text)
CHECK_STAGES_NAME=$($AWSCLI apigateway get-stages $PROFILE_OPT --region $regx --rest-api-id $apigwid --query "item[*].stageName" --output text)
if [[ $CHECK_STAGES_NAME ]];then
for stagname in $CHECK_STAGES_NAME;do
CHECK_STAGE_METHOD_LOGGING=$($AWSCLI apigateway get-stages $PROFILE_OPT --region $regx --rest-api-id $apigwid --query "item[?stageName == \`$stagname\` ].methodSettings" --output text |awk '{ print $6 }' |egrep 'ERROR|INFO')
if [[ $CHECK_STAGE_METHOD_LOGGING ]];then
textPass "$regx: API Gateway $API_GW_NAME ID $apigwid in $stagname has logging enabled as $CHECK_STAGE_METHOD_LOGGING" "$regx" "$API_GW_NAME"
else
textFail "$regx: API Gateway $API_GW_NAME ID $apigwid in $stagname has logging disabled" "$regx" "$API_GW_NAME"
fi
done
else
textFail "$regx: No Stage name found for $API_GW_NAME" "$regx" "$API_GW_NAME"
fi
done
else
textInfo "$regx: No API Gateway found" "$regx"
fi
done
}

View File

@@ -1,52 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2019) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_extra743="7.43"
CHECK_TITLE_extra743="[extra743] Check if API Gateway has client certificate enabled to access your backend endpoint"
CHECK_SCORED_extra743="NOT_SCORED"
CHECK_CIS_LEVEL_extra743="EXTRA"
CHECK_SEVERITY_extra743="Medium"
CHECK_ASFF_RESOURCE_TYPE_extra743="AwsApiGatewayRestApi"
CHECK_ALTERNATE_check743="extra743"
CHECK_SERVICENAME_extra743="apigateway"
CHECK_RISK_extra743='Possible man in the middle attacks and other similar risks.'
CHECK_REMEDIATION_extra743='Enable client certificate. Mutual TLS is recommended and commonly used for business-to-business (B2B) applications. Its used in standards such as Open Banking. API Gateway now provides integrated mutual TLS authentication at no additional cost.'
CHECK_DOC_extra743='https://aws.amazon.com/blogs/compute/introducing-mutual-tls-authentication-for-amazon-api-gateway/'
CHECK_CAF_EPIC_extra743='Data Protection'
extra743(){
for regx in $REGIONS; do
LIST_OF_REST_APIS=$($AWSCLI $PROFILE_OPT --region $regx apigateway get-rest-apis --query 'items[*].id' --output text 2>&1)
if [[ $(echo "$LIST_OF_REST_APIS" | grep -E 'AccessDenied|UnauthorizedOperation|AuthorizationError') ]]; then
textInfo "$regx: Access Denied trying to get rest APIs" "$regx"
continue
fi
if [[ $LIST_OF_REST_APIS ]];then
for api in $LIST_OF_REST_APIS; do
API_GW_NAME=$($AWSCLI apigateway get-rest-apis $PROFILE_OPT --region $regx --query "items[?id==\`$api\`].name" --output text)
LIST_OF_STAGES=$($AWSCLI $PROFILE_OPT --region $regx apigateway get-stages --rest-api-id $api --query 'item[*].stageName' --output text)
if [[ $LIST_OF_STAGES ]]; then
for stage in $LIST_OF_STAGES; do
CHECK_CERTIFICATE=$($AWSCLI $PROFILE_OPT --region $regx apigateway get-stages --rest-api-id $api --query "item[?stageName==\`$stage\`].clientCertificateId" --output text)
if [[ $CHECK_CERTIFICATE ]]; then
textPass "$regx: API Gateway $API_GW_NAME ID $api in $stage has client certificate enabled" "$regx" "$API_GW_NAME"
else
textFail "$regx: API Gateway $API_GW_NAME ID $api in $stage has not client certificate enabled" "$regx" "$API_GW_NAME"
fi
done
fi
done
else
textInfo "$regx: No API Gateways found" "$regx"
fi
done
}

View File

@@ -1,53 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2019) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_extra744="7.44"
CHECK_TITLE_extra744="[extra744] Check if API Gateway has a WAF ACL attached"
CHECK_SCORED_extra744="NOT_SCORED"
CHECK_CIS_LEVEL_extra744="EXTRA"
CHECK_SEVERITY_extra744="Medium"
CHECK_ASFF_RESOURCE_TYPE_extra744="AwsApiGatewayRestApi"
CHECK_ALTERNATE_check744="extra744"
CHECK_ASFF_COMPLIANCE_TYPE_extra744="ens-mp.s.2.aws.waf.2"
CHECK_SERVICENAME_extra744="apigateway"
CHECK_RISK_extra744='Potential attacks and / or abuse of service; more even for even for internet reachable services.'
CHECK_REMEDIATION_extra744='Use AWS WAF to protect your API Gateway API from common web exploits; such as SQL injection and cross-site scripting (XSS) attacks. These could affect API availability and performance; compromise security; or consume excessive resources.'
CHECK_DOC_extra744='https://docs.aws.amazon.com/apigateway/latest/developerguide/apigateway-control-access-aws-waf.html'
CHECK_CAF_EPIC_extra744='Infrastructure Security'
extra744(){
for regx in $REGIONS; do
LIST_OF_REST_APIS=$($AWSCLI $PROFILE_OPT --region $regx apigateway get-rest-apis --query 'items[*].id' --output text 2>&1)
if [[ $(echo "$LIST_OF_REST_APIS" | grep -E 'AccessDenied|UnauthorizedOperation|AuthorizationError') ]]; then
textInfo "$regx: Access Denied trying to get rest APIs" "$regx"
continue
fi
if [[ $LIST_OF_REST_APIS ]];then
for api in $LIST_OF_REST_APIS; do
API_GW_NAME=$($AWSCLI apigateway get-rest-apis $PROFILE_OPT --region $regx --query "items[?id==\`$api\`].name" --output text)
LIST_OF_STAGES=$($AWSCLI $PROFILE_OPT --region $regx apigateway get-stages --rest-api-id $api --query 'item[*].stageName' --output text)
if [[ $LIST_OF_STAGES ]]; then
for stage in $LIST_OF_STAGES; do
CHECK_WAFACL=$($AWSCLI $PROFILE_OPT --region $regx apigateway get-stages --rest-api-id $api --query "item[?stageName==\`$stage\`].webAclArn" --output text)
if [[ $CHECK_WAFACL ]]; then
textPass "$regx: API Gateway $API_GW_NAME ID $api in $stage has $CHECK_WAFACL WAF ACL attached" "$regx" "$API_GW_NAME"
else
textFail "$regx: API Gateway $API_GW_NAME ID $api in $stage has not WAF ACL attached" "$regx" "$API_GW_NAME"
fi
done
fi
done
else
textInfo "$regx: No API Gateways found" "$regx"
fi
done
}

View File

@@ -1,54 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2019) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_extra745="7.45"
CHECK_TITLE_extra745="[extra745] Check if API Gateway endpoint is public or private"
CHECK_SCORED_extra745="NOT_SCORED"
CHECK_CIS_LEVEL_extra745="EXTRA"
CHECK_SEVERITY_extra745="Medium"
CHECK_ASFF_RESOURCE_TYPE_extra745="AwsApiGatewayRestApi"
CHECK_ALTERNATE_check745="extra745"
CHECK_SERVICENAME_extra745="apigateway"
CHECK_RISK_extra745='If accessible from internet without restrictions opens up attack / abuse surface for any malicious user.'
CHECK_REMEDIATION_extra745='Verify that any public Api Gateway is protected and audited. Detective controls for common risks should be implemented.'
CHECK_DOC_extra745='https://d1.awsstatic.com/whitepapers/api-gateway-security.pdf?svrd_sip6'
CHECK_CAF_EPIC_extra745='Infrastructure Security'
extra745(){
for regx in $REGIONS; do
LIST_OF_REST_APIS=$($AWSCLI $PROFILE_OPT --region $regx apigateway get-rest-apis --query 'items[*].id' --output text 2>&1)
if [[ $(echo "$LIST_OF_REST_APIS" | grep -E 'AccessDenied|UnauthorizedOperation|AuthorizationError') ]]; then
textInfo "$regx: Access Denied trying to get rest APIs" "$regx"
continue
fi
if [[ $LIST_OF_REST_APIS ]];then
for api in $LIST_OF_REST_APIS; do
API_GW_NAME=$($AWSCLI apigateway get-rest-apis $PROFILE_OPT --region $regx --query "items[?id==\`$api\`].name" --output text)
ENDPOINT_CONFIG_TYPE=$($AWSCLI $PROFILE_OPT --region $regx apigateway get-rest-api --rest-api-id $api --query endpointConfiguration.types --output text)
if [[ $ENDPOINT_CONFIG_TYPE ]]; then
case $ENDPOINT_CONFIG_TYPE in
PRIVATE )
textPass "$regx: API Gateway $API_GW_NAME ID $api is set as $ENDPOINT_CONFIG_TYPE" "$regx" "$API_GW_NAME"
;;
REGIONAL )
textFail "$regx: API Gateway $API_GW_NAME ID $api is internet accesible as $ENDPOINT_CONFIG_TYPE" "$regx" "$API_GW_NAME"
;;
EDGE )
textFail "$regx: API Gateway $API_GW_NAME ID $api is internet accesible as $ENDPOINT_CONFIG_TYPE" "$regx" "$API_GW_NAME"
esac
fi
done
else
textInfo "$regx: No API Gateways found" "$regx"
fi
done
}

View File

@@ -1,47 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2019) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_extra746="7.46"
CHECK_TITLE_extra746="[extra746] Check if API Gateway has configured authorizers"
CHECK_SCORED_extra746="NOT_SCORED"
CHECK_CIS_LEVEL_extra746="EXTRA"
CHECK_SEVERITY_extra746="Medium"
CHECK_ASFF_RESOURCE_TYPE_extra746="AwsApiGatewayRestApi"
CHECK_ALTERNATE_check746="extra746"
CHECK_SERVICENAME_extra746="apigateway"
CHECK_RISK_extra746='If no authorizer is enabled anyone can use the service.'
CHECK_REMEDIATION_extra746='Implement Amazon Cognito or a Lambda function to control access to your API.'
CHECK_DOC_extra746='https://docs.aws.amazon.com/apigateway/latest/developerguide/apigateway-use-lambda-authorizer.html'
CHECK_CAF_EPIC_extra746='IAM'
extra746(){
for regx in $REGIONS; do
LIST_OF_REST_APIS=$($AWSCLI $PROFILE_OPT --region $regx apigateway get-rest-apis --query 'items[*].id' --output text 2>&1)
if [[ $(echo "$LIST_OF_REST_APIS" | grep -E 'AccessDenied|UnauthorizedOperation|AuthorizationError') ]]; then
textInfo "$regx: Access Denied trying to get rest APIs" "$regx"
continue
fi
if [[ $LIST_OF_REST_APIS ]];then
for api in $LIST_OF_REST_APIS; do
API_GW_NAME=$($AWSCLI apigateway get-rest-apis $PROFILE_OPT --region $regx --query "items[?id==\`$api\`].name" --output text)
AUTHORIZER_CONFIGURED=$($AWSCLI $PROFILE_OPT --region $regx apigateway get-authorizers --rest-api-id $api --query items[*].type --output text)
if [[ $AUTHORIZER_CONFIGURED ]]; then
textPass "$regx: API Gateway $API_GW_NAME ID $api has authorizer configured" "$regx" "$API_GW_NAME"
else
textFail "$regx: API Gateway $API_GW_NAME ID $api has not authorizer configured" "$regx" "$API_GW_NAME"
fi
done
else
textInfo "$regx: No API Gateways found" "$regx"
fi
done
}

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "apigatewayv2_authorizers_enabled",
"CheckTitle": "Check if API Gateway V2 has configured authorizers.",
"CheckType": ["IAM"],
"ServiceName": "apigateway",
"SubServiceName": "rest_api",
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"Severity": "medium",
"ResourceType": "AwsApiGatewayV2Api",
"Description": "Check if API Gateway V2 has configured authorizers.",
"Risk": "If no authorizer is enabled anyone can use the service.",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "https://docs.bridgecrew.io/docs/bc_aws_logging_30#aws-console",
"Terraform": "https://docs.bridgecrew.io/docs/bc_aws_logging_30#cloudformation"
},
"Recommendation": {
"Text": "Implement Amazon Cognito or a Lambda function to control access to your API.",
"Url": "https://docs.aws.amazon.com/apigatewayv2/latest/api-reference/apis-apiid-authorizers.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,22 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.apigatewayv2.apigatewayv2_client import apigatewayv2_client
class apigatewayv2_access_logging_enabled(Check):
def execute(self):
findings = []
for api in apigatewayv2_client.apis:
report = Check_Report(self.metadata)
report.region = api.region
for stage in api.stages:
if stage.logging:
report.status = "PASS"
report.status_extended = f"API Gateway V2 {api.name} ID {api.id} in stage {stage.name} has access logging enabled."
report.resource_id = api.name
else:
report.status = "FAIL"
report.status_extended = f"API Gateway V2 {api.name} ID {api.id} in stage {stage.name} has access logging disabled."
report.resource_id = api.name
findings.append(report)
return findings

View File

@@ -0,0 +1,90 @@
from unittest import mock
import botocore
from boto3 import client
from mock import patch
from moto import mock_apigatewayv2
AWS_REGION = "us-east-1"
# Mocking ApiGatewayV2 Calls
make_api_call = botocore.client.BaseClient._make_api_call
# Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816
#
# We have to mock every AWS API call using Boto3
def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "GetAuthorizers":
return {"Items": [{"AuthorizerId": "authorizer-id", "Name": "test-authorizer"}]}
elif operation_name == "GetStages":
return {
"Items": [
{
"AccessLogSettings": {
"DestinationArn": "string",
"Format": "string",
},
"StageName": "test-stage",
}
]
}
return make_api_call(self, operation_name, kwarg)
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
class Test_apigatewayv2_access_logging_enabled:
@mock_apigatewayv2
def test_apigateway_no_apis(self):
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.apigatewayv2.apigatewayv2_service import (
ApiGatewayV2,
)
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.apigatewayv2.apigatewayv2_access_logging_enabled.apigatewayv2_access_logging_enabled.apigatewayv2_client",
new=ApiGatewayV2(current_audit_info),
):
# Test Check
from providers.aws.services.apigatewayv2.apigatewayv2_access_logging_enabled.apigatewayv2_access_logging_enabled import (
apigatewayv2_access_logging_enabled,
)
check = apigatewayv2_access_logging_enabled()
result = check.execute()
assert len(result) == 0
@mock_apigatewayv2
def test_apigateway_one_api_with_logging_in_stage(self):
# Create ApiGatewayV2 Mocked Resources
apigatewayv2_client = client("apigatewayv2", region_name=AWS_REGION)
# Create ApiGatewayV2 API
api = apigatewayv2_client.create_api(Name="test-api", ProtocolType="HTTP")
# Get stages mock with stage with logging
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.apigatewayv2.apigatewayv2_service import (
ApiGatewayV2,
)
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.apigatewayv2.apigatewayv2_access_logging_enabled.apigatewayv2_access_logging_enabled.apigatewayv2_client",
new=ApiGatewayV2(current_audit_info),
):
# Test Check
from providers.aws.services.apigatewayv2.apigatewayv2_access_logging_enabled.apigatewayv2_access_logging_enabled import (
apigatewayv2_access_logging_enabled,
)
check = apigatewayv2_access_logging_enabled()
result = check.execute()
assert result[0].status == "PASS"
assert len(result) == 1
assert (
result[0].status_extended
== f"API Gateway V2 test-api ID {api['ApiId']} in stage test-stage has access logging enabled."
)
assert result[0].resource_id == "test-api"

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "apigatewayv2_authorizers_enabled",
"CheckTitle": "Checks if API Gateway V2 has Access Logging enabled.",
"CheckType": ["Logging and Monitoring"],
"ServiceName": "apigateway",
"SubServiceName": "api",
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"Severity": "medium",
"ResourceType": "AwsApiGatewayV2Api",
"Description": "Checks if API Gateway V2 has Access Logging enabled.",
"Risk": "If not enabled the logging of API calls is not possible. This information is important for monitoring API access.",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable Access Logging in the API stage.",
"Url": "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-apigatewayv2-stage-accesslogsettings.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,23 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.apigatewayv2.apigatewayv2_client import apigatewayv2_client
class apigatewayv2_authorizers_enabled(Check):
def execute(self):
findings = []
for api in apigatewayv2_client.apis:
report = Check_Report(self.metadata)
report.region = api.region
if api.authorizer:
report.status = "PASS"
report.status_extended = (
f"API Gateway V2 {api.name} ID {api.id} has authorizer configured."
)
report.resource_id = api.name
else:
report.status = "FAIL"
report.status_extended = f"API Gateway V2 {api.name} ID {api.id} has not authorizer configured."
report.resource_id = api.name
findings.append(report)
return findings

View File

@@ -0,0 +1,96 @@
from unittest import mock
import botocore
from boto3 import client
from mock import patch
from moto import mock_apigatewayv2
AWS_REGION = "us-east-1"
# Mocking ApiGatewayV2 Calls
make_api_call = botocore.client.BaseClient._make_api_call
# Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816
#
# We have to mock every AWS API call using Boto3
def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "GetAuthorizers":
return {"Items": [{"AuthorizerId": "authorizer-id", "Name": "test-authorizer"}]}
elif operation_name == "GetStages":
return {
"Items": [
{
"AccessLogSettings": {
"DestinationArn": "string",
"Format": "string",
},
"StageName": "test-stage",
}
]
}
return make_api_call(self, operation_name, kwarg)
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
class Test_apigatewayv2_authorizers_enabled:
@mock_apigatewayv2
def test_apigateway_no_apis(self):
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.apigatewayv2.apigatewayv2_service import (
ApiGatewayV2,
)
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.apigatewayv2.apigatewayv2_authorizers_enabled.apigatewayv2_authorizers_enabled.apigatewayv2_client",
new=ApiGatewayV2(current_audit_info),
):
# Test Check
from providers.aws.services.apigatewayv2.apigatewayv2_authorizers_enabled.apigatewayv2_authorizers_enabled import (
apigatewayv2_authorizers_enabled,
)
check = apigatewayv2_authorizers_enabled()
result = check.execute()
assert len(result) == 0
@mock_apigatewayv2
def test_apigateway_one_api_with_authorizer(self):
# Create ApiGatewayV2 Mocked Resources
apigatewayv2_client = client("apigatewayv2", region_name=AWS_REGION)
# Create ApiGatewayV2 API
api = apigatewayv2_client.create_api(Name="test-api", ProtocolType="HTTP")
apigatewayv2_client.create_authorizer(
ApiId=api["ApiId"],
AuthorizerType="REQUEST",
IdentitySource=[],
Name="auth1",
AuthorizerPayloadFormatVersion="2.0",
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.apigatewayv2.apigatewayv2_service import (
ApiGatewayV2,
)
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.apigatewayv2.apigatewayv2_authorizers_enabled.apigatewayv2_authorizers_enabled.apigatewayv2_client",
new=ApiGatewayV2(current_audit_info),
):
# Test Check
from providers.aws.services.apigatewayv2.apigatewayv2_authorizers_enabled.apigatewayv2_authorizers_enabled import (
apigatewayv2_authorizers_enabled,
)
check = apigatewayv2_authorizers_enabled()
result = check.execute()
assert result[0].status == "PASS"
assert len(result) == 1
assert (
result[0].status_extended
== f"API Gateway V2 test-api ID {api['ApiId']} has authorizer configured."
)
assert result[0].resource_id == "test-api"

View File

@@ -0,0 +1,4 @@
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.apigatewayv2.apigatewayv2_service import ApiGatewayV2
apigatewayv2_client = ApiGatewayV2(current_audit_info)

View File

@@ -0,0 +1,118 @@
import threading
from dataclasses import dataclass
from lib.logger import logger
from providers.aws.aws_provider import generate_regional_clients
################## ApiGatewayV2
class ApiGatewayV2:
def __init__(self, audit_info):
self.service = "apigatewayv2"
self.session = audit_info.audit_session
self.audited_account = audit_info.audited_account
self.regional_clients = generate_regional_clients(self.service, audit_info)
self.apis = []
self.__threading_call__(self.__get_apis__)
self.__get_authorizers__()
self.__get_stages__()
def __get_session__(self):
return self.session
def __threading_call__(self, call):
threads = []
for regional_client in self.regional_clients.values():
threads.append(threading.Thread(target=call, args=(regional_client,)))
for t in threads:
t.start()
for t in threads:
t.join()
def __get_apis__(self, regional_client):
logger.info("APIGatewayv2 - Getting APIs...")
try:
get_rest_apis_paginator = regional_client.get_paginator("get_apis")
for page in get_rest_apis_paginator.paginate():
for apigw in page["Items"]:
self.apis.append(
API(
apigw["ApiId"],
regional_client.region,
apigw["Name"],
)
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}: {error}"
)
def __get_authorizers__(self):
logger.info("APIGatewayv2 - Getting APIs authorizer...")
try:
for api in self.apis:
regional_client = self.regional_clients[api.region]
authorizers = regional_client.get_authorizers(ApiId=api.id)["Items"]
print(authorizers)
if authorizers:
api.authorizer = True
except Exception as error:
logger.error(
f"{error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}"
)
def __get_stages__(self):
logger.info("APIGatewayv2 - Getting stages for APIs...")
try:
for api in self.apis:
regional_client = self.regional_clients[api.region]
stages = regional_client.get_stages(ApiId=api.id)
for stage in stages["Items"]:
logging = False
if "AccessLogSettings" in stage:
logging = True
api.stages.append(
Stage(
stage["StageName"],
logging,
)
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}"
)
@dataclass
class Stage:
name: str
logging: bool
def __init__(
self,
name,
logging,
):
self.name = name
self.logging = logging
@dataclass
class API:
id: str
region: str
name: str
authorizer: bool
stages: list[Stage]
def __init__(
self,
id,
region,
name,
):
self.id = id
self.region = region
self.name = name
self.authorizer = False
self.stages = []

View File

@@ -0,0 +1,133 @@
import botocore
from boto3 import client, session
from mock import patch
from moto import mock_apigatewayv2
from providers.aws.lib.audit_info.models import AWS_Audit_Info
from providers.aws.services.apigatewayv2.apigatewayv2_service import ApiGatewayV2
AWS_ACCOUNT_NUMBER = 123456789012
AWS_REGION = "us-east-1"
# Mocking ApiGatewayV2 Calls
make_api_call = botocore.client.BaseClient._make_api_call
# Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816
#
# We have to mock every AWS API call using Boto3
def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "GetAuthorizers":
return {"Items": [{"AuthorizerId": "authorizer-id", "Name": "test-authorizer"}]}
elif operation_name == "GetStages":
return {
"Items": [
{
"AccessLogSettings": {
"DestinationArn": "string",
"Format": "string",
},
"StageName": "test-stage",
}
]
}
return make_api_call(self, operation_name, kwarg)
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
class Test_ApiGatewayV2_Service:
# Mocked Audit Info
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
),
audited_account=AWS_ACCOUNT_NUMBER,
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=None,
credentials=None,
assumed_role_info=None,
audited_regions=None,
organizations_metadata=None,
)
return audit_info
# Test ApiGatewayV2 Service
@mock_apigatewayv2
def test_service(self):
# ApiGatewayV2 client for this test class
audit_info = self.set_mocked_audit_info()
apigatewayv2 = ApiGatewayV2(audit_info)
assert apigatewayv2.service == "apigatewayv2"
# Test ApiGatewayV2 Client
@mock_apigatewayv2
def test_client(self):
# ApiGatewayV2 client for this test class
audit_info = self.set_mocked_audit_info()
apigatewayv2 = ApiGatewayV2(audit_info)
for client in apigatewayv2.regional_clients.values():
assert client.__class__.__name__ == "ApiGatewayV2"
# Test ApiGatewayV2 Session
@mock_apigatewayv2
def test__get_session__(self):
# ApiGatewayV2 client for this test class
audit_info = self.set_mocked_audit_info()
apigatewayv2 = ApiGatewayV2(audit_info)
assert apigatewayv2.session.__class__.__name__ == "Session"
# Test ApiGatewayV2 Session
@mock_apigatewayv2
def test_audited_account(self):
# ApiGatewayV2 client for this test class
audit_info = self.set_mocked_audit_info()
apigatewayv2 = ApiGatewayV2(audit_info)
assert apigatewayv2.audited_account == AWS_ACCOUNT_NUMBER
# Test ApiGatewayV2 Get APIs
@mock_apigatewayv2
def test__get_apis__(self):
# Generate ApiGatewayV2 Client
apigatewayv2_client = client("apigatewayv2", region_name=AWS_REGION)
# Create ApiGatewayV2 API
apigatewayv2_client.create_api(Name="test-api", ProtocolType="HTTP")
# ApiGatewayV2 client for this test class
audit_info = self.set_mocked_audit_info()
apigatewayv2 = ApiGatewayV2(audit_info)
assert len(apigatewayv2.apis) == len(apigatewayv2_client.get_apis()["Items"])
# Test ApiGatewayV2 Get Authorizers
@mock_apigatewayv2
def test__get_authorizers__(self):
# Generate ApiGatewayV2 Client
apigatewayv2_client = client("apigatewayv2", region_name=AWS_REGION)
# Create ApiGatewayV2 Rest API
api = apigatewayv2_client.create_api(Name="test-api", ProtocolType="HTTP")
# Create authorizer
apigatewayv2_client.create_authorizer(
ApiId=api["ApiId"],
AuthorizerType="REQUEST",
IdentitySource=[],
Name="auth1",
AuthorizerPayloadFormatVersion="2.0",
)
# ApiGatewayV2 client for this test class
audit_info = self.set_mocked_audit_info()
apigatewayv2 = ApiGatewayV2(audit_info)
assert apigatewayv2.apis[0].authorizer == True
# Test ApiGatewayV2 Get Stages
@mock_apigatewayv2
def test__get_stages__(self):
# Generate ApiGatewayV2 Client
apigatewayv2_client = client("apigatewayv2", region_name=AWS_REGION)
# Create ApiGatewayV2 Rest API and a deployment stage
apigatewayv2_client.create_api(Name="test-api", ProtocolType="HTTP")
audit_info = self.set_mocked_audit_info()
apigatewayv2 = ApiGatewayV2(audit_info)
assert apigatewayv2.apis[0].stages[0].logging == True

View File

@@ -84,20 +84,3 @@ class Test_ec2_elastic_ip_shodan:
result = check.execute()
assert len(result) == 1
@mock_ec2
def test_bad_response(self):
mock_client = mock.MagicMock()
with mock.patch(
"providers.aws.services.ec2.ec2_elastic_ip_shodan.ec2_elastic_ip_shodan.ec2_client",
new=mock_client,
):
# Test Check
from providers.aws.services.ec2.ec2_elastic_ip_shodan.ec2_elastic_ip_shodan import (
ec2_elastic_ip_shodan,
)
check = ec2_elastic_ip_shodan()
result = check.execute()
assert len(result) == 0