[data] Add label to indicate if operator is backpressured (ray-project#47095)

## Why are these changes needed?
While a dataset executes, display whether each operator is backpressured.
This makes it easier for the viewer to see which portions of their dataset
execution are bottlenecking the other operations.

When an operator is backpressured, its progress line changes from
`Map(g): 3 active, 22 queued, [cpu: 0.3, objects: 768.0MB]: : 0.00 row [02:14, ? row/s`
to
`Map(g): 3 active, 22 queued, BACKPRESSURED, [cpu: 0.3, objects: 768.0MB]: : 0.00 row [02:14, ? row/s`.

### Examples:
#### Typical example

```python
import ray
import time

def f(x):
    time.sleep(0.1)
    return x

ray.data.range(1000).map(f).map(f, num_cpus=0.1).materialize()
```


https://github.com/user-attachments/assets/8283fdff-d94f-43c3-a9f0-c929924c441f

#### Backpressure example
```python
import ray
import time

def f(x):
    time.sleep(0.1)
    return x

def g(x):
    # Sleep effectively forever so tasks for this stage never complete
    # and the pipeline backs up.
    time.sleep(10000000)
    return x

ray.data.range(1000).map(f).map(g, num_cpus=0.1).materialize()
```


https://github.com/user-attachments/assets/d263b8ee-8dd2-4ec6-bd45-c55941c45253

---------

Signed-off-by: Matthew Owen <mowen@anyscale.com>
omatthew98 authored Aug 14, 2024
1 parent 048190e commit a782a66
Showing 2 changed files with 7 additions and 0 deletions.
@@ -183,6 +183,7 @@ def __init__(
         self._target_max_block_size = target_max_block_size
         self._started = False
         self._in_task_submission_backpressure = False
+        self._in_task_output_backpressure = False
         self._metrics = OpRuntimeMetrics(self)
         self._estimated_num_output_bundles = None
         self._estimated_output_num_rows = None
@@ -260,6 +260,11 @@ def summary_str(self, resource_manager: ResourceManager) -> str:
         queued = self.num_queued() + self.op.internal_queue_size()
         active = self.op.num_active_tasks()
         desc = f"- {self.op.name}: {active} active, {queued} queued"
+        if (
+            self.op._in_task_submission_backpressure
+            or self.op._in_task_output_backpressure
+        ):
+            desc += " 🚧"
         desc += f", [{resource_manager.get_op_usage_str(self.op)}]"
         suffix = self.op.progress_str()
         if suffix:
@@ -400,6 +405,7 @@ def process_completed_tasks(
             max_bytes_to_read = (
                 resource_manager.op_resource_allocator.max_task_output_bytes_to_read(op)
             )
+            op._in_task_output_backpressure = max_bytes_to_read == 0
             if max_bytes_to_read is not None:
                 max_bytes_to_read_per_op[state] = max_bytes_to_read
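
The interaction between the two added pieces (setting the output-backpressure flag and appending the marker in `summary_str`) can be sketched in isolation. This is a minimal illustration, not Ray's actual classes: `FakeOp`, `update_output_backpressure`, and the standalone `summary_str` function are hypothetical stand-ins, and the usage string is passed in directly instead of coming from a `ResourceManager`.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeOp:
    """Hypothetical stand-in for a physical operator; not Ray's real class."""

    name: str
    num_active: int = 0
    num_queued: int = 0
    _in_task_submission_backpressure: bool = False
    _in_task_output_backpressure: bool = False


def update_output_backpressure(op: FakeOp, max_bytes_to_read: Optional[int]) -> None:
    # Same comparison as the diff: only a budget of exactly 0 sets the flag.
    # None compares unequal to 0, so it leaves the flag cleared.
    op._in_task_output_backpressure = max_bytes_to_read == 0


def summary_str(op: FakeOp, usage: str) -> str:
    # Same ordering as the diff: counts, optional marker, then resource usage.
    desc = f"- {op.name}: {op.num_active} active, {op.num_queued} queued"
    if op._in_task_submission_backpressure or op._in_task_output_backpressure:
        desc += " 🚧"
    desc += f", [{usage}]"
    return desc


op = FakeOp(name="Map(g)", num_active=3, num_queued=22)
usage = "cpu: 0.3, objects: 768.0MB"

print(summary_str(op, usage))  # no marker yet
update_output_backpressure(op, 0)
print(summary_str(op, usage))  # marker appears after "queued"
```

Because the flag is recomputed from `max_bytes_to_read` on every call, the marker disappears again as soon as a nonzero budget (or `None`) is reported.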
