Skip to content

Closes #1: Enable branch migrations #264

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 18 commits into
base: feature
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions docs/models/branch.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,22 @@ The unique, randomly-generated identifier of the PostgreSQL schema which houses

The current status of the branch. This must be one of the following values.

| Status | Description |
|--------------|-------------------------------------------------------------------|
| New | Not yet provisioned in the database |
| Provisioning | A job is running to provision the branch's PostgreSQL schema |
| Ready | The branch is healthy and ready to be synchronized or merged |
| Syncing | A job is running to synchronize changes from main into the branch |
| Merging | A job is running to merge changes from the branch into main |
| Reverting | A job is running to revert previously merged changes in main |
| Merged | Changes from this branch have been successfully merged into main |
| Archived | A merged branch which has been deprovisioned in the database |
| Failed | Provisioning the schema for this branch has failed |
| Status | Description |
|--------------|--------------------------------------------------------------------|
| New | Not yet provisioned in the database |
| Provisioning | A job is running to provision the branch's PostgreSQL schema |
| Ready | The branch is healthy and ready to be synchronized or merged |
| Syncing | A job is running to synchronize changes from main into the branch |
| Migrating | A job is running to apply database migrations to the branch schema |
| Merging | A job is running to merge changes from the branch into main |
| Reverting | A job is running to revert previously merged changes in main |
| Merged | Changes from this branch have been successfully merged into main |
| Archived | A merged branch which has been deprovisioned in the database |
| Failed | Provisioning the schema for this branch has failed |

### Applied Migrations

A list of database migrations which have been applied to the branch since it was created. This may be necessary to keep open branches up to date during NetBox upgrades.

### Last Sync

Expand Down
1 change: 1 addition & 0 deletions netbox_branching/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class AppConfig(PluginConfig):
# Branch action validators
'sync_validators': [],
'merge_validators': [],
'migrate_validators': [],
'revert_validators': [],
'archive_validators': [],
}
Expand Down
5 changes: 5 additions & 0 deletions netbox_branching/choices.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class BranchStatusChoices(ChoiceSet):
PROVISIONING = 'provisioning'
READY = 'ready'
SYNCING = 'syncing'
MIGRATING = 'migrating'
MERGING = 'merging'
REVERTING = 'reverting'
MERGED = 'merged'
Expand All @@ -19,6 +20,7 @@ class BranchStatusChoices(ChoiceSet):
(PROVISIONING, _('Provisioning'), 'orange'),
(READY, _('Ready'), 'green'),
(SYNCING, _('Syncing'), 'orange'),
(MIGRATING, _('Migrating'), 'orange'),
(MERGING, _('Merging'), 'orange'),
(REVERTING, _('Reverting'), 'orange'),
(MERGED, _('Merged'), 'blue'),
Expand All @@ -29,6 +31,7 @@ class BranchStatusChoices(ChoiceSet):
TRANSITIONAL = (
PROVISIONING,
SYNCING,
MIGRATING,
MERGING,
REVERTING,
)
Expand All @@ -43,13 +46,15 @@ class BranchStatusChoices(ChoiceSet):
class BranchEventTypeChoices(ChoiceSet):
PROVISIONED = 'provisioned'
SYNCED = 'synced'
MIGRATED = 'migrated'
MERGED = 'merged'
REVERTED = 'reverted'
ARCHIVED = 'archived'

