Open
Description
I'd like to implement flow in which its branches intersects with each other like:
Code:
from metaflow import FlowSpec, step
class TestFlow(FlowSpec):
@step
def start(self):
self.next(self.a, self.b)
@step
def a(self):
self.next(self.a1, self.a2)
@step
def a1(self):
self.next(self.a1b1)
@step
def a2(self):
self.next(self.a2b2)
@step
def b(self):
self.next(self.b1, self.b2)
@step
def b1(self):
self.next(self.a1b1)
@step
def b2(self,):
self.next(self.a2b2)
@step
def a1b1(self, inputs):
self.merge_artifacts(inputs)
self.next(self.join)
@step
def a2b2(self, inputs):
self.merge_artifacts(inputs)
self.next(self.join)
@step
def join(self, inputs):
self.merge_artifacts(inputs)
self.next(self.end)
@step
def end(self):
pass
if __name__ == "__main__":
TestFlow()
It throws error:
Step a1b1 joins steps from unrelated splits. Ensure that there is a matching join for every split.
I know I can reimplement this like:
Code:
from metaflow import FlowSpec, step
class TestFlow(FlowSpec):
@step
def start(self):
self.next(self.a, self.b)
@step
def a(self):
self.next(self.a1, self.a2)
@step
def a1(self):
self.next(self.a12)
@step
def a2(self):
self.next(self.a12)
@step
def b(self):
self.next(self.b1, self.b2)
@step
def b1(self):
self.next(self.b12)
@step
def b2(self,):
self.next(self.b12)
@step
def a12(self, inputs):
self.merge_artifacts(inputs)
self.next(self.as_bs)
@step
def b12(self, inputs):
self.merge_artifacts(inputs)
self.next(self.as_bs)
@step
def as_bs(self, inputs):
self.merge_artifacts(inputs)
self.next(self.end)
@step
def end(self):
pass
if __name__ == "__main__":
TestFlow()
In my case looking from a perspective of visual graph second implementation looks cleaner, but from implementation perspective it brings unnecessary additional steps.
Are there any plans to add support for this kind of Flows?
Thanks!
Metadata
Metadata
Assignees
Labels
No labels