feat(Lambda): Service and checks (#1491)

This commit is contained in:
Pepe Fagoaga
2022-11-17 22:59:28 +01:00
committed by GitHub
parent 538496ed6b
commit 9954763356
42 changed files with 3187 additions and 1770 deletions

View File

@@ -0,0 +1,4 @@
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.awslambda.awslambda_service import Lambda
awslambda_client = Lambda(current_audit_info)

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "awslambda_function_invoke_api_operations_cloudtrail_logging_enabled",
"CheckTitle": "Check if Lambda functions invoke API operations are being recorded by CloudTrail.",
"CheckType": [],
"ServiceName": "lambda",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:lambda:region:account-id:function/function-name",
"Severity": "low",
"ResourceType": "AwsLambdaFunction",
"Description": "Check if Lambda functions invoke API operations are being recorded by CloudTrail.",
"Risk": "If logs are not enabled; monitoring of service use and threat analysis is not possible.",
"RelatedUrl": "https://docs.aws.amazon.com/lambda/latest/dg/logging-using-cloudtrail.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Make sure you are logging information about Lambda operations. Create a lifecycle and use cases for each trail.",
"Url": "https://docs.aws.amazon.com/lambda/latest/dg/logging-using-cloudtrail.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,38 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.awslambda.awslambda_client import awslambda_client
from providers.aws.services.cloudtrail.cloudtrail_client import cloudtrail_client
class awslambda_function_invoke_api_operations_cloudtrail_logging_enabled(Check):
def execute(self):
findings = []
for function in awslambda_client.functions.values():
report = Check_Report(self.metadata)
report.region = function.region
report.resource_id = function.name
report.resource_arn = function.arn
report.status = "FAIL"
report.status_extended = (
f"Lambda function {function.name} is not recorded by CloudTrail"
)
lambda_recorded_cloudtrail = False
for trail in cloudtrail_client.trails:
for data_event in trail.data_events:
for resource in data_event["DataResources"]:
if (
resource["Type"] == "AWS::Lambda::Function"
and function.arn in resource["Values"]
):
lambda_recorded_cloudtrail = True
break
if lambda_recorded_cloudtrail:
break
if lambda_recorded_cloudtrail:
report.status = "PASS"
report.status_extended = f"Lambda function {function.name} is recorded by CloudTrail {trail.name}"
break
findings.append(report)
return findings

View File

@@ -0,0 +1,211 @@
# from datetime import datetime
# from unittest import mock
# from boto3 import session
# from moto.core import DEFAULT_ACCOUNT_ID
# from providers.aws.lib.audit_info.audit_info import AWS_Audit_Info
# from providers.aws.services.awslambda.awslambda_service import Function
# from providers.aws.services.cloudtrail.cloudtrail_service import Trail
# AWS_REGION = "us-east-1"
# class Test_awslambda_function_invoke_api_operations_cloudtrail_logging_enabled:
# # Mocked Audit Info
# def set_mocked_audit_info(self):
# audit_info = AWS_Audit_Info(
# original_session=None,
# audit_session=session.Session(
# profile_name=None,
# botocore_session=None,
# ),
# audited_account=None,
# audited_user_id=None,
# audited_partition="aws",
# audited_identity_arn=None,
# profile=None,
# profile_region=None,
# credentials=None,
# assumed_role_info=None,
# audited_regions=None,
# organizations_metadata=None,
# )
# return audit_info
# def test_no_functions(self):
# lambda_client = mock.MagicMock
# lambda_client.functions = {}
# cloudtrail_client = mock.MagicMock
# cloudtrail_client.trails = []
# with mock.patch(
# "providers.aws.services.awslambda.awslambda_service.Lambda",
# new=lambda_client,
# ), mock.patch(
# "providers.aws.lib.audit_info.audit_info.current_audit_info",
# self.set_mocked_audit_info(),
# ), mock.patch(
# "providers.aws.services.cloudtrail.cloudtrail_service.Cloudtrail",
# new=cloudtrail_client,
# ):
# # Test Check
# from providers.aws.services.awslambda.awslambda_function_invoke_api_operations_cloudtrail_logging_enabled.awslambda_function_invoke_api_operations_cloudtrail_logging_enabled import (
# awslambda_function_invoke_api_operations_cloudtrail_logging_enabled,
# )
# check = (
# awslambda_function_invoke_api_operations_cloudtrail_logging_enabled()
# )
# result = check.execute()
# assert len(result) == 0
# def test_lambda_not_recorded_by_cloudtrail(self):
# # Lambda Client
# lambda_client = mock.MagicMock
# function_name = "test-lambda"
# function_runtime = "python3.9"
# function_arn = (
# f"arn:aws:lambda:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:function/{function_name}"
# )
# lambda_client.functions = {
# function_name: Function(
# name=function_name,
# arn=function_arn,
# region=AWS_REGION,
# runtime=function_runtime,
# )
# }
# # CloudTrail Client
# cloudtrail_client = mock.MagicMock
# cloudtrail_client.trails = [
# Trail(
# name="test-trail",
# is_multiregion=False,
# home_region=AWS_REGION,
# arn="",
# region=AWS_REGION,
# is_logging=True,
# log_file_validation_enabled=True,
# latest_cloudwatch_delivery_time=datetime(2022, 1, 1),
# s3_bucket="",
# kms_key="",
# log_group_arn="",
# data_events=[
# {
# "ReadWriteType": "All",
# "IncludeManagementEvents": True,
# "DataResources": [],
# "ExcludeManagementEventSources": [],
# }
# ],
# )
# ]
# with mock.patch(
# "providers.aws.services.awslambda.awslambda_service.Lambda",
# new=lambda_client,
# ), mock.patch(
# "providers.aws.services.cloudtrail.cloudtrail_service.Cloudtrail",
# new=cloudtrail_client,
# ):
# # Test Check
# from providers.aws.services.awslambda.awslambda_function_invoke_api_operations_cloudtrail_logging_enabled.awslambda_function_invoke_api_operations_cloudtrail_logging_enabled import (
# awslambda_function_invoke_api_operations_cloudtrail_logging_enabled,
# )
# check = (
# awslambda_function_invoke_api_operations_cloudtrail_logging_enabled()
# )
# result = check.execute()
# assert len(result) == 1
# assert result[0].region == AWS_REGION
# assert result[0].resource_id == function_name
# assert result[0].resource_arn == function_arn
# assert result[0].status == "FAIL"
# assert (
# result[0].status_extended
# == f"Lambda function {function_name} is not recorded by CloudTrail"
# )
# def test_lambda_recorded_by_cloudtrail(self):
# # Lambda Client
# lambda_client = mock.MagicMock
# function_name = "test-lambda"
# function_runtime = "python3.9"
# function_arn = (
# f"arn:aws:lambda:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:function/{function_name}"
# )
# lambda_client.functions = {
# function_name: Function(
# name=function_name,
# arn=function_arn,
# region=AWS_REGION,
# runtime=function_runtime,
# )
# }
# # CloudTrail Client
# cloudtrail_client = mock.MagicMock
# trail_name = "test-trail"
# cloudtrail_client.trails = [
# Trail(
# name=trail_name,
# is_multiregion=False,
# home_region=AWS_REGION,
# arn="",
# region=AWS_REGION,
# is_logging=True,
# log_file_validation_enabled=True,
# latest_cloudwatch_delivery_time=datetime(2022, 1, 1),
# s3_bucket="",
# kms_key="",
# log_group_arn="",
# data_events=[
# {
# "ReadWriteType": "All",
# "IncludeManagementEvents": True,
# "DataResources": [
# {
# "Type": "AWS::Lambda::Function",
# "Values": [
# function_arn,
# ],
# },
# ],
# "ExcludeManagementEventSources": [],
# }
# ],
# )
# ]
# with mock.patch(
# "providers.aws.services.awslambda.awslambda_service.Lambda",
# new=lambda_client,
# ), mock.patch(
# "providers.aws.services.cloudtrail.cloudtrail_service.Cloudtrail",
# new=cloudtrail_client,
# ):
# #
# # Test Check
# from providers.aws.services.awslambda.awslambda_function_invoke_api_operations_cloudtrail_logging_enabled.awslambda_function_invoke_api_operations_cloudtrail_logging_enabled import (
# awslambda_function_invoke_api_operations_cloudtrail_logging_enabled,
# )
# check = (
# awslambda_function_invoke_api_operations_cloudtrail_logging_enabled()
# )
# result = check.execute()
# assert len(result) == 1
# assert result[0].region == AWS_REGION
# assert result[0].resource_id == function_name
# assert result[0].resource_arn == function_arn
# assert result[0].status == "PASS"
# assert (
# result[0].status_extended
# == f"Lambda function {function_name} is recorded by CloudTrail {trail_name}"
# )

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "awslambda_function_no_secrets_in_code",
"CheckTitle": "Find secrets in Lambda functions code.",
"CheckType": [],
"ServiceName": "lambda",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:lambda:region:account-id:function/function-name",
"Severity": "critical",
"ResourceType": "AwsLambdaFunction",
"Description": "Find secrets in Lambda functions code.",
"Risk": "The use of a hard-coded password increases the possibility of password guessing. If hard-coded passwords are used; it is possible that malicious users gain access through the account in question.",
"RelatedUrl": "https://docs.aws.amazon.com/secretsmanager/latest/userguide/lambda-functions.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Use Secrets Manager to securely provide database credentials to Lambda functions and secure the databases as well as use the credentials to connect and query them without hardcoding the secrets in code or passing them through environmental variables.",
"Url": "https://docs.aws.amazon.com/secretsmanager/latest/userguide/lambda-functions.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,40 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.awslambda.awslambda_client import awslambda_client
import tempfile
import os
from detect_secrets import SecretsCollection
from detect_secrets.settings import default_settings
class awslambda_function_no_secrets_in_code(Check):
def execute(self):
findings = []
for function in awslambda_client.functions.values():
report = Check_Report(self.metadata)
report.region = function.region
report.resource_id = function.name
report.resource_arn = function.arn
report.status = "PASS"
report.status_extended = (
f"No secrets found in Lambda function {function.name} code"
)
with tempfile.TemporaryDirectory() as tmp_dir_name:
function.code.code_zip.extractall(tmp_dir_name)
# List all files
files_in_zip = next(os.walk(tmp_dir_name))[2]
for file in files_in_zip:
secrets = SecretsCollection()
with default_settings():
secrets.scan_file(f"{tmp_dir_name}/{file}")
if secrets.json():
report.status = "FAIL"
report.status_extended = f"Potential secret found in Lambda function {function.name} code"
break
findings.append(report)
return findings

View File

@@ -0,0 +1,124 @@
import zipfile
from unittest import mock
from moto.core import DEFAULT_ACCOUNT_ID
from providers.aws.services.awslambda.awslambda_service import Function, LambdaCode
from providers.aws.services.awslambda.awslambda_service_test import create_zip_file
AWS_REGION = "us-east-1"
class Test_awslambda_function_no_secrets_in_code:
def test_no_functions(self):
lambda_client = mock.MagicMock
lambda_client.functions = {}
with mock.patch(
"providers.aws.services.awslambda.awslambda_service.Lambda",
new=lambda_client,
):
# Test Check
from providers.aws.services.awslambda.awslambda_function_no_secrets_in_code.awslambda_function_no_secrets_in_code import (
awslambda_function_no_secrets_in_code,
)
check = awslambda_function_no_secrets_in_code()
result = check.execute()
assert len(result) == 0
def test_function_code_with_secrets(self):
lambda_client = mock.MagicMock
function_name = "test-lambda"
function_runtime = "nodejs4.3"
function_arn = (
f"arn:aws:lambda:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:function/{function_name}"
)
code_with_secrets = """
def lambda_handler(event, context):
db_password = "test-password"
print("custom log event")
return event
"""
lambda_client.functions = {
"function_name": Function(
name=function_name,
arn=function_arn,
region=AWS_REGION,
runtime=function_runtime,
code=LambdaCode(
location="",
code_zip=zipfile.ZipFile(create_zip_file(code_with_secrets)),
),
)
}
with mock.patch(
"providers.aws.services.awslambda.awslambda_service.Lambda",
new=lambda_client,
):
# Test Check
from providers.aws.services.awslambda.awslambda_function_no_secrets_in_code.awslambda_function_no_secrets_in_code import (
awslambda_function_no_secrets_in_code,
)
check = awslambda_function_no_secrets_in_code()
result = check.execute()
assert len(result) == 1
assert result[0].region == AWS_REGION
assert result[0].resource_id == function_name
assert result[0].resource_arn == function_arn
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Potential secret found in Lambda function {function_name} code"
)
def test_function_code_without_secrets(self):
lambda_client = mock.MagicMock
function_name = "test-lambda"
function_runtime = "nodejs4.3"
function_arn = (
f"arn:aws:lambda:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:function/{function_name}"
)
code_with_secrets = """
def lambda_handler(event, context):
print("custom log event")
return event
"""
lambda_client.functions = {
"function_name": Function(
name=function_name,
arn=function_arn,
region=AWS_REGION,
runtime=function_runtime,
code=LambdaCode(
location="",
code_zip=zipfile.ZipFile(create_zip_file(code_with_secrets)),
),
)
}
with mock.patch(
"providers.aws.services.awslambda.awslambda_service.Lambda",
new=lambda_client,
):
# Test Check
from providers.aws.services.awslambda.awslambda_function_no_secrets_in_code.awslambda_function_no_secrets_in_code import (
awslambda_function_no_secrets_in_code,
)
check = awslambda_function_no_secrets_in_code()
result = check.execute()
assert len(result) == 1
assert result[0].region == AWS_REGION
assert result[0].resource_id == function_name
assert result[0].resource_arn == function_arn
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"No secrets found in Lambda function {function_name} code"
)

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "awslambda_function_no_secrets_in_variables",
"CheckTitle": "Find secrets in Lambda functions variables.",
"CheckType": [],
"ServiceName": "lambda",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:lambda:region:account-id:function/function-name",
"Severity": "critical",
"ResourceType": "AwsLambdaFunction",
"Description": "Find secrets in Lambda functions variables.",
"Risk": "The use of a hard-coded password increases the possibility of password guessing. If hard-coded passwords are used; it is possible that malicious users gain access through the account in question.",
"RelatedUrl": "https://docs.aws.amazon.com/secretsmanager/latest/userguide/lambda-functions.html",
"Remediation": {
"Code": {
"CLI": "https://docs.bridgecrew.io/docs/bc_aws_secrets_3#cli-command",
"NativeIaC": "https://docs.bridgecrew.io/docs/bc_aws_secrets_3#cloudformation",
"Other": "",
"Terraform": "https://docs.bridgecrew.io/docs/bc_aws_secrets_3#terraform"
},
"Recommendation": {
"Text": "Use Secrets Manager to securely provide database credentials to Lambda functions and secure the databases as well as use the credentials to connect and query them without hardcoding the secrets in code or passing them through environmental variables.",
"Url": "https://docs.aws.amazon.com/secretsmanager/latest/userguide/lambda-functions.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,44 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.awslambda.awslambda_client import awslambda_client
import tempfile
import json
from detect_secrets import SecretsCollection
from detect_secrets.settings import default_settings
import os
class awslambda_function_no_secrets_in_variables(Check):
def execute(self):
findings = []
for function in awslambda_client.functions.values():
report = Check_Report(self.metadata)
report.region = function.region
report.resource_id = function.name
report.resource_arn = function.arn
report.status = "PASS"
report.status_extended = (
f"No secrets found in Lambda function {function.name} variables"
)
if function.environment:
temp_env_data_file = tempfile.NamedTemporaryFile(delete=False)
temp_env_data_file.write(
bytes(
json.dumps(function.environment), encoding="raw_unicode_escape"
)
)
temp_env_data_file.close()
secrets = SecretsCollection()
with default_settings():
secrets.scan_file(temp_env_data_file.name)
if secrets.json():
report.status = "FAIL"
report.status_extended = f"Potential secret found in Lambda function {function.name} variables"
os.remove(temp_env_data_file.name)
findings.append(report)
return findings

View File

@@ -0,0 +1,146 @@
from unittest import mock
from moto.core import DEFAULT_ACCOUNT_ID
from providers.aws.services.awslambda.awslambda_service import Function
AWS_REGION = "us-east-1"
class Test_awslambda_function_no_secrets_in_variables:
def test_no_functions(self):
lambda_client = mock.MagicMock
lambda_client.functions = {}
with mock.patch(
"providers.aws.services.awslambda.awslambda_service.Lambda",
new=lambda_client,
):
# Test Check
from providers.aws.services.awslambda.awslambda_function_no_secrets_in_variables.awslambda_function_no_secrets_in_variables import (
awslambda_function_no_secrets_in_variables,
)
check = awslambda_function_no_secrets_in_variables()
result = check.execute()
assert len(result) == 0
def test_function_no_variables(self):
lambda_client = mock.MagicMock
function_name = "test-lambda"
function_runtime = "nodejs4.3"
function_arn = (
f"arn:aws:lambda:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:function/{function_name}"
)
lambda_client.functions = {
"function_name": Function(
name=function_name,
arn=function_arn,
region=AWS_REGION,
runtime=function_runtime,
)
}
with mock.patch(
"providers.aws.services.awslambda.awslambda_service.Lambda",
new=lambda_client,
):
# Test Check
from providers.aws.services.awslambda.awslambda_function_no_secrets_in_variables.awslambda_function_no_secrets_in_variables import (
awslambda_function_no_secrets_in_variables,
)
check = awslambda_function_no_secrets_in_variables()
result = check.execute()
assert len(result) == 1
assert result[0].region == AWS_REGION
assert result[0].resource_id == function_name
assert result[0].resource_arn == function_arn
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"No secrets found in Lambda function {function_name} variables"
)
def test_function_secrets_in_variables(self):
lambda_client = mock.MagicMock
function_name = "test-lambda"
function_runtime = "nodejs4.3"
function_arn = (
f"arn:aws:lambda:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:function/{function_name}"
)
lambda_client.functions = {
"function_name": Function(
name=function_name,
arn=function_arn,
region=AWS_REGION,
runtime=function_runtime,
environment={"db_password": "test-password"},
)
}
with mock.patch(
"providers.aws.services.awslambda.awslambda_service.Lambda",
new=lambda_client,
):
# Test Check
from providers.aws.services.awslambda.awslambda_function_no_secrets_in_variables.awslambda_function_no_secrets_in_variables import (
awslambda_function_no_secrets_in_variables,
)
check = awslambda_function_no_secrets_in_variables()
result = check.execute()
assert len(result) == 1
assert result[0].region == AWS_REGION
assert result[0].resource_id == function_name
assert result[0].resource_arn == function_arn
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Potential secret found in Lambda function {function_name} variables"
)
def test_function_no_secrets_in_variables(self):
lambda_client = mock.MagicMock
function_name = "test-lambda"
function_runtime = "nodejs4.3"
function_arn = (
f"arn:aws:lambda:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:function/{function_name}"
)
lambda_client.functions = {
"function_name": Function(
name=function_name,
arn=function_arn,
region=AWS_REGION,
runtime=function_runtime,
environment={"db_username": "test-user"},
)
}
with mock.patch(
"providers.aws.services.awslambda.awslambda_service.Lambda",
new=lambda_client,
):
# Test Check
from providers.aws.services.awslambda.awslambda_function_no_secrets_in_variables.awslambda_function_no_secrets_in_variables import (
awslambda_function_no_secrets_in_variables,
)
check = awslambda_function_no_secrets_in_variables()
result = check.execute()
assert len(result) == 1
assert result[0].region == AWS_REGION
assert result[0].resource_id == function_name
assert result[0].resource_arn == function_arn
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"No secrets found in Lambda function {function_name} variables"
)

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "awslambda_function_not_publicly_accessible",
"CheckTitle": "heck if Lambda functions have resource-based policy set as Public.",
"CheckType": [],
"ServiceName": "lambda",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:lambda:region:account-id:function/function-name",
"Severity": "critical",
"ResourceType": "AwsLambdaFunction",
"Description": "Check if Lambda functions have resource-based policy set as Public.",
"Risk": "Publicly accessible services could expose sensitive data to bad actors.",
"RelatedUrl": "https://docs.aws.amazon.com/lambda/latest/dg/access-control-resource-based.html",
"Remediation": {
"Code": {
"CLI": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/aws/Lambda/function-exposed.html",
"NativeIaC": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/aws/Lambda/function-exposed.html",
"Other": "",
"Terraform": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/aws/Lambda/function-exposed.html"
},
"Recommendation": {
"Text": "Grant usage permission on a per-resource basis and applying least privilege principle.",
"Url": "https://docs.aws.amazon.com/lambda/latest/dg/access-control-resource-based.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,42 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.awslambda.awslambda_client import awslambda_client
class awslambda_function_not_publicly_accessible(Check):
def execute(self):
findings = []
for function in awslambda_client.functions.values():
report = Check_Report(self.metadata)
report.region = function.region
report.resource_id = function.name
report.resource_arn = function.arn
report.status = "PASS"
report.status_extended = f"Lambda function {function.name} has a policy resource-based policy not public"
public_access = False
if function.policy:
for statement in function.policy["Statement"]:
# Only check allow statements
if statement["Effect"] == "Allow":
if (
"*" in statement["Principal"]
or (
"AWS" in statement["Principal"]
and "*" in statement["Principal"]["AWS"]
)
or (
"CanonicalUser" in statement["Principal"]
and "*" in statement["Principal"]["CanonicalUser"]
)
):
public_access = True
break
if public_access:
report.status = "FAIL"
report.status_extended = f"Lambda function {function.name} has a policy resource-based policy with public access"
findings.append(report)
return findings

View File

@@ -0,0 +1,189 @@
from unittest import mock
from moto.core import DEFAULT_ACCOUNT_ID
from providers.aws.services.awslambda.awslambda_service import Function
AWS_REGION = "us-east-1"
class Test_awslambda_function_not_publicly_accessible:
def test_no_functions(self):
lambda_client = mock.MagicMock
lambda_client.functions = {}
with mock.patch(
"providers.aws.services.awslambda.awslambda_service.Lambda",
new=lambda_client,
):
# Test Check
from providers.aws.services.awslambda.awslambda_function_not_publicly_accessible.awslambda_function_not_publicly_accessible import (
awslambda_function_not_publicly_accessible,
)
check = awslambda_function_not_publicly_accessible()
result = check.execute()
assert len(result) == 0
def test_function_public(self):
lambda_client = mock.MagicMock
function_name = "test-lambda"
function_runtime = "nodejs4.3"
function_arn = (
f"arn:aws:lambda:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:function/{function_name}"
)
lambda_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "public-access",
"Principal": {"AWS": ["*", DEFAULT_ACCOUNT_ID]},
"Effect": "Allow",
"Action": [
"lambda:InvokeFunction",
],
"Resource": [function_arn],
}
],
}
lambda_client.functions = {
"function_name": Function(
name=function_name,
arn=function_arn,
region=AWS_REGION,
runtime=function_runtime,
policy=lambda_policy,
)
}
with mock.patch(
"providers.aws.services.awslambda.awslambda_service.Lambda",
new=lambda_client,
):
# Test Check
from providers.aws.services.awslambda.awslambda_function_not_publicly_accessible.awslambda_function_not_publicly_accessible import (
awslambda_function_not_publicly_accessible,
)
check = awslambda_function_not_publicly_accessible()
result = check.execute()
assert len(result) == 1
assert result[0].region == AWS_REGION
assert result[0].resource_id == function_name
assert result[0].resource_arn == function_arn
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Lambda function {function_name} has a policy resource-based policy with public access"
)
def test_function_not_public(self):
lambda_client = mock.MagicMock
function_name = "test-lambda"
function_runtime = "nodejs4.3"
function_arn = (
f"arn:aws:lambda:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:function/{function_name}"
)
lambda_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "public-access",
"Principal": {"AWS": [DEFAULT_ACCOUNT_ID]},
"Effect": "Allow",
"Action": [
"lambda:InvokeFunction",
],
"Resource": [function_arn],
}
],
}
lambda_client.functions = {
"function_name": Function(
name=function_name,
arn=function_arn,
region=AWS_REGION,
runtime=function_runtime,
policy=lambda_policy,
)
}
with mock.patch(
"providers.aws.services.awslambda.awslambda_service.Lambda",
new=lambda_client,
):
# Test Check
from providers.aws.services.awslambda.awslambda_function_not_publicly_accessible.awslambda_function_not_publicly_accessible import (
awslambda_function_not_publicly_accessible,
)
check = awslambda_function_not_publicly_accessible()
result = check.execute()
assert len(result) == 1
assert result[0].region == AWS_REGION
assert result[0].resource_id == function_name
assert result[0].resource_arn == function_arn
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Lambda function {function_name} has a policy resource-based policy not public"
)
def test_function_public_with_canonical(self):
lambda_client = mock.MagicMock
function_name = "test-lambda"
function_runtime = "nodejs4.3"
function_arn = (
f"arn:aws:lambda:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:function/{function_name}"
)
lambda_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "public-access",
"Principal": {"CanonicalUser": ["*"]},
"Effect": "Allow",
"Action": [
"lambda:InvokeFunction",
],
"Resource": [function_arn],
}
],
}
lambda_client.functions = {
"function_name": Function(
name=function_name,
arn=function_arn,
region=AWS_REGION,
runtime=function_runtime,
policy=lambda_policy,
)
}
with mock.patch(
"providers.aws.services.awslambda.awslambda_service.Lambda",
new=lambda_client,
):
# Test Check
from providers.aws.services.awslambda.awslambda_function_not_publicly_accessible.awslambda_function_not_publicly_accessible import (
awslambda_function_not_publicly_accessible,
)
check = awslambda_function_not_publicly_accessible()
result = check.execute()
assert len(result) == 1
assert result[0].region == AWS_REGION
assert result[0].resource_id == function_name
assert result[0].resource_arn == function_arn
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Lambda function {function_name} has a policy resource-based policy with public access"
)

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "awslambda_function_url_cors_policy",
"CheckTitle": "Check Lambda Function URL CORS configuration.",
"CheckType": [],
"ServiceName": "lambda",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:lambda:region:account-id:function/function-name",
"Severity": "medium",
"ResourceType": "AwsLambdaFunction",
"Description": "Check Lambda Function URL CORS configuration.",
"Risk": "Publicly accessible services could expose sensitive data to bad actors.",
"RelatedUrl": "https://docs.aws.amazon.com/secretsmanager/latest/userguide/lambda-functions.html",
"Remediation": {
"Code": {
"CLI": "aws lambda update-function-url-config --region AWS_REGION --function-name FUNCTION-NAME --auth-type AWS_IAM --cors 'AllowOrigins=https://www.example.com,AllowMethods=*,ExposeHeaders=keep-alive,MaxAge=3600,AllowCredentials=false'",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Grant usage permission on a per-resource basis and applying least privilege principle.",
"Url": "https://docs.aws.amazon.com/secretsmanager/latest/userguide/lambda-functions.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,23 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.awslambda.awslambda_client import awslambda_client
class awslambda_function_url_cors_policy(Check):
def execute(self):
findings = []
for function in awslambda_client.functions.values():
report = Check_Report(self.metadata)
report.region = function.region
report.resource_id = function.name
report.resource_arn = function.arn
if function.url_config:
if "*" in function.url_config.cors_config.allow_origins:
report.status = "FAIL"
report.status_extended = f"Lambda function {function.name} URL has a wide CORS configuration"
else:
report.status = "PASS"
report.status_extended = f"Lambda function {function.name} has not a wide CORS configuration"
findings.append(report)
return findings

View File

@@ -0,0 +1,163 @@
from unittest import mock
from moto.core import DEFAULT_ACCOUNT_ID
from providers.aws.services.awslambda.awslambda_service import (
AuthType,
Function,
URLConfig,
URLConfigCORS,
)
AWS_REGION = "us-east-1"
class Test_awslambda_function_url_cors_policy:
def test_no_functions(self):
lambda_client = mock.MagicMock
lambda_client.functions = {}
with mock.patch(
"providers.aws.services.awslambda.awslambda_service.Lambda",
new=lambda_client,
):
# Test Check
from providers.aws.services.awslambda.awslambda_function_url_cors_policy.awslambda_function_url_cors_policy import (
awslambda_function_url_cors_policy,
)
check = awslambda_function_url_cors_policy()
result = check.execute()
assert len(result) == 0
def test_function_cors_asterisk(self):
lambda_client = mock.MagicMock
function_name = "test-lambda"
function_runtime = "nodejs4.3"
function_arn = (
f"arn:aws:lambda:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:function/{function_name}"
)
lambda_client.functions = {
"function_name": Function(
name=function_name,
arn=function_arn,
region=AWS_REGION,
runtime=function_runtime,
url_config=URLConfig(
auth_type=AuthType.NONE,
url="",
cors_config=URLConfigCORS(allow_origins=["*"]),
),
)
}
with mock.patch(
"providers.aws.services.awslambda.awslambda_service.Lambda",
new=lambda_client,
):
# Test Check
from providers.aws.services.awslambda.awslambda_function_url_cors_policy.awslambda_function_url_cors_policy import (
awslambda_function_url_cors_policy,
)
check = awslambda_function_url_cors_policy()
result = check.execute()
assert len(result) == 1
assert result[0].region == AWS_REGION
assert result[0].resource_id == function_name
assert result[0].resource_arn == function_arn
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Lambda function {function_name} URL has a wide CORS configuration"
)
def test_function_cors_not_wide(self):
lambda_client = mock.MagicMock
function_name = "test-lambda"
function_runtime = "python3.9"
function_arn = (
f"arn:aws:lambda:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:function/{function_name}"
)
lambda_client.functions = {
"function_name": Function(
name=function_name,
arn=function_arn,
region=AWS_REGION,
runtime=function_runtime,
url_config=URLConfig(
auth_type=AuthType.AWS_IAM,
url="",
cors_config=URLConfigCORS(allow_origins=["https://example.com"]),
),
)
}
with mock.patch(
"providers.aws.services.awslambda.awslambda_service.Lambda",
new=lambda_client,
):
# Test Check
from providers.aws.services.awslambda.awslambda_function_url_cors_policy.awslambda_function_url_cors_policy import (
awslambda_function_url_cors_policy,
)
check = awslambda_function_url_cors_policy()
result = check.execute()
assert len(result) == 1
assert result[0].region == AWS_REGION
assert result[0].resource_id == function_name
assert result[0].resource_arn == function_arn
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Lambda function {function_name} has not a wide CORS configuration"
)
def test_function_cors_wide_with_two_origins(self):
lambda_client = mock.MagicMock
function_name = "test-lambda"
function_runtime = "python3.9"
function_arn = (
f"arn:aws:lambda:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:function/{function_name}"
)
lambda_client.functions = {
"function_name": Function(
name=function_name,
arn=function_arn,
region=AWS_REGION,
runtime=function_runtime,
url_config=URLConfig(
auth_type=AuthType.AWS_IAM,
url="",
cors_config=URLConfigCORS(
allow_origins=["https://example.com", "*"]
),
),
)
}
with mock.patch(
"providers.aws.services.awslambda.awslambda_service.Lambda",
new=lambda_client,
):
# Test Check
from providers.aws.services.awslambda.awslambda_function_url_cors_policy.awslambda_function_url_cors_policy import (
awslambda_function_url_cors_policy,
)
check = awslambda_function_url_cors_policy()
result = check.execute()
assert len(result) == 1
assert result[0].region == AWS_REGION
assert result[0].resource_id == function_name
assert result[0].resource_arn == function_arn
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Lambda function {function_name} URL has a wide CORS configuration"
)

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "awslambda_function_url_public",
"CheckTitle": "Check Public Lambda Function URL.",
"CheckType": [],
"ServiceName": "lambda",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:lambda:region:account-id:function/function-name",
"Severity": "high",
"ResourceType": "AwsLambdaFunction",
"Description": "Check Public Lambda Function URL.",
"Risk": "Publicly accessible services could expose sensitive data to bad actors.",
"RelatedUrl": "https://docs.aws.amazon.com/secretsmanager/latest/userguide/lambda-functions.html",
"Remediation": {
"Code": {
"CLI": "aws lambda update-function-url-config --region AWS_REGION --function-name FUNCTION-NAME --auth-type AWS_IAM",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Grant usage permission on a per-resource basis and applying least privilege principle.",
"Url": "https://docs.aws.amazon.com/secretsmanager/latest/userguide/lambda-functions.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,25 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.awslambda.awslambda_client import awslambda_client
from providers.aws.services.awslambda.awslambda_service import AuthType
class awslambda_function_url_public(Check):
def execute(self):
findings = []
for function in awslambda_client.functions.values():
report = Check_Report(self.metadata)
report.region = function.region
report.resource_id = function.name
report.resource_arn = function.arn
if function.url_config:
if function.url_config.auth_type == AuthType.AWS_IAM:
report.status = "PASS"
report.status_extended = f"Lambda function {function.name} has not a publicly accessible function URL"
else:
report.status = "FAIL"
report.status_extended = f"Lambda function {function.name} has a publicly accessible function URL"
findings.append(report)
return findings

View File

@@ -0,0 +1,118 @@
from unittest import mock
from moto.core import DEFAULT_ACCOUNT_ID
from providers.aws.services.awslambda.awslambda_service import (
AuthType,
Function,
URLConfig,
URLConfigCORS,
)
AWS_REGION = "us-east-1"
class Test_awslambda_function_url_public:
def test_no_functions(self):
lambda_client = mock.MagicMock
lambda_client.functions = {}
with mock.patch(
"providers.aws.services.awslambda.awslambda_service.Lambda",
new=lambda_client,
):
# Test Check
from providers.aws.services.awslambda.awslambda_function_url_public.awslambda_function_url_public import (
awslambda_function_url_public,
)
check = awslambda_function_url_public()
result = check.execute()
assert len(result) == 0
def test_function_public_url(self):
lambda_client = mock.MagicMock
function_name = "test-lambda"
function_runtime = "nodejs4.3"
function_arn = (
f"arn:aws:lambda:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:function/{function_name}"
)
lambda_client.functions = {
"function_name": Function(
name=function_name,
arn=function_arn,
region=AWS_REGION,
runtime=function_runtime,
url_config=URLConfig(
auth_type=AuthType.NONE,
url="",
cors_config=URLConfigCORS(allow_origins=[]),
),
)
}
with mock.patch(
"providers.aws.services.awslambda.awslambda_service.Lambda",
new=lambda_client,
):
# Test Check
from providers.aws.services.awslambda.awslambda_function_url_public.awslambda_function_url_public import (
awslambda_function_url_public,
)
check = awslambda_function_url_public()
result = check.execute()
assert len(result) == 1
assert result[0].region == AWS_REGION
assert result[0].resource_id == function_name
assert result[0].resource_arn == function_arn
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Lambda function {function_name} has a publicly accessible function URL"
)
def test_function_private_url(self):
lambda_client = mock.MagicMock
function_name = "test-lambda"
function_runtime = "python3.9"
function_arn = (
f"arn:aws:lambda:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:function/{function_name}"
)
lambda_client.functions = {
"function_name": Function(
name=function_name,
arn=function_arn,
region=AWS_REGION,
runtime=function_runtime,
url_config=URLConfig(
auth_type=AuthType.AWS_IAM,
url="",
cors_config=URLConfigCORS(allow_origins=[]),
),
)
}
with mock.patch(
"providers.aws.services.awslambda.awslambda_service.Lambda",
new=lambda_client,
):
# Test Check
from providers.aws.services.awslambda.awslambda_function_url_public.awslambda_function_url_public import (
awslambda_function_url_public,
)
check = awslambda_function_url_public()
result = check.execute()
assert len(result) == 1
assert result[0].region == AWS_REGION
assert result[0].resource_id == function_name
assert result[0].resource_arn == function_arn
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Lambda function {function_name} has not a publicly accessible function URL"
)

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "awslambda_function_using_supported_runtimes",
"CheckTitle": "Find obsolete Lambda runtimes.",
"CheckType": [],
"ServiceName": "lambda",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:lambda:region:account-id:function/function-name",
"Severity": "medium",
"ResourceType": "AwsLambdaFunction",
"Description": "Find obsolete Lambda runtimes.",
"Risk": "If you have functions running on a runtime that will be deprecated in the next 60 days; Lambda notifies you by email that you should prepare by migrating your function to a supported runtime. In some cases; such as security issues that require a backwards-incompatible update; or software that does not support a long-term support (LTS) schedule; advance notice might not be possible. After a runtime is deprecated; Lambda might retire it completely at any time by disabling invocation. Deprecated runtimes are not eligible for security updates or technical support.",
"RelatedUrl": "https://docs.aws.amazon.com/lambda/latest/dg/runtime-support-policy.html",
"Remediation": {
"Code": {
"CLI": "aws lambda update-function-configuration --region AWS-REGION --function-name FUNCTION-NAME --runtime 'RUNTIME'",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Test new runtimes as they are made available. Implement them in production as soon as possible.",
"Url": "https://docs.aws.amazon.com/lambda/latest/dg/runtime-support-policy.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,24 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.awslambda.awslambda_client import awslambda_client
from config.config import get_config_var
class awslambda_function_using_supported_runtimes(Check):
def execute(self):
findings = []
for function in awslambda_client.functions.values():
report = Check_Report(self.metadata)
report.region = function.region
report.resource_id = function.name
report.resource_arn = function.arn
if function.runtime in get_config_var("obsolete_lambda_runtimes"):
report.status = "FAIL"
report.status_extended = f"Lambda function {function.name} is using {function.runtime} which is obsolete"
else:
report.status = "PASS"
report.status_extended = f"Lambda function {function.name} is using {function.runtime} which is supported"
findings.append(report)
return findings

View File

@@ -0,0 +1,126 @@
from unittest import mock
from moto.core import DEFAULT_ACCOUNT_ID
from providers.aws.services.awslambda.awslambda_service import Function
AWS_REGION = "us-east-1"
def mock_get_config_var(config_var: str):
return [
"python3.6",
"python2.7",
"nodejs4.3",
"nodejs4.3-edge",
"nodejs6.10",
"nodejs",
"nodejs8.10",
"nodejs10.x",
"dotnetcore1.0",
"dotnetcore2.0",
"dotnetcore2.1",
"ruby2.5",
]
class Test_awslambda_function_using_supported_runtimes:
def test_no_functions(self):
lambda_client = mock.MagicMock
lambda_client.functions = {}
with mock.patch(
"providers.aws.services.awslambda.awslambda_service.Lambda",
new=lambda_client,
):
# Test Check
from providers.aws.services.awslambda.awslambda_function_using_supported_runtimes.awslambda_function_using_supported_runtimes import (
awslambda_function_using_supported_runtimes,
)
check = awslambda_function_using_supported_runtimes()
result = check.execute()
assert len(result) == 0
def test_function_obsolete_runtime(self):
lambda_client = mock.MagicMock
function_name = "test-lambda"
function_runtime = "nodejs4.3"
function_arn = (
f"arn:aws:lambda:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:function/{function_name}"
)
lambda_client.functions = {
"function_name": Function(
name=function_name,
arn=function_arn,
region=AWS_REGION,
runtime=function_runtime,
)
}
with mock.patch(
"providers.aws.services.awslambda.awslambda_service.Lambda",
new=lambda_client,
), mock.patch(
"providers.aws.services.awslambda.awslambda_function_using_supported_runtimes.awslambda_function_using_supported_runtimes.get_config_var",
new=mock_get_config_var,
):
# Test Check
from providers.aws.services.awslambda.awslambda_function_using_supported_runtimes.awslambda_function_using_supported_runtimes import (
awslambda_function_using_supported_runtimes,
)
check = awslambda_function_using_supported_runtimes()
result = check.execute()
assert len(result) == 1
assert result[0].region == AWS_REGION
assert result[0].resource_id == function_name
assert result[0].resource_arn == function_arn
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Lambda function {function_name} is using {function_runtime} which is obsolete"
)
def test_function_supported_runtime(self):
lambda_client = mock.MagicMock
function_name = "test-lambda"
function_runtime = "python3.9"
function_arn = (
f"arn:aws:lambda:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:function/{function_name}"
)
lambda_client.functions = {
"function_name": Function(
name=function_name,
arn=function_arn,
region=AWS_REGION,
runtime=function_runtime,
)
}
with mock.patch(
"providers.aws.services.awslambda.awslambda_service.Lambda",
new=lambda_client,
), mock.patch(
"providers.aws.services.awslambda.awslambda_function_using_supported_runtimes.awslambda_function_using_supported_runtimes.get_config_var",
new=mock_get_config_var,
):
# Test Check
from providers.aws.services.awslambda.awslambda_function_using_supported_runtimes.awslambda_function_using_supported_runtimes import (
awslambda_function_using_supported_runtimes,
)
check = awslambda_function_using_supported_runtimes()
result = check.execute()
assert len(result) == 1
assert result[0].region == AWS_REGION
assert result[0].resource_id == function_name
assert result[0].resource_arn == function_arn
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Lambda function {function_name} is using {function_runtime} which is supported"
)

