django/core/management/commands/optimizemigration.py

import shutil
import sys

from django.apps import apps
from django.core.management.base import BaseCommand, CommandError
from django.core.management.utils import run_formatters
from django.db import migrations
from django.db.migrations.exceptions import AmbiguityError
from django.db.migrations.loader import MigrationLoader
from django.db.migrations.optimizer import MigrationOptimizer
from django.db.migrations.writer import MigrationWriter
from django.utils.version import get_docs_version


class Command(BaseCommand):
    help = "Optimizes the operations for the named migration."

    def add_arguments(self, parser):
        parser.add_argument(
            "app_label",
            help="App label of the application to optimize the migration for.",
        )
        parser.add_argument(
            "migration_name", help="Migration name to optimize the operations for."
        )
        parser.add_argument(
            "--check",
            action="store_true",
            help="Exit with a non-zero status if the migration can be optimized.",
        )

    def handle(self, *args, **options):
        verbosity = options["verbosity"]
        app_label = options["app_label"]
        migration_name = options["migration_name"]
        check = options["check"]

        # Validate app_label.
        try:
            apps.get_app_config(app_label)
        except LookupError as err:
            raise CommandError(str(err))

        # Load the current graph state.
        loader = MigrationLoader(None)
        if app_label not in loader.migrated_apps:
            raise CommandError(f"App '{app_label}' does not have migrations.")
        # Find a migration.
        try:
            migration = loader.get_migration_by_prefix(app_label, migration_name)
        except AmbiguityError:
            raise CommandError(
                f"More than one migration matches '{migration_name}' in app "
                f"'{app_label}'. Please be more specific."
            )
        except KeyError:
            raise CommandError(
                f"Cannot find a migration matching '{migration_name}' from app "
                f"'{app_label}'."
            )

        # Optimize the migration.
        optimizer = MigrationOptimizer()
        new_operations = optimizer.optimize(migration.operations, migration.app_label)
        if len(migration.operations) == len(new_operations):
            if verbosity > 0:
                self.stdout.write("No optimizations possible.")
            return
        else:
            if verbosity > 0:
                self.stdout.write(
                    "Optimizing from %d operations to %d operations."
                    % (len(migration.operations), len(new_operations))
                )
            if check:
                sys.exit(1)

        # Set the new migration optimizations.
        migration.operations = new_operations

        # Write out the optimized migration file.
        writer = MigrationWriter(migration)
        migration_file_string = writer.as_string()
        if writer.needs_manual_porting:
            if migration.replaces:
                raise CommandError(
                    "Migration will require manual porting but is already a squashed "
                    "migration.\nTransition to a normal migration first: "
                    "https://docs.djangoproject.com/en/%s/topics/migrations/"
                    "#squashing-migrations" % get_docs_version()
                )
            # Make a new migration with those operations.
            subclass = type(
                "Migration",
                (migrations.Migration,),
                {
                    "dependencies": migration.dependencies,
                    "operations": new_operations,
                    "replaces": [(migration.app_label, migration.name)],
                },
            )
            optimized_migration_name = "%s_optimized" % migration.name
            optimized_migration = subclass(optimized_migration_name, app_label)
            writer = MigrationWriter(optimized_migration)
            migration_file_string = writer.as_string()
            if verbosity > 0:
                self.stdout.write(
                    self.style.MIGRATE_HEADING("Manual porting required") + "\n"
                    "  Your migrations contained functions that must be manually "
                    "copied over,\n"
                    "  as we could not safely copy their implementation.\n"
                    "  See the comment at the top of the optimized migration for "
                    "details."
                )
                if shutil.which("black"):
                    self.stdout.write(
                        self.style.WARNING(
                            "Optimized migration couldn't be formatted using the "
                            '"black" command. You can call it manually.'
                        )
                    )
        with open(writer.path, "w", encoding="utf-8") as fh:
            fh.write(migration_file_string)
        run_formatters([writer.path])

        if verbosity > 0:
            self.stdout.write(
                self.style.MIGRATE_HEADING(f"Optimized migration {writer.path}")
            )
Metadata
View Raw File