django/contrib/gis/db/backends/spatialite/operations.py

"""
SQL functions reference lists:
https://www.gaia-gis.it/gaia-sins/spatialite-sql-4.3.0.html
"""
from django.contrib.gis.db import models
from django.contrib.gis.db.backends.base.operations import (
    BaseSpatialOperations,
)
from django.contrib.gis.db.backends.spatialite.adapter import SpatiaLiteAdapter
from django.contrib.gis.db.backends.utils import SpatialOperator
from django.contrib.gis.geos.geometry import GEOSGeometry, GEOSGeometryBase
from django.contrib.gis.geos.prototypes.io import wkb_r
from django.contrib.gis.measure import Distance
from django.core.exceptions import ImproperlyConfigured
from django.db.backends.sqlite3.operations import DatabaseOperations
from django.utils.functional import cached_property
from django.utils.version import get_version_tuple


class SpatialiteNullCheckOperator(SpatialOperator):
    def as_sql(self, connection, lookup, template_params, sql_params):
        sql, params = super().as_sql(connection, lookup, template_params, sql_params)
        return '%s > 0' % sql, params


class SpatiaLiteOperations(BaseSpatialOperations, DatabaseOperations):
    name = 'spatialite'
    spatialite = True

    Adapter = SpatiaLiteAdapter

    collect = 'Collect'
    extent = 'Extent'
    makeline = 'MakeLine'
    unionagg = 'GUnion'

    from_text = 'GeomFromText'

    gis_operators = {
        # Binary predicates
        'equals': SpatialiteNullCheckOperator(func='Equals'),
        'disjoint': SpatialiteNullCheckOperator(func='Disjoint'),
        'touches': SpatialiteNullCheckOperator(func='Touches'),
        'crosses': SpatialiteNullCheckOperator(func='Crosses'),
        'within': SpatialiteNullCheckOperator(func='Within'),
        'overlaps': SpatialiteNullCheckOperator(func='Overlaps'),
        'contains': SpatialiteNullCheckOperator(func='Contains'),
        'intersects': SpatialiteNullCheckOperator(func='Intersects'),
        'relate': SpatialiteNullCheckOperator(func='Relate'),
        'coveredby': SpatialiteNullCheckOperator(func='CoveredBy'),
        'covers': SpatialiteNullCheckOperator(func='Covers'),
        # Returns true if B's bounding box completely contains A's bounding box.
        'contained': SpatialOperator(func='MbrWithin'),
        # Returns true if A's bounding box completely contains B's bounding box.
        'bbcontains': SpatialOperator(func='MbrContains'),
        # Returns true if A's bounding box overlaps B's bounding box.
        'bboverlaps': SpatialOperator(func='MbrOverlaps'),
        # These are implemented here as synonyms for Equals
        'same_as': SpatialiteNullCheckOperator(func='Equals'),
        'exact': SpatialiteNullCheckOperator(func='Equals'),
        # Distance predicates
        'dwithin': SpatialOperator(func='PtDistWithin'),
    }

    disallowed_aggregates = (models.Extent3D,)

    select = 'CAST (AsEWKB(%s) AS BLOB)'

    function_names = {
        'AsWKB': 'St_AsBinary',
        'ForcePolygonCW': 'ST_ForceLHR',
        'Length': 'ST_Length',
        'LineLocatePoint': 'ST_Line_Locate_Point',
        'NumPoints': 'ST_NPoints',
        'Reverse': 'ST_Reverse',
        'Scale': 'ScaleCoords',
        'Translate': 'ST_Translate',
        'Union': 'ST_Union',
    }

    @cached_property
    def unsupported_functions(self):
        unsupported = {'BoundingCircle', 'GeometryDistance', 'MemSize'}
        if not self.lwgeom_version():
            unsupported |= {'Azimuth', 'GeoHash', 'IsValid', 'MakeValid'}
        return unsupported

    @cached_property
    def spatial_version(self):
        """Determine the version of the SpatiaLite library."""
        try:
            version = self.spatialite_version_tuple()[1:]
        except Exception as exc:
            raise ImproperlyConfigured(
                'Cannot determine the SpatiaLite version for the "%s" database. '
                'Was the SpatiaLite initialization SQL loaded on this database?' % (
                    self.connection.settings_dict['NAME'],
                )
            ) from exc
        if version < (4, 3, 0):
            raise ImproperlyConfigured('GeoDjango supports SpatiaLite 4.3.0 and above.')
        return version

    def convert_extent(self, box):
        """
        Convert the polygon data received from SpatiaLite to min/max values.
        """
        if box is None:
            return None
        shell = GEOSGeometry(box).shell
        xmin, ymin = shell[0][:2]
        xmax, ymax = shell[2][:2]
        return (xmin, ymin, xmax, ymax)

    def geo_db_type(self, f):
        """
        Return None because geometry columns are added via the
        `AddGeometryColumn` stored procedure on SpatiaLite.
        """
        return None

    def get_distance(self, f, value, lookup_type):
        """
        Return the distance parameters for the given geometry field,
        lookup value, and lookup type.
        """
        if not value:
            return []
        value = value[0]
        if isinstance(value, Distance):
            if f.geodetic(self.connection):
                if lookup_type == 'dwithin':
                    raise ValueError(
                        'Only numeric values of degree units are allowed on '
                        'geographic DWithin queries.'
                    )
                dist_param = value.m
            else:
                dist_param = getattr(value, Distance.unit_attname(f.units_name(self.connection)))
        else:
            dist_param = value
        return [dist_param]

    def _get_spatialite_func(self, func):
        """
        Helper routine for calling SpatiaLite functions and returning
        their result.
        Any error occurring in this method should be handled by the caller.
        """
        cursor = self.connection._cursor()
        try:
            cursor.execute('SELECT %s' % func)
            row = cursor.fetchone()
        finally:
            cursor.close()
        return row[0]

    def geos_version(self):
        "Return the version of GEOS used by SpatiaLite as a string."
        return self._get_spatialite_func('geos_version()')

    def proj4_version(self):
        "Return the version of the PROJ.4 library used by SpatiaLite."
        return self._get_spatialite_func('proj4_version()')

    def lwgeom_version(self):
        """Return the version of LWGEOM library used by SpatiaLite."""
        return self._get_spatialite_func('lwgeom_version()')

    def spatialite_version(self):
        "Return the SpatiaLite library version as a string."
        return self._get_spatialite_func('spatialite_version()')

    def spatialite_version_tuple(self):
        """
        Return the SpatiaLite version as a tuple (version string, major,
        minor, subminor).
        """
        version = self.spatialite_version()
        return (version,) + get_version_tuple(version)

    def spatial_aggregate_name(self, agg_name):
        """
        Return the spatial aggregate SQL template and function for the
        given Aggregate instance.
        """
        agg_name = 'unionagg' if agg_name.lower() == 'union' else agg_name.lower()
        return getattr(self, agg_name)

    # Routines for getting the OGC-compliant models.
    def geometry_columns(self):
        from django.contrib.gis.db.backends.spatialite.models import (
            SpatialiteGeometryColumns,
        )
        return SpatialiteGeometryColumns

    def spatial_ref_sys(self):
        from django.contrib.gis.db.backends.spatialite.models import (
            SpatialiteSpatialRefSys,
        )
        return SpatialiteSpatialRefSys

    def get_geometry_converter(self, expression):
        geom_class = expression.output_field.geom_class
        read = wkb_r().read

        def converter(value, expression, connection):
            return None if value is None else GEOSGeometryBase(read(value), geom_class)
        return converter
Metadata
View Raw File