Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] Convert protobuf to literal as remote exec #2925

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions flytekit/core/promise.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ async def resolve_attr_path_in_promise(p: Promise) -> Promise:

curr_val = curr_val.value.literals[attr]
used += 1

# Scalar is always the leaf. There can't be a collection or map in a scalar.
if type(curr_val.value) is _literals_models.Scalar:
break
Expand All @@ -167,7 +168,6 @@ async def resolve_attr_path_in_promise(p: Promise) -> Promise:
This works correctly in remote execution.
Issue Link: https://github.com/flyteorg/flyte/issues/5959
"""

st = curr_val.value.value
new_st = resolve_attr_path_in_pb_struct(st, attr_path=p.attr_path[used:])
literal_type = TypeEngine.to_literal_type(type(new_st))
Expand Down Expand Up @@ -204,7 +204,14 @@ def resolve_attr_path_in_dict(d: dict, attr_path: List[Union[str, int]]) -> Any:
return curr_val


def resolve_attr_path_in_pb_struct(st: _struct.Struct, attr_path: List[Union[str, int]]) -> _struct.Struct:
def resolve_attr_path_in_pb_struct(
st: _struct.Struct, attr_path: List[Union[str, int]]
) -> Union[_struct.Struct, _struct.ListValue]:
"""
Resolves the protobuf struct (e.g. dataclass) with attribute path.

Note that the return type can be google.protobuf.struct_pb2.Struct or google.protobuf.struct_pb2.ListValue.
"""
curr_val = st
for attr in attr_path:
if attr not in curr_val:
Expand Down
28 changes: 25 additions & 3 deletions flytekit/core/type_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -972,12 +972,34 @@ def get_literal_type(self, t: Type[T]) -> LiteralType:
return LiteralType(simple=SimpleType.STRUCT, metadata={ProtobufTransformer.PB_FIELD_KEY: self.tag(t)})

def to_literal(self, ctx: FlyteContext, python_val: T, python_type: Type[T], expected: LiteralType) -> Literal:
struct = Struct()
"""
Convert the protobuf struct to literal.

This conversion supports two types of python_val:
1. google.protobuf.struct_pb2.Struct: A dictionary-like message
2. google.protobuf.struct_pb2.ListValue: An ordered collection of values

For details, please refer to the following issue:
https://github.com/flyteorg/flyte/issues/5959

Because the remote handling works without errors, we implement conversion with the logic as below:
https://github.com/flyteorg/flyte/blob/a87585ab7cbb6a047c76d994b3f127c4210070fd/flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L72-L106
"""
try:
struct.update(_MessageToDict(cast(Message, python_val)))
if type(python_val) == _struct.Struct:
struct = Struct()
struct.update(_MessageToDict(cast(Message, python_val)))
return Literal(scalar=Scalar(generic=struct))
else:
literals = []
for v in python_val:
literal_type = TypeEngine.to_literal_type(type(v))
# Recursively convert python native values to literals
literal = TypeEngine.to_literal(ctx, v, type(v), literal_type)
literals.append(literal)
return Literal(collection=LiteralCollection(literals=literals))
except Exception:
raise TypeTransformerFailedError("Failed to convert to generic protobuf struct")
return Literal(scalar=Scalar(generic=struct))

def to_python_value(self, ctx: FlyteContext, lv: Literal, expected_python_type: Type[T]) -> T:
if not (lv and lv.scalar and lv.scalar.generic is not None):
Expand Down
Loading