Skip to content

Commit

Permalink
Merge pull request #400 from microsoft/164
Browse files Browse the repository at this point in the history
Add function to keep index when alter FK
  • Loading branch information
mShan0 authored Jul 4, 2024
2 parents b2eec58 + 6fd9a0b commit cdad07b
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 5 deletions.
28 changes: 23 additions & 5 deletions mssql/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from django import VERSION as django_version
from django.db.models import NOT_PROVIDED, Index, UniqueConstraint
from django.db.models.fields import AutoField, BigAutoField
from django.db.models.fields.related import ForeignKey
from django.db.models.sql.where import AND
from django.db.transaction import TransactionManagementError
from django.utils.encoding import force_str
Expand Down Expand Up @@ -183,7 +184,7 @@ def _alter_column_database_default_sql(
argument) a default to new_field's column.
"""
column = self.quote_name(new_field.column)

if drop:
# SQL Server requires the name of the default constraint
result = self.execute(
Expand Down Expand Up @@ -426,10 +427,20 @@ def _alter_field(self, model, old_field, new_field, old_type, new_type,
)
):
# Drop index, SQL Server requires explicit deletion
if not hasattr(new_field, 'db_constraint') or not new_field.db_constraint:
index_names = self._constraint_names(model, [old_field.column], index=True)
for index_name in index_names:
self.execute(self._delete_constraint_sql(self.sql_delete_index, model, index_name))
if (
not hasattr(new_field, "db_constraint")
or not new_field.db_constraint
):
if(django_version < (4, 2)
or (
not isinstance(new_field, ForeignKey)
or type(new_field.db_comment) == type(None)
or "fk_on_delete_keep_index" not in new_field.db_comment
)
):
index_names = self._constraint_names(model, [old_field.column], index=True)
for index_name in index_names:
self.execute(self._delete_constraint_sql(self.sql_delete_index, model, index_name))

fk_names = self._constraint_names(model, [old_field.column], foreign_key=True)
if strict and len(fk_names) != 1:
Expand Down Expand Up @@ -911,6 +922,13 @@ def _alter_field(self, model, old_field, new_field, old_type, new_type,
self.connection.close()

def _delete_indexes(self, model, old_field, new_field):
if (
django_version >= (4, 2)
and isinstance(new_field, ForeignKey)
and type(new_field.db_comment) != type(None)
and "fk_on_delete_keep_index" in new_field.db_comment
):
return
index_columns = []
index_names = []
if old_field.db_index and new_field.db_index:
Expand Down
154 changes: 154 additions & 0 deletions testapp/tests/test_indexes.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from django.db.models import UniqueConstraint
from django.db.utils import DEFAULT_DB_ALIAS, ConnectionHandler, ProgrammingError
from django.test import TestCase
from unittest import skipIf

from . import get_constraints
from ..models import (
Expand Down Expand Up @@ -210,3 +211,156 @@ def test_alter_unique_nullable_to_non_nullable(self):
migration.apply(new_state, editor)
except django.db.utils.ProgrammingError as e:
self.fail('Check if can alter field from unique, nullable to unique non-nullable for issue #23, AlterField failed with exception: %s' % e)

class TestKeepIndexWithDbcomment(TestCase):
def _find_key_with_type_idx(self, input_dict):
for key, value in input_dict.items():
if value.get("type") == "idx":
return key
return None

@skipIf(VERSION < (4, 2), "db_comment not available before 4.2")
def test_drop_foreignkey(self):
app_label = "test_drop_foreignkey"
operations = [
migrations.CreateModel(
name="brand",
fields=[
("id", models.AutoField(primary_key=True)),
("name", models.CharField(max_length=100)),
],
),
migrations.CreateModel(
name="car1",
fields=[
("id", models.AutoField(primary_key=True)),
(
"brand",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
to="test_drop_foreignkey.brand",
related_name="car1",
db_constraint=True,
),
),
],
),
migrations.CreateModel(
name="car2",
fields=[
("id", models.AutoField(primary_key=True)),
(
"brand",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
to="test_drop_foreignkey.brand",
related_name="car2",
db_constraint=True,
),
),
],
),
migrations.CreateModel(
name="car3",
fields=[
("id", models.AutoField(primary_key=True)),
(
"brand",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
to="test_drop_foreignkey.brand",
related_name="car3",
db_constraint=True,
),
),
],
),
]
migration = Migration("name", app_label)
migration.operations = operations
with connection.schema_editor(atomic=True) as editor:
project_state = migration.apply(ProjectState(), editor)

alter_fk_car1 = migrations.AlterField(
model_name="car1",
name="brand",
field=models.ForeignKey(
to="test_drop_foreignkey.brand",
on_delete=django.db.models.deletion.CASCADE,
db_constraint=False,
related_name="car1",
),
)
alter_fk_car2 = migrations.AlterField(
model_name="car2",
name="brand",
field=models.ForeignKey(
to="test_drop_foreignkey.brand",
on_delete=django.db.models.deletion.CASCADE,
db_constraint=False,
related_name="car2",
db_comment=""
),
)
alter_fk_car3 = migrations.AlterField(
model_name="car3",
name="brand",
field=models.ForeignKey(
to="test_drop_foreignkey.brand",
on_delete=django.db.models.deletion.CASCADE,
db_constraint=False,
related_name="car3",
db_comment="fk_on_delete_keep_index"
),
)
new_state = project_state.clone()
with connection.schema_editor(atomic=True) as editor:
alter_fk_car1.state_forwards("test_drop_foreignkey", new_state)
alter_fk_car1.database_forwards(
"test_drop_foreignkey", editor, project_state, new_state
)
car_index = self._find_key_with_type_idx(
get_constraints(
table_name=new_state.apps.get_model(
"test_drop_foreignkey", "car1"
)._meta.db_table
)
)
# Test alter foreignkey without db_comment field
# The index should be dropped (keep the old behavior)
self.assertIsNone(car_index)

project_state = new_state
new_state = new_state.clone()
with connection.schema_editor(atomic=True) as editor:
alter_fk_car2.state_forwards("test_drop_foreignkey", new_state)
alter_fk_car2.database_forwards(
"test_drop_foreignkey", editor, project_state, new_state
)
car_index = self._find_key_with_type_idx(
get_constraints(
table_name=new_state.apps.get_model(
"test_drop_foreignkey", "car2"
)._meta.db_table
)
)
# Test alter fk with empty db_comment
self.assertIsNone(car_index)

project_state = new_state
new_state = new_state.clone()
with connection.schema_editor(atomic=True) as editor:
alter_fk_car3.state_forwards("test_drop_foreignkey", new_state)
alter_fk_car3.database_forwards(
"test_drop_foreignkey", editor, project_state, new_state
)
car_index = self._find_key_with_type_idx(
get_constraints(
table_name=new_state.apps.get_model(
"test_drop_foreignkey", "car3"
)._meta.db_table
)
)
# Test alter fk with fk_on_delete_keep_index in db_comment
# Index should be preserved in this case
self.assertIsNotNone(car_index)

0 comments on commit cdad07b

Please sign in to comment.