Skip to content

Commit

Permalink
Merge pull request vitessio#2062 from michael-berlin/vtworker_ctx_in_…
Browse files Browse the repository at this point in the history
…server

worker: Pass the Context from the RPC server call to the executed command.
  • Loading branch information
michael-berlin authored Sep 16, 2016
2 parents 4edf1b2 + aac0e3b commit d1956ff
Show file tree
Hide file tree
Showing 17 changed files with 113 additions and 57 deletions.
4 changes: 2 additions & 2 deletions go/cmd/vtworker/vtworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func main() {
ts := topo.GetServer()
defer topo.CloseServers()

wi = worker.NewInstance(context.Background(), ts, *cell, *commandDisplayInterval)
wi = worker.NewInstance(ts, *cell, *commandDisplayInterval)
wi.InstallSignalHandlers()
wi.InitStatusHandling()

Expand All @@ -70,7 +70,7 @@ func main() {
wi.InitInteractiveMode()
} else {
// In single command mode, just run it.
worker, done, err := wi.RunCommand(args, nil /*custom wrangler*/, true /*runFromCli*/)
worker, done, err := wi.RunCommand(context.Background(), args, nil /*custom wrangler*/, true /*runFromCli*/)
if err != nil {
log.Error(err)
exit.Return(1)
Expand Down
9 changes: 5 additions & 4 deletions go/vt/worker/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
package worker

import (
"errors"
"html/template"

"golang.org/x/net/context"

"github.com/youtube/vitess/go/vt/wrangler"
)

// BlockWorker will block infinitely until its context is cancelled.
// BlockWorker will block infinitely until its context is canceled.
type BlockWorker struct {
StatusWorker

Expand All @@ -32,7 +33,7 @@ func NewBlockWorker(wr *wrangler.Wrangler) (Worker, error) {
func (bw *BlockWorker) StatusAsHTML() template.HTML {
state := bw.State()

result := "<b>Block Command</b> (blocking infinitely until context is cancelled)</br>\n"
result := "<b>Block Command</b> (blocking infinitely until context is canceled)</br>\n"
result += "<b>State:</b> " + state.String() + "</br>\n"
switch state {
case WorkerStateDebugRunning:
Expand Down Expand Up @@ -76,12 +77,12 @@ func (bw *BlockWorker) Run(ctx context.Context) error {
func (bw *BlockWorker) run(ctx context.Context) error {
// We reuse the Copy state to reflect that the blocking is in progress.
bw.SetState(WorkerStateDebugRunning)
bw.wr.Logger().Printf("Block command was called and will block infinitely until the RPC context is cancelled.\n")
bw.wr.Logger().Printf("Block command was called and will block infinitely until the RPC context is canceled.\n")
select {
case <-ctx.Done():
}
bw.wr.Logger().Printf("Block command finished because the context is done: '%v'.\n", ctx.Err())
bw.SetState(WorkerStateDone)

return nil
return errors.New("command 'Block' was canceled")
}
4 changes: 2 additions & 2 deletions go/vt/worker/block_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const blockHTML = `
<b>Error:</b> {{.Error}}</br>
{{else}}
<form action="/Debugging/Block" method="post">
<INPUT type="submit" name="submit" value="Block (until cancelled)"/>
<INPUT type="submit" name="submit" value="Block (until canceled)"/>
</form>
{{end}}
</body>
Expand Down Expand Up @@ -71,5 +71,5 @@ func init() {
AddCommand("Debugging", Command{"Block",
commandBlock, interactiveBlock,
"<message>",
"For internal tests only. When triggered, the command will block until cancelled."})
"For internal tests only. When triggered, the command will block until canceled."})
}
10 changes: 5 additions & 5 deletions go/vt/worker/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ import (
"strings"
"time"

log "github.com/golang/glog"
"golang.org/x/net/context"

log "github.com/golang/glog"

"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/vterrors"
"github.com/youtube/vitess/go/vt/wrangler"
Expand Down Expand Up @@ -105,12 +106,11 @@ func commandWorker(wi *Instance, wr *wrangler.Wrangler, args []string, cell stri
// If wr is nil, the default wrangler will be used.
// If you pass a wr wrangler, note that a MemoryLogger will be added to its current logger.
// The returned worker and done channel may be nil if no worker was started e.g. in case of a "Reset".
func (wi *Instance) RunCommand(args []string, wr *wrangler.Wrangler, runFromCli bool) (Worker, chan struct{}, error) {
func (wi *Instance) RunCommand(ctx context.Context, args []string, wr *wrangler.Wrangler, runFromCli bool) (Worker, chan struct{}, error) {
if len(args) >= 1 {
switch args[0] {
case "Reset":
err := wi.Reset()
return nil, nil, err
return nil, nil, wi.Reset()
case "Cancel":
wi.Cancel()
return nil, nil, nil
Expand All @@ -124,7 +124,7 @@ func (wi *Instance) RunCommand(args []string, wr *wrangler.Wrangler, runFromCli
if err != nil {
return nil, nil, err
}
done, err := wi.setAndStartWorker(wrk, wr)
done, err := wi.setAndStartWorker(ctx, wrk, wr)
if err != nil {
return nil, nil, vterrors.WithPrefix("cannot set worker: ", err)
}
Expand Down
8 changes: 5 additions & 3 deletions go/vt/worker/grpcvtworkerclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@ import (
"flag"
"time"

"golang.org/x/net/context"
"google.golang.org/grpc"

"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/servenv/grpcutils"
"github.com/youtube/vitess/go/vt/vterrors"
"github.com/youtube/vitess/go/vt/worker/vtworkerclient"
"golang.org/x/net/context"
"google.golang.org/grpc"

logutilpb "github.com/youtube/vitess/go/vt/proto/logutil"
vtrpcpb "github.com/youtube/vitess/go/vt/proto/vtrpc"
vtworkerdatapb "github.com/youtube/vitess/go/vt/proto/vtworkerdata"
vtworkerservicepb "github.com/youtube/vitess/go/vt/proto/vtworkerservice"
)
Expand All @@ -41,7 +43,7 @@ func gRPCVtworkerClientFactory(addr string, dialTimeout time.Duration) (vtworker
}
cc, err := grpc.Dial(addr, opt, grpc.WithBlock(), grpc.WithTimeout(dialTimeout))
if err != nil {
return nil, err
return nil, vterrors.NewVitessError(vtrpcpb.ErrorCode_DEADLINE_EXCEEDED, err, "grpc.Dial() err: %v", err)
}
c := vtworkerservicepb.NewVtworkerClient(cc)

Expand Down
8 changes: 2 additions & 6 deletions go/vt/worker/grpcvtworkerserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ func (s *VtworkerServer) ExecuteVtworkerCommand(args *vtworkerdatapb.ExecuteVtwo

// Stream everything back what the Wrangler is logging.
logstream := logutil.NewCallbackLogger(func(e *logutilpb.Event) {
// If the client disconnects, we will just fail
// to send the log events, but won't interrupt
// the command.
stream.Send(&vtworkerdatapb.ExecuteVtworkerCommandResponse{
Event: e,
})
Expand All @@ -52,11 +49,10 @@ func (s *VtworkerServer) ExecuteVtworkerCommand(args *vtworkerdatapb.ExecuteVtwo
// is preserved in the logs in case the RPC or vtworker crashes.
logger := logutil.NewTeeLogger(logstream, logutil.NewConsoleLogger())

// create the wrangler
wr := s.wi.CreateWrangler(logger)

// execute the command
worker, done, err := s.wi.RunCommand(args.Args, wr, false /*runFromCli*/)
// Run the command as long as the RPC Context is valid.
worker, done, err := s.wi.RunCommand(stream.Context(), args.Args, wr, false /*runFromCli*/)
if err == nil && worker != nil && done != nil {
err = s.wi.WaitForCommand(worker, done)
}
Expand Down
26 changes: 18 additions & 8 deletions go/vt/worker/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@ import (
"syscall"
"time"

"golang.org/x/net/context"

log "github.com/golang/glog"

"github.com/youtube/vitess/go/tb"
"github.com/youtube/vitess/go/vt/logutil"
vtrpcpb "github.com/youtube/vitess/go/vt/proto/vtrpc"
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/vterrors"
"github.com/youtube/vitess/go/vt/wrangler"
"golang.org/x/net/context"
)

// Instance encapsulate the execution state of vtworker.
Expand All @@ -47,16 +49,14 @@ type Instance struct {
lastRunError error
lastRunStopTime time.Time

// backgroundContext is context.Background() from main() which has to be plumbed through.
backgroundContext context.Context
topoServer topo.Server
cell string
commandDisplayInterval time.Duration
}

// NewInstance creates a new Instance.
func NewInstance(ctx context.Context, ts topo.Server, cell string, commandDisplayInterval time.Duration) *Instance {
wi := &Instance{backgroundContext: ctx, topoServer: ts, cell: cell, commandDisplayInterval: commandDisplayInterval}
func NewInstance(ts topo.Server, cell string, commandDisplayInterval time.Duration) *Instance {
wi := &Instance{topoServer: ts, cell: cell, commandDisplayInterval: commandDisplayInterval}
// Note: setAndStartWorker() also adds a MemoryLogger for the webserver.
wi.wr = wi.CreateWrangler(logutil.NewConsoleLogger())
return wi
Expand All @@ -70,13 +70,13 @@ func (wi *Instance) CreateWrangler(logger logutil.Logger) *wrangler.Wrangler {
// setAndStartWorker will set the current worker.
// We always log to both memory logger (for display on the web) and
// console logger (for records / display of command line worker).
func (wi *Instance) setAndStartWorker(wrk Worker, wr *wrangler.Wrangler) (chan struct{}, error) {
func (wi *Instance) setAndStartWorker(ctx context.Context, wrk Worker, wr *wrangler.Wrangler) (chan struct{}, error) {
wi.currentWorkerMutex.Lock()
defer wi.currentWorkerMutex.Unlock()

if wi.currentContext != nil {
return nil, vterrors.FromError(vtrpcpb.ErrorCode_TRANSIENT_ERROR,
fmt.Errorf("A worker job is already in progress: %v", wi.currentWorker))
fmt.Errorf("A worker job is already in progress: %v", wi.currentWorker.StatusAsText()))
}

if wi.currentWorker != nil {
Expand All @@ -99,7 +99,7 @@ func (wi *Instance) setAndStartWorker(wrk Worker, wr *wrangler.Wrangler) (chan s

wi.currentWorker = wrk
wi.currentMemoryLogger = logutil.NewMemoryLogger()
wi.currentContext, wi.currentCancelFunc = context.WithCancel(wi.backgroundContext)
wi.currentContext, wi.currentCancelFunc = context.WithCancel(ctx)
wi.lastRunError = nil
wi.lastRunStopTime = time.Unix(0, 0)
done := make(chan struct{})
Expand Down Expand Up @@ -135,6 +135,16 @@ func (wi *Instance) setAndStartWorker(wrk Worker, wr *wrangler.Wrangler) (chan s

// run will take a long time
err = wrk.Run(wi.currentContext)

// If the context was canceled, include the respective error code.
select {
case <-wi.currentContext.Done():
// Context is done i.e. probably canceled.
if wi.currentContext.Err() == context.Canceled {
err = vterrors.NewVitessError(vtrpcpb.ErrorCode_CANCELLED, err, "vtworker command was canceled: %v", err)
}
default:
}
}()

return done, nil
Expand Down
7 changes: 5 additions & 2 deletions go/vt/worker/interactive.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import (
"html/template"
"net/http"

"golang.org/x/net/context"

log "github.com/golang/glog"

"github.com/youtube/vitess/go/acl"
"github.com/youtube/vitess/go/vt/servenv"
)
Expand Down Expand Up @@ -100,7 +103,7 @@ func (wi *Instance) InitInteractiveMode() {
return
}

wrk, template, data, err := pc.Interactive(wi.backgroundContext, wi, wi.wr, w, r)
wrk, template, data, err := pc.Interactive(context.Background(), wi, wi.wr, w, r)
if err != nil {
httpError(w, "%s", err)
} else if template != nil && data != nil {
Expand All @@ -113,7 +116,7 @@ func (wi *Instance) InitInteractiveMode() {
return
}

if _, err := wi.setAndStartWorker(wrk, wi.wr); err != nil {
if _, err := wi.setAndStartWorker(context.Background(), wrk, wi.wr); err != nil {
httpError(w, "Could not set %s worker: %s", c.Name, err)
return
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/worker/legacy_split_clone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (tc *legacySplitCloneTestCase) setUp(v3 bool) {
db := fakesqldb.Register()
tc.ts = zktestserver.New(tc.t, []string{"cell1", "cell2"})
ctx := context.Background()
tc.wi = NewInstance(ctx, tc.ts, "cell1", time.Second)
tc.wi = NewInstance(tc.ts, "cell1", time.Second)

if v3 {
if err := tc.ts.CreateKeyspace(ctx, "ks", &topodatapb.Keyspace{}); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/worker/split_clone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (tc *splitCloneTestCase) setUpWithConcurreny(v3 bool, concurrency, writeQue
db := fakesqldb.Register()
tc.ts = zktestserver.New(tc.t, []string{"cell1", "cell2"})
ctx := context.Background()
tc.wi = NewInstance(ctx, tc.ts, "cell1", time.Second)
tc.wi = NewInstance(tc.ts, "cell1", time.Second)

if v3 {
if err := tc.ts.CreateKeyspace(ctx, "ks", &topodatapb.Keyspace{}); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/worker/split_diff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func testSplitDiff(t *testing.T, v3 bool) {
db := fakesqldb.Register()
ts := zktestserver.New(t, []string{"cell1", "cell2"})
ctx := context.Background()
wi := NewInstance(ctx, ts, "cell1", time.Second)
wi := NewInstance(ts, "cell1", time.Second)

if v3 {
if err := ts.CreateKeyspace(ctx, "ks", &topodatapb.Keyspace{}); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/worker/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (wi *Instance) InitStatusHandling() {
}

if wi.Cancel() {
// We cancelled the running worker. Go back to the status page.
// We canceled the running worker. Go back to the status page.
http.Redirect(w, r, servenv.StatusURLPath(), http.StatusTemporaryRedirect)
} else {
// No worker, or not running, we go to the menu.
Expand Down
2 changes: 1 addition & 1 deletion go/vt/worker/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
// This file contains common test helper.

func runCommand(t *testing.T, wi *Instance, wr *wrangler.Wrangler, args []string) error {
worker, done, err := wi.RunCommand(args, wr, false /* runFromCli */)
worker, done, err := wi.RunCommand(context.Background(), args, wr, false /* runFromCli */)
if err != nil {
return fmt.Errorf("Worker creation failed: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/worker/vertical_split_clone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestVerticalSplitClone(t *testing.T) {
db := fakesqldb.Register()
ts := zktestserver.New(t, []string{"cell1", "cell2"})
ctx := context.Background()
wi := NewInstance(ctx, ts, "cell1", time.Second)
wi := NewInstance(ts, "cell1", time.Second)

sourceMaster := testlib.NewFakeTablet(t, wi.wr, "cell1", 0,
topodatapb.TabletType_MASTER, db, testlib.TabletKeyspaceShard(t, "source_ks", "0"))
Expand Down
2 changes: 1 addition & 1 deletion go/vt/worker/vertical_split_diff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestVerticalSplitDiff(t *testing.T) {
db := fakesqldb.Register()
ts := zktestserver.New(t, []string{"cell1", "cell2"})
ctx := context.Background()
wi := NewInstance(ctx, ts, "cell1", time.Second)
wi := NewInstance(ts, "cell1", time.Second)

sourceMaster := testlib.NewFakeTablet(t, wi.wr, "cell1", 0,
topodatapb.TabletType_MASTER, db, testlib.TabletKeyspaceShard(t, "source_ks", "0"))
Expand Down
Loading

0 comments on commit d1956ff

Please sign in to comment.