
Serialize function signature simplifications #8802

Merged
merged 3 commits into apache:main on Jan 12, 2024
Conversation

metesynnada
Contributor

@metesynnada metesynnada commented Jan 9, 2024

Which issue does this PR close?

Closes #.

Rationale for this change

In general, issuing a blocking call or performing a lot of compute in a future without yielding is problematic, as it may prevent the executor from driving other futures forward.

https://docs.rs/tokio/latest/tokio/task/fn.spawn_blocking.html

Previously, the serialization process was always asynchronous even when no asynchronous calls were involved, so a purely CPU-bound operation ran inside an async task. This is an anti-pattern.

This PR simplifies function signatures in BatchSerializer.

What changes are included in this PR?

Make BatchSerializer sync, and use it with tokio::task::spawn_blocking.
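
For reference, a minimal sketch of this pattern, assuming a hypothetical trait shape (the names and signatures below are illustrative, not the actual DataFusion API):

use std::sync::Arc;

use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::Result;

// Hypothetical sync serializer trait; the real `BatchSerializer` may differ.
trait BatchSerializer: Send + Sync {
    /// Serialize one batch synchronously (pure CPU work, no awaits).
    fn serialize(&self, batch: RecordBatch) -> Result<Vec<u8>>;
}

/// Run the CPU-bound serialization on tokio's blocking pool so it does not
/// occupy an async worker thread.
async fn serialize_batch(
    serializer: Arc<dyn BatchSerializer>,
    batch: RecordBatch,
) -> Result<Vec<u8>> {
    tokio::task::spawn_blocking(move || serializer.serialize(batch))
        .await
        .expect("serialization task panicked")
}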

Are these changes tested?

With existing tests.

Are there any user-facing changes?

No.

@github-actions bot added the core (Core DataFusion crate) label on Jan 9, 2024
@alamb
Contributor

alamb commented Jan 9, 2024

In general, issuing a blocking call or performing a lot of compute in a future without yielding is problematic, as it may prevent the executor from driving other futures forward.

I think this part of the tokio docs is confusing and somewhat misleading. Specifically what "a lot of compute" means is very dependent on the application.

In this case, I think the actual serialization is done for one RecordBatch at a time, and then await is called to potentially yield control. I don't think serializing a single RecordBatch qualifies as "a lot of compute".
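
For context, the per-batch pattern being described looks roughly like the following sketch (the serialize helper is hypothetical; this is not the actual DataFusion writer code):

use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::Result;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;

// Hypothetical placeholder for CPU-bound serialization of a single batch.
fn serialize(batch: RecordBatch) -> Result<Vec<u8>> {
    Ok(Vec::with_capacity(batch.num_rows()))
}

// Sketch of a per-batch write loop: serialize one RecordBatch (CPU-bound),
// then hit an .await, which gives the executor a chance to run other futures.
async fn write_stream(mut stream: SendableRecordBatchStream) -> Result<()> {
    while let Some(batch) = stream.next().await {
        let _bytes = serialize(batch?)?;
        // an async write of `_bytes` to the output would go here
    }
    Ok(())
}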

I wrote up a detailed justification for using tokio for CPU bound tasks in the following blog post, which I think is still very relevant https://thenewstack.io/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/

Previously, the serialization process was always asynchronous, even if there were no asynchronous calls involved, which was a CPU-bound operation.

Almost all of a DataFusion plan's execution is CPU bound and not asynchronous, yet it is executed heavily using async. I don't think this is a problem (for reasons explained in the blog), and I don't see any reason we would want to treat writing batches differently.

So, all in all, I don't agree with the stated rationale of this PR that we should not use the same executor for CPU-bound tasks.

@ozankabak
Contributor

ozankabak commented Jan 9, 2024

@alamb, would the current approach work well in a setting with a small number of threads and large batch sizes? The scenario that came up today when we discussed this internally was running a deep query (i.e. with a lot of operators) in such a setting. We were wondering if the pull mechanism would get disrupted if serializations would take too long in such an environment.

Maybe the answer is no (and we should test/measure to see what happens), but I'd love to hear your thought process on such a scenario.

@alamb
Contributor

alamb commented Jan 9, 2024

@alamb, would the current approach work well in a setting with a small number of threads and large batch sizes? The scenario that came up today when we discussed this internally was running a deep query (i.e. with a lot of operators) in such a setting. We were wondering if the pull mechanism would get disrupted if serializations would take too long in such an environment.

This likely depends on what "large batch size" means and what type of disruption you are talking about.

My internal mental model is that if you give the query plan N threads, it should be able to keep all N threads busy all of the time. Each output serializer will pull the next batch to serialize (the same thread will likely compute the input to serialize if each of the input futures is ready) and then serialize the batch. The CPU should be kept busy the entire time producing input or serializing output; there won't be any stalls, and the query won't suffer context-switching overhead (due to more threads than CPUs).

The larger the target_batch_size is

  1. the greater the intermediate memory required (as each intermediate batch takes more)
  2. the greater the latency may be between generating subsequent batches of output
  3. the lower the CPU overhead is (as there are many things done "per batch" which gets amortized over many more rows)

So I think the application needs to figure out how it wants to trade off these features
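
For illustration, this is roughly the knob being discussed, with an arbitrary example value (not a recommendation):

use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    // Larger batches amortize per-batch overhead but use more intermediate
    // memory and take longer to produce each output batch.
    let config = SessionConfig::new().with_batch_size(8192);
    let _ctx = SessionContext::new_with_config(config);
}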

Maybe the answer is no (and we should test/measure to see what happens), but I'd love to hear your thought process on such a scenario.

I am always a fan of measuring / testing.

@ozankabak
Contributor

Thanks, I will ask @metesynnada to run some experiments so we all can learn if there are any intricacies at play here.

We will report the results here and close this PR if this (1) doesn't help in any scenario, or (2) hurts in some scenario.

@alamb alamb marked this pull request as draft January 9, 2024 20:36
@alamb
Contributor

alamb commented Jan 9, 2024

Marking this as draft to signify there is ongoing work -- please feel free to mark it ready for review if I got that wrong

@metesynnada
Contributor Author

A direct comparison was conducted between the main and the upstream/spawn-blocking-for-se branches. The scenario involved reading data from a file and writing the results to another file. Initially, the approach was to read from MemoryExec and write to a file. However, no significant differences were observed between the two branches. Therefore, I decided not to present those results.

Data generation

TPC-H data is used with scale factor 0.1.

Benchmark Code


use criterion::Criterion;
use criterion::{criterion_group, criterion_main};
use criterion::{BatchSize, BenchmarkId};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaBuilder, SchemaRef};
use datafusion::catalog::TableReference;
use datafusion::common::Result;
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::prelude::{SessionConfig, SessionContext};
use std::sync::Arc;
use tokio::runtime::Builder;

fn register_csv(
    ctx: &SessionContext,
    table_name: &str,
    schema: SchemaRef,
    table_path: impl AsRef<str>,
) -> Result<()> {
    let file_format = CsvFormat::default()
        .with_has_header(false)
        .with_delimiter(b'|');

    let options = ListingOptions::new(Arc::new(file_format))
        .with_file_extension(".tbl")
        .with_target_partitions(ctx.copied_config().batch_size());
    let table_path = ListingTableUrl::parse(table_path)?;

    let config = ListingTableConfig::new(table_path)
        .with_listing_options(options)
        .with_schema(schema);
    let table = ListingTable::try_new(config)?;
    ctx.register_table(
        TableReference::Bare {
            table: table_name.into(),
        },
        Arc::new(table),
    )?;
    Ok(())
}

async fn execute_query(ctx: SessionContext, sql: String) {
    ctx.sql(&sql).await.unwrap().collect().await.unwrap();
}

fn get_tpch_table_schema(table: &str) -> Schema {
    match table {
        "orders" => Schema::new(vec![
            Field::new("o_orderkey", DataType::Int64, false),
            Field::new("o_custkey", DataType::Int64, false),
            Field::new("o_orderstatus", DataType::Utf8, false),
            Field::new("o_totalprice", DataType::Decimal128(15, 2), false),
            Field::new("o_orderdate", DataType::Date32, false),
            Field::new("o_orderpriority", DataType::Utf8, false),
            Field::new("o_clerk", DataType::Utf8, false),
            Field::new("o_shippriority", DataType::Int32, false),
            Field::new("o_comment", DataType::Utf8, false),
        ]),

        "lineitem" => Schema::new(vec![
            Field::new("l_orderkey", DataType::Int64, false),
            Field::new("l_partkey", DataType::Int64, false),
            Field::new("l_suppkey", DataType::Int64, false),
            Field::new("l_linenumber", DataType::Int32, false),
            Field::new("l_quantity", DataType::Decimal128(15, 2), false),
            Field::new("l_extendedprice", DataType::Decimal128(15, 2), false),
            Field::new("l_discount", DataType::Decimal128(15, 2), false),
            Field::new("l_tax", DataType::Decimal128(15, 2), false),
            Field::new("l_returnflag", DataType::Utf8, false),
            Field::new("l_linestatus", DataType::Utf8, false),
            Field::new("l_shipdate", DataType::Date32, false),
            Field::new("l_commitdate", DataType::Date32, false),
            Field::new("l_receiptdate", DataType::Date32, false),
            Field::new("l_shipinstruct", DataType::Utf8, false),
            Field::new("l_shipmode", DataType::Utf8, false),
            Field::new("l_comment", DataType::Utf8, false),
        ]),
        _ => unimplemented!("Table: {}", table),
    }
}

pub fn get_tbl_tpch_table_schema(table: &str) -> Schema {
    let mut schema = SchemaBuilder::from(get_tpch_table_schema(table).fields);
    schema.push(Field::new("__placeholder", DataType::Utf8, true));
    schema.finish()
}

fn delete_file(path: &str) {
    std::fs::remove_file(path).expect("Failed to delete file");
}

fn from_elem_reading(c: &mut Criterion) {
    let batch_sizes = [100, 1000, 3000];
    let target_partitions = [1, 4];
    let worker_threads = [1, 4];
    let mut group = c.benchmark_group("parameter group");
    for batch_size in batch_sizes.iter() {
        for target_partition in target_partitions {
            for worker_thread in worker_threads {
                group.bench_with_input(BenchmarkId::new(
                    format!("sink_bs{}_tp{}", batch_size, target_partition),
                    worker_thread,
                ), &(batch_size, target_partition, worker_thread), |b, &(batch_size, target_partition, worker_thread)| {
                    let rt = Builder::new_multi_thread()
                        .worker_threads(worker_thread)
                        .build()
                        .unwrap();
                    b.to_async(rt).iter_batched(
                        || {
                            let csv_file = tempfile::Builder::new()
                                .prefix("foo")
                                .suffix(".csv")
                                .tempfile()
                                .unwrap();
                            let path = csv_file.path().to_str().unwrap().to_string();
                            let config = SessionConfig::new()
                                .with_coalesce_batches(false)
                                .with_batch_size(*batch_size)
                                .with_target_partitions(target_partition);
                            let ctx = SessionContext::new_with_config(config);
                            let lineitem = Arc::new(get_tbl_tpch_table_schema("lineitem"));
                            register_csv(&ctx, "lineitem", lineitem, "/path/to/tpch_sf0.1/lineitem.tbl").unwrap();
                            let orders = Arc::new(get_tbl_tpch_table_schema("orders"));
                            register_csv(&ctx, "orders", orders, "/path/to/tpch_sf0.1/orders.tbl").unwrap();
                            let sql = format!(
                                "COPY (SELECT * FROM
                                lineitem, orders
                                where
                                l_orderkey = o_orderkey)
                                to '{path}' (format csv);"
                            );
                            (path, ctx, sql)
                        },
                        |(path, clone_ctx, sql)| async move {
                            execute_query(clone_ctx, sql).await;
                            delete_file(&path);
                        },
                        BatchSize::LargeInput,
                    );
                });
            }
        }
    }
    group.finish();
}

criterion_group!(benches, from_elem_reading);
criterion_main!(benches);

Results

| Batch Size | Target Partitions | Worker Threads | main | upstream/spawn-blocking-for-se | Change |
|---|---|---|---|---|---|
| 100 | 1 | 1 | [1.3831 s 1.3996 s 1.4189 s] | [592.50 ms 596.27 ms 600.22 ms] | -57.396% |
| 100 | 1 | 4 | [685.99 ms 699.18 ms 718.43 ms] | [598.63 ms 603.06 ms 607.75 ms] | -13.748% |
| 100 | 4 | 1 | [1.7561 s 1.7670 s 1.7815 s] | [1.0961 s 1.1026 s 1.1101 s] | -37.600% |
| 100 | 4 | 4 | [608.23 ms 610.23 ms 612.30 ms] | [554.09 ms 556.52 ms 559.10 ms] | -8.8015% |
| 1000 | 1 | 1 | [1.2395 s 1.2435 s 1.2476 s] | [454.51 ms 458.97 ms 463.63 ms] | -63.090% |
| 1000 | 1 | 4 | [527.75 ms 529.60 ms 531.56 ms] | [430.79 ms 433.86 ms 437.25 ms] | -18.077% |
| 1000 | 4 | 1 | [1.3800 s 1.3892 s 1.3984 s] | [533.50 ms 536.24 ms 539.02 ms] | -61.400% |
| 1000 | 4 | 4 | [372.48 ms 374.06 ms 375.66 ms] | [299.15 ms 301.95 ms 305.48 ms] | -19.278% |
| 3000 | 1 | 1 | [1.2785 s 1.2860 s 1.2934 s] | [464.70 ms 473.55 ms 482.60 ms] | -63.178% |
| 3000 | 1 | 4 | [518.10 ms 519.63 ms 521.23 ms] | [417.72 ms 419.75 ms 422.10 ms] | -19.221% |
| 3000 | 4 | 1 | [1.3351 s 1.3410 s 1.3471 s] | [498.30 ms 501.91 ms 505.76 ms] | -62.573% |
| 3000 | 4 | 4 | [351.96 ms 353.56 ms 355.25 ms] | [274.52 ms 277.74 ms 281.95 ms] | -21.443% |

I tried my best to make an objective benchmark, but I might have missed something.

@ozankabak
Contributor

Interesting results. @alamb, can you take a look at the measurement script? Let's make sure there are no methodological mistakes before arriving at any conclusions.

@alamb
Contributor

alamb commented Jan 10, 2024

TLDR: I don't think this benchmark keeps the number of cores constant.

When I run the first benchmark, which is supposed to use 1 core:

Benchmarking parameter group/sink_bs100_tp1/1: Collecting 100 samples in estimated 105.75 s (100 iterations)
...

Main uses a single core, as expected:
[Screenshot 2024-01-10 at 3:47:57 PM]

When I run on the synnada-ai:upstream/spawn-blocking-for-se branch, the first benchmark uses almost three cores:
[Screenshot 2024-01-10 at 3:40:39 PM]

What I think is happening is that the spawn_blocking call, as one might expect, runs the workload on a dedicated thread that is not controlled by the tokio runtime's worker thread count:

                    let rt = Builder::new_multi_thread()
                        .worker_threads(worker_thread)
                        .build()
                        .unwrap();

I think the fact that the branch is using 3 threads while main can only use one accounts for the speed differences.
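
A small self-contained sketch of the behavior being described, assuming default tokio settings (this is not the benchmark code):

use tokio::runtime::Builder;

fn main() {
    // worker_threads(1) limits async worker threads only; spawn_blocking
    // tasks run on a separate blocking pool (up to 512 threads by default),
    // so total CPU usage can exceed one core.
    let rt = Builder::new_multi_thread()
        .worker_threads(1)
        .build()
        .unwrap();

    rt.block_on(async {
        let blocking = tokio::task::spawn_blocking(|| {
            // CPU-bound work runs on a blocking-pool thread, not a worker.
            (0..10_000_000u64).sum::<u64>()
        });
        // Meanwhile the single worker thread remains free to drive futures.
        let sum = blocking.await.unwrap();
        println!("sum = {sum}");
    });
}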

Methodology

docker run -v "/Users/andrewlamb/Downloads/tpch_sf0.1":/data -it --rm ghcr.io/databloom-ai/tpch-docker:main -vf -s 0.1

I put the benchmark code on this branch https://github.com/alamb/arrow-datafusion/tree/alamb/write_threads_bench

And ran

cargo bench --bench write_threads

@ozankabak
Contributor

ozankabak commented Jan 10, 2024

Good catch. Tokio offers a max_blocking_threads API, but I don't immediately see a way to use that to fix the benchmark. Apparently there used to be a max_threads API that limited the total number of threads (async workers + blocking), which is closer to what we want in this context (but still not exactly what we want). What we really want is to run this in a core-constrained environment. Maybe there is a way to do that via docker-compose or something similar. I will consult with @metesynnada tomorrow to see if there is anything we can do to fix the methodology.
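
For reference, a sketch of how that builder option is set; it caps only the blocking pool and would not by itself pin the whole benchmark to one core:

use tokio::runtime::Builder;

fn main() {
    // max_blocking_threads bounds the blocking pool; worker_threads bounds
    // async workers. There is no single setting covering both, so an OS-level
    // constraint (e.g. a container CPU limit) is needed to truly cap cores.
    let rt = Builder::new_multi_thread()
        .worker_threads(1)
        .max_blocking_threads(1)
        .build()
        .unwrap();
    drop(rt);
}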

@metesynnada
Contributor Author

I ran the benchmarks on a single core and there is no substantial difference. Thus, I will convert this back to spawn.

@metesynnada metesynnada marked this pull request as ready for review January 11, 2024 10:35
@ozankabak ozankabak changed the title Make serialization spawn_blocking in async runtime Serialize function signature simplifications Jan 11, 2024
@ozankabak
Contributor

Agreed. I changed the PR title and description to match what this does now -- simplifying function signatures only (no change to async vs. spawn_blocking behavior). It looks good to me.

Copy link
Contributor

@alamb alamb left a comment


LGTM -- thank you.

cc @devinjdangelo

@alamb alamb merged commit 1c49152 into apache:main Jan 12, 2024
24 checks passed
Labels
core Core DataFusion crate
3 participants