feat(appstream): Service and Checks (#1452)

This commit is contained in:
Pepe Fagoaga
2022-11-07 16:16:58 +01:00
committed by GitHub
parent 4762e1cc4c
commit f5873fe0d7
22 changed files with 1123 additions and 0 deletions

View File

@@ -4,3 +4,8 @@ shodan_api_key: null
# Multi account environment: Any additional trusted account number should be added as a space separated list, e.g. # Multi account environment: Any additional trusted account number should be added as a space separated list, e.g.
# trusted_account_ids : ["123456789012", "098765432109", "678901234567"] # trusted_account_ids : ["123456789012", "098765432109", "678901234567"]
trusted_account_ids : [] trusted_account_ids : []
# AppStream Session Configuration
max_idle_disconnect_timeout_in_seconds: 600 # 10 Minutes
max_disconnect_timeout_in_seconds: 300 # 5 Minutes
max_session_duration_seconds: 36000 # 10 Hours

View File

@@ -65,6 +65,11 @@ class Test_AccessAnalyzer_Service:
access_analyzer = AccessAnalyzer(current_audit_info) access_analyzer = AccessAnalyzer(current_audit_info)
assert access_analyzer.session.__class__.__name__ == "Session" assert access_analyzer.session.__class__.__name__ == "Session"
# Test AccessAnalyzer Service
def test__get_service__(self):
access_analyzer = AccessAnalyzer(current_audit_info)
assert access_analyzer.service == "accessanalyzer"
def test__list_analyzers__(self): def test__list_analyzers__(self):
# Set partition for the service # Set partition for the service
current_audit_info.audited_partition = "aws" current_audit_info.audited_partition = "aws"

View File

@@ -0,0 +1,4 @@
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.appstream.appstream_service import AppStream
appstream_client = AppStream(current_audit_info)

View File

@@ -0,0 +1,36 @@
{
"Provider": "aws",
"CheckID": "appstream_fleet_default_internet_access_disabled",
"CheckTitle": "Ensure default Internet Access from your Amazon AppStream fleet streaming instances should remain unchecked.",
"CheckType": ["Software and Configuration Checks", "Industry and Regulatory Standards", "CIS AWS Foundations Benchmark"],
"ServiceName": "appstream",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:appstream:region:account-id:fleet/resource-id",
"Severity": "medium",
"ResourceType": "AppStream",
"Description": "Ensure default Internet Access from your Amazon AppStream fleet streaming instances should remain unchecked.",
"Risk": "Default Internet Access from your fleet streaming instances should be controlled using a NAT gateway in the VPC.",
"RelatedUrl": "https://docs.aws.amazon.com/appstream2/latest/developerguide/set-up-stacks-fleets.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Uncheck the default internet access for the AppStream Fleet.",
"Url": "https://docs.aws.amazon.com/appstream2/latest/developerguide/set-up-stacks-fleets.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "Infrastructure Security",
"Compliance": []
}

View File

@@ -0,0 +1,31 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.appstream.appstream_client import appstream_client
# Check if there are AppStream Fleets with the default internet access enabled
class appstream_fleet_default_internet_access_disabled(Check):
"""Check if there are AppStream Fleets with the default internet access enabled"""
def execute(self):
"""Execute the appstream_fleet_default_internet_access_disabled check"""
findings = []
for fleet in appstream_client.fleets:
report = Check_Report(self.metadata)
report.region = fleet.region
report.resource_id = fleet.name
report.resource_arn = fleet.arn
if fleet.enable_default_internet_access:
report.status = "FAIL"
report.status_extended = (
f"Fleet {fleet.name} has default internet access enabled"
)
else:
report.status = "PASS"
report.status_extended = (
f"Fleet {fleet.name} has default internet access disabled"
)
findings.append(report)
return findings

View File

@@ -0,0 +1,158 @@
from unittest import mock
from providers.aws.services.appstream.appstream_service import Fleet
# Mock Test Region
AWS_REGION = "eu-west-1"
class Test_appstream_fleet_default_internet_access_disabled:
def test_no_fleets(self):
appstream_client = mock.MagicMock
appstream_client.fleets = []
with mock.patch(
"providers.aws.services.appstream.appstream_service.AppStream",
new=appstream_client,
):
# Test Check
from providers.aws.services.appstream.appstream_fleet_default_internet_access_disabled.appstream_fleet_default_internet_access_disabled import (
appstream_fleet_default_internet_access_disabled,
)
check = appstream_fleet_default_internet_access_disabled()
result = check.execute()
assert len(result) == 0
def test_one_fleet_internet_access_enabled(self):
appstream_client = mock.MagicMock
appstream_client.fleets = []
fleet1 = Fleet(
arn="arn",
name="test-fleet",
max_user_duration_in_seconds=900,
disconnect_timeout_in_seconds=900,
idle_disconnect_timeout_in_seconds=900,
enable_default_internet_access=True,
region=AWS_REGION,
)
appstream_client.fleets.append(fleet1)
with mock.patch(
"providers.aws.services.appstream.appstream_service.AppStream",
new=appstream_client,
):
# Test Check
from providers.aws.services.appstream.appstream_fleet_default_internet_access_disabled.appstream_fleet_default_internet_access_disabled import (
appstream_fleet_default_internet_access_disabled,
)
check = appstream_fleet_default_internet_access_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].resource_arn == fleet1.arn
assert result[0].region == fleet1.region
assert result[0].resource_id == fleet1.name
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Fleet {fleet1.name} has default internet access enabled"
)
def test_one_fleet_internet_access_disbaled(self):
appstream_client = mock.MagicMock
appstream_client.fleets = []
fleet1 = Fleet(
arn="arn",
name="test-fleet",
max_user_duration_in_seconds=900,
disconnect_timeout_in_seconds=900,
idle_disconnect_timeout_in_seconds=900,
enable_default_internet_access=False,
region=AWS_REGION,
)
appstream_client.fleets.append(fleet1)
with mock.patch(
"providers.aws.services.appstream.appstream_service.AppStream",
new=appstream_client,
):
# Test Check
from providers.aws.services.appstream.appstream_fleet_default_internet_access_disabled.appstream_fleet_default_internet_access_disabled import (
appstream_fleet_default_internet_access_disabled,
)
check = appstream_fleet_default_internet_access_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].resource_arn == fleet1.arn
assert result[0].region == fleet1.region
assert result[0].resource_id == fleet1.name
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Fleet {fleet1.name} has default internet access disabled"
)
def test_two_fleets_internet_access_one_enabled_two_disabled(self):
appstream_client = mock.MagicMock
appstream_client.fleets = []
fleet1 = Fleet(
arn="arn",
name="test-fleet-1",
max_user_duration_in_seconds=900,
disconnect_timeout_in_seconds=900,
idle_disconnect_timeout_in_seconds=900,
enable_default_internet_access=True,
region=AWS_REGION,
)
fleet2 = Fleet(
arn="arn",
name="test-fleet-2",
max_user_duration_in_seconds=900,
disconnect_timeout_in_seconds=900,
idle_disconnect_timeout_in_seconds=900,
enable_default_internet_access=False,
region=AWS_REGION,
)
appstream_client.fleets.append(fleet1)
appstream_client.fleets.append(fleet2)
with mock.patch(
"providers.aws.services.appstream.appstream_service.AppStream",
new=appstream_client,
):
# Test Check
from providers.aws.services.appstream.appstream_fleet_default_internet_access_disabled.appstream_fleet_default_internet_access_disabled import (
appstream_fleet_default_internet_access_disabled,
)
check = appstream_fleet_default_internet_access_disabled()
result = check.execute()
assert len(result) == 2
for res in result:
if res.resource_id == fleet1.name:
assert result[0].resource_arn == fleet1.arn
assert result[0].region == fleet1.region
assert result[0].resource_id == fleet1.name
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Fleet {fleet1.name} has default internet access enabled"
)
if res.resource_id == fleet2.name:
assert result[1].resource_arn == fleet2.arn
assert result[1].region == fleet2.region
assert result[1].resource_id == fleet2.name
assert result[1].status == "PASS"
assert (
result[1].status_extended
== f"Fleet {fleet2.name} has default internet access disabled"
)

