Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refine multi-threading support #3672

Merged
merged 3 commits into from
Jan 10, 2019

Conversation

raulchen
Copy link
Contributor

@raulchen raulchen commented Jan 2, 2019

What do these changes do?

  • Make current_task_id, task_index, and put_index thread local.
  • For main thread, current_task_id is the real task id, which gets set/reset before/after each task. For background threads, current_task_id is a fake random id that will be generated when it's used for the first time, and it will never change again.
  • task_index and put_index still increments monotonically each time when submit_task and put is called (so task id generation is still deterministic for the main thread).
  • state_lock isn't needed anymore (could improve efficiency).
  • task_driver_id is still shared among threads. But we don't reset it to NIL for actors.
  • fix issue of calling an actor from multiple threads.

Related issue number

#3651

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10537/
Test FAILed.

@raulchen raulchen changed the title [WIP] Refine multi-threading support Refine multi-threading support Jan 2, 2019
@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10545/
Test FAILed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10550/
Test PASSed.

@@ -71,14 +71,14 @@ public AbstractRayRuntime(RayConfig rayConfig) {
@Override
public <T> RayObject<T> put(T obj) {
UniqueId objectId = UniqueIdUtil.computePutId(
workerContext.getCurrentTask().taskId, workerContext.nextPutIndex());
workerContext.getCurrentTaskId(), workerContext.nextPutIndex());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I just realized that this may interact poorly with fault tolerance. If an object created on a separate thread is lost, then the backend will probably crash because the object won't have any lineage. We don't need to address it in this PR, but just something to keep in mind...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Maybe we should change this check to a warning?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, eventually we probably should, but it's still useful right now for debugging purposes.

currentTaskCallCount = new AtomicInteger(0);
currentClassLoader = null;
currentTask = createDummyTask(workerMode, driverId);
currentTaskId = ThreadLocal.withInitial(UniqueId::randomId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be initialized with nil? It looks like it gets overwritten soon after in both branches if the if-block below.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ThreadLocal.withInitial takes a factory method as the parameter. It means that if no value has been set for the current thread, the factory method will be called and set the return value as the thread local value.
So here, the factory method is a method that generates a random ID. A few lines below, we call currentTaskId.set. This set the value only for the main thread (a random ID for driver, and NIL for worker).
Then afterwards, when we call currentTaskId.get in a background thread for the first time. The factory method will be called, and set value for the thread.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, thanks!

Preconditions.checkState(!taskId.isNil());
return taskId;
public UniqueId getCurrentTaskId() {
return currentTaskId.get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'm probably missing something about Java, but how exactly does currentTaskId get set to a random ID for new threads?

TaskContext = namedtuple('TaskContext', [
'current_task_id',
'task_index',
'put_index',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably also include the current task's driver ID since this changes for non-actors.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

task_driver_id isn't thread-specific. At any moment, all threads should have the same driver id. So it shouldn't be here. Does that make sense?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see. I was thinking of the case where a thread started during a task is still active after the worker finishes the task. Ideally, we would just prohibit that case, but not sure it's possible.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't proactively check such threads after a non-actor task finishes. But if the thread continues submitting tasks while there's no running task in the main thread, the following check will fail.
assert self.task_driver_id.is_nil()


self._task_context.task_index = 0
self._task_context.put_index = 1
self._task_context.initialized = True
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't actually seem to use TaskContext. Is that intentional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand what you mean by use TaskContext

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean that self._task_context is not actually an instance of TaskContext.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, I see. good catch.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10572/
Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10584/
Test FAILed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10702/
Test PASSed.

@stephanie-wang
Copy link
Contributor

Looks good! Can you resolve the conflicts, and then we can merge?

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10727/
Test FAILed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10735/
Test PASSed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10741/
Test PASSed.

self._ray_actor_cursor = object_ids.pop()
# We have notified the backend of the new actor handles to expect since
# the last task was submitted, so clear the list.
self._ray_new_actor_handles = []
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was the deletion of this line self._ray_new_actor_handles = [] intentional or an accident?

@raulchen @stephanie-wang

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this was an accident, my bad!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my bad, I'll submit an PR to fix this soon.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@robertnishihara
Copy link
Collaborator

@raulchen I think this PR introduced #3754. Can you take a look?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants