feat(Route53): Service and checks (#1493)

Co-authored-by: Sergio Garcia <38561120+sergargar@users.noreply.github.com>
This commit is contained in:
Pepe Fagoaga
2022-11-17 19:57:20 +01:00
committed by GitHub
parent 62ffe26b42
commit 12896cceaa
21 changed files with 930 additions and 147 deletions

View File

@@ -1,54 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2019) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
# Remediation:
#
# https://docs.aws.amazon.com/cli/latest/reference/route53domains/update-domain-contact-privacy.html
#
# update-domain-contact-privacy \
# --region us-east-1 \
# --domain-name example.com \
# --admin-privacy \
# --registrant-privacy \
# --tech-privacy
CHECK_ID_extra7152="7.152"
CHECK_TITLE_extra7152="[extra7152] Enable Privacy Protection for for a Route53 Domain (us-east-1 only)"
CHECK_SCORED_extra7152="NOT_SCORED"
CHECK_CIS_LEVEL_extra7152="EXTRA"
CHECK_SEVERITY_extra7152="Medium"
CHECK_ASFF_RESOURCE_TYPE_extra7152="AwsRoute53Domain"
CHECK_ALTERNATE_check7152="extra7152"
CHECK_SERVICENAME_extra7152="route53"
CHECK_RISK_extra7152='Without privacy protection enabled; ones personal information is published to the public WHOIS database'
CHECK_REMEDIATION_extra7152='Ensure default Privacy is enabled'
CHECK_DOC_extra7152='https://docs.aws.amazon.com/Route53/latest/DeveloperGuide/domain-privacy-protection.html'
CHECK_CAF_EPIC_extra7152='Data Protection'
extra7152(){
# Route53 is a global service, looking for domains in US-EAST-1
# this is also valid for GovCloud https://docs.aws.amazon.com/govcloud-us/latest/UserGuide/setting-up-route53.html
DOMAIN_NAMES=$($AWSCLI route53domains list-domains $PROFILE_OPT --region us-east-1 --query 'Domains[*].DomainName' --output text )
if [[ $DOMAIN_NAMES ]];then
for domain_name in $DOMAIN_NAMES;do
DOMAIN_DETAIL=$($AWSCLI route53domains get-domain-detail $PROFILE_OPT --region us-east-1 --query 'AdminPrivacy' --domain-name $domain_name)
if [[ $DOMAIN_DETAIL == false ]]; then
textFail "us-east-1: Contact information public for: $domain_name" "us-east-1" "$domain_name"
else
textPass "us-east-1: All contact information is private for: $domain_name" "us-east-1" "$domain_name"
fi
done
else
textInfo "us-east-1: No Domain Names found" "us-east-1"
fi
}

View File

@@ -1,52 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2019) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
# Remediation:
#
# https://docs.aws.amazon.com/cli/latest/reference/route53domains/enable-domain-transfer-lock.html
#
# enable-domain-transfer-lock \
# --domain-name example.com
CHECK_ID_extra7153="7.153"
CHECK_TITLE_extra7153="[extra7153] Enable Transfer Lock for a Route53 Domain (us-east-1 only)"
CHECK_SCORED_extra7153="NOT_SCORED"
CHECK_CIS_LEVEL_extra7153="EXTRA"
CHECK_SEVERITY_extra7153="Medium"
CHECK_ASFF_RESOURCE_TYPE_extra7153="AwsRoute53Domain"
CHECK_ALTERNATE_check7153="extra7153"
CHECK_SERVICENAME_extra7153="route53"
CHECK_RISK_extra7153='Without transfer lock enabled; a domain name could be incorrectly moved to a new registrar'
CHECK_REMEDIATION_extra7153='Ensure transfer lock is enabled'
CHECK_DOC_extra7153='https://docs.aws.amazon.com/Route53/latest/DeveloperGuide/domain-lock.html'
CHECK_CAF_EPIC_extra7153='Data Protection'
extra7153(){
# Route53 is a global service, looking for domains in US-EAST-1
# this is also valid for GovCloud https://docs.aws.amazon.com/govcloud-us/latest/UserGuide/setting-up-route53.html
DOMAIN_NAMES=$($AWSCLI route53domains list-domains $PROFILE_OPT --region us-east-1 --query 'Domains[*].DomainName' --output text )
if [[ $DOMAIN_NAMES ]];then
for domain_name in $DOMAIN_NAMES;do
DOMAIN_DETAIL=$($AWSCLI route53domains get-domain-detail $PROFILE_OPT --region us-east-1 --query 'StatusList' --domain-name $domain_name)
HAS_TRANSFER_LOCK=$( grep -o 'clientTransferProhibited' <<< $DOMAIN_DETAIL)
if [[ $HAS_TRANSFER_LOCK ]]; then
textPass "us-east-1: clientTransferProhibited found for: $domain_name" "us-east-1" "$domain_name"
else
textFail "us-east-1: clientTransferProhibited not found for: $domain_name" "us-east-1" "$domain_name"
fi
done
else
textInfo "us-east-1: No Domain Names found" "us-east-1"
fi
}

