Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vreplication Cross-cell source: add support for cells in reshard #6456

Merged
merged 4 commits into from
Jul 22, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Vrepl Crosscell: add cell/tablettypes to reshard command
Signed-off-by: Rohit Nayak <rohit@planetscale.com>
  • Loading branch information
rohit-nayak-ps committed Jul 18, 2020
commit 5ed5fa04eeeee11553782e453b76a94c318eac90
4 changes: 3 additions & 1 deletion go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1851,6 +1851,8 @@ func commandValidateKeyspace(ctx context.Context, wr *wrangler.Wrangler, subFlag
}

func commandReshard(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
cell := subFlags.String("cell", "", "Cell to replicate from.")
tabletTypes := subFlags.String("tablet_types", "", "Source tablet types to replicate from.")
skipSchemaCopy := subFlags.Bool("skip_schema_copy", false, "Skip copying of schema to targets")
if err := subFlags.Parse(args); err != nil {
return err
Expand All @@ -1864,7 +1866,7 @@ func commandReshard(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.F
}
source := strings.Split(subFlags.Arg(1), ",")
target := strings.Split(subFlags.Arg(2), ",")
return wr.Reshard(ctx, keyspace, workflow, source, target, *skipSchemaCopy)
return wr.Reshard(ctx, keyspace, workflow, source, target, *skipSchemaCopy, *cell, *tabletTypes)
}

func commandMoveTables(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
Expand Down
12 changes: 8 additions & 4 deletions go/vt/wrangler/resharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ type resharder struct {
targetMasters map[string]*topo.TabletInfo
vschema *vschemapb.Keyspace
refStreams map[string]*refStream
cell string
tabletTypes string
}

type refStream struct {
Expand All @@ -57,12 +59,12 @@ type refStream struct {
}

// Reshard initiates a resharding workflow.
func (wr *Wrangler) Reshard(ctx context.Context, keyspace, workflow string, sources, targets []string, skipSchemaCopy bool) error {
func (wr *Wrangler) Reshard(ctx context.Context, keyspace, workflow string, sources, targets []string, skipSchemaCopy bool, cell, tabletTypes string) error {
if err := wr.validateNewWorkflow(ctx, keyspace, workflow); err != nil {
return err
}

rs, err := wr.buildResharder(ctx, keyspace, workflow, sources, targets)
rs, err := wr.buildResharder(ctx, keyspace, workflow, sources, targets, cell, tabletTypes)
if err != nil {
return vterrors.Wrap(err, "buildResharder")
}
Expand All @@ -80,13 +82,15 @@ func (wr *Wrangler) Reshard(ctx context.Context, keyspace, workflow string, sour
return nil
}

func (wr *Wrangler) buildResharder(ctx context.Context, keyspace, workflow string, sources, targets []string) (*resharder, error) {
func (wr *Wrangler) buildResharder(ctx context.Context, keyspace, workflow string, sources, targets []string, cell, tabletTypes string) (*resharder, error) {
rs := &resharder{
wr: wr,
keyspace: keyspace,
workflow: workflow,
sourceMasters: make(map[string]*topo.TabletInfo),
targetMasters: make(map[string]*topo.TabletInfo),
cell: cell,
tabletTypes: tabletTypes,
}
for _, shard := range sources {
si, err := wr.ts.GetShard(ctx, keyspace, shard)
Expand Down Expand Up @@ -298,7 +302,7 @@ func (rs *resharder) createStreams(ctx context.Context) error {
Shard: source.ShardName(),
Filter: filter,
}
ig.AddRow(rs.workflow, bls, "", "", "")
ig.AddRow(rs.workflow, bls, "", rs.cell, rs.tabletTypes)
}

for _, rstream := range rs.refStreams {
Expand Down
34 changes: 17 additions & 17 deletions go/vt/wrangler/resharder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestResharderOneToMany(t *testing.T) {
env.tmc.expectVRQuery(200, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{})
env.tmc.expectVRQuery(210, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{})

err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true)
err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true, "", "")
assert.NoError(t, err)
env.tmc.verifyQueries(t)
}
Expand Down Expand Up @@ -101,7 +101,7 @@ func TestResharderManyToOne(t *testing.T) {

env.tmc.expectVRQuery(200, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{})

err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true)
err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true, "", "")
assert.NoError(t, err)
env.tmc.verifyQueries(t)
}
Expand Down Expand Up @@ -142,7 +142,7 @@ func TestResharderManyToMany(t *testing.T) {
env.tmc.expectVRQuery(200, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{})
env.tmc.expectVRQuery(210, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{})

err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true)
err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true, "", "")
assert.NoError(t, err)
env.tmc.verifyQueries(t)
}
Expand Down Expand Up @@ -195,7 +195,7 @@ func TestResharderOneRefTable(t *testing.T) {
env.tmc.expectVRQuery(200, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{})
env.tmc.expectVRQuery(210, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{})

err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true)
err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true, "", "")
assert.NoError(t, err)
env.tmc.verifyQueries(t)
}
Expand Down Expand Up @@ -263,7 +263,7 @@ func TestResharderOneRefStream(t *testing.T) {
env.tmc.expectVRQuery(200, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{})
env.tmc.expectVRQuery(210, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{})

err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true)
err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true, "", "")
assert.NoError(t, err)
env.tmc.verifyQueries(t)
}
Expand Down Expand Up @@ -340,7 +340,7 @@ func TestResharderNoRefStream(t *testing.T) {
env.tmc.expectVRQuery(200, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{})
env.tmc.expectVRQuery(210, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{})

err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true)
err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true, "", "")
assert.NoError(t, err)
env.tmc.verifyQueries(t)
}
Expand Down Expand Up @@ -384,7 +384,7 @@ func TestResharderCopySchema(t *testing.T) {
env.tmc.expectVRQuery(200, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{})
env.tmc.expectVRQuery(210, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{})

err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, false)
err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, false, "", "")
assert.NoError(t, err)
env.tmc.verifyQueries(t)
}
Expand Down Expand Up @@ -412,7 +412,7 @@ func TestResharderDupWorkflow(t *testing.T) {
)
env.tmc.expectVRQuery(210, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), result)

