Skip to content

Commit

Permalink
cleaner RPC handling
Browse files Browse the repository at this point in the history
  • Loading branch information
andre-merzky committed Jul 26, 2023
1 parent 6322b05 commit f719517
Show file tree
Hide file tree
Showing 3 changed files with 241 additions and 193 deletions.
227 changes: 34 additions & 193 deletions src/radical/pilot/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,48 +9,10 @@
#
class HeartbeatMessage(ru.Message):

# ------------------------------
class Payload(ru.TypedDict):
_schema = {'uid': str }
_defaults = {'uid': None }
# ------------------------------

_schema = {
'payload': Payload
}

_defaults = {
'msg_type': 'heartbeat',
'payload' : {}
}



# --------------------------------------------------------------------------
def __init__(self, uid : Optional[str] = None,
from_dict: Optional[Dict[str, Any]] = None):
'''
support msg construction and usage like this:
hb_msg = rp.HeartbeatMessage(uid='foo.1')
assert hb_msg.uid == 'foo.1
'''

if uid:
from_dict = {'payload': {'uid': uid}}

super().__init__(from_dict=from_dict)


# --------------------------------------------------------------------------
@property
def uid(self):
return self.payload.uid

@uid.setter
def uid(self, value):
self.payload.uid = value
_schema = {'uid' : str }
_defaults = {'_msg_type': 'heartbeat',
'uid' : None}


ru.Message.register_msg_type('heartbeat', HeartbeatMessage)
Expand All @@ -60,82 +22,18 @@ def uid(self, value):
#
class RPCRequestMessage(ru.Message):

# ------------------------------
class Payload(ru.TypedDict):
_schema = {
'uid' : str, # uid of message
'addr': str, # who is expected to act on the request
'cmd' : str, # rpc command
'args': dict, # rpc command arguments
}
_defaults = {
'uid' : None,
'addr': None,
'cmd' : None,
'args': {},
}
# ------------------------------

_schema = {
'payload': Payload
}

_schema = {'uid' : str, # uid of message
'addr' : str, # who is expected to act on the request
'cmd' : str, # rpc command
'args' : list, # rpc command arguments
'kwargs' : dict} # rpc command named arguments
_defaults = {
'msg_type': 'rpc_req',
'payload' : {}
}



# --------------------------------------------------------------------------
def __init__(self, uid : Optional[str] = None,
addr: Optional[str] = None,
rpc : Optional[str] = None,
args: Optional[Dict[str, Any]] = None):
'''
support msg construction and usage like this:
msg = rp.Message(addr='pilot.0000', rpc='stop')
assert msg.addr == 'pilot.0000'
'''

from_dict = dict()

if addr: from_dict['addr'] = addr
if rpc: from_dict['rpc'] = rpc
if args: from_dict['args'] = args

super().__init__(from_dict=from_dict)


# --------------------------------------------------------------------------
@property
def addr(self):
return self.payload.addr

@addr.setter
def addr(self, value):
self.payload.addr = value


@property
def rpc(self):
return self.payload.rpc

@rpc.setter
def rpc(self, value):
self.payload.rpc = value


@property
def args(self):
return self.payload.args

@args.setter
def args(self, value):
self.payload.args = value

'_msg_type': 'rpc_req',
'uid' : None,
'addr' : None,
'cmd' : None,
'args' : [],
'kwargs' : {}}

ru.Message.register_msg_type('rpc_req', RPCRequestMessage)

Expand All @@ -144,92 +42,35 @@ def args(self, value):
#
class RPCResultMessage(ru.Message):

# ------------------------------
class Payload(ru.TypedDict):
_schema = {
'uid': str, # uid of rpc call
'val': Any, # return value (`None` by default)
'out': str, # stdout
'err': str, # stderr
'exc': str, # raised exception representation
}
_defaults = {
'uid': None,
'val': None,
'out': None,
'err': None,
'exc': None,
}
# ------------------------------

_schema = {
'payload': Payload
}

_defaults = {
'msg_type': 'rpc_res',
'payload' : {}
}


_schema = {'uid' : str, # uid of rpc call
'val' : Any, # return value (`None` by default)
'out' : str, # stdout
'err' : str, # stderr
'exc' : str} # raised exception representation
_defaults = {'_msg_type': 'rpc_res',
'uid' : None,
'val' : None,
'out' : None,
'err' : None,
'exc' : None}

# --------------------------------------------------------------------------
def __init__(self, rpc_req: Optional[RPCRequestMessage] = None,
uid : Optional[str] = None,
val : Optional[Any] = None,
out : Optional[str] = None,
err : Optional[str] = None,
exc : Optional[Tuple[str, str]] = None):
'''
support rpc response message construction from an rpc request message
(carries over `uid`):
#
def __init__(self, rpc_req=None, from_dict=None, **kwargs):

msg = rp.Message(rpc_req=req_msg, val=42)
# when constfructed from a request message copy the uid

'''
if rpc_req:
if not from_dict:
from_dict = dict()

from_dict = dict()
from_dict['uid'] = rpc_req['uid']

if rpc_req: from_dict['uid'] = rpc_req.uid
super().__init__(from_dict, **kwargs)

if uid: from_dict['uid'] = uid
if val: from_dict['val'] = uid
if out: from_dict['out'] = uid
if err: from_dict['err'] = uid
if exc: from_dict['exc'] = uid

super().__init__(from_dict=from_dict)
ru.Message.register_msg_type('rpc_res', RPCResultMessage)


# --------------------------------------------------------------------------
@property
def addr(self):
return self.payload.addr

@addr.setter
def addr(self, value):
self.payload.addr = value


@property
def rpc(self):
return self.payload.rpc

@rpc.setter
def rpc(self, value):
self.payload.rpc = value


@property
def args(self):
return self.payload.args

@args.setter
def args(self, value):
self.payload.args = value


ru.Message.register_msg_type('rpc_req', RPCRequestMessage)

# ------------------------------------------------------------------------------

1 change: 1 addition & 0 deletions src/radical/pilot/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from .component import *
from .component_manager import *
from .serializer import *
from .rpc_helper import *


# ------------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit f719517

Please sign in to comment.