Skip to content

Commit

Permalink
Add Dataset Stats verbosity, only log extra_metrics if verbosity is set (ray-project#42789)

Browse files Browse the repository at this point in the history

The current DatasetStats logs include extra_metrics, which are less helpful, so this PR hides them by default, allowing users to turn them on through the data context.

---------

Signed-off-by: Matthew Owen <mowen@anyscale.com>
Signed-off-by: Matthew Owen <omatthew98@berkeley.edu>
Co-authored-by: Scott Lee <scottjlee@users.noreply.github.com>
  • Loading branch information
omatthew98 and scottjlee authored Feb 2, 2024
1 parent c202890 commit bccab24
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 63 deletions.
2 changes: 1 addition & 1 deletion python/ray/data/_internal/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ def to_string(
else:
already_printed.add(operator_uuid)
out += str(operators_stats_summary)
if self.extra_metrics:
if DataContext.get_current().verbose_stats_logs and self.extra_metrics:
indent = (
"\t"
if operators_stats_summary and operators_stats_summary.is_sub_operator
Expand Down
7 changes: 7 additions & 0 deletions python/ray/data/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@
# If disabled, users can still manually print stats with Dataset.stats().
DEFAULT_AUTO_LOG_STATS = False

# Whether stats logs should be verbose. This will include fields such
# as `extra_metrics` in the stats output, which are excluded by default.
DEFAULT_VERBOSE_STATS_LOG = False

# Set this env var to enable distributed tqdm (experimental).
DEFAULT_USE_RAY_TQDM = bool(int(os.environ.get("RAY_TQDM", "1")))

Expand Down Expand Up @@ -170,6 +174,7 @@ def __init__(
min_parallelism: bool,
enable_tensor_extension_casting: bool,
enable_auto_log_stats: bool,
verbose_stats_log: bool,
trace_allocations: bool,
execution_options: "ExecutionOptions",
use_ray_tqdm: bool,
Expand Down Expand Up @@ -200,6 +205,7 @@ def __init__(
self.min_parallelism = min_parallelism
self.enable_tensor_extension_casting = enable_tensor_extension_casting
self.enable_auto_log_stats = enable_auto_log_stats
self.verbose_stats_logs = verbose_stats_log
self.trace_allocations = trace_allocations
# TODO: expose execution options in Dataset public APIs.
self.execution_options = execution_options
Expand Down Expand Up @@ -271,6 +277,7 @@ def get_current() -> "DataContext":
DEFAULT_ENABLE_TENSOR_EXTENSION_CASTING
),
enable_auto_log_stats=DEFAULT_AUTO_LOG_STATS,
verbose_stats_log=DEFAULT_VERBOSE_STATS_LOG,
trace_allocations=DEFAULT_TRACE_ALLOCATIONS,
execution_options=ray.data.ExecutionOptions(),
use_ray_tqdm=DEFAULT_USE_RAY_TQDM,
Expand Down
127 changes: 65 additions & 62 deletions python/ray/data/tests/test_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
)
from ray.data._internal.util import create_dataset_tag
from ray.data.block import BlockMetadata
from ray.data.context import DataContext
from ray.data.tests.util import column_udf
from ray.tests.conftest import * # noqa

Expand Down Expand Up @@ -71,6 +72,10 @@ def gen_expected_metrics(
return "{" + ", ".join(metrics) + "}"


def gen_extra_metrics_str(metrics: str, verbose: bool) -> str:
    """Return the "Extra metrics" stats line for *metrics*, or "" when not verbose.

    Mirrors the stats output: when verbose stats logs are enabled the line is
    emitted with a trailing newline; otherwise the extra-metrics section is
    omitted entirely.
    """
    if not verbose:
        return ""
    return f"* Extra metrics: {metrics}\n"


STANDARD_EXTRA_METRICS = gen_expected_metrics(
is_map=True,
spilled=False,
Expand Down Expand Up @@ -169,7 +174,9 @@ def patch_update_stats_actor_iter():
yield update_fn


def test_streaming_split_stats(ray_start_regular_shared):
def test_streaming_split_stats(ray_start_regular_shared, restore_data_context):
context = DataContext.get_current()
context.verbose_stats_logs = True
ds = ray.data.range(1000, parallelism=10)
it = ds.map_batches(dummy_map_batches).streaming_split(1)[0]
list(it.iter_batches())
Expand Down Expand Up @@ -209,36 +216,42 @@ def test_streaming_split_stats(ray_start_regular_shared):
)


def test_large_args_scheduling_strategy(ray_start_regular_shared):
@pytest.mark.parametrize("verbose_stats_logs", [True, False])
def test_large_args_scheduling_strategy(ray_start_regular_shared, verbose_stats_logs):
context = DataContext.get_current()
context.verbose_stats_logs = verbose_stats_logs
ds = ray.data.range_tensor(100, shape=(100000,), parallelism=1)
ds = ds.map_batches(dummy_map_batches, num_cpus=0.9).materialize()
stats = ds.stats()
assert (
canonicalize(stats)
== f"""Operator N ReadRange: {EXECUTION_STRING}
* Remote wall time: T min, T max, T mean, T total
* Remote cpu time: T min, T max, T mean, T total
* Peak heap memory usage (MiB): N min, N max, N mean
* Output num rows per block: N min, N max, N mean, N total
* Output size bytes per block: N min, N max, N mean, N total
* Output rows per task: N min, N max, N mean, N tasks used
* Tasks per node: N min, N max, N mean; N nodes used
* Extra metrics: {STANDARD_EXTRA_METRICS}
Operator N MapBatches(dummy_map_batches): {EXECUTION_STRING}
* Remote wall time: T min, T max, T mean, T total
* Remote cpu time: T min, T max, T mean, T total
* Peak heap memory usage (MiB): N min, N max, N mean
* Output num rows per block: N min, N max, N mean, N total
* Output size bytes per block: N min, N max, N mean, N total
* Output rows per task: N min, N max, N mean, N tasks used
* Tasks per node: N min, N max, N mean; N nodes used
* Extra metrics: {LARGE_ARGS_EXTRA_METRICS}
"""
expected_stats = (
f"Operator N ReadRange: {EXECUTION_STRING}\n"
f"* Remote wall time: T min, T max, T mean, T total\n"
f"* Remote cpu time: T min, T max, T mean, T total\n"
f"* Peak heap memory usage (MiB): N min, N max, N mean\n"
f"* Output num rows per block: N min, N max, N mean, N total\n"
f"* Output size bytes per block: N min, N max, N mean, N total\n"
f"* Output rows per task: N min, N max, N mean, N tasks used\n"
f"* Tasks per node: N min, N max, N mean; N nodes used\n"
f"{gen_extra_metrics_str(STANDARD_EXTRA_METRICS, verbose_stats_logs)}\n"
f"Operator N MapBatches(dummy_map_batches): {EXECUTION_STRING}\n"
f"* Remote wall time: T min, T max, T mean, T total\n"
f"* Remote cpu time: T min, T max, T mean, T total\n"
f"* Peak heap memory usage (MiB): N min, N max, N mean\n"
f"* Output num rows per block: N min, N max, N mean, N total\n"
f"* Output size bytes per block: N min, N max, N mean, N total\n"
f"* Output rows per task: N min, N max, N mean, N tasks used\n"
f"* Tasks per node: N min, N max, N mean; N nodes used\n"
f"{gen_extra_metrics_str(LARGE_ARGS_EXTRA_METRICS, verbose_stats_logs)}"
)
assert canonicalize(stats) == expected_stats


def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats):
@pytest.mark.parametrize("verbose_stats_logs", [True, False])
def test_dataset_stats_basic(
ray_start_regular_shared, enable_auto_log_stats, verbose_stats_logs
):
context = DataContext.get_current()
context.verbose_stats_logs = verbose_stats_logs
logger = DatasetLogger(
"ray.data._internal.execution.streaming_executor"
).get_logger(
Expand All @@ -262,7 +275,7 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats):
f"* Output size bytes per block: N min, N max, N mean, N total\n"
f"* Output rows per task: N min, N max, N mean, N tasks used\n"
f"* Tasks per node: N min, N max, N mean; N nodes used\n"
f"* Extra metrics: {STANDARD_EXTRA_METRICS}\n"
f"{gen_extra_metrics_str(STANDARD_EXTRA_METRICS, verbose_stats_logs)}" # noqa: E501
)

ds = ds.map(dummy_map_batches).materialize()
Expand All @@ -278,7 +291,7 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats):
f"* Output size bytes per block: N min, N max, N mean, N total\n"
f"* Output rows per task: N min, N max, N mean, N tasks used\n"
f"* Tasks per node: N min, N max, N mean; N nodes used\n"
f"* Extra metrics: {STANDARD_EXTRA_METRICS}\n"
f"{gen_extra_metrics_str(STANDARD_EXTRA_METRICS, verbose_stats_logs)}" # noqa: E501
)

for batch in ds.iter_batches():
Expand All @@ -294,8 +307,7 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats):
f"* Output size bytes per block: N min, N max, N mean, N total\n"
f"* Output rows per task: N min, N max, N mean, N tasks used\n"
f"* Tasks per node: N min, N max, N mean; N nodes used\n"
f"* Extra metrics: {STANDARD_EXTRA_METRICS}\n"
f"\n"
f"{gen_extra_metrics_str(STANDARD_EXTRA_METRICS, verbose_stats_logs)}\n"
f"Operator N Map(dummy_map_batches): {EXECUTION_STRING}\n"
f"* Remote wall time: T min, T max, T mean, T total\n"
f"* Remote cpu time: T min, T max, T mean, T total\n"
Expand All @@ -304,8 +316,7 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats):
f"* Output size bytes per block: N min, N max, N mean, N total\n"
f"* Output rows per task: N min, N max, N mean, N tasks used\n"
f"* Tasks per node: N min, N max, N mean; N nodes used\n"
f"* Extra metrics: {STANDARD_EXTRA_METRICS}\n"
f"\n"
f"{gen_extra_metrics_str(STANDARD_EXTRA_METRICS, verbose_stats_logs)}\n"
f"Dataset iterator time breakdown:\n"
f"* Total time user code is blocked: T\n"
f"* Total time in user code: T\n"
Expand Down Expand Up @@ -501,7 +512,7 @@ def test_dataset_stats_shuffle(ray_start_regular_shared):
stats = canonicalize(ds.materialize().stats())
assert (
stats
== f"""Operator N ReadRange->RandomShuffle: executed in T
== """Operator N ReadRange->RandomShuffle: executed in T
Suboperator Z ReadRange->RandomShuffleMap: N tasks executed, N blocks produced
* Remote wall time: T min, T max, T mean, T total
Expand All @@ -520,7 +531,6 @@ def test_dataset_stats_shuffle(ray_start_regular_shared):
* Output size bytes per block: N min, N max, N mean, N total
* Output rows per task: N min, N max, N mean, N tasks used
* Tasks per node: N min, N max, N mean; N nodes used
* Extra metrics: {gen_expected_metrics(is_map=False)}
Operator N Repartition: executed in T
Expand All @@ -541,7 +551,6 @@ def test_dataset_stats_shuffle(ray_start_regular_shared):
* Output size bytes per block: N min, N max, N mean, N total
* Output rows per task: N min, N max, N mean, N tasks used
* Tasks per node: N min, N max, N mean; N nodes used
* Extra metrics: {gen_expected_metrics(is_map=False)}
"""
)

Expand Down Expand Up @@ -595,7 +604,6 @@ def test_dataset_stats_read_parquet(ray_start_regular_shared, tmp_path):
f"* Output size bytes per block: N min, N max, N mean, N total\n"
f"* Output rows per task: N min, N max, N mean, N tasks used\n"
f"* Tasks per node: N min, N max, N mean; N nodes used\n"
f"* Extra metrics: {STANDARD_EXTRA_METRICS}\n"
)


Expand All @@ -615,7 +623,6 @@ def test_dataset_split_stats(ray_start_regular_shared, tmp_path):
f"* Output size bytes per block: N min, N max, N mean, N total\n"
f"* Output rows per task: N min, N max, N mean, N tasks used\n"
f"* Tasks per node: N min, N max, N mean; N nodes used\n"
f"* Extra metrics: {STANDARD_EXTRA_METRICS}\n"
f"\n"
f"Operator N Split: {EXECUTION_STRING}\n"
f"* Remote wall time: T min, T max, T mean, T total\n"
Expand All @@ -634,7 +641,6 @@ def test_dataset_split_stats(ray_start_regular_shared, tmp_path):
f"* Output size bytes per block: N min, N max, N mean, N total\n"
f"* Output rows per task: N min, N max, N mean, N tasks used\n"
f"* Tasks per node: N min, N max, N mean; N nodes used\n"
f"* Extra metrics: {STANDARD_EXTRA_METRICS}\n"
)


Expand Down Expand Up @@ -808,7 +814,6 @@ def test_streaming_stats_full(ray_start_regular_shared, restore_data_context):
* Output size bytes per block: N min, N max, N mean, N total
* Output rows per task: N min, N max, N mean, N tasks used
* Tasks per node: N min, N max, N mean; N nodes used
* Extra metrics: {STANDARD_EXTRA_METRICS}
Dataset iterator time breakdown:
* Total time user code is blocked: T
Expand Down Expand Up @@ -840,7 +845,6 @@ def test_write_ds_stats(ray_start_regular_shared, tmp_path):
* Output size bytes per block: N min, N max, N mean, N total
* Output rows per task: N min, N max, N mean, N tasks used
* Tasks per node: N min, N max, N mean; N nodes used
* Extra metrics: {STANDARD_EXTRA_METRICS}
"""
)

