Skip to content

Commit

Permalink
fixes in control msg handling
Browse files Browse the repository at this point in the history
  • Loading branch information
andre-merzky committed Sep 22, 2023
1 parent ca5c400 commit 853d3b4
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 13 deletions.
7 changes: 7 additions & 0 deletions src/radical/pilot/agent/scheduler/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ def initialize(self):
self.register_subscriber(rpc.AGENT_UNSCHEDULE_PUBSUB, self.unschedule_cb)

# start a process to host the actual scheduling algorithm
self._scheduler_process = False
self._p = mp.Process(target=self._schedule_tasks)
self._p.daemon = True
self._p.start()
Expand Down Expand Up @@ -321,6 +322,10 @@ def control_cb(self, topic, msg):
listen on the control channel for raptor queue registration commands
'''

# only the scheduler process listens for control messages
if not self._scheduler_process:
return

cmd = msg['cmd']
arg = msg['arg']

Expand Down Expand Up @@ -598,6 +603,8 @@ def _schedule_tasks(self):
tasks.
'''

self._scheduler_process = True

# ZMQ endpoints will not have survived the fork. Specifically the
# registry client of the component base class will have to reconnect.
# Note that `self._reg` of the base class is a *pointer* to the sesison
Expand Down
5 changes: 3 additions & 2 deletions src/radical/pilot/raptor/master.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,9 @@ def __init__(self, cfg: ru.Config = None):

rpu.Component.__init__(self, ccfg, self._session)

self.register_publisher(rpc.STATE_PUBSUB)
self.register_publisher(rpc.CONTROL_PUBSUB)
# we never run `self.start()` which is ok - but it means we miss out on
# some of the component initialization. Call it manually thus
self._initialize()

# send new worker tasks and agent input staging / agent scheduler
self.register_output(rps.AGENT_STAGING_INPUT_PENDING,
Expand Down
30 changes: 19 additions & 11 deletions src/radical/pilot/raptor/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,21 +145,29 @@ def _hb_worker(self):

# --------------------------------------------------------------------------
#
def _state_cb(self, topic, things):
def _state_cb(self, topic, msgs):

for thing in ru.as_list(things):
for msg in ru.as_list(msgs):

uid = thing['uid']
state = thing['state']
cmd = msg['cmd']
arg = msg['arg']

if uid == self._raptor_id:
if cmd != 'update':
continue

if state in rps.FINAL + [rps.AGENT_STAGING_OUTPUT_PENDING]:
# master completed - terminate this worker
self._log.info('master %s final: %s - terminate',
uid, state)
self.stop()
return False
for thing in arg:

uid = thing['uid']
state = thing['state']

if uid == self._raptor_id:

if state in rps.FINAL + [rps.AGENT_STAGING_OUTPUT_PENDING]:
# master completed - terminate this worker
self._log.info('master %s final: %s - terminate',
uid, state)
self.stop()
return False

return True

Expand Down

0 comments on commit 853d3b4

Please sign in to comment.