Skip to content

Commit d820d9b

Browse files
committed
v0.2.1; bugfixes and features.
1 parent 0101c8d commit d820d9b

File tree

17 files changed

+817
-564
lines changed

17 files changed

+817
-564
lines changed

CHANGELOG.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,24 @@ All notable changes to this project will be documented in this file.
44

55
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
66

7+
## v0.2.1
8+
9+
### Added
10+
11+
- `clear_waiting()` function to clear the waiting queue.
12+
- Added `stop_on_lane_error` parameter to `ezpq.Job` to allow for short-circuiting a synchronous lane if a job in the lane fails. When set to `True` and the preceding job has a non-zero exit code, this job will not be run. Note that this is to be set per-job for flexibility.
13+
- Additional unit tests.
14+
15+
### Changed
16+
17+
- `stop_all()` function now clears the waiting queue and terminate running jobs. This addresses a bug where a queue would fail to close when disposing with jobs still in the waiting queue.
18+
- The default `poll` for the queue itself is still `0.1`. Now, the default `poll` for `get` and `wait` is equal to the `poll` for the queue itself, as it makes no sense to check for changes more freqeuntly than changes could arise.
19+
20+
### Removed
21+
22+
- Removed functions `has_waiting`, `has_work`, and `has_completed`. Use `size(...)` for this.
23+
- Renamed `Queue.is_started` to `Queue.is_running`.
24+
725
## v0.2.0
826

927
### Added

README.Rmd

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ def random_sleep(x):
9090
return n
9191
```
9292

93-
```{python, echo=TRUE}
93+
```{python, echo=TRUE, eval=FALSE}
9494
start = time.time()
9595
9696
output = [random_sleep(x) for x in range(60)]
@@ -100,9 +100,17 @@ end = time.time()
100100
print('> Runtime: ' + str(end - start))
101101
```
102102

103+
```
104+
## '> Runtime: 58.932034969329834'
105+
```
106+
103107
Here is the function ran in parallel with an `ezpq` Queue of 6 workers. Thus, the runtime of the above operation will be reduced from ~60s to ~10s.
104108

