[data] Spread map task stages by default for arg size <50MB (#36290)
Change the default scheduling strategy for map tasks to SPREAD when the argument size is < 50MB.

Simplify the handling of the scheduling strategy option to always use the context-specified default (SPREAD).
The idea here is that SPREAD is a better default, since the result of not spreading when we should is disastrous for performance. The tradeoff is that in certain configurations we lose object locality for large task args, so we set a size threshold to avoid that downside.
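
As a rough sketch of the new knobs (illustrative only; the field names match the `DataContext` additions in this PR, and passing `scheduling_strategy` through `map_batches` is just one way to override them per operation):

```python
import ray
from ray.data.context import DataContext

ctx = DataContext.get_current()

# New defaults after this change: spread small-arg map tasks for load balancing,
# and fall back to locality-aware "DEFAULT" scheduling once a task's input
# bundle exceeds the large-args threshold.
ctx.scheduling_strategy = "SPREAD"
ctx.scheduling_strategy_large_args = "DEFAULT"
ctx.large_args_threshold = 50 * 1024 * 1024  # 50MB

# An explicit per-operation setting still takes precedence over the context.
ds = ray.data.range(1000).map_batches(lambda batch: batch, scheduling_strategy="DEFAULT")
ds.materialize()
```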
ericl authored Jun 15, 2023
1 parent 27df56c commit 319080c
Showing 11 changed files with 163 additions and 93 deletions.
doc/source/ray-air/doc_code/air_ingest_new.py (2 additions, 3 deletions)
@@ -12,9 +12,8 @@
 
 # Load the data.
 train_ds = ray.data.read_parquet("example://iris.parquet")
-
-# Randomize the block order each epoch.
-train_ds = train_ds.randomize_block_order()
+## Uncomment to randomize the block order each epoch.
+# train_ds = train_ds.randomize_block_order()
 
 
 # Define a preprocessing function.
python/ray/data/_internal/compute.py (2 additions, 5 deletions)
@@ -19,7 +19,7 @@
     StrictModeError,
     UserDefinedFunction,
 )
-from ray.data.context import DEFAULT_SCHEDULING_STRATEGY, DataContext
+from ray.data.context import DataContext
 from ray.types import ObjectRef
 from ray.util.annotations import DeveloperAPI, PublicAPI
 
@@ -388,10 +388,7 @@ def map_block_nosplit(
 
         if "scheduling_strategy" not in remote_args:
             ctx = DataContext.get_current()
-            if ctx.scheduling_strategy == DEFAULT_SCHEDULING_STRATEGY:
-                remote_args["scheduling_strategy"] = "SPREAD"
-            else:
-                remote_args["scheduling_strategy"] = ctx.scheduling_strategy
+            remote_args["scheduling_strategy"] = ctx.scheduling_strategy
 
         BlockWorker = ray.remote(**remote_args)(BlockWorker)
 
python/ray/data/_internal/execution/operators/actor_pool_map_operator.py
@@ -21,7 +21,7 @@
 )
 from ray.data._internal.execution.util import locality_string
 from ray.data.block import Block, BlockMetadata, _CallableClassProtocol
-from ray.data.context import DEFAULT_SCHEDULING_STRATEGY, DataContext
+from ray.data.context import DataContext
 from ray.types import ObjectRef
 
 logger = DatasetLogger(__name__)
@@ -348,10 +348,7 @@ def _apply_default_remote_args(ray_remote_args: Dict[str, Any]) -> Dict[str, Any
         ray_remote_args = ray_remote_args.copy()
         if "scheduling_strategy" not in ray_remote_args:
             ctx = DataContext.get_current()
-            if ctx.scheduling_strategy == DEFAULT_SCHEDULING_STRATEGY:
-                ray_remote_args["scheduling_strategy"] = "SPREAD"
-            else:
-                ray_remote_args["scheduling_strategy"] = ctx.scheduling_strategy
+            ray_remote_args["scheduling_strategy"] = ctx.scheduling_strategy
         # Enable actor fault tolerance by default, with infinite actor recreations and
         # up to N retries per task. The user can customize this in map_batches via
         # extra kwargs (e.g., map_batches(..., max_restarts=0) to disable).
python/ray/data/_internal/execution/operators/map_operator.py (32 additions, 4 deletions)
@@ -23,6 +23,7 @@
 from ray.data._internal.memory_tracing import trace_allocation
 from ray.data._internal.stats import StatsDict
 from ray.data.block import Block, BlockAccessor, BlockExecStats, BlockMetadata
+from ray.data.context import DataContext
 from ray.types import ObjectRef
 from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
 
@@ -49,6 +50,7 @@ def __init__(
         self._transform_fn = transform_fn
         self._ray_remote_args = _canonicalize_ray_remote_args(ray_remote_args or {})
         self._ray_remote_args_factory = None
+        self._remote_args_for_metrics = copy.deepcopy(self._ray_remote_args)
 
         # Bundles block references up to the min_rows_per_bundle target.
         self._block_ref_bundler = _BlockRefBundler(min_rows_per_bundle)
@@ -188,10 +190,32 @@ def add_input(self, refs: RefBundle, input_index: int):
             bundle = self._block_ref_bundler.get_next_bundle()
             self._add_bundled_input(bundle)
 
-    def _get_runtime_ray_remote_args(self) -> Dict[str, Any]:
+    def _get_runtime_ray_remote_args(
+        self, input_bundle: Optional[RefBundle] = None
+    ) -> Dict[str, Any]:
+        ray_remote_args = copy.deepcopy(self._ray_remote_args)
+        # For tasks with small args, we will use SPREAD by default to optimize for
+        # compute load-balancing. For tasks with large args, we will use DEFAULT to
+        # allow the Ray locality scheduler a chance to optimize task placement.
+        if "scheduling_strategy" not in ray_remote_args:
+            ctx = DataContext.get_current()
+            if input_bundle and input_bundle.size_bytes() > ctx.large_args_threshold:
+                ray_remote_args[
+                    "scheduling_strategy"
+                ] = ctx.scheduling_strategy_large_args
+                # Takes precedence over small args case. This is to let users know
+                # when the large args case is being triggered.
+                self._remote_args_for_metrics = copy.deepcopy(ray_remote_args)
+            else:
+                ray_remote_args["scheduling_strategy"] = ctx.scheduling_strategy
+                # Only save to metrics if we haven't already done so.
+                if "scheduling_strategy" not in self._remote_args_for_metrics:
+                    self._remote_args_for_metrics = copy.deepcopy(ray_remote_args)
+        # This should take precedence over previously set scheduling strategy, as it
+        # implements actor-based locality overrides.
         if self._ray_remote_args_factory:
-            return self._ray_remote_args_factory(self._ray_remote_args)
-        return self._ray_remote_args
+            return self._ray_remote_args_factory(ray_remote_args)
+        return ray_remote_args
 
     @abstractmethod
     def _add_bundled_input(self, refs: RefBundle):
@@ -291,7 +315,11 @@ def progress_str(self) -> str:
         raise NotImplementedError
 
     def get_metrics(self) -> Dict[str, int]:
-        return self._metrics.to_metrics_dict()
+        sorted_ray_args = dict(sorted(self._remote_args_for_metrics.items()))
+        return dict(
+            self._metrics.to_metrics_dict(),
+            ray_remote_args=sorted_ray_args,
+        )
 
     def get_stats(self) -> StatsDict:
         return {self._name: self._output_metadata}
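
In plain terms, the operator now chooses the strategy per input bundle. A standalone sketch of that decision (illustrative only: `choose_scheduling_strategy` is not part of the Ray API, and the constants simply mirror the new context defaults):

```python
from typing import Any, Dict, Optional

SMALL_ARGS_STRATEGY = "SPREAD"  # load-balance tasks with small arguments
LARGE_ARGS_STRATEGY = "DEFAULT"  # let Ray's locality scheduler place big-arg tasks
LARGE_ARGS_THRESHOLD = 50 * 1024 * 1024  # 50MB


def choose_scheduling_strategy(
    remote_args: Dict[str, Any], input_size_bytes: Optional[int]
) -> Dict[str, Any]:
    """Sketch of the decision made in MapOperator._get_runtime_ray_remote_args."""
    result = dict(remote_args)
    if "scheduling_strategy" in result:
        # An explicit user setting always wins.
        return result
    if input_size_bytes is not None and input_size_bytes > LARGE_ARGS_THRESHOLD:
        result["scheduling_strategy"] = LARGE_ARGS_STRATEGY
    else:
        result["scheduling_strategy"] = SMALL_ARGS_STRATEGY
    return result


assert choose_scheduling_strategy({}, 1024)["scheduling_strategy"] == "SPREAD"
assert choose_scheduling_strategy({}, 64 * 1024 * 1024)["scheduling_strategy"] == "DEFAULT"
explicit = choose_scheduling_strategy({"scheduling_strategy": "DEFAULT"}, 1024)
assert explicit["scheduling_strategy"] == "DEFAULT"
```

As a ballpark for the threshold: at a few GB/s of effective object transfer bandwidth, a 50MB argument costs on the order of 10-25 ms to move, i.e. the "low tens of ms" that the context.py comment below compares against task scheduling overhead.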
python/ray/data/_internal/execution/operators/task_pool_map_operator.py
@@ -53,7 +53,7 @@ def _add_bundled_input(self, bundle: RefBundle):
         input_blocks = [block for block, _ in bundle.blocks]
         ctx = TaskContext(task_idx=self._next_task_idx)
         ref = map_task.options(
-            **self._get_runtime_ray_remote_args(), name=self.name
+            **self._get_runtime_ray_remote_args(input_bundle=bundle), name=self.name
         ).remote(self._transform_fn_ref, ctx, *input_blocks)
         self._next_task_idx += 1
         task = _TaskState(bundle)
python/ray/data/context.py (20 additions, 2 deletions)
@@ -62,8 +62,18 @@
     os.environ.get("RAY_DATA_PUSH_BASED_SHUFFLE", None)
 )
 
-# The default global scheduling strategy.
-DEFAULT_SCHEDULING_STRATEGY = "DEFAULT"
+# The default global scheduling strategy. Note that for tasks with large args,
+# DEFAULT_SCHEDULING_STRATEGY_LARGE_ARGS applies.
+DEFAULT_SCHEDULING_STRATEGY = "SPREAD"
+
+# Default scheduling strategy for tasks with large args. This enables locality-based
+# scheduling in Ray for tasks where arg data transfer is a bottleneck.
+DEFAULT_SCHEDULING_STRATEGY_LARGE_ARGS = "DEFAULT"
+
+# Size in bytes after which point task arguments are considered large. Choose a value
+# here at which point data transfer overhead becomes significant in comparison to
+# task scheduling (i.e., low tens of ms).
+DEFAULT_LARGE_ARGS_THRESHOLD = 50 * 1024 * 1024
 
 # Whether to use Polars for tabular dataset sorts, groupbys, and aggregations.
 DEFAULT_USE_POLARS = False
@@ -153,6 +163,8 @@ def __init__(
         use_push_based_shuffle: bool,
         pipeline_push_based_shuffle_reduce_tasks: bool,
         scheduling_strategy: SchedulingStrategyT,
+        scheduling_strategy_large_args: SchedulingStrategyT,
+        large_args_threshold: int,
         use_polars: bool,
         new_execution_backend: bool,
         use_streaming_executor: bool,
@@ -185,6 +197,8 @@
             pipeline_push_based_shuffle_reduce_tasks
         )
         self.scheduling_strategy = scheduling_strategy
+        self.scheduling_strategy_large_args = scheduling_strategy_large_args
+        self.large_args_threshold = large_args_threshold
         self.use_polars = use_polars
         self.new_execution_backend = new_execution_backend
         self.use_streaming_executor = use_streaming_executor
@@ -233,6 +247,10 @@ def get_current() -> "DataContext":
                 # See https://github.com/ray-project/ray/issues/25412.
                 pipeline_push_based_shuffle_reduce_tasks=True,
                 scheduling_strategy=DEFAULT_SCHEDULING_STRATEGY,
+                scheduling_strategy_large_args=(
+                    DEFAULT_SCHEDULING_STRATEGY_LARGE_ARGS
+                ),
+                large_args_threshold=DEFAULT_LARGE_ARGS_THRESHOLD,
                 use_polars=DEFAULT_USE_POLARS,
                 new_execution_backend=DEFAULT_NEW_EXECUTION_BACKEND,
                 use_streaming_executor=DEFAULT_USE_STREAMING_EXECUTOR,
python/ray/data/random_access_dataset.py (2 additions, 5 deletions)
@@ -10,7 +10,7 @@
 import ray
 from ray.data._internal.remote_fn import cached_remote_fn
 from ray.data.block import BlockAccessor
-from ray.data.context import DEFAULT_SCHEDULING_STRATEGY, DataContext
+from ray.data.context import DataContext
 from ray.types import ObjectRef
 from ray.util.annotations import PublicAPI
 
@@ -67,10 +67,7 @@ def __init__(
 
         logger.info("[setup] Creating {} random access workers.".format(num_workers))
         ctx = DataContext.get_current()
-        if ctx.scheduling_strategy != DEFAULT_SCHEDULING_STRATEGY:
-            scheduling_strategy = ctx.scheduling_strategy
-        else:
-            scheduling_strategy = "SPREAD"
+        scheduling_strategy = ctx.scheduling_strategy
         self._workers = [
             _RandomAccessWorker.options(scheduling_strategy=scheduling_strategy).remote(
                 key
python/ray/data/read_api.py (3 additions, 6 deletions)
@@ -41,7 +41,7 @@
     pandas_df_to_arrow_block,
 )
 from ray.data.block import Block, BlockAccessor, BlockExecStats, BlockMetadata
-from ray.data.context import DEFAULT_SCHEDULING_STRATEGY, WARN_PREFIX, DataContext
+from ray.data.context import WARN_PREFIX, DataContext
 from ray.data.dataset import Dataset, MaterializedDataset
 from ray.data.datasource import (
     BaseFileMetadataProvider,
@@ -331,11 +331,8 @@ def read_datasource(
         )
         local_uri = True
 
-    if (
-        "scheduling_strategy" not in ray_remote_args
-        and ctx.scheduling_strategy == DEFAULT_SCHEDULING_STRATEGY
-    ):
-        ray_remote_args["scheduling_strategy"] = "SPREAD"
+    if "scheduling_strategy" not in ray_remote_args:
+        ray_remote_args["scheduling_strategy"] = ctx.scheduling_strategy
 
     force_local = False
     cur_pg = ray.util.get_current_placement_group()
python/ray/data/tests/test_executor_resource_management.py (22 additions, 0 deletions)
@@ -76,6 +76,28 @@ def test_resource_canonicalization(ray_start_10_cpus_shared):
     )
 
 
+def test_scheduling_strategy_overrides(ray_start_10_cpus_shared, restore_data_context):
+    input_op = InputDataBuffer(make_ref_bundles([[i] for i in range(100)]))
+    op = MapOperator.create(
+        _mul2_transform,
+        input_op=input_op,
+        name="TestMapper",
+        compute_strategy=TaskPoolStrategy(),
+        ray_remote_args={"num_gpus": 2, "scheduling_strategy": "DEFAULT"},
+    )
+    assert op._ray_remote_args == {"num_gpus": 2, "scheduling_strategy": "DEFAULT"}
+
+    ray.data.DataContext.get_current().scheduling_strategy = "DEFAULT"
+    op = MapOperator.create(
+        _mul2_transform,
+        input_op=input_op,
+        name="TestMapper",
+        compute_strategy=TaskPoolStrategy(),
+        ray_remote_args={"num_gpus": 2},
+    )
+    assert op._ray_remote_args == {"num_gpus": 2}
+
+
 def test_task_pool_resource_reporting(ray_start_10_cpus_shared):
     input_op = InputDataBuffer(make_ref_bundles([[SMALL_STR] for i in range(100)]))
     op = MapOperator.create(
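
A quick end-to-end sanity check of the new defaults (illustrative; whether the per-operator extra metrics, which now include the applied `ray_remote_args`, appear in the `stats()` output depends on the Ray version and executor in use):

```python
import ray
from ray.data.context import DataContext

ctx = DataContext.get_current()
assert ctx.scheduling_strategy == "SPREAD"
assert ctx.large_args_threshold == 50 * 1024 * 1024

ds = ray.data.range(1000).map_batches(lambda batch: batch).materialize()
# MapOperator.get_metrics() now also reports the ray_remote_args it applied
# (e.g. the chosen scheduling_strategy); they may surface under "Extra metrics".
print(ds.stats())
```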