Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Streaming] Revisiting Ray Core streaming to perform I/O fully async avoiding syncing gRPC client and Python generator #42260

Merged
merged 11 commits into from
Jan 25, 2024
Prev Previous commit
Next Next commit
Tidying up
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
  • Loading branch information
alexeykudinkin committed Jan 19, 2024
commit 42f5e0bb33302bf7c4661f4e42403c28b532b0eb
15 changes: 6 additions & 9 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1228,10 +1228,9 @@ cdef class StreamingGeneratorExecutionContext:

return self


cdef report_streaming_generator_output(
output_or_exception: Union[object, Exception],
StreamingGeneratorExecutionContext context,
output_or_exception: Union[object, Exception],
generator_index: int64_t
):
"""Report a given generator output to a caller.
Expand All @@ -1242,14 +1241,11 @@ cdef report_streaming_generator_output(
True otherwise.

Args:
context: Streaming generator's execution context.
output_or_exception: The output yielded from a
generator or raised as an exception.
context: The execution context.

Returns:
True if a generator that produced the output
shouldn't resume anymore (i.e., if the
generator is done being used). False otherwise.
generator_index: index of the output element in the
generated sequence
"""
worker = ray._private.worker.global_worker

Expand Down Expand Up @@ -1310,6 +1306,7 @@ cdef report_streaming_generator_output(
context.waiter))



cdef execute_streaming_generator_sync(StreamingGeneratorExecutionContext context):
"""Execute a given generator and streaming-report the
result to the given caller_address.
Expand Down Expand Up @@ -1343,7 +1340,7 @@ cdef execute_streaming_generator_sync(StreamingGeneratorExecutionContext context
except Exception as e:
output_or_exception = e

report_streaming_generator_output(output_or_exception, context, gen_index)
report_streaming_generator_output(context, output_or_exception, gen_index)
gen_index += 1

if isinstance(output_or_exception, Exception):
Expand Down