Docs/raptor #2999

Merged
merged 27 commits into from
Aug 7, 2023
Commits
0f02403
add raptor to API docs
andre-merzky May 3, 2023
14b3eea
merge from devel
andre-merzky May 12, 2023
076a5a2
merge from devel
andre-merzky May 16, 2023
bf30206
merge from devel
andre-merzky May 18, 2023
7213c5a
merge from devel
andre-merzky May 19, 2023
6ffd822
merge from devel
andre-merzky May 20, 2023
932da3e
merge from devel
andre-merzky Jun 1, 2023
09de2db
merge from devel
andre-merzky Jun 6, 2023
e04bff4
merge from devel
andre-merzky Jun 12, 2023
1a5ea2b
merge from devel
andre-merzky Jun 14, 2023
9a1c60f
merge from devel
andre-merzky Jun 19, 2023
264b1fe
merge from devel
andre-merzky Jun 22, 2023
28cb698
merge from devel
andre-merzky Jul 3, 2023
b4b3f14
merge from devel
andre-merzky Jul 11, 2023
221ec0b
merge from devel
andre-merzky Jul 17, 2023
3861f96
merge from devel
andre-merzky Jul 21, 2023
5758bae
some raptor docs
andre-merzky Jul 25, 2023
6a61ac9
Merge branch 'docs/raptor' of github.com:radical-cybertools/radical.p…
andre-merzky Jul 25, 2023
fc7d2c6
merge from devel
andre-merzky Jul 25, 2023
ebf0a0a
doc strings
andre-merzky Jul 25, 2023
729fdfb
merge from devel
andre-merzky Jul 25, 2023
8370da5
merge from devel
andre-merzky Aug 1, 2023
ab0a3bb
more raptor API docs
andre-merzky Aug 1, 2023
32f6583
Merge branch 'docs/raptor' of github.com:radical-cybertools/radical.p…
andre-merzky Aug 1, 2023
7f2ba77
Merge branch 'devel' into docs/raptor
mturilli Aug 4, 2023
3a2bea9
Update docs/source/apidoc.rst
andre-merzky Aug 4, 2023
effc8ff
Update docs/source/apidoc.rst
andre-merzky Aug 7, 2023
14 changes: 14 additions & 0 deletions docs/source/apidoc.rst
@@ -62,6 +62,20 @@ Task
.. autoclass:: radical.pilot.Task
:members:

Raptor
------
.. autoclass:: radical.pilot.raptor.Master
:members:
.. autoclass:: radical.pilot.raptor.Worker
:members:

A `radical.pilot.Task` managing a `radical.pilot.raptor.Master` instance is created by setting
:py:attr:`radical.pilot.TaskDescription.mode` to
``rp.RAPTOR_MASTER``, or through :py:func:`~radical.pilot.Pilot.submit_raptors()`.
The object returned to the client is a `Task` subclass with additional features.

.. autoclass:: radical.pilot.raptor_tasks.Raptor
:members:
Utilities and helpers
=====================
.. automodule:: radical.pilot.utils.component
133 changes: 123 additions & 10 deletions src/radical/pilot/raptor/master.py
@@ -22,7 +22,23 @@
# ------------------------------------------------------------------------------
#
class Master(rpu.Component):

'''
Raptor Master class

The `rp.raptor.Master` instantiates and orchestrates a set of workers which
are used to rapidly and efficiently execute function tasks. As such, the
raptor master acts as an RP executor: it hooks into the RP agent
communication channels to receive tasks from the RP agent scheduler in order
to execute them. Once completed, tasks are pushed toward the agent output
staging component and then continue their life cycle like all other
tasks.
'''

# flags for worker readiness. These flags have a somewhat different meaning
# than the worker's task state: the worker reaching `AGENT_EXECUTING` is
# necessary, but the worker also needs to perform some setup steps and needs
# to hook into the agent's communication channels - only then is the worker
# considered `ACTIVE` and ready to receive tasks.
NEW = 'NEW'
ACTIVE = 'ACTIVE'
DONE = 'DONE'
@@ -31,6 +47,17 @@ class Master(rpu.Component):
# --------------------------------------------------------------------------
#
def __init__(self, cfg=None):
'''
This raptor master is expected to be hosted in the main thread of an RP
task instance. As such, the usual `RP_*` environment variables are
expected to be available.

The constructor creates the communication channels which workers later
use to communicate with this master instance.

Args:
cfg: session config (falls back to the agent config if not given)
'''

self._uid = os.environ['RP_TASK_ID']
self._pid = os.environ['RP_PILOT_ID']
@@ -154,9 +181,13 @@ def __init__(self, cfg=None):

# --------------------------------------------------------------------------
#
- def register_rpc_handler(self, cmd, handler):
+ def register_rpc_handler(self, cmd, handler) -> None:
'''
register a handler to be invoked on 'cmd' type rpc calls.

Args:
cmd (str): name of the registered rpc call
handler (callable): the method which implements the rpc call
'''
self._rpc_handlers[cmd] = handler
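
The handler table above can be illustrated with a minimal, self-contained sketch. `RpcRegistry` and its `dispatch` method are hypothetical stand-ins and not part of `radical.pilot`; the message layout (`cmd`, `arg`) follows the keys read in `_control_cb`, while the real payload format of `rpc_req` messages is not shown in this diff and is assumed here.

```python
from typing import Any, Callable, Dict


class RpcRegistry:
    """Hypothetical stand-in for the master's rpc handler table."""

    def __init__(self) -> None:
        self._rpc_handlers: Dict[str, Callable[[Any], Any]] = {}

    def register_rpc_handler(self, cmd: str,
                             handler: Callable[[Any], Any]) -> None:
        # same bookkeeping as in the diff: one handler per command name
        self._rpc_handlers[cmd] = handler

    def dispatch(self, msg: Dict[str, Any]) -> Any:
        # assumed message layout: {'cmd': <name>, 'arg': <payload>}
        handler = self._rpc_handlers.get(msg['cmd'])
        if handler is None:
            raise KeyError('no rpc handler registered for %r' % msg['cmd'])
        return handler(msg['arg'])


registry = RpcRegistry()
registry.register_rpc_handler('echo', lambda arg: arg)
echo_result = registry.dispatch({'cmd': 'echo', 'arg': {'x': 1}})
```

Registering a second handler under the same `cmd` replaces the first one, which matches the plain dictionary assignment in the method above.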

@@ -166,6 +197,9 @@ def register_rpc_handler(self, cmd, handler):
def _get_config(self, cfg=None):
'''
derive a worker base configuration from the control pubsub configuration

Args:
cfg (Dict[str, Any]): configuration to start from
'''

# FIXME: use registry for comm EP info exchange, not cfg files
@@ -197,12 +231,19 @@ def _get_config(self, cfg=None):
#
@property
def workers(self):
'''
task dictionaries representing all currently registered workers
'''
return self._workers


# --------------------------------------------------------------------------
#
def _control_cb(self, topic, msg):
'''
listen for `worker_register`, `worker_unregister`,
`worker_rank_heartbeat` and `rpc_req` messages.
'''

cmd = msg['cmd']
arg = msg['arg']
@@ -298,6 +339,10 @@ def _control_cb(self, topic, msg):
# --------------------------------------------------------------------------
#
def _state_cb(self, topic, msg):
'''
listen for state updates for tasks executed by raptor workers, but also
check for state updates originating directly from our workers.
'''

cmd = msg['cmd']
arg = msg['arg']
@@ -343,6 +388,11 @@ def worker_state_cb(self, worker_dict, state):
'''
This callback can be overloaded - it will be invoked whenever the master
receives a state update for a worker it is connected to.

Args:
worker_dict (Dict[str, Any]): a task dictionary representing the
worker whose state was updated
state (str): new state of the worker
'''

pass
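
Overloading this callback could look roughly like the sketch below. `BaseMaster` is a hypothetical stand-in for `rp.raptor.Master` that models only the hook; the `uid` key of the worker dictionary and the `_on_state_update` forwarding method are assumptions made for illustration.

```python
from typing import Any, Dict, List, Tuple


class BaseMaster:
    """Hypothetical stand-in for `rp.raptor.Master` (callback hook only)."""

    def worker_state_cb(self, worker_dict: Dict[str, Any], state: str) -> None:
        pass  # default: ignore state updates, as in the diff

    def _on_state_update(self, worker_dict: Dict[str, Any], state: str) -> None:
        # assumed internal path: every worker update is forwarded to the hook
        self.worker_state_cb(worker_dict, state)


class LoggingMaster(BaseMaster):

    def __init__(self) -> None:
        self.history: List[Tuple[str, str]] = []

    def worker_state_cb(self, worker_dict: Dict[str, Any], state: str) -> None:
        # 'uid' is an assumed key of the worker task dictionary
        self.history.append((worker_dict['uid'], state))


master = LoggingMaster()
master._on_state_update({'uid': 'worker.0000'}, 'AGENT_EXECUTING')
master._on_state_update({'uid': 'worker.0000'}, 'DONE')
```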
@@ -367,6 +417,12 @@ def submit_workers(self, descriptions: List[TaskDescription]
Note that only one worker rank (presumably rank 0) should register with
the master - the workers are expected to synchronize their ranks as
needed.

Args:
descriptions (List[TaskDescription]): a list of worker descriptions

Returns:
List[str]: list of uids for submitted worker tasks
'''

# FIXME registry: use registry instead of config files
@@ -448,7 +504,13 @@ def wait_workers(self, count=None, uids=None):
'''
Wait for `n` workers, *or* for workers with given UID, *or* for all
workers to become available, then return. A worker is considered
- `available` when it registered with this master.
+ `available` when it has registered with this master and its status
+ flag is set to `ACTIVE`.

Args:

count (int): number of workers to wait for
uids (List[str]): set of worker UIDs to wait for
'''

if not uids and not count:
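
The three waiting modes described in the docstring (by count, by UID, or for all workers) can be sketched with a condition variable. `WorkerRegistry` is a hypothetical stand-in, and the `timeout` parameter is an addition for the example; the actual implementation in `master.py` is elided in this diff and may differ.

```python
import threading
from typing import Dict, Iterable, Optional


class WorkerRegistry:
    """Hypothetical stand-in for the master's worker bookkeeping."""

    NEW    = 'NEW'
    ACTIVE = 'ACTIVE'

    def __init__(self) -> None:
        self._cond = threading.Condition()
        self._status: Dict[str, str] = {}

    def add(self, uid: str) -> None:
        with self._cond:
            self._status[uid] = self.NEW

    def set_active(self, uid: str) -> None:
        # called once a worker has registered and finished its setup
        with self._cond:
            self._status[uid] = self.ACTIVE
            self._cond.notify_all()

    def wait_workers(self, count: Optional[int] = None,
                     uids: Optional[Iterable[str]] = None,
                     timeout: Optional[float] = None) -> bool:
        uids = list(uids) if uids else None

        def ready() -> bool:
            active = {u for u, s in self._status.items() if s == self.ACTIVE}
            if uids:
                return set(uids) <= active
            if count:
                return len(active) >= count
            # neither given: wait for all known workers
            return len(active) == len(self._status)

        with self._cond:
            return self._cond.wait_for(ready, timeout=timeout)


registry = WorkerRegistry()
registry.add('worker.0000')
registry.add('worker.0001')
threading.Timer(0.05, registry.set_active, args=['worker.0000']).start()
first_ready  = registry.wait_workers(count=1, timeout=2.0)
second_ready = registry.wait_workers(uids=['worker.0001'], timeout=0.05)
```

In this sketch `wait_workers` returns `False` when the timeout expires before the condition holds, so a caller can tell a ready worker set from a timed-out wait.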
@@ -496,6 +558,9 @@ def wait_workers(self, count=None, uids=None):
# --------------------------------------------------------------------------
#
def start(self):
'''
start the main work thread of this master
'''

self._thread = mt.Thread(target=self._run)
self._thread.daemon = True
@@ -505,6 +570,9 @@ def start(self):
# --------------------------------------------------------------------------
#
def stop(self):
'''
stop the main work thread of this master
'''

self._log.debug('set term from stop: %s', ru.get_stacktrace())
self._term.set()
@@ -517,6 +585,9 @@ def stop(self):
# --------------------------------------------------------------------------
#
def alive(self):
'''
check if the main work thread of this master is running
'''

if not self._thread or self._term.is_set():
return False
@@ -526,6 +597,9 @@ def alive(self):
# --------------------------------------------------------------------------
#
def join(self):
'''
wait until the main work thread of this master completes
'''

if self._thread:
self._thread.join()
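
Taken together, `start`, `stop`, `alive` and `join` follow a common daemon-thread pattern built around a termination event. The sketch below reproduces that pattern in isolation; `WorkLoop` is a hypothetical stand-in, and the final `is_alive()` check in `alive` as well as the loop body are assumptions, since those parts are elided in the diff.

```python
import threading
import time
from typing import Optional


class WorkLoop:
    """Hypothetical stand-in showing the start/stop/alive/join pattern."""

    def __init__(self) -> None:
        self._term = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def start(self) -> None:
        self._thread = threading.Thread(target=self._run)
        self._thread.daemon = True      # do not block interpreter exit
        self._thread.start()

    def stop(self) -> None:
        self._term.set()                # ask the work loop to finish

    def alive(self) -> bool:
        if not self._thread or self._term.is_set():
            return False
        return self._thread.is_alive()  # assumed: not visible in the diff

    def join(self) -> None:
        if self._thread:
            self._thread.join()

    def _run(self) -> None:
        # placeholder work loop: idle until termination is requested
        while not self._term.is_set():
            time.sleep(0.01)


loop = WorkLoop()
alive_before  = loop.alive()
loop.start()
alive_running = loop.alive()
loop.stop()
loop.join()
alive_after   = loop.alive()
```

Note that `alive` reports `False` as soon as termination has been requested, even if the thread has not yet exited; callers that need the thread to be gone should use `join`.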
@@ -534,6 +608,9 @@ def join(self):
# --------------------------------------------------------------------------
#
def _run(self):
'''
main work thread of this master
'''

# wait for the submitted requests to complete
while True:
@@ -608,13 +685,16 @@ def _run_task(self, td):

# --------------------------------------------------------------------------
#
- def submit_tasks(self, tasks):
+ def submit_tasks(self, tasks) -> None:
'''
submit a list of tasks to the task queue
We expect to get either `TaskDescription` instances which will then get
converted into task dictionaries and pushed out, or we get task
dictionaries which are used as is. Either way, `self.request_cb` will
be called for all tasks submitted here.

Args:
tasks (List[TaskDescription]): description of tasks to be submitted
'''

normalized = list()
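
The normalization step described in the docstring (descriptions are converted, dictionaries are used as is) might look like the following sketch. `Description` is a hypothetical stand-in for `TaskDescription`, and the resulting dictionary layout is illustrative only, since the actual task format is not documented in this diff.

```python
from typing import Any, Dict, List, Union


class Description:
    """Hypothetical stand-in for `TaskDescription`."""

    def __init__(self, **kwargs: Any) -> None:
        self._data = dict(kwargs)

    def as_dict(self) -> Dict[str, Any]:
        return dict(self._data)


def normalize_tasks(tasks: List[Union[Description, Dict[str, Any]]]
                    ) -> List[Dict[str, Any]]:
    normalized = list()
    for task in tasks:
        if isinstance(task, Description):
            # descriptions are converted into (illustrative) task dictionaries
            normalized.append(task.as_dict())
        else:
            # task dictionaries are used as is
            normalized.append(task)
    return normalized


normalized = normalize_tasks([Description(uid='task.0000'),
                              {'uid': 'task.0001'}])
```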
@@ -632,7 +712,7 @@ def submit_tasks(self, tasks):

# --------------------------------------------------------------------------
#
- def _submit_tasks(self, tasks):
+ def _submit_tasks(self, tasks) -> None:
'''
This is the internal implementation of `self.submit_tasks` which
performs the actual submission after the tasks passed through the
@@ -670,7 +750,7 @@ def _submit_tasks(self, tasks):

# --------------------------------------------------------------------------
#
- def _submit_raptor_tasks(self, tasks):
+ def _submit_raptor_tasks(self, tasks) -> None:

if tasks:

@@ -682,7 +762,7 @@ def _submit_raptor_tasks(self, tasks):

# --------------------------------------------------------------------------
#
- def _submit_executable_tasks(self, tasks):
+ def _submit_executable_tasks(self, tasks) -> None:
'''
Submit tasks per given task description to the agent this
master is running in.
@@ -719,7 +799,11 @@ def _submit_executable_tasks(self, tasks):

# --------------------------------------------------------------------------
#
- def _request_cb(self, tasks):
+ def _request_cb(self, tasks) -> None:
'''
This cb will be called for all tasks which are under control of this
raptor master, upon receipt by the master.
'''

tasks = ru.as_list(tasks)

@@ -745,6 +829,23 @@ def _request_cb(self, tasks):
# --------------------------------------------------------------------------
#
def request_cb(self, tasks):
'''
A raptor master implementation can overload this cb to filter all newly
submitted tasks: it receives a list of tasks and returns a potentially
different list of tasks which are then executed. It is up to the master
implementation to ensure proper state transitions for any tasks which
are passed as arguments but are not returned by the call and thus are not
submitted for execution.

Args:
tasks (List[Dict[str, Any]]): list of tasks which this master
received for execution

Returns:
List[Dict[str, Any]]: possibly different list of tasks than
received

'''

# FIXME: document task format
return tasks
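
A filtering override of `request_cb` could be sketched as follows. `BaseMaster` is a hypothetical stand-in for `rp.raptor.Master`; the `skip` key is a made-up marker, and setting the state to `'CANCELED'` only illustrates that the implementation has to take care of tasks it does not return.

```python
from typing import Any, Dict, List

Task = Dict[str, Any]


class BaseMaster:
    """Hypothetical stand-in for `rp.raptor.Master`."""

    def request_cb(self, tasks: List[Task]) -> List[Task]:
        return tasks                      # default: accept everything

    def _request_cb(self, tasks: List[Task]) -> List[Task]:
        # assumed flow: only tasks returned by the hook go on to execution
        return self.request_cb(list(tasks))


class FilteringMaster(BaseMaster):

    def __init__(self) -> None:
        self.rejected: List[Task] = []

    def request_cb(self, tasks: List[Task]) -> List[Task]:
        accepted = []
        for task in tasks:
            if task.get('skip'):          # 'skip' is a made-up marker key
                # the implementation owns the state of dropped tasks
                task['state'] = 'CANCELED'
                self.rejected.append(task)
            else:
                accepted.append(task)
        return accepted


master = FilteringMaster()
to_run = master._request_cb([{'uid': 'task.0000'},
                             {'uid': 'task.0001', 'skip': True}])
```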
@@ -753,6 +854,10 @@ def request_cb(self, tasks):
# --------------------------------------------------------------------------
#
def _result_cb(self, tasks):
'''
As the counterpart to `_request_cb`, the `_result_cb` is invoked when raptor
tasks complete execution.
'''

tasks = ru.as_list(tasks)

@@ -789,17 +894,25 @@ def _result_cb(self, tasks):
# --------------------------------------------------------------------------
#
def result_cb(self, tasks):
'''
A raptor master implementation can overload this cb which gets called
when raptor tasks complete execution.

- # FIXME: document task format
Args:
tasks (List[Dict[str, Any]]): list of tasks which this master
executed

'''

+ # FIXME: document task format
pass
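
Since the task format is still undocumented (see the FIXME above), the following sketch of a result-collecting override relies on assumed keys: `uid` and `return_value` are guesses at the task dictionary layout, and `BaseMaster` is a hypothetical stand-in for `rp.raptor.Master`.

```python
from typing import Any, Dict, List


class BaseMaster:
    """Hypothetical stand-in for `rp.raptor.Master`."""

    def result_cb(self, tasks: List[Dict[str, Any]]) -> None:
        pass                              # default: ignore results

    def _result_cb(self, tasks: Any) -> None:
        # rough analogue of `ru.as_list`: accept one task or a list of tasks
        if not isinstance(tasks, list):
            tasks = [tasks]
        self.result_cb(tasks)


class CollectingMaster(BaseMaster):

    def __init__(self) -> None:
        self.results: Dict[str, Any] = {}

    def result_cb(self, tasks: List[Dict[str, Any]]) -> None:
        for task in tasks:
            # 'uid' and 'return_value' are assumed keys of the task dict
            self.results[task['uid']] = task.get('return_value')


master = CollectingMaster()
master._result_cb({'uid': 'task.0000', 'return_value': 42})
master._result_cb([{'uid': 'task.0001'}])
```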


# --------------------------------------------------------------------------
#
def terminate(self):
'''
- terminate all workers
+ terminate all workers and the master's own work loop.
'''

# unregister input queue