Skip to content

Commit 5e2a276

Browse files
authored
[Data] Remove DatasetLogger (#44400)
Ray Data uses a DatasetLogger wrapper class for logging. Some downsides of this implementation are: - You need to add a redundant get_logger call whenever you log (i.e., logger.get_logger(log_to_stdout=False).info instead of logger.info) - There's a layer of indirection to the logging module (creating DatasetLogger instead of calling logging.getLogger directly) - Logging configuration is tightly coupled to the DatasetLogger implementation To simplify our code, this PR removes DatasetLogger and replaces it with the built-in logging.Logger object. The logger is appropriately configured so that the logging behavior doesn't change. --------- Signed-off-by: Balaji Veeramani <balaji@anyscale.com>
1 parent 9fb9d75 commit 5e2a276

31 files changed

+426
-423
lines changed

bazel/conftest.py

+9
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,12 @@ def shutdown_ray():
1313
def preserve_block_order():
1414
ray.data.context.DataContext.get_current().execution_options.preserve_order = True
1515
yield
16+
17+
18+
@pytest.fixture(autouse=True)
19+
def disable_start_message():
20+
context = ray.data.context.DataContext.get_current()
21+
original_value = context.print_on_execution_start
22+
context.print_on_execution_start = False
23+
yield
24+
context.print_on_execution_start = original_value

python/ray/data/BUILD

+11-2
Original file line numberDiff line numberDiff line change
@@ -250,9 +250,9 @@ py_test(
250250
)
251251

252252
py_test(
253-
name = "test_logger",
253+
name = "test_logging",
254254
size = "small",
255-
srcs = ["tests/test_logger.py"],
255+
srcs = ["tests/test_logging.py"],
256256
tags = ["team:data", "exclusive"],
257257
deps = ["//:ray_lib", ":conftest"],
258258
)
@@ -353,6 +353,15 @@ py_test(
353353
deps = ["//:ray_lib", ":conftest"],
354354
)
355355

356+
py_test(
357+
name = "test_exceptions",
358+
size = "small",
359+
srcs = ["tests/test_exceptions.py"],
360+
tags = ["team:data", "exclusive"],
361+
deps = ["//:ray_lib", ":conftest"],
362+
)
363+
364+
356365
py_test(
357366
name = "test_execution_optimizer",
358367
size = "medium",

python/ray/data/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
ExecutionResources,
1111
NodeIdStr,
1212
)
13+
from ray.data._internal.logging import configure_logging
1314
from ray.data._internal.progress_bar import set_progress_bars
1415
from ray.data.context import DataContext, DatasetContext
1516
from ray.data.dataset import Dataset, Schema
@@ -62,6 +63,7 @@
6263
_cached_fn = None
6364
_cached_cls = None
6465

66+
configure_logging()
6567

6668
try:
6769
import pyarrow as pa

python/ray/data/_internal/dataset_logger.py

-111
This file was deleted.

python/ray/data/_internal/execution/operators/actor_pool_map_operator.py

+4-6
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import collections
2+
import logging
23
from dataclasses import dataclass
34
from typing import Any, Dict, Iterator, List, Optional, Union
45

56
import ray
67
from ray.data._internal.compute import ActorPoolStrategy
7-
from ray.data._internal.dataset_logger import DatasetLogger
88
from ray.data._internal.execution.interfaces import (
99
ExecutionOptions,
1010
ExecutionResources,
@@ -20,7 +20,7 @@
2020
from ray.data.context import DataContext
2121
from ray.types import ObjectRef
2222

23-
logger = DatasetLogger(__name__)
23+
logger = logging.getLogger(__name__)
2424

2525
# Higher values here are better for prefetching and locality. It's ok for this to be
2626
# fairly high since streaming backpressure prevents us from overloading actors.
@@ -123,9 +123,7 @@ def start(self, options: ExecutionOptions):
123123
# situations where the scheduler is unable to schedule downstream operators
124124
# due to lack of available actors, causing an initial "pileup" of objects on
125125
# upstream operators, leading to a spike in memory usage prior to steady state.
126-
logger.get_logger(log_to_stdout=False).info(
127-
f"{self._name}: Waiting for {len(refs)} pool actors to start..."
128-
)
126+
logger.debug(f"{self._name}: Waiting for {len(refs)} pool actors to start...")
129127
try:
130128
ray.get(refs, timeout=DEFAULT_WAIT_FOR_MIN_ACTORS_SEC)
131129
except ray.exceptions.GetTimeoutError:
@@ -291,7 +289,7 @@ def shutdown(self):
291289
min_workers = self._autoscaling_policy.min_workers
292290
if len(self._output_metadata) < min_workers:
293291
# The user created a stream that has too few blocks to begin with.
294-
logger.get_logger().warning(
292+
logger.warning(
295293
"To ensure full parallelization across an actor pool of size "
296294
f"{min_workers}, the Dataset should consist of at least "
297295
f"{min_workers} distinct blocks. Consider increasing "

python/ray/data/_internal/execution/resource_manager.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1+
import logging
12
import os
23
import time
34
from abc import ABC, abstractmethod
45
from collections import defaultdict
56
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional
67

78
import ray
8-
from ray.data._internal.dataset_logger import DatasetLogger
99
from ray.data._internal.execution.interfaces.execution_options import (
1010
ExecutionOptions,
1111
ExecutionResources,
@@ -25,7 +25,7 @@
2525
from ray.data._internal.execution.streaming_executor_state import Topology
2626

2727

28-
logger = DatasetLogger(__name__)
28+
logger = logging.getLogger(__name__)
2929
DEBUG_RESOURCE_MANAGER = os.environ.get("RAY_DATA_DEBUG_RESOURCE_MANAGER", "0") == "1"
3030

3131

@@ -344,7 +344,7 @@ def print_warning_if_idle_for_too_long(
344344
" `DataContext.get_current().execution_options.exclude_resources`."
345345
" This message will only print once."
346346
)
347-
logger.get_logger(log_to_stdout=True).warning(msg)
347+
logger.warning(msg)
348348

349349
def __init__(self, resource_manager: ResourceManager, reservation_ratio: float):
350350
super().__init__(resource_manager)

python/ray/data/_internal/execution/streaming_executor.py

+20-27
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1+
import logging
12
import threading
23
import time
34
import uuid
45
from typing import Dict, Iterator, List, Optional
56

6-
from ray.data._internal.dataset_logger import DatasetLogger
77
from ray.data._internal.execution.autoscaling_requester import (
88
get_or_create_autoscaling_requester_actor,
99
)
@@ -30,11 +30,12 @@
3030
select_operator_to_run,
3131
update_operator_states,
3232
)
33+
from ray.data._internal.logging import get_log_path
3334
from ray.data._internal.progress_bar import ProgressBar
3435
from ray.data._internal.stats import DatasetStats, StatsManager
3536
from ray.data.context import DataContext
3637

37-
logger = DatasetLogger(__name__)
38+
logger = logging.getLogger(__name__)
3839

3940
# Force a progress bar update after this many events processed . This avoids the
4041
# progress bar seeming to stall for very large scale workloads.
@@ -102,22 +103,16 @@ def execute(
102103
self._start_time = time.perf_counter()
103104

104105
if not isinstance(dag, InputDataBuffer):
105-
stdout_logger = logger.get_logger()
106-
log_path = logger.get_datasets_log_path()
107-
message = "Starting execution of Dataset."
108-
if log_path is not None:
109-
message += f" Full log is in {log_path}"
110-
stdout_logger.info(message)
111-
stdout_logger.info("Execution plan of Dataset: %s\n", dag)
112-
logger.get_logger(log_to_stdout=False).info(
113-
"Execution config: %s", self._options
114-
)
115-
if not self._options.verbose_progress:
116-
logger.get_logger(log_to_stdout=False).info(
117-
"Tip: For detailed progress reporting, run "
118-
"`ray.data.DataContext.get_current()."
119-
"execution_options.verbose_progress = True`"
120-
)
106+
context = DataContext.get_current()
107+
if context.print_on_execution_start:
108+
message = "Starting execution of Dataset."
109+
log_path = get_log_path()
110+
if log_path is not None:
111+
message += f" Full log is in {log_path}"
112+
logger.info(message)
113+
logger.info(f"Execution plan of Dataset: {dag}")
114+
115+
logger.debug("Execution config: %s", self._options)
121116

122117
# Setup the streaming DAG topology and start the runner thread.
123118
self._topology, _ = build_streaming_topology(dag, self._options)
@@ -171,7 +166,7 @@ def shutdown(self, execution_completed: bool = True):
171166
with self._shutdown_lock:
172167
if not self._execution_started or self._shutdown:
173168
return
174-
logger.get_logger(log_to_stdout=False).debug(f"Shutting down {self}.")
169+
logger.debug(f"Shutting down {self}.")
175170
_num_shutdown += 1
176171
self._shutdown = True
177172
# Give the scheduling loop some time to finish processing.
@@ -190,9 +185,8 @@ def shutdown(self, execution_completed: bool = True):
190185
stats_summary_string = self._final_stats.to_summary().to_string(
191186
include_parent=False
192187
)
193-
logger.get_logger(log_to_stdout=context.enable_auto_log_stats).info(
194-
stats_summary_string,
195-
)
188+
if context.enable_auto_log_stats:
189+
logger.info(stats_summary_string)
196190
# Close the progress bars from top to bottom to avoid them jumping
197191
# around in the console after completion.
198192
if self._global_info:
@@ -325,7 +319,7 @@ def _scheduling_loop_step(self, topology: Topology) -> bool:
325319
f"Operator {op} completed. "
326320
f"Operator Metrics:\n{op._metrics.as_dict()}"
327321
)
328-
logger.get_logger(log_to_stdout=False).info(log_str)
322+
logger.debug(log_str)
329323
self._has_op_completed[op] = True
330324

331325
# Keep going until all operators run to completion.
@@ -436,13 +430,12 @@ def _debug_dump_topology(topology: Topology, resource_manager: ResourceManager)
436430
topology: The topology to debug.
437431
resource_manager: The resource manager for this topology.
438432
"""
439-
logger.get_logger(log_to_stdout=False).info("Execution Progress:")
433+
logger.debug("Execution Progress:")
440434
for i, (op, state) in enumerate(topology.items()):
441-
logger.get_logger(log_to_stdout=False).info(
435+
logger.debug(
442436
f"{i}: {state.summary_str(resource_manager)}, "
443437
f"Blocks Outputted: {state.num_completed_tasks}/{op.num_outputs_total()}"
444438
)
445-
logger.get_logger(log_to_stdout=False).info("")
446439

447440

448441
def _log_op_metrics(topology: Topology) -> None:
@@ -454,4 +447,4 @@ def _log_op_metrics(topology: Topology) -> None:
454447
log_str = "Operator Metrics:\n"
455448
for op in topology:
456449
log_str += f"{op.name}: {op.metrics.as_dict()}\n"
457-
logger.get_logger(log_to_stdout=False).info(log_str)
450+
logger.debug(log_str)

python/ray/data/_internal/execution/streaming_executor_state.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
This is split out from streaming_executor.py to facilitate better unit testing.
44
"""
55

6+
import logging
67
import math
78
import threading
89
import time
@@ -11,7 +12,6 @@
1112
from typing import Dict, List, Optional, Tuple
1213

1314
import ray
14-
from ray.data._internal.dataset_logger import DatasetLogger
1515
from ray.data._internal.execution.autoscaling_requester import (
1616
get_or_create_autoscaling_requester_actor,
1717
)
@@ -35,7 +35,7 @@
3535
from ray.data._internal.execution.resource_manager import ResourceManager
3636
from ray.data._internal.progress_bar import ProgressBar
3737

38-
logger = DatasetLogger(__name__)
38+
logger = logging.getLogger(__name__)
3939

4040
# Holds the full execution state of the streaming topology. It's a dict mapping each
4141
# operator to tracked streaming exec state.
@@ -436,14 +436,14 @@ def process_completed_tasks(
436436
" Ignoring this exception with remaining"
437437
f" max_errored_blocks={remaining}."
438438
)
439-
logger.get_logger().error(error_message, exc_info=e)
439+
logger.error(error_message, exc_info=e)
440440
else:
441441
error_message += (
442442
" Dataset execution will now abort."
443443
" To ignore this exception and continue, set"
444444
" DataContext.max_errored_blocks."
445445
)
446-
logger.get_logger().error(error_message)
446+
logger.error(error_message)
447447
raise e from None
448448
else:
449449
assert isinstance(task, MetadataOpTask)

0 commit comments

Comments
 (0)