Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove ray task post_execute because ray.shutdown will be automatically called #2304

Closed
wants to merge 1 commit into from

Conversation

yundai424
Copy link
Contributor

Tracking issue

https://github.com/flyteorg/flyte/issues/

Why are the changes needed?

ray.shutdown() will always be automatically called (registered through atexit here). We should rather rely on it instead of explicitly calling it again especially we don't set _exiting_interpreter=True which may result in disconnecting too soon before worker logs are flushed to driver.

What changes were proposed in this pull request?

remove the post_execute of ray task

How was this patch tested?

test with a simple example:

import typing

from flytekit import ImageSpec, Resources, task, workflow

commit_hash = "244a85c876eab507742d72bd90916d0e8662ea5b"
ray_plugin = f"git+https://github.com/yundai424/flytekit.git@{commit_hash}#subdirectory=plugins/flytekit-ray"

custom_image = ImageSpec(
    name="ray-flyte-plugin",
    registry="localhost:30000",
    packages=[ray_plugin],
    apt_packages=['git', 'wget', 'vim'],
)

# if custom_image.is_container():
import ray
from flytekitplugins.ray import HeadNodeConfig, RayJobConfig, WorkerNodeConfig

@ray.remote
def f(x):
    print(f"ray task get {x}")
    return x * x

@task(
    task_config=RayJobConfig(
        head_node_config=HeadNodeConfig(ray_start_params={"log-color": "True", "block": "true"}),
        worker_node_config=[WorkerNodeConfig(group_name="ray-group", replicas=1)],
        runtime_env={"pip": []},
    ),
    requests=Resources(mem="4Gi", cpu="1"),
    container_image=custom_image,
)
def ray_task(n: int) -> typing.List[int]:
    futures = [f.remote(i) for i in range(n)]
    l = ray.get(futures)
    print(l, 2)
    return l

@workflow
def ray_workflow(n: int) -> typing.List[int]:
    return ray_task(n=n)

Setup process

Screenshots

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

Docs link

Copy link

welcome bot commented Mar 28, 2024

Thank you for opening this pull request! 🙌

These tips will help get your PR across the finish line:

  • Most of the repos have a PR template; if not, fill it out to the best of your knowledge.
  • Sign off your commits (Reference: DCO Guide).

@dosubot dosubot bot added the size:XS This PR changes 0-9 lines, ignoring generated files. label Mar 28, 2024
…ly called

Signed-off-by: Yun Dai <yundai424@gmail.com>
@yundai424 yundai424 closed this Mar 28, 2024
@dosubot dosubot bot added size:XXL This PR changes 1000+ lines, ignoring generated files. and removed size:XS This PR changes 0-9 lines, ignoring generated files. labels Mar 28, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
size:XXL This PR changes 1000+ lines, ignoring generated files.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant