import gzip
import os
import warnings
from django.apps import apps
from django.core import serializers
from django.core.management.base import BaseCommand, CommandError
from django.core.management.utils import parse_apps_and_model_labels
from django.db import DEFAULT_DB_ALIAS, router
try:
import bz2
has_bz2 = True
except ImportError:
has_bz2 = False
try:
import lzma
has_lzma = True
except ImportError:
has_lzma = False
class ProxyModelWarning(Warning):
pass
class Command(BaseCommand):
help = (
"Output the contents of the database as a fixture of the given format "
"(using each model's default manager unless --all is specified)."
)
def add_arguments(self, parser):
parser.add_argument(
"args",
metavar="app_label[.ModelName]",
nargs="*",
help=(
"Restricts dumped data to the specified app_label or "
"app_label.ModelName."
),
)
parser.add_argument(
"--format",
default="json",
help="Specifies the output serialization format for fixtures.",
)
parser.add_argument(
"--indent",
type=int,
help="Specifies the indent level to use when pretty-printing output.",
)
parser.add_argument(
"--database",
default=DEFAULT_DB_ALIAS,
help="Nominates a specific database to dump fixtures from. "
'Defaults to the "default" database.',
)
parser.add_argument(
"-e",
"--exclude",
action="append",
default=[],
help="An app_label or app_label.ModelName to exclude "
"(use multiple --exclude to exclude multiple apps/models).",
)
parser.add_argument(
"--natural-foreign",
action="store_true",
dest="use_natural_foreign_keys",
help="Use natural foreign keys if they are available.",
)
parser.add_argument(
"--natural-primary",
action="store_true",
dest="use_natural_primary_keys",
help="Use natural primary keys if they are available.",
)
parser.add_argument(
"-a",
"--all",
action="store_true",
dest="use_base_manager",
help=(
"Use Django's base manager to dump all models stored in the database, "
"including those that would otherwise be filtered or modified by a "
"custom manager."
),
)
parser.add_argument(
"--pks",
dest="primary_keys",
help="Only dump objects with given primary keys. Accepts a comma-separated "
"list of keys. This option only works when you specify one model.",
)
parser.add_argument(
"-o", "--output", help="Specifies file to which the output is written."
)
def handle(self, *app_labels, **options):
format = options["format"]
indent = options["indent"]
using = options["database"]
excludes = options["exclude"]
output = options["output"]
show_traceback = options["traceback"]
use_natural_foreign_keys = options["use_natural_foreign_keys"]
use_natural_primary_keys = options["use_natural_primary_keys"]
use_base_manager = options["use_base_manager"]
pks = options["primary_keys"]
if pks:
primary_keys = [pk.strip() for pk in pks.split(",")]
else:
primary_keys = []
excluded_models, excluded_apps = parse_apps_and_model_labels(excludes)
if not app_labels:
if primary_keys:
raise CommandError("You can only use --pks option with one model")
app_list = dict.fromkeys(
app_config
for app_config in apps.get_app_configs()
if app_config.models_module is not None
and app_config not in excluded_apps
)
else:
if len(app_labels) > 1 and primary_keys:
raise CommandError("You can only use --pks option with one model")
app_list = {}
for label in app_labels:
try:
app_label, model_label = label.split(".")
try:
app_config = apps.get_app_config(app_label)
except LookupError as e:
raise CommandError(str(e))
if app_config.models_module is None or app_config in excluded_apps:
continue
try:
model = app_config.get_model(model_label)
except LookupError:
raise CommandError(
"Unknown model: %s.%s" % (app_label, model_label)
)
app_list_value = app_list.setdefault(app_config, [])
# We may have previously seen an "all-models" request for
# this app (no model qualifier was given). In this case
# there is no need adding specific models to the list.
if app_list_value is not None and model not in app_list_value:
app_list_value.append(model)
except ValueError:
if primary_keys:
raise CommandError(
"You can only use --pks option with one model"
)
# This is just an app - no model qualifier
app_label = label
try:
app_config = apps.get_app_config(app_label)
except LookupError as e:
raise CommandError(str(e))
if app_config.models_module is None or app_config in excluded_apps:
continue
app_list[app_config] = None
# Check that the serialization format exists; this is a shortcut to
# avoid collating all the objects and _then_ failing.
if format not in serializers.get_public_serializer_formats():
try:
serializers.get_serializer(format)
except serializers.SerializerDoesNotExist:
pass
raise CommandError("Unknown serialization format: %s" % format)
def get_objects(count_only=False):
"""
Collate the objects to be serialized. If count_only is True, just
count the number of objects to be serialized.
"""
if use_natural_foreign_keys:
models = serializers.sort_dependencies(
app_list.items(), allow_cycles=True
)
else:
# There is no need to sort dependencies when natural foreign
# keys are not used.
models = []
for app_config, model_list in app_list.items():
if model_list is None:
models.extend(app_config.get_models())
else:
models.extend(model_list)
for model in models:
if model in excluded_models:
continue
if model._meta.proxy and model._meta.proxy_for_model not in models:
warnings.warn(
"%s is a proxy model and won't be serialized."
% model._meta.label,
category=ProxyModelWarning,
)
if not model._meta.proxy and router.allow_migrate_model(using, model):
if use_base_manager:
objects = model._base_manager
else:
objects = model._default_manager
queryset = objects.using(using).order_by(model._meta.pk.name)
if primary_keys:
queryset = queryset.filter(pk__in=primary_keys)
if count_only:
yield queryset.order_by().count()
else:
chunk_size = (
2000 if queryset._prefetch_related_lookups else None
)
yield from queryset.iterator(chunk_size=chunk_size)
try:
self.stdout.ending = None
progress_output = None
object_count = 0
# If dumpdata is outputting to stdout, there is no way to display progress
if output and self.stdout.isatty() and options["verbosity"] > 0:
progress_output = self.stdout
object_count = sum(get_objects(count_only=True))
if output:
file_root, file_ext = os.path.splitext(output)
compression_formats = {
".bz2": (open, {}, file_root),
".gz": (gzip.open, {}, output),
".lzma": (open, {}, file_root),
".xz": (open, {}, file_root),
".zip": (open, {}, file_root),
}
if has_bz2:
compression_formats[".bz2"] = (bz2.open, {}, output)
if has_lzma:
compression_formats[".lzma"] = (
lzma.open,
{"format": lzma.FORMAT_ALONE},
output,
)
compression_formats[".xz"] = (lzma.open, {}, output)
try:
open_method, kwargs, file_path = compression_formats[file_ext]
except KeyError:
open_method, kwargs, file_path = (open, {}, output)
if file_path != output:
file_name = os.path.basename(file_path)
warnings.warn(
f"Unsupported file extension ({file_ext}). "
f"Fixtures saved in '{file_name}'.",
RuntimeWarning,
)
stream = open_method(file_path, "wt", **kwargs)
else:
stream = None
try:
serializers.serialize(
format,
get_objects(),
indent=indent,
use_natural_foreign_keys=use_natural_foreign_keys,
use_natural_primary_keys=use_natural_primary_keys,
stream=stream or self.stdout,
progress_output=progress_output,
object_count=object_count,
)
finally:
if stream:
stream.close()
except Exception as e:
if show_traceback:
raise
raise CommandError("Unable to serialize database: %s" % e)