View File

@@ -1,41 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2018) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_extra719="7.19"
CHECK_TITLE_extra719="[extra719] Check if Route53 public hosted zones are logging queries to CloudWatch Logs"
CHECK_SCORED_extra719="NOT_SCORED"
CHECK_CIS_LEVEL_extra719="EXTRA"
CHECK_SEVERITY_extra719="Medium"
CHECK_ALTERNATE_check719="extra719"
CHECK_ASFF_RESOURCE_TYPE_extra719="AwsRoute53HostedZone"
CHECK_SERVICENAME_extra719="route53"
CHECK_RISK_extra719='If logs are not enabled; monitoring of service use and threat analysis is not possible.'
CHECK_REMEDIATION_extra719='Enable CloudWatch logs and define metrics and uses cases for the events recorded.'
CHECK_DOC_extra719='https://docs.aws.amazon.com/Route53/latest/DeveloperGuide/monitoring-hosted-zones-with-cloudwatch.html'
CHECK_CAF_EPIC_extra719='Logging and Monitoring'
extra719(){
# You can't create a query logging config for a private hosted zone.
LIST_OF_HOSTED_ZONES=$($AWSCLI route53 list-hosted-zones $PROFILE_OPT | jq -r ".HostedZones[] | select(.Config.PrivateZone == false) | .Id")
if [[ $LIST_OF_HOSTED_ZONES ]]; then
for hostedzoneid in $LIST_OF_HOSTED_ZONES;do
HOSTED_ZONE_QUERY_LOG_ENABLED=$($AWSCLI route53 list-query-logging-configs --hosted-zone-id $hostedzoneid $PROFILE_OPT --query QueryLoggingConfigs[*].CloudWatchLogsLogGroupArn --output text|cut -d: -f7)
if [[ $HOSTED_ZONE_QUERY_LOG_ENABLED ]];then
textPass "$REGION: Route53 public hosted zone Id $hostedzoneid has query logging enabled in Log Group $HOSTED_ZONE_QUERY_LOG_ENABLED" "$REGION" "$hostedzoneid"
else
textFail "$REGION: Route53 public hosted zone Id $hostedzoneid has query logging disabled!" "$REGION" "$hostedzoneid"
fi
done
else
textInfo "$REGION: No Route53 hosted zones found" "$REGION"
fi
}

View File