View File

@@ -0,0 +1,36 @@
{
"Provider": "aws",
"CheckID": "appstream_fleet_maximum_session_duration",
"CheckTitle": "Ensure user maximum session duration is no longer than 10 hours.",
"CheckType": ["Infrastructure Security"],
"ServiceName": "appstream",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:appstream:region:account-id:fleet/resource-id",
"Severity": "medium",
"ResourceType": "AppStream",
"Description": "Ensure user maximum session duration is no longer than 10 hours.",
"Risk": "Having a session duration lasting longer than 10 hours should not be necessary and if running for any malicious reasons provides a greater time for usage than should be allowed.",
"RelatedUrl": "https://docs.aws.amazon.com/appstream2/latest/developerguide/set-up-stacks-fleets.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Change the Maximum session duration is set to 600 minutes or less for the AppStream Fleet.",
"Url": "https://docs.aws.amazon.com/appstream2/latest/developerguide/set-up-stacks-fleets.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "Infrastructure Security",
"Compliance": []
}

View File

@@ -0,0 +1,29 @@
from config.config import get_config_var
from lib.check.models import Check, Check_Report
from providers.aws.services.appstream.appstream_client import appstream_client
max_session_duration_seconds = get_config_var("max_session_duration_seconds")
"""max_session_duration_seconds, default: 36000 seconds (10 hours)"""
# Check if there are AppStream Fleets with the user maximum session duration no longer than 10 hours
class appstream_fleet_maximum_session_duration(Check):
"""Check if there are AppStream Fleets with the user maximum session duration no longer than 10 hours"""
def execute(self):
"""Execute the appstream_fleet_maximum_session_duration check"""
findings = []
for fleet in appstream_client.fleets:
report = Check_Report(self.metadata)
report.region = fleet.region
report.resource_id = fleet.name
report.resource_arn = fleet.arn
if fleet.max_user_duration_in_seconds < max_session_duration_seconds:
report.status = "PASS"
report.status_extended = f"Fleet {fleet.name} has the maximum session duration configured for less that 10 hours"
else:
report.status = "FAIL"
report.status_extended = f"Fleet {fleet.name} has the maximum session duration configured for more that 10 hours"
findings.append(report)
return findings

