Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 23 additions & 44 deletions datafusion_ray/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ def _get_worker_inputs(
plan_bytes = datafusion_ray.serialize_execution_plan(stage.get_execution_plan())
futures = []
opt = {}
opt["resources"] = {"worker": 1e-3}
# TODO not sure why we had this but my Ray cluster could not find suitable resource
# until I commented this out
# opt["resources"] = {"worker": 1e-3}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to remove this; otherwise, the Ray cluster could not find a suitable worker node.

@franklsf95 do we still need this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only works when you start a Ray cluster with custom resources, e.g. the head node with ray start --head --resources='{"head":1}' and worker nodes each with ray start --resources='{"worker":1}'. I had this resource requirement to make sure the tasks run on worker nodes exclusively (for fair benchmarking). If we don't do this, the task could also run on the driver node. Depending on the use case, this may be harmless.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a comment here is useful?

opt["num_returns"] = output_partitions_count
for part in range(concurrency):
ids, inputs = _get_worker_inputs(part)
Expand All @@ -93,7 +95,6 @@ def _get_worker_inputs(
def execute_query_stage(
query_stages: list[QueryStage],
stage_id: int,
use_ray_shuffle: bool,
) -> tuple[int, list[ray.ObjectRef]]:
"""
Execute a query stage on the workers.
Expand All @@ -106,7 +107,7 @@ def execute_query_stage(
child_futures = []
for child_id in stage.get_child_stage_ids():
child_futures.append(
execute_query_stage.remote(query_stages, child_id, use_ray_shuffle)
execute_query_stage.remote(query_stages, child_id)
)

# if the query stage has a single output partition then we need to execute for the output
Expand All @@ -133,33 +134,28 @@ def _get_worker_inputs(
) -> tuple[list[tuple[int, int, int]], list[ray.ObjectRef]]:
ids = []
futures = []
if use_ray_shuffle:
for child_stage_id, child_futures in child_outputs:
for i, lst in enumerate(child_futures):
if isinstance(lst, list):
for j, f in enumerate(lst):
if concurrency == 1 or j == part:
# If concurrency is 1, pass in all shuffle partitions. Otherwise,
# only pass in the partitions that match the current worker partition.
ids.append((child_stage_id, i, j))
futures.append(f)
elif concurrency == 1 or part == 0:
ids.append((child_stage_id, i, 0))
futures.append(lst)
for child_stage_id, child_futures in child_outputs:
for i, lst in enumerate(child_futures):
if isinstance(lst, list):
for j, f in enumerate(lst):
if concurrency == 1 or j == part:
# If concurrency is 1, pass in all shuffle partitions. Otherwise,
# only pass in the partitions that match the current worker partition.
ids.append((child_stage_id, i, j))
futures.append(f)
elif concurrency == 1 or part == 0:
ids.append((child_stage_id, i, 0))
futures.append(lst)
return ids, futures

# if we are using disk-based shuffle, wait until the child stages to finish
# writing the shuffle files to disk first.
if not use_ray_shuffle:
ray.get([f for _, lst in child_outputs for f in lst])

# schedule the actual execution workers
plan_bytes = datafusion_ray.serialize_execution_plan(stage.get_execution_plan())
futures = []
opt = {}
opt["resources"] = {"worker": 1e-3}
if use_ray_shuffle:
opt["num_returns"] = output_partitions_count
# TODO not sure why we had this but my Ray cluster could not find suitable resource
# until I commented this out
#opt["resources"] = {"worker": 1e-3}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above

opt["num_returns"] = output_partitions_count
for part in range(concurrency):
ids, inputs = _get_worker_inputs(part)
futures.append(
Expand Down Expand Up @@ -210,10 +206,9 @@ def execute_query_partition(


class DatafusionRayContext:
def __init__(self, num_workers: int = 1, use_ray_shuffle: bool = False):
self.ctx = Context(num_workers, use_ray_shuffle)
def __init__(self, num_workers: int = 1):
self.ctx = Context(num_workers)
self.num_workers = num_workers
self.use_ray_shuffle = use_ray_shuffle

def register_csv(self, table_name: str, path: str, has_header: bool):
self.ctx.register_csv(table_name, path, has_header)
Expand All @@ -234,23 +229,7 @@ def sql(self, sql: str) -> pa.RecordBatch:

graph = self.ctx.plan(sql)
final_stage_id = graph.get_final_query_stage().id()
if self.use_ray_shuffle:
partitions = schedule_execution(graph, final_stage_id, True)
else:
# serialize the query stages and store in Ray object store
query_stages = [
datafusion_ray.serialize_execution_plan(
graph.get_query_stage(i).get_execution_plan()
)
for i in range(final_stage_id + 1)
]
# schedule execution
future = execute_query_stage.remote(
query_stages,
final_stage_id,
self.use_ray_shuffle,
)
_, partitions = ray.get(future)
partitions = schedule_execution(graph, final_stage_id, True)
# assert len(partitions) == 1, len(partitions)
result_set = ray.get(partitions[0])
return result_set
9 changes: 4 additions & 5 deletions datafusion_ray/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@
RESULTS_DIR = f"results-sf{SF}"


def setup_context(use_ray_shuffle: bool, num_workers: int = 2) -> DatafusionRayContext:
def setup_context(num_workers: int = 2) -> DatafusionRayContext:
print(f"Using {num_workers} workers")
ctx = DatafusionRayContext(num_workers, use_ray_shuffle)
ctx = DatafusionRayContext(num_workers)
for table in [
"customer",
"lineitem",
Expand Down Expand Up @@ -103,10 +103,9 @@ def compare(q: int):
def tpch_bench():
ray.init(resources={"worker": 1})
num_workers = int(ray.cluster_resources().get("worker", 1)) * NUM_CPUS_PER_WORKER
use_ray_shuffle = False
ctx = setup_context(use_ray_shuffle, num_workers)
ctx = setup_context(num_workers)
# t = tpch_timing(ctx, 11, print_result=True)
# print(f"query,{t},{use_ray_shuffle},{num_workers}")
# print(f"query,{t},{num_workers}")
# return
run_id = time.strftime("%Y-%m-%d-%H-%M-%S")
with open(f"results-sf{SF}-{run_id}.csv", "w") as fout:
Expand Down
50 changes: 50 additions & 0 deletions docs/testing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Distributed Testing

Install Ray on at least two nodes.

https://docs.ray.io/en/latest/ray-overview/installation.html

```shell
sudo apt install -y python3-pip python3.12-venv
python3 -m venv venv
source venv/bin/activate
pip3 install -U "ray[default]"
```

## Start Ray Head Node

```shell
ray start --head --node-ip-address=10.0.0.23 --port=6379 --dashboard-host=0.0.0.0
```

## Start Ray Worker Nodes(s)

```shell
ray start --address=10.0.0.23:6379 --redis-password='5241590000000000'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you need to start redis?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, this is built into ray somehow and this is the password that it uses

```

## Install DataFusion Ray (on each node)

Clone the repo with the version that you want to test. Run `maturin build --release` in the virtual env.

```shell
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
. "$HOME/.cargo/env"
```

```shell
pip3 install maturin
```

```shell
git clone https://github.com/apache/datafusion-ray.git
cd datafusion-ray
maturin develop --release
```

## Submit Job

```shell
cd examples
RAY_ADDRESS='http://10.0.0.23:8265' ray job submit --working-dir `pwd` -- python3 tips.py
```
6 changes: 4 additions & 2 deletions examples/tips.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
# under the License.

import os
import pandas as pd
import ray

from datafusion_ray import DatafusionRayContext
Expand All @@ -26,8 +25,11 @@
# Start a local cluster
ray.init(resources={"worker": 1})

# Connect to a cluster
# ray.init()

# Create a context and register a table
ctx = DatafusionRayContext(2, use_ray_shuffle=True)
ctx = DatafusionRayContext(2)
# Register either a CSV or Parquet file
# ctx.register_csv("tips", f"{SCRIPT_DIR}/tips.csv", True)
ctx.register_parquet("tips", f"{SCRIPT_DIR}/tips.parquet")
Expand Down
20 changes: 7 additions & 13 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@ use crate::utils::wait_for_future;
use datafusion::arrow::pyarrow::FromPyArrow;
use datafusion::arrow::pyarrow::ToPyArrow;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::config::Extensions;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::TaskContext;
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::options::ReadOptions;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::physical_plan::{displayable, ExecutionPlan};
use datafusion::prelude::*;
Expand All @@ -47,13 +45,12 @@ type PyResultSet = Vec<PyObject>;
#[pyclass(name = "Context", module = "datafusion_ray", subclass)]
pub struct PyContext {
pub(crate) ctx: SessionContext,
use_ray_shuffle: bool,
}

#[pymethods]
impl PyContext {
#[new]
pub fn new(target_partitions: usize, use_ray_shuffle: bool) -> Result<Self> {
pub fn new(target_partitions: usize) -> Result<Self> {
let config = SessionConfig::default()
.with_target_partitions(target_partitions)
.with_batch_size(16 * 1024)
Expand All @@ -67,11 +64,8 @@ impl PyContext {
.with_memory_pool(Arc::new(FairSpillPool::new(mem_pool_size)))
.with_disk_manager(DiskManagerConfig::new_specified(vec!["/tmp".into()]));
let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
let ctx = SessionContext::with_config_rt(config, runtime);
Ok(Self {
ctx,
use_ray_shuffle,
})
let ctx = SessionContext::new_with_config_rt(config, runtime);
Ok(Self { ctx })
}

pub fn register_csv(
Expand All @@ -94,9 +88,9 @@ impl PyContext {

pub fn register_datalake_table(
&self,
name: &str,
path: Vec<String>,
py: Python,
_name: &str,
_path: Vec<String>,
_py: Python,
) -> PyResult<()> {
// let options = ParquetReadOptions::default();
// let listing_options = options.to_listing_options(&self.ctx.state().config());
Expand All @@ -119,7 +113,7 @@ impl PyContext {
let df = wait_for_future(py, self.ctx.sql(sql))?;
let plan = wait_for_future(py, df.create_physical_plan())?;

let graph = make_execution_graph(plan.clone(), self.use_ray_shuffle)?;
let graph = make_execution_graph(plan.clone())?;

// debug logging
let mut stages = graph.query_stages.values().collect::<Vec<_>>();
Expand Down
Loading