From 74e2af6b03df07616a08050174b82b4c2f29c755 Mon Sep 17 00:00:00 2001
From: Michael Berlin
Date: Thu, 15 Sep 2016 17:27:10 -0700
Subject: [PATCH 1/5] worker: Change spelling from "cancelled" to "canceled".

The new spelling is in line with other code in Vitess and the Go project in
general.
---
 go/vt/worker/block.go                               | 6 +++---
 go/vt/worker/block_cmd.go                           | 4 ++--
 go/vt/worker/status.go                              | 2 +-
 go/vt/worker/vtworkerclienttest/client_testsuite.go | 6 +++---
 go/vt/worker/worker.go                              | 4 ++--
 5 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/go/vt/worker/block.go b/go/vt/worker/block.go
index 252eb41d161..621e6d19b21 100644
--- a/go/vt/worker/block.go
+++ b/go/vt/worker/block.go
@@ -12,7 +12,7 @@ import (
 	"github.com/youtube/vitess/go/vt/wrangler"
 )
 
-// BlockWorker will block infinitely until its context is cancelled.
+// BlockWorker will block infinitely until its context is canceled.
 type BlockWorker struct {
 	StatusWorker
 
@@ -32,7 +32,7 @@ func NewBlockWorker(wr *wrangler.Wrangler) (Worker, error) {
 func (bw *BlockWorker) StatusAsHTML() template.HTML {
 	state := bw.State()
 
-	result := "<b>Block Command</b> (blocking infinitely until context is cancelled)</br>\n"
+	result := "<b>Block Command</b> (blocking infinitely until context is canceled)</br>\n"
 	result += "<b>State:</b> " + state.String() + "</br>\n"
 	switch state {
 	case WorkerStateDebugRunning:
@@ -76,7 +76,7 @@ func (bw *BlockWorker) Run(ctx context.Context) error {
 func (bw *BlockWorker) run(ctx context.Context) error {
 	// We reuse the Copy state to reflect that the blocking is in progress.
 	bw.SetState(WorkerStateDebugRunning)
-	bw.wr.Logger().Printf("Block command was called and will block infinitely until the RPC context is cancelled.\n")
+	bw.wr.Logger().Printf("Block command was called and will block infinitely until the RPC context is canceled.\n")
 	select {
 	case <-ctx.Done():
 	}
diff --git a/go/vt/worker/block_cmd.go b/go/vt/worker/block_cmd.go
index 6f51005be06..329445fc7e5 100644
--- a/go/vt/worker/block_cmd.go
+++ b/go/vt/worker/block_cmd.go
@@ -26,7 +26,7 @@
       <b>Error:</b> {{.Error}}</br>
     {{else}}
       <form action="/Debugging/Block" method="post">
-        <INPUT type="submit" name="submit" value="Block (until cancelled)"/>
+        <INPUT type="submit" name="submit" value="Block (until canceled)"/>
       </form>
     {{end}}
 </body>
@@ -71,5 +71,5 @@
 func init() {
 	AddCommand("Debugging", Command{"Block", commandBlock, interactiveBlock,
 		"",
-		"For internal tests only. When triggered, the command will block until cancelled."})
+		"For internal tests only. When triggered, the command will block until canceled."})
 }
diff --git a/go/vt/worker/status.go b/go/vt/worker/status.go
index 4c31c4596c7..9c1b9d5e391 100644
--- a/go/vt/worker/status.go
+++ b/go/vt/worker/status.go
@@ -124,7 +124,7 @@ func (wi *Instance) InitStatusHandling() {
 		}
 
 		if wi.Cancel() {
-			// We cancelled the running worker. Go back to the status page.
+			// We canceled the running worker. Go back to the status page.
 			http.Redirect(w, r, servenv.StatusURLPath(), http.StatusTemporaryRedirect)
 		} else {
 			// No worker, or not running, we go to the menu.
diff --git a/go/vt/worker/vtworkerclienttest/client_testsuite.go b/go/vt/worker/vtworkerclienttest/client_testsuite.go
index 1b6872f1123..c1d5fb955e5 100644
--- a/go/vt/worker/vtworkerclienttest/client_testsuite.go
+++ b/go/vt/worker/vtworkerclienttest/client_testsuite.go
@@ -122,7 +122,7 @@ func commandErrorsBecauseBusy(t *testing.T, client vtworkerclient.Client) {
 		for {
 			if _, err := stream.Recv(); err != nil {
 				if vterrors.RecoverVtErrorCode(err) != vtrpcpb.ErrorCode_CANCELLED {
-					blockErr = fmt.Errorf("Block command should only error due to cancelled context: %v", err)
+					blockErr = fmt.Errorf("Block command should only error due to canceled context: %v", err)
 				}
 				// Stream has finished.
 				break
@@ -159,9 +159,9 @@ func commandErrorsBecauseBusy(t *testing.T, client vtworkerclient.Client) {
 		t.Fatal(err)
 	}
 	// vtworker is now in a special state where the current job is already
-	// cancelled but not reset yet. New commands are still failing with a
+	// canceled but not reset yet. New commands are still failing with a
 	// retryable error.
-	gotErr2 := runVtworkerCommand(client, []string{"Ping", "Cancelled and still busy?"})
+	gotErr2 := runVtworkerCommand(client, []string{"Ping", "canceled and still busy?"})
 	wantCode2 := vtrpcpb.ErrorCode_TRANSIENT_ERROR
 	if gotCode2 := vterrors.RecoverVtErrorCode(gotErr2); gotCode2 != wantCode2 {
 		t.Fatalf("wrong error code for second cmd before reset: got = %v, want = %v, err: %v", gotCode2, wantCode2, gotErr2)
diff --git a/go/vt/worker/worker.go b/go/vt/worker/worker.go
index 0e68e38c67b..bc98113c3bf 100644
--- a/go/vt/worker/worker.go
+++ b/go/vt/worker/worker.go
@@ -30,8 +30,8 @@ type Worker interface {
 	StatusAsText() string
 
 	// Run is the main entry point for the worker. It will be
-	// called in a go routine. When the passed in context is
-	// cancelled, Run should exit as soon as possible.
+	// called in a go routine. When the passed in context is canceled, Run()
+	// should exit as soon as possible.
 	Run(context.Context) error
 }
 

From f50ea09ff1e34c9ef78034a56db9c8fe59e6bfab Mon Sep 17 00:00:00 2001
From: Michael Berlin
Date: Thu, 15 Sep 2016 17:30:10 -0700
Subject: [PATCH 2/5] worker: Fix flaky unit test (b/30405420).

Retry "Reset" for a canceled command until it succeeds.

The fact that a canceled RPC returned is not enough to guarantee that the
execution within vtworker has finished and that "Reset" will succeed.
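
The race is easiest to see from the client's perspective. A minimal sketch of
the sequence that triggers it ("Block" and "Reset" are the real vtworker debug
commands; runVtworkerCommand is the helper from this test suite; error
handling is elided):

	ctx, cancel := context.WithCancel(context.Background())
	stream, _ := client.ExecuteVtworkerCommand(ctx, []string{"Block"})
	cancel() // Cancel the RPC; vtworker starts to cancel "Block".
	for {
		if _, err := stream.Recv(); err != nil {
			break // The RPC has returned ...
		}
	}
	// ... but "Block" may still be shutting down inside vtworker, so this
	// can fail with "vtworker still executing" unless it is retried:
	if err := runVtworkerCommand(client, []string{"Reset"}); err != nil {
		// Retry, as the resetVtworker() helper below does.
	}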
---
 .../vtworkerclienttest/client_testsuite.go | 35 ++++++++++++++++++-
 1 file changed, 34 insertions(+), 1 deletion(-)

diff --git a/go/vt/worker/vtworkerclienttest/client_testsuite.go b/go/vt/worker/vtworkerclienttest/client_testsuite.go
index c1d5fb955e5..9c0fc982ad1 100644
--- a/go/vt/worker/vtworkerclienttest/client_testsuite.go
+++ b/go/vt/worker/vtworkerclienttest/client_testsuite.go
@@ -167,7 +167,8 @@ func commandErrorsBecauseBusy(t *testing.T, client vtworkerclient.Client) {
 		t.Fatalf("wrong error code for second cmd before reset: got = %v, want = %v, err: %v", gotCode2, wantCode2, gotErr2)
 	}
 
-	if err := runVtworkerCommand(client, []string{"Reset"}); err != nil {
+	// Reset vtworker for the next test function.
+	if err := resetVtworker(t, client); err != nil {
 		t.Fatal(err)
 	}
 
@@ -182,6 +183,38 @@ func commandErrorsBecauseBusy(t *testing.T, client vtworkerclient.Client) {
 	}
 }
 
+// resetVtworker will retry to "Reset" vtworker until it succeeds.
+// Retries are necessary to cope with the following race:
+// a) RPC started vtworker command e.g. "Block".
+// b) A second command runs "Cancel" and vtworker cancels the first command.
+// c) RPC returns with a response after cancelation was received by vtworker.
+// d) vtworker is still canceling and shutting down the command.
+// e) A new vtworker command e.g. "Reset" would fail at this point with
+// "vtworker still executing" until the cancelation is complete.
+func resetVtworker(t *testing.T, client vtworkerclient.Client) error {
+	start := time.Now()
+	attempts := 0
+	for {
+		attempts++
+		err := runVtworkerCommand(client, []string{"Reset"})
+
+		if err == nil {
+			return nil
+		}
+
+		if time.Since(start) > 5*time.Second {
+			return fmt.Errorf("Reset was not successful after 5s and %d attempts: %v", attempts, err)
+		}
+
+		if !strings.Contains(err.Error(), "worker still executing") {
+			return fmt.Errorf("Reset must not fail: %v", err)
+		}
+
+		t.Logf("retrying to Reset vtworker because the previous command has not finished yet. got err: %v", err)
+		continue
+	}
+}
+
 func commandErrors(t *testing.T, client vtworkerclient.Client) {
 	stream, err := client.ExecuteVtworkerCommand(context.Background(), []string{"NonexistingCommand"})
 	// The expected error could already be seen now or after the output channel is closed.

From 5b67096895c9a64a7a614d079eb1857c8d95f920 Mon Sep 17 00:00:00 2001
From: Michael Berlin
Date: Thu, 15 Sep 2016 17:35:50 -0700
Subject: [PATCH 3/5] worker: Pass the Context from the RPC server call to the
 executed command.
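
The net effect is that the per-command context becomes a child of the RPC's
context, so a disconnecting client cancels the running command. A
self-contained sketch of that chain (runUntilDone here stands in for
Worker.Run; in the real code the parent is stream.Context() and the child is
wi.currentContext):

	package main

	import (
		"fmt"
		"time"

		"golang.org/x/net/context"
	)

	// runUntilDone blocks until its context ends, like a vtworker command.
	func runUntilDone(ctx context.Context) error {
		<-ctx.Done()
		return ctx.Err()
	}

	func main() {
		// rpcCtx plays the role of stream.Context().
		rpcCtx, clientDisconnect := context.WithCancel(context.Background())
		// cmdCtx plays the role of wi.currentContext.
		cmdCtx, cancelCommand := context.WithCancel(rpcCtx)
		defer cancelCommand()

		go func() {
			time.Sleep(100 * time.Millisecond)
			clientDisconnect() // The client going away ...
		}()
		// ... now stops the command; this prints "context canceled".
		fmt.Println(runUntilDone(cmdCtx))
	}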
---
 go/cmd/vtworker/vtworker.go                        |  4 ++--
 go/vt/worker/command.go                            | 10 +++++-----
 go/vt/worker/grpcvtworkerserver/server.go          |  8 ++------
 go/vt/worker/instance.go                           | 14 +++++++-------
 go/vt/worker/interactive.go                        |  7 +++++--
 go/vt/worker/legacy_split_clone_test.go            |  2 +-
 go/vt/worker/split_clone_test.go                   |  2 +-
 go/vt/worker/split_diff_test.go                    |  2 +-
 go/vt/worker/utils_test.go                         |  2 +-
 go/vt/worker/vertical_split_clone_test.go          |  2 +-
 go/vt/worker/vertical_split_diff_test.go           |  2 +-
 .../worker/vtworkerclienttest/client_testsuite.go  | 12 ++++-------
 12 files changed, 31 insertions(+), 36 deletions(-)

diff --git a/go/cmd/vtworker/vtworker.go b/go/cmd/vtworker/vtworker.go
index 7fcb0358609..6b261407c61 100644
--- a/go/cmd/vtworker/vtworker.go
+++ b/go/cmd/vtworker/vtworker.go
@@ -61,7 +61,7 @@ func main() {
 	ts := topo.GetServer()
 	defer topo.CloseServers()
 
-	wi = worker.NewInstance(context.Background(), ts, *cell, *commandDisplayInterval)
+	wi = worker.NewInstance(ts, *cell, *commandDisplayInterval)
 
 	wi.InstallSignalHandlers()
 	wi.InitStatusHandling()
@@ -70,7 +70,7 @@ func main() {
 		wi.InitInteractiveMode()
 	} else {
 		// In single command mode, just run it.
-		worker, done, err := wi.RunCommand(args, nil /*custom wrangler*/, true /*runFromCli*/)
+		worker, done, err := wi.RunCommand(context.Background(), args, nil /*custom wrangler*/, true /*runFromCli*/)
 		if err != nil {
 			log.Error(err)
 			exit.Return(1)
diff --git a/go/vt/worker/command.go b/go/vt/worker/command.go
index c0ed29bcc98..222e4a9b455 100644
--- a/go/vt/worker/command.go
+++ b/go/vt/worker/command.go
@@ -13,9 +13,10 @@ import (
 	"strings"
 	"time"
 
-	log "github.com/golang/glog"
 	"golang.org/x/net/context"
 
+	log "github.com/golang/glog"
+
 	"github.com/youtube/vitess/go/vt/logutil"
 	"github.com/youtube/vitess/go/vt/vterrors"
 	"github.com/youtube/vitess/go/vt/wrangler"
@@ -105,12 +106,11 @@ func commandWorker(wi *Instance, wr *wrangler.Wrangler, args []string, cell stri
 // If wr is nil, the default wrangler will be used.
 // If you pass a wr wrangler, note that a MemoryLogger will be added to its current logger.
 // The returned worker and done channel may be nil if no worker was started e.g. in case of a "Reset".
-func (wi *Instance) RunCommand(args []string, wr *wrangler.Wrangler, runFromCli bool) (Worker, chan struct{}, error) {
+func (wi *Instance) RunCommand(ctx context.Context, args []string, wr *wrangler.Wrangler, runFromCli bool) (Worker, chan struct{}, error) {
 	if len(args) >= 1 {
 		switch args[0] {
 		case "Reset":
-			err := wi.Reset()
-			return nil, nil, err
+			return nil, nil, wi.Reset()
 		case "Cancel":
 			wi.Cancel()
 			return nil, nil, nil
@@ -124,7 +124,7 @@ func (wi *Instance) RunCommand(args []string, wr *wrangler.Wrangler, runFromCli
 	if err != nil {
 		return nil, nil, err
 	}
-	done, err := wi.setAndStartWorker(wrk, wr)
+	done, err := wi.setAndStartWorker(ctx, wrk, wr)
 	if err != nil {
 		return nil, nil, vterrors.WithPrefix("cannot set worker: ", err)
 	}
diff --git a/go/vt/worker/grpcvtworkerserver/server.go b/go/vt/worker/grpcvtworkerserver/server.go
index 14a1672bc83..5206c45d592 100644
--- a/go/vt/worker/grpcvtworkerserver/server.go
+++ b/go/vt/worker/grpcvtworkerserver/server.go
@@ -40,9 +40,6 @@ func (s *VtworkerServer) ExecuteVtworkerCommand(args *vtworkerdatapb.ExecuteVtwo
 
 	// Stream everything back what the Wrangler is logging.
 	logstream := logutil.NewCallbackLogger(func(e *logutilpb.Event) {
-		// If the client disconnects, we will just fail
-		// to send the log events, but won't interrupt
-		// the command.
 		stream.Send(&vtworkerdatapb.ExecuteVtworkerCommandResponse{
 			Event: e,
 		})
@@ -52,11 +49,10 @@ func (s *VtworkerServer) ExecuteVtworkerCommand(args *vtworkerdatapb.ExecuteVtwo
 	// is preserved in the logs in case the RPC or vtworker crashes.
 	logger := logutil.NewTeeLogger(logstream, logutil.NewConsoleLogger())
 
-	// create the wrangler
 	wr := s.wi.CreateWrangler(logger)
 
-	// execute the command
-	worker, done, err := s.wi.RunCommand(args.Args, wr, false /*runFromCli*/)
+	// Run the command as long as the RPC Context is valid.
+	worker, done, err := s.wi.RunCommand(stream.Context(), args.Args, wr, false /*runFromCli*/)
 	if err == nil && worker != nil && done != nil {
 		err = s.wi.WaitForCommand(worker, done)
 	}
diff --git a/go/vt/worker/instance.go b/go/vt/worker/instance.go
index e4882a67088..dee57b8903a 100644
--- a/go/vt/worker/instance.go
+++ b/go/vt/worker/instance.go
@@ -13,7 +13,10 @@ import (
 	"syscall"
 	"time"
 
+	"golang.org/x/net/context"
+
 	log "github.com/golang/glog"
+
 	"github.com/youtube/vitess/go/tb"
 	"github.com/youtube/vitess/go/vt/logutil"
 	vtrpcpb "github.com/youtube/vitess/go/vt/proto/vtrpc"
@@ -21,7 +24,6 @@ import (
 	"github.com/youtube/vitess/go/vt/topo"
 	"github.com/youtube/vitess/go/vt/vterrors"
 	"github.com/youtube/vitess/go/vt/wrangler"
-	"golang.org/x/net/context"
 )
 
 // Instance encapsulate the execution state of vtworker.
@@ -47,16 +49,14 @@ type Instance struct {
 	lastRunError    error
 	lastRunStopTime time.Time
 
-	// backgroundContext is context.Background() from main() which has to be plumbed through.
-	backgroundContext      context.Context
 	topoServer             topo.Server
 	cell                   string
 	commandDisplayInterval time.Duration
 }
 
 // NewInstance creates a new Instance.
-func NewInstance(ctx context.Context, ts topo.Server, cell string, commandDisplayInterval time.Duration) *Instance {
-	wi := &Instance{backgroundContext: ctx, topoServer: ts, cell: cell, commandDisplayInterval: commandDisplayInterval}
+func NewInstance(ts topo.Server, cell string, commandDisplayInterval time.Duration) *Instance {
+	wi := &Instance{topoServer: ts, cell: cell, commandDisplayInterval: commandDisplayInterval}
 	// Note: setAndStartWorker() also adds a MemoryLogger for the webserver.
 	wi.wr = wi.CreateWrangler(logutil.NewConsoleLogger())
 	return wi
@@ -70,7 +70,7 @@ func (wi *Instance) CreateWrangler(logger logutil.Logger) *wrangler.Wrangler {
 // setAndStartWorker will set the current worker.
 // We always log to both memory logger (for display on the web) and
 // console logger (for records / display of command line worker).
-func (wi *Instance) setAndStartWorker(wrk Worker, wr *wrangler.Wrangler) (chan struct{}, error) {
+func (wi *Instance) setAndStartWorker(ctx context.Context, wrk Worker, wr *wrangler.Wrangler) (chan struct{}, error) {
 	wi.currentWorkerMutex.Lock()
 	defer wi.currentWorkerMutex.Unlock()
 
@@ -99,7 +99,7 @@ func (wi *Instance) setAndStartWorker(wrk Worker, wr *wrangler.Wrangler) (chan s
 
 	wi.currentWorker = wrk
 	wi.currentMemoryLogger = logutil.NewMemoryLogger()
-	wi.currentContext, wi.currentCancelFunc = context.WithCancel(wi.backgroundContext)
+	wi.currentContext, wi.currentCancelFunc = context.WithCancel(ctx)
 	wi.lastRunError = nil
 	wi.lastRunStopTime = time.Unix(0, 0)
 	done := make(chan struct{})
diff --git a/go/vt/worker/interactive.go b/go/vt/worker/interactive.go
index b553aeb4fd6..8c474ffc38d 100644
--- a/go/vt/worker/interactive.go
+++ b/go/vt/worker/interactive.go
@@ -9,7 +9,10 @@ import (
 	"html/template"
 	"net/http"
 
+	"golang.org/x/net/context"
+
 	log "github.com/golang/glog"
+
 	"github.com/youtube/vitess/go/acl"
 	"github.com/youtube/vitess/go/vt/servenv"
 )
@@ -100,7 +103,7 @@ func (wi *Instance) InitInteractiveMode() {
 				return
 			}
 
-			wrk, template, data, err := pc.Interactive(wi.backgroundContext, wi, wi.wr, w, r)
+			wrk, template, data, err := pc.Interactive(context.Background(), wi, wi.wr, w, r)
 			if err != nil {
 				httpError(w, "%s", err)
 			} else if template != nil && data != nil {
@@ -113,7 +116,7 @@ func (wi *Instance) InitInteractiveMode() {
 				return
 			}
 
-			if _, err := wi.setAndStartWorker(wrk, wi.wr); err != nil {
+			if _, err := wi.setAndStartWorker(context.Background(), wrk, wi.wr); err != nil {
 				httpError(w, "Could not set %s worker: %s", c.Name, err)
 				return
 			}
diff --git a/go/vt/worker/legacy_split_clone_test.go b/go/vt/worker/legacy_split_clone_test.go
index 7506616d9ca..18e848aa7aa 100644
--- a/go/vt/worker/legacy_split_clone_test.go
+++ b/go/vt/worker/legacy_split_clone_test.go
@@ -65,7 +65,7 @@ func (tc *legacySplitCloneTestCase) setUp(v3 bool) {
 	db := fakesqldb.Register()
 	tc.ts = zktestserver.New(tc.t, []string{"cell1", "cell2"})
 	ctx := context.Background()
-	tc.wi = NewInstance(ctx, tc.ts, "cell1", time.Second)
+	tc.wi = NewInstance(tc.ts, "cell1", time.Second)
 
 	if v3 {
 		if err := tc.ts.CreateKeyspace(ctx, "ks", &topodatapb.Keyspace{}); err != nil {
diff --git a/go/vt/worker/split_clone_test.go b/go/vt/worker/split_clone_test.go
index 0c8f12f02aa..407b0a65af0 100644
--- a/go/vt/worker/split_clone_test.go
+++ b/go/vt/worker/split_clone_test.go
@@ -90,7 +90,7 @@ func (tc *splitCloneTestCase) setUpWithConcurreny(v3 bool, concurrency, writeQue
 	db := fakesqldb.Register()
 	tc.ts = zktestserver.New(tc.t, []string{"cell1", "cell2"})
 	ctx := context.Background()
-	tc.wi = NewInstance(ctx, tc.ts, "cell1", time.Second)
+	tc.wi = NewInstance(tc.ts, "cell1", time.Second)
 
 	if v3 {
 		if err := tc.ts.CreateKeyspace(ctx, "ks", &topodatapb.Keyspace{}); err != nil {
diff --git a/go/vt/worker/split_diff_test.go b/go/vt/worker/split_diff_test.go
index 145c1953e63..570f5ff12f8 100644
--- a/go/vt/worker/split_diff_test.go
+++ b/go/vt/worker/split_diff_test.go
@@ -161,7 +161,7 @@ func testSplitDiff(t *testing.T, v3 bool) {
 	db := fakesqldb.Register()
 	ts := zktestserver.New(t, []string{"cell1", "cell2"})
 	ctx := context.Background()
-	wi := NewInstance(ctx, ts, "cell1", time.Second)
+	wi := NewInstance(ts, "cell1", time.Second)
 
 	if v3 {
 		if err := ts.CreateKeyspace(ctx, "ks", &topodatapb.Keyspace{}); err != nil {
diff --git a/go/vt/worker/utils_test.go b/go/vt/worker/utils_test.go
index 4ab2b0f3f13..ad0e89f447f 100644
--- a/go/vt/worker/utils_test.go
+++ b/go/vt/worker/utils_test.go
@@ -25,7 +25,7 @@ import (
 // This file contains common test helper.
 
 func runCommand(t *testing.T, wi *Instance, wr *wrangler.Wrangler, args []string) error {
-	worker, done, err := wi.RunCommand(args, wr, false /* runFromCli */)
+	worker, done, err := wi.RunCommand(context.Background(), args, wr, false /* runFromCli */)
 	if err != nil {
 		return fmt.Errorf("Worker creation failed: %v", err)
 	}
diff --git a/go/vt/worker/vertical_split_clone_test.go b/go/vt/worker/vertical_split_clone_test.go
index 6566d450c1f..f168ca5220b 100644
--- a/go/vt/worker/vertical_split_clone_test.go
+++ b/go/vt/worker/vertical_split_clone_test.go
@@ -52,7 +52,7 @@ func TestVerticalSplitClone(t *testing.T) {
 	db := fakesqldb.Register()
 	ts := zktestserver.New(t, []string{"cell1", "cell2"})
 	ctx := context.Background()
-	wi := NewInstance(ctx, ts, "cell1", time.Second)
+	wi := NewInstance(ts, "cell1", time.Second)
 
 	sourceMaster := testlib.NewFakeTablet(t, wi.wr, "cell1", 0,
 		topodatapb.TabletType_MASTER, db, testlib.TabletKeyspaceShard(t, "source_ks", "0"))
diff --git a/go/vt/worker/vertical_split_diff_test.go b/go/vt/worker/vertical_split_diff_test.go
index 355953ef608..2099948558d 100644
--- a/go/vt/worker/vertical_split_diff_test.go
+++ b/go/vt/worker/vertical_split_diff_test.go
@@ -83,7 +83,7 @@ func TestVerticalSplitDiff(t *testing.T) {
 	db := fakesqldb.Register()
 	ts := zktestserver.New(t, []string{"cell1", "cell2"})
 	ctx := context.Background()
-	wi := NewInstance(ctx, ts, "cell1", time.Second)
+	wi := NewInstance(ts, "cell1", time.Second)
 
 	sourceMaster := testlib.NewFakeTablet(t, wi.wr, "cell1", 0,
 		topodatapb.TabletType_MASTER, db, testlib.TabletKeyspaceShard(t, "source_ks", "0"))
diff --git a/go/vt/worker/vtworkerclienttest/client_testsuite.go b/go/vt/worker/vtworkerclienttest/client_testsuite.go
index 9c0fc982ad1..870c86c78f9 100644
--- a/go/vt/worker/vtworkerclienttest/client_testsuite.go
+++ b/go/vt/worker/vtworkerclienttest/client_testsuite.go
@@ -45,7 +45,7 @@ func init() {
 // CreateWorkerInstance returns a properly configured vtworker instance.
 func CreateWorkerInstance(t *testing.T) *worker.Instance {
 	ts := zktestserver.New(t, []string{"cell1", "cell2"})
-	return worker.NewInstance(context.Background(), ts, "cell1", 1*time.Second)
+	return worker.NewInstance(ts, "cell1", 1*time.Second)
 }
 
 // TestSuite runs the test suite on the given vtworker and vtworkerclient.
@@ -147,17 +147,13 @@ func commandErrorsBecauseBusy(t *testing.T, client vtworkerclient.Client) {
 		t.Fatalf("wrong error code for second cmd: got = %v, want = %v, err: %v", gotCode, wantCode, gotErr)
 	}
 
-	// Cancel Block.
+	// Cancel running "Block" command.
 	cancel()
 	wg.Wait()
 	if blockErr != nil {
 		t.Fatalf("Block command should not have failed: %v", blockErr)
 	}
-	// It looks like gRPC cancels the RPC only on the client-side. Therefore, we
-	// have to explicitly cancel the (still) running vtworker command.
-	if err := runVtworkerCommand(client, []string{"Cancel"}); err != nil {
-		t.Fatal(err)
-	}
+
 	// vtworker is now in a special state where the current job is already
 	// canceled but not reset yet. New commands are still failing with a
 	// retryable error.
@@ -182,7 +182,7 @@
 // resetVtworker will retry to "Reset" vtworker until it succeeds.
 // Retries are necessary to cope with the following race:
 // a) RPC started vtworker command e.g. "Block".
-// b) A second command runs "Cancel" and vtworker cancels the first command.
+// b) client cancels RPC and triggers vtworker to cancel the running command.
 // c) RPC returns with a response after cancelation was received by vtworker.
 // d) vtworker is still canceling and shutting down the command.
 // e) A new vtworker command e.g. "Reset" would fail at this point with

From 375554bbaf3f29a5de05d23a0f25c1b0396896ce Mon Sep 17 00:00:00 2001
From: Michael Berlin
Date: Thu, 15 Sep 2016 22:03:49 -0700
Subject: [PATCH 4/5] worker: Return a CANCELED error code when the command
 was canceled.

For example, the command gets canceled when the process is shut down. By
returning a detailed error code, the caller can decide if it wants to retry
in special situations like these.

As part of this commit, I'm also returning a DEADLINE_EXCEEDED error code
when grpc.Dial() fails (most likely due to a timeout error).
---
 go/vt/worker/block.go                              |  3 ++-
 go/vt/worker/grpcvtworkerclient/client.go          |  8 +++---
 go/vt/worker/instance.go                           | 10 +++++++
 .../vtworkerclienttest/client_testsuite.go         | 27 ++++++++++++++-----
 4 files changed, 38 insertions(+), 10 deletions(-)

diff --git a/go/vt/worker/block.go b/go/vt/worker/block.go
index 621e6d19b21..780d04f62b2 100644
--- a/go/vt/worker/block.go
+++ b/go/vt/worker/block.go
@@ -5,6 +5,7 @@
 package worker
 
 import (
+	"errors"
 	"html/template"
 
 	"golang.org/x/net/context"
@@ -83,5 +84,5 @@ func (bw *BlockWorker) run(ctx context.Context) error {
 	bw.wr.Logger().Printf("Block command finished because the context is done: '%v'.\n", ctx.Err())
 	bw.SetState(WorkerStateDone)
 
-	return nil
+	return errors.New("command 'Block' was canceled")
 }
diff --git a/go/vt/worker/grpcvtworkerclient/client.go b/go/vt/worker/grpcvtworkerclient/client.go
index 509ff366b3f..7e120dea354 100644
--- a/go/vt/worker/grpcvtworkerclient/client.go
+++ b/go/vt/worker/grpcvtworkerclient/client.go
@@ -9,14 +9,16 @@ import (
 	"flag"
 	"time"
 
+	"golang.org/x/net/context"
+	"google.golang.org/grpc"
+
 	"github.com/youtube/vitess/go/vt/logutil"
 	"github.com/youtube/vitess/go/vt/servenv/grpcutils"
 	"github.com/youtube/vitess/go/vt/vterrors"
 	"github.com/youtube/vitess/go/vt/worker/vtworkerclient"
-	"golang.org/x/net/context"
-	"google.golang.org/grpc"
 
 	logutilpb "github.com/youtube/vitess/go/vt/proto/logutil"
+	vtrpcpb "github.com/youtube/vitess/go/vt/proto/vtrpc"
 	vtworkerdatapb "github.com/youtube/vitess/go/vt/proto/vtworkerdata"
 	vtworkerservicepb "github.com/youtube/vitess/go/vt/proto/vtworkerservice"
 )
@@ -41,7 +43,7 @@ func gRPCVtworkerClientFactory(addr string, dialTimeout time.Duration) (vtworker
 	}
 	cc, err := grpc.Dial(addr, opt, grpc.WithBlock(), grpc.WithTimeout(dialTimeout))
 	if err != nil {
-		return nil, err
+		return nil, vterrors.NewVitessError(vtrpcpb.ErrorCode_DEADLINE_EXCEEDED, err, "grpc.Dial() err: %v", err)
 	}
 	c := vtworkerservicepb.NewVtworkerClient(cc)
 
diff --git a/go/vt/worker/instance.go b/go/vt/worker/instance.go
index dee57b8903a..f905e1edf63 100644
--- a/go/vt/worker/instance.go
+++ b/go/vt/worker/instance.go
@@ -135,6 +135,16 @@ func (wi *Instance) setAndStartWorker(ctx context.Context, wrk Worker, wr *wrang
 
 		// run will take a long time
 		err = wrk.Run(wi.currentContext)
+
+		// If the context was canceled, include the respective error code.
+		select {
+		case <-wi.currentContext.Done():
+			// Context is done i.e. probably canceled.
+			if wi.currentContext.Err() == context.Canceled {
+				err = vterrors.NewVitessError(vtrpcpb.ErrorCode_CANCELLED, err, "vtworker command was canceled: %v", err)
+			}
+		default:
+		}
 	}()
 
 	return done, nil
diff --git a/go/vt/worker/vtworkerclienttest/client_testsuite.go b/go/vt/worker/vtworkerclienttest/client_testsuite.go
index 870c86c78f9..8a70801f56f 100644
--- a/go/vt/worker/vtworkerclienttest/client_testsuite.go
+++ b/go/vt/worker/vtworkerclienttest/client_testsuite.go
@@ -54,7 +54,9 @@ func TestSuite(t *testing.T, c vtworkerclient.Client) {
 
 	commandErrors(t, c)
 
-	commandErrorsBecauseBusy(t, c)
+	commandErrorsBecauseBusy(t, c, false /* client side cancelation */)
+
+	commandErrorsBecauseBusy(t, c, true /* server side cancelation */)
 
 	commandPanics(t, c)
 }
@@ -103,14 +105,17 @@ func runVtworkerCommand(client vtworkerclient.Client, args []string) error {
 	}
 }
 
-func commandErrorsBecauseBusy(t *testing.T, client vtworkerclient.Client) {
+// commandErrorsBecauseBusy tests that concurrent commands are rejected with
+// TRANSIENT_ERROR while a command is already running.
+// It also tests the correct propagation of the CANCELED error code.
+func commandErrorsBecauseBusy(t *testing.T, client vtworkerclient.Client, serverSideCancelation bool) {
 	// Run the vtworker "Block" command which blocks until we cancel the context.
 	var wg sync.WaitGroup
 	ctx, cancel := context.WithCancel(context.Background())
 	// blockCommandStarted will be closed after we're sure that vtworker is
 	// running the "Block" command.
 	blockCommandStarted := make(chan struct{})
-	var blockErr error
+	var errorCodeCheck error
 	wg.Add(1)
 	go func() {
 		stream, err := client.ExecuteVtworkerCommand(ctx, []string{"Block"})
@@ -121,8 +126,10 @@ func commandErrorsBecauseBusy(t *testing.T, client vtworkerclient.Client, server
 		firstLineReceived := false
 		for {
 			if _, err := stream.Recv(); err != nil {
+				// We see CANCELED from the RPC client (client side cancelation) or
+				// from vtworker itself (server side cancelation).
 				if vterrors.RecoverVtErrorCode(err) != vtrpcpb.ErrorCode_CANCELLED {
-					blockErr = fmt.Errorf("Block command should only error due to canceled context: %v", err)
+					errorCodeCheck = fmt.Errorf("Block command should only error due to canceled context: %v", err)
 				}
 				// Stream has finished.
 				break
@@ -148,10 +155,18 @@ func commandErrorsBecauseBusy(t *testing.T, client vtworkerclient.Client, server
 	}
 
 	// Cancel running "Block" command.
+	if serverSideCancelation {
+		if err := runVtworkerCommand(client, []string{"Cancel"}); err != nil {
+			t.Fatal(err)
+		}
+	}
+	// Always cancel the context to not leak it (regardless of client or server
+	// side cancelation).
 	cancel()
+
 	wg.Wait()
-	if blockErr != nil {
-		t.Fatalf("Block command should not have failed: %v", blockErr)
+	if errorCodeCheck != nil {
+		t.Fatalf("Block command did not return the CANCELED error code: %v", errorCodeCheck)
 	}
 
 	// vtworker is now in a special state where the current job is already

From aac0e3b4f95908f361f3875c46c4f7c2411e5571 Mon Sep 17 00:00:00 2001
From: Michael Berlin
Date: Thu, 15 Sep 2016 22:04:48 -0700
Subject: [PATCH 5/5] worker: Fix a data race.

The problem was that we returned the full object and therefore all fields
were read, including the unguarded ones.
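
The race class in a self-contained sketch (fakeWorker is hypothetical; the
real type is the Worker held in wi.currentWorker). fmt's %v walks all struct
fields via reflection, so formatting the whole object reads mutex-guarded
fields without holding the mutex, while formatting the result of a locked
accessor such as StatusAsText() does not:

	package main

	import (
		"fmt"
		"sync"
	)

	type fakeWorker struct {
		mu    sync.Mutex
		state string // guarded by mu
	}

	func (w *fakeWorker) StatusAsText() string {
		w.mu.Lock()
		defer w.mu.Unlock()
		return w.state
	}

	func main() {
		w := &fakeWorker{state: "running"}
		done := make(chan struct{})
		go func() {
			w.mu.Lock()
			w.state = "done"
			w.mu.Unlock()
			close(done)
		}()

		// Racy under `go run -race`: reads w.state without holding mu.
		// fmt.Printf("job in progress: %v\n", w)

		// Safe: the read is serialized by the accessor, as in the fix below.
		fmt.Printf("job in progress: %v\n", w.StatusAsText())
		<-done
	}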
---
 go/vt/worker/instance.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/go/vt/worker/instance.go b/go/vt/worker/instance.go
index f905e1edf63..91a74879d42 100644
--- a/go/vt/worker/instance.go
+++ b/go/vt/worker/instance.go
@@ -76,7 +76,7 @@ func (wi *Instance) setAndStartWorker(ctx context.Context, wrk Worker, wr *wrang
 
 	if wi.currentContext != nil {
 		return nil, vterrors.FromError(vtrpcpb.ErrorCode_TRANSIENT_ERROR,
-			fmt.Errorf("A worker job is already in progress: %v", wi.currentWorker))
+			fmt.Errorf("A worker job is already in progress: %v", wi.currentWorker.StatusAsText()))
 	}
 
 	if wi.currentWorker != nil {
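
As a usage note on patch 4: with CANCELED and DEADLINE_EXCEEDED propagated,
callers can branch on the recovered error code. A hypothetical caller (the
retry policy is invented for illustration; vterrors.RecoverVtErrorCode and the
vtrpcpb codes are the ones used in the test suite above):

	if err := runVtworkerCommand(client, []string{"Block"}); err != nil {
		switch vterrors.RecoverVtErrorCode(err) {
		case vtrpcpb.ErrorCode_CANCELLED:
			// The command was canceled, e.g. during a process shutdown.
			// It is safe to retry it on another vtworker instance.
		case vtrpcpb.ErrorCode_DEADLINE_EXCEEDED:
			// grpc.Dial() timed out; the vtworker may not be reachable.
		case vtrpcpb.ErrorCode_TRANSIENT_ERROR:
			// Another command is still running; back off and retry.
		default:
			// Permanent failure; surface the error instead of retrying.
		}
	}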