[BUG] Convert protobuf to literal as remote exec #2925

Open
wants to merge 2 commits into
base: master

JiangJiaWei1103
Contributor

@JiangJiaWei1103 JiangJiaWei1103 commented Nov 13, 2024

Tracking issue

Closes flyteorg/flyte#5959.

Why are the changes needed?

When an attribute path is resolved against a protobuf struct, the returned value can be either a google.protobuf.struct_pb2.Struct or a google.protobuf.struct_pb2.ListValue. However, ProtobufTransformer does not support converting a ListValue to a Flyte Literal, as can be seen here.

What changes were proposed in this pull request?

Support converting a protobuf ListValue to a Flyte Literal, following the logic used in remote execution.
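
As a rough illustration of the gap described above, the sketch below walks an attribute path through a nested value and then converts the result. Plain `dict` and `list` stand in for `Struct` and `ListValue`, and `resolve_attr_path` and `to_literal_sketch` are hypothetical names, not flytekit APIs; the tagged dictionaries only mimic the shape of a Literal.

```python
from typing import Any, List, Union

# Assumption: dict stands in for protobuf Struct, list for ListValue.
PathItem = Union[str, int]


def resolve_attr_path(value: Any, path: List[PathItem]) -> Any:
    """Follow string keys and integer indexes into a nested value."""
    for item in path:
        value = value[item]
    return value


def to_literal_sketch(value: Any) -> dict:
    """Hypothetical converter: tag each value with the kind of literal it maps to."""
    if isinstance(value, dict):
        # Map-like result (the Struct case).
        return {"map": {k: to_literal_sketch(v) for k, v in value.items()}}
    if isinstance(value, list):
        # List-like result (the ListValue case this PR is about).
        return {"collection": [to_literal_sketch(v) for v in value]}
    return {"scalar": value}


dc = {"a": -1, "e": [0, 1, 2], "inner_dc": {"g": [[0], [1]]}}

print(to_literal_sketch(resolve_attr_path(dc, ["inner_dc"])))
print(to_literal_sketch(resolve_attr_path(dc, ["e"])))
```

Without the list branch, resolving a path such as `e` would produce a value the converter cannot handle, which is the situation the PR description reports for local execution.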

How was this patch tested?

Local test in the PR, flyteorg/flytekit#2894:

[Screenshot: 2024-11-13 at 9:29:11 PM]

I will add a unit test soon.

Setup process

git clone https://github.com/flyteorg/flytekit.git
gh pr checkout 2925
make setup && pip install -e .

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

flyteorg/flytekit#2894

Docs link

Signed-off-by: JiaWei Jiang <waynechuang97@gmail.com>
Member

@Future-Outlier Future-Outlier left a comment

My ideal test looks like this.
Note: change the FlyteFile and FlyteDirectory paths to local ones.
You can refer to these 2 URLs.

import os
import tempfile

import pytest


@pytest.fixture
def local_dummy_file():
    # Create a temporary file, hand its path to the test, then delete it.
    fd, path = tempfile.mkstemp()
    try:
        with os.fdopen(fd, "w") as tmp:
            tmp.write("Hello world")
        yield path
    finally:
        os.remove(path)


@pytest.fixture
def local_dummy_directory():
    # Create a temporary directory containing one file, then clean it up.
    temp_dir = tempfile.TemporaryDirectory()
    try:
        with open(os.path.join(temp_dir.name, "file"), "w") as tmp:
            tmp.write("Hello world")
        yield temp_dir.name
    finally:
        temp_dir.cleanup()
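
To try the same setup and teardown outside pytest, the generator pattern can be wrapped in `contextlib.contextmanager`. This is only a sketch: `dummy_file` is a hypothetical name, and in a real test pytest would inject the fixture by parameter name instead.

```python
import os
import tempfile
from contextlib import contextmanager


@contextmanager
def dummy_file(content: str = "Hello world"):
    """Create a temporary file, yield its path, and remove it afterwards."""
    fd, path = tempfile.mkstemp()
    try:
        with os.fdopen(fd, "w") as tmp:
            tmp.write(content)
        yield path
    finally:
        os.remove(path)


with dummy_file() as path:
    with open(path) as fh:
        content_seen = fh.read()

# The file exists only inside the `with` block.
file_removed = not os.path.exists(path)
```

The `finally` clause is what guarantees cleanup even when the body of the test raises.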

For StructuredDataset and FlyteSchema, you can add the parquet file from here:
https://github.com/flyteorg/flytekit/tree/master/tests/flytekit/integration/remote/workflows/basic/data/df.parquet

import os
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List

import pandas as pd
from flytekit import ImageSpec, task, workflow
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile
from flytekit.types.schema import FlyteSchema
from flytekit.types.structured import StructuredDataset

class Status(Enum):
    PENDING = "pending"
    APPROVED = "approved"
    REJECTED = "rejected"

@dataclass
class InnerDC:
    a: int = -1
    b: float = 2.1
    c: str = "Hello, Flyte"
    d: bool = False
    e: List[int] = field(default_factory=lambda: [0, 1, 2, -1, -2])
    f: List[FlyteFile] = field(default_factory=lambda: [FlyteFile("s3://my-s3-bucket/s3_flyte_dir/example.txt")])
    g: List[List[int]] = field(default_factory=lambda: [[0], [1], [-1]])
    h: List[Dict[int, bool]] = field(default_factory=lambda: [{0: False}, {1: True}, {-1: True}])
    i: Dict[int, bool] = field(default_factory=lambda: {0: False, 1: True, -1: False})
    j: Dict[int, FlyteFile] = field(default_factory=lambda: {
        0: FlyteFile("s3://my-s3-bucket/s3_flyte_dir/example.txt"),
        1: FlyteFile("s3://my-s3-bucket/s3_flyte_dir/example.txt"),
        -1: FlyteFile("s3://my-s3-bucket/s3_flyte_dir/example.txt")
    })
    k: Dict[int, List[int]] = field(default_factory=lambda: {0: [0, 1, -1]})
    l: Dict[int, Dict[int, int]] = field(default_factory=lambda: {1: {-1: 0}})
    m: dict = field(default_factory=lambda: {"key": "value"})
    n: FlyteFile = field(default_factory=lambda: FlyteFile("s3://my-s3-bucket/s3_flyte_dir/example.txt"))
    o: FlyteDirectory = field(default_factory=lambda: FlyteDirectory("s3://my-s3-bucket/s3_flyte_dir"))
    enum_status: Status = field(default=Status.PENDING)
    sd: StructuredDataset = field(default_factory=lambda: StructuredDataset(uri="s3://my-s3-bucket/s3_flyte_dir/df.parquet", file_format="parquet"))
    fsc: FlyteSchema = field(default_factory=lambda: FlyteSchema(remote_path="s3://my-s3-bucket/s3_flyte_dir/df.parquet"))

@dataclass
class DC:
    a: int = -1
    b: float = 2.1
    c: str = "Hello, Flyte"
    d: bool = False
    e: List[int] = field(default_factory=lambda: [0, 1, 2, -1, -2])
    f: List[FlyteFile] = field(default_factory=lambda: [FlyteFile("s3://my-s3-bucket/s3_flyte_dir/example.txt")])
    g: List[List[int]] = field(default_factory=lambda: [[0], [1], [-1]])
    h: List[Dict[int, bool]] = field(default_factory=lambda: [{0: False}, {1: True}, {-1: True}])
    i: Dict[int, bool] = field(default_factory=lambda: {0: False, 1: True, -1: False})
    j: Dict[int, FlyteFile] = field(default_factory=lambda: {
        0: FlyteFile("s3://my-s3-bucket/s3_flyte_dir/example.txt"),
        1: FlyteFile("s3://my-s3-bucket/s3_flyte_dir/example.txt"),
        -1: FlyteFile("s3://my-s3-bucket/s3_flyte_dir/example.txt")
    })
    k: Dict[int, List[int]] = field(default_factory=lambda: {0: [0, 1, -1]})
    l: Dict[int, Dict[int, int]] = field(default_factory=lambda: {1: {-1: 0}})
    m: dict = field(default_factory=lambda: {"key": "value"})
    n: FlyteFile = field(default_factory=lambda: FlyteFile("s3://my-s3-bucket/s3_flyte_dir/example.txt"))
    o: FlyteDirectory = field(default_factory=lambda: FlyteDirectory("s3://my-s3-bucket/s3_flyte_dir"))
    inner_dc: InnerDC = field(default_factory=lambda: InnerDC())
    enum_status: Status = field(default=Status.PENDING)
    sd: StructuredDataset = field(default_factory=lambda: StructuredDataset(uri="s3://my-s3-bucket/s3_flyte_dir/df.parquet", file_format="parquet"))
    fsc: FlyteSchema = field(default_factory=lambda: FlyteSchema(remote_path="s3://my-s3-bucket/s3_flyte_dir/df.parquet"))

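# NOTE: `image` is not defined in this snippet; define it (for example, as an ImageSpec) before running.
@task(container_image=image)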
def t_dc(dc: DC) -> DC:
    return dc

@task(container_image=image)
def t_inner(inner_dc: InnerDC) -> InnerDC:
    assert isinstance(inner_dc, InnerDC), "inner_dc is not of type InnerDC"

    expected_file_content = "Default content"

    # f: List[FlyteFile]
    for ff in inner_dc.f:
        assert isinstance(ff, FlyteFile), "Expected FlyteFile"
        with open(ff, "r") as f:
            assert f.read() == expected_file_content, "File content mismatch in f"

    # j: Dict[int, FlyteFile]
    for _, ff in inner_dc.j.items():
        assert isinstance(ff, FlyteFile), "Expected FlyteFile in j"
        with open(ff, "r") as f:
            assert f.read() == expected_file_content, "File content mismatch in j"

    # n: FlyteFile
    assert isinstance(inner_dc.n, FlyteFile), "n is not FlyteFile"
    with open(inner_dc.n, "r") as f:
        assert f.read() == expected_file_content, "File content mismatch in n"

    # o: FlyteDirectory
    assert isinstance(inner_dc.o, FlyteDirectory), "o is not FlyteDirectory"
    assert not inner_dc.o.downloaded, "o should not be downloaded initially"
    with open(os.path.join(inner_dc.o, "example.txt"), "r") as fh:
        assert fh.read() == expected_file_content, "File content mismatch in o"
    assert inner_dc.o.downloaded, "o should be marked as downloaded after access"

    assert inner_dc.enum_status == Status.PENDING, "enum_status does not match"
    assert isinstance(inner_dc.sd, StructuredDataset), "sd is not StructuredDataset"
    assert isinstance(inner_dc.fsc, FlyteSchema), "fsc is not FlyteSchema"
    print("All checks in InnerDC passed")

    return inner_dc

@task(container_image=image)
def t_test_all_attributes(a: int, b: float, c: str, d: bool, e: List[int], f: List[FlyteFile], g: List[List[int]],
                          h: List[Dict[int, bool]], i: Dict[int, bool], j: Dict[int, FlyteFile],
                          k: Dict[int, List[int]], l: Dict[int, Dict[int, int]], m: dict,
                          n: FlyteFile, o: FlyteDirectory, enum_status: Status, sd: StructuredDataset,
                          fsc: FlyteSchema
                          ):

    # Strict type checks for simple types
    assert isinstance(a, int), f"a is not int, it's {type(a)}"
    assert a == -1
    assert isinstance(b, float), f"b is not float, it's {type(b)}"
    assert isinstance(c, str), f"c is not str, it's {type(c)}"
    assert isinstance(d, bool), f"d is not bool, it's {type(d)}"

    # Strict type checks for List[int]
    assert isinstance(e, list) and all(isinstance(i, int) for i in e), "e is not List[int]"

    # Strict type checks for List[FlyteFile]
    assert isinstance(f, list) and all(isinstance(i, FlyteFile) for i in f), "f is not List[FlyteFile]"

    # Strict type checks for List[List[int]]
    assert isinstance(g, list) and all(
        isinstance(i, list) and all(isinstance(j, int) for j in i) for i in g), "g is not List[List[int]]"

    # Strict type checks for List[Dict[int, bool]]
    assert isinstance(h, list) and all(
        isinstance(i, dict) and all(isinstance(k, int) and isinstance(v, bool) for k, v in i.items()) for i in h
    ), "h is not List[Dict[int, bool]]"

    # Strict type checks for Dict[int, bool]
    assert isinstance(i, dict) and all(
        isinstance(k, int) and isinstance(v, bool) for k, v in i.items()), "i is not Dict[int, bool]"

    # Strict type checks for Dict[int, FlyteFile]
    assert isinstance(j, dict) and all(
        isinstance(k, int) and isinstance(v, FlyteFile) for k, v in j.items()), "j is not Dict[int, FlyteFile]"

    # Strict type checks for Dict[int, List[int]]
    assert isinstance(k, dict) and all(
        isinstance(key, int) and isinstance(val, list) and all(isinstance(x, int) for x in val)
        for key, val in k.items()), "k is not Dict[int, List[int]]"

    # Strict type checks for Dict[int, Dict[int, int]]
    assert isinstance(l, dict) and all(
        isinstance(k, int) and isinstance(v, dict) and all(
            isinstance(sub_k, int) and isinstance(sub_v, int) for sub_k, sub_v in v.items())
        for k, v in l.items()), "l is not Dict[int, Dict[int, int]]"

    # Strict type check for a generic dict
    assert isinstance(m, dict), "m is not dict"

    # Strict type check for FlyteFile
    assert isinstance(n, FlyteFile), "n is not FlyteFile"

    # Strict type check for FlyteDirectory
    assert isinstance(o, FlyteDirectory), "o is not FlyteDirectory"

    # Strict type check for Enum
    assert isinstance(enum_status, Status), "enum_status is not Status"

    assert isinstance(sd, StructuredDataset), "sd is not StructuredDataset"
    print("sd:", sd.open(pd.DataFrame).all())

    assert isinstance(fsc, FlyteSchema), "fsc is not FlyteSchema"
    print("fsc: ", fsc.open().all())

    print("All attributes passed strict type checks.")


@workflow
def wf(dc: DC):
    new_dc = t_dc(dc=dc)
    t_inner(inner_dc=new_dc.inner_dc)
    t_test_all_attributes(a=new_dc.a, b=new_dc.b, c=new_dc.c,
                          d=new_dc.d, e=new_dc.e, f=new_dc.f,
                          g=new_dc.g, h=new_dc.h, i=new_dc.i,
                          j=new_dc.j, k=new_dc.k, l=new_dc.l,
                          m=new_dc.m, n=new_dc.n, o=new_dc.o,
                          enum_status=new_dc.enum_status,
                          sd=new_dc.sd,
                          fsc=new_dc.fsc
                          )
    t_test_all_attributes(a=new_dc.inner_dc.a, b=new_dc.inner_dc.b, c=new_dc.inner_dc.c,
                          d=new_dc.inner_dc.d, e=new_dc.inner_dc.e, f=new_dc.inner_dc.f,
                          g=new_dc.inner_dc.g, h=new_dc.inner_dc.h, i=new_dc.inner_dc.i,
                          j=new_dc.inner_dc.j, k=new_dc.inner_dc.k, l=new_dc.inner_dc.l,
                          m=new_dc.inner_dc.m, n=new_dc.inner_dc.n, o=new_dc.inner_dc.o,
                          enum_status=new_dc.inner_dc.enum_status,
                          sd=new_dc.inner_dc.sd, fsc=new_dc.inner_dc.fsc)


if __name__ == "__main__":
    from click.testing import CliRunner
    from flytekit.clis.sdk_in_container import pyflyte

    # Optionally set FLYTE_USE_OLD_DC_FORMAT before invoking pyflyte:
    # os.environ["FLYTE_USE_OLD_DC_FORMAT"] = "true"

    runner = CliRunner()
    path = os.path.realpath(__file__)
    input_val = '{"a": -1, "b": 3.14}'

    # Local execution (disabled):
    # result = runner.invoke(pyflyte.main, ["run", path, "wf", "--dc", input_val])
    # print("Local Execution: ", result.output)

    # Remote execution
    result = runner.invoke(pyflyte.main, ["run", "--remote", path, "wf", "--dc", input_val])
    print("Remote Execution: ", result.output)
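
The `input_val` string above sets only `a` and `b`, so every other field of `DC` is expected to come from its default. A stripped-down sketch of that behaviour using only the standard library follows; `SmallDC` is a hypothetical stand-in for `DC`, and how `pyflyte run` actually parses the `--dc` value may differ.

```python
import json
from dataclasses import dataclass, field
from typing import List


@dataclass
class SmallDC:
    # Hypothetical, reduced version of DC with only plain fields.
    a: int = -1
    b: float = 2.1
    c: str = "Hello, Flyte"
    e: List[int] = field(default_factory=lambda: [0, 1, 2, -1, -2])


input_val = '{"a": -1, "b": 3.14}'

# Keys present in the JSON override the defaults; missing keys keep them.
dc = SmallDC(**json.loads(input_val))

print(dc)
```

This is why the workflow can exercise list, dict, and nested fields even though the command line supplies only two scalars.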

Signed-off-by: JiaWei Jiang <waynechuang97@gmail.com>
@JiangJiaWei1103
Contributor Author

JiangJiaWei1103 commented Nov 15, 2024

Hi @Future-Outlier,

I've added the first version of the test. The main points are:

  1. For FlyteFile and FlyteDirectory, I use a local temporary file and directory, both managed with a context manager.
  2. For StructuredDataset and FlyteSchema, I use the static local parquet file you provided.
    • I use a relative path here.
  3. For FlyteSchema, I use local_path instead of remote_path.

I have two questions:

  1. Is there a better way to access the local parquet file? My current approach feels a little odd.
  2. Would it be better to move all the assertions out of the tasks?
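
On question 1, one common option is to build the path from the test module's own location rather than from the current working directory. A minimal sketch, assuming a hypothetical layout where `df.parquet` sits in a `data` folder next to the test file (`data_file` is an illustrative helper, not part of flytekit or this PR):

```python
from pathlib import Path


def data_file(test_file: str, name: str) -> Path:
    """Return the path of `name` inside a `data` folder next to `test_file`."""
    return Path(test_file).resolve().parent / "data" / name


# In a test module this would be called as data_file(__file__, "df.parquet").
parquet_path = data_file("/repo/tests/test_example.py", "df.parquet")
print(parquet_path)
```

Anchoring on the module path keeps the test independent of where pytest is launched from.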

Thanks!

codecov bot commented Nov 15, 2024

Codecov Report

Attention: Patch coverage is 9.09091% with 10 lines in your changes missing coverage. Please review.

Project coverage is 51.45%. Comparing base (3f0ab84) to head (d6f7821).
Report is 4 commits behind head on master.

Files with missing lines        Patch %    Lines
flytekit/core/type_engine.py    0.00%      10 Missing ⚠️

Additional details and impacted files
@@             Coverage Diff             @@
##           master    #2925       +/-   ##
===========================================
- Coverage   76.33%   51.45%   -24.89%     
===========================================
  Files         199      199               
  Lines       20840    20860       +20     
  Branches     2681     2686        +5     
===========================================
- Hits        15908    10733     -5175     
- Misses       4214     9524     +5310     
+ Partials      718      603      -115     
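
The percentages in the report can be reproduced from the raw counts, assuming coverage is simply hits divided by lines (how Codecov weights partial hits is not stated here). A patch figure of 9.09091% with 10 missing lines is consistent with 11 changed lines, 1 of them covered.

```python
def coverage_pct(hits: int, lines: int) -> float:
    """Coverage as a percentage of lines hit."""
    return 100.0 * hits / lines


base = coverage_pct(15908, 20840)   # master
head = coverage_pct(10733, 20860)   # this PR
patch = coverage_pct(1, 11)         # inferred: 11 patch lines, 10 missing

print(f"{base:.2f}% {head:.2f}% {patch:.5f}%")
```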


Labels
None yet
Projects
Status: In progress
Development

Successfully merging this pull request may close these issues.

[BUG] Make Protobuf Struct Attribute Access local execurtion the same as remote execution
2 participants