Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][cgraph] Collapse other params into max_inflight_executions and adjust execution_index counting #49565

Merged
merged 16 commits on Jan 13, 2025
Merged
31 changes: 9 additions & 22 deletions python/ray/dag/compiled_dag_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,6 @@ def __init__(
submit_timeout: Optional[float] = None,
buffer_size_bytes: Optional[int] = None,
enable_asyncio: bool = False,
asyncio_max_queue_size: Optional[int] = None,
max_buffered_results: Optional[int] = None,
max_inflight_executions: Optional[int] = None,
overlap_gpu_communication: Optional[bool] = None,
Expand All @@ -759,13 +758,6 @@ def __init__(
be running in an event loop and must use `execute_async` to
invoke the DAG. Otherwise, the caller should use `execute` to
invoke the DAG.
asyncio_max_queue_size: Optional parameter to limit how many DAG
inputs can be queued at a time. The actual number of concurrent
DAG invocations may be higher than this, if there are already
inputs being processed by the DAG executors. If used, the
caller is responsible for preventing deadlock, i.e. if the
input queue is full, another asyncio task is reading from the
DAG output. It is only used when enable_asyncio=True.
max_buffered_results: The maximum number of execution results that
are allowed to be buffered. Setting a higher value allows more
DAGs to be executed before `ray.get()` must be called but also
Expand All @@ -792,8 +784,6 @@ def __init__(

self._enable_asyncio: bool = enable_asyncio
self._fut_queue = asyncio.Queue()
self._asyncio_max_queue_size: Optional[int] = asyncio_max_queue_size
# TODO(rui): consider unify it with asyncio_max_queue_size
self._max_buffered_results: Optional[int] = max_buffered_results
if self._max_buffered_results is None:
self._max_buffered_results = ctx.max_buffered_results
Expand Down Expand Up @@ -1643,7 +1633,6 @@ def _get_or_compile(
self._dag_submitter = AwaitableBackgroundWriter(
self.dag_input_channels,
input_task.output_idxs,
self._asyncio_max_queue_size,
is_input=True,
)
self._dag_output_fetcher = AwaitableBackgroundReader(
Expand Down Expand Up @@ -1968,6 +1957,13 @@ def _cache_execution_results(
execution_index: The execution index corresponding to the result.
result: The results from all channels to be cached.
"""
if len(self._result_buffer) >= self._max_buffered_results:
raise ValueError(
"Too many buffered results: the allowed max count for "
f"buffered results is {self._max_buffered_results}; call "
"ray.get() on previous CompiledDAGRefs to free them up "
"from buffer."
)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`_cache_execution_results` is used by both the async and the standard execution paths, so placing the buffered-results check here enforces the limit for both.

if not self._has_execution_results(execution_index):
for chan_idx, res in enumerate(result):
self._result_buffer[execution_index][chan_idx] = res
Expand Down Expand Up @@ -2063,22 +2059,15 @@ def _execute_until(
timeout = ctx.get_timeout

while self._max_finished_execution_index < execution_index:
if len(self._result_buffer) >= self._max_buffered_results:
raise ValueError(
"Too many buffered results: the allowed max count for "
f"buffered results is {self._max_buffered_results}; call "
"ray.get() on previous CompiledDAGRefs to free them up "
"from buffer."
)
self.increment_max_finished_execution_index()
start_time = time.monotonic()

# Fetch results from each output channel up to execution_index and cache
# them separately to enable individual retrieval
self._cache_execution_results(
self._max_finished_execution_index,
self._max_finished_execution_index + 1,
self._dag_output_fetcher.read(timeout),
)
self.increment_max_finished_execution_index()

if timeout != -1:
timeout -= time.monotonic() - start_time
Expand Down Expand Up @@ -2800,7 +2789,6 @@ def build_compiled_dag_from_ray_dag(
submit_timeout: Optional[float] = None,
buffer_size_bytes: Optional[int] = None,
enable_asyncio: bool = False,
asyncio_max_queue_size: Optional[int] = None,
max_buffered_results: Optional[int] = None,
max_inflight_executions: Optional[int] = None,
overlap_gpu_communication: Optional[bool] = None,
Expand All @@ -2809,7 +2797,6 @@ def build_compiled_dag_from_ray_dag(
submit_timeout,
buffer_size_bytes,
enable_asyncio,
asyncio_max_queue_size,
max_buffered_results,
max_inflight_executions,
overlap_gpu_communication,
Expand Down
7 changes: 0 additions & 7 deletions python/ray/dag/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@
DEFAULT_TEARDOWN_TIMEOUT_S = int(os.environ.get("RAY_CGRAPH_teardown_timeout", 30))
# Default buffer size is 1MB.
DEFAULT_BUFFER_SIZE_BYTES = int(os.environ.get("RAY_CGRAPH_buffer_size_bytes", 1e6))
# Default asyncio_max_queue_size is 0, which means no limit.
DEFAULT_ASYNCIO_MAX_QUEUE_SIZE = int(
os.environ.get("RAY_CGRAPH_asyncio_max_queue_size", 0)
)
# The default max_buffered_results is 1000, and the default buffer size is 1 MB.
# The maximum memory usage for buffered results is 1 GB.
DEFAULT_MAX_BUFFERED_RESULTS = int(
Expand Down Expand Up @@ -61,8 +57,6 @@ class DAGContext:
that can be passed between tasks in the DAG. The buffers will
be automatically resized if larger messages are written to the
channel.
asyncio_max_queue_size: The max queue size for the async execution.
It is only used when enable_asyncio=True.
max_buffered_results: The maximum number of execution results that
are allowed to be buffered. Setting a higher value allows more
DAGs to be executed before `ray.get()` must be called but also
Expand All @@ -82,7 +76,6 @@ class DAGContext:
get_timeout: int = DEFAULT_GET_TIMEOUT_S
teardown_timeout: int = DEFAULT_TEARDOWN_TIMEOUT_S
buffer_size_bytes: int = DEFAULT_BUFFER_SIZE_BYTES
asyncio_max_queue_size: int = DEFAULT_ASYNCIO_MAX_QUEUE_SIZE
max_buffered_results: int = DEFAULT_MAX_BUFFERED_RESULTS
max_inflight_executions: int = DEFAULT_MAX_INFLIGHT_EXECUTIONS
overlap_gpu_communication: bool = DEFAULT_OVERLAP_GPU_COMMUNICATION
Expand Down
6 changes: 0 additions & 6 deletions python/ray/dag/dag_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@ def experimental_compile(
_submit_timeout: Optional[float] = None,
_buffer_size_bytes: Optional[int] = None,
enable_asyncio: bool = False,
_asyncio_max_queue_size: Optional[int] = None,
_max_buffered_results: Optional[int] = None,
_max_inflight_executions: Optional[int] = None,
_overlap_gpu_communication: Optional[bool] = None,
Expand All @@ -203,8 +202,6 @@ def experimental_compile(
be automatically resized if larger messages are written to the
channel.
enable_asyncio: Whether to enable asyncio for this DAG.
_asyncio_max_queue_size: The max queue size for the async execution.
It is only used when enable_asyncio=True.
_max_buffered_results: The maximum number of execution results that
are allowed to be buffered. Setting a higher value allows more
DAGs to be executed before `ray.get()` must be called but also
Expand All @@ -230,8 +227,6 @@ def experimental_compile(
ctx = DAGContext.get_current()
if _buffer_size_bytes is None:
_buffer_size_bytes = ctx.buffer_size_bytes
if _asyncio_max_queue_size is None:
_asyncio_max_queue_size = ctx.asyncio_max_queue_size
if _max_buffered_results is None:
_max_buffered_results = ctx.max_buffered_results

Expand All @@ -251,7 +246,6 @@ def experimental_compile(
_submit_timeout,
_buffer_size_bytes,
enable_asyncio,
_asyncio_max_queue_size,
_max_buffered_results,
_max_inflight_executions,
_overlap_gpu_communication,
Expand Down
8 changes: 1 addition & 7 deletions python/ray/experimental/channel/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -533,16 +533,10 @@ def __init__(
self,
output_channels: List[ChannelInterface],
output_idxs: List[Optional[Union[int, str]]],
max_queue_size: Optional[int] = None,
is_input=False,
):
super().__init__(output_channels, output_idxs, is_input=is_input)
if max_queue_size is None:
from ray.dag import DAGContext

ctx = DAGContext.get_current()
max_queue_size = ctx.asyncio_max_queue_size
self._queue = asyncio.Queue(max_queue_size)
self._queue = asyncio.Queue()
self._background_task = None
self._background_task_executor = concurrent.futures.ThreadPoolExecutor(
max_workers=1, thread_name_prefix="channel.AwaitableBackgroundWriter"
Expand Down