
[Streaming] Revisiting Ray Core streaming to perform I/O fully async avoiding syncing gRPC client and Python generator #42260

Merged: 11 commits merged on Jan 25, 2024

Conversation

@alexeykudinkin (Contributor) commented on Jan 9, 2024

Why are these changes needed?

Currently, when streaming responses back from one actor/task to another in Ray Core, the actual I/O is performed synchronously, i.e., every object yielded by the generator is:

  1. Serialized (synchronously)
  2. Dispatched back to the caller (synchronously)

This PR takes the opportunity to overlap serialization and network I/O (which hold no GIL) with the execution of the target generator producing the output.
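To make the idea concrete, here is a minimal sketch of the overlap pattern (illustrative only, not the actual Ray implementation; `serialize_and_report` and `send_over_network` are hypothetical stand-ins for Ray's serialization and gRPC report path):

```python
import asyncio
import pickle
from concurrent.futures import ThreadPoolExecutor

def send_over_network(payload: bytes) -> None:
    # Stand-in for the gRPC report back to the caller.
    pass

def serialize_and_report(item) -> None:
    # Stand-in for Ray's serialization + dispatch of one generator item.
    send_over_network(pickle.dumps(item))

async def stream_generator_async(gen, io_pool: ThreadPoolExecutor) -> None:
    loop = asyncio.get_running_loop()
    pending = []
    for item in gen:
        # Hand serialization + network I/O to the I/O thread pool and keep
        # producing, instead of blocking on each report before the next yield.
        pending.append(loop.run_in_executor(io_pool, serialize_and_report, item))
    # Drain outstanding reports before finishing the task.
    await asyncio.gather(*pending)

# Usage: production of items overlaps with their serialization/dispatch.
asyncio.run(stream_generator_async((i for i in range(1000)), ThreadPoolExecutor(max_workers=1)))
```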

Overlapping network I/O (which doesn't block the GIL) with the streaming generator allows us to considerably reduce P50 latencies:

  • P50: ~700ms -> 485ms (30%)
  • P75: ~720ms -> 600ms (16%)
  • P99: unchanged
----------------------------------------------------------------
Stream responses asynchronously (IO threads: 1; default)
----------------------------------------------------------------

Core Actors streaming throughput (ASYNC) (num_replicas=1, tokens_per_request=1000, batch_size=10): 14199.39 +- 93.58 tokens/s
(CallerActor pid=98043) Individual request quantiles:
(CallerActor pid=98043) 	P50=480.73095849999964
(CallerActor pid=98043) 	P75=603.5131979999991
(CallerActor pid=98043) 	P99=770.7011399399998


----------------------------------------------------------------
Skip back-pressure handler
----------------------------------------------------------------

Core Actors streaming throughput (ASYNC) (num_replicas=1, tokens_per_request=1000, batch_size=10): 14643.21 +- 352.34 tokens/s
(CallerActor pid=13155) Individual request quantiles:
(CallerActor pid=13155) 	P50=685.5565414999995
(CallerActor pid=13155) 	P75=707.7666252500006
(CallerActor pid=13155) 	P99=777.5089473099982

----------------------------------------------------------------
Baseline
----------------------------------------------------------------

Core Actors streaming throughput (ASYNC) (num_replicas=1, tokens_per_request=1000, batch_size=10): 14264.66 +- 162.12 tokens/s
(CallerActor pid=12256) Individual request quantiles:
(CallerActor pid=12256) 	P50=705.1566460000007
(CallerActor pid=12256) 	P75=723.1873227499985
(CallerActor pid=12256) 	P99=771.7197762799998

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@alexeykudinkin alexeykudinkin requested review from rkooo567, edoakes and jjyao and removed request for rkooo567 and edoakes January 9, 2024 17:52
@alexeykudinkin alexeykudinkin changed the title [WIP][Streaming] Revisiting Ray Core streaming to perform I/O fully async avoiding syncing gRPC client and Python generator [Streaming] Revisiting Ray Core streaming to perform I/O fully async avoiding syncing gRPC client and Python generator Jan 11, 2024
@rkooo567 rkooo567 self-assigned this Jan 16, 2024
@rkooo567 (Contributor): I will review it by tomorrow.

@rkooo567 (Contributor) left a review: Generally LGTM!

@@ -343,7 +343,7 @@ steps:

 - label: ":ray: core: cpp worker tests"
   tags: core_cpp
-  instance_type: small
+  instance_type: medium
Contributor: Is this change necessary?

Contributor (Author): Yep, recommended by @can-anyscale, as our C++ builds are periodically timing out.

@@ -1398,20 +1393,32 @@ async def execute_streaming_generator_async(
            raise
        except Exception as e:
            output_or_exception = e
            is_exception = 1
Contributor: Why don't we use a bool here?

Contributor (Author): Leftover from another experiment; let me clean that one up.

        generator_backpressure_num_objects
    )
else:
    self.waiter = shared_ptr[CGeneratorBackpressureWaiter]()
Contributor: This part doesn't seem to be explained in the PR description. Is this because having a waiter implementation slows down performance? Also, do the results you posted in the PR description include this optimization?

Contributor (Author): Yeah, removing the waiter in particular only brought about a ~3% performance improvement, which didn't stand out in any way, but I think it still makes sense to remove it from the surface area given that it's only relevant for Data, not for RPCs.
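For illustration, a Python-style sketch of that change (names are stand-ins, not the actual Cython bindings): the back-pressure waiter is only constructed when back-pressure is requested, and stays unset otherwise.

```python
from typing import Optional

class GeneratorBackpressureWaiter:
    # Illustrative stand-in for the C++ CGeneratorBackpressureWaiter.
    def __init__(self, max_unconsumed_objects: int) -> None:
        self.max_unconsumed_objects = max_unconsumed_objects

def make_waiter(generator_backpressure_num_objects: int) -> Optional[GeneratorBackpressureWaiter]:
    # Mirrors the diff above: a non-positive threshold means "no back-pressure",
    # so no waiter is created (the null shared_ptr case).
    if generator_backpressure_num_objects > 0:
        return GeneratorBackpressureWaiter(generator_backpressure_num_objects)
    return None
```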

# avoid blocking the event loop when serializing
# the output (which has nogil).
loop.run_in_executor(
    worker.core_worker.get_thread_pool_for_async_event_loop(),
Contributor: IIUC, this won't work if we increase the thread pool size to > 1 (because I don't know if report_streaming_generator_output is thread-safe now). Can you add an assert somewhere to make sure it is not updated?

Contributor (Author): That's correct. Those APIs aren't thread-safe, and I had already discovered that in my experiments, hence the cleanup I started in #42443 to alleviate some of that (it's stacked on top of this one).
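For reference, such an assert could look like the sketch below (illustrative only; `_max_workers` is a CPython ThreadPoolExecutor implementation detail, not a Ray API):

```python
from concurrent.futures import ThreadPoolExecutor

def assert_single_io_thread(pool: ThreadPoolExecutor) -> None:
    # report_streaming_generator_output is assumed not to be thread-safe,
    # so the async event loop's I/O pool must keep exactly one worker thread.
    assert pool._max_workers == 1, (
        "streaming generator reports are not thread-safe; "
        "the async I/O thread pool must have exactly one thread"
    )
```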

          return Status::OK();
        }
      });
    } else {
      if (options_.check_signals) {
Contributor: I think for the else case we can just return Status::OK (if you look at the implementation of WaitUntilObjectConsumed, that's the behavior now). check_signals is there to avoid ignoring Python signals while C++ is using the thread (which doesn't happen in this case).

Contributor (Author) (Jan 22, 2024): I have revisited it to avoid duplication, essentially sharing the callback.

Contributor: Actually, we don't need to check signals if there's no waiter. The purpose of checking signals is that when the thread is blocked inside C++ (e.g., due to locks or sleep), it cannot check for a Python interrupt. But if we don't have the waiter, there's no such problem.

Contributor (Author): Got it. I will clean it up to avoid checking for signals.
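To illustrate the reasoning, a small sketch (hypothetical names) of why a blocking wait needs periodic signal checks while an immediate return does not:

```python
import time
from typing import Callable, Optional

def wait_until_consumed(should_resume: Callable[[], bool],
                        check_signals: Optional[Callable[[], None]] = None,
                        poll_interval_s: float = 0.1) -> None:
    # While the producer thread is parked here (the "waiter" case), it must
    # periodically give Python a chance to raise pending signals such as
    # KeyboardInterrupt; otherwise Ctrl-C is ignored until the wait ends.
    # Without a waiter there is no blocking wait, so no signal check is needed.
    while not should_resume():
        if check_signals is not None:
            check_signals()  # may raise if a Python signal is pending
        time.sleep(poll_interval_s)
```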

@rkooo567 rkooo567 added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Jan 18, 2024
@alexeykudinkin alexeykudinkin removed the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Jan 19, 2024
alexeykudinkin and others added 3 commits January 18, 2024 23:04
@@ -3016,40 +3016,52 @@ Status CoreWorker::ReportGeneratorItemReturns(
   RAY_LOG(DEBUG) << "Write the object ref stream, index: " << item_index
                  << ", id: " << dynamic_return_object.first;

-  waiter->IncrementObjectGenerated();
+  if (waiter) {
Collaborator: Worth adding a comment on why it's expected that waiter can be null.

// backpressure.
waiter->UpdateTotalObjectConsumed(waiter->TotalObjectGenerated());
RAY_LOG(WARNING) << "Failed to send the object ref.";
if (waiter) {
Collaborator: Should we avoid passing a callback entirely if there's no waiter? Not sure if this is handled any differently internally.

Contributor (Author): We still need to check for signals; the waiter is just a back-pressure hook that lets us hold the thread if we want to slow down the producer.
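To spell out what the hook does, here's a Python sketch of the assumed waiter semantics (not the actual C++ GeneratorBackpressureWaiter): the producer blocks whenever the number of generated-but-unconsumed objects exceeds the configured threshold.

```python
import threading

class BackpressureWaiter:
    # Illustrative sketch: hold the producer when too many objects are unconsumed.
    def __init__(self, max_unconsumed: int) -> None:
        self.max_unconsumed = max_unconsumed
        self.generated = 0
        self.consumed = 0
        self._cond = threading.Condition()

    def increment_object_generated(self) -> None:
        with self._cond:
            self.generated += 1

    def update_total_object_consumed(self, consumed: int) -> None:
        with self._cond:
            self.consumed = max(self.consumed, consumed)
            self._cond.notify_all()

    def wait_until_object_consumed(self) -> None:
        # Park the producing thread until the caller has caught up enough.
        with self._cond:
            self._cond.wait_for(
                lambda: self.generated - self.consumed < self.max_unconsumed)
```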

@rkooo567 (Contributor) left a review: LGTM. One last comment regarding checking signals.


@rkooo567 rkooo567 merged commit c76a5ac into ray-project:master Jan 25, 2024
2 checks passed
@rkooo567 (Contributor): @alexeykudinkin I remember you contributed the benchmark. Is it running daily right now?

@edoakes (Collaborator) commented on Jan 25, 2024: @rkooo567 it's not. The Serve release tests are not in good shape in general. @zcin is working on improving them in the coming months. Adding these should be part of it.

stephanie-wang added a commit that referenced this pull request Mar 26, 2024
… before continuing (#44257)

#42260 updated streaming generator tasks to asynchronously report generator returns, instead of synchronously reporting each generator return before yielding the next one. However, this has a couple of problems:

  1. If the task still has a reference to the yielded value, it may modify the value. The serialized and reported return will then have a different value than expected (illustrated in the sketch below).

  2. As per "[core] Streaming generator task waits for all object report acks before finishing the task" (#44079), we need to track the number of in-flight RPCs that report generator returns, so that we can wait for them all to reply before returning from the end of the task. If we increment the count of in-flight RPCs asynchronously, we can end up returning from the task while there are still in-flight RPCs.

So this PR reverts some of the logic in #42260 to wait for the generator return to be serialized into the protobuf sent back to the caller. Note that we do not wait for the reply (unless under backpressure).

We can later re-introduce asynchronous generator reports, but we will need to evaluate the performance benefit of a new implementation that also addresses both of the above points.
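A tiny self-contained illustration of the first problem (hypothetical code, not Ray's): if reporting is handed to a background thread while the generator keeps mutating the yielded object, the serialized value can differ from the value at yield time.

```python
import pickle
from concurrent.futures import ThreadPoolExecutor

def report(item) -> bytes:
    # Stand-in for "serialize and send the generator return to the caller".
    return pickle.dumps(item)

def gen():
    buf = []
    for i in range(3):
        buf.append(i)
        yield buf  # yields a reference that the task keeps mutating

with ThreadPoolExecutor(max_workers=1) as pool:
    futures = [pool.submit(report, item) for item in gen()]

# Depending on scheduling, every report may serialize the final [0, 1, 2]
# instead of [0], [0, 1], [0, 1, 2] -- the hazard the revert addresses.
print([pickle.loads(f.result()) for f in futures])
```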

---------

Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
stephanie-wang added a commit to stephanie-wang/ray that referenced this pull request on Mar 27, 2024: … before continuing (ray-project#44257), with the same commit message as above.