View File

@@ -0,0 +1,164 @@
from unittest import mock
from providers.aws.services.appstream.appstream_service import Fleet
# Mock Test Region
AWS_REGION = "eu-west-1"
class Test_appstream_fleet_maximum_session_duration:
def test_no_fleets(self):
appstream_client = mock.MagicMock
appstream_client.fleets = []
with mock.patch(
"providers.aws.services.appstream.appstream_service.AppStream",
new=appstream_client,
):
# Test Check
from providers.aws.services.appstream.appstream_fleet_maximum_session_duration.appstream_fleet_maximum_session_duration import (
appstream_fleet_maximum_session_duration,
)
check = appstream_fleet_maximum_session_duration()
result = check.execute()
assert len(result) == 0
def test_one_fleet_maximum_session_duration_more_than_10_hours(self):
appstream_client = mock.MagicMock
appstream_client.fleets = []
fleet1 = Fleet(
arn="arn",
name="test-fleet",
# 11 Hours
max_user_duration_in_seconds=11 * 60 * 60,
disconnect_timeout_in_seconds=900,
idle_disconnect_timeout_in_seconds=900,
enable_default_internet_access=True,
region=AWS_REGION,
)
appstream_client.fleets.append(fleet1)
with mock.patch(
"providers.aws.services.appstream.appstream_service.AppStream",
new=appstream_client,
):
# Test Check
from providers.aws.services.appstream.appstream_fleet_maximum_session_duration.appstream_fleet_maximum_session_duration import (
appstream_fleet_maximum_session_duration,
)
check = appstream_fleet_maximum_session_duration()
result = check.execute()
assert len(result) == 1
assert result[0].resource_arn == fleet1.arn
assert result[0].region == fleet1.region
assert result[0].resource_id == fleet1.name
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Fleet {fleet1.name} has the maximum session duration configured for more that 10 hours"
)
def test_one_fleet_maximum_session_duration_less_than_10_hours(self):
appstream_client = mock.MagicMock
appstream_client.fleets = []
fleet1 = Fleet(
arn="arn",
name="test-fleet",
# 9 Hours
max_user_duration_in_seconds=9 * 60 * 60,
disconnect_timeout_in_seconds=900,
idle_disconnect_timeout_in_seconds=900,
enable_default_internet_access=True,
region=AWS_REGION,
)
appstream_client.fleets.append(fleet1)
with mock.patch(
"providers.aws.services.appstream.appstream_service.AppStream",
new=appstream_client,
):
# Test Check
from providers.aws.services.appstream.appstream_fleet_maximum_session_duration.appstream_fleet_maximum_session_duration import (
appstream_fleet_maximum_session_duration,
)
check = appstream_fleet_maximum_session_duration()
result = check.execute()
assert len(result) == 1
assert result[0].resource_arn == fleet1.arn
assert result[0].region == fleet1.region
assert result[0].resource_id == fleet1.name
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Fleet {fleet1.name} has the maximum session duration configured for less that 10 hours"
)
def test_two_fleets_one_maximum_session_duration_less_than_10_hours_on_more_than_10_hours(
self,
):
appstream_client = mock.MagicMock
appstream_client.fleets = []
fleet1 = Fleet(
arn="arn",
name="test-fleet-1",
# 1 Hours
max_user_duration_in_seconds=1 * 60 * 60,
disconnect_timeout_in_seconds=900,
idle_disconnect_timeout_in_seconds=900,
enable_default_internet_access=True,
region=AWS_REGION,
)
fleet2 = Fleet(
arn="arn",
name="test-fleet-2",
# 24 Hours
max_user_duration_in_seconds=24 * 60 * 60,
disconnect_timeout_in_seconds=900,
idle_disconnect_timeout_in_seconds=900,
enable_default_internet_access=False,
region=AWS_REGION,
)
appstream_client.fleets.append(fleet1)
appstream_client.fleets.append(fleet2)
with mock.patch(
"providers.aws.services.appstream.appstream_service.AppStream",
new=appstream_client,
):
# Test Check
from providers.aws.services.appstream.appstream_fleet_maximum_session_duration.appstream_fleet_maximum_session_duration import (
appstream_fleet_maximum_session_duration,
)
check = appstream_fleet_maximum_session_duration()
result = check.execute()
assert len(result) == 2
for res in result:
if res.resource_id == fleet1.name:
assert result[0].resource_arn == fleet1.arn
assert result[0].region == fleet1.region
assert result[0].resource_id == fleet1.name
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Fleet {fleet1.name} has the maximum session duration configured for less that 10 hours"
)
if res.resource_id == fleet2.name:
assert result[1].resource_arn == fleet2.arn
assert result[1].region == fleet2.region
assert result[1].resource_id == fleet2.name
assert result[1].status == "FAIL"
assert (
result[1].status_extended
== f"Fleet {fleet2.name} has the maximum session duration configured for more that 10 hours"
)

