chore(azure): Improve AzureService class with __set_clients__ (#2676)

This commit is contained in:
Pepe Fagoaga
2023-08-04 13:04:05 +02:00
committed by GitHub
parent e0bfef2ece
commit bf77f817cb
5 changed files with 30 additions and 94 deletions

View File

@@ -1,8 +1,28 @@
class AzureService:
def __init__(self, service, audit_info):
# We receive the service using __class__.__name__ or the service name in lowercase
# e.g.: Storage --> we need a lowercase string, so service.lower()
self.service = service.lower() if not service.islower() else service
from prowler.lib.logger import logger
class AzureService:
def __init__(
self,
service,
audit_info,
):
self.clients = self.__set_clients__(
audit_info.identity.subscriptions, audit_info.credentials, service
)
self.credentials = audit_info.credentials
self.subscriptions = audit_info.identity.subscriptions
def __set_clients__(self, subscriptions, credentials, service):
clients = {}
try:
for display_name, id in subscriptions.items():
clients.update(
{display_name: service(credential=credentials, subscription_id=id)}
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
else:
return clients

View File

@@ -10,31 +10,10 @@ from prowler.providers.azure.lib.service.service import AzureService
########################## Defender
class Defender(AzureService):
def __init__(self, audit_info):
super().__init__(__class__.__name__, audit_info)
super().__init__(SecurityCenter, audit_info)
self.clients = self.__set_clients__(
audit_info.identity.subscriptions, audit_info.credentials
)
self.pricings = self.__get_pricings__()
def __set_clients__(self, subscriptions, credentials):
clients = {}
try:
for display_name, id in subscriptions.items():
clients.update(
{
display_name: SecurityCenter(
credential=credentials, subscription_id=id
)
}
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
else:
return clients
def __get_pricings__(self):
logger.info("Defender - Getting pricings...")
pricings = {}

View File

@@ -10,30 +10,9 @@ from prowler.providers.azure.lib.service.service import AzureService
########################## IAM
class IAM(AzureService):
def __init__(self, audit_info):
super().__init__(__class__.__name__, audit_info)
self.clients = self.__set_clients__(
audit_info.identity.subscriptions, audit_info.credentials
)
super().__init__(AuthorizationManagementClient, audit_info)
self.roles = self.__get_roles__()
def __set_clients__(self, subscriptions, credentials):
clients = {}
try:
for display_name, id in subscriptions.items():
clients.update(
{
display_name: AuthorizationManagementClient(
credential=credentials, subscription_id=id
)
}
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
else:
return clients
def __get_roles__(self):
logger.info("IAM - Getting roles...")
roles = {}

View File

@@ -14,30 +14,9 @@ from prowler.providers.azure.lib.service.service import AzureService
########################## SQLServer
class SQLServer(AzureService):
def __init__(self, audit_info):
super().__init__(__class__.__name__, audit_info)
self.clients = self.__set_clients__(
audit_info.identity.subscriptions, audit_info.credentials
)
super().__init__(SqlManagementClient, audit_info)
self.sql_servers = self.__get_sql_servers__()
def __set_clients__(self, subscriptions, credentials):
clients = {}
try:
for display_name, id in subscriptions.items():
clients.update(
{
display_name: SqlManagementClient(
credential=credentials, subscription_id=id
)
}
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
else:
return clients
def __get_sql_servers__(self):
logger.info("SQL Server - Getting SQL servers...")
sql_servers = {}

View File

@@ -10,30 +10,9 @@ from prowler.providers.azure.lib.service.service import AzureService
########################## Storage
class Storage(AzureService):
def __init__(self, audit_info):
super().__init__(__class__.__name__, audit_info)
self.clients = self.__set_clients__(
audit_info.identity.subscriptions, audit_info.credentials
)
super().__init__(StorageManagementClient, audit_info)
self.storage_accounts = self.__get_storage_accounts__()
def __set_clients__(self, subscriptions, credentials):
clients = {}
try:
for display_name, id in subscriptions.items():
clients.update(
{
display_name: StorageManagementClient(
credential=credentials, subscription_id=id
)
}
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
else:
return clients
def __get_storage_accounts__(self):
logger.info("Storage - Getting storage accounts...")
storage_accounts = {}