Skip to content

Commit

Permalink
vexec for schema-migrations, major refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
  • Loading branch information
shlomi-noach committed Sep 9, 2020
1 parent a237ba0 commit c22146e
Show file tree
Hide file tree
Showing 21 changed files with 635 additions and 284 deletions.
367 changes: 224 additions & 143 deletions go/vt/proto/tabletmanagerdata/tabletmanagerdata.pb.go

Large diffs are not rendered by default.

181 changes: 110 additions & 71 deletions go/vt/proto/tabletmanagerservice/tabletmanagerservice.pb.go

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions go/vt/vtcombo/tablet_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,10 @@ func (itmc *internalTabletManagerClient) GetSlaves(ctx context.Context, tablet *
return nil, fmt.Errorf("not implemented in vtcombo")
}

func (itmc *internalTabletManagerClient) VExec(ctx context.Context, tablet *topodatapb.Tablet, query string) (*querypb.QueryResult, error) {
return nil, fmt.Errorf("not implemented in vtcombo")
}

func (itmc *internalTabletManagerClient) VReplicationExec(ctx context.Context, tablet *topodatapb.Tablet, query string) (*querypb.QueryResult, error) {
return nil, fmt.Errorf("not implemented in vtcombo")
}
Expand Down
9 changes: 9 additions & 0 deletions go/vt/vttablet/faketmclient/fake_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,15 @@ func (client *FakeTabletManagerClient) WaitForPosition(ctx context.Context, tabl
return nil
}

func (c *FakeTabletManagerClient) VExec(ctx context.Context, tablet *topodatapb.Tablet, query string) (*querypb.QueryResult, error) {
// This result satisfies a generic VExec command
result := sqltypes.MakeTestResult(
sqltypes.MakeTestFields("id", "int"),
"complete",
)
return sqltypes.ResultToProto3(result), nil
}