Expand All @@ -860,7 +864,6 @@ def test_write_ds_stats(ray_start_regular_shared, tmp_path):
* Output size bytes per block: N min, N max, N mean, N total
* Output rows per task: N min, N max, N mean, N tasks used
* Tasks per node: N min, N max, N mean; N nodes used
* Extra metrics: {STANDARD_EXTRA_METRICS}
Operator N Write: {EXECUTION_STRING}
* Remote wall time: T min, T max, T mean, T total
Expand All @@ -870,7 +873,6 @@ def test_write_ds_stats(ray_start_regular_shared, tmp_path):
* Output size bytes per block: N min, N max, N mean, N total
* Output rows per task: N min, N max, N mean, N tasks used
* Tasks per node: N min, N max, N mean; N nodes used
* Extra metrics: {STANDARD_EXTRA_METRICS}
"""
)

Expand Down Expand Up @@ -918,33 +920,34 @@ def test_stats_actor_cap_num_stats(ray_start_cluster):
assert ray.get(actor._get_stats_dict_size.remote()) == (3, 2, 2)


def test_spilled_stats(shutdown_only):
@pytest.mark.parametrize("verbose_stats_logs", [True, False])
def test_spilled_stats(shutdown_only, verbose_stats_logs):
context = DataContext.get_current()
context.verbose_stats_logs = verbose_stats_logs
# The object store is about 100MB.
ray.init(object_store_memory=100e6)
# The size of dataset is 1000*80*80*4*8B, about 200MB.
ds = ray.data.range(1000 * 80 * 80 * 4).map_batches(lambda x: x).materialize()

assert (
canonicalize(ds.stats(), filter_global_stats=False)
== f"""Operator N ReadRange->MapBatches(<lambda>): {EXECUTION_STRING}
* Remote wall time: T min, T max, T mean, T total
* Remote cpu time: T min, T max, T mean, T total
* Peak heap memory usage (MiB): N min, N max, N mean
* Output num rows per block: N min, N max, N mean, N total
* Output size bytes per block: N min, N max, N mean, N total
* Output rows per task: N min, N max, N mean, N tasks used
* Tasks per node: N min, N max, N mean; N nodes used
* Extra metrics: {MEM_SPILLED_EXTRA_METRICS}
Cluster memory:
* Spilled to disk: M
* Restored from disk: M
Dataset memory:
* Spilled to disk: M
"""
expected_stats = (
f"Operator N ReadRange->MapBatches(<lambda>): {EXECUTION_STRING}\n"
f"* Remote wall time: T min, T max, T mean, T total\n"
f"* Remote cpu time: T min, T max, T mean, T total\n"
f"* Peak heap memory usage (MiB): N min, N max, N mean\n"
f"* Output num rows per block: N min, N max, N mean, N total\n"
f"* Output size bytes per block: N min, N max, N mean, N total\n"
f"* Output rows per task: N min, N max, N mean, N tasks used\n"
f"* Tasks per node: N min, N max, N mean; N nodes used\n"
f"{gen_extra_metrics_str(MEM_SPILLED_EXTRA_METRICS, verbose_stats_logs)}\n"
f"Cluster memory:\n"
f"* Spilled to disk: M\n"
f"* Restored from disk: M\n"
f"\n"
f"Dataset memory:\n"
f"* Spilled to disk: M\n"
)

assert canonicalize(ds.stats(), filter_global_stats=False) == expected_stats

# Around 100MB should be spilled (200MB - 100MB)
assert ds._plan.stats().dataset_bytes_spilled > 100e6

Expand Down

0 comments on commit bccab24

Please sign in to comment.