[core][cgraph] Collapse other params into max_inflight_executions and adjust execution_index counting #49565

Merged
16 commits merged on Jan 13, 2025
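
For context, a minimal usage sketch of the API after this change. The actor/DAG setup below is illustrative and assumed; _max_inflight_executions, experimental_compile, execute, ray.get, and RayCgraphCapacityExceeded are the pieces that appear in the diff. Treat it as a sketch of the intended behavior, not the committed code.

    import ray
    from ray.dag import InputNode

    @ray.remote
    class Echo:
        def echo(self, x):
            return x

    ray.init()
    actor = Echo.remote()
    with InputNode() as inp:
        dag = actor.echo.bind(inp)

    # Before this PR, experimental_compile also accepted _asyncio_max_queue_size
    # and _max_buffered_results; after it, this single knob bounds capacity.
    cdag = dag.experimental_compile(_max_inflight_executions=2)

    refs = [cdag.execute(i) for i in range(2)]
    # A third execute() before any ray.get() should now raise
    # ray.exceptions.RayCgraphCapacityExceeded, since the updated check uses >=
    # and also counts finished-but-unconsumed (buffered) results.
    print(ray.get(refs[0]))  # consume an output to free capacity
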
67 changes: 15 additions & 52 deletions python/ray/dag/compiled_dag_node.py
@@ -740,8 +740,6 @@ def __init__(
submit_timeout: Optional[float] = None,
buffer_size_bytes: Optional[int] = None,
enable_asyncio: bool = False,
asyncio_max_queue_size: Optional[int] = None,
max_buffered_results: Optional[int] = None,
max_inflight_executions: Optional[int] = None,
overlap_gpu_communication: Optional[bool] = None,
):
@@ -759,20 +757,6 @@ def __init__(
be running in an event loop and must use `execute_async` to
invoke the DAG. Otherwise, the caller should use `execute` to
invoke the DAG.
asyncio_max_queue_size: Optional parameter to limit how many DAG
inputs can be queued at a time. The actual number of concurrent
DAG invocations may be higher than this, if there are already
inputs being processed by the DAG executors. If used, the
caller is responsible for preventing deadlock, i.e. if the
input queue is full, another asyncio task is reading from the
DAG output. It is only used when enable_asyncio=True.
max_buffered_results: The maximum number of execution results that
are allowed to be buffered. Setting a higher value allows more
DAGs to be executed before `ray.get()` must be called but also
increases the memory usage. Note that if the number of ongoing
executions is beyond the DAG capacity, the new execution would
be blocked in the first place; therefore, this limit is only
enforced when it is smaller than the DAG capacity.
max_inflight_executions: The maximum number of in-flight executions that
can be submitted via `execute` or `execute_async` before consuming
the output using `ray.get()`. If the caller submits more executions,
@@ -792,11 +776,6 @@ def __init__(

self._enable_asyncio: bool = enable_asyncio
self._fut_queue = asyncio.Queue()
self._asyncio_max_queue_size: Optional[int] = asyncio_max_queue_size
# TODO(rui): consider unify it with asyncio_max_queue_size
self._max_buffered_results: Optional[int] = max_buffered_results
if self._max_buffered_results is None:
self._max_buffered_results = ctx.max_buffered_results
self._max_inflight_executions = max_inflight_executions
if self._max_inflight_executions is None:
self._max_inflight_executions = ctx.max_inflight_executions
@@ -886,7 +865,7 @@ def __init__(
self._communicator_ids: Set[str] = set()
# The index of the current execution. It is incremented each time
# the DAG is executed.
self._execution_index: int = 0
self._execution_index: int = -1
# The maximum index of finished executions.
# All results with higher indexes have not been generated yet.
self._max_finished_execution_index: int = -1
@@ -925,12 +904,6 @@ def is_teardown(self) -> bool:
def communicator_ids(self) -> Set[str]:
return self._communicator_ids

def increment_max_finished_execution_index(self) -> None:
"""Increment the max finished execution index. It is used to
figure out the max number of in-flight requests to the DAG
"""
self._max_finished_execution_index += 1

def get_id(self) -> str:
"""
Get the unique ID of the compiled DAG.
@@ -1643,7 +1616,6 @@ def _get_or_compile(
self._dag_submitter = AwaitableBackgroundWriter(
self.dag_input_channels,
input_task.output_idxs,
self._asyncio_max_queue_size,
is_input=True,
)
self._dag_output_fetcher = AwaitableBackgroundReader(
@@ -1933,14 +1905,13 @@ def run(self):
def raise_if_too_many_inflight_requests(self):
num_in_flight_requests = (
self._execution_index - self._max_finished_execution_index
)
if num_in_flight_requests > self._max_inflight_executions:
) + len(self._result_buffer)
if num_in_flight_requests >= self._max_inflight_executions:
raise ray.exceptions.RayCgraphCapacityExceeded(
f"There are {num_in_flight_requests} in-flight requests which "
"is more than specified _max_inflight_executions of the dag: "
f"{self._max_inflight_executions}. Retrieve the output using "
"ray.get before submitting more requests or increase "
"`max_inflight_executions`. "
f"You cannot execute more than {self._max_inflight_executions} "
Member: Maybe change to "The compiled DAG cannot execute more than ..." instead?

Contributor Author: The user is the one doing the executing and calling execute, though. We could do something like "you can't execute the compiled graph more than x times", but I feel like it's pretty obvious it's a compiled graph execution because the error will point to compiled_graph.execute.

Contributor: +1 to what Kai-Hsun said. I also feel it's a bit weird to say "you ..."; can we avoid that? Using "There are ..." or "The compiled DAG/graph ..." sounds better.

Contributor Author (@dayshah, Jan 12, 2025): Updated to

                "The compiled graph can't have more than "
                f"{self._max_inflight_executions} in-flight executions, and you "
                f"currently have {num_inflight_executions} in-flight executions. "
                "Retrieve an output using ray.get before submitting more requests or "
                "increase `_max_inflight_executions`. "
                "`dag.experimental_compile(_max_inflight_executions=...)`"

f"in-flight requests, and you currently have {num_in_flight_requests} "
f"in-flight requests. Retrieve an output using ray.get before "
"submitting more requests or increase `max_inflight_executions`. "
"`dag.experimental_compile(_max_inflight_executions=...)`"
)
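
In words, the hunk above changes the accounting in two ways: results that have finished but are still sitting in _result_buffer now count toward the in-flight total, and the comparison changed from > to >=. A minimal standalone sketch of the arithmetic (an assumed paraphrase, not the actual method):

    def too_many_inflight(execution_index: int,
                          max_finished_execution_index: int,
                          num_buffered_results: int,
                          max_inflight_executions: int) -> bool:
        # Submitted-but-unfinished executions plus buffered results that
        # ray.get() has not consumed yet both occupy capacity.
        num_in_flight = (
            execution_index - max_finished_execution_index
        ) + num_buffered_results
        return num_in_flight >= max_inflight_executions

    # Both indices start at -1 after this PR, so two submits with no ray.get()
    # give execution_index=1, max_finished=-1, buffer=0 -> 2 >= 2 -> True.
    assert too_many_inflight(1, -1, 0, 2)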

@@ -2024,7 +1995,7 @@ def release_output_channel_buffers(self, execution_index: int):
timeout = ctx.get_timeout

while self._max_finished_execution_index < execution_index:
self.increment_max_finished_execution_index()
self._max_finished_execution_index += 1
start_time = time.monotonic()
self._dag_output_fetcher.release_channel_buffers(timeout)

@@ -2068,21 +2039,15 @@ def _execute_until(
timeout = ctx.get_timeout

while self._max_finished_execution_index < execution_index:
if len(self._result_buffer) >= self._max_buffered_results:
raise ValueError(
"Too many buffered results: the allowed max count for "
f"buffered results is {self._max_buffered_results}; call "
"ray.get() on previous CompiledDAGRefs to free them up "
"from buffer."
)
self.increment_max_finished_execution_index()
start_time = time.monotonic()

# Fetch results from each output channel up to execution_index and cache
# them separately to enable individual retrieval
result = self._dag_output_fetcher.read(timeout)
self._max_finished_execution_index += 1
self._cache_execution_results(
self._max_finished_execution_index,
self._dag_output_fetcher.read(timeout),
result,
)

if timeout != -1:
@@ -2130,6 +2095,8 @@ def execute(
self.raise_if_too_many_inflight_requests()
self._dag_submitter.write(inp, self._submit_timeout)

self._execution_index += 1

if self._returns_list:
ref = [
CompiledDAGRef(self, self._execution_index, channel_index)
@@ -2138,7 +2105,6 @@
else:
ref = CompiledDAGRef(self, self._execution_index)

self._execution_index += 1
return ref
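
Read together with the -1 initialization earlier in the diff: the increment now happens before the ref is created, so _execution_index holds the index of the most recently submitted execution (matching the semantics of _max_finished_execution_index), while the returned CompiledDAGRef still carries the same index it did before. A tiny illustration of the assumed index progression, outside the real class:

    execution_index = -1          # initial value after this change
    def submit_once():
        global execution_index
        execution_index += 1      # bumped before the ref is constructed
        return execution_index    # the index handed to CompiledDAGRef

    assert submit_once() == 0     # the first execution is still index 0
    assert submit_once() == 1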

def _check_inputs(self, args: Tuple[Any, ...], kwargs: Dict[str, Any]) -> None:
@@ -2199,6 +2165,8 @@ async def execute_async(
fut = asyncio.Future()
await self._fut_queue.put(fut)

self._execution_index += 1

if self._returns_list:
fut = [
CompiledDAGFuture(self, self._execution_index, fut, channel_index)
@@ -2207,7 +2175,6 @@
else:
fut = CompiledDAGFuture(self, self._execution_index, fut)

self._execution_index += 1
return fut

def _visualize_ascii(self) -> str:
@@ -2805,17 +2772,13 @@ def build_compiled_dag_from_ray_dag(
submit_timeout: Optional[float] = None,
buffer_size_bytes: Optional[int] = None,
enable_asyncio: bool = False,
asyncio_max_queue_size: Optional[int] = None,
max_buffered_results: Optional[int] = None,
max_inflight_executions: Optional[int] = None,
overlap_gpu_communication: Optional[bool] = None,
) -> "CompiledDAG":
compiled_dag = CompiledDAG(
submit_timeout,
buffer_size_bytes,
enable_asyncio,
asyncio_max_queue_size,
max_buffered_results,
max_inflight_executions,
overlap_gpu_communication,
)
20 changes: 0 additions & 20 deletions python/ray/dag/context.py
@@ -13,15 +13,6 @@
DEFAULT_TEARDOWN_TIMEOUT_S = int(os.environ.get("RAY_CGRAPH_teardown_timeout", 30))
# Default buffer size is 1MB.
DEFAULT_BUFFER_SIZE_BYTES = int(os.environ.get("RAY_CGRAPH_buffer_size_bytes", 1e6))
# Default asyncio_max_queue_size is 0, which means no limit.
DEFAULT_ASYNCIO_MAX_QUEUE_SIZE = int(
os.environ.get("RAY_CGRAPH_asyncio_max_queue_size", 0)
)
# The default max_buffered_results is 1000, and the default buffer size is 1 MB.
# The maximum memory usage for buffered results is 1 GB.
DEFAULT_MAX_BUFFERED_RESULTS = int(
os.environ.get("RAY_CGRAPH_max_buffered_results", 1000)
)
# The default number of in-flight executions that can be submitted before consuming the
# output.
DEFAULT_MAX_INFLIGHT_EXECUTIONS = int(
@@ -61,15 +52,6 @@ class DAGContext:
that can be passed between tasks in the DAG. The buffers will
be automatically resized if larger messages are written to the
channel.
asyncio_max_queue_size: The max queue size for the async execution.
It is only used when enable_asyncio=True.
max_buffered_results: The maximum number of execution results that
are allowed to be buffered. Setting a higher value allows more
DAGs to be executed before `ray.get()` must be called but also
increases the memory usage. Note that if the number of ongoing
executions is beyond the DAG capacity, the new execution would
be blocked in the first place; therefore, this limit is only
enforced when it is smaller than the DAG capacity.
max_inflight_executions: The maximum number of in-flight executions that
can be submitted via `execute` or `execute_async` before consuming
the output using `ray.get()`. If the caller submits more executions,
@@ -84,8 +66,6 @@ class DAGContext:
get_timeout: int = DEFAULT_GET_TIMEOUT_S
teardown_timeout: int = DEFAULT_TEARDOWN_TIMEOUT_S
buffer_size_bytes: int = DEFAULT_BUFFER_SIZE_BYTES
asyncio_max_queue_size: int = DEFAULT_ASYNCIO_MAX_QUEUE_SIZE
max_buffered_results: int = DEFAULT_MAX_BUFFERED_RESULTS
max_inflight_executions: int = DEFAULT_MAX_INFLIGHT_EXECUTIONS
overlap_gpu_communication: bool = DEFAULT_OVERLAP_GPU_COMMUNICATION
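
With the asyncio queue and buffered-results knobs gone, DAGContext only falls back to max_inflight_executions for capacity control. A hedged sketch of reading the remaining default; the import path is assumed from this file's location (python/ray/dag/context.py), and DAGContext.get_current() is used exactly as in the diff above:

    from ray.dag.context import DAGContext

    ctx = DAGContext.get_current()
    # asyncio_max_queue_size and max_buffered_results no longer exist here;
    # this is the one capacity default the compiled DAG falls back to.
    print(ctx.max_inflight_executions)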

17 changes: 0 additions & 17 deletions python/ray/dag/dag_node.py
@@ -186,8 +186,6 @@ def experimental_compile(
_submit_timeout: Optional[float] = None,
_buffer_size_bytes: Optional[int] = None,
enable_asyncio: bool = False,
_asyncio_max_queue_size: Optional[int] = None,
_max_buffered_results: Optional[int] = None,
_max_inflight_executions: Optional[int] = None,
_overlap_gpu_communication: Optional[bool] = None,
) -> "ray.dag.CompiledDAG":
@@ -203,15 +201,6 @@ def experimental_compile(
be automatically resized if larger messages are written to the
channel.
enable_asyncio: Whether to enable asyncio for this DAG.
_asyncio_max_queue_size: The max queue size for the async execution.
It is only used when enable_asyncio=True.
_max_buffered_results: The maximum number of execution results that
are allowed to be buffered. Setting a higher value allows more
DAGs to be executed before `ray.get()` must be called but also
increases the memory usage. Note that if the number of ongoing
executions is beyond the DAG capacity, the new execution would
be blocked in the first place; therefore, this limit is only
enforced when it is smaller than the DAG capacity.
_max_inflight_executions: The maximum number of in-flight executions that
can be submitted via `execute` or `execute_async` before consuming
the output using `ray.get()`. If the caller submits more executions,
@@ -230,10 +219,6 @@
ctx = DAGContext.get_current()
if _buffer_size_bytes is None:
_buffer_size_bytes = ctx.buffer_size_bytes
if _asyncio_max_queue_size is None:
_asyncio_max_queue_size = ctx.asyncio_max_queue_size
if _max_buffered_results is None:
_max_buffered_results = ctx.max_buffered_results

# Validate whether this DAG node has already been compiled.
if self.is_cgraph_output_node:
@@ -251,8 +236,6 @@
_submit_timeout,
_buffer_size_bytes,
enable_asyncio,
_asyncio_max_queue_size,
_max_buffered_results,
_max_inflight_executions,
_overlap_gpu_communication,
)