from django.db.backends.base.schema import BaseDatabaseSchemaEditor
from django.db.models import NOT_PROVIDED, F, UniqueConstraint
from django.db.models.constants import LOOKUP_SEP
class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
    """
    Schema editor for the MySQL/MariaDB backend.

    Override the SQL templates and hooks of BaseDatabaseSchemaEditor. Several
    decisions depend on ``connection.mysql_is_mariadb`` and
    ``connection.mysql_version``.
    """

    sql_rename_table = "RENAME TABLE %(old_table)s TO %(new_table)s"

    sql_alter_column_null = "MODIFY %(column)s %(type)s NULL"
    sql_alter_column_not_null = "MODIFY %(column)s %(type)s NOT NULL"
    sql_alter_column_type = "MODIFY %(column)s %(type)s%(collation)s%(comment)s"
    sql_alter_column_no_default_null = "ALTER COLUMN %(column)s SET DEFAULT NULL"

    # No 'CASCADE' which works as a no-op in MySQL but is undocumented
    sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s"

    sql_delete_unique = "ALTER TABLE %(table)s DROP INDEX %(name)s"
    sql_create_column_inline_fk = (
        ", ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) "
        "REFERENCES %(to_table)s(%(to_column)s)"
    )
    sql_delete_fk = "ALTER TABLE %(table)s DROP FOREIGN KEY %(name)s"

    sql_delete_index = "DROP INDEX %(name)s ON %(table)s"
    sql_rename_index = "ALTER TABLE %(table)s RENAME INDEX %(old_name)s TO %(new_name)s"

    sql_create_pk = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
    )
    sql_delete_pk = "ALTER TABLE %(table)s DROP PRIMARY KEY"

    sql_create_index = "CREATE INDEX %(name)s ON %(table)s (%(columns)s)%(extra)s"

    sql_alter_table_comment = "ALTER TABLE %(table)s COMMENT = %(comment)s"
    # No standalone statement: the comment is part of sql_alter_column_type
    # (see the %(comment)s placeholder there).
    sql_alter_column_comment = None

    @property
    def sql_delete_check(self):
        """Return the SQL template used to drop a check constraint."""
        if self.connection.mysql_is_mariadb:
            # The name of the column check constraint is the same as the field
            # name on MariaDB. Adding IF EXISTS clause prevents migrations
            # crash. Constraint is removed during a "MODIFY" column statement.
            return "ALTER TABLE %(table)s DROP CONSTRAINT IF EXISTS %(name)s"
        return "ALTER TABLE %(table)s DROP CHECK %(name)s"

    @property
    def sql_rename_column(self):
        """
        Return the SQL template used to rename a column.

        Use the base class template when the server supports RENAME COLUMN,
        otherwise fall back to a "CHANGE" statement.
        """
        # MariaDB >= 10.5.2 and MySQL >= 8.0.4 support an
        # "ALTER TABLE ... RENAME COLUMN" statement.
        if self.connection.mysql_is_mariadb:
            if self.connection.mysql_version >= (10, 5, 2):
                return super().sql_rename_column
        elif self.connection.mysql_version >= (8, 0, 4):
            return super().sql_rename_column
        return "ALTER TABLE %(table)s CHANGE %(old_column)s %(new_column)s %(type)s"

    def quote_value(self, value):
        """
        Return ``value`` escaped by the underlying driver connection.

        For str values, every "%" is doubled before escaping, and a bytes
        result from the driver is decoded back to str.
        """
        self.connection.ensure_connection()
        if isinstance(value, str):
            value = value.replace("%", "%%")
        # MySQLdb escapes to string, PyMySQL to bytes.
        quoted = self.connection.connection.escape(
            value, self.connection.connection.encoders
        )
        if isinstance(value, str) and isinstance(quoted, bytes):
            quoted = quoted.decode()
        return quoted

    def _is_limited_data_type(self, field):
        """
        Return True if the field's database type (compared in lowercase) is
        listed in the connection's ``_limited_data_types``.
        """
        db_type = field.db_type(self.connection)
        return (
            db_type is not None
            and db_type.lower() in self.connection._limited_data_types
        )

    def skip_default(self, field):
        """
        Return True if the field has a limited data type and the server
        doesn't support defaults for such types.
        """
        if not self._supports_limited_data_type_defaults:
            return self._is_limited_data_type(field)
        return False

    def skip_default_on_alter(self, field):
        """
        Return True if the field has a limited data type and the server is
        MySQL (not MariaDB).
        """
        if self._is_limited_data_type(field) and not self.connection.mysql_is_mariadb:
            # MySQL doesn't support defaults for BLOB and TEXT in the
            # ALTER COLUMN statement.
            return True
        return False

    @property
    def _supports_limited_data_type_defaults(self):
        """Return True if the server accepts defaults on limited data types."""
        # MariaDB and MySQL >= 8.0.13 support defaults for BLOB and TEXT.
        if self.connection.mysql_is_mariadb:
            return True
        return self.connection.mysql_version >= (8, 0, 13)

    def _column_default_sql(self, field):
        """
        Return the placeholder SQL for the field's default, parenthesized on
        MySQL for limited data types, otherwise the base class value.
        """
        if (
            not self.connection.mysql_is_mariadb
            and self._supports_limited_data_type_defaults
            and self._is_limited_data_type(field)
        ):
            # MySQL supports defaults for BLOB and TEXT columns only if the
            # default value is written as an expression i.e. in parentheses.
            return "(%s)"
        return super()._column_default_sql(field)

    def add_field(self, model, field):
        """
        Add the field through the base class, then, when its default was
        skipped, write the effective default to the column with an UPDATE.
        """
        super().add_field(model, field)

        # Simulate the effect of a one-off default.
        # field.default may be unhashable, so a set isn't used for "in" check.
        if self.skip_default(field) and field.default not in (None, NOT_PROVIDED):
            effective_default = self.effective_default(field)
            self.execute(
                "UPDATE %(table)s SET %(column)s = %%s"
                % {
                    "table": self.quote_name(model._meta.db_table),
                    "column": self.quote_name(field.column),
                },
                [effective_default],
            )

    def remove_constraint(self, model, constraint):
        """
        Remove the constraint. For a UniqueConstraint whose ``create_sql()``
        is not None, first make sure a leading FK column keeps an index.
        """
        if (
            isinstance(constraint, UniqueConstraint)
            and constraint.create_sql(model, self) is not None
        ):
            self._create_missing_fk_index(
                model,
                fields=constraint.fields,
                expressions=constraint.expressions,
            )
        super().remove_constraint(model, constraint)

    def remove_index(self, model, index):
        """
        Remove the index, first making sure a leading FK column keeps an
        index of its own.
        """
        self._create_missing_fk_index(
            model,
            fields=[field_name for field_name, _ in index.fields_orders],
            expressions=index.expressions,
        )
        super().remove_index(model, index)

    def _field_should_be_indexed(self, model, field):
        """
        Return True if a separate index should be created for the field.

        On top of the base class check, skip constrained ForeignKey fields on
        InnoDB tables and fields with a limited data type.
        """
        if not super()._field_should_be_indexed(model, field):
            return False

        # Use the cursor as a context manager so it is closed as soon as the
        # storage engine is known, as _create_missing_fk_index() does.
        with self.connection.cursor() as cursor:
            storage = self.connection.introspection.get_storage_engine(
                cursor, model._meta.db_table
            )
        # No need to create an index for ForeignKey fields except if
        # db_constraint=False because the index from that constraint won't be
        # created.
        if (
            storage == "InnoDB"
            and field.get_internal_type() == "ForeignKey"
            and field.db_constraint
        ):
            return False
        return not self._is_limited_data_type(field)

    def _create_missing_fk_index(
        self,
        model,
        *,
        fields,
        expressions=None,
    ):
        """
        MySQL can remove an implicit FK index on a field when that field is
        covered by another index like a unique_together. "covered" here means
        that the more complex index has the FK field as its first field (see
        https://bugs.mysql.com/bug.php?id=37910).

        Manually create an implicit FK index to make it possible to remove the
        composed index.
        """
        first_field_name = None
        if fields:
            first_field_name = fields[0]
        elif (
            expressions
            and self.connection.features.supports_expression_indexes
            and isinstance(expressions[0], F)
            and LOOKUP_SEP not in expressions[0].name
        ):
            # Only a plain F() reference to a local field (no lookup
            # separator in its name) is resolved to a field name.
            first_field_name = expressions[0].name

        if not first_field_name:
            return

        first_field = model._meta.get_field(first_field_name)
        if first_field.get_internal_type() == "ForeignKey":
            column = self.connection.introspection.identifier_converter(
                first_field.column
            )
            with self.connection.cursor() as cursor:
                constraint_names = [
                    name
                    for name, infodict in self.connection.introspection.get_constraints(
                        cursor, model._meta.db_table
                    ).items()
                    if infodict["index"] and infodict["columns"][0] == column
                ]
            # There are no other indexes that starts with the FK field, only
            # the index that is expected to be deleted.
            if len(constraint_names) == 1:
                self.execute(
                    self._create_index_sql(model, fields=[first_field], suffix="")
                )

    def _delete_composed_index(self, model, fields, *args):
        """
        Delete a composed index, first making sure a leading FK column keeps
        an index of its own.
        """
        self._create_missing_fk_index(model, fields=fields)
        return super()._delete_composed_index(model, fields, *args)

    def _set_field_new_type_null_status(self, field, new_type):
        """
        Keep the null property of the old field. If it has changed, it will be
        handled separately.
        """
        if field.null:
            new_type += " NULL"
        else:
            new_type += " NOT NULL"
        return new_type

    def _alter_column_type_sql(
        self, model, old_field, new_field, new_type, old_collation, new_collation
    ):
        """
        Delegate to the base class after appending the old field's
        NULL/NOT NULL status to ``new_type``.
        """
        new_type = self._set_field_new_type_null_status(old_field, new_type)
        return super()._alter_column_type_sql(
            model, old_field, new_field, new_type, old_collation, new_collation
        )

    def _rename_field_sql(self, table, old_field, new_field, new_type):
        """
        Delegate to the base class after appending the old field's
        NULL/NOT NULL status to ``new_type``.
        """
        new_type = self._set_field_new_type_null_status(old_field, new_type)
        return super()._rename_field_sql(table, old_field, new_field, new_type)

    def _alter_column_comment_sql(self, model, new_field, new_type, new_db_comment):
        """Return an empty statement and an empty parameter list."""
        # Comment is altered when altering the column type.
        return "", []

    def _comment_sql(self, comment):
        """Return the base class comment SQL prefixed with " COMMENT "."""
        comment_sql = super()._comment_sql(comment)
        return f" COMMENT {comment_sql}"