[Streaming] Revisiting Ray Core streaming to perform I/O fully async avoiding syncing gRPC client and Python generator #42260
Conversation
force-pushed from 40252c4 to 487a17f
I will review it by tomorrow.
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
force-pushed from 487a17f to f633401
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Generally LGTM!
.buildkite/core.rayci.yml (outdated)

@@ -343,7 +343,7 @@ steps:
 - label: ":ray: core: cpp worker tests"
   tags: core_cpp
-  instance_type: small
+  instance_type: medium
Is this change necessary?
Yep, recommended by @can-anyscale, as our C++ builds are periodically timing out.
python/ray/_raylet.pyx (outdated)

@@ -1398,20 +1393,32 @@ async def execute_streaming_generator_async(
         raise
     except Exception as e:
         output_or_exception = e
         is_exception = 1
Why don't we use a bool here?
Leftover from another experiment; let me clean that one up.
            generator_backpressure_num_objects
        )
    else:
        self.waiter = shared_ptr[CGeneratorBackpressureWaiter]()
This part doesn't seem to be explained in the PR description. Is this because having a waiter implementation slows down performance? Also, do the results you posted in the PR description include this optimization?
Yeah, removing the waiter in particular only brought about a ~3% performance improvement, which didn't stand out in any way, but I think it still makes sense to remove it from the surface area, given that it's only relevant for Data and not for RPCs.
    # avoid blocking the event loop when serializing
    # the output (which has nogil).
    loop.run_in_executor(
        worker.core_worker.get_thread_pool_for_async_event_loop(),
IIUC, this won't work if we increase the thread pool size to > 1 (because I don't know if report_streaming_generator_output is thread-safe right now). Can you add an assert somewhere to make sure the pool size is not increased?
That's correct. Those APIs aren't thread-safe, and I had already discovered that in my experiments, hence the cleanup I started in #42443 to alleviate some of that (it's stacked on top of this one).
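For context, here is a minimal, self-contained sketch of the pattern under discussion (hypothetical names; `report_output` stands in for Ray's serialize-and-report step). The blocking work is pushed onto a dedicated executor so the event loop stays free, and keeping the pool at exactly one thread guarantees the non-thread-safe reporting path is never entered concurrently:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

# Exactly one worker thread: required as long as the reporting
# path (report_streaming_generator_output) is not thread-safe.
_report_pool = ThreadPoolExecutor(max_workers=1)

def report_output(item):
    # Stand-in for the blocking serialize-and-send step
    # (in Ray, this work releases the GIL).
    print(f"reported {item}")

async def consume(gen):
    loop = asyncio.get_running_loop()
    async for item in gen:
        # Offload the blocking report instead of running it
        # inline on the event loop.
        await loop.run_in_executor(_report_pool, report_output, item)

async def numbers():
    for i in range(3):
        yield i

asyncio.run(consume(numbers()))
```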
        return Status::OK();
      }
    });
  } else {
    if (options_.check_signals) {
I think for the else case, we can just return Status::OK (if you look at the implementation of WaitUntilObjectConsumed, that's the behavior now). check_signals is there to avoid ignoring Python signals while C++ is using the thread (which doesn't happen in this case).
I have revisited it to avoid duplication, essentially sharing the callback.
Actually, we don't need to check signals if there's no waiter. The purpose of checking signals is that when the thread is blocked inside C++ (e.g., due to locks or sleep), it cannot check for a Python interrupt. But if we don't have the waiter, there's no such problem.
Got it. Will clean up to avoid checking for signals.
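To make the signal-checking point concrete, here is a toy sketch (not Ray's actual code) of why a blocking wait takes a check-signals hook: the wait is sliced into short timed waits so a pending Python interrupt can surface instead of being swallowed while the thread is parked in native code.

```python
import threading

def wait_with_signal_checks(event: threading.Event, check_signals, slice_s=0.1):
    """Block until `event` is set, polling `check_signals` between
    short waits so a Python-level interrupt is not ignored while the
    thread is blocked."""
    while not event.wait(slice_s):
        err = check_signals()  # e.g., returns an error on a pending interrupt
        if err is not None:
            return err         # analogous to returning a non-OK Status
    return None                # analogous to Status::OK()
```

Without a waiter there is nothing to block on, so, as noted above, the signal check can be skipped entirely.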
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
@@ -3016,40 +3016,52 @@ Status CoreWorker::ReportGeneratorItemReturns(
   RAY_LOG(DEBUG) << "Write the object ref stream, index: " << item_index
                  << ", id: " << dynamic_return_object.first;

-  waiter->IncrementObjectGenerated();
+  if (waiter) {
Worth a comment on why it's expected that waiter can be null.
    // backpressure.
    waiter->UpdateTotalObjectConsumed(waiter->TotalObjectGenerated());
    RAY_LOG(WARNING) << "Failed to send the object ref.";
    if (waiter) {
Should we avoid passing a callback entirely if there's no waiter? Not sure if this is handled any differently internally.
We still need to check for signals; the waiter is just a back-pressure hook that lets us hold down the thread when we want to slow down the producer.
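As a rough illustration of the waiter's role (a toy model, not Ray's implementation): the producer records each generated object and blocks once too many are unconsumed, while consumer acks advance the consumed watermark and wake it up.

```python
import threading

class ToyBackpressureWaiter:
    def __init__(self, max_unconsumed: int):
        self.max_unconsumed = max_unconsumed
        self.generated = 0
        self.consumed = 0
        self.cv = threading.Condition()

    def increment_object_generated(self):
        with self.cv:
            self.generated += 1

    def update_total_object_consumed(self, consumed: int):
        # Called on consumer acks; wakes a producer blocked below.
        with self.cv:
            self.consumed = max(self.consumed, consumed)
            self.cv.notify_all()

    def wait_until_object_consumed(self):
        # The back-pressure hook: holds the producer thread down
        # until the consumer catches up.
        with self.cv:
            self.cv.wait_for(
                lambda: self.generated - self.consumed <= self.max_unconsumed
            )
```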
LGTM. One last comment regarding checking signals (see the thread above).
@alexeykudinkin I remember you contributed the benchmark. Is it running daily right now?
… before continuing (#44257)

#42260 updated streaming generator tasks to asynchronously report generator returns, instead of synchronously reporting each generator return before yielding the next one. However, this has a couple of problems:

1. If the task still has a reference to the yielded value, it may modify the value. The serialized and reported return will then have a different value than expected.
2. As per "[core] Streaming generator task waits for all object report acks before finishing the task" (#44079), we need to track the number of in-flight RPCs that report generator returns, so that we can wait for them all to reply before returning from the end of the task. If we increment the count of in-flight RPCs asynchronously, we can end up returning from the task while there are still in-flight RPCs.

So this PR reverts some of the logic in #42260 to wait for the generator return to be serialized into the protobuf sent back to the caller. Note that we do not wait for the reply (unless under backpressure). We can later re-introduce asynchronous generator reports, but we will need to evaluate the performance benefit of a new implementation that also addresses both of the above points.

Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
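The first problem is easy to reproduce in plain Python; a contrived sketch of the mutation hazard when serialization is deferred:

```python
def gen():
    buf = [0]
    yield buf     # reported asynchronously...
    buf[0] = 99   # ...but mutated before the deferred serialization runs

pending = []
g = gen()
pending.append(next(g))  # simulate deferring serialization of the yield
try:
    next(g)              # generator resumes and mutates the value
except StopIteration:
    pass
print(pending[0])        # [99], not the [0] that was actually yielded
```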
Why are these changes needed?
Currently, when streaming responses back from one actor/task to another in Ray Core, the actual I/O is performed synchronously: every object yielded by the generator is serialized and reported back to the caller before the generator resumes producing the next one.
This PR addresses the opportunity to overlap serialization and network I/O (which hold no GIL) with the actual execution of the generator producing the output.
Overlapping network I/O (which does not block the GIL) with the streaming generator allows us to considerably reduce P50 latencies.
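A minimal sketch of the idea, with hypothetical stand-ins for serialization and user code: each yielded item's report is scheduled on an executor rather than awaited inline, so serialization and network I/O overlap with production of the next item (the sketch also drains in-flight reports at the end, the bookkeeping that #44079/#44257 later tightened up):

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=1)

def report(item):
    time.sleep(0.01)  # stand-in for serialization + gRPC send (no GIL held)

async def produce():
    for i in range(5):
        await asyncio.sleep(0.01)  # stand-in for user code producing output
        yield i

async def run():
    loop = asyncio.get_running_loop()
    in_flight = []
    async for item in produce():
        # Schedule the report without awaiting it, overlapping I/O
        # with production of the next item.
        in_flight.append(loop.run_in_executor(pool, report, item))
    await asyncio.gather(*in_flight)  # drain in-flight reports before finishing

asyncio.run(run())
```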
Related issue number

Checks

- I've signed off every commit (git commit -s) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.
- If I've added a method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.