@@ -0,0 +1,4 @@
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.route53.route53_service import Route53
route53_client = Route53(current_audit_info)

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "route53_domains_privacy_protection_enabled",
"CheckTitle": "Enable Privacy Protection for for a Route53 Domain.",
"CheckType": [],
"ServiceName": "route53",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "AwsRoute53Domain",
"Description": "Enable Privacy Protection for for a Route53 Domain.",
"Risk": "Without privacy protection enabled, ones personal information is published to the public WHOIS database.",
"RelatedUrl": "https://docs.aws.amazon.com/Route53/latest/DeveloperGuide/domain-privacy-protection.html",
"Remediation": {
"Code": {
"CLI": "aws route53domains update-domain-contact-privacy --domain-name domain.com --registrant-privacy",
"NativeIaC": "",
"Other": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/aws/Route53/privacy-protection.html",
"Terraform": ""
},
"Recommendation": {
"Text": "Ensure default Privacy is enabled.",
"Url": "https://docs.aws.amazon.com/Route53/latest/DeveloperGuide/domain-privacy-protection.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,27 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.route53.route53domains_client import route53domains_client
class route53_domains_privacy_protection_enabled(Check):
def execute(self) -> Check_Report:
findings = []
for domain in route53domains_client.domains.values():
report = Check_Report(self.metadata)
report.resource_id = domain.name
report.region = domain.region
if domain.admin_privacy:
report.status = "PASS"
report.status_extended = (
f"Contact information is private for the {domain.name} domain"
)
else:
report.status = "FAIL"
report.status_extended = (
f"Contact information is public for the {domain.name} domain"
)
findings.append(report)
return findings

View File

@@ -0,0 +1,83 @@
from unittest import mock
from providers.aws.services.route53.route53_service import Domain
AWS_REGION = "us-east-1"
class Test_route53_domains_privacy_protection_enabled:
def test_no_domains(self):
route53domains = mock.MagicMock
route53domains.domains = {}
with mock.patch(
"providers.aws.services.route53.route53_service.Route53Domains",
new=route53domains,
):
# Test Check
from providers.aws.services.route53.route53_domains_privacy_protection_enabled.route53_domains_privacy_protection_enabled import (
route53_domains_privacy_protection_enabled,
)
check = route53_domains_privacy_protection_enabled()
result = check.execute()
assert len(result) == 0
def test_domain_privacy_protection_disabled(self):
route53domains = mock.MagicMock
domain_name = "test-domain.com"
route53domains.domains = {
domain_name: Domain(
name=domain_name, region=AWS_REGION, admin_privacy=False
)
}
with mock.patch(
"providers.aws.services.route53.route53_service.Route53Domains",
new=route53domains,
):
# Test Check
from providers.aws.services.route53.route53_domains_privacy_protection_enabled.route53_domains_privacy_protection_enabled import (
route53_domains_privacy_protection_enabled,
)
check = route53_domains_privacy_protection_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == domain_name
assert result[0].region == AWS_REGION
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Contact information is public for the {domain_name} domain"
)
def test_domain_privacy_protection_enabled(self):
route53domains = mock.MagicMock
domain_name = "test-domain.com"
route53domains.domains = {
domain_name: Domain(name=domain_name, region=AWS_REGION, admin_privacy=True)
}
with mock.patch(
"providers.aws.services.route53.route53_service.Route53Domains",
new=route53domains,
):
# Test Check
from providers.aws.services.route53.route53_domains_privacy_protection_enabled.route53_domains_privacy_protection_enabled import (
route53_domains_privacy_protection_enabled,
)
check = route53_domains_privacy_protection_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == domain_name
assert result[0].region == AWS_REGION
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Contact information is private for the {domain_name} domain"
)

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "route53_domains_transferlock_enabled",
"CheckTitle": "Enable Transfer Lock for a Route53 Domain.",
"CheckType": [],
"ServiceName": "route53",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "AwsRoute53Domain",
"Description": "Enable Transfer Lock for a Route53 Domain.",
"Risk": "Without transfer lock enabled; a domain name could be incorrectly moved to a new registrar.",
"RelatedUrl": "https://docs.aws.amazon.com/Route53/latest/DeveloperGuide/domain-lock.html",
"Remediation": {
"Code": {
"CLI": "aws route53domains enable-domain-transfer-lock --domain-name DOMAIN",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Ensure transfer lock is enabled.",
"Url": "https://docs.aws.amazon.com/Route53/latest/DeveloperGuide/domain-lock.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,27 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.route53.route53domains_client import route53domains_client
class route53_domains_transferlock_enabled(Check):
def execute(self) -> Check_Report:
findings = []
for domain in route53domains_client.domains.values():
report = Check_Report(self.metadata)
report.resource_id = domain.name
report.region = domain.region
if "clientTransferProhibited" in domain.status_list:
report.status = "PASS"
report.status_extended = (
f"Transfer Lock is enabled for the {domain.name} domain"
)
else:
report.status = "FAIL"
report.status_extended = (
f"Transfer Lock is disabled for the {domain.name} domain"
)
findings.append(report)
return findings

View File

@@ -0,0 +1,91 @@
from unittest import mock
from providers.aws.services.route53.route53_service import Domain
AWS_REGION = "us-east-1"
class Test_route53_domains_transferlock_enabled:
def test_no_domains(self):
route53domains = mock.MagicMock
route53domains.domains = {}
with mock.patch(
"providers.aws.services.route53.route53_service.Route53Domains",
new=route53domains,
):
# Test Check
from providers.aws.services.route53.route53_domains_transferlock_enabled.route53_domains_transferlock_enabled import (
route53_domains_transferlock_enabled,
)
check = route53_domains_transferlock_enabled()
result = check.execute()
assert len(result) == 0
def test_domain_transfer_lock_disabled(self):
route53domains = mock.MagicMock
domain_name = "test-domain.com"
route53domains.domains = {
domain_name: Domain(
name=domain_name,
region=AWS_REGION,
admin_privacy=False,
status_list=[""],
)
}
with mock.patch(
"providers.aws.services.route53.route53_service.Route53Domains",
new=route53domains,
):
# Test Check
from providers.aws.services.route53.route53_domains_transferlock_enabled.route53_domains_transferlock_enabled import (
route53_domains_transferlock_enabled,
)
check = route53_domains_transferlock_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == domain_name
assert result[0].region == AWS_REGION
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Transfer Lock is disabled for the {domain_name} domain"
)
def test_domain_transfer_lock_enabled(self):
route53domains = mock.MagicMock
domain_name = "test-domain.com"
route53domains.domains = {
domain_name: Domain(
name=domain_name,
region=AWS_REGION,
admin_privacy=False,
status_list=["clientTransferProhibited"],
)
}
with mock.patch(
"providers.aws.services.route53.route53_service.Route53Domains",
new=route53domains,
):
# Test Check
from providers.aws.services.route53.route53_domains_transferlock_enabled.route53_domains_transferlock_enabled import (
route53_domains_transferlock_enabled,
)
check = route53_domains_transferlock_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == domain_name
assert result[0].region == AWS_REGION
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Transfer Lock is enabled for the {domain_name} domain"
)

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "route53_public_hosted_zones_cloudwatch_logging_enabled",
"CheckTitle": "Check if Route53 public hosted zones are logging queries to CloudWatch Logs.",
"CheckType": [],
"ServiceName": "route53",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "AwsRoute53HostedZone",
"Description": "Check if Route53 public hosted zones are logging queries to CloudWatch Logs.",
"Risk": "If logs are not enabled; monitoring of service use and threat analysis is not possible.",
"RelatedUrl": "https://docs.aws.amazon.com/Route53/latest/DeveloperGuide/monitoring-hosted-zones-with-cloudwatch.html",
"Remediation": {
"Code": {
"CLI": "aws route53 create-query-logging-config --hosted-zone-id <zone_id> --cloud-watch-logs-log-group-arn <log_group_arn>",
"NativeIaC": "",
"Other": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/aws/Route53/enable-query-logging.html",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable CloudWatch logs and define metrics and uses cases for the events recorded.",
"Url": "https://docs.aws.amazon.com/Route53/latest/DeveloperGuide/monitoring-hosted-zones-with-cloudwatch.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,27 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.route53.route53_client import route53_client
class route53_public_hosted_zones_cloudwatch_logging_enabled(Check):
def execute(self) -> Check_Report:
findings = []
for hosted_zone in route53_client.hosted_zones.values():
if not hosted_zone.private_zone:
report = Check_Report(self.metadata)
report.resource_id = hosted_zone.id
report.region = hosted_zone.region
if (
hosted_zone.logging_config
and hosted_zone.logging_config.cloudwatch_log_group_arn
):
report.status = "PASS"
report.status_extended = f"Route53 Public Hosted Zone {hosted_zone.id} has query logging enabled in Log Group {hosted_zone.logging_config.cloudwatch_log_group_arn}"
else:
report.status = "FAIL"
report.status_extended = f"Route53 Public Hosted Zone {hosted_zone.id} has query logging disabled"
findings.append(report)
return findings

View File

@@ -0,0 +1,127 @@
from unittest import mock
from moto.core import DEFAULT_ACCOUNT_ID
from providers.aws.services.route53.route53_service import HostedZone, LoggingConfig
AWS_REGION = "us-east-1"
class Test_route53_public_hosted_zones_cloudwatch_logging_enabled:
def test_no_hosted_zones(self):
route53 = mock.MagicMock
route53.hosted_zones = {}
with mock.patch(
"providers.aws.services.route53.route53_service.Route53",
new=route53,
):
# Test Check
from providers.aws.services.route53.route53_public_hosted_zones_cloudwatch_logging_enabled.route53_public_hosted_zones_cloudwatch_logging_enabled import (
route53_public_hosted_zones_cloudwatch_logging_enabled,
)
check = route53_public_hosted_zones_cloudwatch_logging_enabled()
result = check.execute()
assert len(result) == 0
def test_hosted_zone__public_logging_enabled(self):
route53 = mock.MagicMock
hosted_zone_name = "test-domain.com"
hosted_zone_id = "ABCDEF12345678"
log_group_name = "test-log-group"
log_group_arn = (
f"rn:aws:logs:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:log-group:{log_group_name}"
)
route53.hosted_zones = {
hosted_zone_name: HostedZone(
name=hosted_zone_name,
id=hosted_zone_id,
private_zone=False,
region=AWS_REGION,
logging_config=LoggingConfig(cloudwatch_log_group_arn=log_group_arn),
)
}
with mock.patch(
"providers.aws.services.route53.route53_service.Route53",
new=route53,
):
# Test Check
from providers.aws.services.route53.route53_public_hosted_zones_cloudwatch_logging_enabled.route53_public_hosted_zones_cloudwatch_logging_enabled import (
route53_public_hosted_zones_cloudwatch_logging_enabled,
)
check = route53_public_hosted_zones_cloudwatch_logging_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == hosted_zone_id
assert result[0].region == AWS_REGION
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Route53 Public Hosted Zone {hosted_zone_id} has query logging enabled in Log Group {log_group_arn}"
)
def test_hosted_zone__public_logging_disabled(self):
route53 = mock.MagicMock
hosted_zone_name = "test-domain.com"
hosted_zone_id = "ABCDEF12345678"
route53.hosted_zones = {
hosted_zone_name: HostedZone(
name=hosted_zone_name,
id=hosted_zone_id,
private_zone=False,
region=AWS_REGION,
)
}
with mock.patch(
"providers.aws.services.route53.route53_service.Route53",
new=route53,
):
# Test Check
from providers.aws.services.route53.route53_public_hosted_zones_cloudwatch_logging_enabled.route53_public_hosted_zones_cloudwatch_logging_enabled import (
route53_public_hosted_zones_cloudwatch_logging_enabled,
)
check = route53_public_hosted_zones_cloudwatch_logging_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == hosted_zone_id
assert result[0].region == AWS_REGION
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Route53 Public Hosted Zone {hosted_zone_id} has query logging disabled"
)
def test_hosted_zone__private(self):
route53 = mock.MagicMock
hosted_zone_name = "test-domain.com"
hosted_zone_id = "ABCDEF12345678"
route53.hosted_zones = {
hosted_zone_name: HostedZone(
name=hosted_zone_name,
id=hosted_zone_id,
private_zone=True,
region=AWS_REGION,
)
}
with mock.patch(
"providers.aws.services.route53.route53_service.Route53",
new=route53,
):
# Test Check
from providers.aws.services.route53.route53_public_hosted_zones_cloudwatch_logging_enabled.route53_public_hosted_zones_cloudwatch_logging_enabled import (
route53_public_hosted_zones_cloudwatch_logging_enabled,
)
check = route53_public_hosted_zones_cloudwatch_logging_enabled()
result = check.execute()
assert len(result) == 0

