workflow.py
r"""
Constructing workflows (:mod:`skbio.workflow`)
==============================================

.. currentmodule:: skbio.workflow

Construct arbitrarily complex workflows in which the specific methods run are
determined at runtime. This module supports short circuiting a workflow if an
item fails, supports ordering methods, callbacks for processed items, and
deciding what methods are executed based on state or runtime options.

Classes
-------

.. autosummary::
   :toctree: generated/

   Workflow

Decorators
----------

.. autosummary::
   :toctree: generated/

   requires
   method

Examples
--------
>>> from skbio.workflow import Workflow

As an example of the ``Workflow`` object, let's construct a sequence processor
that will filter sequences that are < 10 nucleotides, reverse the sequence
if the runtime options indicate to, and truncate if a specific nucleotide
pattern is observed. The ``Workflow`` object will only short circuit on, and
evaluate requirements of, methods decorated by ``method``. Developers are free
to define as many methods as they'd like within the object definition; these
can be called from workflow methods, but they will not be subjected directly
to workflow checks.

>>> nuc_pattern = 'AATTG'
>>> has_nuc_pattern = lambda s: s[:len(nuc_pattern)] == nuc_pattern

>>> class SequenceProcessor(Workflow):
...     def initialize_state(self, item):
...         # Setup the state for a new item (e.g., a new sequence)
...         self.state = item
...     @method(priority=100)
...     def check_length(self):
...         # Always make sure the sequence is at least 10 nucleotides
...         if len(self.state) < 10:
...             self.failed = True
...     @method(priority=90)
...     @requires(state=has_nuc_pattern)
...     def truncate(self):
...         # Truncate if a specific starting nucleotide pattern is observed
...         self.state = self.state[len(nuc_pattern):]
...     @method(priority=80)
...     @requires(option='reverse', values=True)
...     def reverse(self):
...         # Reverse the sequence if indicated at runtime
...         self.state = self.state[::-1]

An instance of a ``Workflow`` must be passed a ``state`` object and any runtime
options. There are a few other useful parameters that can be specified but are
out of scope for the purposes of this example. We also do not need to provide
a state object as our ``initialize_state`` method overrides ``self.state``.
Now, let's create the instance.

>>> wf = SequenceProcessor(state=None, options={'reverse': False})

To run items through the ``SequenceProcessor``, we need to pass in an
iterable. So, let's create a ``list`` of sequences.

>>> seqs = ['AAAAAAATTTTTTT', 'ATAGACC', 'AATTGCCGGAC', 'ATATGAACAAA']

Before we run these sequences through, we're going to also define callbacks
that are applied to the result of a single pass through the ``Workflow``.
Callbacks are optional -- by default, a success will simply yield the state
member variable while failures are ignored -- but, depending on your workflow,
it can be useful to handle failures or potentially do something fun and
exciting on success.

>>> def success_f(obj):
...     return "SUCCESS: %s" % obj.state

>>> def fail_f(obj):
...     return "FAIL: %s" % obj.state

Now, let's process some data!

>>> for result in wf(seqs, success_callback=success_f, fail_callback=fail_f):
...     print(result)
SUCCESS: AAAAAAATTTTTTT
FAIL: ATAGACC
SUCCESS: CCGGAC
SUCCESS: ATATGAACAAA

A few things of note just happened. First off, none of the sequences were
reversed, as the ``SequenceProcessor`` did not have the "reverse" option
set to ``True``. Second, you'll notice that the 3rd sequence was truncated,
which is expected as it matched our nucleotide pattern of interest. Finally,
of the sequences we processed, only a single sequence failed.

To assist in constructing workflows, debug information is available, but it
must be turned on at instantiation. Let's do that, and while we're at it, let's
go ahead and enable the reversal method. This time through though, we're going
to walk through an item at a time so we can examine the debug information.

>>> wf = SequenceProcessor(state=None, options={'reverse': True}, debug=True)
>>> gen = wf(seqs, fail_callback=lambda x: x.state)
>>> next(gen)
'TTTTTTTAAAAAAA'
>>> wf.failed
False
>>> sorted(wf.debug_trace)
[('check_length', 0), ('reverse', 2)]

The ``debug_trace`` specifies the methods executed, and the order of their
execution, where closer to zero indicates earlier in the execution order. Gaps
indicate there was a method evaluated but not executed. Each of the items in
the ``debug_trace`` is a key into a few other ``dict`` objects of debug
information, which we'll discuss in a moment. Did you see that the sequence
was reversed this time through the workflow?

Now, let's take a look at the next item, which on our prior run through the
workflow was a failed item.

>>> next(gen)
'ATAGACC'
>>> wf.failed
True
>>> sorted(wf.debug_trace)
[('check_length', 0)]

What we can see is that the failed sequence only executed the ``check_length``
method. Since the sequence didn't pass our length filter of 10 nucleotides,
it was marked as failed within the ``check_length`` method. As a result, none
of the other methods were evaluated (note: this short circuiting behavior can
be disabled if desired).

This third item previously matched our nucleotide pattern of interest for
truncation. Let's see what that looks like in the debug output.

>>> next(gen)
'CAGGCC'
>>> wf.failed
False
>>> sorted(wf.debug_trace)
[('check_length', 0), ('reverse', 2), ('truncate', 1)]

In this last example, we can see that the ``truncate`` method was executed
prior to the ``reverse`` method and following the ``check_length`` method. This
is as anticipated given the priorities we specified for these methods. Since
the ``truncate`` method is doing something interesting, let's take a closer
look at how the ``state`` is changing. First, we're going to dump out the
state of the workflow prior to the call to ``truncate``, and then we're going
to dump out the ``state`` following the call to ``truncate``, which will allow
us to rapidly see what is going on.

>>> wf.debug_pre_state[('truncate', 1)]
'AATTGCCGGAC'
>>> wf.debug_post_state[('truncate', 1)]
'CCGGAC'

As we expect, we have our original sequence going into ``truncate``, and
following the application of ``truncate``, our sequence is missing our
nucleotide pattern of interest. Awesome, right?

There is one final piece of debug output, ``wf.debug_runtime``, which can
be useful when diagnosing the amount of time required for individual methods
on a particular piece of state (as opposed to the aggregate as provided by
cProfile).

Three final components of the workflow that are quite handy are objects that
allow you to indicate ``anything`` as an option value, anything that is
``not_none``, and a mechanism to define a range of valid values.

>>> from skbio.workflow import not_none, anything

>>> class Ex(Workflow):
...     @method()
...     @requires(option='foo', values=not_none)
...     def do_something(self):
...         pass
...     @method()
...     @requires(option='bar', values=anything)
...     def do_something_else(self):
...         pass
...     @method()
...     @requires(option='foobar', values=[1, 2, 3])
...     def do_something_awesome(self):
...         pass
...
"""
# ----------------------------------------------------------------------------
# Copyright (c) 2013--, scikit-bio development team.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file COPYING.txt, distributed with this software.
# ----------------------------------------------------------------------------

