Files
prowler/tests/providers/aws/services/awslambda/awslambda_service_test.py
Fennerr 8b5c995486 fix(lambda): memory leakage with lambda function code (#3167)
Co-authored-by: Justin Moorcroft <justin.moorcroft@mwrcybersec.com>
Co-authored-by: Pepe Fagoaga <pepe@verica.io>
2023-12-13 15:15:13 +01:00

260 lines
9.3 KiB
Python

import io
import os
import tempfile
import zipfile
from re import search
from unittest.mock import patch
import mock
from boto3 import client, resource
from moto import mock_iam, mock_lambda, mock_s3
from prowler.providers.aws.services.awslambda.awslambda_service import AuthType, Lambda
from tests.providers.aws.audit_info_utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_EU_WEST_1,
AWS_REGION_US_EAST_1,
set_mocked_aws_audit_info,
)
# Handler source stored as "lambda_function.py" in the test zip archives when
# no explicit code is supplied; the test later compares the extracted file
# against this text verbatim.
LAMBDA_FUNCTION_CODE = """def lambda_handler(event, context):
print("custom log event")
return event
"""
def create_zip_file(code: str = "") -> io.BytesIO:
    """Build an in-memory zip archive holding a single ``lambda_function.py``.

    Args:
        code: Source text to store in the archive. When empty, the
            module-level ``LAMBDA_FUNCTION_CODE`` is stored instead.

    Returns:
        A ``BytesIO`` rewound to offset 0 that contains the finished archive.
    """
    zip_output = io.BytesIO()
    # The context manager finalises the archive even if writestr() raises,
    # so the ZipFile handle is never left open.
    with zipfile.ZipFile(zip_output, "w", zipfile.ZIP_DEFLATED) as zip_file:
        zip_file.writestr("lambda_function.py", code or LAMBDA_FUNCTION_CODE)
    # Rewind so callers can read() the archive from the beginning.
    zip_output.seek(0)
    return zip_output
def mock_request_get(_):
    """Mock requests.get() to get the Lambda Code in Zip Format.

    Args:
        _: The requested URL; ignored.

    Returns:
        A ``MagicMock`` response whose ``status_code`` is 200 and whose
        ``content`` is the bytes of the default test zip archive.
    """
    # Instantiate the mock: setting attributes on the MagicMock class itself
    # would leak status_code/content into every other MagicMock in the run.
    mock_resp = mock.MagicMock()
    mock_resp.status_code = 200
    mock_resp.content = create_zip_file().read()
    return mock_resp
# Mock generate_regional_clients()
def mock_generate_regional_clients(service, audit_info):
    """Return one client per test region, keyed by region name.

    A client for ``service`` is created from ``audit_info.audit_session`` for
    eu-west-1 and us-east-1 (in that order), and each client gets a ``region``
    attribute holding its own region name.
    """
    regional_clients = {}
    for region_name in (AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1):
        regional_clients[region_name] = audit_info.audit_session.client(
            service, region_name=region_name
        )
    # Tag the clients only after all of them exist.
    for region_name, regional_client in regional_clients.items():
        regional_client.region = region_name
    return regional_clients
