ParallelFor cannot resolve binary input stream #2569

Closed
karlschriek opened this issue Nov 7, 2019 · 2 comments · Fixed by #3029

Comments

karlschriek commented Nov 7, 2019

I am trying to build a workflow that fetches a fairly large dataset from S3 and makes it available for downstream components as a binary stream.

Here is a simple working example where two downstream tasks each access the binary stream:

import kfp
from kfp.components import InputBinaryFile, OutputBinaryFile
from kfp.components import func_to_container_op

def stream_array(array_stream: OutputBinaryFile(bytes)):
    import numpy as np
    ar = np.array([2, 3, 1, 0]).astype("int32")
    array_stream.write(ar.tobytes())


def read_array(array_stream: InputBinaryFile(bytes), i: int):
    import numpy as np
    ar = np.frombuffer(array_stream.read(), dtype="int32")
    print(i)
    print(ar)

stream_op = func_to_container_op(stream_array, base_image="faizanbashir/python-datascience:3.6")
read_op = func_to_container_op(read_array, base_image="faizanbashir/python-datascience:3.6")


@kfp.dsl.pipeline(name="test", description="blah")
def streaming_pipeline():
    stream_task = stream_op()
    read_task_1 = read_op(stream_task.output, 1)
    read_task_2 = read_op(stream_task.output, 2)


print(kfp.Client().create_run_from_pipeline_func(streaming_pipeline, arguments={}, experiment_name="test"))

However, I would like to be able to use a ParallelFor operation so that I can dynamically set the parameters of the downstream tasks. I would expect the following to work:

import kfp
from kfp.components import InputBinaryFile, OutputBinaryFile
from kfp.components import func_to_container_op


def stream_array(array_stream: OutputBinaryFile(bytes)):
    import numpy as np
    ar = np.array([2, 3, 1, 0]).astype("int32")
    array_stream.write(ar.tobytes())


def read_array(array_stream: InputBinaryFile(bytes), i: int):
    import numpy as np
    ar = np.frombuffer(array_stream.read(), dtype="int32")
    print(i)
    print(ar)

stream_op = func_to_container_op(stream_array, base_image="faizanbashir/python-datascience:3.6")
read_op = func_to_container_op(read_array, base_image="faizanbashir/python-datascience:3.6")


@kfp.dsl.pipeline(name="test", description="blah")
def streaming_pipeline():
    stream_task = stream_op()

    with kfp.dsl.ParallelFor([1, 2]) as item:
        read_task = read_op(stream_task.output, item)


print(kfp.Client().create_run_from_pipeline_func(streaming_pipeline, arguments={}, experiment_name="test"))

However, this pipeline fails to start with the following error (visible in the Kubeflow dashboard):

invalid spec: templates.test.tasks.for-loop-for-loop-6ef91a55-1 failed to resolve {{inputs.artifacts.stream-array-array-stream}}
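
For reference, the same placeholder can be inspected without submitting anything by compiling the pipeline locally and looking at the generated Argo spec. A minimal sketch; depending on the SDK version the output target may need to be a .tar.gz package rather than a .yaml file:

import kfp

# Compile to a local package instead of submitting a run; the for-loop template
# in the generated workflow contains the
# {{inputs.artifacts.stream-array-array-stream}} reference that fails to resolve.
kfp.compiler.Compiler().compile(streaming_pipeline, "streaming_pipeline.yaml")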

Ark-kun (Contributor) commented Nov 8, 2019

Thank you for this report (P.S. your PoC is nice, modern and clean).
I strongly suspect that InputBinaryFile is not the cause, but there is definitely an issue with how the compiler handles ParallelFor.
I'll investigate.
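
In the meantime, a stripped-down check that removes the binary files entirely and only passes a plain string output into the loop should show whether the loop resolution itself is at fault. A sketch only (default base image, no numpy needed):

import kfp
from kfp.components import func_to_container_op

def produce_value() -> str:
    return "hello"

def consume_value(value: str, i: int):
    print(i, value)

produce_op = func_to_container_op(produce_value)
consume_op = func_to_container_op(consume_value)

@kfp.dsl.pipeline(name="parallelfor-repro", description="upstream output consumed inside ParallelFor")
def repro_pipeline():
    produce_task = produce_op()
    # The string output of produce_task is consumed inside the loop,
    # with no InputBinaryFile/OutputBinaryFile involved.
    with kfp.dsl.ParallelFor([1, 2]) as item:
        consume_op(produce_task.output, item)

print(kfp.Client().create_run_from_pipeline_func(repro_pipeline, arguments={}, experiment_name="test"))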

karlschriek (Author) commented

Yep, I've looked at it again and the same problem arises with other output types. The outputs of any ops that run before the ParallelFor produce this error when used as inputs inside it.
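
As a stopgap I'm considering having each loop task fetch the bytes from S3 itself, so that nothing produced upstream has to be resolved inside the ParallelFor. Rough sketch only: the bucket and key below are placeholders, and it assumes boto3 plus S3 credentials are available in the pods:

import kfp
from kfp.components import func_to_container_op

def read_array_from_s3(bucket: str, key: str, i: int):
    import boto3
    import numpy as np
    # Fetch the raw bytes inside the loop task instead of consuming an upstream artifact.
    body = boto3.client("s3").get_object(Bucket=bucket, Key=key)["Body"].read()
    ar = np.frombuffer(body, dtype="int32")
    print(i)
    print(ar)

read_s3_op = func_to_container_op(read_array_from_s3, base_image="faizanbashir/python-datascience:3.6")

@kfp.dsl.pipeline(name="test", description="blah")
def workaround_pipeline():
    with kfp.dsl.ParallelFor([1, 2]) as item:
        # "my-bucket" / "path/to/array.bin" are placeholders for wherever the dataset lives.
        read_s3_op("my-bucket", "path/to/array.bin", item)

print(kfp.Client().create_run_from_pipeline_func(workaround_pipeline, arguments={}, experiment_name="test"))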
