From 5ca12f9d21d1006aa5a4c6c23390603c9504b2ad Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Wed, 19 Feb 2025 13:46:34 -0800 Subject: [PATCH] [data] Simplify Operator.__repr__ (#50620) ## Why are these changes needed? Currently, `Operator.__repr__` prints the full DAG. It's too verbose. Simplify it to only print the current operator. Added `dag_str` for the full DAG. --------- Signed-off-by: Hao Chen --- .../_internal/execution/streaming_executor.py | 2 +- .../_internal/logical/interfaces/operator.py | 19 ++++++++----- .../data/tests/test_execution_optimizer.py | 2 +- python/ray/data/tests/test_operators.py | 28 +++++++++++++++++++ python/ray/data/tests/test_stats.py | 3 +- 5 files changed, 43 insertions(+), 11 deletions(-) diff --git a/python/ray/data/_internal/execution/streaming_executor.py b/python/ray/data/_internal/execution/streaming_executor.py index 96ad24559721d..f01db6ac01818 100644 --- a/python/ray/data/_internal/execution/streaming_executor.py +++ b/python/ray/data/_internal/execution/streaming_executor.py @@ -106,7 +106,7 @@ def execute( if log_path is not None: message += f" Full logs are in {log_path}" logger.info(message) - logger.info(f"Execution plan of Dataset: {dag}") + logger.info(f"Execution plan of Dataset: {dag.dag_str}") logger.debug("Execution config: %s", self._options) diff --git a/python/ray/data/_internal/logical/interfaces/operator.py b/python/ray/data/_internal/logical/interfaces/operator.py index 76a320ef815a2..d5227721eff9e 100644 --- a/python/ray/data/_internal/logical/interfaces/operator.py +++ b/python/ray/data/_internal/logical/interfaces/operator.py @@ -23,6 +23,17 @@ def __init__( def name(self) -> str: return self._name + @property + def dag_str(self) -> str: + """String representation of the whole DAG.""" + if self.input_dependencies: + out_str = ", ".join([x.dag_str for x in self.input_dependencies]) + out_str += " -> " + else: + out_str = "" + out_str += f"{self.__class__.__name__}[{self._name}]" + return out_str + @property def input_dependencies(self) -> List["Operator"]: """List of operators that provide inputs for this operator.""" @@ -46,13 +57,7 @@ def post_order_iter(self) -> Iterator["Operator"]: yield self def __repr__(self) -> str: - if self.input_dependencies: - out_str = ", ".join([str(x) for x in self.input_dependencies]) - out_str += " -> " - else: - out_str = "" - out_str += f"{self.__class__.__name__}[{self._name}]" - return out_str + return f"{self.__class__.__name__}[{self._name}]" def __str__(self) -> str: return repr(self) diff --git a/python/ray/data/tests/test_execution_optimizer.py b/python/ray/data/tests/test_execution_optimizer.py index 621b59dabbffa..775d84f6323ed 100644 --- a/python/ray/data/tests/test_execution_optimizer.py +++ b/python/ray/data/tests/test_execution_optimizer.py @@ -1745,7 +1745,7 @@ def test_schema_partial_execution( # Verify that ds.schema() executes only the first block, and not the # entire Dataset. assert not ds._plan.has_computed_output() - assert str(ds._plan._logical_plan.dag) == ( + assert ds._plan._logical_plan.dag.dag_str == ( "Read[ReadParquet] -> MapBatches[MapBatches()]" ) diff --git a/python/ray/data/tests/test_operators.py b/python/ray/data/tests/test_operators.py index 42371b4cd7170..90a97e32a4a25 100644 --- a/python/ray/data/tests/test_operators.py +++ b/python/ray/data/tests/test_operators.py @@ -67,6 +67,34 @@ def _take_outputs(op: PhysicalOperator) -> List[Any]: return output +def test_name_and_repr(ray_start_regular_shared): + inputs = make_ref_bundles([[1, 2], [3], [4, 5]]) + input_op = InputDataBuffer(DataContext.get_current(), inputs) + map_op1 = MapOperator.create( + _mul2_map_data_prcessor, + input_op, + DataContext.get_current(), + name="map1", + ) + + assert map_op1.name == "map1" + assert map_op1.dag_str == "InputDataBuffer[Input] -> TaskPoolMapOperator[map1]" + assert str(map_op1) == "TaskPoolMapOperator[map1]" + + map_op2 = MapOperator.create( + _mul2_map_data_prcessor, + map_op1, + DataContext.get_current(), + name="map2", + ) + assert map_op2.name == "map2" + assert ( + map_op2.dag_str + == "InputDataBuffer[Input] -> TaskPoolMapOperator[map1] -> TaskPoolMapOperator[map2]" + ) + assert str(map_op2) == "TaskPoolMapOperator[map2]" + + def test_input_data_buffer(ray_start_regular_shared): # Create with bundles. inputs = make_ref_bundles([[1, 2], [3], [4, 5]]) diff --git a/python/ray/data/tests/test_stats.py b/python/ray/data/tests/test_stats.py index 1789b28e12122..1b36b4c11fa43 100644 --- a/python/ray/data/tests/test_stats.py +++ b/python/ray/data/tests/test_stats.py @@ -1602,8 +1602,7 @@ def test_op_metrics_logging(): + gen_expected_metrics(is_map=False) ) # .replace("'obj_store_mem_used': N", "'obj_store_mem_used': Z") map_str = ( - "Operator InputDataBuffer[Input] -> " - "TaskPoolMapOperator[ReadRange->MapBatches()] completed. " + "Operator TaskPoolMapOperator[ReadRange->MapBatches()] completed. " "Operator Metrics:\n" ) + STANDARD_EXTRA_METRICS_TASK_BACKPRESSURE