Skip to content

Commit

Permalink
fix RPC calls, fix log level setting
Browse files Browse the repository at this point in the history
  • Loading branch information
andre-merzky committed Sep 18, 2023
1 parent ec3f84a commit a5e6532
Show file tree
Hide file tree
Showing 12 changed files with 151 additions and 98 deletions.
30 changes: 12 additions & 18 deletions src/radical/pilot/agent/agent_0.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def __init__(self):
#
def _proxy_input_cb(self, msg):

self._log.debug('====== proxy input cb: %s', len(msg))
self._log.debug_8('proxy input cb: %s', len(msg))

to_advance = list()

Expand Down Expand Up @@ -202,16 +202,7 @@ def initialize(self):
self.register_output(rps.TMGR_STAGING_OUTPUT_PENDING,
rpc.PROXY_TASK_QUEUE)

# hook into the control pubsub for rpc handling
ctrl_addr_pub = self._session._reg['bridges.control_pubsub.addr_pub']
ctrl_addr_sub = self._session._reg['bridges.control_pubsub.addr_sub']

self._rpc_helper = rpu.RPCHelper(owner=self._uid,
ctrl_addr_pub=ctrl_addr_pub,
ctrl_addr_sub=ctrl_addr_sub,
log=self._log, prof=self._prof)

self._rpc_helper.add_handler('prepare_env', self._prepare_env)
self.register_rpc_handler('prepare_env', self._prepare_env)

# before we run any tasks, prepare a named_env `rp` for tasks which use
# the pilot's own environment, such as raptors
Expand All @@ -222,7 +213,9 @@ def initialize(self):
'export PATH=%s'
% os.environ.get('PATH', '')]
}
self._rpc_helper.request('prepare_env', env_name='rp', env_spec=env_spec)


self.rpc('prepare_env', env_name='rp', env_spec=env_spec)

# start any services if they are requested
self._start_services()
Expand All @@ -232,6 +225,8 @@ def initialize(self):
rm_info = self._rm.info
n_nodes = len(rm_info['node_list'])

self._log.debug('advance to PMGR_ACTIVE')

