Skip to content

Commit

Permalink
Merge pull request #5968 from planetscale/rn-vrepl-migrate-rename
Browse files Browse the repository at this point in the history
Vreplication workflow renaming
  • Loading branch information
sougou authored Mar 28, 2020
2 parents ffc8279 + 1f82d5f commit eb40e46
Show file tree
Hide file tree
Showing 14 changed files with 560 additions and 561 deletions.
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ func (vc *VitessCluster) getVttabletsInKeyspace(t *testing.T, cell *Cell, ksName
tablets := make(map[string]*cluster.VttabletProcess)
for _, shard := range keyspace.Shards {
for _, tablet := range shard.Tablets {
if tablet.Vttablet.GetTabletStatus() == "SERVING" && strings.ToLower(tablet.Vttablet.VreplicationTabletType) == strings.ToLower(tabletType) {
if tablet.Vttablet.GetTabletStatus() == "SERVING" && strings.EqualFold(tablet.Vttablet.VreplicationTabletType, tabletType) {
fmt.Printf("Serving status of tablet %s is %s, %s\n", tablet.Name, tablet.Vttablet.ServingStatus, tablet.Vttablet.GetTabletStatus())
tablets[tablet.Name] = tablet.Vttablet
}
Expand Down
9 changes: 2 additions & 7 deletions go/test/endtoend/vreplication/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,7 @@ func validateThatQueryExecutesOnTablet(t *testing.T, conn *mysql.Conn, tablet *c
qr := execVtgateQuery(t, conn, ksName, query)
assert.NotNil(t, qr)
newCount := getQueryCount(tablet.QueryzURL, matchQuery)
if newCount == count+1 {
return true
}
return false
return newCount == count+1
}

func getQueryCount(url string, query string) int {
Expand Down Expand Up @@ -173,9 +170,7 @@ func getQueryCount(url string, query string) int {
foundQuery := re.ReplaceAllLiteralString(row[queryIndex], "")
cleanQuery := re.ReplaceAllLiteralString(query, "")
if foundQuery == cleanQuery {
count, err = strconv.Atoi(row[countIndex])
} else {
//fmt.Printf(">> %s %s %d %d\n", foundQuery, cleanQuery, len(foundQuery), len(cleanQuery))
count, _ = strconv.Atoi(row[countIndex])
}
}
return count
Expand Down
118 changes: 58 additions & 60 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func insertMoreCustomers(t *testing.T, numCustomers int) {
sql := "insert into customer (name) values "
i := 0
for i < numCustomers {
i += 1
i++
sql += fmt.Sprintf("('customer%d')", i)
if i != numCustomers {
sql += ","
Expand All @@ -117,20 +117,20 @@ func shardCustomer(t *testing.T, testReverse bool) {
t.Fatal(err)
}

if err := vc.VtctlClient.ExecuteCommand("Migrate", "-cell="+cell.Name, "-workflow=p2c",
if err := vc.VtctlClient.ExecuteCommand("MoveTables", "-cell="+cell.Name, "-workflow=p2c",
"-tablet_types="+"replica,rdonly", "product", "customer", "customer"); err != nil {
t.Fatalf("Migrate command failed with %+v\n", err)
t.Fatalf("MoveTables command failed with %+v\n", err)
}

customerTab1 := vc.Cells[cell.Name].Keyspaces["customer"].Shards["-80"].Tablets["zone1-200"].Vttablet
customerTab2 := vc.Cells[cell.Name].Keyspaces["customer"].Shards["80-"].Tablets["zone1-300"].Vttablet

if vc.WaitForVReplicationToCatchup(customerTab1, "p2c", "vt_customer", 1*time.Second) != nil {
t.Fatal("Migrate timed out for customer.p2c -80")
t.Fatal("MoveTables timed out for customer.p2c -80")

}
if vc.WaitForVReplicationToCatchup(customerTab2, "p2c", "vt_customer", 1*time.Second) != nil {
t.Fatal("Migrate timed out for customer.p2c 80-")
t.Fatal("MoveTables timed out for customer.p2c 80-")
}

productTab := vc.Cells[cell.Name].Keyspaces["product"].Shards["0"].Tablets["zone1-100"].Vttablet
Expand All @@ -141,17 +141,17 @@ func shardCustomer(t *testing.T, testReverse bool) {
matchInsertQuery1 := "insert into customer(cid, name) values (:vtg1, :vtg2)"
assert.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "product", insertQuery1, matchInsertQuery1))
vdiff(t, "customer.p2c")
if output, err := vc.VtctlClient.ExecuteCommandWithOutput("MigrateReads", "-cells="+cell.Name, "-tablet_type=rdonly", "customer.p2c"); err != nil {
t.Fatalf("MigrateReads error: %s\n", output)
if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+cell.Name, "-tablet_type=rdonly", "customer.p2c"); err != nil {
t.Fatalf("SwitchReads error: %s\n", output)
}
if output, err := vc.VtctlClient.ExecuteCommandWithOutput("MigrateReads", "-cells="+cell.Name, "-tablet_type=replica", "customer.p2c"); err != nil {
t.Fatalf("MigrateReads error: %s\n", output)
if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+cell.Name, "-tablet_type=replica", "customer.p2c"); err != nil {
t.Fatalf("SwitchReads error: %s\n", output)
}

assert.False(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTabReplica, "customer", query, query))
assert.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "customer", query, query))
if output, err := vc.VtctlClient.ExecuteCommandWithOutput("MigrateWrites", "customer.p2c"); err != nil {
t.Fatalf("MigrateWrites error: %s\n", output)
if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchWrites", "customer.p2c"); err != nil {
t.Fatalf("SwitchWrites error: %s\n", output)
}
insertQuery2 := "insert into customer(name) values('tempCustomer2')"
matchInsertQuery2 := "insert into customer(name, cid) values (:vtg1, :_cid0)"
Expand All @@ -163,14 +163,14 @@ func shardCustomer(t *testing.T, testReverse bool) {

if testReverse {
//Reverse Replicate
if output, err := vc.VtctlClient.ExecuteCommandWithOutput("MigrateReads", "-cells="+cell.Name, "-tablet_type=rdonly", "product.p2c_reverse"); err != nil {
t.Fatalf("MigrateReads error: %s\n", output)
if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+cell.Name, "-tablet_type=rdonly", "product.p2c_reverse"); err != nil {
t.Fatalf("SwitchReads error: %s\n", output)
}
if output, err := vc.VtctlClient.ExecuteCommandWithOutput("MigrateReads", "-cells="+cell.Name, "-tablet_type=replica", "product.p2c_reverse"); err != nil {
t.Fatalf("MigrateReads error: %s\n", output)
if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+cell.Name, "-tablet_type=replica", "product.p2c_reverse"); err != nil {
t.Fatalf("SwitchReads error: %s\n", output)
}
if output, err := vc.VtctlClient.ExecuteCommandWithOutput("MigrateWrites", "product.p2c_reverse"); err != nil {
t.Fatalf("MigrateWrites error: %s\n", output)
if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchWrites", "product.p2c_reverse"); err != nil {
t.Fatalf("SwitchWrites error: %s\n", output)
}
insertQuery1 = "insert into customer(cid, name) values(1002, 'tempCustomer5')"
assert.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "product", insertQuery1, matchInsertQuery1))
Expand All @@ -180,14 +180,14 @@ func shardCustomer(t *testing.T, testReverse bool) {
assert.False(t, validateThatQueryExecutesOnTablet(t, vtgateConn, customerTab2, "customer", insertQuery1, matchInsertQuery1))

//Go forward again
if output, err := vc.VtctlClient.ExecuteCommandWithOutput("MigrateReads", "-cells="+cell.Name, "-tablet_type=rdonly", "customer.p2c"); err != nil {
t.Fatalf("MigrateReads error: %s\n", output)
if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+cell.Name, "-tablet_type=rdonly", "customer.p2c"); err != nil {
t.Fatalf("SwitchReads error: %s\n", output)
}
if output, err := vc.VtctlClient.ExecuteCommandWithOutput("MigrateReads", "-cells="+cell.Name, "-tablet_type=replica", "customer.p2c"); err != nil {
t.Fatalf("MigrateReads error: %s\n", output)
if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+cell.Name, "-tablet_type=replica", "customer.p2c"); err != nil {
t.Fatalf("SwitchReads error: %s\n", output)
}
if output, err := vc.VtctlClient.ExecuteCommandWithOutput("MigrateWrites", "customer.p2c"); err != nil {
t.Fatalf("MigrateWrites error: %s\n", output)
if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchWrites", "customer.p2c"); err != nil {
t.Fatalf("SwitchWrites error: %s\n", output)
}
insertQuery2 = "insert into customer(name) values('tempCustomer8')" //ID 103, hence due to reverse_bits in shard 80-
assert.False(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "customer", insertQuery2, matchInsertQuery2))
Expand Down Expand Up @@ -271,63 +271,61 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou
tablets := vc.getVttabletsInKeyspace(t, cell, ksName, "master")
targetShards = "," + targetShards + ","
for _, tab := range tablets {
if strings.Index(targetShards, ","+tab.Shard+",") >= 0 {
if strings.Contains(targetShards, ","+tab.Shard+",") {
fmt.Printf("Waiting for vrepl to catch up on %s since it IS a target shard\n", tab.Shard)
if vc.WaitForVReplicationToCatchup(tab, workflow, "vt_"+ksName, 3*time.Second) != nil {
t.Fatal("Migrate timed out")
t.Fatal("Reshard timed out")
}
} else {
fmt.Printf("Not waiting for vrepl to catch up on %s since it is NOT a target shard\n", tab.Shard)
continue
}
}
vdiff(t, ksWorkflow)
if output, err := vc.VtctlClient.ExecuteCommandWithOutput("MigrateReads", "-cells="+cell.Name, "-tablet_type=rdonly", ksWorkflow); err != nil {
t.Fatalf("MigrateReads error: %s\n", output)
if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+cell.Name, "-tablet_type=rdonly", ksWorkflow); err != nil {
t.Fatalf("SwitchReads error: %s\n", output)
}
if output, err := vc.VtctlClient.ExecuteCommandWithOutput("MigrateReads", "-cells="+cell.Name, "-tablet_type=replica", ksWorkflow); err != nil {
t.Fatalf("MigrateReads error: %s\n", output)
if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+cell.Name, "-tablet_type=replica", ksWorkflow); err != nil {
t.Fatalf("SwitchReads error: %s\n", output)
}
if output, err := vc.VtctlClient.ExecuteCommandWithOutput("MigrateWrites", ksWorkflow); err != nil {
t.Fatalf("MigrateWrites error: %s\n", output)
if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchWrites", ksWorkflow); err != nil {
t.Fatalf("SwitchWrites error: %s\n", output)
}
if counts != nil {
for tabletName, count := range counts {
if tablets[tabletName] == nil {
continue
}
assert.Empty(t, validateCountInTablet(t, tablets[tabletName], ksName, tableName, count))
for tabletName, count := range counts {
if tablets[tabletName] == nil {
continue
}
assert.Empty(t, validateCountInTablet(t, tablets[tabletName], ksName, tableName, count))
}
}

func shardOrders(t *testing.T) {
if err := vc.VtctlClient.ExecuteCommand("ApplyVSchema", "-vschema", ordersVSchema, "customer"); err != nil {
t.Fatal(err)
}
if err := vc.VtctlClient.ExecuteCommand("Migrate", "-cell="+cell.Name, "-workflow=o2c",
if err := vc.VtctlClient.ExecuteCommand("MoveTables", "-cell="+cell.Name, "-workflow=o2c",
"-tablet_types="+"replica,rdonly", "product", "customer", "orders"); err != nil {
t.Fatal(err)
}
customerTab1 := vc.Cells[cell.Name].Keyspaces["customer"].Shards["-80"].Tablets["zone1-200"].Vttablet
customerTab2 := vc.Cells[cell.Name].Keyspaces["customer"].Shards["80-"].Tablets["zone1-300"].Vttablet
if vc.WaitForVReplicationToCatchup(customerTab1, "o2c", "vt_customer", 1*time.Second) != nil {
assert.Fail(t, "Migrate timed out for customer.o2c -80")
assert.Fail(t, "MoveTables timed out for customer.o2c -80")

}
if vc.WaitForVReplicationToCatchup(customerTab2, "o2c", "vt_customer", 1*time.Second) != nil {
assert.Fail(t, "Migrate timed out for customer.o2c 80-")
assert.Fail(t, "MoveTables timed out for customer.o2c 80-")
}

vdiff(t, "customer.o2c")
if output, err := vc.VtctlClient.ExecuteCommandWithOutput("MigrateReads", "-cells="+cell.Name, "-tablet_type=rdonly", "customer.o2c"); err != nil {
t.Fatalf("MigrateReads error: %s\n", output)
if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+cell.Name, "-tablet_type=rdonly", "customer.o2c"); err != nil {
t.Fatalf("SwitchReads error: %s\n", output)
}
if output, err := vc.VtctlClient.ExecuteCommandWithOutput("MigrateReads", "-cells="+cell.Name, "-tablet_type=replica", "customer.o2c"); err != nil {
t.Fatalf("MigrateReads error: %s\n", output)
if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+cell.Name, "-tablet_type=replica", "customer.o2c"); err != nil {
t.Fatalf("SwitchReads error: %s\n", output)
}
if output, err := vc.VtctlClient.ExecuteCommandWithOutput("MigrateWrites", "customer.o2c"); err != nil {
t.Fatalf("MigrateWrites error: %s\n", output)
if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchWrites", "customer.o2c"); err != nil {
t.Fatalf("SwitchWrites error: %s\n", output)
}

assert.Empty(t, validateCountInTablet(t, customerTab1, "customer", "orders", 1))
Expand All @@ -346,30 +344,30 @@ func shardMerchant(t *testing.T) {
if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", "merchant", "80-"), 1); err != nil {
t.Fatal(err)
}
if err := vc.VtctlClient.ExecuteCommand("Migrate", "-cell="+cell.Name, "-workflow=p2m",
if err := vc.VtctlClient.ExecuteCommand("MoveTables", "-cell="+cell.Name, "-workflow=p2m",
"-tablet_types="+"replica,rdonly", "product", "merchant", "merchant"); err != nil {
t.Fatal(err)
}

merchantTab1 := vc.Cells[cell.Name].Keyspaces["merchant"].Shards["-80"].Tablets["zone1-400"].Vttablet
merchantTab2 := vc.Cells[cell.Name].Keyspaces["merchant"].Shards["80-"].Tablets["zone1-500"].Vttablet
if vc.WaitForVReplicationToCatchup(merchantTab1, "p2m", "vt_merchant", 1*time.Second) != nil {
t.Fatal("Migrate timed out for merchant.p2m -80")
t.Fatal("MoveTables timed out for merchant.p2m -80")

}
if vc.WaitForVReplicationToCatchup(merchantTab2, "p2m", "vt_merchant", 1*time.Second) != nil {
t.Fatal("Migrate timed out for merchant.p2m 80-")
t.Fatal("MoveTables timed out for merchant.p2m 80-")
}

vdiff(t, "merchant.p2m")
if output, err := vc.VtctlClient.ExecuteCommandWithOutput("MigrateReads", "-cells="+cell.Name, "-tablet_type=rdonly", "merchant.p2m"); err != nil {
t.Fatalf("MigrateReads error: %s\n", output)
if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+cell.Name, "-tablet_type=rdonly", "merchant.p2m"); err != nil {
t.Fatalf("SwitchReads error: %s\n", output)
}
if output, err := vc.VtctlClient.ExecuteCommandWithOutput("MigrateReads", "-cells="+cell.Name, "-tablet_type=replica", "merchant.p2m"); err != nil {
t.Fatalf("MigrateReads error: %s\n", output)
if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+cell.Name, "-tablet_type=replica", "merchant.p2m"); err != nil {
t.Fatalf("SwitchReads error: %s\n", output)
}
if output, err := vc.VtctlClient.ExecuteCommandWithOutput("MigrateWrites", "merchant.p2m"); err != nil {
t.Fatalf("MigrateWrites error: %s\n", output)
if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchWrites", "merchant.p2m"); err != nil {
t.Fatalf("SwitchWrites error: %s\n", output)
}

assert.Empty(t, validateCountInTablet(t, merchantTab1, "merchant", "merchant", 1))
Expand All @@ -394,7 +392,7 @@ func vdiff(t *testing.T, workflow string) {
assert.True(t, len(diffReports) > 0)
for key, diffReport := range diffReports {
if diffReport.ProcessedRows != diffReport.MatchingRows {
fmt.Errorf("vdiff error for %d : %#v\n", key, diffReport)
t.Errorf("vdiff error for %d : %#v\n", key, diffReport)
}
}
}
Expand All @@ -410,7 +408,7 @@ func materializeProduct(t *testing.T) {
customerTablets := vc.getVttabletsInKeyspace(t, cell, "customer", "master")
for _, tab := range customerTablets {
if vc.WaitForVReplicationToCatchup(tab, workflow, "vt_customer", 3*time.Second) != nil {
t.Fatal("Migrate timed out")
t.Fatal("Materialize timed out")
}
}
for _, tab := range customerTablets {
Expand All @@ -427,7 +425,7 @@ func materializeSales(t *testing.T) {
}
productTab := vc.Cells[cell.Name].Keyspaces["product"].Shards["0"].Tablets["zone1-100"].Vttablet
if vc.WaitForVReplicationToCatchup(productTab, "sales", "vt_product", 3*time.Second) != nil {
assert.Fail(t, "Migrate timed out for product.sales -80")
assert.Fail(t, "Materialize timed out for product.sales -80")

}
assert.Empty(t, validateCount(t, vtgateConn, "product", "sales", 2))
Expand All @@ -444,7 +442,7 @@ func materializeMerchantSales(t *testing.T) {
merchantTablets := vc.getVttabletsInKeyspace(t, cell, "merchant", "master")
for _, tab := range merchantTablets {
if vc.WaitForVReplicationToCatchup(tab, workflow, "vt_merchant", 1*time.Second) != nil {
t.Fatal("Migrate timed out")
t.Fatal("Materialize timed out")
}
}
assert.Empty(t, validateCountInTablet(t, merchantTablets["zone1-400"], "merchant", "msales", 1))
Expand All @@ -465,7 +463,7 @@ func materializeMerchantOrders(t *testing.T) {
merchantTablets := vc.getVttabletsInKeyspace(t, cell, "merchant", "master")
for _, tab := range merchantTablets {
if vc.WaitForVReplicationToCatchup(tab, workflow, "vt_merchant", 1*time.Second) != nil {
t.Fatal("Migrate timed out")
t.Fatal("Materialize timed out")
}
}
assert.Empty(t, validateCountInTablet(t, merchantTablets["zone1-400"], "merchant", "morders", 2))
Expand Down
5 changes: 4 additions & 1 deletion go/vt/discovery/healthcheck_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,10 @@ func createFixedHealthConn(tablet *topodatapb.Tablet, fixedResult *querypb.Strea

func discoveryDialer(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) {
key := TabletToMapKey(tablet)
return connMap[key], nil
if qs, ok := connMap[key]; ok {
return qs, nil
}
return nil, fmt.Errorf("tablet %v not found", key)
}

// StreamHealth implements queryservice.QueryService.
Expand Down
Loading

0 comments on commit eb40e46

Please sign in to comment.