Serialize function signature simplifications #8802
Conversation
I think this part of the tokio docs is confusing and somewhat misleading. Specifically, what "a lot of compute" means is very dependent on the application. In this case, I think the actual serialization is done for a …

I wrote up a detailed justification for using tokio for CPU-bound tasks in the following blog post, which I think is still very relevant: https://thenewstack.io/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/
Almost all of a DataFusion plan's execution is CPU bound and not asynchronous, yet plans are executed using the tokio runtime.

So, all in all, I don't agree with this PR's stated rationale that we should not use the same executor for CPU-bound tasks.
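To make the trade-off under discussion concrete, here is a minimal, hypothetical sketch of the two approaches (it assumes the `tokio` crate with the `rt-multi-thread` and `macros` features; `serialize_batch` is a stand-in for CPU-bound serialization work, not DataFusion's actual serializer):

```rust
use std::time::Instant;

// Hypothetical stand-in for CPU-bound serialization work.
fn serialize_batch(data: &[u8]) -> Vec<u8> {
    data.iter().map(|b| b.wrapping_add(1)).collect()
}

#[tokio::main]
async fn main() {
    let data = vec![0u8; 10_000_000];

    // Option A: run the CPU-bound work directly on an async worker thread.
    // The worker cannot drive other tasks while this runs.
    let start = Instant::now();
    let _out = serialize_batch(&data);
    println!("inline on worker: {:?}", start.elapsed());

    // Option B: move the work to tokio's blocking pool via spawn_blocking,
    // keeping async workers free -- at the cost of extra threads beyond
    // the configured worker_threads.
    let start = Instant::now();
    let _out = tokio::task::spawn_blocking(move || serialize_batch(&data))
        .await
        .unwrap();
    println!("spawn_blocking: {:?}", start.elapsed());
}
```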
@alamb, would the current approach work well in a setting with a small number of threads and large batch sizes? The scenario that came up today when we discussed this internally was running a deep query (i.e. one with a lot of operators) in such a setting. We were wondering if the pull mechanism would get disrupted if serializations took too long in such an environment. Maybe the answer is no (and we should test/measure to see what happens), but I'd love to hear your thought process on such a scenario.
This likely depends on what "large batch size" means and what type of disruption you are talking about. My internal mental model is that if you give the query plan … The larger the …

So I think the application needs to figure out how it wants to trade off these features.

I am always a fan of measuring / testing.
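For context, the knobs in this trade-off map to DataFusion's session configuration (both setters also appear in the benchmark below); a sketch, with purely illustrative values:

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

fn make_context() -> SessionContext {
    // Larger batches amortize per-batch overhead but mean more work between
    // yield points; target_partitions bounds plan-level parallelism.
    let config = SessionConfig::new()
        .with_batch_size(8192)
        .with_target_partitions(4);
    SessionContext::new_with_config(config)
}

fn main() {
    let _ctx = make_context();
}
```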
Thanks, I will ask @metesynnada to run some experiments so we can all learn whether there are any intricacies at play here. We will report the results here and close this PR if it (1) doesn't help in any scenario, or (2) hurts in some scenario.
Marking this as draft to signify there is ongoing work -- please feel free to mark it ready for review if I got that wrong |
A direct comparison was conducted between the main branch and this branch.

Data generation

TPCH data is used with scale 0.1.

Benchmark Code

The benchmark code:

```rust
use criterion::Criterion;
use criterion::{criterion_group, criterion_main};
use criterion::{BatchSize, BenchmarkId};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaBuilder, SchemaRef};
use datafusion::catalog::TableReference;
use datafusion::common::Result;
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::prelude::{SessionConfig, SessionContext};
use std::sync::Arc;
use tokio::runtime::Builder;

fn register_csv(
    ctx: &SessionContext,
    table_name: &str,
    schema: SchemaRef,
    table_path: impl AsRef<str>,
) -> Result<()> {
    let file_format = CsvFormat::default()
        .with_has_header(false)
        .with_delimiter(b'|');
    let options = ListingOptions::new(Arc::new(file_format))
        .with_file_extension(".tbl")
        .with_target_partitions(ctx.copied_config().batch_size());
    let table_path = ListingTableUrl::parse(table_path)?;
    let config = ListingTableConfig::new(table_path)
        .with_listing_options(options)
        .with_schema(schema);
    let table = ListingTable::try_new(config)?;
    ctx.register_table(
        TableReference::Bare {
            table: table_name.into(),
        },
        Arc::new(table),
    )?;
    Ok(())
}

async fn execute_query(ctx: SessionContext, sql: String) {
    ctx.sql(&sql).await.unwrap().collect().await.unwrap();
}

fn get_tpch_table_schema(table: &str) -> Schema {
    match table {
        "orders" => Schema::new(vec![
            Field::new("o_orderkey", DataType::Int64, false),
            Field::new("o_custkey", DataType::Int64, false),
            Field::new("o_orderstatus", DataType::Utf8, false),
            Field::new("o_totalprice", DataType::Decimal128(15, 2), false),
            Field::new("o_orderdate", DataType::Date32, false),
            Field::new("o_orderpriority", DataType::Utf8, false),
            Field::new("o_clerk", DataType::Utf8, false),
            Field::new("o_shippriority", DataType::Int32, false),
            Field::new("o_comment", DataType::Utf8, false),
        ]),
        "lineitem" => Schema::new(vec![
            Field::new("l_orderkey", DataType::Int64, false),
            Field::new("l_partkey", DataType::Int64, false),
            Field::new("l_suppkey", DataType::Int64, false),
            Field::new("l_linenumber", DataType::Int32, false),
            Field::new("l_quantity", DataType::Decimal128(15, 2), false),
            Field::new("l_extendedprice", DataType::Decimal128(15, 2), false),
            Field::new("l_discount", DataType::Decimal128(15, 2), false),
            Field::new("l_tax", DataType::Decimal128(15, 2), false),
            Field::new("l_returnflag", DataType::Utf8, false),
            Field::new("l_linestatus", DataType::Utf8, false),
            Field::new("l_shipdate", DataType::Date32, false),
            Field::new("l_commitdate", DataType::Date32, false),
            Field::new("l_receiptdate", DataType::Date32, false),
            Field::new("l_shipinstruct", DataType::Utf8, false),
            Field::new("l_shipmode", DataType::Utf8, false),
            Field::new("l_comment", DataType::Utf8, false),
        ]),
        _ => unimplemented!("Table: {}", table),
    }
}

pub fn get_tbl_tpch_table_schema(table: &str) -> Schema {
    let mut schema = SchemaBuilder::from(get_tpch_table_schema(table).fields);
    schema.push(Field::new("__placeholder", DataType::Utf8, true));
    schema.finish()
}

fn delete_file(path: &str) {
    std::fs::remove_file(path).expect("Failed to delete file");
}

fn from_elem_reading(c: &mut Criterion) {
    let batch_sizes = [100, 1000, 3000];
    let target_partitions = [1, 4];
    let worker_threads = [1, 4];
    let mut group = c.benchmark_group("parameter group");
    for batch_size in batch_sizes.iter() {
        for target_partition in target_partitions {
            for worker_thread in worker_threads {
                group.bench_with_input(
                    BenchmarkId::new(
                        format!("sink_bs{}_tp{}", batch_size, target_partition),
                        worker_thread,
                    ),
                    &(batch_size, target_partition, worker_thread),
                    |b, &(batch_size, target_partition, worker_thread)| {
                        let rt = Builder::new_multi_thread()
                            .worker_threads(worker_thread)
                            .build()
                            .unwrap();
                        b.to_async(rt).iter_batched(
                            || {
                                let csv_file = tempfile::Builder::new()
                                    .prefix("foo")
                                    .suffix(".csv")
                                    .tempfile()
                                    .unwrap();
                                let path = csv_file.path().to_str().unwrap().to_string();
                                let config = SessionConfig::new()
                                    .with_coalesce_batches(false)
                                    .with_batch_size(*batch_size)
                                    .with_target_partitions(target_partition);
                                let ctx = SessionContext::new_with_config(config);
                                let lineitem = Arc::new(get_tbl_tpch_table_schema("lineitem"));
                                register_csv(
                                    &ctx,
                                    "lineitem",
                                    lineitem,
                                    "/path/to/tpch_sf0.1/lineitem.tbl",
                                )
                                .unwrap();
                                let orders = Arc::new(get_tbl_tpch_table_schema("orders"));
                                register_csv(
                                    &ctx,
                                    "orders",
                                    orders,
                                    "/path/to/tpch_sf0.1/orders.tbl",
                                )
                                .unwrap();
                                let sql = format!(
                                    "COPY (SELECT * FROM
                                    lineitem, orders
                                    where
                                    l_orderkey = o_orderkey)
                                    to '{path}' (format csv);"
                                );
                                (path, ctx, sql)
                            },
                            |(path, clone_ctx, sql)| async move {
                                execute_query(clone_ctx, sql).await;
                                delete_file(&path);
                            },
                            BatchSize::LargeInput,
                        );
                    },
                );
            }
        }
    }
    group.finish();
}

criterion_group!(benches, from_elem_reading);
criterion_main!(benches);
```

Results
I tried my best to make an objective benchmark, but I might have missed something.
Interesting results. @alamb, can you take a look at the measurement script? Let's make sure there are no methodological mistakes before arriving at any conclusions.
TL;DR: I don't think this benchmark keeps the number of cores constant.

When I run the first benchmark configuration, which is supposed to use 1 core, main uses a single core, as expected. When I run on the branch, more than one core is used.

What I think is happening is that `worker_threads` only caps the async worker threads, so tasks spawned via `spawn_blocking` can still use additional threads from the blocking pool:

```rust
let rt = Builder::new_multi_thread()
    .worker_threads(worker_thread)
    .build()
    .unwrap();
```

The fact that the branch is using 3 threads while main can only use one, I think, accounts for the speed differences.

Methodology

I generated the data with:

```shell
docker run -v "/Users/andrewlamb/Downloads/tpch_sf0.1":/data -it --rm ghcr.io/databloom-ai/tpch-docker:main -vf -s 0.1
```

I put the benchmark code on this branch: https://github.com/alamb/arrow-datafusion/tree/alamb/write_threads_bench

And ran:

```shell
cargo bench --bench write_threads
```
Good catch. Tokio offers a `max_blocking_threads` setting on the runtime builder that can cap this.
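A small sketch of the behavior in question (`worker_threads` and `max_blocking_threads` are both real `tokio::runtime::Builder` methods; the values here are illustrative):

```rust
use tokio::runtime::Builder;

fn main() {
    // worker_threads caps only the async executor threads; spawn_blocking
    // tasks run on a separate blocking pool, capped by max_blocking_threads
    // (which defaults to 512).
    let rt = Builder::new_multi_thread()
        .worker_threads(1)
        .max_blocking_threads(1)
        .build()
        .unwrap();

    rt.block_on(async {
        let name = tokio::task::spawn_blocking(|| {
            // This closure runs on a blocking-pool thread, not a worker.
            std::thread::current().name().map(String::from)
        })
        .await
        .unwrap();
        println!("ran on: {name:?}");
    });
}
```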
I ran the benchmarks on a single core and there is no substantial difference. Thus, I will convert this to spawn again.
Agreed. I changed the PR title and description to match what this does now -- simplifying function signatures only (no change to …).
LGTM -- thank you.
Which issue does this PR close?
Closes #.
Rationale for this change
https://docs.rs/tokio/latest/tokio/task/fn.spawn_blocking.html

Previously, the serialization process was always asynchronous, even though it involved no asynchronous calls and was a CPU-bound operation. This is an anti-pattern. This PR simplifies function signatures in `BatchSerializer`.

What changes are included in this PR?

Make `BatchSerializer` sync, and use it with `tokio::task::spawn_blocking`.
Are these changes tested?
With existing tests.
Are there any user-facing changes?
No.