[core] Fiber vs thread_local SIGSEGV in Actor async methods #46489
Comments
TLDR: If the async actor crash-restarts, there is a chance that it will call PinExistingReturnObject(), which doesn't support async tasks. This is a race. For the short term, we should pass owner_address from
I have successfully reproduced the issue in a repeatable way:
import os
import ray
import numpy as np
import pytest


def test_died_generator(ray_start_cluster):
    """
    Tests nondeterministic generators vs lineage reconstruction.
    Timeline:
    1. In worker node, creates a generator that generates 100 objects
    2. Kills worker node, objs exist in ref, but data lost
    3. In worker node, creates a consumer that consumes 100 objects
    4. Start a worker node to enable the task and lineage reconstruction
    5. Lineage reconstruction should be working here. The gen is dead after it only generated 50.
    6. Verify that the consumer task can still run (it's not)
    """
    cluster = ray_start_cluster
    cluster.add_node(num_cpus=1, resources={"head": 1})
    cluster.wait_for_nodes()
    ray.init(address=cluster.address)

    @ray.remote(num_cpus=0, resources={"head": 0.1})
    class Killer:
        def __init__(self):
            self.pid = None
            self.at_num = None
            self.kill_num = 0

        def set_pid(self, pid):
            self.pid = pid

        def set_at_num(self, at_num):
            self.at_num = at_num

        def kill_if_needed(self, num):
            if self.kill_num > 15:
                # Uncomment the following and comment above will make it pass
                # if self.kill_num > 14:
                return
            self.kill_num = self.kill_num + 1
            if self.pid is not None and self.at_num is not None and num == self.at_num:
                import os
                import signal
                print(f"Killing the pid = {self.pid}")
                os.kill(self.pid, signal.SIGKILL)

    @ray.remote(num_cpus=1, max_restarts=-1, max_task_retries=-1, resources={"worker": 1})
    class Generator:
        async def gen(self, nums, killer):
            """
            Generates "value_holder" objects. For each object, it first notifies the
            killer, and yields the object.
            """
            print(f'my pid is {os.getpid()}, telling to killer')
            await killer.set_pid.remote(os.getpid())
            print(f"generates total {nums}")
            for i in range(nums):
                print(f"generating {i}")
                await killer.kill_if_needed.remote(i)
                yield np.ones((1000, 1000), dtype=np.uint8) * i
                print(f"generated {i}")
            print(f"generated total {nums}")

        def get_pid(self):
            return os.getpid()

    @ray.remote(num_cpus=1, resources={"worker": 1})
    def consumes(objs, expected_num):
        nums = ray.get(objs)
        assert len(nums) == expected_num
        print(f"consumes {len(nums)}")
        print(nums)
        return expected_num

    worker_node = cluster.add_node(num_cpus=10, resources={"worker": 10})
    cluster.wait_for_nodes()
    generator = Generator.remote()
    killer = Killer.remote()
    # First run, no kills
    # gen -> ObjectRefGenerator
    gen = ray.get(generator.gen.remote(10, killer))
    # [ObjectRef, ...]
    objs = list(gen)
    assert len(objs) == 10
    # kill the worker node
    cluster.remove_node(worker_node, allow_graceful=False)
    # In the lineage reconstruction, the generator is dead after it only generated 5...
    ray.get(killer.set_at_num.remote(5))
    # ... but a consumer takes all 10
    consumer = consumes.remote(objs, 10)
    # start a new worker node
    worker_node = cluster.add_node(num_cpus=10, resources={"worker": 10})
    cluster.wait_for_nodes()
    ray.get(consumer)
Hmm, in my test I don't think we need to kill 15 times; once suffices. What's the difference from your test code?
That's not true. In your test the async actor was killed indefinitely.
What happened + What you expected to happen
We have this thread_local thing here: ray/src/ray/core_worker/context.h (line 171 in 8d02655), and it's used to determine "CurrentTask" in Core Worker. Sadly we have cases where SetCurrentTask, called in ExecuteTask, is called in a fiber thread. So, when the Python thread reads via GetCurrentTask, it segfaults.
It's a mystery why it only shows up in an actor restart. In theory it should show up right away easily.
We may be able to use boost::fibers::fiber_specific_ptr to solve this. But more fundamentally it's dubious to me why we need such a global thread-local method to get a task spec in the first place.
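The hazard described above can be illustrated with a small Python analogy. This is only a sketch, not Ray code: `_slot`, `set_current_task`, `get_current_task`, and `run_in_new_thread` are hypothetical names, and `threading.local` stands in for the C++ `thread_local` slot. It shows that a value written on one thread is invisible to a reader on another thread.

```python
import threading

# Hypothetical stand-in for a thread_local "current task" slot; not Ray's API.
_slot = threading.local()


def set_current_task(name):
    """Record the task on the calling thread only."""
    _slot.task = name


def get_current_task():
    """Return the calling thread's task, or None if this thread never set one."""
    return getattr(_slot, "task", None)


def run_in_new_thread(fn):
    """Run fn on a fresh thread and return its result."""
    box = []
    worker = threading.Thread(target=lambda: box.append(fn()))
    worker.start()
    worker.join()
    return box[0]


def set_then_get():
    # Writer and reader share a thread here, so the value is visible.
    set_current_task("gen")
    return get_current_task()


# The writer thread sees its own value.
seen_by_writer = run_in_new_thread(set_then_get)
# The main thread never wrote to its slot, so it sees nothing.
seen_by_reader = get_current_task()
print(seen_by_writer, seen_by_reader)
```

In Python the reader gets `None`. In C++, the analogous empty slot would presumably be a null or default-initialized pointer, which would be consistent with a crash when the reader dereferences it.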
Versions / Dependencies
master
Reproduction script
Issue Severity
Medium: It is a significant difficulty but I can work around it.