Refine multi-threading support #3672
Test FAILed.
Test FAILed.
Test PASSed.
@@ -71,14 +71,14 @@ public AbstractRayRuntime(RayConfig rayConfig) {
   @Override
   public <T> RayObject<T> put(T obj) {
     UniqueId objectId = UniqueIdUtil.computePutId(
-        workerContext.getCurrentTask().taskId, workerContext.nextPutIndex());
+        workerContext.getCurrentTaskId(), workerContext.nextPutIndex());
Hmm, I just realized that this may interact poorly with fault tolerance. If an object created on a separate thread is lost, then the backend will probably crash because the object won't have any lineage. We don't need to address it in this PR, but just something to keep in mind...
Good point. Maybe we should change this check to a warning?
Yeah, eventually we probably should, but it's still useful right now for debugging purposes.
currentTaskCallCount = new AtomicInteger(0);
currentClassLoader = null;
currentTask = createDummyTask(workerMode, driverId);
currentTaskId = ThreadLocal.withInitial(UniqueId::randomId);
Should this be initialized with nil? It looks like it gets overwritten soon after in both branches of the if-block below.
`ThreadLocal.withInitial` takes a factory method as its parameter. If no value has been set for the current thread, the factory method is called and its return value becomes the thread-local value.
So here, the factory method is a method that generates a random ID. A few lines below, we call `currentTaskId.set`. This sets the value only for the main thread (a random ID for the driver, and NIL for a worker).
Afterwards, when `currentTaskId.get` is called in a background thread for the first time, the factory method is called and sets the value for that thread.
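For readers coming from the Python side of this PR, the lazy per-thread initialization described above can be approximated with `threading.local`. This is only an illustrative sketch, not code from the PR: `LazyThreadLocal` and `random_id` are hypothetical names, and the string `"NIL"` stands in for the real nil ID.

```python
import threading
import uuid


def random_id():
    # Hypothetical stand-in for UniqueId::randomId.
    return uuid.uuid4().hex


class LazyThreadLocal:
    """Rough analogue of Java's ThreadLocal.withInitial(factory)."""

    def __init__(self, factory):
        self._factory = factory
        self._local = threading.local()

    def get(self):
        # Call the factory only if the calling thread has no value yet.
        if not hasattr(self._local, "value"):
            self._local.value = self._factory()
        return self._local.value

    def set(self, value):
        # Only the calling thread sees this value.
        self._local.value = value


current_task_id = LazyThreadLocal(random_id)
current_task_id.set("NIL")  # explicit value for the main thread only

seen = []
worker = threading.Thread(
    target=lambda: seen.append((current_task_id.get(), current_task_id.get()))
)
worker.start()
worker.join()

# The background thread got its own lazily generated ID, stable across calls.
first, second = seen[0]
```

The main thread keeps the value it set explicitly, while the background thread triggers the factory on its first `get` and then keeps seeing the same value.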
I see, thanks!
Preconditions.checkState(!taskId.isNil());
return taskId;
public UniqueId getCurrentTaskId() {
return currentTaskId.get();
I think I'm probably missing something about Java, but how exactly does `currentTaskId` get set to a random ID for new threads?
python/ray/worker.py
Outdated
TaskContext = namedtuple('TaskContext', [
    'current_task_id',
    'task_index',
    'put_index',
This should probably also include the current task's driver ID since this changes for non-actors.
`task_driver_id` isn't thread-specific. At any moment, all threads should have the same driver id. So it shouldn't be here. Does that make sense?
Ah, I see. I was thinking of the case where a thread started during a task is still active after the worker finishes the task. Ideally, we would just prohibit that case, but not sure it's possible.
We don't proactively check such threads after a non-actor task finishes. But if the thread continues submitting tasks while there's no running task in the main thread, the following check will fail.
assert self.task_driver_id.is_nil()
self._task_context.task_index = 0
self._task_context.put_index = 1
self._task_context.initialized = True
This doesn't actually seem to use `TaskContext`. Is that intentional?
I don't understand what you mean by "use `TaskContext`".
I mean that `self._task_context` is not actually an instance of `TaskContext`.
ah, I see. good catch.
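One reason the distinction above can matter, sketched under assumptions: `namedtuple` instances are immutable, so attribute assignments like the ones in the diff would fail on a real `TaskContext`, whereas a `threading.local` object accepts them. The field list below uses only the three fields visible in the diff excerpt, and `local_ctx` is a hypothetical name; this is not the code the PR ended up with.

```python
import threading
from collections import namedtuple

# Only the fields visible in the diff excerpt; the real definition may differ.
TaskContext = namedtuple(
    "TaskContext", ["current_task_id", "task_index", "put_index"]
)

ctx = TaskContext(current_task_id=None, task_index=0, put_index=1)
try:
    ctx.task_index = 5  # namedtuple fields cannot be reassigned
    assignment_failed = False
except AttributeError:
    assignment_failed = True

# A thread-local object accepts arbitrary attributes, but it is not a
# TaskContext instance, which is the mismatch raised in the review.
local_ctx = threading.local()
local_ctx.task_index = 0
local_ctx.put_index = 1
is_task_context = isinstance(local_ctx, TaskContext)
```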
Test FAILed.
Test FAILed.
Test PASSed.
Looks good! Can you resolve the conflicts, and then we can merge?
0082bde to 09066ca
Test FAILed.
09066ca to ef4d465
Test PASSed.
Test PASSed.
 self._ray_actor_cursor = object_ids.pop()
 # We have notified the backend of the new actor handles to expect since
 # the last task was submitted, so clear the list.
-self._ray_new_actor_handles = []
Was the deletion of this line `self._ray_new_actor_handles = []` intentional or an accident?
I think this was an accident, my bad!
My bad, I'll submit a PR to fix this soon.
What do these changes do?
- Make `current_task_id`, `task_index`, and `put_index` thread local.
- `current_task_id` is the real task id, which gets set/reset before/after each task. For background threads, `current_task_id` is a fake random id that is generated when it's used for the first time and never changes afterwards.
- `task_index` and `put_index` still increment monotonically each time `submit_task` and `put` are called (so task id generation is still deterministic for the main thread).
- `state_lock` isn't needed anymore (could improve efficiency).
- `task_driver_id` is still shared among threads. But we don't reset it to NIL for actors.

Related issue number
#3651
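The per-thread behavior summarized in the description can be sketched roughly as follows. This is a simplified illustration under assumptions, not the PR's implementation: the `TaskContext` subclass, `next_task_id`, `run_task`, and the SHA-1 based ID derivation are all hypothetical.

```python
import hashlib
import threading
import uuid


class TaskContext(threading.local):
    """Per-thread state; __init__ runs once in each thread on first access."""

    def __init__(self):
        # Background threads keep this random id; the main thread overwrites it.
        self.current_task_id = uuid.uuid4().hex
        self.task_index = 0
        self.put_index = 1


context = TaskContext()


def next_task_id():
    # Each thread increments its own counter, so no lock is required.
    context.task_index += 1
    payload = f"{context.current_task_id}:{context.task_index}"
    return hashlib.sha1(payload.encode()).hexdigest()


def run_task(task_id):
    # Main thread: set the real task id and reset the counters per task.
    context.current_task_id = task_id
    context.task_index = 0
    context.put_index = 1
    return [next_task_id(), next_task_id()]


first_run = run_task("task-A")
second_run = run_task("task-A")  # same inputs, same derived IDs

background = []
thread = threading.Thread(
    target=lambda: background.append((context.current_task_id, next_task_id()))
)
thread.start()
thread.join()
```

Because every thread has its own counters, re-running the same task on the main thread derives the same child IDs, and submissions from a background thread do not disturb the main thread's `task_index`.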