// VReplicationExec is part of the tmclient.TabletManagerClient interface.
func (client *FakeTabletManagerClient) VReplicationExec(ctx context.Context, tablet *topodatapb.Tablet, query string) (*querypb.QueryResult, error) {
// This result satisfies 'select pos from _vt.vreplication...' called from split clone unit tests in go/vt/worker.
Expand Down
13 changes: 13 additions & 0 deletions go/vt/vttablet/grpctmclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,19 @@ func (client *Client) GetReplicas(ctx context.Context, tablet *topodatapb.Tablet
return response.Addrs, nil
}

func (client *Client) VExec(ctx context.Context, tablet *topodatapb.Tablet, query string) (*querypb.QueryResult, error) {
cc, c, err := client.dial(tablet)
if err != nil {
return nil, err
}
defer cc.Close()
response, err := c.VExec(ctx, &tabletmanagerdatapb.VExecRequest{Query: query})
if err != nil {
return nil, err
}
return response.Result, nil
}

// VReplicationExec is part of the tmclient.TabletManagerClient interface.
func (client *Client) VReplicationExec(ctx context.Context, tablet *topodatapb.Tablet, query string) (*querypb.QueryResult, error) {
cc, c, err := client.dial(tablet)
Expand Down
8 changes: 8 additions & 0 deletions go/vt/vttablet/grpctmserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,14 @@ func (s *server) GetReplicas(ctx context.Context, request *tabletmanagerdatapb.G
return response, err
}

func (s *server) VExec(ctx context.Context, request *tabletmanagerdatapb.VExecRequest) (response *tabletmanagerdatapb.VExecResponse, err error) {
defer s.tm.HandleRPCPanic(ctx, "VExec", request, response, true /*verbose*/, &err)
ctx = callinfo.GRPCCallInfo(ctx)
response = &tabletmanagerdatapb.VExecResponse{}
response.Result, err = s.tm.VExec(ctx, request.Query)
return response, err
}

func (s *server) VReplicationExec(ctx context.Context, request *tabletmanagerdatapb.VReplicationExecRequest) (response *tabletmanagerdatapb.VReplicationExecResponse, err error) {
defer s.tm.HandleRPCPanic(ctx, "VReplicationExec", request, response, true /*verbose*/, &err)
ctx = callinfo.GRPCCallInfo(ctx)
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,3 +851,7 @@ func (e *Executor) OnSchemaMigrationStatus(ctx context.Context, uuidParam, statu

return nil
}

func (e *Executor) VExec(ctx context.Context, query string, stmt sqlparser.Statement) (*querypb.QueryResult, error) {
return nil, nil
}
1 change: 1 addition & 0 deletions go/vt/vttablet/onlineddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
)

const (
SchemaMigrationsTableName = "schema_migrations"
sqlCreateSidecarDB = "create database if not exists %s"
sqlCreateSchemaMigrationsTable = `CREATE TABLE IF NOT EXISTS %s.schema_migrations (
id bigint(20) unsigned NOT NULL AUTO_INCREMENT,
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vttablet/tabletmanager/rpc_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ type RPCTM interface {

WaitForPosition(ctx context.Context, pos string) error

// VExec generic API
VExec(ctx context.Context, query string) (*querypb.QueryResult, error)

// VReplication API
VReplicationExec(ctx context.Context, query string) (*querypb.QueryResult, error)
VReplicationWaitForPos(ctx context.Context, id int, pos string) error
Expand Down
59 changes: 59 additions & 0 deletions go/vt/vttablet/tabletmanager/rpc_vexec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
Copyright 2019 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package tabletmanager

import (
"fmt"

querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/onlineddl"

"golang.org/x/net/context"
)

func (tm *TabletManager) extractTableName(stmt sqlparser.Statement) (string, error) {
switch stmt := stmt.(type) {
case *sqlparser.Update:
return sqlparser.String(stmt.TableExprs), nil
case *sqlparser.Delete:
return sqlparser.String(stmt.TableExprs), nil
case *sqlparser.Select:
return sqlparser.String(stmt.From), nil
}
return "", fmt.Errorf("query not supported by vexec: %+v", sqlparser.String(stmt))
}

// VExec executes a generic VExec command.
func (tm *TabletManager) VExec(ctx context.Context, query string) (*querypb.QueryResult, error) {
stmt, err := sqlparser.Parse(query)
if err != nil {
return nil, err
}
tableName, err := tm.extractTableName(stmt)
if err != nil {
return nil, err
}
// TODO(shlomi) do something here!!!
fmt.Printf("======= VExec query: %x, table: %s \n", query, tableName)
switch tableName {
case onlineddl.SchemaMigrationsTableName:
return tm.QueryServiceControl.OnlineDDLExecutor().VExec(ctx, query, stmt)
default:
return nil, fmt.Errorf("table not supported by vexec: %v", tableName)
}
}
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletserver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vttablet/onlineddl"
"vitess.io/vitess/go/vt/vttablet/queryservice"
"vitess.io/vitess/go/vt/vttablet/tabletserver/rules"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
Expand Down Expand Up @@ -81,6 +82,9 @@ type Controller interface {
// QueryService returns the QueryService object used by this Controller
QueryService() queryservice.QueryService

// OnlineDDLExecutor the online DDL executor used by this Controller
OnlineDDLExecutor() *onlineddl.Executor

// SchemaEngine returns the SchemaEngine object used by this Controller
SchemaEngine() *schema.Engine

Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,11 @@ func (tsv *TabletServer) QueryService() queryservice.QueryService {
return tsv
}

// OnlineDDLExecutor returns the onlineddl.Executor part of TabletServer.
func (tsv *TabletServer) OnlineDDLExecutor() *onlineddl.Executor {
return tsv.onlineDDLExecutor
}

// SchemaEngine returns the SchemaEngine part of TabletServer.
func (tsv *TabletServer) SchemaEngine() *schema.Engine {
return tsv.se
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vttablet/tabletservermock/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vttablet/onlineddl"
"vitess.io/vitess/go/vt/vttablet/queryservice"
"vitess.io/vitess/go/vt/vttablet/tabletserver/rules"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
Expand Down Expand Up @@ -174,6 +175,11 @@ func (tqsc *Controller) ReloadSchema(ctx context.Context) error {
return nil
}

// OnlineDDLExecutor is part of the tabletserver.Controller interface
func (tqsc *Controller) OnlineDDLExecutor() *onlineddl.Executor {
return nil
}

//ClearQueryPlanCache is part of the tabletserver.Controller interface
func (tqsc *Controller) ClearQueryPlanCache() {
}
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vttablet/tmclient/rpc_client_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ type TabletManagerClient interface {
// WaitForPosition waits for the position to be reached
WaitForPosition(ctx context.Context, tablet *topodatapb.Tablet, pos string) error

// VExec executes a generic VExec command
VExec(ctx context.Context, tablet *topodatapb.Tablet, query string) (*querypb.QueryResult, error)

// VReplicationExec executes a VReplication command
VReplicationExec(ctx context.Context, tablet *topodatapb.Tablet, query string) (*querypb.QueryResult, error)
VReplicationWaitForPos(ctx context.Context, tablet *topodatapb.Tablet, id int, pos string) error
Expand Down
8 changes: 8 additions & 0 deletions go/vt/vttablet/tmrpctest/test_tm_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -960,6 +960,14 @@ func tmRPCTestGetReplicasPanic(ctx context.Context, t *testing.T, client tmclien
expectHandleRPCPanic(t, "GetReplicas", false /*verbose*/, err)
}

func (fra *fakeRPCTM) VExec(ctx context.Context, query string) (*querypb.QueryResult, error) {
if fra.panics {
panic(fmt.Errorf("test-triggered panic"))
}
compare(fra.t, "VExec query", query, "query")
return testExecuteFetchResult, nil
}

var testVRQuery = "query"

func (fra *fakeRPCTM) VReplicationExec(ctx context.Context, query string) (*querypb.QueryResult, error) {
Expand Down
9 changes: 9 additions & 0 deletions go/vt/wrangler/tablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,15 @@ func (wr *Wrangler) VReplicationExec(ctx context.Context, tabletAlias *topodatap
return wr.tmc.VReplicationExec(ctx, ti.Tablet, query)
}

// VReplicationExec executes a query remotely using the DBA pool
func (wr *Wrangler) GenericVExec(ctx context.Context, tabletAlias *topodatapb.TabletAlias, query string) (*querypb.QueryResult, error) {
ti, err := wr.ts.GetTablet(ctx, tabletAlias)
if err != nil {
return nil, err
}
return wr.tmc.VExec(ctx, ti.Tablet, query)
}

// isMasterTablet is a shortcut way to determine whether the current tablet
// is a master before we allow its tablet record to be deleted. The canonical
// way to determine the only true master in a shard is to list all the tablets
Expand Down
43 changes: 9 additions & 34 deletions go/vt/wrangler/vexec.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
topodatapb "vitess.io/vitess/go/vt/proto/topodata"

"github.com/golang/protobuf/proto"
"github.com/olekukonko/tablewriter"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/concurrency"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
Expand All @@ -41,7 +40,8 @@ import (
)

const (
vreplicationTableName = "_vt.vreplication"
vreplicationTableName = "_vt.vreplication"
schemaMigrationsTableName = "_vt.schema_migrations"
)

type vexec struct {
Expand All @@ -66,7 +66,7 @@ func newVExec(ctx context.Context, workflow, keyspace, query string, wr *Wrangle
}
}

// VExec executes queries on _vt.vreplication on all masters in the target keyspace of the workflow
// VExec executes queries on a table on all masters in the target keyspace of the workflow
func (wr *Wrangler) VExec(ctx context.Context, workflow, keyspace, query string, dryRun bool) (map[*topo.TabletInfo]*sqltypes.Result, error) {
results, err := wr.runVexec(ctx, workflow, keyspace, query, dryRun)
retResults := make(map[*topo.TabletInfo]*sqltypes.Result)
Expand All @@ -86,40 +86,17 @@ func (wr *Wrangler) runVexec(ctx context.Context, workflow, keyspace, query stri
}
fullQuery := vx.plan.parsedQuery.Query
if dryRun {
return nil, vx.outputDryRunInfo(wr)
return nil, vx.outputDryRunInfo()
}
return vx.exec(fullQuery)
}

func (vx *vexec) outputDryRunInfo(wr *Wrangler) error {
rsr, err := vx.wr.getStreams(vx.ctx, vx.workflow, vx.keyspace)
if err != nil {
return err
}

wr.Logger().Printf("Query: %s\nwill be run on the following streams in keyspace %s for workflow %s:\n\n",
vx.plan.parsedQuery.Query, vx.keyspace, vx.workflow)
tableString := &strings.Builder{}
table := tablewriter.NewWriter(tableString)
table.SetHeader([]string{"Tablet", "ID", "BinLogSource", "State", "DBName", "Current GTID", "MaxReplicationLag"})
for _, master := range vx.masters {
key := fmt.Sprintf("%s/%s", master.Shard, master.AliasString())
for _, stream := range rsr.ShardStatuses[key].MasterReplicationStatuses {
table.Append([]string{key, fmt.Sprintf("%d", stream.ID), stream.Bls.String(), stream.State, stream.DBName, stream.Pos, fmt.Sprintf("%d", stream.MaxReplicationLag)})
}
}
table.SetAutoMergeCellsByColumnIndex([]int{0})
table.SetRowLine(true)
table.Render()
wr.Logger().Printf(tableString.String())
wr.Logger().Printf("\n\n")

return nil
func (vx *vexec) outputDryRunInfo() error {
return vx.plan.planner.dryRun()
}

func (vx *vexec) exec(query string) (map[*topo.TabletInfo]*querypb.QueryResult, error) {
var wg sync.WaitGroup
workflow := vx.workflow
allErrors := &concurrency.AllErrorRecorder{}
results := make(map[*topo.TabletInfo]*querypb.QueryResult)
var mu sync.Mutex
Expand All @@ -130,14 +107,12 @@ func (vx *vexec) exec(query string) (map[*topo.TabletInfo]*querypb.QueryResult,
go func(ctx context.Context, master *topo.TabletInfo) {
defer wg.Done()
log.Infof("Running %s on %s\n", query, master.AliasString())
qr, err := vx.wr.VReplicationExec(ctx, master.Alias, query)
qr, err := vx.plan.planner.exec(ctx, master.Alias, query)
log.Infof("Result is %s: %v", master.AliasString(), qr)
if err != nil {
allErrors.RecordError(err)
} else {
if qr.RowsAffected == 0 {
log.Infof("no matching streams found for workflow %s, tablet %s, query %s", workflow, master.Alias, query)
} else {
if qr.RowsAffected > 0 {
mu.Lock()
results[master] = qr
mu.Unlock()
Expand Down Expand Up @@ -240,7 +215,7 @@ func (wr *Wrangler) execWorkflowAction(ctx context.Context, workflow, keyspace,
}
fullQuery := vx.plan.parsedQuery.Query
if dryRun {
return nil, vx.outputDryRunInfo(wr)
return nil, vx.outputDryRunInfo()
}
results, err := vx.exec(fullQuery)
return results, err
Expand Down
Loading

0 comments on commit c22146e

Please sign in to comment.