from asgiref.local import Local
from django.conf import settings as django_settings
from django.utils.functional import cached_property
class ConnectionProxy:
"""Proxy for accessing a connection object's attributes."""
def __init__(self, connections, alias):
self.__dict__["_connections"] = connections
self.__dict__["_alias"] = alias
def __getattr__(self, item):
return getattr(self._connections[self._alias], item)
def __setattr__(self, name, value):
return setattr(self._connections[self._alias], name, value)
def __delattr__(self, name):
return delattr(self._connections[self._alias], name)
def __contains__(self, key):
return key in self._connections[self._alias]
def __eq__(self, other):
return self._connections[self._alias] == other
class ConnectionDoesNotExist(Exception):
pass
class BaseConnectionHandler:
settings_name = None
exception_class = ConnectionDoesNotExist
thread_critical = False
def __init__(self, settings=None):
self._settings = settings
self._connections = Local(self.thread_critical)
@cached_property
def settings(self):
self._settings = self.configure_settings(self._settings)
return self._settings
def configure_settings(self, settings):
if settings is None:
settings = getattr(django_settings, self.settings_name)
return settings
def create_connection(self, alias):
raise NotImplementedError("Subclasses must implement create_connection().")
def __getitem__(self, alias):
try:
return getattr(self._connections, alias)
except AttributeError:
if alias not in self.settings:
raise self.exception_class(f"The connection '{alias}' doesn't exist.")
conn = self.create_connection(alias)
setattr(self._connections, alias, conn)
return conn
def __setitem__(self, key, value):
setattr(self._connections, key, value)
def __delitem__(self, key):
delattr(self._connections, key)
def __iter__(self):
return iter(self.settings)
def all(self, initialized_only=False):
return [
self[alias]
for alias in self
# If initialized_only is True, return only initialized connections.
if not initialized_only or hasattr(self._connections, alias)
]
def close_all(self):
for conn in self.all(initialized_only=True):
conn.close()