Description
Consider the following simple flow. Creating it in the functional style works, but creating it in the imperative style raises an error.
Functional style:
from prefect import Flow, task


@task
def print_dict(input_dict: dict):
    output = ""
    for key, value in input_dict.items():
        output += f"{key}, {value}; "
    print(output)


if __name__ == "__main__":
    with Flow("test flow") as flow:
        print_dict(dict(first_key="test1", second_key="test2"))
    state = flow.run()
    assert state.is_successful()
The functional style prints the following: first_key, test1; second_key, test2;
from prefect import Flow, task


@task
def print_dict(input_dict: dict):
    output = ""
    for key, value in input_dict.items():
        output += f"{key}, {value}; "
    print(output)


if __name__ == "__main__":
    flow = Flow("test flow")
    flow.add_task(print_dict)
    print_dict.bind(dict(first_key="test1", second_key="test2"), flow=flow)
    state = flow.run()
    assert state.is_successful()
The imperative version fails on the call to bind() with: ValueError: Could not infer an active Flow context.
The same happens when passing a list, tuple or set instead of a dict.
The cause seems to be the as_task() function defined in prefect.utilities.tasks. It is called in the set_dependencies() method of the Flow class defined in prefect.core.flow.
The easiest way to fix this would be to add an optional flow keyword argument to the as_task() function.
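To make the proposal concrete, here is a small self-contained sketch of the idea. It does not use Prefect at all: the Flow and Task classes, the _active_flows stack and the wrapper task name are simplified stand-ins I made up for illustration, and the real as_task() certainly does more than this. It only shows why wrapping a container fails without an active context and how forwarding an optional flow argument would avoid that.

```python
from typing import Any, List, Optional

# Hypothetical stand-in for Prefect's context: flows made active by a `with` block.
_active_flows: List["Flow"] = []


class Flow:
    """Toy flow that only records which tasks were bound to it."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.tasks: List["Task"] = []

    def __enter__(self) -> "Flow":
        _active_flows.append(self)
        return self

    def __exit__(self, *exc_info: Any) -> None:
        _active_flows.pop()


class Task:
    """Toy task whose bind() needs either an explicit flow or an active one."""

    def __init__(self, name: str) -> None:
        self.name = name

    def bind(self, *args: Any, flow: Optional[Flow] = None) -> "Task":
        if flow is None:
            if not _active_flows:
                raise ValueError("Could not infer an active Flow context.")
            flow = _active_flows[-1]
        flow.tasks.append(self)
        return self


def as_task(x: Any, flow: Optional[Flow] = None) -> Task:
    """Wrap a non-task value in a task, forwarding the optional flow to bind()."""
    if isinstance(x, Task):
        return x
    # Containers and constants become a helper task that has to be bound to a flow.
    return Task(f"wrap({type(x).__name__})").bind(x, flow=flow)


flow = Flow("test flow")

# Without a flow argument and outside a `with` block, binding the wrapper fails.
try:
    as_task({"first_key": "test1"})
    error = None
except ValueError as exc:
    error = str(exc)

# With the proposed keyword argument the wrapper is bound to the given flow.
wrapped = as_task({"first_key": "test1"}, flow=flow)
```

In this sketch the functional style keeps working unchanged, because bind() still falls back to the active flow when no flow is passed.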
I can work on that if that's alright.