import sys
from copy import deepcopy
from time import time
from functools import update_wrapper
from collections.abc import Iterable
from types import MethodType

from skbio.util._decorator import experimental


class NotExecuted:
    """Helper object to track if a method was executed"""

    @experimental(as_of="0.4.0")
    def __init__(self):
        self.msg = None

    @experimental(as_of="0.4.0")
    def __call__(self, msg):
        self.msg = msg
        return self


_not_executed = NotExecuted()


class Exists:
    """Stub object to assist with ``requires`` when a value exists"""

    @experimental(as_of="0.4.0")
    def __contains__(self, item):
        return True


anything = Exists()  # external, for when a value can be anything


class NotNone:
    @experimental(as_of="0.4.0")
    def __contains__(self, item):
        return item is not None


not_none = NotNone()


class Workflow:
    """Arbitrary workflow support structure

    Methods that are considered to be directly part of the workflow must
    be decorated with ``method``. The workflow methods offer a mechanism to
    logically group functionality together, and are free to make subsequent
    calls to other methods.

    All methods of a subclass of Workflow (those with and without the
    ``method`` decoration) can take advantage of the ``requires`` decorator
    to specify any option or state requirements for the decorated function.

    Parameters
    ----------
    state : object
        State can be anything or nothing. This is dependent on the
        workflow as in some cases, it is useful to preallocate state
        while in other workflows state may be ignored.
    short_circuit : bool
        If True, enables ignoring function methods when a given item
        has failed.
    debug : bool
        Enable debug mode.
    options : dict
        Runtime options, ``{'option': values}``, that the ``requires``
        decorator can interrogate.
    kwargs : dict
        Additional arguments will be added as member variables to self.
        This is handy if additional contextual information is needed by a
        workflow method (e.g., a lookup table).

    """

    @experimental(as_of="0.4.0")
    def __init__(self, state, short_circuit=True, debug=False, options=None,
                 **kwargs):
        r"""Build thy workflow of self"""
        if options is None:
            self.options = {}
        else:
            self.options = options

        self.short_circuit = short_circuit
        self.failed = False
        self.debug = debug
        self.state = state
        self.iter_ = None

        for k, v in kwargs.items():
            if hasattr(self, k):
                raise AttributeError("'%s' already exists in self." % k)
            setattr(self, k, v)

        if self.debug:
            self._setup_debug()

    @experimental(as_of="0.4.0")
    def initialize_state(self, item):
        """Initialize state

        This method is called first, prior to any other defined workflow
        method, with the exception of ``_setup_debug_trace`` if
        ``self.debug`` is True.

        Parameters
        ----------
        item : anything
            Workflow dependent

        """
        raise NotImplementedError("Must implement this method")

    def _setup_debug(self):
        """Wrap all methods with debug trace support"""
        # ignore all members of the baseclass
        ignore = set(dir(Workflow))

        for attrname in dir(self):
            if attrname in ignore:
                continue

            attr = getattr(self, attrname)

            if isinstance(attr, MethodType):
                setattr(self, attrname, self._debug_trace_wrapper(attr))

    def _all_wf_methods(self):
        """Get all workflow methods

        Methods are sorted by priority
        """
        methods = []
        for item in dir(self):
            obj = getattr(self, item)
            if hasattr(obj, 'priority'):
                methods.append(obj)

        def key(x):
            return getattr(x, 'priority')

        methods_sorted = sorted(methods, key=key, reverse=True)

        if self.debug:
            methods_sorted.insert(0, self._setup_debug_trace)

        return methods_sorted

    def _setup_debug_trace(self):
        """Setup a trace

        The trace is per item iterated over by the workflow. Information about
        each method executed is tracked and keyed by::

            (function name, order of execution)

        Order of execution starts from zero. Multiple calls to the same
        function are independent in the trace.

        The following information is tracked::

            debug_trace : set([key])
            debug_runtime : {key: runtime}
            debug_pre_state : {key: deepcopy(Workflow.state)}, state prior to
                method execution
            debug_post_state : {key: deepcopy(Workflow.state)}, state
                following method execution
        """
        self.debug_counter = 0
        self.debug_trace = set()
        self.debug_runtime = {}
        self.debug_pre_state = {}
        self.debug_post_state = {}

    @experimental(as_of="0.4.0")
    def __call__(self, iter_, success_callback=None, fail_callback=None):
        """Operate on all the data

        This is the processing engine of the workflow. Callbacks are executed
        after all workflow methods have been applied to an item from ``iter_``
        (unless ``short_circuit=True``, in which case method execution for an
        item stops once ``failed=True``). Callbacks are provided ``self``,
        which allows them to examine any aspect of the workflow.

        Parameters
        ----------
        iter_ : an iterator
            The items to process.
        success_callback : method to call on a successful item prior to
            yielding. By default, ``self.state`` is yielded.
        fail_callback : method to call on a failed item prior to yielding. By
            default, failures are ignored.

        """
        if success_callback is None:
            def success_callback(x):
                return x.state

        self.iter_ = iter_
        workflow = self._all_wf_methods()

        for item in self.iter_:
            self.failed = False

            self.initialize_state(item)
            for func in workflow:
                if self.short_circuit and self.failed:
                    break
                else:
                    func()

            if self.failed:
                if fail_callback is not None:
                    yield fail_callback(self)
            else:
                yield success_callback(self)

        self.iter_ = None

    def _debug_trace_wrapper(self, func):
        """Trace a function call"""
        def wrapped():
            """Track debug information about a method execution"""
            if not hasattr(self, 'debug_trace'):
                raise AttributeError(
                    "%s doesn't have debug_trace." % self.__class__)

            exec_order = self.debug_counter
            name = func.__name__
            key = (name, exec_order)
            pre_state = deepcopy(self.state)

            self.debug_trace.add(key)
            self.debug_counter += 1

            start_time = time()
            if func() is _not_executed:
                self.debug_trace.remove(key)
            else:
                self.debug_runtime[key] = time() - start_time
                self.debug_pre_state[key] = pre_state
                self.debug_post_state[key] = deepcopy(self.state)

        return update_wrapper(wrapped, func)


