Skip to content

Commit

Permalink
submit-job to servermaster directly. (pingcap#154)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanfei1991 authored Feb 15, 2022
1 parent bdd273b commit 0f8ca41
Show file tree
Hide file tree
Showing 16 changed files with 113 additions and 221 deletions.
5 changes: 0 additions & 5 deletions cmd/executor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ import (
"syscall"

"github.com/hanfei1991/microcosm/executor"
_ "github.com/hanfei1991/microcosm/executor/cvsTask"
_ "github.com/hanfei1991/microcosm/jobmaster/cvsJob"
"github.com/hanfei1991/microcosm/lib/registry"
"github.com/pingcap/errors"
"github.com/pingcap/tiflow/dm/pkg/log"
"go.uber.org/zap"
Expand Down Expand Up @@ -60,8 +57,6 @@ func main() {
}
}()

registry.LoadFake(registry.GlobalWorkerRegistry())

// 4. run executor server
server := executor.NewServer(cfg, nil)
if err != nil {
Expand Down
86 changes: 1 addition & 85 deletions cmd/master-client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,105 +2,21 @@ package main

import (
"context"
"encoding/json"
"flag"
"fmt"
"os"
"strconv"

"github.com/hanfei1991/microcosm/client"
"github.com/hanfei1991/microcosm/ctl"
"github.com/hanfei1991/microcosm/jobmaster/benchmark"
"github.com/hanfei1991/microcosm/pb"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pkg/errors"
)

func main() {
cmd := os.Args[1]
ctx := context.Background()
addr := ""
err := log.InitLogger(&log.Config{
Level: "info",
})
if err != nil {
fmt.Printf("err: %v", err)
os.Exit(1)
}
switch cmd {
case "submit-job", "cancel-job":
flag1 := os.Args[2]
if flag1 != "--master-addr" {
fmt.Printf("no master address found")
os.Exit(1)
}
addr = os.Args[3]
case "run-fake":
ctl.MainStart(ctx, os.Args[1:])
os.Exit(0)
default:
fmt.Printf("submit-job --config configFile")
os.Exit(0)
}
clt, err := client.NewMasterClient(ctx, []string{addr})
if err != nil {
fmt.Printf("err: %v", err)
os.Exit(1)
}

if cmd == "submit-job" {
args := os.Args[4:]
cfg := benchmark.NewConfig()
err = cfg.Parse(args)
switch errors.Cause(err) {
case nil:
case flag.ErrHelp:
os.Exit(0)
default:
fmt.Printf("err1: %v", err)
os.Exit(2)
}

configJSON, err := json.Marshal(cfg)
if err != nil {
fmt.Printf("err2: %v", err)
}

req := &pb.SubmitJobRequest{
Tp: pb.JobType_Benchmark,
Config: configJSON,
User: "hanfei",
}
resp, err := clt.SubmitJob(context.Background(), req)
if err != nil {
fmt.Printf("err: %v", err)
return
}
if resp.Err != nil {
fmt.Printf("err: %v", resp.Err.Message)
return
}
fmt.Printf("submit job successful JobID:%d JobIDStr:%s\n", resp.JobId, resp.JobIdStr)
}
if cmd == "cancel-job" {
flag1 := os.Args[4]
jobID, err := strconv.ParseInt(flag1, 10, 32)
if err != nil {
fmt.Print(err.Error())
os.Exit(1)
}
req := &pb.CancelJobRequest{
JobId: int32(jobID),
}
resp, err := clt.CancelJob(context.Background(), req)
if err != nil {
fmt.Printf("err: %v", err)
return
}
if resp.Err != nil {
fmt.Printf("err: %v", resp.Err.Message)
return
}
fmt.Print("cancel job successful")
}
ctl.MainStart(ctx, os.Args[1:])
}
35 changes: 6 additions & 29 deletions ctl/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@ import (
"io/ioutil"
"os"

"github.com/google/uuid"
"github.com/hanfei1991/microcosm/client"
"github.com/hanfei1991/microcosm/lib"
"github.com/hanfei1991/microcosm/model"
"github.com/hanfei1991/microcosm/pb"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/spf13/cobra"
Expand All @@ -18,7 +14,7 @@ import (

func NewRunFake() *cobra.Command {
cmd := &cobra.Command{
Use: "run-fake [--executor-addr addr] [--executor-id id]",
Use: "submit-job",
Short: "Run a fake workload to a specific executor",
RunE: runFakeFunc,
}
Expand All @@ -38,16 +34,6 @@ func openFileAndReadString(path string) (content []byte, err error) {
}

func runFakeFunc(cmd *cobra.Command, _ []string) error {
execAddr, err := cmd.Flags().GetString("executor-addr")
if err != nil {
fmt.Print("error in parse `--executor-addr`")
return err
}
execID, err := cmd.Flags().GetString("executor-id")
if err != nil {
fmt.Print("error in parse `--executor-id`")
return err
}
path, err := cmd.Flags().GetString("job-config")
if err != nil {
fmt.Print("error in parse `--job-config`")
Expand All @@ -58,25 +44,16 @@ func runFakeFunc(cmd *cobra.Command, _ []string) error {
fmt.Print("error in parse job-config")
return err
}
err = cltManager.AddExecutor(model.ExecutorID(execID), execAddr)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), rpcTimeout)
defer cancel()

log.L().Info("sending request to executor", zap.String("address", execAddr))
resp, err := cltManager.ExecutorClient(model.ExecutorID(execID)).Send(ctx, &client.ExecutorRequest{
Cmd: client.CmdDispatchTask,
Req: &pb.DispatchTaskRequest{
TaskTypeId: int64(lib.CvsJobMaster),
TaskConfig: jobConfig,
MasterId: uuid.New().String(), // use a unique ID to force Init the master each time,
WorkerId: uuid.New().String(),
},
resp, err := cltManager.MasterClient().SubmitJob(ctx, &pb.SubmitJobRequest{
Tp: pb.JobType_CVSDemo, // TODO: Support different job types.
Config: jobConfig,
User: "hanfei",
})
if err != nil {
log.L().Error("failed to dispatch master", zap.Error(err))
log.L().Error("failed to submit job", zap.Error(err))
os.Exit(1)
}
log.L().Info("resp", zap.Any("resp", resp))
Expand Down
2 changes: 1 addition & 1 deletion executor/cvsTask/cvstask.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type cvsTask struct {
isEOF bool
}

func init() {
func RegisterWorker() {
constructor := func(ctx *dcontext.Context, id lib.WorkerID, masterID lib.MasterID, config lib.WorkerConfig) lib.Worker {
return NewCvsTask(ctx, id, masterID, config)
}
Expand Down
11 changes: 4 additions & 7 deletions executor/init.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
package executor

import (
"github.com/hanfei1991/microcosm/executor/runtime"
"github.com/hanfei1991/microcosm/executor/runtime/benchmark"
"github.com/hanfei1991/microcosm/executor/runtime/jobmaster"
cvstask "github.com/hanfei1991/microcosm/executor/cvsTask"
cvs "github.com/hanfei1991/microcosm/jobmaster/cvsJob"
)

func init() {
// register operator builder for runtime
runtime.InitOpBuilders()
benchmark.RegisterBuilder()
jobmaster.RegisterBuilder()
cvstask.RegisterWorker()
cvs.RegisterWorker()
}
10 changes: 5 additions & 5 deletions jobmaster/cvsJob/cvsJobMaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type JobMaster struct {
workerID lib.WorkerID
}

func init() {
func RegisterWorker() {
constructor := func(ctx *dcontext.Context, id lib.WorkerID, masterID lib.MasterID, config lib.WorkerConfig) lib.Worker {
return NewCVSJobMaster(ctx, id, masterID, config)
}
Expand Down Expand Up @@ -113,10 +113,10 @@ func (jm *JobMaster) Tick(ctx context.Context) error {
}
status := worker.handle.Status()
if status.Code == lib.WorkerStatusNormal {
num := status.Ext.(int64)
worker.curLoc = num
jm.counter += num
log.L().Info("cvs job tmp num ", zap.Any("id :", worker.handle.ID()), zap.Int64("counter: ", num))
num := status.Ext.(*int64)
worker.curLoc = *num
jm.counter += *num
log.L().Debug("cvs job tmp num ", zap.Any("id :", worker.handle.ID()), zap.Int64("counter: ", *num))
// todo : store the sync progress into the meta store for each file
} else if status.Code == lib.WorkerStatusFinished {
// todo : handle error case here
Expand Down
2 changes: 2 additions & 0 deletions lib/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ const (

const (
JobManager = WorkerType(iota + 1)
// job master
CvsJobMaster
DmJobMaster
CdcJobMaster
// task
CvsTask
DmTask
CdcTask
Expand Down
3 changes: 2 additions & 1 deletion lib/registry/load_fake.go → lib/registry/register_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ const (

type FakeConfig struct{}

func LoadFake(registry Registry) {
// only for test.
func RegisterFake(registry Registry) {
fakeMasterFactory := NewSimpleWorkerFactory(func(ctx *dcontext.Context, id lib.WorkerID, masterID lib.MasterID, config WorkerConfig) lib.Worker {
return fake.NewFakeMaster(ctx, id, masterID, config)
}, &FakeConfig{})
Expand Down
1 change: 1 addition & 0 deletions lib/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (r *registryImpl) MustRegisterWorkerType(tp lib.WorkerType, factory WorkerF
if ok := r.RegisterWorkerType(tp, factory); !ok {
log.L().Panic("duplicate worker type", zap.Int64("worker-type", int64(tp)))
}
log.L().Info("register worker", zap.Int64("worker-type", int64(tp)))
}

func (r *registryImpl) RegisterWorkerType(tp lib.WorkerType, factory WorkerFactory) (ok bool) {
Expand Down
2 changes: 1 addition & 1 deletion lib/registry/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,6 @@ func TestRegistryWorkerTypeNotFound(t *testing.T) {
func TestLoadFake(t *testing.T) {
registry := NewRegistry()
require.NotPanics(t, func() {
LoadFake(registry)
RegisterFake(registry)
})
}
Loading

0 comments on commit 0f8ca41

Please sign in to comment.