Skip to content

feat(worker): Assign WorkerGroup via worker configuration #332

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions vermeer/apps/master/bl/grpc_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (h *ServerHandler) SayHelloMaster(ctx context.Context, req *pb.HelloMasterR
}
}

reqWorker, err := workerMgr.CreateWorker(ip+":"+port, ip, req.Version)
reqWorker, err := workerMgr.CreateWorker(ip+":"+port, ip, req.Version, req.WorkerGroup)
if err != nil {
logrus.Errorf("failed to create a WorkerClient, error: %s", err)
return &pb.HelloMasterResp{WorkerId: -1, WorkerName: reqWorker.Name}, err
Expand All @@ -104,7 +104,7 @@ func (h *ServerHandler) SayHelloMaster(ctx context.Context, req *pb.HelloMasterR
return &pb.HelloMasterResp{}, err
}

logrus.Infof("worker say hello name: %s, client: %s", reqWorker.Name, p.Addr.String())
logrus.Infof("worker say hello name: %s and set to workgroup: %s, client: %s", reqWorker.Name, reqWorker.Group, p.Addr.String())
Copy link
Preview

Copilot AI Jun 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Consider standardizing the log format for consistency and easier parsing, e.g.: worker registered name=%s group=%s client=%s.

Suggested change
logrus.Infof("worker say hello name: %s and set to workgroup: %s, client: %s", reqWorker.Name, reqWorker.Group, p.Addr.String())
logrus.Infof("worker registered name=%s group=%s client=%s", reqWorker.Name, reqWorker.Group, p.Addr.String())

Copilot uses AI. Check for mistakes.


resp := pb.HelloMasterResp{
WorkerId: reqWorker.Id,
Expand Down
14 changes: 12 additions & 2 deletions vermeer/apps/master/workers/worker_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (wm *workerManager) Init() {
// CreateWorker Build a WorkerClient without an ID, and it'll receive one upon joining the WorkerManager.
// The new WokerClient instance will be assigned a same name with the old one added to the WorkerManager,
// which has the same workerPeer property.
func (wm *workerManager) CreateWorker(workerPeer string, ipAddr string, version string) (*WorkerClient, error) {
func (wm *workerManager) CreateWorker(workerPeer string, ipAddr string, version string, workerGroup string) (*WorkerClient, error) {
if workerPeer == "" {
return nil, fmt.Errorf("the argument 'workerPeer' is invalid")
}
Expand All @@ -115,12 +115,17 @@ func (wm *workerManager) CreateWorker(workerPeer string, ipAddr string, version
return nil, fmt.Errorf("the argument 'version' is invalid")
}

// must check if workerGroup is valid
if workerGroup == "" {
workerGroup = "$"
}

worker := &WorkerClient{
GrpcPeer: workerPeer,
IpAddr: ipAddr,
Version: version,
LaunchTime: time.Now(),
Group: "$",
Group: workerGroup,
}

workerInDB := wm.retrieveWorker(workerPeer)
Expand All @@ -133,6 +138,11 @@ func (wm *workerManager) CreateWorker(workerPeer string, ipAddr string, version
worker.InitTime = worker.LaunchTime
}

// if workerGroup in workerInDB is different from the one in worker, give a warning to the user
if workerGroup != "$" && worker.Group != workerGroup {
logrus.Warnf("worker manager, worker group mismatch: given %s, but found %s in db for worker %s", workerGroup, worker.Group, worker.Name)
Comment on lines +142 to +143
Copy link
Preview

Copilot AI Jun 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The mismatch check compares workerGroup to worker.Group, which was just set to the same value; it should compare workerGroup to the existing group in workerInDB (e.g., workerInDB.Group) to detect real mismatches.

Suggested change
if workerGroup != "$" && worker.Group != workerGroup {
logrus.Warnf("worker manager, worker group mismatch: given %s, but found %s in db for worker %s", workerGroup, worker.Group, worker.Name)
if workerGroup != "$" && workerInDB != nil && workerInDB.Group != workerGroup {
logrus.Warnf("worker manager, worker group mismatch: given %s, but found %s in db for worker %s", workerGroup, workerInDB.Group, worker.Name)

Copilot uses AI. Check for mistakes.

}

return worker, nil
}

Expand Down
Loading
Loading