Skip to content

Commit

Permalink
executer_manager(engine): Support watching executor online/offline ev…
Browse files Browse the repository at this point in the history
…ents (#5565)

ref #5513
  • Loading branch information
liuzix authored May 25, 2022
1 parent 03a259f commit 98fbefd
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 3 deletions.
29 changes: 26 additions & 3 deletions engine/servermaster/executor_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ type ExecutorManagerImpl struct {

rescMgr resource.RescMgr
logRL *rate.Limiter

notifier *notifier.Notifier[model.ExecutorStatusChange]
}

// NewExecutorManagerImpl creates a new ExecutorManagerImpl instance
Expand All @@ -78,6 +80,7 @@ func NewExecutorManagerImpl(initHeartbeatTTL, keepAliveInterval time.Duration, c
keepAliveInterval: keepAliveInterval,
rescMgr: resource.NewCapRescMgr(),
logRL: rate.NewLimiter(rate.Every(time.Second*5), 1 /*burst*/),
notifier: notifier.NewNotifier[model.ExecutorStatusChange](),
}
}

Expand All @@ -99,6 +102,11 @@ func (e *ExecutorManagerImpl) removeExecutorImpl(id model.ExecutorID) error {
Time: time.Now(),
})
}

e.notifier.Notify(model.ExecutorStatusChange{
ID: id,
Tp: model.EventExecutorOffline,
})
return nil
}

Expand Down Expand Up @@ -150,6 +158,10 @@ func (e *ExecutorManagerImpl) RegisterExec(info *model.NodeInfo) {
}
e.mu.Lock()
e.executors[info.ID] = exec
e.notifier.Notify(model.ExecutorStatusChange{
ID: info.ID,
Tp: model.EventExecutorOnline,
})
e.mu.Unlock()
e.rescMgr.Register(exec.ID, exec.Addr, model.RescUnit(exec.Capability))
}
Expand Down Expand Up @@ -293,7 +305,18 @@ func (e *ExecutorManagerImpl) GetAddr(executorID model.ExecutorID) (string, bool
// WatchExecutors implements the ExecutorManager interface.
func (e *ExecutorManagerImpl) WatchExecutors(
ctx context.Context,
) ([]model.ExecutorID, *notifier.Receiver[model.ExecutorStatusChange], error) {
// TODO This method will be implemented before we enable local file GC.
panic("implement me")
) (snap []model.ExecutorID, receiver *notifier.Receiver[model.ExecutorStatusChange], err error) {
e.mu.Lock()
defer e.mu.Unlock()

for executorID := range e.executors {
snap = append(snap, executorID)
}

if err := e.notifier.Flush(ctx); err != nil {
return nil, nil, err
}

receiver = e.notifier.NewReceiver()
return
}
68 changes: 68 additions & 0 deletions engine/servermaster/executor_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,71 @@ func TestExecutorManager(t *testing.T) {
require.NotNil(t, resp.Err)
require.Equal(t, pb.ErrorCode_UnknownExecutor, resp.Err.GetCode())
}

func TestExecutorManagerWatch(t *testing.T) {
t.Parallel()

heartbeatTTL := time.Millisecond * 400
checkInterval := time.Millisecond * 10
mgr := NewExecutorManagerImpl(heartbeatTTL, checkInterval, nil)
mgr.Start(context.Background())

// register an executor server
executorAddr := "127.0.0.1:10001"
registerReq := &pb.RegisterExecutorRequest{
Address: executorAddr,
Capability: 2,
}
info, err := mgr.AllocateNewExec(registerReq)
require.Nil(t, err)

executorID1 := info.ID
snap, stream, err := mgr.WatchExecutors(context.Background())
require.NoError(t, err)
require.Equal(t, []model.ExecutorID{executorID1}, snap)

// register another executor server
executorAddr = "127.0.0.1:10002"
registerReq = &pb.RegisterExecutorRequest{
Address: executorAddr,
Capability: 2,
}
info, err = mgr.AllocateNewExec(registerReq)
require.Nil(t, err)

executorID2 := info.ID
event := <-stream.C
require.Equal(t, model.ExecutorStatusChange{
ID: executorID2,
Tp: model.EventExecutorOnline,
}, event)

newHeartbeatReq := func(executorID model.ExecutorID) *pb.HeartbeatRequest {
return &pb.HeartbeatRequest{
ExecutorId: string(executorID),
Status: int32(model.Running),
Timestamp: uint64(time.Now().Unix()),
Ttl: uint64(10), // 10ms ttl
}
}

_, err = mgr.HandleHeartbeat(newHeartbeatReq(executorID1))
require.NoError(t, err)
_, err = mgr.HandleHeartbeat(newHeartbeatReq(executorID2))
require.NoError(t, err)

require.Equal(t, 2, mgr.ExecutorCount(model.Running))

require.Eventually(t, func() bool {
resp, err := mgr.HandleHeartbeat(newHeartbeatReq(executorID2))
require.NoError(t, err)
require.Nil(t, resp.Err)
return mgr.ExecutorCount(model.Running) == 1
}, time.Second*2, time.Millisecond*5)

event = <-stream.C
require.Equal(t, model.ExecutorStatusChange{
ID: executorID1,
Tp: model.EventExecutorOffline,
}, event)
}

0 comments on commit 98fbefd

Please sign in to comment.