Skip to content

Commit

Permalink
Tidying up
Browse files Browse the repository at this point in the history
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
  • Loading branch information
alexeykudinkin committed Jan 9, 2024
1 parent e514c47 commit 40252c4
Showing 1 changed file with 4 additions and 8 deletions.
12 changes: 4 additions & 8 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1255,9 +1255,7 @@ cdef report_streaming_generator_output(
# Ray Object created from an output.
c_pair[CObjectID, shared_ptr[CRayObject]] return_obj

is_exception = isinstance(output_or_exception, Exception)

if is_exception:
if isinstance(output_or_exception, Exception):
create_generator_error_object(
output_or_exception,
worker,
Expand Down Expand Up @@ -1309,8 +1307,6 @@ cdef report_streaming_generator_output(
context.attempt_number,
context.waiter))

return is_exception


cdef execute_streaming_generator_sync(StreamingGeneratorExecutionContext context):
"""Execute a given generator and streaming-report the
Expand Down Expand Up @@ -1345,10 +1341,10 @@ cdef execute_streaming_generator_sync(StreamingGeneratorExecutionContext context
except Exception as e:
output_or_exception = e

done = report_streaming_generator_output(output_or_exception, context, gen_index)
report_streaming_generator_output(output_or_exception, context, gen_index)
gen_index += 1

if done:
if isinstance(output_or_exception, Exception):
break


Expand Down Expand Up @@ -1412,14 +1408,14 @@ async def execute_streaming_generator_async(
gen_index,
)
)

gen_index += 1

if is_exception:
break

# TODO elaborate
await asyncio.gather(*futures)
futures.clear()


cdef create_generator_return_obj(
Expand Down

0 comments on commit 40252c4

Please sign in to comment.