
Support for flows with intersecting branches #182

Open
@miloszbednarzak

Description


I'd like to implement a flow whose branches intersect with each other, like this:
[image: test-graph]

Code:
```python
from metaflow import FlowSpec, step


class TestFlow(FlowSpec):
    @step
    def start(self):
        self.next(self.a, self.b)

    @step
    def a(self):
        self.next(self.a1, self.a2)

    @step
    def a1(self):
        self.next(self.a1b1)

    @step
    def a2(self):
        self.next(self.a2b2)

    @step
    def b(self):
        self.next(self.b1, self.b2)

    @step
    def b1(self):
        self.next(self.a1b1)

    @step
    def b2(self):
        self.next(self.a2b2)

    @step
    def a1b1(self, inputs):
        # Joins a1 (a branch of split `a`) with b1 (a branch of split `b`);
        # this cross-split join is what Metaflow rejects.
        self.merge_artifacts(inputs)
        self.next(self.join)

    @step
    def a2b2(self, inputs):
        # Likewise joins a2 (split `a`) with b2 (split `b`).
        self.merge_artifacts(inputs)
        self.next(self.join)

    @step
    def join(self, inputs):
        self.merge_artifacts(inputs)
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == "__main__":
    TestFlow()
```

It fails with this error:

Step a1b1 joins steps from unrelated splits. Ensure that there is a matching join for every split.
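For context, the intersecting graph has no cycles, so it is a valid DAG with a valid execution order; the error comes from Metaflow's requirement that every join match a split, not from acyclicity. A small sketch using Python's standard-library `graphlib` (3.9+) on a hand-transcribed dependency dict illustrates this (the `DEPENDS_ON` encoding is only for illustration, not something Metaflow exposes):

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# The first TestFlow, hand-transcribed as {step: steps it waits on}.
DEPENDS_ON = {
    "start": set(),
    "a": {"start"}, "b": {"start"},
    "a1": {"a"}, "a2": {"a"},
    "b1": {"b"}, "b2": {"b"},
    "a1b1": {"a1", "b1"}, "a2b2": {"a2", "b2"},
    "join": {"a1b1", "a2b2"},
    "end": {"join"},
}

# static_order() raises graphlib.CycleError on a cyclic graph; here it
# returns an order, so the flow is a valid DAG.
order = list(TopologicalSorter(DEPENDS_ON).static_order())
print(order)
```

The exact order of sibling steps (e.g. `a` vs `b`) may vary, but `start` always comes first and `end` last.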

I know I can restructure it like this instead:
[image: test-graph2]

Code:
```python
from metaflow import FlowSpec, step


class TestFlow(FlowSpec):
    @step
    def start(self):
        self.next(self.a, self.b)

    @step
    def a(self):
        self.next(self.a1, self.a2)

    @step
    def a1(self):
        self.next(self.a12)

    @step
    def a2(self):
        self.next(self.a12)

    @step
    def b(self):
        self.next(self.b1, self.b2)

    @step
    def b1(self):
        self.next(self.b12)

    @step
    def b2(self):
        self.next(self.b12)

    @step
    def a12(self, inputs):
        # Joins both branches of split `a`.
        self.merge_artifacts(inputs)
        self.next(self.as_bs)

    @step
    def b12(self, inputs):
        # Joins both branches of split `b`.
        self.merge_artifacts(inputs)
        self.next(self.as_bs)

    @step
    def as_bs(self, inputs):
        # Joins the two branches of the split opened in `start`.
        self.merge_artifacts(inputs)
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == "__main__":
    TestFlow()
```

In my case, the second implementation looks cleaner as a visual graph, but from an implementation perspective it introduces extra steps that I don't otherwise need.
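To make the difference between the two graphs concrete, here is a simplified sketch of the rule the error message describes: track the tuple of splits still open along each path, and require every join to receive branches with the same open splits, closing the innermost one. This is not Metaflow's actual linter, which may perform additional checks; the helper `check_joins` and the dict encoding of the graphs are hypothetical. It accepts the second (nested) graph and rejects the first at `a1b1` or `a2b2`.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+


def check_joins(successors):
    """Simplified split/join rule (hypothetical helper, not Metaflow's linter).

    `successors` maps each step to the steps named in its self.next(...) call.
    Returns {step: tuple of splits still open at that step}, or raises
    ValueError when a join's incoming branches come from different splits.
    """
    # Invert the edges: graphlib expects {node: predecessors}.
    preds = {s: set() for s in successors}
    for s, nexts in successors.items():
        for n in nexts:
            preds[n].add(s)

    open_splits = {}

    def outgoing(s):
        # A step with more than one successor opens a new split.
        if len(successors[s]) > 1:
            return open_splits[s] + (s,)
        return open_splits[s]

    # Topological order guarantees every predecessor is processed first.
    for s in TopologicalSorter(preds).static_order():
        incoming = {outgoing(p) for p in preds[s]}
        if not preds[s]:
            open_splits[s] = ()  # the start step
        elif len(preds[s]) == 1:
            open_splits[s] = incoming.pop()  # linear step or split branch
        elif len(incoming) == 1 and next(iter(incoming)):
            # A valid join closes the innermost split shared by all inputs.
            open_splits[s] = next(iter(incoming))[:-1]
        else:
            raise ValueError(f"Step {s} joins steps from unrelated splits.")
    return open_splits


INTERSECTING = {  # first TestFlow
    "start": ["a", "b"], "a": ["a1", "a2"], "b": ["b1", "b2"],
    "a1": ["a1b1"], "b1": ["a1b1"], "a2": ["a2b2"], "b2": ["a2b2"],
    "a1b1": ["join"], "a2b2": ["join"], "join": ["end"], "end": [],
}
NESTED = {  # second TestFlow
    "start": ["a", "b"], "a": ["a1", "a2"], "b": ["b1", "b2"],
    "a1": ["a12"], "a2": ["a12"], "b1": ["b12"], "b2": ["b12"],
    "a12": ["as_bs"], "b12": ["as_bs"], "as_bs": ["end"], "end": [],
}

print(check_joins(NESTED)["end"])  # → () : every split is closed by `end`
try:
    check_joins(INTERSECTING)
except ValueError as err:
    print(err)  # names whichever of a1b1 / a2b2 is visited first
```

In the first graph, `a1` carries the open splits `("start", "a")` while `b1` carries `("start", "b")`, so `a1b1` has no single innermost split to close; that mismatch is what "unrelated splits" refers to.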

Are there any plans to support this kind of flow?
Thanks!
