Skip to content

Commit

Permalink
Port over changes from vitessio#13238
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Jul 5, 2023
1 parent a849a54 commit ad4f20d
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 0 deletions.
7 changes: 7 additions & 0 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2473,6 +2473,13 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
return 0, nil, err
}

ts.Logger().Infof("Resetting sequences")
if err := sw.resetSequences(ctx); err != nil {
ts.Logger().Errorf("resetSequences failed: %v", err)
sw.cancelMigration(ctx, sm)
return 0, nil, err
}

ts.Logger().Infof("Creating reverse streams")
if err := sw.createReverseVReplication(ctx); err != nil {
ts.Logger().Errorf("createReverseVReplication failed: %v", err)
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vtctl/workflow/switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,7 @@ func (r *switcher) dropTargetShards(ctx context.Context) error {
func (r *switcher) logs() *[]string {
return nil
}

func (r *switcher) resetSequences(ctx context.Context) error {
return r.ts.resetSequences(ctx)
}
13 changes: 13 additions & 0 deletions go/vt/vtctl/workflow/switcher_dry_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,3 +369,16 @@ func (dr *switcherDryRun) dropTargetShards(ctx context.Context) error {

return nil
}

func (dr *switcherDryRun) resetSequences(ctx context.Context) error {
var err error
mustReset := false
if mustReset, err = dr.ts.mustResetSequences(ctx); err != nil {
return err
}
if !mustReset {
return nil
}
dr.drLog.Log("The sequence caches will be reset on the source since sequence tables are being moved")
return nil
}
1 change: 1 addition & 0 deletions go/vt/vtctl/workflow/switcher_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,5 @@ type iswitcher interface {
deleteShardRoutingRules(ctx context.Context) error
addParticipatingTablesToKeyspace(ctx context.Context, keyspace, tableSpecs string) error
logs() *[]string
resetSequences(ctx context.Context) error
}
48 changes: 48 additions & 0 deletions go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -1169,3 +1169,51 @@ func (ts *trafficSwitcher) gatherPositions(ctx context.Context) error {
return err
})
}

func (ts *trafficSwitcher) isSequenceParticipating(ctx context.Context) (bool, error) {
vschema, err := ts.TopoServer().GetVSchema(ctx, ts.targetKeyspace)
if err != nil {
return false, err
}
if vschema == nil || vschema.Tables == nil {
return false, nil
}
sequenceFound := false
for _, table := range ts.Tables() {
vs, ok := vschema.Tables[table]
if !ok || vs == nil {
continue
}
if vs.Type == vindexes.TypeSequence {
sequenceFound = true
break
}
}
return sequenceFound, nil
}

func (ts *trafficSwitcher) mustResetSequences(ctx context.Context) (bool, error) {
switch ts.workflowType {
case binlogdatapb.VReplicationWorkflowType_Migrate,
binlogdatapb.VReplicationWorkflowType_MoveTables:
return ts.isSequenceParticipating(ctx)
default:
return false, nil
}
}

func (ts *trafficSwitcher) resetSequences(ctx context.Context) error {
var err error
mustReset := false
if mustReset, err = ts.mustResetSequences(ctx); err != nil {
return err
}
if !mustReset {
return nil
}
return ts.ForAllSources(func(source *MigrationSource) error {
ts.Logger().Infof("Resetting sequences for source shard %s.%s on tablet %s",
source.GetShard().Keyspace(), source.GetShard().ShardName(), source.GetPrimary().String())
return ts.TabletManagerClient().ResetSequences(ctx, source.GetPrimary().Tablet, ts.Tables())
})
}

0 comments on commit ad4f20d

Please sign in to comment.