View File

@@ -0,0 +1,36 @@
{
"Provider": "aws",
"CheckID": "appstream_fleet_session_disconnect_timeout",
"CheckTitle": "Ensure session disconnect timeout is set to 5 minutes or lesss.",
"CheckType": ["Software and Configuration Checks", "Industry and Regulatory Standards", "CIS AWS Foundations Benchmark"],
"ServiceName": "appstream",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:appstream:region:account-id:fleet/resource-id",
"Severity": "medium",
"ResourceType": "AppStream",
"Description": "Ensure session disconnect timeout is set to 5 minutes or less",
"Risk": "Disconnect timeout in minutes, is the amount of of time that a streaming session remains active after users disconnect.",
"RelatedUrl": "https://docs.aws.amazon.com/appstream2/latest/developerguide/set-up-stacks-fleets.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Change the Disconnect timeout to 5 minutes or less for the AppStream Fleet.",
"Url": "https://docs.aws.amazon.com/appstream2/latest/developerguide/set-up-stacks-fleets.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "Infrastructure Security",
"Compliance": []
}

View File

@@ -0,0 +1,31 @@
from config.config import get_config_var
from lib.check.models import Check, Check_Report
from providers.aws.services.appstream.appstream_client import appstream_client
max_disconnect_timeout_in_seconds = get_config_var("max_disconnect_timeout_in_seconds")
"""max_disconnect_timeout_in_seconds, default: 300 seconds (5 minutes)"""
# Check if there are AppStream Fleets with the session disconnect timeout set to 5 minutes or less
class appstream_fleet_session_disconnect_timeout(Check):
"""Check if there are AppStream Fleets with the session disconnect timeout set to 5 minutes or less"""
def execute(self):
"""Execute the appstream_fleet_maximum_session_duration check"""
findings = []
for fleet in appstream_client.fleets:
report = Check_Report(self.metadata)
report.region = fleet.region
report.resource_id = fleet.name
report.resource_arn = fleet.arn
if fleet.disconnect_timeout_in_seconds <= max_disconnect_timeout_in_seconds:
report.status = "PASS"
report.status_extended = f"Fleet {fleet.name} has the session disconnect timeout set to less than 5 minutes"
else:
report.status = "FAIL"
report.status_extended = f"Fleet {fleet.name} has the session disconnect timeout set to more than 5 minutes"
findings.append(report)
return findings

View File

