I am trying to build a workflow that fetches a fairly large dataset from S3 and makes it available to downstream components as a binary stream. A simple pipeline in which two downstream tasks each access the binary stream works fine. However, I would like to be able to do a ParallelFor operation so that I can dynamically set the parameters for the downstream tasks. I would expect the following to work:
```python
import kfp
from kfp.components import InputBinaryFile, OutputBinaryFile, func_to_container_op


def stream_array(array_stream: OutputBinaryFile(bytes)):
    import numpy as np

    ar = np.array([2, 3, 1, 0]).astype("int32")
    array_stream.write(ar.tobytes())


def read_array(array_stream: InputBinaryFile(bytes), i: int):
    import numpy as np

    ar = np.frombuffer(array_stream.read(), dtype="int32")
    print(i)
    print(ar)


stream_op = func_to_container_op(stream_array, base_image="faizanbashir/python-datascience:3.6")
read_op = func_to_container_op(read_array, base_image="faizanbashir/python-datascience:3.6")


@kfp.dsl.pipeline(name="test", description="blah")
def streaming_pipeline():
    stream_task = stream_op()
    with kfp.dsl.ParallelFor([1, 2]) as item:
        read_task = read_op(stream_task.output, item)


print(kfp.Client().create_run_from_pipeline_func(streaming_pipeline, arguments={}, experiment_name="test"))
```
However, this pipeline fails to start, resulting in an error (visible via the Kubeflow Dashboard) as follows:
```
invalid spec: templates.test.tasks.for-loop-for-loop-6ef91a55-1 failed to resolve {{inputs.artifacts.stream-array-array-stream}}
```
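The compiled spec can also be inspected without submitting a run. This is a small sketch (assuming the `streaming_pipeline` function defined above is in scope) that uses the kfp v1 compiler to emit the Argo workflow YAML, where the unresolved artifact placeholder can be seen directly:

```python
# Compile the pipeline locally instead of submitting it; the generated Argo
# spec contains the {{inputs.artifacts.stream-array-array-stream}} placeholder
# that the for-loop template fails to resolve.
import kfp.compiler

kfp.compiler.Compiler().compile(streaming_pipeline, "streaming_pipeline.yaml")
```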
Thank you for this report (P.S. your PoC is nice, modern and clean).
I strongly suspect that InputBinaryFile is not the cause, but there is definitely an issue with the compiler and ParallelFor.
I'll investigate.
Yep, I've looked at it again and the same problem arises with other output types. Any op that runs before the loop triggers this error if its output is used as an input within the ParallelFor.
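In the meantime, one possible workaround, given that the loop body apparently cannot consume any upstream task output: agree on an external location at pipeline-definition time, pass it to both ops as plain constants, and enforce ordering with `.after()`, so the compiler never has to resolve an output inside the loop. This is only a sketch; the bucket/key names and the availability of boto3 in the base image are assumptions, not something confirmed in this thread:

```python
import kfp
from kfp.components import func_to_container_op

# Hypothetical location, fixed at pipeline-definition time (an assumption
# for this sketch, not part of the original report).
BUCKET = "my-bucket"
KEY = "shared/array.bin"


def stream_array(bucket: str, key: str):
    import boto3  # assumed to be available in the base image
    import numpy as np

    ar = np.array([2, 3, 1, 0]).astype("int32")
    boto3.client("s3").put_object(Bucket=bucket, Key=key, Body=ar.tobytes())


def read_array(bucket: str, key: str, i: int):
    import boto3  # assumed to be available in the base image
    import numpy as np

    body = boto3.client("s3").get_object(Bucket=bucket, Key=key)["Body"].read()
    print(i)
    print(np.frombuffer(body, dtype="int32"))


stream_op = func_to_container_op(stream_array, base_image="faizanbashir/python-datascience:3.6")
read_op = func_to_container_op(read_array, base_image="faizanbashir/python-datascience:3.6")


@kfp.dsl.pipeline(name="test-workaround", description="blah")
def streaming_pipeline_workaround():
    stream_task = stream_op(BUCKET, KEY)
    with kfp.dsl.ParallelFor([1, 2]) as item:
        read_task = read_op(BUCKET, KEY, item)
        # Ordering is enforced explicitly, so the loop body never references
        # an upstream output and nothing needs to be resolved at runtime.
        read_task.after(stream_task)
```

Since the dataset already lives in S3, this also avoids copying the data through the pipeline's artifact store; whether that trade-off is acceptable depends on the use case.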