Skip to content
This repository has been archived by the owner on Sep 21, 2022. It is now read-only.

Commit

Permalink
First phase of vtworker: commandline framework.
Browse files Browse the repository at this point in the history
  • Loading branch information
alainjobart committed Dec 13, 2013
1 parent 62fd087 commit b74c92f
Show file tree
Hide file tree
Showing 7 changed files with 409 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ go/cmd/vtctl/vtctl
go/cmd/vtctld/vtctld
go/cmd/vtocc/vtocc
go/cmd/vttablet/vttablet
go/cmd/vtworker/vtworker
go/cmd/zk/zk
go/cmd/zkclient2/zkclient2
go/cmd/zkctl/zkctl
Expand Down
113 changes: 113 additions & 0 deletions go/cmd/vtworker/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2013, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main

import (
"flag"
"fmt"
"os"
"strings"
"time"

log "github.com/golang/glog"
"github.com/youtube/vitess/go/vt/worker"
"github.com/youtube/vitess/go/vt/wrangler"
)

type command struct {
name string
method func(wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) worker.Worker
params string
help string // if help is empty, won't list the command
}

type commandGroup struct {
name string
commands []command
}

var commands = []commandGroup{
commandGroup{
"Diffs", []command{
command{"SplitDiff", commandSplitDiff,
"<keyspace/shard|zk shard path>",
"Diffs a rdonly destination shard against its SourceShards"},
},
},
}

func shardParamToKeyspaceShard(param string) (string, string) {
if param[0] == '/' {
// old zookeeper path, convert to new-style
zkPathParts := strings.Split(param, "/")
if len(zkPathParts) != 8 || zkPathParts[0] != "" || zkPathParts[1] != "zk" || zkPathParts[2] != "global" || zkPathParts[3] != "vt" || zkPathParts[4] != "keyspaces" || zkPathParts[6] != "shards" {
log.Fatalf("Invalid shard path: %v", param)
}
return zkPathParts[5], zkPathParts[7]
}
zkPathParts := strings.Split(param, "/")
if len(zkPathParts) != 2 {
log.Fatalf("Invalid shard path: %v", param)
}
return zkPathParts[0], zkPathParts[1]
}

func commandSplitDiff(wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) worker.Worker {
subFlags.Parse(args)
if subFlags.NArg() != 1 {
log.Fatalf("command SplitDiff requires <keyspace/shard|zk shard path>")
}
keyspace, shard := shardParamToKeyspaceShard(subFlags.Arg(0))
return worker.NewSplitDiffWorker(wr, keyspace, shard)
}

func commandWorker(wr *wrangler.Wrangler, args []string) worker.Worker {
action := args[0]

actionLowerCase := strings.ToLower(action)
for _, group := range commands {
for _, cmd := range group.commands {
if strings.ToLower(cmd.name) == actionLowerCase {
subFlags := flag.NewFlagSet(action, flag.ExitOnError)
subFlags.Usage = func() {
fmt.Fprintf(os.Stderr, "Usage: %s %s %s\n\n", os.Args[0], cmd.name, cmd.params)
fmt.Fprintf(os.Stderr, "%s\n\n", cmd.help)
subFlags.PrintDefaults()
}
return cmd.method(wr, subFlags, args[1:])
}
}
}
flag.Usage()
log.Fatalf("Unknown command %#v\n\n", action)
return nil
}

func runCommand(wr *wrangler.Wrangler, args []string) {
wrk = commandWorker(wr, args)

done := make(chan struct{})

// one go function runs the worker, closes 'done' when done
go func() {
log.Infof("Starting worker...")
wrk.Run()
close(done)
}()

// one go function displays the status every second
go func() {
timer := time.Tick(time.Second)
for {
select {
case <-done:
log.Infof("Command is done:")
log.Info(wrk.StatusAsText())
os.Exit(0)
case <-timer:
log.Info(wrk.StatusAsText())
}
}
}()
}
43 changes: 43 additions & 0 deletions go/cmd/vtworker/interactive.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2013, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package main

import (
"fmt"
"html/template"
"net/http"

log "github.com/golang/glog"
)

const indexHTML = `
<!DOCTYPE html>
<head>
<title>Worker Action Index</title>
</head>
<body>
<h1>Worker Action Index</h1>
<li><a href="/diffs">Diffs</a>: shows a list of all the possible diffs to run.</li>
</body>
`

func httpError(w http.ResponseWriter, format string, err error) {
log.Errorf(format, err)
http.Error(w, fmt.Sprintf(format, err), http.StatusInternalServerError)
}

func initInteractiveMode() {
indexTemplate, err := template.New("index").Parse(indexHTML)
if err != nil {
log.Fatalf("Cannot parse index template: %v", err)
}

http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if err := indexTemplate.Execute(w, nil); err != nil {
httpError(w, "error executing template", err)
}
})
}
72 changes: 72 additions & 0 deletions go/cmd/vtworker/vtworker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2013, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

