52 changes: 43 additions & 9 deletions python/ray/_raylet.pyx
@@ -37,6 +37,7 @@ import contextvars
import concurrent.futures
import collections

from dataclasses import dataclass
from libc.stdint cimport (
int32_t,
int64_t,
@@ -1101,6 +1102,12 @@ cdef class StreamingGeneratorExecutionContext:

return self


@dataclass(frozen=True)
class StreamingGeneratorStats:
object_creation_dur_s: float
Reviewer comment (Contributor): Can we rename this to object_serialization_time_s?

cdef report_streaming_generator_output(
StreamingGeneratorExecutionContext context,
output: object,
@@ -1122,6 +1129,8 @@ cdef report_streaming_generator_output(
# Ray Object created from an output.
c_pair[CObjectID, shared_ptr[CRayObject]] return_obj

start = time.perf_counter()

# Report the intermediate result if there was no error.
create_generator_return_obj(
output,
@@ -1148,6 +1157,8 @@ cdef report_streaming_generator_output(
return_obj.first,
is_plasma_object(return_obj.second)))

serialization_dur_s = time.perf_counter() - start

with nogil:
check_status(CCoreWorkerProcess.GetCoreWorker().ReportGeneratorItemReturns(
return_obj,
@@ -1158,6 +1169,11 @@ cdef report_streaming_generator_output(
context.waiter))


return StreamingGeneratorStats(
object_creation_dur_s=serialization_dur_s,
)

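In plain Python, the measurement above amounts to bracketing the object-creation step with perf_counter() and handing the duration back in an immutable record. A minimal sketch under that reading (report_output and its repr-based serialization stand-in are illustrative, not Ray's internals):

    import time
    from dataclasses import dataclass


    @dataclass(frozen=True)
    class StreamingGeneratorStats:
        # Wall-clock seconds spent turning one generator output into a Ray object.
        object_creation_dur_s: float


    def report_output(output: object) -> StreamingGeneratorStats:
        start = time.perf_counter()
        # Illustrative stand-in for create_generator_return_obj(): in Ray this is
        # where the output is serialized into a Ray object (possibly in plasma).
        serialized = repr(output).encode()
        object_creation_dur_s = time.perf_counter() - start

        # ... the ReportGeneratorItemReturns RPC would happen here ...
        return StreamingGeneratorStats(object_creation_dur_s=object_creation_dur_s)


    print(report_output({"x": 1}))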

cdef report_streaming_generator_exception(
StreamingGeneratorExecutionContext context,
e: Exception,
@@ -1250,9 +1266,20 @@ cdef execute_streaming_generator_sync(StreamingGeneratorExecutionContext context
gen = context.generator

try:
for output in gen:
report_streaming_generator_output(context, output, gen_index, None)
gen_index += 1
stats = None

while True:
try:
# Send object serialization duration to the generator and retrieve
# next output
output = gen.send(stats)
# Track serialization duration of the next output
stats = report_streaming_generator_output(context, output, gen_index, None)

gen_index += 1

except StopIteration:
break
except Exception as e:
report_streaming_generator_exception(context, e, gen_index, None)

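The rewritten loop drives the generator with gen.send(stats) instead of plain iteration, so the duration measured for the previous item is delivered to the generator body as the value of its yield expression (the async path does the same with gen.asend(stats)). A self-contained sketch of that round trip, with illustrative names:

    import time


    def producer():
        for i in range(3):
            # The value passed to gen.send() for the previous item arrives here,
            # as the result of the yield expression.
            stats = yield f"item-{i}"
            print(f"previous item's serialization took {stats:.6f}s")


    def drive(gen):
        stats = None
        while True:
            try:
                # send(None) on the first call is equivalent to next(gen).
                output = gen.send(stats)
            except StopIteration:
                break
            start = time.perf_counter()
            _ = repr(output)  # stand-in for creating/reporting the Ray object
            stats = time.perf_counter() - start


    drive(producer())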
@@ -1306,8 +1333,11 @@ async def execute_streaming_generator_async(
interrupt_signal_event = threading.Event()

try:
try:
async for output in gen:
stats = None

while True:
try:
output = await gen.asend(stats)
# NOTE: Report of streaming generator output is done in a
# standalone thread-pool to avoid blocking the event loop,
# since serializing and actual RPC I/O is done with "nogil". We
@@ -1320,7 +1350,7 @@
# are currently under backpressure. Then we need to wait for an
# ack from the caller (the reply for a possibly previous report
# RPC) that they have consumed more ObjectRefs.
await loop.run_in_executor(
stats = await loop.run_in_executor(
executor,
report_streaming_generator_output,
context,
@@ -1329,9 +1359,13 @@
interrupt_signal_event,
)
cur_generator_index += 1
except Exception as e:
# Report the exception to the owner of the task.
report_streaming_generator_exception(context, e, cur_generator_index, None)

except StopAsyncIteration:
break

except Exception as e:
# Report the exception to the owner of the task.
report_streaming_generator_exception(context, e, cur_generator_index, None)

except BaseException as be:
# NOTE: PLEASE READ CAREFULLY BEFORE CHANGING
@@ -137,7 +137,8 @@ class RunningTaskInfo:
bytes_outputs: int
num_rows_produced: int
start_time: float
cum_block_gen_time: float
cum_block_gen_time_s: float
cum_block_ser_time_s: float
task_id: ray.TaskID


@@ -384,6 +385,11 @@ class OpRuntimeMetrics(metaclass=OpRuntimesMetricsMeta):
description="Time spent generating blocks in tasks.",
metrics_group=MetricsGroup.TASKS,
)
block_serialization_time_s: float = metric_field(
default=0,
description="Time spent serializing blocks produced.",
metrics_group=MetricsGroup.TASKS,
)
task_submission_backpressure_time: float = metric_field(
default=0,
description="Time spent in task submission backpressure.",
@@ -648,7 +654,10 @@ def obj_store_mem_internal_outqueue(self) -> int:
def obj_store_mem_pending_task_inputs(self) -> int:
return self._pending_task_inputs.estimate_size_bytes()

@property
@metric_property(
description="Byte size of *pending* (not yielded yet) output blocks in running tasks.",
metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
)
def obj_store_mem_pending_task_outputs(self) -> Optional[float]:
"""Estimated size in bytes of output blocks in Ray generator buffers.

@@ -859,7 +868,8 @@ def on_task_submitted(
bytes_outputs=0,
num_rows_produced=0,
start_time=time.perf_counter(),
cum_block_gen_time=0,
cum_block_gen_time_s=0,
cum_block_ser_time_s=0,
task_id=ray.TaskID.nil() if task_id is None else task_id,
)

@@ -887,18 +897,29 @@ def on_task_output_generated(self, task_index: int, output: RefBundle):
task_info.num_rows_produced += num_rows_produced

for block_ref, meta in output.blocks:
exec_stats = meta.exec_stats

assert (
meta.exec_stats is not None and meta.exec_stats.wall_time_s is not None
exec_stats is not None
and exec_stats.wall_time_s is not None
and exec_stats.block_ser_time_s is not None
)
self.block_generation_time += meta.exec_stats.wall_time_s
task_info.cum_block_gen_time += meta.exec_stats.wall_time_s

self.block_generation_time += exec_stats.wall_time_s
self.block_serialization_time_s += exec_stats.block_ser_time_s

task_info.cum_block_gen_time_s += exec_stats.wall_time_s
task_info.cum_block_ser_time_s += exec_stats.block_ser_time_s

assert meta.num_rows is not None

trace_allocation(block_ref, "operator_output")
if meta.exec_stats.max_uss_bytes is not None:

if exec_stats.max_uss_bytes is not None:
if self._cum_max_uss_bytes is None:
self._cum_max_uss_bytes = meta.exec_stats.max_uss_bytes
self._cum_max_uss_bytes = exec_stats.max_uss_bytes
else:
self._cum_max_uss_bytes += meta.exec_stats.max_uss_bytes
self._cum_max_uss_bytes += exec_stats.max_uss_bytes

# Update per node metrics
if self._per_node_metrics_enabled:
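The bookkeeping above simply folds each block's wall time and serialization time into both the operator-wide counters and the task's cumulative fields; a condensed stand-alone sketch (simplified stand-ins for Ray Data's metadata and metrics classes):

    from dataclasses import dataclass
    from typing import List, Optional


    @dataclass
    class BlockStats:
        wall_time_s: Optional[float]
        block_ser_time_s: Optional[float]


    def accumulate(op_totals: dict, task_totals: dict, per_block: List[BlockStats]) -> None:
        for s in per_block:
            # Producers now always attach both timings; a None here would mean a
            # code path that skipped the serialization-time plumbing.
            assert s.wall_time_s is not None and s.block_ser_time_s is not None
            op_totals["block_generation_time"] += s.wall_time_s
            op_totals["block_serialization_time_s"] += s.block_ser_time_s
            task_totals["cum_block_gen_time_s"] += s.wall_time_s
            task_totals["cum_block_ser_time_s"] += s.block_ser_time_s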
@@ -926,18 +947,23 @@ def on_task_finished(self, task_index: int, exception: Optional[Exception]):
self.task_completion_time_total_s += task_time_delta
self.task_completion_time.observe(task_time_delta)

assert task_info.cum_block_gen_time is not None
assert task_info.cum_block_gen_time_s is not None
if task_info.num_outputs > 0:
# Calculate the average block generation time per block
block_time_delta = task_info.cum_block_gen_time / task_info.num_outputs
block_time_delta = (
task_info.cum_block_gen_time_s + task_info.cum_block_ser_time_s
) / task_info.num_outputs

self.block_completion_time.observe(
block_time_delta, num_observations=task_info.num_outputs
)

# NOTE: This is used for Issue Detection
self._op_task_duration_stats.add_duration(task_time_delta)

self.task_completion_time_excl_backpressure_s += task_info.cum_block_gen_time
self.task_completion_time_excl_backpressure_s += (
task_info.cum_block_gen_time_s + task_info.cum_block_ser_time_s
)
inputs = self._running_tasks[task_index].inputs
self.num_task_inputs_processed += len(inputs)
total_input_size = inputs.size_bytes()
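With serialization tracked separately, the observed per-block completion time above becomes (cumulative generation time + cumulative serialization time) / number of output blocks; for example, 2.0 s of generation plus 0.5 s of serialization over 4 blocks observes 0.625 s per block. As a tiny illustrative helper:

    def avg_block_completion_time_s(cum_block_gen_time_s: float,
                                    cum_block_ser_time_s: float,
                                    num_outputs: int) -> float:
        # Average end-to-end time per output block, now including the time it
        # took to serialize each block into a Ray object.
        assert num_outputs > 0
        return (cum_block_gen_time_s + cum_block_ser_time_s) / num_outputs


    # 2.0 s generating + 0.5 s serializing across 4 blocks -> 0.625 s/block
    assert avg_block_completion_time_s(2.0, 0.5, 4) == 0.625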
14 changes: 11 additions & 3 deletions python/ray/data/_internal/execution/operators/hash_shuffle.py
@@ -33,6 +33,7 @@
from ray._private.ray_constants import (
env_integer,
)
from ray._raylet import StreamingGeneratorStats
from ray.actor import ActorHandle
from ray.data._internal.arrow_block import ArrowBlockBuilder
from ray.data._internal.arrow_ops.transform_pyarrow import (
@@ -273,7 +274,9 @@ def _shuffle_block(
)

if block.num_rows == 0:
empty = BlockAccessor.for_block(block).get_metadata(exec_stats=stats.build())
empty = BlockAccessor.for_block(block).get_metadata(
exec_stats=stats.build(block_ser_time_s=0)
)
return (empty, {})

num_partitions = pool.num_partitions
@@ -346,7 +349,7 @@ def _shuffle_block(
i += 1

original_block_metadata = BlockAccessor.for_block(block).get_metadata(
exec_stats=stats.build()
exec_stats=stats.build(block_ser_time_s=0)
)

if logger.isEnabledFor(logging.DEBUG):
@@ -1745,7 +1748,12 @@ def finalize(
exec_stats = exec_stats_builder.build()
exec_stats_builder = BlockExecStats.builder()

yield block
stats: StreamingGeneratorStats = yield block

# Update block serialization time
if stats:
exec_stats.block_ser_time_s = stats.object_creation_dur_s

yield BlockMetadataWithSchema.from_block(block, stats=exec_stats)

def _debug_dump(self):
36 changes: 24 additions & 12 deletions python/ray/data/_internal/execution/operators/map_operator.py
@@ -3,6 +3,7 @@
import itertools
import logging
from abc import ABC, abstractmethod
from dataclasses import replace
from typing import (
TYPE_CHECKING,
Any,
@@ -21,7 +22,7 @@

import ray
from ray import ObjectRef
from ray._raylet import ObjectRefGenerator
from ray._raylet import ObjectRefGenerator, StreamingGeneratorStats
from ray.data._internal.compute import (
ActorPoolStrategy,
ComputeStrategy,
@@ -746,7 +747,9 @@ def _map_task(
)
DataContext._set_current(data_context)
ctx.kwargs.update(kwargs)

TaskContext.set_current(ctx)

stats = BlockExecStats.builder()
map_transformer.override_target_max_block_size(ctx.target_max_block_size_override)
block_iter: Iterable[Block]
@@ -756,17 +759,26 @@
block_iter = iter(blocks)

with MemoryProfiler(data_context.memory_usage_poll_interval_s) as profiler:
for b_out in map_transformer.apply_transform(block_iter, ctx):
# TODO(Clark): Add input file propagation from input blocks.
m_out = BlockAccessor.for_block(b_out).get_metadata()
s_out = BlockAccessor.for_block(b_out).schema()
m_out.exec_stats = stats.build()
m_out.exec_stats.udf_time_s = map_transformer.udf_time()
m_out.exec_stats.task_idx = ctx.task_idx
m_out.exec_stats.max_uss_bytes = profiler.estimate_max_uss()
meta_with_schema = BlockMetadataWithSchema(metadata=m_out, schema=s_out)
yield b_out
yield meta_with_schema
for block in map_transformer.apply_transform(block_iter, ctx):
block_meta = BlockAccessor.for_block(block).get_metadata()
block_schema = BlockAccessor.for_block(block).schema()

# Derive block execution stats
exec_stats = stats.build()
# Yield block and retrieve its Ray object serialization timing
stats: StreamingGeneratorStats = yield block
if stats:
exec_stats.block_ser_time_s = stats.object_creation_dur_s
Reviewer comment (Contributor): when operators are fused, can we assert in that test this value is 0?
exec_stats.udf_time_s = map_transformer.udf_time_s(reset=True)
exec_stats.task_idx = ctx.task_idx
exec_stats.max_uss_bytes = profiler.estimate_max_uss()

yield BlockMetadataWithSchema(
metadata=replace(block_meta, exec_stats=exec_stats), schema=block_schema
)

# Reset trackers
stats = BlockExecStats.builder()
profiler.reset()

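Seen from the task side, the protocol is: yield the block, receive the StreamingGeneratorStats that Ray's generator machinery sends back for it, fold the serialization time into the block's exec stats, then yield the metadata. A stripped-down sketch of that producer protocol outside of Ray (ExecStats and the driving calls below are illustrative):

    import time
    from dataclasses import dataclass
    from typing import Optional


    @dataclass
    class ExecStats:
        wall_time_s: float
        block_ser_time_s: Optional[float] = None


    def map_task(blocks):
        start = time.perf_counter()
        for block in blocks:
            exec_stats = ExecStats(wall_time_s=time.perf_counter() - start)

            # Yield the block; whatever drives the generator sends back the time
            # it spent turning that block into a Ray object (or None).
            sent = yield block
            if sent is not None:
                exec_stats.block_ser_time_s = sent

            # Then yield the block's metadata, with serialization time attached.
            yield exec_stats

            start = time.perf_counter()


    gen = map_task([b"block-0", b"block-1"])
    first_block = gen.send(None)   # first block out
    meta = gen.send(0.002)         # report its serialization time, get metadata back
    assert meta.block_ser_time_s == 0.002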
@@ -160,7 +160,7 @@ def __init__(
self._transform_fns = []
self._init_fn = init_fn if init_fn is not None else lambda: None
self._output_block_size_option_override = output_block_size_option_override
self._udf_time = 0
self._udf_time_s = 0
Reviewer comment (Contributor): Nit: Maybe a type var to denote secs?
# Add transformations
self.add_transform_fns(transform_fns)
@@ -202,7 +202,7 @@ def _udf_timed_iter(
try:
start = time.perf_counter()
output = next(input)
self._udf_time += time.perf_counter() - start
self._udf_time_s += time.perf_counter() - start
yield output
except StopIteration:
break
@@ -274,8 +274,11 @@ def _combine_transformations(
) -> list[Any]:
return ones + others

def udf_time(self) -> float:
return self._udf_time
def udf_time_s(self, reset: bool) -> float:
cur_time_s = self._udf_time_s
if reset:
self._udf_time_s = 0
return cur_time_s

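udf_time_s(reset=True) turns the UDF timer into a per-block counter: the caller reads the accumulated seconds and zeroes them so the next block starts fresh. A minimal stand-alone sketch of that accumulate-and-reset pattern around an iterator (illustrative, not the actual MapTransformer API):

    import time
    from typing import Iterable, Iterator


    class UdfTimer:
        def __init__(self) -> None:
            self._udf_time_s = 0.0

        def timed(self, it: Iterable) -> Iterator:
            # Attribute only the time spent pulling items out of `it`
            # (i.e. inside the user code), not time spent downstream.
            it = iter(it)
            while True:
                start = time.perf_counter()
                try:
                    item = next(it)
                except StopIteration:
                    break
                self._udf_time_s += time.perf_counter() - start
                yield item

        def udf_time_s(self, reset: bool = False) -> float:
            cur = self._udf_time_s
            if reset:
                self._udf_time_s = 0.0
            return cur


    timer = UdfTimer()
    total = sum(timer.timed(x * x for x in range(1000)))
    per_block = timer.udf_time_s(reset=True)   # read and clear, as _map_task does
    assert timer.udf_time_s() == 0.0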

# Below are subclasses of MapTransformFn.
4 changes: 3 additions & 1 deletion python/ray/data/block.py
@@ -174,6 +174,7 @@ def __init__(self):
self.end_time_s: Optional[float] = None
self.wall_time_s: Optional[float] = None
self.udf_time_s: Optional[float] = 0
self.block_ser_time_s: Optional[float] = None
self.cpu_time_s: Optional[float] = None
self.node_id = ray.runtime_context.get_runtime_context().get_node_id()
self.max_uss_bytes: int = 0
@@ -205,7 +206,7 @@ def __init__(self):
self._start_time = time.perf_counter()
self._start_cpu = time.process_time()

def build(self) -> "BlockExecStats":
def build(self, block_ser_time_s: Optional[float] = None) -> "BlockExecStats":
# Record end times.
end_time = time.perf_counter()
end_cpu = time.process_time()
@@ -216,6 +217,7 @@ def build(self) -> "BlockExecStats":
stats.end_time_s = end_time
stats.wall_time_s = end_time - self._start_time
stats.cpu_time_s = end_cpu - self._start_cpu
stats.block_ser_time_s = block_ser_time_s

return stats

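Callers keep the builder pattern seen above: take a builder before the work starts, then pass the measured (or still-unknown) serialization time into build(). A hypothetical usage sketch, assuming it runs where a Ray runtime context is available so the stats can resolve a node id:

    import time

    from ray.data.block import BlockExecStats

    builder = BlockExecStats.builder()

    rows = [{"id": i} for i in range(1000)]      # stand-in for producing a block

    start = time.perf_counter()
    payload = repr(rows).encode()                # stand-in for Ray object serialization
    ser_time_s = time.perf_counter() - start

    stats = builder.build(block_ser_time_s=ser_time_s)
    assert stats.wall_time_s is not None and stats.block_ser_time_s == ser_time_s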