View File

@@ -0,0 +1,128 @@
from pydantic import BaseModel
from lib.logger import logger
from providers.aws.aws_provider import get_region_global_service
################## Route53
class Route53:
def __init__(self, audit_info):
self.service = "route53"
self.session = audit_info.audit_session
self.client = self.session.client(self.service)
self.region = get_region_global_service(audit_info)
self.hosted_zones = {}
self.__list_hosted_zones__()
self.__list_query_logging_configs__()
def __get_session__(self):
return self.session
def __list_hosted_zones__(self):
logger.info("Route53 - Listing Hosting Zones...")
try:
list_hosted_zones_paginator = self.client.get_paginator("list_hosted_zones")
for page in list_hosted_zones_paginator.paginate():
for hosted_zone in page["HostedZones"]:
hosted_zone_id = hosted_zone["Id"].replace("/hostedzone/", "")
hosted_zone_name = hosted_zone["Name"]
private_zone = hosted_zone["Config"]["PrivateZone"]
self.hosted_zones[hosted_zone_id] = HostedZone(
id=hosted_zone_id,
name=hosted_zone_name,
private_zone=private_zone,
region=self.region,
)
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __list_query_logging_configs__(self):
logger.info("Route53 - Listing Query Logging Configs...")
try:
for hosted_zone in self.hosted_zones.values():
list_query_logging_configs_paginator = self.client.get_paginator(
"list_query_logging_configs"
)
for page in list_query_logging_configs_paginator.paginate():
for logging_config in page["QueryLoggingConfigs"]:
self.hosted_zones[
hosted_zone.id
].logging_config = LoggingConfig(
cloudwatch_log_group_arn=logging_config[
"CloudWatchLogsLogGroupArn"
]
)
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class LoggingConfig(BaseModel):
cloudwatch_log_group_arn: str
class HostedZone(BaseModel):
id: str
name: str
private_zone: bool
logging_config: LoggingConfig = None
region: str
################## Route53Domains
class Route53Domains:
def __init__(self, audit_info):
self.service = "route53domains"
self.session = audit_info.audit_session
self.audited_account = audit_info.audited_account
self.region = get_region_global_service(audit_info)
self.client = self.session.client(self.service, self.region)
self.domains = {}
self.__list_domains__()
self.__get_domain_detail__()
def __get_session__(self):
return self.session
def __list_domains__(self):
logger.info("Route53Domains - Listing Domains...")
try:
list_domains_zones_paginator = self.client.get_paginator("list_domains")
for page in list_domains_zones_paginator.paginate():
for domain in page["Domains"]:
domain_name = domain["DomainName"]
self.domains[domain_name] = Domain(
name=domain_name, region=self.region
)
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __get_domain_detail__(self):
logger.info("Route53Domains - Getting Domain Detail...")
try:
for domain in self.domains.values():
domain_detail = self.client.get_domain_detail(DomainName=domain.name)
self.domains[domain.name].admin_privacy = domain_detail["AdminPrivacy"]
self.domains[domain.name].status_list = domain_detail["StatusList"]
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class Domain(BaseModel):
name: str
region: str
admin_privacy: bool = False
status_list: list[str] = None