105109
```{python, eval=FALSE, echo=TRUE}
110+
import time
111+
import random
112+
import ezpq
113+
106114
start = time.time()
107115
108116
with ezpq.Queue(6) as Q:
@@ -244,7 +252,7 @@ with ezpq.Queue(6) as Q:
244252
Q.put(random_sleep, x)
245253
246254
# repeatedly print sizes until complete.
247-
while Q.has_work():
255+
while Q.size(waiting=True, working=True):
248256
print_sizes(Q)
249257
time.sleep(1)
250258
@@ -253,7 +261,7 @@ with ezpq.Queue(6) as Q:
253261

254262
### wait
255263

256-
The `wait()` method will block execution until all jobs complete. It also accepts a `timeout` parameter, given in seconds. The return value is the count of jobs that did not complete. Thus, a return value greater than 0 indicates the timeout was exceeded. The parameter `poll` can be used to adjust how frequently (in seconds) the operation checks for completed jobs (default=0.1).
264+
The `wait()` method will block execution until all jobs complete. It also accepts a `timeout` parameter, given in seconds. The return value is the count of jobs that did not complete. Thus, a return value greater than 0 indicates the timeout was exceeded. The parameter `poll` can be used to adjust how frequently (in seconds) the operation checks for completed jobs.
257265

258266
New in v0.2.0, include `show_progress=True` to show a progress bar while waiting. This is equivalent to a call to `waitpb()`.
259267

@@ -262,7 +270,7 @@ New in v0.2.0, include `show_progress=True` to show a progress bar while waiting
262270

263271
### get
264272

265-
`get()` retrieves and deletes ("pop") the highest priority job from the completed queue, if one is available. If the completed queue is empty, `get()` returns `None`. However, `get()` will wait for a completed job if the `poll` frequency is greater than 0. If the timeout is exceeded, `None` is returned.
273+
`get()` retrieves and deletes ("pop") the highest priority job from the completed queue, if one is available. If the completed queue is empty, `get()` returns `None`. However, `get()` will wait for a completed job if `wait`, `poll`, or `timeout` are specified. If the timeout is exceeded, `None` is returned.
266274

267275
```{python, echo=TRUE}
268276
with ezpq.Queue(6) as Q:
@@ -274,7 +282,7 @@ with ezpq.Queue(6) as Q:
274282
275283
# repeatedly `get()` until queue is empty.
276284
for i in range(n_inputs):
277-
output[i] = Q.get(poll=0.1)
285+
output[i] = Q.get(wait=True)
278286
```
279287

280288
### collect
@@ -326,6 +334,30 @@ When you have jobs that are dependent upon another, you can use "lanes" to execu
326334

327335
In the above graphic, notice how same-colored bars never overlap. These bars represent jobs that are in the same lane, which executed synchronously.
328336

337+
### Lane Error Handling
338+
339+
You may want to short-circuit a synchronous lane if a job in the lane fails. You can do this by specifying `skip_on_lane_error=True` when putting a job in the queue. If specified and the preceding job has a non-zero exit code, this job will not be run.
340+
341+
```{python, echo=TRUE}
342+
def reciprocal(x):
343+
time.sleep(0.1) # slow things down
344+
return 1/x # will throw DivideByZero exception
345+
```
346+
347+
```{python, echo=TRUE}
348+
import random
349+
with ezpq.Queue(6) as Q:
350+
for i in range(100):
351+
Q.put(reciprocal, random.randint(0, 10), lane=i%5, suppress_errors=True, stop_on_lane_error=True)
352+
Q.wait()
353+
output = Q.collect()
354+
355+
plt = ezpq.Plot(output).build(facet_by='lane', color_by='exitcode', color_pal=['red', 'blue'])
356+
plt.save('docs/imgs/lane_error.png')
357+
```
358+
359+
![](docs/imgs/lane_error.png)
360+
329361
## ezpq.Plot
330362

331363
The `Plot` class is used to visualize the wait, start, and end times for each job that entered the queueing system. The class is initialized with a list of dicts; exactly what is returned from a call to `collect()` or `map()`.

README.md

Lines changed: 52 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
# `ezpq`: an easy parallel queueing system.
22

3-
Read this on [GitHub](https://github.com/dm3ll3n/ezpq) or [my site](https://www.donaldmellenbruch.com/project/ezpq/).
4-
53
* [`ezpq`: an easy parallel queueing system.](#ezpq-an-easy-parallel-queueing-system)
64
* [Overview](#overview)
75
* [Features](#features)
@@ -17,9 +15,13 @@ Read this on [GitHub](https://github.com/dm3ll3n/ezpq) or [my site](https://www.
1715
* [map](#map)
1816
* [dispose](#dispose)
1917
* [Synchronous Lanes](#synchronous-lanes)
18+
* [Lane Error Handling](#lane-error-handling)
2019
* [ezpq.Plot](#ezpqplot)
2120
* [More Examples](#more-examples)
2221

22+
Read this on [GitHub](https://github.com/dm3ll3n/ezpq) or [my
23+
site](https://www.donaldmellenbruch.com/project/ezpq/).
24+
2325
## Overview
2426

2527
`ezpq` implements a parallel queueing system consisting of:
@@ -97,6 +99,9 @@ Thus, the runtime of the above operation will be reduced from ~60s to
9799
~10s.
98100

99101
``` python
102+
import time
103+
import random
104+
import ezpq
100105
start = time.time()
101106
with ezpq.Queue(6) as Q:
102107
output = Q.map(random_sleep, range(60))
@@ -138,7 +143,7 @@ print( output[0] )
138143
## {'args': [0],
139144
## 'callback': None,
140145
## 'cancelled': False,
141-
## 'ended': datetime.datetime(2019, 1, 28, 17, 45, 29, 943860),
146+
## 'ended': datetime.datetime(2019, 2, 18, 20, 21, 0, 902915),
142147
## 'exception': None,
143148
## 'exitcode': 0,
144149
## 'function': 'random_sleep',
@@ -148,11 +153,11 @@ print( output[0] )
148153
## 'name': 1,
149154
## 'output': 1.3444218515250481,
150155
## 'priority': 100,
151-
## 'processed': datetime.datetime(2019, 1, 28, 17, 45, 29, 998175),
152-
## 'qid': 'd6eaaf93',
153-
## 'runtime': 1.3502492904663086,
154-
## 'started': datetime.datetime(2019, 1, 28, 17, 45, 28, 593611),
155-
## 'submitted': datetime.datetime(2019, 1, 28, 17, 45, 28, 489300),
156+
## 'processed': datetime.datetime(2019, 2, 18, 20, 21, 0, 955396),
157+
## 'qid': 'f4717edb',
158+
## 'runtime': 1.3515939712524414,
159+
## 'started': datetime.datetime(2019, 2, 18, 20, 20, 59, 551321),
160+
## 'submitted': datetime.datetime(2019, 2, 18, 20, 20, 59, 446199),
156161
## 'timeout': 0}
157162