View File

@@ -0,0 +1,158 @@
import threading
from pydantic import BaseModel
from typing import Any
from lib.logger import logger
from providers.aws.aws_provider import generate_regional_clients
import requests
from enum import Enum
import io
import zipfile
import json
################## Lambda
class Lambda:
def __init__(self, audit_info):
self.service = "lambda"
self.session = audit_info.audit_session
self.audited_account = audit_info.audited_account
self.regional_clients = generate_regional_clients(self.service, audit_info)
self.functions = {}
self.__threading_call__(self.__list_functions__)
self.__threading_call__(self.__get_function__)
self.__threading_call__(self.__get_policy__)
self.__threading_call__(self.__get_function_url_config__)
def __get_session__(self):
return self.session
def __threading_call__(self, call):
threads = []
for regional_client in self.regional_clients.values():
threads.append(threading.Thread(target=call, args=(regional_client,)))
for t in threads:
t.start()
for t in threads:
t.join()
def __list_functions__(self, regional_client):
logger.info("Lambda - Listing Functions...")
try:
list_functions_paginator = regional_client.get_paginator("list_functions")
for page in list_functions_paginator.paginate():
for function in page["Functions"]:
lambda_name = function["FunctionName"]
lambda_arn = function["FunctionArn"]
lambda_runtime = function["Runtime"]
lambda_environment = function["Environment"]["Variables"]
self.functions[lambda_name] = Function(
name=lambda_name,
arn=lambda_arn,
runtime=lambda_runtime,
region=regional_client.region,
)
if "Environment" in function:
self.functions[lambda_name].environment = lambda_environment
except Exception as error:
logger.error(
f"{regional_client.region} --"
f" {error.__class__.__name__}[{error.__traceback__.tb_lineno}]:"
f" {error}"
)
def __get_function__(self, regional_client):
logger.info("Lambda - Getting Function...")
try:
for function in self.functions.values():
if function.region == regional_client.region:
function_information = regional_client.get_function(
FunctionName=function.name
)
code_location_uri = function_information["Code"]["Location"]
raw_code_zip = requests.get(code_location_uri).content
self.functions[function.name].code = LambdaCode(
location=code_location_uri,
code_zip=zipfile.ZipFile(io.BytesIO(raw_code_zip)),
)
except Exception as error:
logger.error(
f"{regional_client.region} --"
f" {error.__class__.__name__}[{error.__traceback__.tb_lineno}]:"
f" {error}"
)
def __get_policy__(self, regional_client):
logger.info("Lambda - Getting Policy...")
try:
for function in self.functions.values():
if function.region == regional_client.region:
function_policy = regional_client.get_policy(
FunctionName=function.name
)
self.functions[function.name].policy = json.loads(
function_policy["Policy"]
)
except Exception as error:
logger.error(
f"{regional_client.region} --"
f" {error.__class__.__name__}[{error.__traceback__.tb_lineno}]:"
f" {error}"
)
def __get_function_url_config__(self, regional_client):
logger.info("Lambda - Getting Function URL Config...")
try:
for function in self.functions.values():
if function.region == regional_client.region:
function_url_config = regional_client.get_function_url_config(
FunctionName=function.name
)
self.functions[function.name].url_config = URLConfig(
auth_type=function_url_config["AuthType"],
url=function_url_config["FunctionUrl"],
cors_config=URLConfigCORS(
allow_origins=function_url_config["Cors"]["AllowOrigins"]
),
)
except Exception as error:
logger.error(
f"{regional_client.region} --"
f" {error.__class__.__name__}[{error.__traceback__.tb_lineno}]:"
f" {error}"
)
class LambdaCode(BaseModel):
location: str
code_zip: Any
class AuthType(Enum):
NONE = "NONE"
AWS_IAM = "AWS_IAM"
class URLConfigCORS(BaseModel):
allow_origins: list[str]
class URLConfig(BaseModel):
auth_type: AuthType
url: str
cors_config: URLConfigCORS
class Function(BaseModel):
name: str
arn: str
runtime: str
environment: dict = None
region: str
policy: dict = None
code: LambdaCode = None
url_config: URLConfig = None

