Skip to content

Commit 7bd4e73

Browse files
committed
Example to execute python code as work queue tasks
1 parent ddb2a9a commit 7bd4e73

1 file changed

Lines changed: 107 additions & 0 deletions

File tree

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
################################################################################
2+
#
3+
# install work queue cctools and dependencies:
4+
#
5+
# conda create --yes --name work_queue_environment python=3.8 dill
6+
# conda install --yes --name work_queue_environment --channel conda-forge ndcctools
7+
# conda activate work_queue_environment
8+
# python python-fn-in-wq.py
9+
#
10+
11+
import dill
12+
import logging
13+
import os
14+
from os.path import basename
15+
import sys
16+
import tempfile
17+
18+
import work_queue as wq
19+
20+
logger = logging.getLogger()
21+
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s:%(levelname)s:%(message)s')
22+
23+
24+
def create_work_queue_task(task_counter, tmpdir, function, input_args, fn_wrapper='exec_python_fn.py'):
25+
26+
logger.debug("creating task {}: {}({})".format(task_counter, function.__name__, ','.join(str(arg) for arg in input_args)))
27+
28+
args_file = os.path.join(tmpdir, "input_args_{}.p".format(task_counter))
29+
fn_file = os.path.join(tmpdir, "function_{}.p".format(task_counter))
30+
out_file = os.path.join(tmpdir, "out_{}.p".format(task_counter))
31+
32+
# Save args to a dilled file.
33+
with open(args_file, "wb") as wf:
34+
dill.dump(input_args, wf)
35+
36+
# Save the function to a dilled file.
37+
with open(fn_file, "wb") as wf:
38+
dill.dump(function, wf)
39+
40+
# Base command just invokes python on the function and data.
41+
command = "./{wrapper} {fn} {args} {out}".format(
42+
wrapper=basename(fn_wrapper),
43+
fn=basename(fn_file),
44+
args=basename(args_file),
45+
out=basename(out_file))
46+
47+
task = wq.Task(command)
48+
task.specify_tag(str(task_counter))
49+
50+
task.specify_input_file(fn_wrapper, cache=True)
51+
task.specify_input_file(fn_file, cache=False)
52+
task.specify_input_file(args_file, cache=False)
53+
task.specify_output_file(out_file, cache=False)
54+
55+
return task
56+
57+
58+
def report_task_result(tmpdir, t):
59+
if t.result == wq.WORK_QUEUE_RESULT_SUCCESS:
60+
if t.return_status != 0:
61+
log.warning("task {} had non-zero exit code: {}".format(t.tag, t.return_status))
62+
try:
63+
with open(os.path.join(tmpdir, "out_{}.p".format(t.tag)), 'rb') as f:
64+
fn_result = dill.load(f)
65+
except Exception as e:
66+
fn_result = e
67+
else:
68+
logger.warning("no result for task {}. error code {} {}".format(t.tag, t.result, t.result_str))
69+
fn_result = NoResult()
70+
71+
if isinstance(fn_result, Exception):
72+
print("task {} with exception: {}".format(t.tag, repr(fn_result)))
73+
else:
74+
print("task {} with output: {}".format(t.tag, fn_result))
75+
76+
def application_function(x, y):
77+
return y/x
78+
79+
80+
class NoResult(Exception):
81+
def __repr__(self):
82+
return 'NoResult()'
83+
def __str__(self):
84+
return 'NoResult'
85+
86+
if __name__ == '__main__':
87+
#with tempfile.TemporaryDirectory() as tmpdir:
88+
tmpdir='here'
89+
q = wq.WorkQueue(port=9123, debug_log='debug.log', transactions_log='tr.log')
90+
91+
task_counter = 0
92+
total_tasks = 5
93+
94+
for i in range(total_tasks):
95+
task = create_work_queue_task(
96+
task_counter,
97+
tmpdir,
98+
application_function,
99+
input_args=[i, 100]) # all tasks compute application_function(i, 100) which is 100/i
100+
q.submit(task)
101+
task_counter += 1
102+
103+
while not q.empty():
104+
t = q.wait(5)
105+
if t:
106+
report_task_result(tmpdir, t)
107+

0 commit comments

Comments
 (0)