@@ -0,0 +1,164 @@
from unittest import mock
from providers.aws.services.appstream.appstream_service import Fleet
# Mock Test Region
AWS_REGION = "eu-west-1"
class Test_appstream_fleet_session_disconnect_timeout:
def test_no_fleets(self):
appstream_client = mock.MagicMock
appstream_client.fleets = []
with mock.patch(
"providers.aws.services.appstream.appstream_service.AppStream",
new=appstream_client,
):
# Test Check
from providers.aws.services.appstream.appstream_fleet_session_disconnect_timeout.appstream_fleet_session_disconnect_timeout import (
appstream_fleet_session_disconnect_timeout,
)
check = appstream_fleet_session_disconnect_timeout()
result = check.execute()
assert len(result) == 0
def test_one_fleet_session_disconnect_timeout_more_than_5_minutes(self):
appstream_client = mock.MagicMock
appstream_client.fleets = []
fleet1 = Fleet(
arn="arn",
name="test-fleet",
max_user_duration_in_seconds=1 * 60 * 60,
# 1 hour
disconnect_timeout_in_seconds=1 * 60 * 60,
idle_disconnect_timeout_in_seconds=900,
enable_default_internet_access=True,
region=AWS_REGION,
)
appstream_client.fleets.append(fleet1)
with mock.patch(
"providers.aws.services.appstream.appstream_service.AppStream",
new=appstream_client,
):
# Test Check
from providers.aws.services.appstream.appstream_fleet_session_disconnect_timeout.appstream_fleet_session_disconnect_timeout import (
appstream_fleet_session_disconnect_timeout,
)
check = appstream_fleet_session_disconnect_timeout()
result = check.execute()
assert len(result) == 1
assert result[0].resource_arn == fleet1.arn
assert result[0].region == fleet1.region
assert result[0].resource_id == fleet1.name
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Fleet {fleet1.name} has the session disconnect timeout set to more than 5 minutes"
)
def test_one_fleet_session_disconnect_timeout_less_than_5_minutes(self):
appstream_client = mock.MagicMock
appstream_client.fleets = []
fleet1 = Fleet(
arn="arn",
name="test-fleet",
max_user_duration_in_seconds=900,
# 4 minutes
disconnect_timeout_in_seconds=4 * 60,
idle_disconnect_timeout_in_seconds=900,
enable_default_internet_access=True,
region=AWS_REGION,
)
appstream_client.fleets.append(fleet1)
with mock.patch(
"providers.aws.services.appstream.appstream_service.AppStream",
new=appstream_client,
):
# Test Check
from providers.aws.services.appstream.appstream_fleet_session_disconnect_timeout.appstream_fleet_session_disconnect_timeout import (
appstream_fleet_session_disconnect_timeout,
)
check = appstream_fleet_session_disconnect_timeout()
result = check.execute()
assert len(result) == 1
assert result[0].resource_arn == fleet1.arn
assert result[0].region == fleet1.region
assert result[0].resource_id == fleet1.name
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Fleet {fleet1.name} has the session disconnect timeout set to less than 5 minutes"
)
def test_two_fleets_session_disconnect_timeout_less_than_5_minutes_one_more_than_5_minutes(
self,
):
appstream_client = mock.MagicMock
appstream_client.fleets = []
fleet1 = Fleet(
arn="arn",
name="test-fleet-1",
max_user_duration_in_seconds=1 * 60 * 60,
# 1 Hours
disconnect_timeout_in_seconds=1 * 60 * 60,
idle_disconnect_timeout_in_seconds=900,
enable_default_internet_access=True,
region=AWS_REGION,
)
fleet2 = Fleet(
arn="arn",
name="test-fleet-2",
max_user_duration_in_seconds=24 * 60 * 60,
# 3 minutes
disconnect_timeout_in_seconds=3 * 60,
idle_disconnect_timeout_in_seconds=900,
enable_default_internet_access=False,
region=AWS_REGION,
)
appstream_client.fleets.append(fleet1)
appstream_client.fleets.append(fleet2)
with mock.patch(
"providers.aws.services.appstream.appstream_service.AppStream",
new=appstream_client,
):
# Test Check
from providers.aws.services.appstream.appstream_fleet_session_disconnect_timeout.appstream_fleet_session_disconnect_timeout import (
appstream_fleet_session_disconnect_timeout,
)
check = appstream_fleet_session_disconnect_timeout()
result = check.execute()
assert len(result) == 2
for res in result:
if res.resource_id == fleet1.name:
assert result[0].resource_arn == fleet1.arn
assert result[0].region == fleet1.region
assert result[0].resource_id == fleet1.name
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Fleet {fleet1.name} has the session disconnect timeout set to more than 5 minutes"
)
if res.resource_id == fleet2.name:
assert result[1].resource_arn == fleet2.arn
assert result[1].region == fleet2.region
assert result[1].resource_id == fleet2.name
assert result[1].status == "PASS"
assert (
result[1].status_extended
== f"Fleet {fleet2.name} has the session disconnect timeout set to less than 5 minutes"
)