View File

@@ -0,0 +1,190 @@
from unittest.mock import patch
import botocore
from boto3 import client, session
from moto import mock_logs, mock_route53
from providers.aws.lib.audit_info.audit_info import AWS_Audit_Info
from providers.aws.services.route53.route53_service import Route53
# Mock Test Region
AWS_REGION = "us-east-1"
# Mocking Access Analyzer Calls
make_api_call = botocore.client.BaseClient._make_api_call
def mock_make_api_call(self, operation_name, kwarg):
"""We have to mock every AWS API call using Boto3"""
if operation_name == "DescribeDirectories":
return {}
return make_api_call(self, operation_name, kwarg)
# Patch every AWS call using Boto3 and generate_regional_clients to have 1 client
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
class Test_Route53_Service:
# Mocked Audit Info
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
),
audited_account=None,
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=AWS_REGION,
credentials=None,
assumed_role_info=None,
audited_regions=None,
organizations_metadata=None,
)
return audit_info
# Test Route53 Client
@mock_route53
def test__get_client__(self):
route53 = Route53(self.set_mocked_audit_info())
assert route53.client.__class__.__name__ == "Route53"
# Test Route53 Session
@mock_route53
def test__get_session__(self):
route53 = Route53(self.set_mocked_audit_info())
assert route53.session.__class__.__name__ == "Session"
# Test Route53 Service
@mock_route53
def test__get_service__(self):
route53 = Route53(self.set_mocked_audit_info())
assert route53.service == "route53"
@mock_route53
@mock_logs
def test__list_hosted_zones__private_with_logging(self):
# Create Hosted Zone
r53_client = client("route53", region_name=AWS_REGION)
hosted_zone_name = "testdns.aws.com."
response = r53_client.create_hosted_zone(
Name=hosted_zone_name,
CallerReference=str(hash("foo")),
HostedZoneConfig={"Comment": "", "PrivateZone": True},
)
hosted_zone_id = response["HostedZone"]["Id"].replace("/hostedzone/", "")
hosted_zone_name = response["HostedZone"]["Name"]
# CloudWatch Client
logs_client = client("logs", region_name=AWS_REGION)
log_group_name = "test-log-group"
_ = logs_client.create_log_group(logGroupName=log_group_name)
log_group_arn = logs_client.describe_log_groups()["logGroups"][0]["arn"]
# Create Query Logging Config
response = r53_client.create_query_logging_config(
HostedZoneId=hosted_zone_id, CloudWatchLogsLogGroupArn=log_group_arn
)
# Set partition for the service
route53 = Route53(self.set_mocked_audit_info())
assert len(route53.hosted_zones) == 1
assert route53.hosted_zones[hosted_zone_id]
assert route53.hosted_zones[hosted_zone_id].id == hosted_zone_id
assert route53.hosted_zones[hosted_zone_id].name == hosted_zone_name
assert route53.hosted_zones[hosted_zone_id].private_zone
assert route53.hosted_zones[hosted_zone_id].logging_config
assert (
route53.hosted_zones[hosted_zone_id].logging_config.cloudwatch_log_group_arn
== log_group_arn
)
assert route53.hosted_zones[hosted_zone_id].region == AWS_REGION
@mock_route53
@mock_logs
def test__list_hosted_zones__public_with_logging(self):
# Create Hosted Zone
r53_client = client("route53", region_name=AWS_REGION)
hosted_zone_name = "testdns.aws.com."
response = r53_client.create_hosted_zone(
Name=hosted_zone_name,
CallerReference=str(hash("foo")),
HostedZoneConfig={"Comment": "", "PrivateZone": False},
)
hosted_zone_id = response["HostedZone"]["Id"].replace("/hostedzone/", "")
hosted_zone_name = response["HostedZone"]["Name"]
# CloudWatch Client
logs_client = client("logs", region_name=AWS_REGION)
log_group_name = "test-log-group"
_ = logs_client.create_log_group(logGroupName=log_group_name)
log_group_arn = logs_client.describe_log_groups()["logGroups"][0]["arn"]
# Create Query Logging Config
response = r53_client.create_query_logging_config(
HostedZoneId=hosted_zone_id, CloudWatchLogsLogGroupArn=log_group_arn
)
# Set partition for the service
route53 = Route53(self.set_mocked_audit_info())
assert len(route53.hosted_zones) == 1
assert route53.hosted_zones[hosted_zone_id]
assert route53.hosted_zones[hosted_zone_id].id == hosted_zone_id
assert route53.hosted_zones[hosted_zone_id].name == hosted_zone_name
assert not route53.hosted_zones[hosted_zone_id].private_zone
assert route53.hosted_zones[hosted_zone_id].logging_config
assert (
route53.hosted_zones[hosted_zone_id].logging_config.cloudwatch_log_group_arn
== log_group_arn
)
assert route53.hosted_zones[hosted_zone_id].region == AWS_REGION
@mock_route53
@mock_logs
def test__list_hosted_zones__private_without_logging(self):
# Create Hosted Zone
r53_client = client("route53", region_name=AWS_REGION)
hosted_zone_name = "testdns.aws.com."
response = r53_client.create_hosted_zone(
Name=hosted_zone_name,
CallerReference=str(hash("foo")),
HostedZoneConfig={"Comment": "", "PrivateZone": True},
)
hosted_zone_id = response["HostedZone"]["Id"].replace("/hostedzone/", "")
hosted_zone_name = response["HostedZone"]["Name"]
# Set partition for the service
route53 = Route53(self.set_mocked_audit_info())
assert len(route53.hosted_zones) == 1
assert route53.hosted_zones[hosted_zone_id]
assert route53.hosted_zones[hosted_zone_id].id == hosted_zone_id
assert route53.hosted_zones[hosted_zone_id].name == hosted_zone_name
assert route53.hosted_zones[hosted_zone_id].private_zone
assert not route53.hosted_zones[hosted_zone_id].logging_config
assert route53.hosted_zones[hosted_zone_id].region == AWS_REGION
@mock_route53
@mock_logs
def test__list_hosted_zones__public_without_logging(self):
# Create Hosted Zone
r53_client = client("route53", region_name=AWS_REGION)
hosted_zone_name = "testdns.aws.com."
response = r53_client.create_hosted_zone(
Name=hosted_zone_name,
CallerReference=str(hash("foo")),
HostedZoneConfig={"Comment": "", "PrivateZone": False},
)
hosted_zone_id = response["HostedZone"]["Id"].replace("/hostedzone/", "")
hosted_zone_name = response["HostedZone"]["Name"]
# Set partition for the service
route53 = Route53(self.set_mocked_audit_info())
assert len(route53.hosted_zones) == 1
assert route53.hosted_zones[hosted_zone_id]
assert route53.hosted_zones[hosted_zone_id].id == hosted_zone_id
assert route53.hosted_zones[hosted_zone_id].name == hosted_zone_name
assert not route53.hosted_zones[hosted_zone_id].private_zone
assert not route53.hosted_zones[hosted_zone_id].logging_config
assert route53.hosted_zones[hosted_zone_id].region == AWS_REGION

