initial implementation of task priorities #3215

Merged · 22 commits · Oct 4, 2024
42 changes: 42 additions & 0 deletions examples/misc/task_priorities.py
@@ -0,0 +1,42 @@
#!/usr/bin/env python3

import radical.pilot as rp


def state_cb(task, state):
if state in rp.FINAL:
print('%s [%d]: %s' % (task.uid, task.description.priority, task.state))


if __name__ == '__main__':

session = rp.Session()
try:

pmgr = rp.PilotManager(session=session)
tmgr = rp.TaskManager(session=session)
pdesc = rp.PilotDescription({'resource': 'local.localhost',
'runtime' : 1024 * 1024,
'nodes' : 1})
pilot = pmgr.submit_pilots(pdesc)

tmgr.add_pilots(pilot)
tmgr.register_callback(state_cb)

n = 200
tds = list()
for i in range(n):
td = rp.TaskDescription({'executable': 'true',
'priority' : i % 3})
tds.append(td)

tasks = tmgr.submit_tasks(tds)

tmgr.wait_tasks()

finally:
session.close(download=True)


# ------------------------------------------------------------------------------
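A note on the example: the scheduler changes below read the priority with td.get('priority', 0) and drain the waitpool with sorted(..., reverse=True), i.e. tasks with a larger numeric priority are picked up first and an omitted priority counts as 0. A minimal sketch of that interpretation (an editorial illustration based on the diff below, not code from this PR):

import radical.pilot as rp

# assumption inferred from the scheduler diff below: a larger integer means
# "scheduled earlier", and a missing 'priority' field is treated as 0
td_high    = rp.TaskDescription({'executable': 'true', 'priority': 2})
td_default = rp.TaskDescription({'executable': 'true'})  # implicitly priority 0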

242 changes: 136 additions & 106 deletions src/radical/pilot/agent/scheduler/base.py
@@ -232,12 +232,16 @@ def initialize(self):
# waitlist to a binned list where tasks are binned by size for faster
# lookups of replacement tasks. An outdated binlist is mostly
# sufficient, only rebuild when we run dry
self._lock = mt.Lock() # lock for waitpool
self._waitpool = dict() # map uid:task
self._ts_map = dict()
self._ts_valid = False # set to False to trigger re-binning
self._active_cnt = 0 # count of currently scheduled tasks
self._named_envs = list() # record available named environments
#
# NOTE: the waitpool is really a dict of dicts where the first level key
# is the task priority.
#
self._lock = mt.Lock() # lock for waitpool
self._waitpool = defaultdict(dict) # {priority : {uid:task}}
self._ts_map = defaultdict(set) # tasks binned by tuple size
self._ts_valid = False # set to False to trigger re-binning
self._active_cnt = 0 # count of currently scheduled tasks
self._named_envs = list() # record available named environments

# the scheduler algorithms have two inputs: tasks to be scheduled, and
# slots becoming available (after tasks complete).
@@ -410,9 +414,12 @@ def control_cb(self, topic, msg):
to_cancel = list()
with self._lock:
for uid in uids:
if uid in self._waitpool:
to_cancel.append(self._waitpool[uid])
del self._waitpool[uid]
for priority in self._waitpool:
if uid in self._waitpool[priority]:
task = self._waitpool[priority][uid]
to_cancel.append(task)
del self._waitpool[priority][uid]
break

with self._raptor_lock:
for queue in self._raptor_tasks:
@@ -538,14 +545,16 @@ def _refresh_ts_map(self):
return

# we can only rebuild if we have waiting tasks
if not self._waitpool:
return
for priority in self._waitpool:

for uid, task in self._waitpool.items():
ts = task['tuple_size']
if ts not in self._ts_map:
self._ts_map[ts] = set()
self._ts_map[ts].add(uid)
if not self._waitpool[priority]:
continue

for uid, task in self._waitpool[priority].items():
ts = task['tuple_size']
if ts not in self._ts_map:
self._ts_map[ts] = set()
self._ts_map[ts].add(uid)

self._ts_valid = True

@@ -679,7 +688,7 @@ def _schedule_tasks(self):
while not self._term.is_set():

self._log.debug_3('schedule tasks 0: %s, w: %d', resources,
len(self._waitpool))
sum([len(pool) for pool in self._waitpool.values()]))

active = 0 # see if we do anything in this iteration

@@ -741,50 +750,56 @@ def _schedule_waitpool(self):
to_wait = list()
to_test = list()

for task in self._waitpool.values():
named_env = task['description'].get('named_env')
if named_env:
if named_env in self._named_envs:
to_test.append(task)
active = False # nothing happened yet
resources = True # fresh start, all is free

for priority in sorted(self._waitpool.keys(), reverse=True):
for task in self._waitpool[priority].values():
named_env = task['description'].get('named_env')
if named_env:
if named_env in self._named_envs:
to_test.append(task)
else:
to_wait.append(task)
else:
to_wait.append(task)
else:
to_test.append(task)
to_test.append(task)

to_test.sort(key=lambda x:
x['tuple_size'][0] * x['tuple_size'][1] * x['tuple_size'][2],
reverse=True)
to_test.sort(key=lambda x:
x['tuple_size'][0] * x['tuple_size'][1] * x['tuple_size'][2],
reverse=True)

# cycle through waitpool, and see if we get anything placed now.
self._log.debug_9('before bisec: %d', len(to_test))
scheduled, unscheduled, failed = ru.lazy_bisect(to_test,
check=self._try_allocation,
on_skip=self._prof_sched_skip,
log=self._log)
self._log.debug_9('after bisec: %d : %d : %d', len(scheduled),
len(unscheduled), len(failed))
# cycle through waitpool, and see if we get anything placed now.
self._log.debug_9('before bisec: %d', len(to_test))
scheduled, unscheduled, failed = ru.lazy_bisect(to_test,
check=self._try_allocation,
on_skip=self._prof_sched_skip,
log=self._log)
self._log.debug_9('after bisec: %d : %d : %d', len(scheduled),
len(unscheduled), len(failed))

for task, error in failed:
for task, error in failed:

error = error.replace('"', '\\"')
self._fail_task(task, RuntimeError('bisect failed'), error)
self._log.error('bisect failed on %s: %s', task['uid'], error)
error = error.replace('"', '\\"')
self._fail_task(task, RuntimeError('bisect failed'), error)
self._log.error('bisect failed on %s: %s', task['uid'], error)

self._waitpool = {task['uid']: task for task in (unscheduled + to_wait)}
self._waitpool[priority] = {task['uid']: task
for task in (unscheduled + to_wait)}

# update task resources
for task in scheduled:
td = task['description']
task['resources'] = {'cpu': td['ranks'] * td['cores_per_rank'],
'gpu': td['ranks'] * td['gpus_per_rank']}
self.advance(scheduled, rps.AGENT_EXECUTING_PENDING, publish=True,
push=True)
# update task resources
for task in scheduled:
td = task['description']
task['$set'] = ['resources']
task['resources'] = {'cpu': td['ranks'] * td['cores_per_rank'],
'gpu': td['ranks'] * td['gpus_per_rank']}
self.advance(scheduled, rps.AGENT_EXECUTING_PENDING, publish=True,
push=True)

# method counts as `active` if anything was scheduled
active = bool(scheduled)
# method counts as `active` if anything was scheduled
active = active or bool(scheduled)

# if we scheduled some tasks but not all, we ran out of resources
resources = not (bool(unscheduled) and bool(unscheduled))
# if we scheduled some tasks but not all, we ran out of resources
resources = resources and not (bool(unscheduled))

# self.slot_status("after schedule waitpool")
return resources, active
@@ -807,7 +822,7 @@ def _fail_task(self, task, e, detail):
def _schedule_incoming(self):

# fetch all tasks from the queue
to_schedule = list() # some tasks get scheduled here
to_schedule = defaultdict(list) # some tasks get scheduled here
to_raptor = defaultdict(list) # some tasks get forwarded to raptor
try:

@@ -828,6 +843,7 @@ def _schedule_incoming(self):
# check if this task is to be scheduled by sub-schedulers
# like raptor
raptor_id = td.get('raptor_id')
priority = td.get('priority', 0)
mode = td.get('mode')

# raptor workers are not scheduled by raptor itself!
@@ -836,15 +852,15 @@ def _schedule_incoming(self):
if task.get('raptor_seen'):
# raptor has handled this one - we can execute it
self._set_tuple_size(task)
to_schedule.append(task)
to_schedule[priority].append(task)

else:
to_raptor[raptor_id].append(task)

else:
# no raptor - schedule it here
self._set_tuple_size(task)
to_schedule.append(task)
to_schedule[priority].append(task)

except queue.Empty:
# no more unschedule requests
@@ -888,65 +904,75 @@ def _schedule_incoming(self):
# no resource change, no activity
return None, False

self.slot_status("before schedule incoming [%d]" % len(to_schedule))
n_tasks = sum([len(x) for x in to_schedule.values()])
self.slot_status("before schedule incoming [%d]" % n_tasks)

# handle largest to_schedule first
# FIXME: this needs lazy-bisect
to_wait = list()
for task in sorted(to_schedule, key=lambda x: x['tuple_size'][0],
reverse=True):

td = task['description']

# FIXME: This is a slow and inefficient way to wait for named VEs.
# The semantics should move to the upcoming eligibility
# checker
# FIXME: Note that this code is duplicated in _schedule_waitpool
named_env = task['description'].get('named_env')
if named_env:
if named_env not in self._named_envs:
to_wait.append(task)
self._log.debug('delay %s, no env %s',
task['uid'], named_env)
for priority in sorted(to_schedule.keys(), reverse=True):

tasks = to_schedule[priority]
to_wait = list()
for task in sorted(tasks, key=lambda x: x['tuple_size'][0],
reverse=True):

td = task['description']

# FIXME: This is a slow, inefficient way to wait for named VEs.
# The semantics should move to the upcoming eligibility
# checker
# FIXME: Note that this code is duplicated in _schedule_waitpool
named_env = td.get('named_env')
if named_env:
if named_env not in self._named_envs:
to_wait.append(task)
self._log.debug('delay %s, no env %s',
task['uid'], named_env)
continue

# we actually only schedule tasks which do not yet have
# a `slots` structure attached. Those which do were presumably
# scheduled earlier (by the application of some other client
# side scheduler), and we honor that decision. We though will
# mark the respective resources as being used, to avoid other
# tasks being scheduled onto the same set of resources.
if td.get('slots'):

task['slots'] = td['slots']
task['partition'] = td['partition']
task['resources'] = {'cpu': td['ranks'] * td['cores_per_rank'],
'gpu': td['ranks'] * td['gpus_per_rank']}
self.advance(task, rps.AGENT_EXECUTING_PENDING,
publish=True, push=True, fwd=True)
continue

# we actually only schedule tasks which do not yet have a `slots`
# structure attached. Those which do were presumably scheduled
# earlier (by the application of some other client side scheduler),
# and we honor that decision. We though will mark the respective
# resources as being used, to avoid other tasks being scheduled onto
# the same set of resources.
if td.get('slots'):

task['slots'] = td['slots']
task['partition'] = td['partition']
task['resources'] = {'cpu': td['ranks'] * td['cores_per_rank'],
'gpu': td['ranks'] * td['gpus_per_rank']}
self.advance(task, rps.AGENT_EXECUTING_PENDING,
publish=True, push=True, fwd=True)
continue
# either we can place the task straight away, or we have to
# put it in the wait pool.
try:
if self._try_allocation(task):
# task got scheduled - advance state, notify world about the
# state change, and push it out toward the next component.
td = task['description']
task['$set'] = ['resources']
task['resources'] = {'cpu': td['ranks'] *
td['cores_per_rank'],
'gpu': td['ranks'] *
td['gpus_per_rank']}
self.advance(task, rps.AGENT_EXECUTING_PENDING,
publish=True, push=True, fwd=True)

# either we can place the task straight away, or we have to
# put it in the wait pool.
try:
if self._try_allocation(task):
# task got scheduled - advance state, notify world about the
# state change, and push it out toward the next component.
task['resources'] = {'cpu': td['ranks'] *
td['cores_per_rank'],
'gpu': td['ranks'] *
td['gpus_per_rank']}
self.advance(task, rps.AGENT_EXECUTING_PENDING,
publish=True, push=True, fwd=True)
else:
to_wait.append(task)

else:
to_wait.append(task)
except Exception as e:
self._fail_task(task, e, '\n'.join(ru.get_exception_trace()))

except Exception as e:
self._fail_task(task, e, '\n'.join(ru.get_exception_trace()))

# all tasks which could not be scheduled are added to the waitpool
self._waitpool.update({task['uid']: task for task in to_wait})
# all tasks which could not be scheduled are re-added to the waitpool
for task in to_wait:
uid = task['uid']
self._waitpool[priority][uid] = task

# we performed some activity (worked on tasks)
active = True
@@ -1007,7 +1033,8 @@ def _unschedule_completed(self):
# immediately. This assumes that the `tuple_size` is good enough to
# judge the legality of the resources for the new target task.
#
# FIXME
# FIXME: use tuple size again
# FIXME: consider priorities

# ts = tuple(task['tuple_size'])
# if self._ts_map.get(ts):
@@ -1053,7 +1080,10 @@ def _unschedule_completed(self):

# we placed some previously waiting tasks, and need to remove those from
# the waitpool
self._waitpool = {task['uid']: task for task in self._waitpool.values()
for priority in self._waitpool:
tasks = self._waitpool[priority].values()
self._waitpool[priority] = {task['uid']: task
for task in tasks
if task['uid'] not in placed}

# we have new resources, and were active
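Editorial addendum (not part of the PR): a self-contained toy model of the priority-keyed waitpool introduced above, to make the data flow easy to follow. The function names and the capacity argument are invented for illustration; only the structure mirrors the diff — a defaultdict(dict) keyed by priority, drained from the highest priority down, with lower priority classes served only while resources remain.

#!/usr/bin/env python3

from collections import defaultdict

waitpool = defaultdict(dict)            # {priority: {uid: task}}, as in base.py


def add_task(task):
    # incoming tasks are binned by their priority; a missing value counts as 0
    priority = task.get('priority', 0)
    waitpool[priority][task['uid']] = task


def drain(capacity):
    # schedule highest priority first; stop once the (toy) capacity is used up
    scheduled = []
    for priority in sorted(waitpool.keys(), reverse=True):
        pool = waitpool[priority]
        for uid in list(pool):
            if capacity <= 0:
                return scheduled
            scheduled.append(pool.pop(uid))
            capacity -= 1
    return scheduled


if __name__ == '__main__':

    for i in range(6):
        add_task({'uid': 'task.%06d' % i, 'priority': i % 3})

    # with capacity 4: both priority-2 tasks first, then both priority-1 tasks
    print([t['uid'] for t in drain(4)])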