View File

@@ -0,0 +1,36 @@
{
"Provider": "aws",
"CheckID": "appstream_fleet_session_disconnect_timeout",
"CheckTitle": "Ensure session idle disconnect timeout is set to 10 minutes or less.",
"CheckType": ["Software and Configuration Checks", "Industry and Regulatory Standards", "CIS AWS Foundations Benchmark"],
"ServiceName": "appstream",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:appstream:region:account-id:fleet/resource-id",
"Severity": "medium",
"ResourceType": "AppStream",
"Description": "Ensure session idle disconnect timeout is set to 10 minutes or less.",
"Risk": "Idle disconnect timeout in minutes is the amount of time that users can be inactive before they are disconnected from their streaming session and the Disconnect timeout in minutes time begins.",
"RelatedUrl": "https://docs.aws.amazon.com/appstream2/latest/developerguide/set-up-stacks-fleets.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Change the session idle timeout to 10 minutes or less for the AppStream Fleet.",
"Url": "https://docs.aws.amazon.com/appstream2/latest/developerguide/set-up-stacks-fleets.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "Infrastructure Security",
"Compliance": []
}

View File

@@ -0,0 +1,36 @@
from config.config import get_config_var
from lib.check.models import Check, Check_Report
from providers.aws.services.appstream.appstream_client import appstream_client
max_idle_disconnect_timeout_in_seconds = get_config_var(
"max_idle_disconnect_timeout_in_seconds"
)
"""max_idle_disconnect_timeout_in_seconds, default: 600 seconds (10 minutes)"""
# Check if there are AppStream Fleets with the idle disconnect timeout set to 10 minutes or less
class appstream_fleet_session_idle_disconnect_timeout(Check):
"""Check if there are AppStream Fleets with the idle disconnect timeout set to 10 minutes or less"""
def execute(self):
"""Execute the appstream_fleet_session_idle_disconnect_timeout check"""
findings = []
for fleet in appstream_client.fleets:
report = Check_Report(self.metadata)
report.region = fleet.region
report.resource_id = fleet.name
report.resource_arn = fleet.arn
if (
fleet.idle_disconnect_timeout_in_seconds
<= max_idle_disconnect_timeout_in_seconds
):
report.status = "PASS"
report.status_extended = f"Fleet {fleet.name} has the session idle disconnect timeout set to less than 10 minutes"
else:
report.status = "FAIL"
report.status_extended = f"Fleet {fleet.name} has the session idle disconnect timeout set to more than 10 minutes"
findings.append(report)
return findings

View File

