Example for using a separate threadpool for CPU bound work (try 3) #16331
Open
alamb wants to merge 5 commits into apache:main from alamb:alamb/threadpool_example4
Changes from all commits (5 commits):
- c908a09 Example for using a separate threadpool for CPU bound work (try 3) (alamb)
- 56b0a3f Update datafusion-examples/examples/thread_pools.rs (alamb)
- 9b2e770 Merge remote-tracking branch 'apache/main' into alamb/threadpool_exam… (alamb)
- cc26de1 Add a note about why the main Runtime is used for IO and not CPU (alamb)
- 1cdf543 remove random thought (alamb)
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@@ -0,0 +1,349 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! This example shows how to use separate thread pools (tokio [`Runtime`]s) to
//! run the IO and CPU intensive parts of DataFusion plans.
//!
//! # Background
//!
//! DataFusion, by default, plans and executes all operations (both CPU and IO)
//! on the same thread pool. This makes it fast and easy to get started, but
//! can cause issues when running at scale, especially when fetching and operating
//! on data directly from remote sources.
//!
//! Specifically, without configuration such as in this example, DataFusion
//! plans and executes everything on the same thread pool (tokio Runtime),
//! including any I/O, such as reading Parquet files from remote object storage
//! (e.g. AWS S3), catalog access, and CPU intensive work. Running this diverse
//! workload can lead to issues described in the [Architecture section] such as
//! throttled network bandwidth (due to congestion control) and increased
//! latencies or timeouts while processing network messages.
//!
//! [Architecture section]: https://docs.rs/datafusion/latest/datafusion/index.html#thread-scheduling-cpu--io-thread-pools-and-tokio-runtimes

use arrow::util::pretty::pretty_format_batches;
use datafusion::common::runtime::JoinSet;
use datafusion::error::Result;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::prelude::*;
use futures::stream::StreamExt;
use object_store::client::SpawnedReqwestConnector;
use object_store::http::HttpBuilder;
use std::sync::Arc;
use std::time::Duration;
use tokio::runtime::Handle;
use tokio::sync::Notify;
use url::Url;

/// Normally, you don't need to worry about the details of the tokio
/// [`Runtime`], but for this example it is important to understand how
/// [`Runtime`]s work.
///
/// Each thread has a "current" runtime, installed in a thread local
/// variable, which is used by the `tokio::spawn` function.
///
/// The `#[tokio::main]` macro creates a [`Runtime`] and installs it
/// as the "current" runtime in a thread local variable, on which any `async`
/// [`Future`]s, [`Stream`]s, and tasks are run.
///
/// This example uses the runtime created by [`tokio::main`] to do I/O and
/// spawns CPU intensive tasks on a separate [`Runtime`], mirroring the common
/// pattern when using Rust libraries such as `tonic`. Using a separate
/// `Runtime` for CPU bound tasks will often be simpler in larger applications,
/// even though it makes this example slightly more complex.
#[tokio::main]
async fn main() -> Result<()> {
    // The first two examples read local files. Enabling the URL table feature
    // lets us treat filenames as tables in SQL.
    let ctx = SessionContext::new().enable_url_table();
    let sql = format!(
        "SELECT * FROM '{}/alltypes_plain.parquet'",
        datafusion::test_util::parquet_test_data()
    );

    // Run a query on the current runtime. Calling `await` here runs the future
    // (in this case the `async` function and all work spawned by DataFusion
    // plans) on the current runtime.
    same_runtime(&ctx, &sql).await?;

    // Run the same query but this time on a different runtime.
    //
    // Since we call `await` here, the `async` function itself runs on the
    // current runtime, but internally `different_runtime_basic` executes the
    // DataFusion plan on a different Runtime.
    different_runtime_basic(ctx, sql).await?;

    // Run the same query on a different runtime, including remote IO.
    //
    // NOTE: This is best practice for production systems
    different_runtime_advanced().await?;

    Ok(())
}

/// Run queries directly on the current tokio `Runtime`
///
/// This is how most examples in DataFusion are written and works well for
/// development, local query processing, and non latency sensitive workloads.
async fn same_runtime(ctx: &SessionContext, sql: &str) -> Result<()> {
    // `sql` is an async function as it may also do network I/O, for example
    // to contact a remote catalog or do an object store LIST
    let df = ctx.sql(sql).await?;

    // While many examples call `collect` or `show()`, those methods buffer the
    // results. Internally DataFusion generates output one RecordBatch at a time.

    // Calling `execute_stream` returns a `SendableRecordBatchStream`. Depending
    // on the plan, this may also do network I/O, for example to begin reading a
    // parquet file from a remote object store.
    let mut stream: SendableRecordBatchStream = df.execute_stream().await?;

    // `next()` drives the plan, incrementally producing new `RecordBatch`es
    // using the current runtime.
    //
    // Perhaps somewhat non obviously, calling `next()` can also result in other
    // tasks being spawned on the current runtime (e.g. for `RepartitionExec` to
    // read data from each of its input partitions in parallel).
    //
    // Executing the plan using this pattern intermixes any IO and CPU intensive
    // work on the same Runtime.
    while let Some(batch) = stream.next().await {
        println!("{}", pretty_format_batches(&[batch?]).unwrap());
    }
    Ok(())
}

/// Run queries on a **different** Runtime dedicated for CPU bound work
///
/// This example is suitable for running DataFusion plans against local data
/// sources (e.g. files) and returning results to an async destination, as might
/// be done to return query results to a remote client.
///
/// Production systems which read data from a remote source such as object
/// storage, or which require very low latency, should follow the
/// recommendations in [`different_runtime_advanced`].
async fn different_runtime_basic(ctx: SessionContext, sql: String) -> Result<()> {
    // Since we are already in the context of a runtime (installed by
    // #[tokio::main]), we need a new Runtime (thread pool) for CPU bound tasks
    let cpu_runtime = CpuRuntime::try_new()?;

    // Prepare a task that runs the plan on cpu_runtime and sends
    // the results back to the original runtime via a channel.
    let (tx, mut rx) = tokio::sync::mpsc::channel(2);
    let driver_task = async move {
        // Plan the query (which might require CPU work to evaluate statistics)
        let df = ctx.sql(&sql).await?;
        let mut stream: SendableRecordBatchStream = df.execute_stream().await?;

        // Calling `next()` in this task drives the execution on the cpu
        // runtime (the other thread pool).
        //
        // NOTE any IO run by this plan (for example, reading from an
        // `ObjectStore`) will be done on this new thread pool as well.
        while let Some(batch) = stream.next().await {
            if tx.send(batch).await.is_err() {
                // An error means the receiver was dropped, so nothing will
                // consume the results anymore
                return Ok(());
            }
        }
        Ok(()) as Result<()>
    };

    // Run the driver task on the cpu runtime. Use a JoinSet to
    // ensure the spawned task is canceled on error/drop
    let mut join_set = JoinSet::new();
    join_set.spawn_on(driver_task, cpu_runtime.handle());

    // Retrieve the results in the original (IO) runtime. This requires only
    // minimal work (passing pointers around).
    while let Some(batch) = rx.recv().await {
        println!("{}", pretty_format_batches(&[batch?])?);
    }

    // wait for completion of the driver task
    drain_join_set(join_set).await;

    Ok(())
}

/// Run CPU intensive work on a different runtime but do IO operations (object
/// store access) on the current runtime.
async fn different_runtime_advanced() -> Result<()> {
    // In this example, we will query a file via https, reading
    // the data directly from the plan

    // The current runtime (created by tokio::main) is used for IO
    //
    // Note this handle should be used for *ALL* remote IO operations in your
    // systems, including remote catalog access, which is not included in this
    // example.
    let cpu_runtime = CpuRuntime::try_new()?;
    let io_handle = Handle::current();

    let ctx = SessionContext::new();

    // By default, the HttpStore uses the same runtime that calls `await` for IO
    // operations. This means that if the DataFusion plan is called from the
    // cpu_runtime, the HttpStore IO operations will *also* run on the CPU
    // runtime, which will error.
    //
    // To avoid this, we use a `SpawnedReqwestConnector` to configure the
    // `ObjectStore` to run the HTTP requests on the IO runtime.
    let base_url = Url::parse("https://github.com").unwrap();
    let http_store = HttpBuilder::new()
        .with_url(base_url.clone())
        // Use the IO runtime to run the HTTP requests. Without this line,
        // you will see an error such as:
        // A Tokio 1.x context was found, but IO is disabled.
        .with_http_connector(SpawnedReqwestConnector::new(io_handle))
        .build()?;

    // Tell DataFusion to process `http://` urls with this wrapped object store
    ctx.register_object_store(&base_url, Arc::new(http_store));

    // As above, plan and execute the query on the cpu runtime.
    let (tx, mut rx) = tokio::sync::mpsc::channel(2);
    let driver_task = async move {
        // Plan / execute the query
        let url = "https://github.com/apache/arrow-testing/raw/master/data/csv/aggregate_test_100.csv";
        let df = ctx
            .sql(&format!("SELECT c1,c2,c3 FROM '{url}' LIMIT 5"))
            .await?;

        let mut stream: SendableRecordBatchStream = df.execute_stream().await?;

        // Note you can do other non trivial CPU work on the results of the
        // stream before sending them back to the original runtime. For example,
        // calling a FlightDataEncoder to convert the results to flight messages
        // to send over the network.

        // send results, as above
        while let Some(batch) = stream.next().await {
            if tx.send(batch).await.is_err() {
                return Ok(());
            }
        }
        Ok(()) as Result<()>
    };

    let mut join_set = JoinSet::new();
    join_set.spawn_on(driver_task, cpu_runtime.handle());
    while let Some(batch) = rx.recv().await {
        println!("{}", pretty_format_batches(&[batch?])?);
    }

    // wait for the driver task so any errors it hit are reported
    drain_join_set(join_set).await;

    Ok(())
}

/// Waits for all tasks in the JoinSet to complete and reports any errors that
/// occurred.
///
/// If we don't do this, any errors that occur in the task (such as IO errors)
/// are not reported.
async fn drain_join_set(mut join_set: JoinSet<Result<()>>) {
    // retrieve any errors from the tasks
    while let Some(result) = join_set.join_next().await {
        match result {
            Ok(Ok(())) => {} // task completed successfully
            Ok(Err(e)) => eprintln!("Task failed: {e}"), // task failed
            Err(e) => eprintln!("JoinSet error: {e}"), // JoinSet error
        }
    }
}

/// Creates a Tokio [`Runtime`] for use with CPU bound tasks
///
/// Tokio forbids dropping `Runtime`s in async contexts, so creating a separate
/// `Runtime` correctly is somewhat tricky. This structure manages the creation
/// and shutdown of a separate thread.
///
/// # Notes
/// On drop, the current thread will wait for all remaining tasks to complete.
///
/// Depending on your application, more sophisticated shutdown logic may be
/// required, such as ensuring that no new tasks are added to the runtime.
///
/// # Credits
/// This code is derived from code originally written for [InfluxDB 3.0]
///
/// [InfluxDB 3.0]: https://github.com/influxdata/influxdb3_core/tree/6fcbb004232738d55655f32f4ad2385523d10696/executor
struct CpuRuntime {
    /// Handle is the tokio structure for interacting with a Runtime.
    handle: Handle,
    /// Signal to start shutting down
    notify_shutdown: Arc<Notify>,
    /// `Some` while the thread is active
    thread_join_handle: Option<std::thread::JoinHandle<()>>,
}

impl Drop for CpuRuntime {
    fn drop(&mut self) {
        // Notify the thread to shut down.
        self.notify_shutdown.notify_one();
        // In a production system you also need to ensure that nothing adds new
        // tasks to the underlying runtime after this point
        if let Some(thread_join_handle) = self.thread_join_handle.take() {
            // If the thread is still running, we wait for it to finish
            println!("Shutting down CPU runtime thread...");
            if let Err(e) = thread_join_handle.join() {
                eprintln!("Error joining CPU runtime thread: {e:?}");
            } else {
                println!("CPU runtime thread shut down successfully.");
            }
        }
    }
}

impl CpuRuntime {
    /// Create a new Tokio Runtime for CPU bound tasks
    pub fn try_new() -> Result<Self> {
        let cpu_runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_time()
            .build()?;
        let handle = cpu_runtime.handle().clone();
        let notify_shutdown = Arc::new(Notify::new());
        let notify_shutdown_captured = Arc::clone(&notify_shutdown);

        // The cpu_runtime must be dropped on a separate thread
        // (tokio forbids dropping a Runtime in an async context).
        let thread_join_handle = std::thread::spawn(move || {
            cpu_runtime.block_on(async move {
                notify_shutdown_captured.notified().await;
            });
            cpu_runtime.shutdown_timeout(Duration::from_secs(1));
        });

        Ok(Self {
            handle,
            notify_shutdown,
            thread_join_handle: Some(thread_join_handle),
        })
    }

    /// Return a handle suitable for spawning CPU bound tasks
    ///
    /// # Notes
    ///
    /// If a task spawned on this handle attempts to do IO, it will error with a
    /// message such as:
    ///
    /// ```text
    /// A Tokio 1.x context was found, but IO is disabled.
    /// ```
    pub fn handle(&self) -> &Handle {
        &self.handle
    }
}
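Stripped of the tokio and DataFusion specifics, the core pattern the `CpuRuntime` in this PR enables — run CPU bound work on a dedicated pool and stream results back to the caller over a bounded channel — can be sketched with only the Rust standard library. This is an illustrative analogy, not code from the PR: a single worker thread stands in for the CPU runtime, and a `sync_channel` plays the role of the `tokio::sync::mpsc` channel.

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    // Bounded channel, like the `channel(2)` in the example: the CPU side
    // blocks if the consumer falls more than 2 results behind.
    let (tx, rx) = mpsc::sync_channel::<u64>(2);

    // "CPU runtime": a dedicated worker thread (a real pool would have N).
    let cpu_worker = thread::spawn(move || {
        for i in 0..5u64 {
            // Stand-in for CPU-intensive work (decoding, filtering, ...).
            let result = (0..=i).sum::<u64>();
            if tx.send(result).is_err() {
                // Receiver dropped: nothing wants results anymore, stop early.
                return;
            }
        }
        // Dropping tx closes the channel, ending the receiver's loop.
    });

    // The consuming ("IO") side only moves results around; minimal work.
    for result in rx {
        println!("got {result}"); // prints: got 0, got 1, got 3, got 6, got 10
    }

    // Like `drain_join_set`, join the worker so failures are not lost.
    cpu_worker.join().expect("cpu worker panicked");
}
```

The same back-pressure and shutdown properties as the example fall out of the channel: a slow consumer throttles the producer, and dropping the receiver cancels further work.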
Question: this seems like the inverse of what I would have expected where DF would run on the current runtime and IO would run on a specialized runtime. Is there a reason why that would not work here? I would think it would simplify the code a fair bit.
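The inversion the question describes — keep CPU work on the calling thread and push IO to a dedicated worker — can be pictured with a standard-library sketch. This is purely illustrative and not DataFusion or `object_store` API: the "fetch" and file names are made-up stand-ins, with a thread playing the role of a dedicated IO runtime.

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    // "IO worker": a dedicated thread that simulates fetching remote data.
    // (A real system would run an IO runtime here; the names are invented.)
    let (tx, rx) = mpsc::sync_channel::<Vec<u8>>(2);
    let io_worker = thread::spawn(move || {
        for name in ["a.parquet", "b.parquet"] {
            // Stand-in for a network fetch.
            let bytes = name.as_bytes().to_vec();
            if tx.send(bytes).is_err() {
                return; // consumer gone, stop fetching
            }
        }
    });

    // CPU-bound work stays on the calling thread, consuming fetched bytes.
    let mut total = 0usize;
    for bytes in rx {
        total += bytes.len(); // stand-in for decode/aggregate work
    }
    println!("processed {total} bytes"); // prints: processed 18 bytes

    io_worker.join().expect("io worker panicked");
}
```

Either direction works mechanically; the discussion below is about which side of the boundary is more convenient to relocate in practice.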
I don't think there is any technical reason
The reason I did it this way is that I think most applications / server programs (and the examples from tokio, etc) use the runtime automatically created by tokio for IO and so I wanted to follow the same pattern.
I'll update the documentation to make this clearer.
My guess for that is that many/(most?) systems don't have a way to push IO to a separate runtime whereas it's easier to do so with cpu much of the time. However, with ObjectStore at least that isn't the case.
Yeah. Another consideration is that most uses of tokio are not for CPU bound work, so it makes sense to just use the default pool.
I updated it to say