err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true)
err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true, "", "")
assert.EqualError(t, err, "workflow resharderTest already exists in keyspace ks")
env.tmc.verifyQueries(t)
}
Expand All @@ -434,19 +434,19 @@ func TestResharderServingState(t *testing.T) {
env.tmc.expectVRQuery(100, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{})
env.tmc.expectVRQuery(200, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{})
env.tmc.expectVRQuery(210, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{})
err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, []string{"-80"}, nil, true)
err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, []string{"-80"}, nil, true, "", "")
assert.EqualError(t, err, "buildResharder: source shard -80 is not in serving state")

env.tmc.expectVRQuery(100, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{})
env.tmc.expectVRQuery(200, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{})
env.tmc.expectVRQuery(210, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{})
err = env.wr.Reshard(context.Background(), env.keyspace, env.workflow, []string{"0"}, []string{"0"}, true)
err = env.wr.Reshard(context.Background(), env.keyspace, env.workflow, []string{"0"}, []string{"0"}, true, "", "")
assert.EqualError(t, err, "buildResharder: target shard 0 is in serving state")

env.tmc.expectVRQuery(100, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{})
env.tmc.expectVRQuery(200, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{})
env.tmc.expectVRQuery(210, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{})
err = env.wr.Reshard(context.Background(), env.keyspace, env.workflow, []string{"0"}, []string{"-80"}, true)
err = env.wr.Reshard(context.Background(), env.keyspace, env.workflow, []string{"0"}, []string{"-80"}, true, "", "")
assert.EqualError(t, err, "buildResharder: ValidateForReshard: source and target keyranges don't match: - vs -80")
}

Expand Down Expand Up @@ -476,7 +476,7 @@ func TestResharderTargetAlreadyResharding(t *testing.T) {
env.tmc.expectVRQuery(200, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s'", env.keyspace), result)
env.tmc.expectVRQuery(210, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s'", env.keyspace), &sqltypes.Result{})

err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true)
err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true, "", "")
assert.EqualError(t, err, "buildResharder: validateTargets: some streams already exist in the target shards, please clean them up and retry the command")
env.tmc.verifyQueries(t)
}
Expand Down Expand Up @@ -524,7 +524,7 @@ func TestResharderUnnamedStream(t *testing.T) {
)
env.tmc.expectVRQuery(100, fmt.Sprintf("select workflow, source, cell, tablet_types from _vt.vreplication where db_name='vt_%s'", env.keyspace), result)

err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true)
err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true, "", "")
assert.EqualError(t, err, "buildResharder: readRefStreams: VReplication streams must have named workflows for migration: shard: ks:0")
env.tmc.verifyQueries(t)
}
Expand Down Expand Up @@ -588,7 +588,7 @@ func TestResharderMismatchedRefStreams(t *testing.T) {
)
env.tmc.expectVRQuery(110, fmt.Sprintf("select workflow, source, cell, tablet_types from _vt.vreplication where db_name='vt_%s'", env.keyspace), result2)

err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true)
err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true, "", "")
want := "buildResharder: readRefStreams: streams are mismatched across source shards"
if err == nil || !strings.HasPrefix(err.Error(), want) {
t.Errorf("Reshard err: %v, want %v", err, want)
Expand Down Expand Up @@ -628,7 +628,7 @@ func TestResharderTableNotInVSchema(t *testing.T) {
)
env.tmc.expectVRQuery(100, fmt.Sprintf("select workflow, source, cell, tablet_types from _vt.vreplication where db_name='vt_%s'", env.keyspace), result)

err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true)
err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true, "", "")
assert.EqualError(t, err, "buildResharder: readRefStreams: blsIsReference: table t1 not found in vschema")
env.tmc.verifyQueries(t)
}
Expand Down Expand Up @@ -692,7 +692,7 @@ func TestResharderMixedTablesOrder1(t *testing.T) {
)
env.tmc.expectVRQuery(100, fmt.Sprintf("select workflow, source, cell, tablet_types from _vt.vreplication where db_name='vt_%s'", env.keyspace), result)

err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true)
err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true, "", "")
want := "buildResharder: readRefStreams: blsIsReference: cannot reshard streams with a mix of reference and sharded tables"
if err == nil || !strings.HasPrefix(err.Error(), want) {
t.Errorf("Reshard err: %v, want %v", err.Error(), want)
Expand Down Expand Up @@ -759,7 +759,7 @@ func TestResharderMixedTablesOrder2(t *testing.T) {
)
env.tmc.expectVRQuery(100, fmt.Sprintf("select workflow, source, cell, tablet_types from _vt.vreplication where db_name='vt_%s'", env.keyspace), result)

err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true)
err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true, "", "")
want := "buildResharder: readRefStreams: blsIsReference: cannot reshard streams with a mix of reference and sharded tables"
if err == nil || !strings.HasPrefix(err.Error(), want) {
t.Errorf("Reshard err: %v, want %v", err.Error(), want)
Expand Down