Skip to content

Commit

Permalink
[data] Simplify Operator.__repr__ (#50620)
Browse files Browse the repository at this point in the history
## Why are these changes needed?

Currently, `Operator.__repr__` prints the full DAG. It's too verbose. 
Simplify it to only print the current operator. Added `dag_str` for
the full DAG.


---------

Signed-off-by: Hao Chen <chenh1024@gmail.com>
  • Loading branch information
raulchen authored Feb 19, 2025
1 parent e94a60b commit 5ca12f9
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 11 deletions.
2 changes: 1 addition & 1 deletion python/ray/data/_internal/execution/streaming_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def execute(
if log_path is not None:
message += f" Full logs are in {log_path}"
logger.info(message)
logger.info(f"Execution plan of Dataset: {dag}")
logger.info(f"Execution plan of Dataset: {dag.dag_str}")

logger.debug("Execution config: %s", self._options)

Expand Down
19 changes: 12 additions & 7 deletions python/ray/data/_internal/logical/interfaces/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@ def __init__(
def name(self) -> str:
return self._name

@property
def dag_str(self) -> str:
"""String representation of the whole DAG."""
if self.input_dependencies:
out_str = ", ".join([x.dag_str for x in self.input_dependencies])
out_str += " -> "
else:
out_str = ""
out_str += f"{self.__class__.__name__}[{self._name}]"
return out_str

@property
def input_dependencies(self) -> List["Operator"]:
"""List of operators that provide inputs for this operator."""
Expand All @@ -46,13 +57,7 @@ def post_order_iter(self) -> Iterator["Operator"]:
yield self

def __repr__(self) -> str:
if self.input_dependencies:
out_str = ", ".join([str(x) for x in self.input_dependencies])
out_str += " -> "
else:
out_str = ""
out_str += f"{self.__class__.__name__}[{self._name}]"
return out_str
return f"{self.__class__.__name__}[{self._name}]"

def __str__(self) -> str:
return repr(self)
2 changes: 1 addition & 1 deletion python/ray/data/tests/test_execution_optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1745,7 +1745,7 @@ def test_schema_partial_execution(
# Verify that ds.schema() executes only the first block, and not the
# entire Dataset.
assert not ds._plan.has_computed_output()
assert str(ds._plan._logical_plan.dag) == (
assert ds._plan._logical_plan.dag.dag_str == (
"Read[ReadParquet] -> MapBatches[MapBatches(<lambda>)]"
)

Expand Down
28 changes: 28 additions & 0 deletions python/ray/data/tests/test_operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,34 @@ def _take_outputs(op: PhysicalOperator) -> List[Any]:
return output


def test_name_and_repr(ray_start_regular_shared):
inputs = make_ref_bundles([[1, 2], [3], [4, 5]])
input_op = InputDataBuffer(DataContext.get_current(), inputs)
map_op1 = MapOperator.create(
_mul2_map_data_prcessor,
input_op,
DataContext.get_current(),
name="map1",
)

assert map_op1.name == "map1"
assert map_op1.dag_str == "InputDataBuffer[Input] -> TaskPoolMapOperator[map1]"
assert str(map_op1) == "TaskPoolMapOperator[map1]"

map_op2 = MapOperator.create(
_mul2_map_data_prcessor,
map_op1,
DataContext.get_current(),
name="map2",
)
assert map_op2.name == "map2"
assert (
map_op2.dag_str
== "InputDataBuffer[Input] -> TaskPoolMapOperator[map1] -> TaskPoolMapOperator[map2]"
)
assert str(map_op2) == "TaskPoolMapOperator[map2]"


def test_input_data_buffer(ray_start_regular_shared):
# Create with bundles.
inputs = make_ref_bundles([[1, 2], [3], [4, 5]])
Expand Down
3 changes: 1 addition & 2 deletions python/ray/data/tests/test_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -1602,8 +1602,7 @@ def test_op_metrics_logging():
+ gen_expected_metrics(is_map=False)
) # .replace("'obj_store_mem_used': N", "'obj_store_mem_used': Z")
map_str = (
"Operator InputDataBuffer[Input] -> "
"TaskPoolMapOperator[ReadRange->MapBatches(<lambda>)] completed. "
"Operator TaskPoolMapOperator[ReadRange->MapBatches(<lambda>)] completed. "
"Operator Metrics:\n"
) + STANDARD_EXTRA_METRICS_TASK_BACKPRESSURE

Expand Down

0 comments on commit 5ca12f9

Please sign in to comment.