/*
vtworker is the main program to run a worker job.
It has two modes: single command or interactive.
- in single command, it will start the job passed in from the command line,
and exit.
- in interactive mode, use a web browser to start an action.
*/
package main

import (
"flag"
"os"
"os/signal"
"syscall"
"time"

"github.com/youtube/vitess/go/vt/servenv"
tm "github.com/youtube/vitess/go/vt/tabletmanager"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/worker"
"github.com/youtube/vitess/go/vt/wrangler"
)

var (
port = flag.Int("port", 8080, "port for the status / interactive mode")
)

// signal handling, centralized here
func installSignalHandlers() {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)
go func() {
<-sigChan
// we got a signal, notify our modules:
// - tm will interrupt anything waiting on a tablet action
// - wr will interrupt anything waiting on a shard or
// keyspace lock
tm.SignalInterrupt()
wrangler.SignalInterrupt()
worker.SignalInterrupt()
}()
}

var wrk worker.Worker

func main() {
flag.Parse()
args := flag.Args()

installSignalHandlers()

servenv.Init()
defer servenv.Close()

ts := topo.GetServer()
defer topo.CloseServers()

wr := wrangler.New(ts, 30*time.Second, 30*time.Second)
if len(args) == 0 {
// interactive mode, initialize the web UI to chose a command
initInteractiveMode()
} else {
runCommand(wr, args)
}

servenv.Run(*port)
}
145 changes: 145 additions & 0 deletions go/vt/worker/split_diff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright 2013, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

/*
'worker' package contains the framework, utility methods and core
functions for long running actions. 'vtworker' binary will use these.
*/
package worker

import (
"fmt"
"sync"
"time"

"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/wrangler"
)

const (
// all the states for the worker
stateNotSarted = "not started"
stateInit = "initializing"
stateRunning = "running"
stateDone = "done"
stateError = "error"
)

// SplitDiffWorker executes a diff between a destination shard and its
// source shards in a shard split case.
type SplitDiffWorker struct {
wr *wrangler.Wrangler
keyspace string
shard string

// protected by the mutex
mu sync.Mutex
state string

// populated if state == stateError
err error

// populated during stateInit
shardInfo *topo.ShardInfo
}

// NewSplitDiff returns a new SplitDiffWorker object.
func NewSplitDiffWorker(wr *wrangler.Wrangler, keyspace, shard string) Worker {
return &SplitDiffWorker{
wr: wr,
keyspace: keyspace,
shard: shard,

state: stateNotSarted,
}
}

func (sdw *SplitDiffWorker) setState(state string) {
sdw.mu.Lock()
sdw.state = state
sdw.mu.Unlock()
}

func (sdw *SplitDiffWorker) recordError(err error) {
sdw.mu.Lock()
sdw.state = stateError
sdw.err = err
sdw.mu.Unlock()
}

func (sdw *SplitDiffWorker) StatusAsHTML() string {
sdw.mu.Lock()
defer sdw.mu.Unlock()
result := "<b>Working on:</b> " + sdw.keyspace + "/" + sdw.shard + "<br></br>\n"
result += "<b>State:</b> " + sdw.state + "<br></br>\n"
switch sdw.state {
case stateError:
result += "<b>Error</b>: " + sdw.err.Error() + "<br></br>\n"
}
return result
}

func (sdw *SplitDiffWorker) StatusAsText() string {
sdw.mu.Lock()
defer sdw.mu.Unlock()
result := "Working on: " + sdw.keyspace + "/" + sdw.shard + "\n"
result += "State: " + sdw.state + "\n"
switch sdw.state {
case stateError:
result += "Error: " + sdw.err.Error() + "\n"
}
return result
}

func (sdw *SplitDiffWorker) CheckInterrupted() bool {
select {
case <-interrupted:
sdw.recordError(topo.ErrInterrupted)
return true
default:
}
return false
}

func (sdw *SplitDiffWorker) Run() {
// first state: read what we need to do
if err := sdw.Init(); err != nil {
sdw.recordError(err)
return
}

if sdw.CheckInterrupted() {
return
}

// second state: dummy sleep
sdw.setState(stateRunning)
for i := 0; i < 10; i++ {
time.Sleep(time.Second)
if sdw.CheckInterrupted() {
return
}
}

sdw.setState(stateDone)
}

func (sdw *SplitDiffWorker) Init() error {
sdw.setState(stateInit)

shardInfo, err := sdw.wr.TopoServer().GetShard(sdw.keyspace, sdw.shard)
if err != nil {
return fmt.Errorf("Cannot read shard %v/%v: %v", sdw.keyspace, sdw.shard, err)
}

if len(shardInfo.SourceShards) == 0 {
return fmt.Errorf("Shard %v/%v has no source shard", sdw.keyspace, sdw.shard)
}

sdw.mu.Lock()
sdw.shardInfo = shardInfo
sdw.mu.Unlock()

return nil
}
Loading

0 comments on commit b74c92f

Please sign in to comment.