Skip to content

Commit

Permalink
job-master: remove heartbeat in base master, use job master instead (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei authored Feb 9, 2022
1 parent 3f10e7b commit bdd71c1
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 135 deletions.
25 changes: 18 additions & 7 deletions jobmaster/cvsJob/cvsJobMaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (e *errorInfo) Error() string {
}

type JobMaster struct {
lib.BaseMaster
lib.BaseJobMaster
syncInfo *Config
syncFilesInfo map[lib.WorkerID]*workerInfo
counter int64
Expand All @@ -54,23 +54,26 @@ func init() {
registry.GlobalWorkerRegistry().MustRegisterWorkerType(lib.CvsJobMaster, factory)
}

func NewCVSJobMaster(ctx *dcontext.Context, _workerID lib.WorkerID, masterID lib.MasterID, conf lib.WorkerConfig) *JobMaster {
func NewCVSJobMaster(ctx *dcontext.Context, workerID lib.WorkerID, masterID lib.MasterID, conf lib.WorkerConfig) *JobMaster {
jm := &JobMaster{}
jm.workerID = _workerID
jm.workerID = workerID
jm.syncInfo = conf.(*Config)
jm.syncFilesInfo = make(map[lib.WorkerID]*workerInfo)
deps := ctx.Dependencies
base := lib.NewBaseMaster(

base := lib.NewBaseJobMaster(
ctx,
jm,
"fake-job-manager",
jm,
masterID,
workerID,
deps.MessageHandlerManager,
deps.MessageRouter,
deps.MetaKVClient,
deps.ExecutorClientManager,
deps.ServerMasterClient)
jm.BaseMaster = base
deps.ServerMasterClient,
)
jm.BaseJobMaster = base
log.L().Info("new cvs jobmaster ", zap.Any("id :", jm.workerID))
return jm
}
Expand Down Expand Up @@ -184,6 +187,14 @@ func (jm *JobMaster) Workload() model.RescUnit {
return 2
}

func (jm *JobMaster) OnMasterFailover(reason lib.MasterFailoverReason) error {
return nil
}

func (jm *JobMaster) Status() lib.WorkerStatus {
return lib.WorkerStatus{Code: lib.WorkerStatusNormal}
}

func (jm *JobMaster) listSrcFiles(ctx context.Context) ([]string, error) {
conn, err := grpc.Dial(jm.syncInfo.SrcHost, grpc.WithInsecure())
if err != nil {
Expand Down
4 changes: 1 addition & 3 deletions jobmaster/example/master_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
)

const (
jobManagerID = "job-manager"

masterID = "master"

executorNodeID = "node-exec"
Expand All @@ -23,7 +21,7 @@ const (

func newExampleMaster() *exampleMaster {
self := &exampleMaster{}
self.DefaultBaseMaster = lib.MockBaseMaster(jobManagerID, masterID, self)
self.DefaultBaseMaster = lib.MockBaseMaster(masterID, self)
return self
}

Expand Down
8 changes: 4 additions & 4 deletions lib/base_jobmaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ func NewBaseJobMaster(
// `masterID` is always the ID of master role, against current object
// `workerID` is the ID of current object
baseMaster := NewBaseMaster(
ctx, masterImpl, masterID, workerID, messageHandlerManager, messageRouter,
metaKVClient, executorClientManager, serverMasterClient)
ctx, masterImpl, workerID, messageHandlerManager,
messageRouter, metaKVClient, executorClientManager, serverMasterClient)
baseWorker := NewBaseWorker(
workerImpl, messageHandlerManager, messageRouter, metaKVClient, workerID,
"" /* masterID, job manager uses empty string */)
workerImpl, messageHandlerManager, messageRouter, metaKVClient,
workerID, masterID)
return &defaultBaseJobMaster{
master: baseMaster,
worker: baseWorker,
Expand Down
21 changes: 16 additions & 5 deletions lib/fake/fake_master.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ const (

type Config struct{}

var _ lib.Master = (*Master)(nil)
var _ lib.BaseJobMaster = (*Master)(nil)

const (
fakeWorkerCount = 20
)

type Master struct {
lib.BaseMaster
lib.BaseJobMaster

// workerID stores the ID of the Master AS A WORKER.
workerID lib.WorkerID
Expand Down Expand Up @@ -140,21 +140,32 @@ func (m *Master) CloseImpl(ctx context.Context) error {
return nil
}

func (m *Master) OnMasterFailover(reason lib.MasterFailoverReason) error {
log.L().Info("FakeMaster: OnMasterFailover", zap.Stack("stack"))
return nil
}

func (m *Master) Status() lib.WorkerStatus {
return lib.WorkerStatus{Code: lib.WorkerStatusNormal}
}

func NewFakeMaster(ctx *dcontext.Context, workerID lib.WorkerID, masterID lib.MasterID, _config lib.WorkerConfig) *Master {
ret := &Master{
pendingWorkerSet: make(map[lib.WorkerID]int),
}
deps := ctx.Dependencies
base := lib.NewBaseMaster(
base := lib.NewBaseJobMaster(
ctx,
ret,
ret,
masterID,
workerID,
deps.MessageHandlerManager,
deps.MessageRouter,
deps.MetaKVClient,
deps.ExecutorClientManager,
deps.ServerMasterClient)
ret.BaseMaster = base
deps.ServerMasterClient,
)
ret.BaseJobMaster = base
return ret
}
112 changes: 1 addition & 111 deletions lib/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,7 @@ type DefaultBaseMaster struct {
serverMasterClient client.MasterClient
pool workerpool.AsyncPool

// used to communicate with job manager in server master
masterClient *masterClient
clock clock.Clock
clock clock.Clock

// workerManager maintains the list of all workers and
// their statuses.
Expand All @@ -108,10 +106,6 @@ type DefaultBaseMaster struct {
// closeCh is closed when the BaseMaster is exiting
closeCh chan struct{}

// read-only fields
// if this master is job master, masterID represents job manager id
// if this master is job manager, master ID is ""
masterID MasterID
id MasterID // id of this master
advertiseAddr string
nodeID p2p.NodeID
Expand All @@ -127,7 +121,6 @@ type DefaultBaseMaster struct {
func NewBaseMaster(
ctx *dcontext.Context,
impl MasterImpl,
masterID MasterID,
id MasterID,
messageHandlerManager p2p.MessageHandlerManager,
messageRouter p2p.MessageSender,
Expand All @@ -151,7 +144,6 @@ func NewBaseMaster(
executorClientManager: executorClientManager,
serverMasterClient: serverMasterClient,
pool: workerpool.NewDefaultAsyncPool(4),
masterID: masterID,
id: id,
clock: clock.New(),

Expand Down Expand Up @@ -181,29 +173,6 @@ func (m *DefaultBaseMaster) Init(ctx context.Context) error {
m.currentEpoch.Store(epoch)
m.workerManager = newWorkerManager(m.id, !isInit, epoch)

// job manager doesn't need to send heartbeat to anybody
if !m.isJobManager() {
m.masterClient = newMasterClient(
m.masterID,
m.id,
m.messageRouter,
m.metaKVClient,
m.clock.Mono(),
func() error {
// TODO: support job manager failover
return nil
},
)
err = m.registerHandlerForMaster(ctx)
if err != nil {
return err
}
err = m.masterClient.InitMasterInfoFromMeta(ctx)
if err != nil {
return err
}
}

m.startBackgroundTasks()

if isInit {
Expand Down Expand Up @@ -289,29 +258,6 @@ func (m *DefaultBaseMaster) startBackgroundTasks() {
m.OnError(err)
}
}()

// TODO: extract a heartbeat manager and reuses it in both lib/master and lib/worker
if !m.isJobManager() {
m.wg.Add(1)
go func() {
defer m.wg.Done()
if err := m.runHeartbeatWorker(cctx); err != nil {
m.OnError(err)
}
}()

m.wg.Add(1)
go func() {
defer m.wg.Done()
if err := m.runWatchDog(cctx); err != nil {
m.OnError(err)
}
}()
}
}

func (m *DefaultBaseMaster) isJobManager() bool {
return m.masterID == ""
}

func (m *DefaultBaseMaster) runWorkerCheck(ctx context.Context) error {
Expand Down Expand Up @@ -409,29 +355,6 @@ func (m *DefaultBaseMaster) markInitializedInMetadata(ctx context.Context) error
return nil
}

func (m *DefaultBaseMaster) registerHandlerForMaster(ctx context.Context) error {
topic := HeartbeatPongTopic(m.masterClient.masterID, m.id)
ok, err := m.messageHandlerManager.RegisterHandler(
ctx,
topic,
&HeartbeatPongMessage{},
func(sender p2p.NodeID, value p2p.MessageValue) error {
msg := value.(*HeartbeatPongMessage)
log.L().Debug("heartbeat pong received",
zap.Any("msg", msg))
m.masterClient.HandleHeartbeat(sender, msg)
return nil
})
if err != nil {
return errors.Trace(err)
}
if !ok {
log.L().Panic("duplicate handler",
zap.String("topic", topic))
}
return nil
}

func (m *DefaultBaseMaster) registerHandlerForWorker(ctx context.Context, workerID WorkerID) error {
topic := HeartbeatPingTopic(m.id, workerID)
ok, err := m.messageHandlerManager.RegisterHandler(
Expand Down Expand Up @@ -591,36 +514,3 @@ func (m *DefaultBaseMaster) GetWorkerStatusExtTypeInfo() interface{} {
info := int64(0)
return &info
}

func (m *DefaultBaseMaster) runHeartbeatWorker(ctx context.Context) error {
ticker := m.clock.Ticker(m.timeoutConfig.workerHeartbeatInterval)
for {
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case <-ticker.C:
if err := m.masterClient.SendHeartBeat(ctx, m.clock); err != nil {
return errors.Trace(err)
}
}
}
}

func (m *DefaultBaseMaster) runWatchDog(ctx context.Context) error {
ticker := m.clock.Ticker(m.timeoutConfig.workerHeartbeatInterval)
for {
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case <-ticker.C:
}

isNormal, err := m.masterClient.CheckMasterTimeout(ctx, m.clock)
if err != nil {
return errors.Trace(err)
}
if !isNormal {
return derror.ErrWorkerSuicide.GenWithStackByArgs()
}
}
}
3 changes: 1 addition & 2 deletions lib/master_mock_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@ import (
"github.com/hanfei1991/microcosm/pkg/uuid"
)

func MockBaseMaster(masterID MasterID, id MasterID, masterImpl MasterImpl) *DefaultBaseMaster {
func MockBaseMaster(id MasterID, masterImpl MasterImpl) *DefaultBaseMaster {
ret := NewBaseMaster(
// ctx is nil for now
// TODO refine this
nil,
masterImpl,
masterID,
id,
p2p.NewMockMessageHandlerManager(),
p2p.NewMockMessageSender(),
Expand Down
2 changes: 0 additions & 2 deletions lib/mock_master_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ func NewMockMasterImpl(masterID, id MasterID) *MockMasterImpl {
// TODO refine this
nil,
ret,
masterID,
id,
ret.messageHandlerManager,
ret.messageSender,
Expand All @@ -71,7 +70,6 @@ func (m *MockMasterImpl) Reset() {
m.DefaultBaseMaster = NewBaseMaster(
nil,
m,
m.masterID,
m.id,
m.messageHandlerManager,
m.messageSender,
Expand Down
1 change: 0 additions & 1 deletion servermaster/jobmanager_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ func NewJobManagerImplV2(
impl.BaseMaster = lib.NewBaseMaster(
dctx,
impl,
masterID,
id,
impl.messageHandlerManager,
impl.messageSender,
Expand Down

0 comments on commit bdd71c1

Please sign in to comment.