View File

@@ -0,0 +1,226 @@
from unittest.mock import patch
from moto.core import DEFAULT_ACCOUNT_ID
from boto3 import client, resource, session
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.awslambda.awslambda_service import Lambda, AuthType
from providers.aws.lib.audit_info.models import AWS_Audit_Info
from moto import mock_lambda, mock_iam, mock_s3
import zipfile
import io
import mock
from re import search
import tempfile
import os
# Mock Test Region
AWS_REGION = "eu-west-1"
def create_zip_file(code: str = "") -> io.BytesIO:
zip_output = io.BytesIO()
zip_file = zipfile.ZipFile(zip_output, "w", zipfile.ZIP_DEFLATED)
if not code:
zip_file.writestr(
"lambda_function.py",
"""
def lambda_handler(event, context):
print("custom log event")
return event
""",
)
else:
zip_file.writestr("lambda_function.py", code)
zip_file.close()
zip_output.seek(0)
return zip_output
def mock_request_get(_):
"""Mock requests.get() to get the Lambda Code in Zip Format"""
mock_resp = mock.MagicMock
mock_resp.status_code = 200
mock_resp.content = create_zip_file().read()
return mock_resp
# Mock generate_regional_clients()
def mock_generate_regional_clients(service, audit_info):
regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION)
regional_client.region = AWS_REGION
return {AWS_REGION: regional_client}
@patch(
"providers.aws.services.awslambda.awslambda_service.generate_regional_clients",
new=mock_generate_regional_clients,
)
class Test_Lambda_Service:
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
),
audited_account=DEFAULT_ACCOUNT_ID,
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=None,
credentials=None,
assumed_role_info=None,
audited_regions=None,
organizations_metadata=None,
)
return audit_info
# Test Lambda Client
def test__get_client__(self):
awslambda = Lambda(current_audit_info)
assert awslambda.regional_clients[AWS_REGION].__class__.__name__ == "Lambda"
# Test Lambda Session
def test__get_session__(self):
awslambda = Lambda(current_audit_info)
assert awslambda.session.__class__.__name__ == "Session"
# Test Lambda Service
def test__get_service__(self):
awslambda = Lambda(current_audit_info)
assert awslambda.service == "lambda"
@mock_lambda
@mock_iam
@mock_s3
def test__list_functions__(self):
# Create IAM Lambda Role
iam_client = client("iam", region_name=AWS_REGION)
iam_role = iam_client.create_role(
RoleName="test-lambda-role",
AssumeRolePolicyDocument="test-policy",
Path="/",
)["Role"]["Arn"]
# Create S3 Bucket
s3_client = resource("s3", region_name=AWS_REGION)
s3_client.create_bucket(
Bucket="test-bucket",
CreateBucketConfiguration={"LocationConstraint": AWS_REGION},
)
# Create Test Lambda
lambda_client = client("lambda", region_name=AWS_REGION)
lambda_name = "test-lambda"
resp = lambda_client.create_function(
FunctionName=lambda_name,
Runtime="python3.7",
Role=iam_role,
Handler="lambda_function.lambda_handler",
Code={"ZipFile": create_zip_file().read()},
Description="test lambda function",
Timeout=3,
MemorySize=128,
PackageType="ZIP",
Publish=True,
VpcConfig={
"SecurityGroupIds": ["sg-123abc"],
"SubnetIds": ["subnet-123abc"],
},
Environment={"Variables": {"db-password": "test-password"}},
)
# Update Lambda Policy
lambda_policy = {
"Version": "2012-10-17",
"Id": "default",
"Statement": [
{
"Action": "lambda:GetFunction",
"Principal": "*",
"Effect": "Allow",
"Resource": f"arn:aws:lambda:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:function:{lambda_name}",
"Sid": "test",
}
],
}
_ = lambda_client.add_permission(
FunctionName=lambda_name,
StatementId="test",
Action="lambda:GetFunction",
Principal="*",
)
# Create Function URL Config
_ = lambda_client.create_function_url_config(
FunctionName=lambda_name,
AuthType=AuthType.AWS_IAM.value,
Cors={
"AllowCredentials": True,
"AllowHeaders": [
"string",
],
"AllowMethods": [
"string",
],
"AllowOrigins": [
"*",
],
"ExposeHeaders": [
"string",
],
"MaxAge": 123,
},
)
lambda_arn = resp["FunctionArn"]
with mock.patch(
"providers.aws.services.awslambda.awslambda_service.requests.get",
new=mock_request_get,
):
awslambda = Lambda(self.set_mocked_audit_info())
assert awslambda.functions
assert awslambda.functions[lambda_name].name == lambda_name
assert awslambda.functions[lambda_name].arn == lambda_arn
assert awslambda.functions[lambda_name].runtime == "python3.7"
assert awslambda.functions[lambda_name].environment == {
"db-password": "test-password"
}
assert awslambda.functions[lambda_name].region == AWS_REGION
assert awslambda.functions[lambda_name].policy == lambda_policy
assert awslambda.functions[lambda_name].code
assert search(
f"s3://awslambda-{AWS_REGION}-tasks.s3-{AWS_REGION}.amazonaws.com",
awslambda.functions[lambda_name].code.location,
)
assert awslambda.functions[lambda_name].url_config
assert (
awslambda.functions[lambda_name].url_config.auth_type
== AuthType.AWS_IAM
)
assert search(
"lambda-url.eu-west-1.on.aws",
awslambda.functions[lambda_name].url_config.url,
)
assert awslambda.functions[lambda_name].url_config.cors_config
assert awslambda.functions[
lambda_name
].url_config.cors_config.allow_origins == ["*"]
# Pending ZipFile tests
with tempfile.TemporaryDirectory() as tmp_dir_name:
awslambda.functions[lambda_name].code.code_zip.extractall(tmp_dir_name)
files_in_zip = next(os.walk(tmp_dir_name))[2]
assert len(files_in_zip) == 1
assert files_in_zip[0] == "lambda_function.py"
with open(f"{tmp_dir_name}/{files_in_zip[0]}", "r") as lambda_code_file:
_ = lambda_code_file
pass
# assert (
# lambda_code_file.read()
# == """
# def lambda_handler(event, context):
# print("custom log event")
# return event
# """
# )