[Bug] Set bindings for ArrayNode #2742

Merged
merged 19 commits into from
Sep 19, 2024

@pvditt pvditt commented Sep 10, 2024

Tracking issue

fixes: https://linear.app/unionai/issue/COR-1822/mapping-over-launch-plans-w-default-inputs-doesnt-compile

Why are the changes needed?

Compiling a workflow fails when evaluating an ArrayNode that maps over a LaunchPlan containing a default input. This occurs because we don't set the input bindings for the node.

https://github.com/unionai/flyte/blob/926cadac9a8cf406b85c8e7443668a13a3f60dfe/flytepropeller/pkg/compiler/validators/interface.go#L76-L86

This produces errors such as:

pyflyte run --remote tests/flytekit/integration/map_ref_lp.py map_regression_line_wf
Running Execution on Remote.
Request rejected by the API, due to Invalid input.
RPC Failed, with Status: StatusCode.INVALID_ARGUMENT
        details: failed to compile workflow for [resource_type:WORKFLOW project:"flytesnacks" domain:"development" name:"tests.flytekit.integration.map_ref_lp.map_regression_line_wf" version:"m6vMatWi-HCjUFD5scx4rQ"] with err failed to compile workflow with err Collected Errors: 3
        Error 0: Code: VariableNameNotFound, Node Id: n0, Description: Variable [val] not found on node [n0].
        Error 1: Code: VariableNameNotFound, Node Id: n0, Description: Variable [x] not found on node [n0].
        Error 2: Code: VariableNameNotFound, Node Id: n0, Description: Variable [y] not found on node [n0].
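
As a rough illustration of why missing bindings surface as VariableNameNotFound, here is a toy model in plain Python. ToyNode and validate_inputs are hypothetical names invented for this sketch; they are not flytekit or flytepropeller APIs, and the real validator in interface.go is more involved.

```python
from dataclasses import dataclass, field


@dataclass
class ToyNode:
    """Hypothetical stand-in for a compiled node; not a flytekit or propeller type."""

    node_id: str
    # Maps an input variable name to whatever value is bound to it.
    bindings: dict = field(default_factory=dict)


def validate_inputs(node: ToyNode, expected_vars: list) -> list:
    """Collect one error string per expected variable that has no binding on the node."""
    errors = []
    for var in expected_vars:
        if var not in node.bindings:
            errors.append(f"Variable [{var}] not found on node [{node.node_id}].")
    return errors


# A node serialized without bindings fails for every expected variable.
unbound = ToyNode("n0")
errors_unbound = validate_inputs(unbound, ["val", "x", "y"])

# Once the bindings are set, the same check passes.
bound = ToyNode("n0", {"val": [1, 2, 3], "x": [[-3, 0, 3]], "y": [[7, 4, -2]]})
errors_bound = validate_inputs(bound, ["val", "x", "y"])
```

In this toy model the unbound node yields three errors, one per variable, which mirrors the shape of the compile failure above.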

What changes were proposed in this pull request?

  • Update the create_and_link_node and create_and_link_node_remote functions to create a node and optionally link it to the workflow context

  • Clean up some of the bound_inputs handling for ArrayNode, since bound inputs aren't supported there at the moment
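
The "create a node and optionally link it" idea from the first bullet can be sketched as follows. This is a minimal sketch with made-up types (ToyCompilationState, ToyNode, create_node); the actual signatures of create_and_link_node in flytekit are not reproduced here.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ToyCompilationState:
    """Hypothetical stand-in for the workflow compilation state."""

    prefix: str = ""
    nodes: list = field(default_factory=list)


@dataclass
class ToyNode:
    node_id: str
    bindings: dict


def create_node(state: Optional[ToyCompilationState], bindings: dict, link: bool = True) -> ToyNode:
    """Always build the node (so its bindings exist); only register it when asked to."""
    should_link = link and state is not None
    node_id = f"{state.prefix}n{len(state.nodes)}" if should_link else ""
    node = ToyNode(node_id, bindings)
    if should_link:
        state.nodes.append(node)
    return node


state = ToyCompilationState()
linked = create_node(state, {"val": [1, 2, 3]})
unlinked = create_node(state, {"val": [1, 2, 3]}, link=False)
```

The unlinked node still carries its bindings, which is the property the ArrayNode subnode needs, but it never appears in the workflow's node list.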

How was this patch tested?

  • added unit test to confirm the serialized array node contains the bindings
  • ran E2E
from flytekit import task, workflow  # imports assumed; not shown in the original snippet


@task
def slope(x: list[int], y: list[int]) -> float:
    sum_xy = sum([x[i] * y[i] for i in range(len(x))])
    sum_x_squared = sum([x[i] ** 2 for i in range(len(x))])
    n = len(x)
    return (n * sum_xy - sum(x) * sum(y)) / (n * sum_x_squared - sum(x) ** 2)


@task
def intercept(x: list[int], y: list[int], slope: float) -> float:
    mean_x = sum(x) / len(x)
    mean_y = sum(y) / len(y)
    intercept = mean_y - slope * mean_x
    return intercept


@workflow
def slope_intercept_wf(x: list[int], y: list[int]) -> (float, float):
    slope_value = slope(x=x, y=y)
    intercept_value = intercept(x=x, y=y, slope=slope_value)
    return (slope_value, intercept_value)


@task
def regression_line(val: int, slope_value: float, intercept_value: float) -> float:
    return (slope_value * val) + intercept_value  # y = mx + c


@workflow
def regression_line_wf(val: int = 5, x: list[int] = [-3, 0, 3], y: list[int] = [7, 4, -2]) -> float:
    slope_value, intercept_value = slope_intercept_wf(x=x, y=y)
    return regression_line(val=val, slope_value=slope_value, intercept_value=intercept_value)
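

from flytekit import map_task, reference_launch_plan, workflow  # imports assumed; not shown in the original snippet


@reference_launch_plan(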
    project="flytesnacks",
    domain="development",
    name="advanced_composition.subworkflow.regression_line_wf",
    version="1jtCscLI-_FBiNcmL8Nr4Q",
)
def ref_regression_line_wf(
    val: int, x: list[int], y: list[int]
) -> float:
    return 1.0


@workflow
def map_regression_line_wf() -> list[float]:
    a = [1, 2, 3]
    b = [[-3, 0, 3], [-8, 2, 4], [7, 3, 1]]
    c = [[7, 4, -2], [-2, 4, 7], [3, 6, 4]]
    return map_task(ref_regression_line_wf)(val=a, x=b, y=c)
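
For reference, the arithmetic behind the default inputs of regression_line_wf can be checked without Flyte. The sketch below restates the same formulas as plain Python functions (no decorators, so nothing here exercises flytekit itself).

```python
def slope(x, y):
    # Least-squares slope: (n*Sxy - Sx*Sy) / (n*Sxx - Sx^2)
    n = len(x)
    sum_xy = sum(a * b for a, b in zip(x, y))
    sum_x_squared = sum(a ** 2 for a in x)
    return (n * sum_xy - sum(x) * sum(y)) / (n * sum_x_squared - sum(x) ** 2)


def intercept(x, y, slope_value):
    # c = mean(y) - m * mean(x)
    return sum(y) / len(y) - slope_value * (sum(x) / len(x))


def regression_line(val, x, y):
    m = slope(x, y)
    c = intercept(x, y, m)
    return m * val + c  # y = mx + c


# Defaults from regression_line_wf: val=5, x=[-3, 0, 3], y=[7, 4, -2]
default_slope = slope([-3, 0, 3], [7, 4, -2])                    # -81 / 54 = -1.5
default_prediction = regression_line(5, [-3, 0, 3], [7, 4, -2])  # -1.5 * 5 + 3.0 = -4.5
```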

Setup process

  • need to run w/ union's flyte fork

Screenshots

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

Docs link

Signed-off-by: Paul Dittamo <pvdittamo@gmail.com>
@pvditt pvditt changed the title wip/hack set bindings [Bug] Set bindings for ArrayNode Sep 11, 2024
…ayNode

Signed-off-by: Paul Dittamo <pvdittamo@gmail.com>
if ctx.compilation_state is not None:
    # interface mismatch between the target and the actual inputs since we don't create a new entity with a
    # transformed to list interface
    transformed_kwargs = {
Contributor Author

This is fairly hacky. I'd love to hear if there's a better way to do it.

Contributor

I think this is fine, but can you pull this out into a separate function and add some comments? That also makes it easier to write a unit test for it.

I assume this is why the bindings in the subnode in the test seem to be binding to the first item. Could you add a comment somewhere explaining why?

Also, does this feature work in local execution? If so, could you add some more unit tests that just exercise local execution?

Contributor Author

@pvditt pvditt Sep 17, 2024

@wild-endeavor I cleaned this up a lot by passing in an "overridden_interface", which is the subnode's interface transformed to a list interface, like we do in array node map tasks. By passing in a fake, transformed-to-list interface, we can bind all the correct/original inputs for the subnode.
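
A transformed-to-list interface of the kind described above might look like this in miniature. The helper name to_list_interface and the dict-of-types representation are assumptions made for illustration; flytekit's own Interface class and its transformation helpers are not used here.

```python
from typing import Dict, List


def to_list_interface(inputs: Dict[str, object]) -> Dict[str, object]:
    """Wrap every input type in List[...] so list-valued arguments type-check against it."""
    return {name: List[input_type] for name, input_type in inputs.items()}


# Interface of the launch plan being mapped over (one invocation).
subnode_inputs = {"val": int, "x": List[int], "y": List[int]}

# Interface used only while binding the mapped inputs (one list entry per invocation).
overridden = to_list_interface(subnode_inputs)
```

With this, val=[1, 2, 3] matches List[int] and x=[[-3, 0, 3], ...] matches List[List[int]], while the subnode itself keeps its original, non-list interface.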

Contributor Author

@pvditt pvditt Sep 17, 2024

@wild-endeavor

Yup, this works for local execution (updated to still create, but not link, the node during local execute). There are a couple of unit tests that cover local execution:

def test_local_exec_lp_min_successes(min_successes, min_success_ratio, should_raise_error):
+
def test_map_task_wrapper():

@@ -49,6 +50,7 @@ def __init__(
         self._concurrency = concurrency
         self._execution_mode = execution_mode
         self.id = target.name
+        self._bindings: List[_literal_models.Binding] = []
Contributor

Make this optional and default it to None?

Contributor Author

@wild-endeavor would it make sense to keep this as is, since this isn't a passed-in value, and to have:

    @property
    def bindings(self) -> List[_literal_models.Binding]:
        # Required in get_serializable_node
        return self._bindings

return an empty list if the bindings aren't set?
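
The two options discussed here can be compared side by side. This is only a sketch: the class names are invented, str stands in for _literal_models.Binding, and the thread does not record which variant was ultimately chosen.

```python
from typing import List, Optional


class EagerBindings:
    """Variant shown in the diff: start with an empty list."""

    def __init__(self) -> None:
        self._bindings: List[str] = []

    @property
    def bindings(self) -> List[str]:
        return self._bindings


class OptionalBindings:
    """Variant suggested in review: default to None and normalize on read."""

    def __init__(self) -> None:
        self._bindings: Optional[List[str]] = None

    @property
    def bindings(self) -> List[str]:
        return [] if self._bindings is None else self._bindings


eager = EagerBindings().bindings
optional = OptionalBindings().bindings
```

Both expose an empty list to callers before bindings are set; the difference is only whether "never set" is distinguishable internally.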

Contributor

see slack

@wild-endeavor
Contributor

Also, one more question: the unit test with all the asserts, is that how propeller expects the node to be? Is it supposed to bind to the first element of the list?

Can we write an example with unions also? What if we're mapping over a union, and different elements are of different types of the union? Does that matter?

@pvditt
Contributor Author

pvditt commented Sep 12, 2024

also, one more question, so the unit test with all the asserts... that's how propeller expects the node to be? It's supposed to bound to the first element of the list?

Can we write an example with unions also? What if we're mapping over a union, and different elements are of different types of the union, does that matter?

@wild-endeavor

ArrayNode SubNodeSpec input bindings aren't utilized in propeller. This input binding is only used to set a parameter name for a workflow node's input.

We don't update the interface of the underlying subnode (a launch plan in this case) to a list type, so we bypass this mismatch during compilation.

Update: I went back and cleaned things up so the correct/original types are set for the input bindings. I also updated the unit tests to include a param with a union of different types.

Signed-off-by: Paul Dittamo <pvdittamo@gmail.com>

codecov bot commented Sep 17, 2024

Codecov Report

Attention: Patch coverage is 0% with 10 lines in your changes missing coverage. Please review.

Project coverage is 40.62%. Comparing base (ae9c6f8) to head (2698975).
Report is 8 commits behind head on master.

Files with missing lines       Patch %   Lines
flytekit/core/array_node.py    0.00%     7 Missing ⚠️
flytekit/core/promise.py       0.00%     3 Missing ⚠️

❗ There is a different number of reports uploaded between BASE (ae9c6f8) and HEAD (2698975).

HEAD has 14 fewer uploads than BASE
Flag   BASE (ae9c6f8)   HEAD (2698975)
       15               1
Additional details and impacted files
@@             Coverage Diff             @@
##           master    #2742       +/-   ##
===========================================
- Coverage   90.87%   40.62%   -50.25%     
===========================================
  Files          74      194      +120     
  Lines        3331    19774    +16443     
  Branches        0     3899     +3899     
===========================================
+ Hits         3027     8033     +5006     
- Misses        304    11626    +11322     
- Partials        0      115      +115     


…erface for local execute

Signed-off-by: Paul Dittamo <pvdittamo@gmail.com>
@@ -137,7 +139,8 @@ def local_execute(self, ctx: FlyteContext, **kwargs) -> Union[Tuple[Promise], Pr
         literals = []
         for i in range(mapped_entity_count):
             single_instance_inputs = {}
-            for k in self.python_interface.inputs.keys():
+            for binding in self.bindings:
+                k = binding.var
                 if k not in self._bound_inputs:
                     single_instance_inputs[k] = kwargs[k][i]
Contributor Author

could probably iterate through the bindings here instead, but this feels cleaner for now...
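
The per-instance slicing in this hunk can be illustrated in isolation. The function below is a hypothetical standalone helper written for this sketch, not flytekit code; it only reproduces the branch visible in the diff (inputs that are not in the bound set are indexed by the instance number).

```python
def slice_inputs(kwargs: dict, input_names: list, bound_inputs: set, index: int) -> dict:
    """Pick the index-th element of every mapped (non-bound) input."""
    single_instance_inputs = {}
    for k in input_names:
        if k not in bound_inputs:
            single_instance_inputs[k] = kwargs[k][index]
    return single_instance_inputs


# Inputs from map_regression_line_wf in the PR description.
mapped_kwargs = {
    "val": [1, 2, 3],
    "x": [[-3, 0, 3], [-8, 2, 4], [7, 3, 1]],
    "y": [[7, 4, -2], [-2, 4, 7], [3, 6, 4]],
}
per_instance = [slice_inputs(mapped_kwargs, ["val", "x", "y"], set(), i) for i in range(3)]
```

Whether the names come from the Python interface or from the bindings, each mapped instance ends up with one scalar val and one x/y pair.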

@pvditt pvditt marked this pull request as draft September 17, 2024 22:35
@pvditt pvditt marked this pull request as ready for review September 17, 2024 23:55

node_id = (
    f"{ctx.compilation_state.prefix}n{len(ctx.compilation_state.nodes)}"
    if add_node_to_compilation_state and ctx.compilation_state
    else ""
Contributor

Can we add an additional optional argument to this function for node_id, and then pass in a constant "ARRAY_NODE_SUB" or some other constant string? At least that way it'll be obvious.

Contributor Author

sounds good. Updated
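
One way the suggestion could look, as a sketch only: an explicit node_id wins, otherwise the id is derived from the compilation state when the node is being linked. The function resolve_node_id and the constant name ARRAY_NODE_SUB_ID are hypothetical; the string value is the one floated in the review, and the merged code may use something different.

```python
# Hypothetical constant; the review proposed "ARRAY_NODE_SUB" or some other constant string.
ARRAY_NODE_SUB_ID = "ARRAY_NODE_SUB"


def resolve_node_id(prefix: str, node_count: int, add_node_to_compilation_state: bool = True, node_id: str = "") -> str:
    """Prefer an explicit id; otherwise derive one only for nodes that get linked."""
    if node_id:
        return node_id
    if add_node_to_compilation_state:
        return f"{prefix}n{node_count}"
    return ""


linked_id = resolve_node_id("", 0)
sub_id = resolve_node_id("", 0, add_node_to_compilation_state=False, node_id=ARRAY_NODE_SUB_ID)
```

An explicit constant makes an unlinked subnode easy to spot in serialized output, compared with an empty string.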

@pvditt pvditt merged commit 9bce7c3 into master Sep 19, 2024
103 checks passed
2 participants