# django/contrib/gis/db/backends/mysql/operations.py

from django.contrib.gis.db.backends.base.adapter import WKTAdapter
from django.contrib.gis.db.backends.base.operations import (
    BaseSpatialOperations,
)
from django.contrib.gis.db.backends.utils import SpatialOperator
from django.contrib.gis.db.models import aggregates
from django.contrib.gis.geos.geometry import GEOSGeometryBase
from django.contrib.gis.geos.prototypes.io import wkb_r
from django.contrib.gis.measure import Distance
from django.db.backends.mysql.operations import DatabaseOperations
from django.utils.functional import cached_property


class MySQLOperations(BaseSpatialOperations, DatabaseOperations):
    """
    Spatial database operations for the MySQL backend.

    Combines the generic spatial operations base class with the regular
    MySQL database operations.
    """

    mysql = True
    name = 'mysql'
    geom_func_prefix = 'ST_'

    Adapter = WKTAdapter

    # Aggregate classes listed as disallowed for this backend.
    disallowed_aggregates = (
        aggregates.Collect,
        aggregates.Extent,
        aggregates.Extent3D,
        aggregates.MakeLine,
        aggregates.Union,
    )

    @cached_property
    def is_mysql_5_6(self):
        """Whether the connected server reports a version below 5.7.6."""
        server_version = self.connection.mysql_version
        return server_version < (5, 7, 6)

    @cached_property
    def select(self):
        """SQL template (one ``%s`` placeholder) using the prefixed AsBinary function."""
        prefix = self.geom_func_prefix
        return prefix + 'AsBinary(%s)'

    @cached_property
    def from_text(self):
        """Name of the prefixed GeomFromText SQL function."""
        prefix = self.geom_func_prefix
        return prefix + 'GeomFromText'

    @cached_property
    def gis_operators(self):
        """
        Map each spatial lookup name to a SpatialOperator built around the
        name of an MBR* SQL function.
        """
        # The equality function drops its trailing "s" when is_mysql_5_6
        # is true.
        equals_func = 'MBREqual' if self.is_mysql_5_6 else 'MBREquals'
        lookup_funcs = (
            # The first three lookups exist for consistency w/PostGIS API.
            ('bbcontains', 'MBRContains'),
            ('bboverlaps', 'MBROverlaps'),
            ('contained', 'MBRWithin'),
            ('contains', 'MBRContains'),
            ('disjoint', 'MBRDisjoint'),
            ('equals', equals_func),
            ('exact', equals_func),
            ('intersects', 'MBRIntersects'),
            ('overlaps', 'MBROverlaps'),
            ('same_as', equals_func),
            ('touches', 'MBRTouches'),
            ('within', 'MBRWithin'),
        )
        # Every lookup gets its own SpatialOperator instance.
        return {
            lookup: SpatialOperator(func=func_name)
            for lookup, func_name in lookup_funcs
        }

    @cached_property
    def unsupported_functions(self):
        """
        Set of GIS function names flagged as unsupported, extended with
        three more names when the server version is below 5.7.5.
        """
        names = {
            'AsGML', 'AsKML', 'AsSVG', 'Azimuth', 'BoundingCircle',
            'ForcePolygonCW', 'ForceRHR', 'LineLocatePoint', 'MakeValid',
            'MemSize', 'Perimeter', 'PointOnSurface', 'Reverse', 'Scale',
            'SnapToGrid', 'Transform', 'Translate',
        }
        if self.connection.mysql_version < (5, 7, 5):
            names |= {'AsGeoJSON', 'GeoHash', 'IsValid'}
        return names

    def geo_db_type(self, f):
        """Return the database type for geometry field `f`: its geom_type, unchanged."""
        return f.geom_type

    def get_distance(self, f, value, lookup_type):
        """
        Return a one-item list holding the distance parameter for field `f`.

        Only the first item of `value` is used. If it is a Distance object,
        it is converted to the units reported by the field; any other value
        is passed through untouched. `lookup_type` is not used here.

        Raise ValueError if a Distance object is given for a geodetic field.
        """
        distance = value[0]
        if not isinstance(distance, Distance):
            return [distance]
        if f.geodetic(self.connection):
            raise ValueError(
                'Only numeric values of degree units are allowed on '
                'geodetic distance queries.'
            )
        # Read the Distance attribute that matches the field's unit name.
        unit_attr = Distance.unit_attname(f.units_name(self.connection))
        return [getattr(distance, unit_attr)]

    def get_geometry_converter(self, expression):
        """
        Return a converter callable that turns a raw WKB database value into
        a geometry of the expression's output field class.

        The converter returns None for a None value. The output field's SRID
        is assigned to the geometry unless it is falsy or -1.
        """
        parse_wkb = wkb_r().read
        output_field = expression.output_field
        field_srid = output_field.srid
        # An SRID of -1 is treated the same as no SRID at all.
        srid = None if field_srid == -1 else field_srid
        geom_class = output_field.geom_class

        def converter(value, expression, connection):
            if value is None:
                return None
            geom = GEOSGeometryBase(parse_wkb(memoryview(value)), geom_class)
            if srid:
                geom.srid = srid
            return geom

        return converter
# Metadata
# View Raw File