CHOICES = (
(PROVISIONED, _('Provisioned'), 'green'),
(SYNCED, _('Synced'), 'cyan'),
(MIGRATED, _('Migrated'), 'purple'),
(MERGED, _('Merged'), 'blue'),
(REVERTED, _('Reverted'), 'orange'),
(ARCHIVED, _('Archived'), 'gray'),
Expand Down
2 changes: 2 additions & 0 deletions netbox_branching/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
BRANCH_ACTIONS = (
'sync',
'merge',
'migrate',
'revert',
'archive',
)
Expand All @@ -20,6 +21,7 @@
INCLUDE_MODELS = (
'dcim.cablepath',
'extras.cachedvalue',
'tenancy.contactgroupmembership', # Fix for NetBox v4.3.0
)

# Models for which branching support is explicitly disabled
Expand Down
17 changes: 16 additions & 1 deletion netbox_branching/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ class BranchAwareRouter:
A Django database router that returns the appropriate connection/schema for
the active branch (if any).
"""
connection_prefix = 'schema_'

def _get_db(self, model, **hints):
# Warn & exit if branching support has not yet been initialized
if 'branching' not in registry['model_features']:
Expand All @@ -28,7 +30,7 @@ def _get_db(self, model, **hints):

# Return the schema for the active branch (if any)
if branch := active_branch.get():
return f'schema_{branch.schema_name}'
return f'{self.connection_prefix}{branch.schema_name}'

def db_for_read(self, model, **hints):
return self._get_db(model, **hints)
Expand All @@ -39,3 +41,16 @@ def db_for_write(self, model, **hints):
def allow_relation(self, obj1, obj2, **hints):
# Permit relations from the branch schema to the main schema
return True

def allow_migrate(self, db, app_label, model_name=None, **hints):
# This router has no opinion on non-branch connections
if not db.startswith(self.connection_prefix):
return

# Disallow migrations for models from the plugin itself within a branch
if app_label == 'netbox_branching':
return False

# Disallow migrations for models which don't support branching
if model_name and model_name not in registry['model_features']['branching'].get(app_label, []):
return False
11 changes: 11 additions & 0 deletions netbox_branching/forms/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
__all__ = (
'BranchActionForm',
'ConfirmationForm',
'MigrateBranchForm',
)


Expand Down Expand Up @@ -47,3 +48,13 @@ class ConfirmationForm(forms.Form):
required=True,
label=_('Confirm')
)


class MigrateBranchForm(forms.Form):
confirm = forms.BooleanField(
required=True,
label=_('Confirm migrations'),
help_text=_(
'All migrations will be applied in order. <strong>Migrations cannot be reversed once applied.</strong>'
)
)
22 changes: 22 additions & 0 deletions netbox_branching/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

__all__ = (
'MergeBranchJob',
'MigrateBranchJob',
'ProvisionBranchJob',
'RevertBranchJob',
'SyncBranchJob',
Expand Down Expand Up @@ -131,3 +132,24 @@ def run(self, commit=True, *args, **kwargs):
branch.revert(user=self.job.user, commit=commit)
except AbortTransaction:
logger.info("Dry run completed; rolling back changes")


class MigrateBranchJob(JobRunner):
"""
Apply any outstanding database migrations from the main schema to the Branch.
"""
class Meta:
name = 'Migrate branch'

def run(self, *args, **kwargs):
# Initialize logging
logger = logging.getLogger('netbox_branching.branch.migrate')
logger.setLevel(logging.DEBUG)
logger.addHandler(ListHandler(queue=get_job_log(self.job)))

# Migrate the Branch
try:
branch = self.job.object
branch.migrate(user=self.job.user)
except AbortTransaction:
logger.info("Dry run completed; rolling back changes")
22 changes: 22 additions & 0 deletions netbox_branching/migrations/0005_branch_applied_migrations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import django.contrib.postgres.fields
from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('netbox_branching', '0004_copy_migrations'),
]

operations = [
migrations.AddField(
model_name='branch',
name='applied_migrations',
field=django.contrib.postgres.fields.ArrayField(
base_field=models.CharField(max_length=200),
blank=True,
default=list,
size=None
),
),
]
113 changes: 108 additions & 5 deletions netbox_branching/models/branches.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import importlib
import logging
import random
import string
from collections import defaultdict
from datetime import timedelta
from functools import cached_property, partial

from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.postgres.fields import ArrayField
from django.core.exceptions import ValidationError
from django.db import DEFAULT_DB_ALIAS, connection, models, transaction
from django.db import DEFAULT_DB_ALIAS, connection, connections, models, transaction
from django.db.migrations.executor import MigrationExecutor
from django.db.models.signals import post_save
from django.db.utils import ProgrammingError
from django.test import RequestFactory
Expand Down Expand Up @@ -69,6 +73,12 @@ class Branch(JobsMixin, PrimaryModel):
default=BranchStatusChoices.NEW,
editable=False
)
applied_migrations = ArrayField(
verbose_name=_('applied migrations'),
base_field=models.CharField(max_length=200),
blank=True,
default=list,
)
last_sync = models.DateTimeField(
blank=True,
null=True,
Expand All @@ -89,6 +99,7 @@ class Branch(JobsMixin, PrimaryModel):

_preaction_validators = {
'sync': set(),
'migrate': set(),
'merge': set(),
'revert': set(),
'archive': set(),
Expand Down Expand Up @@ -121,7 +132,7 @@ def is_active(self):

@property
def ready(self):
return self.status == BranchStatusChoices.READY
return self.status == BranchStatusChoices.READY and not self.pending_migrations

@property
def merged(self):
Expand Down Expand Up @@ -277,6 +288,37 @@ def is_stale(self):
return False
return self.last_sync < timezone.now() - timedelta(days=changelog_retention)

#
# Migration handling
#

@cached_property
def pending_migrations(self):
"""
Return a list of database migrations which have been applied in main but not in the branch.
"""
connection = connections[self.connection_name]
executor = MigrationExecutor(connection)
targets = executor.loader.graph.leaf_nodes()
plan = executor.migration_plan(targets)
return [
(migration.app_label, migration.name) for migration, backward in plan
]

@cached_property
def migrators(self):
"""
Return a dictionary mapping object types to a list of migrators to be run when syncing, merging, or
reverting a Branch.
"""
migrators = defaultdict(list)
for migration in self.applied_migrations:
app_label, name = migration.split('.')
module = importlib.import_module(f'{app_label}.migrations.{name}')
for object_type, migrator in getattr(module, 'objectchange_migrators', {}).items():
migrators[object_type].append(migrator)
return migrators

#
# Branch action indicators
#
Expand Down Expand Up @@ -306,6 +348,13 @@ def can_sync(self):
"""
return self._can_do_action('sync')

@cached_property
def can_migrate(self):
"""
Indicates whether the branch can be migrated.
"""
return self._can_do_action('migrate')

@cached_property
def can_merge(self):
"""
Expand Down Expand Up @@ -367,7 +416,7 @@ def sync(self, user, commit=True):
# Apply each change from the main schema
for change in changes:
models.add(change.changed_object_type.model_class())
change.apply(using=self.connection_name, logger=logger)
change.apply(self, using=self.connection_name, logger=logger)
if not commit:
raise AbortTransaction()

Expand Down Expand Up @@ -398,6 +447,60 @@ def sync(self, user, commit=True):

sync.alters_data = True

def migrate(self, user):
"""
Apply any pending database migrations to the branch schema.
"""
logger = logging.getLogger('netbox_branching.branch.migrate')
logger.info(f'Migrating branch {self} ({self.schema_name})')

def migration_progress_callback(action, migration=None, fake=False):
if action == "apply_start":
logger.info(f"Applying migration {migration}")
elif action == "apply_success" and migration is not None:
self.applied_migrations.append(migration)

# Emit pre-migration signal
pre_migrate.send(sender=self.__class__, branch=self, user=user)

# Set Branch status
logger.debug(f"Setting branch status to {BranchStatusChoices.MIGRATING}")
Branch.objects.filter(pk=self.pk).update(status=BranchStatusChoices.MIGRATING)

# Generate migration plan & apply any migrations
connection = connections[self.connection_name]
executor = MigrationExecutor(connection, progress_callback=migration_progress_callback)
targets = executor.loader.graph.leaf_nodes()
if plan := executor.migration_plan(targets):
try:
# Run migrations
executor.migrate(targets, plan)
except Exception as e:
if err_message := str(e):
logger.error(err_message)
# Save applied migrations & reset status
self.status = BranchStatusChoices.READY
self.save()
raise e
else:
logger.info("Found no migrations to apply")

# Reset Branch status to ready
logger.debug(f"Setting branch status to {BranchStatusChoices.READY}")
self.status = BranchStatusChoices.READY
self.save()

# Record a branch event for the migration
logger.debug(f"Recording branch event: {BranchEventTypeChoices.MIGRATED}")
BranchEvent.objects.create(branch=self, user=user, type=BranchEventTypeChoices.MIGRATED)

# Emit post-migration signal
post_migrate.send(sender=self.__class__, branch=self, user=user)

logger.info('Migration completed')

migrate.alters_data = True

def merge(self, user, commit=True):
"""
Apply all changes in the Branch to the main schema by replaying them in
Expand Down Expand Up @@ -442,7 +545,7 @@ def merge(self, user, commit=True):
with event_tracking(request):
request.id = change.request_id
request.user = change.user
change.apply(using=DEFAULT_DB_ALIAS, logger=logger)
change.apply(self, using=DEFAULT_DB_ALIAS, logger=logger)
if not commit:
raise AbortTransaction()

Expand Down Expand Up @@ -522,7 +625,7 @@ def revert(self, user, commit=True):
with event_tracking(request):
request.id = change.request_id
request.user = change.user
change.undo(logger=logger)
change.undo(self, logger=logger)
if not commit:
raise AbortTransaction()

Expand Down
Loading