pilot = {'$all' : True, # pass full info to client side
'type' : 'pilot',
'uid' : self._pid,
Expand Down Expand Up @@ -545,15 +540,14 @@ def control_cb(self, topic, msg):
requests to handle.
'''

self._log.debug('==== %s: %s', topic, msg)
self._log.debug_1('control msg %s: %s', topic, msg)

cmd = msg['cmd']
arg = msg['arg']

self._log.debug('pilot command: %s: %s', cmd, arg)
self._prof.prof('cmd', msg="%s : %s" % (cmd, arg), uid=self._pid)


if cmd == 'pmgr_heartbeat' and arg['pmgr'] == self._pmgr:
self._session._hb.beat(uid=self._pmgr)
return True
Expand Down Expand Up @@ -598,17 +592,17 @@ def _ctrl_service_up(self, msg):

if uid not in self._service_uids_launched:
# we do not know this service instance
self._log.warn('=== ignore service startup signal for %s', uid)
self._log.warn('ignore service startup signal for %s', uid)
return True

if uid in self._service_uids_running:
self._log.warn('=== duplicated service startup signal for %s', uid)
self._log.warn('duplicated service startup signal for %s', uid)
return True

self._log.debug('=== service startup message for %s', uid)
self._log.debug('service startup message for %s', uid)

self._service_uids_running.append(uid)
self._log.debug('=== service %s started (%s / %s)', uid,
self._log.debug('service %s started (%s / %s)', uid,
len(self._service_uids_running),
len(self._service_uids_launched))

Expand Down
4 changes: 4 additions & 0 deletions src/radical/pilot/agent/scheduler/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ def control_cb(self, topic, msg):

if cmd == 'register_named_env':


env_name = arg['env_name']
self._named_envs.append(env_name)

Expand Down Expand Up @@ -656,6 +657,9 @@ def _schedule_tasks(self):
self.register_output(rps.AGENT_EXECUTING_PENDING,
rpc.AGENT_EXECUTING_QUEUE)

# re-register the control callback in this subprocess
self.register_subscriber(rpc.CONTROL_PUBSUB, self._control_cb)

self._publishers = dict()
self.register_publisher(rpc.STATE_PUBSUB)

Expand Down
42 changes: 23 additions & 19 deletions src/radical/pilot/pilot.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ def __init__(self, pmgr: PilotManager, descr):
# hook into the control pubsub for rpc handling
self._rpc_reqs = dict()
ctrl_addr_sub = self._session._reg['bridges.control_pubsub.addr_sub']
ctrl_addr_pub = self._session._reg['bridges.control_pubsub.addr_pub']

self._ctrl_pub = ru.zmq.Publisher(rpc.CONTROL_PUBSUB, url=ctrl_addr_pub,
log=self._log, prof=self._prof)

ru.zmq.Subscriber(rpc.CONTROL_PUBSUB, url=ctrl_addr_sub,
log=self._log, prof=self._prof, cb=self._control_cb,
Expand Down Expand Up @@ -700,8 +704,7 @@ def prepare_env(self, env_name, env_spec):
"""

self.rpc('prepare_env', {'env_name': env_name,
'env_spec': env_spec})
self.rpc('prepare_env', env_name=env_name, env_spec=env_spec)


# --------------------------------------------------------------------------
Expand Down Expand Up @@ -738,35 +741,36 @@ def _control_cb(self, topic, msg_data):

try:
msg = ru.zmq.Message.deserialize(msg_data)
self._log.debug('=== rpc res: %s', msg)

if msg.uid in self._rpc_reqs:
self._rpc_reqs[msg.uid]['res'] = msg
self._rpc_reqs[msg.uid]['evt'].set()
if isinstance(msg, RPCResultMessage):

self._log.debug_4('handle rpc result %s', msg)

if msg.uid in self._rpc_reqs:
self._rpc_reqs[msg.uid]['res'] = msg
self._rpc_reqs[msg.uid]['evt'].set()

except:
self._log.debug('=== ignore msg %s', msg_data)
pass


# --------------------------------------------------------------------------
#
def rpc(self, cmd, args=None, kwargs=None):
def rpc(self, cmd, *args, **kwargs):
'''Remote procedure call.
Send an RPC command and arguments to the pilot and wait for the
response. This is a synchronous operation at this point, and it is not
thread safe to have multiple concurrent RPC calls.
'''

self._log.debug('=== pilot in %s state', self.state)
# RPCs can only be handled in `PMGR_ACTIVE` state
# FIXME: RPCs will hang forever if the pilot dies after sending the msg
self.wait(rps.PMGR_ACTIVE)
self._log.debug('=== pilot now in %s state', self.state)

if not args:
args = dict()

rpc_id = ru.generate_id('%s.rpc' % self._uid)
rpc_req = RPCRequestMessage(uid=rpc_id, cmd=cmd, args=args)
rpc_req = RPCRequestMessage(uid=rpc_id, cmd=cmd, args=args,
kwargs=kwargs)

self._rpc_reqs[rpc_id] = {
'req': rpc_req,
Expand All @@ -775,18 +779,18 @@ def rpc(self, cmd, args=None, kwargs=None):
'time': time.time(),
}

self._log.debug('=== wait for rpc request %s', rpc_req)
self._ctrl_pub.put(rpc.CONTROL_PUBSUB, rpc_req)

while True:

if not self._rpc_reqs[rpc_id]['evt'].wait(timeout=10):
self._log.debug('=== still waiting for rpc request %s', rpc_id)
if not self._rpc_reqs[rpc_id]['evt'].wait(timeout=60):
self._log.debug('still waiting for rpc request %s', rpc_id)
continue

rpc_res = self._rpc_reqs[rpc_id]['res']
self._log.debug('=== rpc result: %s', rpc_res)

if rpc_res.exc:
raise RuntimeError('=== rpc failed: %s' % rpc_res.exc)
raise RuntimeError('rpc failed: %s' % rpc_res.exc)

return rpc_res.val

Expand Down
2 changes: 1 addition & 1 deletion src/radical/pilot/pilot_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,7 @@ def cancel_pilots(self, uids=None, _timeout=None):
# send the cancellation request to the pilots
# FIXME: MongoDB
# self._session._dbs.pilot_command('cancel_pilot', [], uids)
self._log.debug('=== issue cancel_pilots for %s', uids)
self._log.debug('issue cancel_pilots for %s', uids)
self.publish(rpc.CONTROL_PUBSUB, {'cmd' : 'cancel_pilots',
'arg' : {'pmgr' : self.uid,
'uids' : uids}})
Expand Down
3 changes: 2 additions & 1 deletion src/radical/pilot/pmgr/launching/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -886,7 +886,8 @@ def _prepare_pilot(self, resource, rcfg, pilot, expand, tar_name):
agent_cfg['task_post_launch'] = task_post_launch
agent_cfg['task_post_exec'] = task_post_exec
agent_cfg['resource_cfg'] = copy.deepcopy(rcfg)
agent_cfg['debug'] = self._log.getEffectiveLevel()
agent_cfg['log_lvl'] = self._log.level
agent_cfg['debug_lvl'] = self._log.debug_level
agent_cfg['services'] = services

pilot['cfg'] = agent_cfg
Expand Down
1 change: 1 addition & 0 deletions src/radical/pilot/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ def _register(self, arg):
#
def _worker(self, sid, q, term):

# FIXME: log level etc
log = ru.Logger('radical.pilot.bridge', level='debug', path=sid)

proxy_cp = None
Expand Down
1 change: 1 addition & 0 deletions src/radical/pilot/raptor/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def __init__(self, manager, rank, raptor_id):

self._log = ru.Logger(name=self._uid, ns='radical.pilot.worker',
level=self._cfg.log_lvl,
debug=self._cfg.debug_lvl,
targets=self._cfg.log_tgt,
path=self._cfg.path)
self._prof = ru.Profiler(name=self._uid, ns='radical.pilot.worker',
Expand Down
50 changes: 20 additions & 30 deletions src/radical/pilot/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,29 +186,10 @@ def __init__(self, proxy_url: Optional[str ] = None,

# initialization is different for each session type
# NOTE: we could refactor this to session sub-classes
if self._role == self._PRIMARY:

# if user did not set a uid, we need to generate a new ID
if not self._uid:
self._uid = ru.generate_id('rp.session', mode=ru.ID_PRIVATE)

self._init_primary()


elif self._role == self._AGENT_0:

self._init_agent_0()


elif self._role == self._AGENT_N:

self._init_agent_n()


else:

self._init_default()

if self._role == self._PRIMARY: self._init_primary()
elif self._role == self._AGENT_0: self._init_agent_0()
elif self._role == self._AGENT_N: self._init_agent_n()
else : self._init_default()

# now we have config and uid - initialize base class (saga session)
rs.Session.__init__(self, uid=self._uid)
Expand Down Expand Up @@ -244,6 +225,10 @@ def _init_primary(self):
# - pushes bridge and component configs into that registry
# - starts a ZMQ proxy (or ensures one is up and running)

# if user did not set a uid, we need to generate a new ID
if not self._uid:
self._uid = ru.generate_id('rp.session', mode=ru.ID_PRIVATE)

# we still call `_init_cfg` to complete missing config settings
# FIXME: completion only needed by `PRIMARY`
self._init_cfg_from_scratch()
Expand Down Expand Up @@ -434,7 +419,8 @@ def _init_cfg_from_scratch(self):
self._prof = self._get_profiler(name=self._uid)
self._rep = self._get_reporter(name=self._uid)
self._log = self._get_logger (name=self._uid,
level=self._cfg.get('debug'))
level=self._cfg.get('log_lvl'),
debug=self._cfg.get('debug_lvl'))

from . import version_detail as rp_version_detail
self._log.info('radical.pilot version: %s', rp_version_detail)
Expand Down Expand Up @@ -482,7 +468,8 @@ def _init_cfg_from_dict(self):
self._prof = self._get_profiler(name=self._uid)
self._rep = self._get_reporter(name=self._uid)
self._log = self._get_logger (name=self._uid,
level=self._cfg.get('debug'))
level=self._cfg.get('log_lvl'),
debug=self._cfg.get('debug_lvl'))

from . import version_detail as rp_version_detail
self._log.info('radical.pilot version: %s', rp_version_detail)
Expand Down Expand Up @@ -511,7 +498,8 @@ def _init_cfg_from_registry(self):
self._prof = self._get_profiler(name=self._uid)
self._rep = self._get_reporter(name=self._uid)
self._log = self._get_logger (name=self._uid,
level=self._cfg.get('debug'))
level=self._cfg.get('log_lvl'),
debug=self._cfg.get('debug_lvl'))

from . import version_detail as rp_version_detail
self._log.info('radical.pilot version: %s', rp_version_detail)
Expand Down Expand Up @@ -576,7 +564,7 @@ def _hb_term_cb(hb_uid):
# --------------------------------------

# create heartbeat manager which monitors all components in this session
# self._log.debug('=== hb %s from session', self._uid)
# self._log.debug('hb %s from session', self._uid)
self._hb = ru.Heartbeat(uid=self._uid,
timeout=self._cfg.heartbeat.timeout,
interval=self._cfg.heartbeat.interval,
Expand Down Expand Up @@ -1032,14 +1020,16 @@ def cmgr(self):

# --------------------------------------------------------------------------
#
def _get_logger(self, name, level=None):
def _get_logger(self, name, level=None, debug=None):
"""Get the Logger instance.
This is a thin wrapper around `ru.Logger()` which makes sure that
log files end up in a separate directory with the name of `session.uid`.
"""
return ru.Logger(name=name, ns='radical.pilot', path=self._cfg.path,
targets=['.'], level=level)
log = ru.Logger(name=name, ns='radical.pilot', path=self._cfg.path,
targets=['.'], level=level, debug=debug)

return log


# --------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion src/radical/pilot/tmgr/scheduler/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ def control_cb(self, topic, msg):

cmd = msg['cmd']

self._log.debug('=== got cmd %s', cmd)
self._log.debug('got cmd %s', cmd)

if cmd not in ['add_pilots', 'remove_pilots', 'cancel_tasks']:
return True
Expand Down
2 changes: 1 addition & 1 deletion src/radical/pilot/tmgr/staging_input/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def _advance_tasks(self, tasks, pid=None, state=None, push=True):
# perform and publish state update
# push to the proxy queue
for task in tasks:
self._log.debug('====== push to proxy: %s', task['uid'])
self._log.debug_8('push to proxy: %s', task['uid'])

self.advance(tasks, state, publish=True, push=push, qname=pid)

Expand Down
Loading

0 comments on commit a5e6532

Please sign in to comment.