@patch(
    "prowler.providers.aws.lib.service.service.generate_regional_clients",
    new=mock_generate_regional_clients,
)
class Test_Lambda_Service:
    """Tests for the ``Lambda`` service wrapper.

    The class-level patch swaps ``generate_regional_clients`` for
    ``mock_generate_regional_clients`` in every test method, so the service
    always sees an eu-west-1 and a us-east-1 client.
    """

    @staticmethod
    def _create_test_lambda(lambda_client, function_name, role_arn):
        """Create a test Lambda function and return its ARN.

        Both functions created by the tests share the same configuration and
        differ only in name and in the region of ``lambda_client``.
        """
        response = lambda_client.create_function(
            FunctionName=function_name,
            Runtime="python3.7",
            Role=role_arn,
            Handler="lambda_function.lambda_handler",
            Code={"ZipFile": create_zip_file().read()},
            Description="test lambda function",
            Timeout=3,
            MemorySize=128,
            PackageType="ZIP",
            Publish=True,
            VpcConfig={
                "SecurityGroupIds": ["sg-123abc"],
                "SubnetIds": ["subnet-123abc"],
            },
            Environment={"Variables": {"db-password": "test-password"}},
            Tags={"test": "test"},
        )
        return response["FunctionArn"]

    # Test Lambda Client
    def test__get_client__(self):
        """The regional client exposed by the service is a Lambda client."""
        awslambda = Lambda(set_mocked_aws_audit_info([AWS_REGION_US_EAST_1]))
        assert (
            awslambda.regional_clients[AWS_REGION_EU_WEST_1].__class__.__name__
            == "Lambda"
        )

    # Test Lambda Session
    def test__get_session__(self):
        """The service keeps a session object of class ``Session``."""
        awslambda = Lambda(set_mocked_aws_audit_info([AWS_REGION_US_EAST_1]))
        assert awslambda.session.__class__.__name__ == "Session"

    # Test Lambda Service
    def test__get_service__(self):
        """The service name is ``lambda``."""
        awslambda = Lambda(set_mocked_aws_audit_info([AWS_REGION_US_EAST_1]))
        assert awslambda.service == "lambda"

    @mock_lambda
    @mock_iam
    @mock_s3
    def test__list_functions__(self):
        """Two functions in two regions are listed with their full metadata.

        Covers name, ARN, runtime, environment, region, resource policy,
        function URL configuration, tags and the downloadable function code.
        """
        # Create IAM Lambda Role
        iam_client = client("iam", region_name=AWS_REGION_EU_WEST_1)
        iam_role = iam_client.create_role(
            RoleName="test-lambda-role",
            AssumeRolePolicyDocument="test-policy",
            Path="/",
        )["Role"]["Arn"]

        # Create S3 Bucket
        s3_resource = resource("s3", region_name=AWS_REGION_EU_WEST_1)
        s3_resource.create_bucket(
            Bucket="test-bucket",
            CreateBucketConfiguration={"LocationConstraint": AWS_REGION_EU_WEST_1},
        )

        # Create Test Lambda 1
        lambda_client = client("lambda", region_name=AWS_REGION_EU_WEST_1)
        lambda_name_1 = "test-lambda-1"
        lambda_arn_1 = self._create_test_lambda(lambda_client, lambda_name_1, iam_role)

        # Update Lambda Policy
        lambda_policy = {
            "Version": "2012-10-17",
            "Id": "default",
            "Statement": [
                {
                    "Action": "lambda:GetFunction",
                    "Principal": "*",
                    "Effect": "Allow",
                    "Resource": f"arn:aws:lambda:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:function:{lambda_name_1}",
                    "Sid": "test",
                }
            ],
        }
        lambda_client.add_permission(
            FunctionName=lambda_name_1,
            StatementId="test",
            Action="lambda:GetFunction",
            Principal="*",
        )

        # Create Function URL Config
        lambda_client.create_function_url_config(
            FunctionName=lambda_name_1,
            AuthType=AuthType.AWS_IAM.value,
            Cors={
                "AllowCredentials": True,
                "AllowHeaders": [
                    "string",
                ],
                "AllowMethods": [
                    "string",
                ],
                "AllowOrigins": [
                    "*",
                ],
                "ExposeHeaders": [
                    "string",
                ],
                "MaxAge": 123,
            },
        )

        # Create Test Lambda 2 (with the same attributes but different region)
        lambda_client_2 = client("lambda", region_name=AWS_REGION_US_EAST_1)
        lambda_name_2 = "test-lambda-2"
        lambda_arn_2 = self._create_test_lambda(
            lambda_client_2, lambda_name_2, iam_role
        )

        # The patch stays active for the whole verification phase because the
        # function code is retrieved while iterating __get_function_code__().
        with mock.patch(
            "prowler.providers.aws.services.awslambda.awslambda_service.requests.get",
            new=mock_request_get,
        ):
            awslambda = Lambda(
                set_mocked_aws_audit_info(audited_regions=[AWS_REGION_US_EAST_1])
            )
            assert awslambda.functions
            assert len(awslambda.functions) == 2

            # Lambda 1
            function_1 = awslambda.functions[lambda_arn_1]
            assert function_1.name == lambda_name_1
            assert function_1.arn == lambda_arn_1
            assert function_1.runtime == "python3.7"
            assert function_1.environment == {"db-password": "test-password"}
            assert function_1.region == AWS_REGION_EU_WEST_1
            assert function_1.policy == lambda_policy
            assert function_1.url_config
            assert function_1.url_config.auth_type == AuthType.AWS_IAM
            # Literal substring check: as a regex the dots would match any
            # character and make the assertion looser than intended.
            assert "lambda-url.eu-west-1.on.aws" in function_1.url_config.url
            assert function_1.url_config.cors_config
            assert function_1.url_config.cors_config.allow_origins == ["*"]
            assert function_1.tags == [{"test": "test"}]

            # Lambda 2
            function_2 = awslambda.functions[lambda_arn_2]
            assert function_2.name == lambda_name_2
            assert function_2.arn == lambda_arn_2
            assert function_2.runtime == "python3.7"
            assert function_2.environment == {"db-password": "test-password"}
            assert function_2.region == AWS_REGION_US_EAST_1
            # Empty policy
            assert function_2.policy == {
                "Id": "default",
                "Statement": [],
                "Version": "2012-10-17",
            }

            # Lambda Code
            with tempfile.TemporaryDirectory() as tmp_dir_name:
                for function, function_code in awslambda.__get_function_code__():
                    if function.arn not in (lambda_arn_1, lambda_arn_2):
                        continue
                    # Check the object exists before dereferencing it.
                    assert function_code
                    expected_location = (
                        f"s3://awslambda-{function.region}-tasks"
                        f".s3-{function.region}.amazonaws.com"
                    )
                    assert expected_location in function_code.location
                    function_code.code_zip.extractall(tmp_dir_name)
                    # Index 2 of the first os.walk() entry is the list of
                    # file names directly inside the temporary directory.
                    files_in_zip = next(os.walk(tmp_dir_name))[2]
                    assert len(files_in_zip) == 1
                    assert files_in_zip[0] == "lambda_function.py"
                    with open(
                        os.path.join(tmp_dir_name, files_in_zip[0]), "r"
                    ) as lambda_code_file:
                        assert lambda_code_file.read() == LAMBDA_FUNCTION_CODE