[Streaming] Revisiting Ray Core streaming to perform I/O fully async avoiding syncing gRPC client and Python generator #42260

Merged · 11 commits · Jan 25, 2024
lint
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
alexeykudinkin committed Jan 17, 2024
commit f633401f979d0ef5a9c6589bad5bddd54c8e8b9a
4 changes: 3 additions & 1 deletion python/ray/_raylet.pyx
@@ -1220,7 +1220,9 @@ cdef class StreamingGeneratorExecutionContext:
        self.should_retry_exceptions = should_retry_exceptions

        if generator_backpressure_num_objects > 0:
-            self.waiter = make_shared[CGeneratorBackpressureWaiter](generator_backpressure_num_objects)
+            self.waiter = make_shared[CGeneratorBackpressureWaiter](
+                generator_backpressure_num_objects
+            )
        else:
            self.waiter = shared_ptr[CGeneratorBackpressureWaiter]()
Contributor:
This part doesn't seem to be explained in the PR description. Is it because having a waiter implementation slows down performance? Also, do the results you posted in the PR description include this optimization?

Contributor Author:
Yeah, removing the waiter in particular only brought about a ~3% performance improvement, which wasn't standing out in any way, but I think it still makes sense to remove it from the surface area, given that it's only relevant for Data but not for RPCs.


7 changes: 4 additions & 3 deletions src/ray/core_worker/core_worker.cc
@@ -3029,13 +3029,14 @@ Status CoreWorker::ReportGeneratorItemReturns(
<< reply.total_num_object_consumed();
if (waiter) {
Collaborator:
Should we avoid passing a callback entirely if there's no waiter? Not sure if this is handled any differently internally.

Contributor Author:
We still need to check for signals; the waiter is just a back-pressure hook for us to hold down the thread if we want to slow down the producer.
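
(A minimal sketch of the pattern described above, using hypothetical names rather than the actual CoreWorker / CGeneratorBackpressureWaiter API: the reply callback is installed unconditionally so that signal checks run for every report, while the waiter, when present, is only a back-pressure hook.)

```cpp
// Hypothetical sketch, not Ray source: the callback always runs signal
// checks; back-pressure bookkeeping applies only when a waiter is configured.
#include <algorithm>
#include <cstdint>
#include <memory>

struct Waiter {  // stand-in for CGeneratorBackpressureWaiter
  int64_t total_consumed = 0;
  void UpdateTotalObjectConsumed(int64_t reported) {
    // Keep the counter monotonic even if replies arrive out of order.
    total_consumed = std::max(total_consumed, reported);
  }
};

struct Reply {  // stand-in for the ReportGeneratorItemReturns reply message
  int64_t total_num_object_consumed = 0;
};

// Invoked for every report reply, whether or not back-pressure is enabled.
void OnReportReply(const Reply &reply,
                   const std::shared_ptr<Waiter> &waiter,
                   bool interrupt_requested) {
  // Signal / cancellation check happens unconditionally (hypothetical flag).
  if (interrupt_requested) {
    return;
  }
  // Back-pressure bookkeeping only applies when a waiter was configured.
  if (waiter) {
    waiter->UpdateTotalObjectConsumed(reply.total_num_object_consumed);
  }
}
```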

  RAY_LOG(DEBUG) << "Total object consumed: " << waiter->TotalObjectConsumed()
-                << ". Total object generated: " << waiter->TotalObjectGenerated();
+                << ". Total object generated: "
+                << waiter->TotalObjectGenerated();
  if (status.ok()) {
    /// Since unary gRPC requests are not ordered, it is possible the stale
    /// total value can be replied. Since total object consumed only can
    /// increment, we always choose the larger value here.
-   waiter->UpdateTotalObjectConsumed(
-       std::max(waiter->TotalObjectConsumed(), reply.total_num_object_consumed()));
+   waiter->UpdateTotalObjectConsumed(std::max(
+       waiter->TotalObjectConsumed(), reply.total_num_object_consumed()));
  } else {
    // TODO(sang): Handle network error more gracefully.
    // If the request fails, we should just resume until task finishes without
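
(A side note on the std::max in the hunk above: because unary gRPC replies can arrive out of order, a plain assignment could move the consumed counter backwards on a stale reply, while taking the max keeps it monotonic. A small self-contained illustration, not Ray code:)

```cpp
// Illustration only: out-of-order replies with and without the max() guard.
#include <algorithm>
#include <cstdint>
#include <iostream>

int main() {
  // Replies reporting consumed totals 3 and 5 arrive out of order (5 first).
  const int64_t replies[] = {5, 3};

  int64_t naive = 0;      // plain assignment
  int64_t monotonic = 0;  // std::max guard, as in UpdateTotalObjectConsumed
  for (int64_t reported : replies) {
    naive = reported;                           // rewinds 5 -> 3 on the stale reply
    monotonic = std::max(monotonic, reported);  // stays at 5
  }
  std::cout << "naive=" << naive << " monotonic=" << monotonic << "\n";
  // Prints: naive=3 monotonic=5
  return 0;
}
```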