class method:
    """Decorate a function to indicate it is a workflow method

    Parameters
    ----------
    priority : int
        Specify a priority for the method, the higher the value the higher
        the priority. Priorities are relative to a given workflow.

    """

    highest_priority = sys.maxsize

    @experimental(as_of="0.4.0")
    def __init__(self, priority=0):
        self.priority = priority

    @experimental(as_of="0.4.0")
    def __call__(self, func):
        func.priority = self.priority
        return func


class requires:
    """Decorator that executes a function if requirements are met

    Parameters
    ----------
    option : any Hashable object
        An option that is required for the decorated method to execute.
        This option will be looked up within the containing ``Workflow``'s
        ``options``.
    values : object
        A required value. This defaults to ``anything``, indicating that
        the only requirement is that the ``option`` exists. It can be
        useful to specify ``not_none``, which indicates that the
        requirement is satisfied if the ``option`` exists and it holds
        a value that is not ``None``. Values also supports iterables
        or singular values.
    state : Function
        A requirement on workflow state. This must be a function that
        accepts a single argument, and returns ``True`` to indicate
        the requirement is satisfied, or ``False`` to indicate the
        requirement is not satisfied. This method will be passed the
        containing ``Workflow``'s ``state`` member variable.

    """

    @experimental(as_of="0.4.0")
    def __init__(self, option=None, values=anything, state=None):
        # self here is the requires object
        self.option = option
        self.required_state = state

        if values is anything:
            self.values = anything
        elif values is not_none:
            self.values = not_none
        elif isinstance(values, set):
            self.values = values
        else:
            if isinstance(values, str):
                self.values = values
            elif isinstance(values, Iterable):
                self.values = set(values)
            else:
                self.values = set([values])

    @experimental(as_of="0.4.0")
    def __call__(self, func):
        """Wrap a function

        func : the function to wrap
        """
        def decorated(dec_self):
            """A decorated function that has requirements

            dec_self : this is "self" for the decorated function
            """
            if self.required_state is not None:
                if not self.required_state(dec_self.state):
                    return _not_executed

            s_opt = self.option
            ds_opts = dec_self.options

            # if this is a function that doesn't have an option to validate
            if s_opt is None:
                func(dec_self)

            # if the option exists in the Workflow
            elif s_opt in ds_opts:
                val = ds_opts[s_opt]

                # if the value just needs to be not None
                if self.values is not_none and val is not None:
                    func(dec_self)

                # otherwise make sure the value is acceptable
                elif val in self.values:
                    func(dec_self)

                else:
                    return _not_executed

            else:
                return _not_executed

        return update_wrapper(decorated, func)