158163
Easily convert output to a `pandas` dataframe:
@@ -164,11 +169,11 @@ print( df.head()[['id', 'output', 'runtime', 'exitcode']] )
164169
```
165170

166171
## id output runtime exitcode
167-
## 0 1 1.344422 1.350249 0
168-
## 1 2 0.634364 0.638975 0
169-
## 2 3 1.456034 1.460431 0
170-
## 3 4 0.737965 0.742028 0
171-
## 4 5 0.736048 0.740672 0
172+
## 0 1 1.344422 1.351594 0
173+
## 1 2 0.634364 0.640723 0
174+
## 2 3 1.456034 1.461620 0
175+
## 3 4 0.737965 0.743645 0
176+
## 4 5 0.736048 0.742260 0
172177

173178
Use `ezpq.Plot` to generate a Gannt chart of the job timings.
174179

@@ -236,7 +241,7 @@ queue with a call to `submit()`.
236241

237242
## Help on function __init__ in module ezpq.Job:
238243
##
239-
## __init__(self, function, args=None, kwargs=None, name=None, priority=100, lane=None, timeout=0, suppress_errors=False)
244+
## __init__(self, function, args=None, kwargs=None, name=None, priority=100, lane=None, timeout=0, suppress_errors=False, stop_on_lane_error=False)
240245
## Defines what to run within a `ezpq.Queue`, and how to run it.
241246
##
242247
## Args:
@@ -316,7 +321,7 @@ with ezpq.Queue(6) as Q:
316321
for x in range(60):
317322
Q.put(random_sleep, x)
318323
# repeatedly print sizes until complete.
319-
while Q.has_work():
324+
while Q.size(waiting=True, working=True):
320325
print_sizes(Q)
321326
time.sleep(1)
322327
print_sizes(Q)
@@ -330,9 +335,9 @@ with ezpq.Queue(6) as Q:
330335
## 'Total: 60; Waiting: 31; Working: 6; Completed: 23'
331336
## 'Total: 60; Waiting: 24; Working: 6; Completed: 30'
332337
## 'Total: 60; Waiting: 17; Working: 6; Completed: 37'
333-
## 'Total: 60; Waiting: 11; Working: 6; Completed: 43'
338+
## 'Total: 60; Waiting: 12; Working: 6; Completed: 42'
334339
## 'Total: 60; Waiting: 6; Working: 6; Completed: 48'
335-
## 'Total: 60; Waiting: 0; Working: 5; Completed: 55'
340+
## 'Total: 60; Waiting: 1; Working: 6; Completed: 53'
336341
## 'Total: 60; Waiting: 0; Working: 1; Completed: 59'
337342
## 'Total: 60; Waiting: 0; Working: 0; Completed: 60'
338343

@@ -343,7 +348,7 @@ also accepts a `timeout` parameter, given in seconds. The return value
343348
is the count of jobs that did not complete. Thus, a return value greater
344349
than 0 indicates the timeout was exceeded. The parameter `poll` can be
345350
used to adjust how frequently (in seconds) the operation checks for
346-
completed jobs (default=0.1).
351+
completed jobs.
347352

348353
New in v0.2.0, include `show_progress=True` to show a progress bar while
349354
waiting. This is equivalent to a call to `waitpb()`.
@@ -355,8 +360,8 @@ waiting. This is equivalent to a call to `waitpb()`.
355360
`get()` retrieves and deletes (“pop”) the highest priority job from the
356361
completed queue, if one is available. If the completed queue is empty,
357362
`get()` returns `None`. However, `get()` will wait for a completed job
358-
if the `poll` frequency is greater than 0. If the timeout is exceeded,
359-
`None` is returned.
363+
if `wait`, `poll`, or `timeout` are specified. If the timeout is
364+
exceeded, `None` is returned.
360365

361366
``` python
362367
with ezpq.Queue(6) as Q:
@@ -368,7 +373,7 @@ with ezpq.Queue(6) as Q:
368373

369374
# repeatedly `get()` until queue is empty.
370375
for i in range(n_inputs):
371-
output[i] = Q.get(poll=0.1)
376+
output[i] = Q.get(wait=True)
372377
```
373378

374379
### collect
@@ -427,6 +432,32 @@ In the above graphic, notice how same-colored bars never overlap. These
427432
bars represent jobs that are in the same lane, which executed
428433
synchronously.
429434

435+
### Lane Error Handling
436+
437+
You may want to short-circuit a synchronous lane if a job in the lane
438+
fails. You can do this by specifying `skip_on_lane_error=True` when
439+
putting a job in the queue. If specified and the preceding job has a
440+
non-zero exit code, this job will not be run.
441+
442+
``` python
443+
def reciprocal(x):
444+
time.sleep(0.1) # slow things down
445+
return 1/x # will throw DivideByZero exception
446+
```
447+
448+
``` python
449+
import random
450+
with ezpq.Queue(6) as Q:
451+
for i in range(100):
452+
Q.put(reciprocal, random.randint(0, 10), lane=i%5, suppress_errors=True, stop_on_lane_error=True)
453+
Q.wait()
454+
output = Q.collect()
455+
plt = ezpq.Plot(output).build(facet_by='lane', color_by='exitcode', color_pal=['red', 'blue'])
456+
plt.save('docs/imgs/lane_error.png')
457+
```
458+
459+
![](docs/imgs/lane_error.png)
460+
430461
## ezpq.Plot
431462

432463
The `Plot` class is used to visualize the wait, start, and end times for

0 commit comments

Comments
 (0)