import datetime
import decimal
import json
from collections import defaultdict
from functools import reduce
from operator import or_
from django.core.exceptions import FieldDoesNotExist
from django.core.validators import EMPTY_VALUES
from django.db import models, router
from django.db.models.constants import LOOKUP_SEP
from django.db.models.deletion import Collector
from django.forms.utils import pretty_name
from django.urls import NoReverseMatch, reverse
from django.utils import formats, timezone
from django.utils.hashable import make_hashable
from django.utils.html import format_html
from django.utils.regex_helper import _lazy_re_compile
from django.utils.text import capfirst
from django.utils.translation import ngettext
from django.utils.translation import override as translation_override
QUOTE_MAP = {i: "_%02X" % i for i in b'":/_#?;@&=+$,"[]<>%\n\\'}
UNQUOTE_MAP = {v: chr(k) for k, v in QUOTE_MAP.items()}
UNQUOTE_RE = _lazy_re_compile("_(?:%s)" % "|".join([x[1:] for x in UNQUOTE_MAP]))
class FieldIsAForeignKeyColumnName(Exception):
"""A field is a foreign key attname, i.e. <FK>_id."""
pass
def lookup_spawns_duplicates(opts, lookup_path):
"""
Return True if the given lookup path spawns duplicates.
"""
lookup_fields = lookup_path.split(LOOKUP_SEP)
# Go through the fields (following all relations) and look for an m2m.
for field_name in lookup_fields:
if field_name == "pk":
field_name = opts.pk.name
try:
field = opts.get_field(field_name)
except FieldDoesNotExist:
# Ignore query lookups.
continue
else:
if hasattr(field, "path_infos"):
# This field is a relation; update opts to follow the relation.
path_info = field.path_infos
opts = path_info[-1].to_opts
if any(path.m2m for path in path_info):
# This field is a m2m relation so duplicates must be
# handled.
return True
return False
def get_last_value_from_parameters(parameters, key):
value = parameters.get(key)
return value[-1] if isinstance(value, list) else value
def prepare_lookup_value(key, value, separator=","):
"""
Return a lookup value prepared to be used in queryset filtering.
"""
if isinstance(value, list):
return [prepare_lookup_value(key, v, separator=separator) for v in value]
# if key ends with __in, split parameter into separate values
if key.endswith("__in"):
value = value.split(separator)
# if key ends with __isnull, special case '' and the string literals 'false' and '0'
elif key.endswith("__isnull"):
value = value.lower() not in ("", "false", "0")
return value
def build_q_object_from_lookup_parameters(parameters):
q_object = models.Q()
for param, param_item_list in parameters.items():
q_object &= reduce(or_, (models.Q((param, item)) for item in param_item_list))
return q_object
def quote(s):
"""
Ensure that primary key values do not confuse the admin URLs by escaping
any '/', '_' and ':' and similarly problematic characters.
Similar to urllib.parse.quote(), except that the quoting is slightly
different so that it doesn't get automatically unquoted by the web browser.
"""
return s.translate(QUOTE_MAP) if isinstance(s, str) else s
def unquote(s):
"""Undo the effects of quote()."""
return UNQUOTE_RE.sub(lambda m: UNQUOTE_MAP[m[0]], s)
def flatten(fields):
"""
Return a list which is a single level of flattening of the original list.
"""
flat = []
for field in fields:
if isinstance(field, (list, tuple)):
flat.extend(field)
else:
flat.append(field)
return flat
def flatten_fieldsets(fieldsets):
"""Return a list of field names from an admin fieldsets structure."""
field_names = []
for name, opts in fieldsets:
field_names.extend(flatten(opts["fields"]))
return field_names
def get_deleted_objects(objs, request, admin_site):
"""
Find all objects related to ``objs`` that should also be deleted. ``objs``
must be a homogeneous iterable of objects (e.g. a QuerySet).
Return a nested list of strings suitable for display in the
template with the ``unordered_list`` filter.
"""
try:
obj = objs[0]
except IndexError:
return [], {}, set(), []
else:
using = router.db_for_write(obj._meta.model)
collector = NestedObjects(using=using, origin=objs)
collector.collect(objs)
perms_needed = set()
def format_callback(obj):
model = obj.__class__
opts = obj._meta
no_edit_link = "%s: %s" % (capfirst(opts.verbose_name), obj)
if admin_site.is_registered(model):
if not admin_site.get_model_admin(model).has_delete_permission(
request, obj
):
perms_needed.add(opts.verbose_name)
try:
admin_url = reverse(
"%s:%s_%s_change"
% (admin_site.name, opts.app_label, opts.model_name),
None,
(quote(obj.pk),),
)
except NoReverseMatch:
# Change url doesn't exist -- don't display link to edit
return no_edit_link
# Display a link to the admin page.
return format_html(
'{}: <a href="{}">{}</a>', capfirst(opts.verbose_name), admin_url, obj
)
else:
# Don't display link to edit, because it either has no
# admin or is edited inline.
return no_edit_link
to_delete = collector.nested(format_callback)
protected = [format_callback(obj) for obj in collector.protected]
model_count = {
model._meta.verbose_name_plural: len(objs)
for model, objs in collector.model_objs.items()
}
return to_delete, model_count, perms_needed, protected
class NestedObjects(Collector):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.edges = {} # {from_instance: [to_instances]}
self.protected = set()
self.model_objs = defaultdict(set)
def add_edge(self, source, target):
self.edges.setdefault(source, []).append(target)
def collect(self, objs, source=None, source_attr=None, **kwargs):
for obj in objs:
if source_attr and not source_attr.endswith("+"):
related_name = source_attr % {
"class": source._meta.model_name,
"app_label": source._meta.app_label,
}
self.add_edge(getattr(obj, related_name), obj)
else:
self.add_edge(None, obj)
self.model_objs[obj._meta.model].add(obj)
try:
return super().collect(objs, source_attr=source_attr, **kwargs)
except models.ProtectedError as e:
self.protected.update(e.protected_objects)
except models.RestrictedError as e:
self.protected.update(e.restricted_objects)
def related_objects(self, related_model, related_fields, objs):
qs = super().related_objects(related_model, related_fields, objs)
return qs.select_related(
*[related_field.name for related_field in related_fields]
)
def _nested(self, obj, seen, format_callback):
if obj in seen:
return []
seen.add(obj)
children = []
for child in self.edges.get(obj, ()):
children.extend(self._nested(child, seen, format_callback))
if format_callback:
ret = [format_callback(obj)]
else:
ret = [obj]
if children:
ret.append(children)
return ret
def nested(self, format_callback=None):
"""
Return the graph as a nested list.
"""
seen = set()
roots = []
for root in self.edges.get(None, ()):
roots.extend(self._nested(root, seen, format_callback))
return roots
def can_fast_delete(self, *args, **kwargs):
"""
We always want to load the objects into memory so that we can display
them to the user in confirm page.
"""
return False
def model_format_dict(obj):
"""
Return a `dict` with keys 'verbose_name' and 'verbose_name_plural',
typically for use with string formatting.
`obj` may be a `Model` instance, `Model` subclass, or `QuerySet` instance.
"""
if isinstance(obj, (models.Model, models.base.ModelBase)):
opts = obj._meta
elif isinstance(obj, models.query.QuerySet):
opts = obj.model._meta
else:
opts = obj
return {
"verbose_name": opts.verbose_name,
"verbose_name_plural": opts.verbose_name_plural,
}
def model_ngettext(obj, n=None):
"""
Return the appropriate `verbose_name` or `verbose_name_plural` value for
`obj` depending on the count `n`.
`obj` may be a `Model` instance, `Model` subclass, or `QuerySet` instance.
If `obj` is a `QuerySet` instance, `n` is optional and the length of the
`QuerySet` is used.
"""
if isinstance(obj, models.query.QuerySet):
if n is None:
n = obj.count()
obj = obj.model
d = model_format_dict(obj)
singular, plural = d["verbose_name"], d["verbose_name_plural"]
return ngettext(singular, plural, n or 0)
def lookup_field(name, obj, model_admin=None):
opts = obj._meta
try:
f = _get_non_gfk_field(opts, name)
except (FieldDoesNotExist, FieldIsAForeignKeyColumnName):
# For non-regular field values, the value is either a method,
# property, related field, or returned via a callable.
if callable(name):
attr = name
value = attr(obj)
elif hasattr(model_admin, name) and name != "__str__":
attr = getattr(model_admin, name)
value = attr(obj)
else:
sentinel = object()
attr = getattr(obj, name, sentinel)
if callable(attr):
value = attr()
else:
if attr is sentinel:
attr = obj
for part in name.split(LOOKUP_SEP):
attr = getattr(attr, part, sentinel)
if attr is sentinel:
return None, None, None
value = attr
if hasattr(model_admin, "model") and hasattr(model_admin.model, name):
attr = getattr(model_admin.model, name)
f = None
else:
attr = None
value = getattr(obj, name)
return f, attr, value
def _get_non_gfk_field(opts, name):
"""
For historical reasons, the admin app relies on GenericForeignKeys as being
"not found" by get_field(). This could likely be cleaned up.
Reverse relations should also be excluded as these aren't attributes of the
model (rather something like `foo_set`).
"""
field = opts.get_field(name)
if (
field.is_relation
and
# Generic foreign keys OR reverse relations
((field.many_to_one and not field.related_model) or field.one_to_many)
):
raise FieldDoesNotExist()
# Avoid coercing <FK>_id fields to FK
if (
field.is_relation
and not field.many_to_many
and hasattr(field, "attname")
and field.attname == name
):
raise FieldIsAForeignKeyColumnName()
return field
def label_for_field(name, model, model_admin=None, return_attr=False, form=None):
"""
Return a sensible label for a field name. The name can be a callable,
property (but not created with @property decorator), or the name of an
object's attribute, as well as a model field, including across related
objects. If return_attr is True, also return the resolved attribute
(which could be a callable). This will be None if (and only if) the name
refers to a field.
"""
attr = None
try:
field = _get_non_gfk_field(model._meta, name)
try:
label = field.verbose_name
except AttributeError:
# field is likely a ForeignObjectRel
label = field.related_model._meta.verbose_name
except FieldDoesNotExist:
if name == "__str__":
label = str(model._meta.verbose_name)
attr = str
else:
if callable(name):
attr = name
elif hasattr(model_admin, name):
attr = getattr(model_admin, name)
elif hasattr(model, name):
attr = getattr(model, name)
elif form and name in form.fields:
attr = form.fields[name]
else:
try:
attr = get_fields_from_path(model, name)[-1]
except (FieldDoesNotExist, NotRelationField):
message = f"Unable to lookup '{name}' on {model._meta.object_name}"
if model_admin:
message += f" or {model_admin.__class__.__name__}"
if form:
message += f" or {form.__class__.__name__}"
raise AttributeError(message)
if hasattr(attr, "short_description"):
label = attr.short_description
elif (
isinstance(attr, property)
and hasattr(attr, "fget")
and hasattr(attr.fget, "short_description")
):
label = attr.fget.short_description
elif callable(attr):
if attr.__name__ == "<lambda>":
label = "--"
else:
label = pretty_name(attr.__name__)
else:
label = pretty_name(name)
except FieldIsAForeignKeyColumnName:
label = pretty_name(name)
attr = name
if return_attr:
return (label, attr)
else:
return label
def help_text_for_field(name, model):
help_text = ""
try:
field = _get_non_gfk_field(model._meta, name)
except (FieldDoesNotExist, FieldIsAForeignKeyColumnName):
pass
else:
if hasattr(field, "help_text"):
help_text = field.help_text
return help_text
def display_for_field(value, field, empty_value_display, avoid_link=False):
from django.contrib.admin.templatetags.admin_list import _boolean_icon
if getattr(field, "flatchoices", None):
try:
return dict(field.flatchoices).get(value, empty_value_display)
except TypeError:
# Allow list-like choices.
flatchoices = make_hashable(field.flatchoices)
value = make_hashable(value)
return dict(flatchoices).get(value, empty_value_display)
# BooleanField needs special-case null-handling, so it comes before the
# general null test.
elif isinstance(field, models.BooleanField):
return _boolean_icon(value)
elif value in field.empty_values:
return empty_value_display
elif isinstance(field, models.DateTimeField):
return formats.localize(timezone.template_localtime(value))
elif isinstance(field, (models.DateField, models.TimeField)):
return formats.localize(value)
elif isinstance(field, models.DecimalField):
return formats.number_format(value, field.decimal_places)
elif isinstance(field, (models.IntegerField, models.FloatField)):
return formats.number_format(value)
elif isinstance(field, models.FileField) and value and not avoid_link:
return format_html('<a href="{}">{}</a>', value.url, value)
elif isinstance(field, models.URLField) and value and not avoid_link:
return format_html('<a href="{}">{}</a>', value, value)
elif isinstance(field, models.JSONField) and value:
try:
return json.dumps(value, ensure_ascii=False, cls=field.encoder)
except TypeError:
return display_for_value(value, empty_value_display)
else:
return display_for_value(value, empty_value_display)
def display_for_value(value, empty_value_display, boolean=False):
from django.contrib.admin.templatetags.admin_list import _boolean_icon
if boolean:
return _boolean_icon(value)
elif value in EMPTY_VALUES:
return empty_value_display
elif isinstance(value, bool):
return str(value)
elif isinstance(value, datetime.datetime):
return formats.localize(timezone.template_localtime(value))
elif isinstance(value, (datetime.date, datetime.time)):
return formats.localize(value)
elif isinstance(value, (int, decimal.Decimal, float)):
return formats.number_format(value)
elif isinstance(value, (list, tuple)):
return ", ".join(str(v) for v in value)
else:
return str(value)
class NotRelationField(Exception):
pass
def get_model_from_relation(field):
if hasattr(field, "path_infos"):
return field.path_infos[-1].to_opts.model
else:
raise NotRelationField
def reverse_field_path(model, path):
"""Create a reversed field path.
E.g. Given (Order, "user__groups"),
return (Group, "user__order").
Final field must be a related model, not a data field.
"""
reversed_path = []
parent = model
pieces = path.split(LOOKUP_SEP)
for piece in pieces:
field = parent._meta.get_field(piece)
# skip trailing data field if extant:
if len(reversed_path) == len(pieces) - 1: # final iteration
try:
get_model_from_relation(field)
except NotRelationField:
break
# Field should point to another model
if field.is_relation and not (field.auto_created and not field.concrete):
related_name = field.related_query_name()
parent = field.remote_field.model
else:
related_name = field.field.name
parent = field.related_model
reversed_path.insert(0, related_name)
return (parent, LOOKUP_SEP.join(reversed_path))
def get_fields_from_path(model, path):
"""Return list of Fields given path relative to model.
e.g. (ModelX, "user__groups__name") -> [
<django.db.models.fields.related.ForeignKey object at 0x...>,
<django.db.models.fields.related.ManyToManyField object at 0x...>,
<django.db.models.fields.CharField object at 0x...>,
]
"""
pieces = path.split(LOOKUP_SEP)
fields = []
for piece in pieces:
if fields:
parent = get_model_from_relation(fields[-1])
else:
parent = model
fields.append(parent._meta.get_field(piece))
return fields
def construct_change_message(form, formsets, add):
"""
Construct a JSON structure describing changes from a changed object.
Translations are deactivated so that strings are stored untranslated.
Translation happens later on LogEntry access.
"""
# Evaluating `form.changed_data` prior to disabling translations is required
# to avoid fields affected by localization from being included incorrectly,
# e.g. where date formats differ such as MM/DD/YYYY vs DD/MM/YYYY.
changed_data = form.changed_data
with translation_override(None):
# Deactivate translations while fetching verbose_name for form
# field labels and using `field_name`, if verbose_name is not provided.
# Translations will happen later on LogEntry access.
changed_field_labels = _get_changed_field_labels_from_form(form, changed_data)
change_message = []
if add:
change_message.append({"added": {}})
elif form.changed_data:
change_message.append({"changed": {"fields": changed_field_labels}})
if formsets:
with translation_override(None):
for formset in formsets:
for added_object in formset.new_objects:
change_message.append(
{
"added": {
"name": str(added_object._meta.verbose_name),
"object": str(added_object),
}
}
)
for changed_object, changed_fields in formset.changed_objects:
change_message.append(
{
"changed": {
"name": str(changed_object._meta.verbose_name),
"object": str(changed_object),
"fields": _get_changed_field_labels_from_form(
formset.forms[0], changed_fields
),
}
}
)
for deleted_object in formset.deleted_objects:
change_message.append(
{
"deleted": {
"name": str(deleted_object._meta.verbose_name),
"object": str(deleted_object),
}
}
)
return change_message
def _get_changed_field_labels_from_form(form, changed_data):
changed_field_labels = []
for field_name in changed_data:
try:
verbose_field_name = form.fields[field_name].label or field_name
except KeyError:
verbose_field_name = field_name
changed_field_labels.append(str(verbose_field_name))
return changed_field_labels