Example for using a separate threadpool for CPU bound work (try 3) #16331

Open · wants to merge 5 commits into main

349 changes: 349 additions & 0 deletions datafusion-examples/examples/thread_pools.rs
@@ -0,0 +1,349 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! This example shows how to use separate thread pools (tokio [`Runtime`]s) to
//! run the IO and CPU intensive parts of DataFusion plans.
//!
//! # Background
//!
//! DataFusion, by default, plans and executes all operations (both CPU and IO)
//! on the same thread pool. This makes it fast and easy to get started, but
//! can cause issues when running at scale, especially when fetching and operating
//! on data directly from remote sources.
//!
//! Specifically, without configuration such as in this example, DataFusion
//! plans and executes everything on the same thread pool (Tokio Runtime), including
//! any I/O, such as reading Parquet files from remote object storage
//! (e.g. AWS S3), catalog access, and CPU intensive work. Running this diverse
//! workload can lead to issues described in the [Architecture section] such as
//! throttled network bandwidth (due to congestion control) and increased
//! latencies or timeouts while processing network messages.
//!
//! [Architecture section]: https://docs.rs/datafusion/latest/datafusion/index.html#thread-scheduling-cpu--io-thread-pools-and-tokio-runtimes

use arrow::util::pretty::pretty_format_batches;
use datafusion::common::runtime::JoinSet;
use datafusion::error::Result;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::prelude::*;
use futures::stream::StreamExt;
use object_store::client::SpawnedReqwestConnector;
use object_store::http::HttpBuilder;
use std::sync::Arc;
use std::time::Duration;
use tokio::runtime::Handle;
use tokio::sync::Notify;
use url::Url;

/// Normally, you don't need to worry about the details of the tokio
/// [`Runtime`], but for this example it is important to understand how the
/// [`Runtime`]s work.
///
/// Each thread has a "current" runtime that is installed in a thread local
/// variable, which is used by the `tokio::spawn` function.
///
/// The `#[tokio::main]` macro creates a [`Runtime`] and installs it as
/// the "current" runtime in a thread local variable, on which any `async`
/// [`Future`]s, [`Stream`]s, and [`Task`]s are run.
///
/// This example uses the runtime created by [`tokio::main`] to do I/O and spawn
/// CPU intensive tasks on a separate [`Runtime`], mirroring the common pattern
/// when using Rust libraries such as `tonic`. Using a separate `Runtime` for
/// CPU bound tasks will often be simpler in larger applications, even though it
/// makes this example slightly more complex.
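///
/// For orientation, `#[tokio::main]` below is roughly equivalent to the
/// following (an illustrative sketch, not the macro's exact expansion):
///
/// ```ignore
/// fn main() -> Result<()> {
///     tokio::runtime::Builder::new_multi_thread()
///         .enable_all()
///         .build()?
///         .block_on(async { /* the body of the async main below */ })
/// }
/// ```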
#[tokio::main]
async fn main() -> Result<()> {
// The first two examples read local files. Enabling the URL table feature
// lets us treat filenames as tables in SQL.
let ctx = SessionContext::new().enable_url_table();
let sql = format!(
"SELECT * FROM '{}/alltypes_plain.parquet'",
datafusion::test_util::parquet_test_data()
);

// Run a query on the current runtime. Calling `await` means the future
// (in this case the `async` function and all work spawned by the DataFusion
// plan) runs on the current runtime.
same_runtime(&ctx, &sql).await?;

// Run the same query but this time on a different runtime.
//
// Since we call `await` here, the `async` function itself runs on the
// current runtime, but internally `different_runtime_basic` executes the
// DataFusion plan on a different Runtime.
different_runtime_basic(ctx, sql).await?;

// Run a similar query that reads from a remote object store, using the
// current runtime for IO and a different runtime for CPU intensive work.
//
// NOTE: This is best practice for production systems
different_runtime_advanced().await?;

Ok(())
}

/// Run queries directly on the current tokio `Runtime`
///
/// This is how most examples in DataFusion are written and works well for
/// development, local query processing, and non latency sensitive workloads.
async fn same_runtime(ctx: &SessionContext, sql: &str) -> Result<()> {
// Calling `.sql` is async because it may also do network I/O, for example
// to contact a remote catalog or perform an object store LIST
let df = ctx.sql(sql).await?;

// While many examples call `collect` or `show()`, those methods buffer the
// results. Internally DataFusion generates output one RecordBatch at a time.

// Calling `execute_stream` returns a `SendableRecordBatchStream`. Depending
// on the plan, this may also do network I/O, for example to begin reading a
// parquet file from a remote object store.
let mut stream: SendableRecordBatchStream = df.execute_stream().await?;

// `next()` drives the plan, incrementally producing new `RecordBatch`es
// using the current runtime.
//
// Perhaps somewhat non obviously, calling `next()` can also result in other
// tasks being spawned on the current runtime (e.g. for `RepartitionExec` to
// read data from each of its input partitions in parallel).
//
// Executing the plan using this pattern intermixes any IO and CPU intensive
// work on the same Runtime
while let Some(batch) = stream.next().await {
println!("{}", pretty_format_batches(&[batch?]).unwrap());
}
Ok(())
}

/// Run queries on a **different** Runtime dedicated for CPU bound work
///
/// This example is suitable for running DataFusion plans against local data
/// sources (e.g. files) and returning results to an async destination, as might
/// be done to return query results to a remote client.
///
/// Production systems which read data from remote sources such as object
/// storage, or which require very low latency, should follow the
/// recommendations of [`different_runtime_advanced`].
async fn different_runtime_basic(ctx: SessionContext, sql: String) -> Result<()> {
// Since we are already in the context of a runtime (installed by
// #[tokio::main]), we need a new Runtime (thread pool) for CPU bound tasks
let cpu_runtime = CpuRuntime::try_new()?;

// Prepare a task that runs the plan on cpu_runtime and sends
// the results back to the original runtime via a channel.
let (tx, mut rx) = tokio::sync::mpsc::channel(2);
let driver_task = async move {
// Plan the query (which might require CPU work to evaluate statistics)
let df = ctx.sql(&sql).await?;
let mut stream: SendableRecordBatchStream = df.execute_stream().await?;

// Calling `next()` to drive the plan in this task means the execution
// runs on the cpu_runtime (the other thread pool).
//
// NOTE any IO run by this plan (for example, reading from an
// `ObjectStore`) will be done on this new thread pool as well.
while let Some(batch) = stream.next().await {
if tx.send(batch).await.is_err() {
// an error means the receiver was dropped, so no one will get results anymore
return Ok(());
}
}
Ok(()) as Result<()>
};

// Run the driver task on the cpu runtime. Use a JoinSet to
// ensure the spawned task is canceled on error/drop
let mut join_set = JoinSet::new();
join_set.spawn_on(driver_task, cpu_runtime.handle());

// Retrieve the results in the original (IO) runtime. This requires only
// minimal work (passing pointers around).
while let Some(batch) = rx.recv().await {
println!("{}", pretty_format_batches(&[batch?])?);
}

// wait for completion of the driver task
drain_join_set(join_set).await;

Ok(())
}

/// Run CPU intensive work on a different runtime but do IO operations (object
/// store access) on the current runtime.
async fn different_runtime_advanced() -> Result<()> {
// In this example, we will query a file via https, reading
// the data directly from the remote source as part of the plan

// The current runtime (created by tokio::main) is used for IO
//
// Note this handle should be used for *ALL* remote IO operations in your
// system, including remote catalog access, which is not included in this
// example.
let cpu_runtime = CpuRuntime::try_new()?;
let io_handle = Handle::current();

Contributor:
Question: this seems like the inverse of what I would have expected where DF would run on the current runtime and IO would run on a specialized runtime. Is there a reason why that would not work here? I would think it would simplify the code a fair bit.

Contributor Author:
I don't think there is any technical reason

The reason I did it this way is that I think most applications / server programs (and the examples from tokio, etc) use the runtime automatically created by tokio for IO and so I wanted to follow the same pattern.

I'll update the documentation to make this clearer.

Contributor:
My guess for that is that many/(most?) systems don't have a way to push IO to a separate runtime whereas it's easier to do so with cpu much of the time. However, with ObjectStore at least that isn't the case.

Contributor Author:
Yeah. Another consideration is that most uses of tokio are not for CPU bound work, so it makes sense to just use the default pool.

Contributor Author:
I updated it to say:

/// This example uses the runtime created by [`tokio::main`] to do I/O and spawn
/// CPU intensive tasks on a separate [`Runtime`], mirroring the common pattern
/// when using Rust libraries such as `tonic`. Using a separate `Runtime` for
/// CPU bound tasks will often be simpler in larger applications, even though it
/// makes this example slightly more complex.
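
For illustration, here is a minimal sketch (not part of this PR; the function name and the `io_handle` parameter are hypothetical) of the inverse arrangement discussed in this thread: the DataFusion plan stays on the current runtime while the object store's HTTP requests are pushed to a dedicated IO runtime via `SpawnedReqwestConnector`. The IO `Runtime` itself would need to be created and kept alive outside any async context, similar to the `CpuRuntime` wrapper below.

```rust
use std::sync::Arc;
use datafusion::error::Result;
use datafusion::prelude::*;
use futures::stream::StreamExt;
use object_store::client::SpawnedReqwestConnector;
use object_store::http::HttpBuilder;
use tokio::runtime::Handle;
use url::Url;

/// Run the plan on the *current* runtime; spawn HTTP IO onto `io_handle`.
async fn df_on_current_runtime(io_handle: Handle) -> Result<()> {
    let base_url = Url::parse("https://github.com").unwrap();
    let http_store = HttpBuilder::new()
        .with_url(base_url.clone())
        // HTTP requests run on the dedicated IO runtime rather than on the
        // runtime that polls the DataFusion plan
        .with_http_connector(SpawnedReqwestConnector::new(io_handle))
        .build()?;

    let ctx = SessionContext::new().enable_url_table();
    ctx.register_object_store(&base_url, Arc::new(http_store));

    // The plan itself (CPU intensive work) is polled on the current runtime
    let url = "https://github.com/apache/arrow-testing/raw/master/data/csv/aggregate_test_100.csv";
    let df = ctx.sql(&format!("SELECT c1, c2 FROM '{url}' LIMIT 5")).await?;
    let mut stream = df.execute_stream().await?;
    while let Some(batch) = stream.next().await {
        println!("{}", arrow::util::pretty::pretty_format_batches(&[batch?])?);
    }
    Ok(())
}
```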


let ctx = SessionContext::new();

// By default, the HttpStore uses the same runtime that calls `await` for IO
// operations. This means that if the DataFusion plan is called from the
// cpu_runtime, the HttpStore IO operations will *also* run on the CPU
// runtime, which will error.
//
// To avoid this, we use a `SpawnedReqwestConnector` to configure the
// `ObjectStore` to run the HTTP requests on the IO runtime.
let base_url = Url::parse("https://github.com").unwrap();
let http_store = HttpBuilder::new()
.with_url(base_url.clone())
// Use the io_handle (IO runtime) to run the HTTP requests. Without this line,
// you will see an error such as:
// A Tokio 1.x context was found, but IO is disabled.
.with_http_connector(SpawnedReqwestConnector::new(io_handle))
.build()?;

// Tell DataFusion to process `https://` urls with this object store
ctx.register_object_store(&base_url, Arc::new(http_store));

// As above, plan and execute the query on the cpu runtime.
let (tx, mut rx) = tokio::sync::mpsc::channel(2);
let driver_task = async move {
// Plan / execute the query
let url = "https://github.com/apache/arrow-testing/raw/master/data/csv/aggregate_test_100.csv";
let df = ctx
.sql(&format!("SELECT c1,c2,c3 FROM '{url}' LIMIT 5"))
.await?;

let mut stream: SendableRecordBatchStream = df.execute_stream().await?;

// Note you can do other non-trivial CPU work on the results of the
// stream before sending them back to the original runtime. For example,
// using a FlightDataEncoder to convert the results to flight messages
// to send over the network.

// send results, as above
while let Some(batch) = stream.next().await {
if tx.send(batch).await.is_err() {
return Ok(());
}
}
Ok(()) as Result<()>
};

let mut join_set = JoinSet::new();
join_set.spawn_on(driver_task, cpu_runtime.handle());
while let Some(batch) = rx.recv().await {
println!("{}", pretty_format_batches(&[batch?])?);
}

// Wait for the driver task to complete and report any errors, as in
// `different_runtime_basic`
drain_join_set(join_set).await;

Ok(())
}

/// Waits for all tasks in the JoinSet to complete and reports any errors that
/// occurred.
///
/// If we don't do this, any errors that occur in the task (such as IO errors)
/// are not reported.
async fn drain_join_set(mut join_set: JoinSet<Result<()>>) {
// retrieve any errors from the tasks
while let Some(result) = join_set.join_next().await {
match result {
Ok(Ok(())) => {} // task completed successfully
Ok(Err(e)) => eprintln!("Task failed: {e}"), // task failed
Err(e) => eprintln!("JoinSet error: {e}"), // JoinSet error
}
}
}

/// Creates a Tokio [`Runtime`] for use with CPU bound tasks
///
/// Tokio forbids dropping `Runtime`s in async contexts, so creating a separate
/// `Runtime` correctly is somewhat tricky. This structure manages the creation
/// and shutdown of a separate thread.
///
/// # Notes
/// On drop, the current thread will wait for all remaining tasks to complete.
///
/// Depending on your application, more sophisticated shutdown logic may be
/// required, such as ensuring that no new tasks are added to the runtime.
///
/// # Credits
/// This code is derived from code originally written for [InfluxDB 3.0]
///
/// [InfluxDB 3.0]: https://github.com/influxdata/influxdb3_core/tree/6fcbb004232738d55655f32f4ad2385523d10696/executor
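///
/// # Example
/// A minimal usage sketch (mirroring the pattern used earlier in this file):
/// ```ignore
/// let cpu_runtime = CpuRuntime::try_new()?;
/// let mut join_set = JoinSet::new();
/// // spawn CPU bound work on the dedicated runtime
/// join_set.spawn_on(async { Ok(()) as Result<()> }, cpu_runtime.handle());
/// // dropping `cpu_runtime` signals its thread to shut the Runtime down
/// ```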
struct CpuRuntime {
/// Handle is the tokio structure for interacting with a Runtime.
handle: Handle,
/// Signal to start shutting down
notify_shutdown: Arc<Notify>,
/// When the thread is active, this is `Some`
thread_join_handle: Option<std::thread::JoinHandle<()>>,
}

impl Drop for CpuRuntime {
fn drop(&mut self) {
// Notify the thread to shut down.
self.notify_shutdown.notify_one();
// In a production system you also need to ensure that nothing adds new
// tasks to the underlying runtime after this point.
if let Some(thread_join_handle) = self.thread_join_handle.take() {
// If the thread is still running, we wait for it to finish
println!("Shutting down CPU runtime thread...");
if let Err(e) = thread_join_handle.join() {
eprintln!("Error joining CPU runtime thread: {e:?}",);
} else {
println!("CPU runtime thread shutdown successfully.");
}
}
}
}

impl CpuRuntime {
/// Create a new Tokio Runtime for CPU bound tasks
pub fn try_new() -> Result<Self> {
let cpu_runtime = tokio::runtime::Builder::new_multi_thread()
.enable_time()
.build()?;
let handle = cpu_runtime.handle().clone();
let notify_shutdown = Arc::new(Notify::new());
let notify_shutdown_captured = Arc::clone(&notify_shutdown);

// The cpu_runtime must be dropped on a separate thread, as Tokio forbids
// dropping a Runtime from within an async context
let thread_join_handle = std::thread::spawn(move || {
cpu_runtime.block_on(async move {
notify_shutdown_captured.notified().await;
});
cpu_runtime.shutdown_timeout(Duration::from_secs(1));
});

Ok(Self {
handle,
notify_shutdown,
thread_join_handle: Some(thread_join_handle),
})
}

/// Return a handle suitable for spawning CPU bound tasks
///
/// # Notes
///
/// If a task spawned on this handle attempts to do IO, it will error with a
/// message such as:
///
/// ```text
/// A Tokio 1.x context was found, but IO is disabled.
/// ```
pub fn handle(&self) -> &Handle {
&self.handle
}
}
18 changes: 11 additions & 7 deletions datafusion/core/src/lib.rs
@@ -510,17 +510,20 @@
//! initial development and processing local files, but it can lead to problems
//! under load and/or when reading from network sources such as AWS S3.
//!
+//! ### Optimizing Latency: Throttled CPU / IO under Highly Concurrent Load
+//!
//! If your system does not fully utilize either the CPU or network bandwidth
//! during execution, or you see significantly higher tail (e.g. p99) latencies
//! responding to network requests, **it is likely you need to use a different
-//! [`Runtime`] for CPU intensive DataFusion plans**. This effect can be especially
-//! pronounced when running several queries concurrently.
+//! [`Runtime`] for DataFusion plans**. The [thread_pools example]
+//! has an example of how to do so.
//!
-//! As shown in the following figure, using the same [`Runtime`] for both CPU
-//! intensive processing and network requests can introduce significant
-//! delays in responding to those network requests. Delays in processing network
-//! requests can and does lead network flow control to throttle the available
-//! bandwidth in response.
+//! As shown below, using the same [`Runtime`] for both CPU intensive processing
+//! and network requests can introduce significant delays in responding to
+//! those network requests. Delays in processing network requests can and do
+//! lead network flow control to throttle the available bandwidth in response.
+//! This effect can be especially pronounced when running multiple queries
+//! concurrently.
//!
//! ```text
//! Legend
Expand Down Expand Up @@ -602,6 +605,7 @@
//!
//! [Tokio]: https://tokio.rs
//! [`Runtime`]: tokio::runtime::Runtime
+//! [thread_pools example]: https://github.com/apache/datafusion/tree/main/datafusion-examples/examples/thread_pools.rs
//! [`task`]: tokio::task
//! [Using Rustlang’s Async Tokio Runtime for CPU-Bound Tasks]: https://thenewstack.io/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/
//! [`RepartitionExec`]: physical_plan::repartition::RepartitionExec
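
To make the recommendation in the documentation above concrete, here is a condensed sketch of the pattern the thread_pools example demonstrates, assuming a `Handle` to a CPU bound runtime already exists (for example via the `CpuRuntime` wrapper shown earlier); the helper name `run_on_cpu_runtime` is hypothetical.

```rust
use datafusion::common::runtime::JoinSet;
use datafusion::error::Result;
use datafusion::prelude::*;
use futures::stream::StreamExt;
use tokio::runtime::Handle;

/// Plan and drive `sql` on the runtime behind `cpu_handle`, receiving the
/// resulting batches on the current (IO) runtime.
async fn run_on_cpu_runtime(ctx: SessionContext, sql: String, cpu_handle: &Handle) -> Result<()> {
    let (tx, mut rx) = tokio::sync::mpsc::channel(2);

    // Drive the plan on the CPU runtime; send each batch back over the channel
    let mut join_set = JoinSet::new();
    join_set.spawn_on(
        async move {
            let mut stream = ctx.sql(&sql).await?.execute_stream().await?;
            while let Some(batch) = stream.next().await {
                if tx.send(batch).await.is_err() {
                    break; // receiver dropped, stop producing
                }
            }
            Ok(()) as Result<()>
        },
        cpu_handle,
    );

    // Receive (and here, print) the results on the current runtime
    while let Some(batch) = rx.recv().await {
        println!("{}", arrow::util::pretty::pretty_format_batches(&[batch?])?);
    }

    // Surface any errors from the driver task
    while let Some(res) = join_set.join_next().await {
        match res {
            Ok(Ok(())) => {}
            Ok(Err(e)) => eprintln!("query failed: {e}"),
            Err(e) => eprintln!("join error: {e}"),
        }
    }
    Ok(())
}
```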