View File

@@ -0,0 +1,4 @@
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.route53.route53_service import Route53Domains
route53domains_client = Route53Domains(current_audit_info)

View File

@@ -0,0 +1,117 @@
from datetime import datetime
from unittest.mock import patch
import botocore
from boto3 import session
from providers.aws.lib.audit_info.audit_info import AWS_Audit_Info
from providers.aws.services.route53.route53_service import Route53Domains
# Mock Test Region
AWS_REGION = "us-east-1"
# Mocking Access Analyzer Calls
make_api_call = botocore.client.BaseClient._make_api_call
def mock_make_api_call(self, operation_name, kwarg):
"""We have to mock every AWS API call using Boto3"""
if operation_name == "ListDomains":
return {
"Domains": [
{
"DomainName": "test.domain.com",
"AutoRenew": True,
"TransferLock": True,
"Expiry": datetime(2015, 1, 1),
},
],
"NextPageMarker": "string",
}
if operation_name == "GetDomainDetail":
return {
"DomainName": "test.domain.com",
"Nameservers": [
{
"Name": "8.8.8.8",
"GlueIps": [],
},
],
"AutoRenew": True,
"AdminContact": {},
"RegistrantContact": {},
"TechContact": {},
"AdminPrivacy": True,
"RegistrantPrivacy": True,
"TechPrivacy": True,
"RegistrarName": "string",
"WhoIsServer": "string",
"RegistrarUrl": "string",
"AbuseContactEmail": "string",
"AbuseContactPhone": "string",
"RegistryDomainId": "string",
"CreationDate": datetime(2015, 1, 1),
"UpdatedDate": datetime(2015, 1, 1),
"ExpirationDate": datetime(2015, 1, 1),
"Reseller": "string",
"DnsSec": "string",
"StatusList": ["clientTransferProhibited"],
}
return make_api_call(self, operation_name, kwarg)
# Patch every AWS call using Boto3 and generate_regional_clients to have 1 client
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
class Test_Route53_Service:
# Mocked Audit Info
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
),
audited_account=None,
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=AWS_REGION,
credentials=None,
assumed_role_info=None,
audited_regions=None,
organizations_metadata=None,
)
return audit_info
# Test Route53Domains Client
def test__get_client__(self):
route53domains = Route53Domains(self.set_mocked_audit_info())
assert route53domains.client.__class__.__name__ == "Route53Domains"
# Test Route53Domains Session
def test__get_session__(self):
route53domains = Route53Domains(self.set_mocked_audit_info())
assert route53domains.session.__class__.__name__ == "Session"
# Test Route53Domains Service
def test__get_service__(self):
route53domains = Route53Domains(self.set_mocked_audit_info())
assert route53domains.service == "route53domains"
def test__list_domains__(self):
route53domains = Route53Domains(self.set_mocked_audit_info())
domain_name = "test.domain.com"
assert len(route53domains.domains)
assert route53domains.domains
assert route53domains.domains[domain_name]
assert route53domains.domains[domain_name].name == domain_name
assert route53domains.domains[domain_name].region == AWS_REGION
assert route53domains.domains[domain_name].admin_privacy
assert route53domains.domains[domain_name].status_list
assert len(route53domains.domains[domain_name].status_list) == 1
assert (
"clientTransferProhibited"
in route53domains.domains[domain_name].status_list
)