@@ -0,0 +1,164 @@
from unittest import mock
from providers.aws.services.appstream.appstream_service import Fleet
# Mock Test Region
AWS_REGION = "eu-west-1"
class Test_appstream_fleet_session_idle_disconnect_timeout:
def test_no_fleets(self):
appstream_client = mock.MagicMock
appstream_client.fleets = []
with mock.patch(
"providers.aws.services.appstream.appstream_service.AppStream",
new=appstream_client,
):
# Test Check
from providers.aws.services.appstream.appstream_fleet_session_idle_disconnect_timeout.appstream_fleet_session_idle_disconnect_timeout import (
appstream_fleet_session_idle_disconnect_timeout,
)
check = appstream_fleet_session_idle_disconnect_timeout()
result = check.execute()
assert len(result) == 0
def test_one_fleet_session_idle_disconnect_timeout_more_than_10_minutes(self):
appstream_client = mock.MagicMock
appstream_client.fleets = []
fleet1 = Fleet(
arn="arn",
name="test-fleet",
max_user_duration_in_seconds=1 * 60 * 60,
disconnect_timeout_in_seconds=1 * 60 * 60,
# 15 minutes
idle_disconnect_timeout_in_seconds=15 * 60,
enable_default_internet_access=True,
region=AWS_REGION,
)
appstream_client.fleets.append(fleet1)
with mock.patch(
"providers.aws.services.appstream.appstream_service.AppStream",
new=appstream_client,
):
# Test Check
from providers.aws.services.appstream.appstream_fleet_session_idle_disconnect_timeout.appstream_fleet_session_idle_disconnect_timeout import (
appstream_fleet_session_idle_disconnect_timeout,
)
check = appstream_fleet_session_idle_disconnect_timeout()
result = check.execute()
assert len(result) == 1
assert result[0].resource_arn == fleet1.arn
assert result[0].region == fleet1.region
assert result[0].resource_id == fleet1.name
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Fleet {fleet1.name} has the session idle disconnect timeout set to more than 10 minutes"
)
def test_one_fleet_session_idle_disconnect_timeout_less_than_10_minutes(self):
appstream_client = mock.MagicMock
appstream_client.fleets = []
fleet1 = Fleet(
arn="arn",
name="test-fleet",
max_user_duration_in_seconds=900,
disconnect_timeout_in_seconds=4 * 60,
# 8 minutes
idle_disconnect_timeout_in_seconds=8 * 60,
enable_default_internet_access=True,
region=AWS_REGION,
)
appstream_client.fleets.append(fleet1)
with mock.patch(
"providers.aws.services.appstream.appstream_service.AppStream",
new=appstream_client,
):
# Test Check
from providers.aws.services.appstream.appstream_fleet_session_idle_disconnect_timeout.appstream_fleet_session_idle_disconnect_timeout import (
appstream_fleet_session_idle_disconnect_timeout,
)
check = appstream_fleet_session_idle_disconnect_timeout()
result = check.execute()
assert len(result) == 1
assert result[0].resource_arn == fleet1.arn
assert result[0].region == fleet1.region
assert result[0].resource_id == fleet1.name
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Fleet {fleet1.name} has the session idle disconnect timeout set to less than 10 minutes"
)
def test_two_fleets_session_idle_disconnect_timeout_than_10_minutes_one_more_than_10_minutes(
self,
):
appstream_client = mock.MagicMock
appstream_client.fleets = []
fleet1 = Fleet(
arn="arn",
name="test-fleet-1",
max_user_duration_in_seconds=1 * 60 * 60,
disconnect_timeout_in_seconds=3 * 60,
# 5 minutes
idle_disconnect_timeout_in_seconds=5 * 60,
enable_default_internet_access=True,
region=AWS_REGION,
)
fleet2 = Fleet(
arn="arn",
name="test-fleet-2",
max_user_duration_in_seconds=24 * 60 * 60,
disconnect_timeout_in_seconds=3 * 60,
# 45 minutes
idle_disconnect_timeout_in_seconds=45 * 60,
enable_default_internet_access=False,
region=AWS_REGION,
)
appstream_client.fleets.append(fleet1)
appstream_client.fleets.append(fleet2)
with mock.patch(
"providers.aws.services.appstream.appstream_service.AppStream",
new=appstream_client,
):
# Test Check
from providers.aws.services.appstream.appstream_fleet_session_idle_disconnect_timeout.appstream_fleet_session_idle_disconnect_timeout import (
appstream_fleet_session_idle_disconnect_timeout,
)
check = appstream_fleet_session_idle_disconnect_timeout()
result = check.execute()
assert len(result) == 2
for res in result:
if res.resource_id == fleet1.name:
assert result[0].resource_arn == fleet1.arn
assert result[0].region == fleet1.region
assert result[0].resource_id == fleet1.name
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Fleet {fleet1.name} has the session idle disconnect timeout set to less than 10 minutes"
)
if res.resource_id == fleet2.name:
assert result[1].resource_arn == fleet2.arn
assert result[1].region == fleet2.region
assert result[1].resource_id == fleet2.name
assert result[1].status == "FAIL"
assert (
result[1].status_extended
== f"Fleet {fleet2.name} has the session idle disconnect timeout set to more than 10 minutes"
)

View File

@@ -0,0 +1,87 @@
import threading
from dataclasses import dataclass
from lib.logger import logger
from providers.aws.aws_provider import generate_regional_clients
################## AppStream
class AppStream:
def __init__(self, audit_info):
self.service = "appstream"
self.session = audit_info.audit_session
self.audited_account = audit_info.audited_account
self.regional_clients = generate_regional_clients(self.service, audit_info)
self.fleets = []
self.__threading_call__(self.__describe_fleets__)
def __get_session__(self):
return self.session
def __threading_call__(self, call):
threads = []
for regional_client in self.regional_clients.values():
threads.append(threading.Thread(target=call, args=(regional_client,)))
for t in threads:
t.start()
for t in threads:
t.join()
def __describe_fleets__(self, regional_client):
logger.info("AppStream - Describing Fleets...")
try:
describe_fleets_paginator = regional_client.get_paginator("describe_fleets")
for page in describe_fleets_paginator.paginate():
for fleet in page["Fleets"]:
self.fleets.append(
Fleet(
arn=fleet["Arn"],
name=fleet["Name"],
max_user_duration_in_seconds=fleet[
"MaxUserDurationInSeconds"
],
disconnect_timeout_in_seconds=fleet[
"DisconnectTimeoutInSeconds"
],
idle_disconnect_timeout_in_seconds=fleet[
"IdleDisconnectTimeoutInSeconds"
],
enable_default_internet_access=fleet[
"EnableDefaultInternetAccess"
],
region=regional_client.region,
)
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
@dataclass
class Fleet:
arn: str
name: str
max_user_duration_in_seconds: int
disconnect_timeout_in_seconds: int
idle_disconnect_timeout_in_seconds: int
enable_default_internet_access: bool
def __init__(
self,
arn,
name,
max_user_duration_in_seconds,
disconnect_timeout_in_seconds,
idle_disconnect_timeout_in_seconds,
enable_default_internet_access,
region,
):
self.arn = arn
self.name = name
self.max_user_duration_in_seconds = max_user_duration_in_seconds
self.disconnect_timeout_in_seconds = disconnect_timeout_in_seconds
self.idle_disconnect_timeout_in_seconds = idle_disconnect_timeout_in_seconds
self.enable_default_internet_access = enable_default_internet_access
self.region = region

View File

@@ -0,0 +1,101 @@
from unittest.mock import patch
import botocore
from moto.core import DEFAULT_ACCOUNT_ID
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.appstream.appstream_service import AppStream
# Mock Test Region
AWS_REGION = "eu-west-1"
# Mocking Access Analyzer Calls
make_api_call = botocore.client.BaseClient._make_api_call
# As you can see the operation_name has the list_analyzers snake_case form but
# we are using the ListAnalyzers form.
# Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816
#
# We have to mock every AWS API call using Boto3
def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "DescribeFleets":
return {
"Fleets": [
{
"Arn": f"arn:aws:appstream:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:fleet/test-prowler3-0",
"Name": "test-prowler3-0",
"MaxUserDurationInSeconds": 100,
"DisconnectTimeoutInSeconds": 900,
"IdleDisconnectTimeoutInSeconds": 900,
"EnableDefaultInternetAccess": False,
},
{
"Arn": f"arn:aws:appstream:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:fleet/test-prowler3-1",
"Name": "test-prowler3-1",
"MaxUserDurationInSeconds": 57600,
"DisconnectTimeoutInSeconds": 900,
"IdleDisconnectTimeoutInSeconds": 900,
"EnableDefaultInternetAccess": True,
},
]
}
return make_api_call(self, operation_name, kwarg)
# Mock generate_regional_clients()
def mock_generate_regional_clients(service, audit_info):
regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION)
regional_client.region = AWS_REGION
return {AWS_REGION: regional_client}
# Patch every AWS call using Boto3 and generate_regional_clients to have 1 client
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
@patch(
"providers.aws.services.appstream.appstream_service.generate_regional_clients",
new=mock_generate_regional_clients,
)
class Test_AppStream_Service:
# Test AppStream Client
def test__get_client__(self):
appstream = AppStream(current_audit_info)
assert appstream.regional_clients[AWS_REGION].__class__.__name__ == "AppStream"
# Test AppStream Session
def test__get_session__(self):
appstream = AppStream(current_audit_info)
assert appstream.session.__class__.__name__ == "Session"
# Test AppStream Session
def test__get_service__(self):
appstream = AppStream(current_audit_info)
assert appstream.service == "appstream"
def test__describe_fleets__(self):
# Set partition for the service
current_audit_info.audited_partition = "aws"
appstream = AppStream(current_audit_info)
assert len(appstream.fleets) == 2
assert (
appstream.fleets[0].arn
== f"arn:aws:appstream:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:fleet/test-prowler3-0"
)
assert appstream.fleets[0].name == "test-prowler3-0"
assert appstream.fleets[0].max_user_duration_in_seconds == 100
assert appstream.fleets[0].disconnect_timeout_in_seconds == 900
assert appstream.fleets[0].idle_disconnect_timeout_in_seconds == 900
assert appstream.fleets[0].enable_default_internet_access == False
assert appstream.fleets[0].region == AWS_REGION
assert (
appstream.fleets[1].arn
== f"arn:aws:appstream:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:fleet/test-prowler3-1"
)
assert appstream.fleets[1].name == "test-prowler3-1"
assert appstream.fleets[1].max_user_duration_in_seconds == 57600
assert appstream.fleets[1].disconnect_timeout_in_seconds == 900
assert appstream.fleets[1].idle_disconnect_timeout_in_seconds == 900
assert appstream.fleets[1].enable_default_internet_access == True
assert appstream.fleets[1].region == AWS_REGION