Force-pushed from fb3cef7 to 695c3ac
```rust
#[tokio::test]
async fn test_file_scan_config_task_estimator_max_workers() -> Result<(), DataFusionError> {
    let mut combined = CombinedTaskEstimator::default();
    combined.push(|_: &Arc<dyn ExecutionPlan>, _: &ConfigOptions| None);

    let node = make_data_source_exec().await?;
    assert_eq!(
        combined.task_count(node, |mut cfg| {
            cfg.__private_channel_resolver =
                ChannelResolverExtension(Arc::new(InMemoryChannelResolver::new(2)));
            cfg
        }),
        2
    );
    Ok(())
}
```
Limiting by max_workers is now done automatically in the distributed planner, so task estimators no longer need to handle it themselves.
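The pattern exercised by the test above can be illustrated with a simplified, self-contained sketch. This is a hypothetical reimplementation, not the actual `CombinedTaskEstimator` API: `PlanNode`, `CombinedEstimator`, and `task_count` here are stand-ins for the real DataFusion types. The point is that each estimator may abstain by returning `None`, the first `Some` wins, and clamping to the available workers happens once in the planner rather than in every estimator:

```rust
// Hypothetical sketch of the "combined estimator" pattern. `PlanNode` stands
// in for DataFusion's `Arc<dyn ExecutionPlan>`; none of these names are the
// real API.
struct PlanNode {
    partition_count: usize,
}

type Estimator = Box<dyn Fn(&PlanNode) -> Option<usize>>;

#[derive(Default)]
struct CombinedEstimator {
    estimators: Vec<Estimator>,
}

impl CombinedEstimator {
    fn push(&mut self, f: impl Fn(&PlanNode) -> Option<usize> + 'static) {
        self.estimators.push(Box::new(f));
    }

    /// The first estimator returning `Some` decides the task count; the
    /// result is clamped by `max_workers` here, in the planner, so
    /// individual estimators don't need to know about worker limits.
    fn task_count(&self, node: &PlanNode, max_workers: usize) -> usize {
        let estimated = self.estimators.iter().find_map(|f| f(node)).unwrap_or(1);
        estimated.min(max_workers)
    }
}

fn main() {
    let mut combined = CombinedEstimator::default();
    combined.push(|_: &PlanNode| None); // abstains, like the closure in the test
    combined.push(|node: &PlanNode| Some(node.partition_count));

    let node = PlanNode { partition_count: 100 };
    // The estimator asks for 100 tasks, but only 2 workers are available.
    println!("{}", combined.task_count(&node, 2)); // prints 2
}
```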
Force-pushed from 9a8e5cb to 8c38e5c

Force-pushed from e890fa5 to 9d451dd

Force-pushed from 9d451dd to 27de8f5

Force-pushed from 91885af to 7561af4
jayshrivastava left a comment
Flushing comments. Still need to look at the updated TPCH plan tests.
NGA-TRAN left a comment
The tests clearly tell me you are doing the right things. Great work @gabotechs
I assume you changed the number of file groups in the tests on purpose.
```
│ PartitionIsolatorExec: t0:[p0,p1,__,__,__,__] t1:[__,__,p0,p1,__,__] t2:[__,__,__,__,p0,__] t3:[__,__,__,__,__,p0] , metrics=[]
│ DataSourceExec: file_groups={6 groups: [[/testdata/tpch/explain_analyze_sf0.1/lineitem/1.parquet:<int>..<int>, /testdata/tpch/explain_analyze_sf0.1/lineitem/10.parquet:<int>..<int>, /testdata/tpch/explain_analyze_sf0.1/lineitem/11.parquet:<int>..<int>], [/testdata/tpch/explain_analyze_sf0.1/lineitem/11.parquet:<int>..<int>, /testdata/tpch/explain_analyze_sf0.1/lineitem/12.parquet:<int>..<int>, /testdata/tpch/explain_analyze_sf0.1/lineitem/13.parquet:<int>..<int>, /testdata/tpch/explain_analyze_sf0.1/lineitem/14.parquet:<int>..<int>], [/testdata/tpch/explain_analyze_sf0.1/lineitem/14.parquet:<int>..<int>, /testdata/tpch/explain_analyze_sf0.1/lineitem/15.parquet:<int>..<int>, /testdata/tpch/explain_analyze_sf0.1/lineitem/16.parquet:<int>..<int>], [/testdata/tpch/explain_analyze_sf0.1/lineitem/16.parquet:<int>..<int>, /testdata/tpch/explain_analyze_sf0.1/lineitem/2.parquet:<int>..<int>, /testdata/tpch/explain_analyze_sf0.1/lineitem/3.parquet:<int>..<int>, /testdata/tpch/explain_analyze_sf0.1/lineitem/4.parquet:<int>..<int>], [/testdata/tpch/explain_analyze_sf0.1/lineitem/4.parquet:<int>..<int>, /testdata/tpch/explain_analyze_sf0.1/lineitem/5.parquet:<int>..<int>, /testdata/tpch/explain_analyze_sf0.1/lineitem/6.parquet:<int>..<int>, /testdata/tpch/explain_analyze_sf0.1/lineitem/7.parquet:<int>..<int>], ...]}, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], file_type=parquet, predicate=l_shipdate@6 <= 1998-09-02, pruning_predicate=l_shipdate_null_count@1 != row_count@2 AND l_shipdate_min@0 <= 1998-09-02, required_guarantees=[], metrics=[output_rows=600572, elapsed_compute=<metric>, batches_split=<metric>, bytes_scanned=<metric>, file_open_errors=<metric>, file_scan_errors=<metric>, num_predicate_creation_errors=<metric>, predicate_cache_inner_records=0, predicate_cache_records=0, predicate_evaluation_errors=<metric>, pushdown_rows_matched=<metric>, pushdown_rows_pruned=<metric>, bloom_filter_eval_time=<metric>, metadata_load_time=<metric>, page_index_eval_time=<metric>, row_pushdown_eval_time=<metric>, statistics_eval_time=<metric>, time_elapsed_opening=<metric>, time_elapsed_processing=<metric>, time_elapsed_scanning_total=<metric>, time_elapsed_scanning_until_data=<metric>]

│ PartitionIsolatorExec: t0:[p0,p1,p2,p3,p4,p5,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__] t1:[__,__,__,__,__,__,p0,p1,p2,p3,p4,__,__,__,__,__,__,__,__,__,__] t2:[__,__,__,__,__,__,__,__,__,__,__,p0,p1,p2,p3,p4,__,__,__,__,__] t3:[__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,p0,p1,p2,p3,p4] , metrics=[]
│ DataSourceExec: file_groups={21 groups: [[/testdata/tpch/explain_analyze_sf0.1/lineitem/1.parquet:<int>..<int>], [/testdata/tpch/explain_analyze_sf0.1/lineitem/10.parquet:<int>..<int>], [/testdata/tpch/explain_analyze_sf0.1/lineitem/11.parquet:<int>..<int>], [/testdata/tpch/explain_analyze_sf0.1/lineitem/11.parquet:<int>..<int>], [/testdata/tpch/explain_analyze_sf0.1/lineitem/12.parquet:<int>..<int>], ...]}, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], file_type=parquet, predicate=l_shipdate@6 <= 1998-09-02, pruning_predicate=l_shipdate_null_count@1 != row_count@2 AND l_shipdate_min@0 <= 1998-09-02, required_guarantees=[], metrics=[output_rows=600572, elapsed_compute=<metric>, batches_split=<metric>, bytes_scanned=<metric>, file_open_errors=<metric>, file_scan_errors=<metric>, num_predicate_creation_errors=<metric>, predicate_cache_inner_records=0, predicate_cache_records=0, predicate_evaluation_errors=<metric>, pushdown_rows_matched=<metric>, pushdown_rows_pruned=<metric>, bloom_filter_eval_time=<metric>, metadata_load_time=<metric>, page_index_eval_time=<metric>, row_pushdown_eval_time=<metric>, statistics_eval_time=<metric>, time_elapsed_opening=<metric>, time_elapsed_processing=<metric>, time_elapsed_scanning_total=<metric>, time_elapsed_scanning_until_data=<metric>]
```
21 groups on purpose? Don't see any configuration/setting changes
I have the same question. The output rows on the aggregate exec went up to 64....
Our correctness tests didn't fail though 🤔
I just debugged this query specifically, and the reason why 21 files are chosen is:
- originally, the plan contained 6 file groups, with the following distribution of partitioned files: [3,4,3,4,4,3]
- now, we attempt to split it as much as possible for distributing work, and we end up with 21 file groups, with 1 file each
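The regrouping described above can be sketched as follows. This is an illustration of the assumed behavior (split file groups as far as possible so each group holds a single file, maximizing the units of work available for distribution), not the actual planner code; `split_into_single_file_groups` is a hypothetical helper:

```rust
// Sketch: flatten multi-file groups into one-file-per-group, so the planner
// has as many independent units of work as there are files.
fn split_into_single_file_groups(groups: Vec<Vec<&str>>) -> Vec<Vec<&str>> {
    groups
        .into_iter()
        .flatten()
        .map(|file| vec![file]) // one file per group
        .collect()
}

fn main() {
    // The original plan had 6 groups holding [3, 4, 3, 4, 4, 3] files.
    let sizes = [3usize, 4, 3, 4, 4, 3];
    let groups: Vec<Vec<&str>> = sizes.iter().map(|&n| vec!["file.parquet"; n]).collect();

    let split = split_into_single_file_groups(groups);
    // 3 + 4 + 3 + 4 + 4 + 3 = 21 single-file groups, matching the new plan.
    println!("{}", split.len()); // prints 21
}
```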
About the number of output_rows going from 24 to 64, I'm not 100% sure why that happens, but it should not make the results incorrect. The partial aggregation now outputs more rows, maybe because it's partially aggregating more, smaller batches?
Sure, I think that's fine. It makes sense for the partial aggregation to output more rows since there are more partitions.
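A back-of-the-envelope check of why more rows are plausible: a partial aggregation emits at most one row per distinct group per partition, so the upper bound grows with the partition count. Assuming this is TPCH Q1, which groups by `l_returnflag`/`l_linestatus` (4 distinct combinations in practice), the old and new bounds work out as:

```rust
// A partial aggregation can emit at most one row per distinct group per
// partition, so this product is an upper bound on its output_rows.
fn max_partial_rows(partitions: usize, distinct_groups: usize) -> usize {
    partitions * distinct_groups
}

fn main() {
    // 6 partitions before the change: bound of 24, matching the old output.
    println!("{}", max_partial_rows(6, 4)); // prints 24
    // 21 partitions after: bound of 84. The observed 64 fits under it, since
    // not every partition necessarily contains every group.
    println!("{}", max_partial_rows(21, 4)); // prints 84
}
```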
Just merged #263 into this one. It mainly just adds some unit tests to the plan annotator logic.
jayshrivastava left a comment
LGTM. The annotation tests you added are great. Made it easier to review for sure.
This is a full rewrite of the distributed planner that aims for two things:
How it worked before
This is how the distributed planner worked before:
`TaskEstimator`s, scaling them up in parallelism

The order in which things happened is problematic, as it can lead to several non-desirable situations:

- A `TaskEstimator` might decide that a leaf node needs to run in 100 tasks, and it will scale it up accordingly. Then, for whatever reason, a stage containing that leaf node decides to use just 1 task (e.g., it contains a CollectLeft HashJoin). We end up with a x100 scaled leaf node running in just 1 task.
- A `TaskEstimator` might decide that a leaf node needs to run in 100 tasks, and the plan has 3 stages, so the planner decides to use something like 100 tasks -> 90 tasks -> 80 tasks. Then the "Pending" to "Ready" transition happens, and it's decided that the middle stage is only going to run in 1 task, so we are left with 100 tasks -> 1 task -> 80 tasks. The middle stage is going to heavily bottleneck the query, so there's no point in using 80 tasks for the head stage.

How it works now
The distribution step is simplified, and just two things happen:
This allows us to remove the "Pending" and "Ready" states of the network boundaries: once they are placed, everything is set in stone, which greatly simplifies the code and reduces the runtime checks needed for nodes to be ready.
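As an illustration of scaling a leaf stage to its final task count: the new plan quoted earlier in the thread isolates 21 partitions across 4 tasks as 6/5/5/5 (see the `PartitionIsolatorExec` line). That layout matches a plain even split, sketched below with a hypothetical helper (`spread` is not the planner's actual code):

```rust
// Sketch: spread N partitions across T tasks as evenly as possible. The
// first `N % T` tasks receive one extra partition.
fn spread(partitions: usize, tasks: usize) -> Vec<usize> {
    let base = partitions / tasks;
    let extra = partitions % tasks;
    (0..tasks).map(|t| base + usize::from(t < extra)).collect()
}

fn main() {
    // 21 partitions over 4 tasks: t0 gets 6, the rest get 5, matching the
    // PartitionIsolatorExec output in the TPCH plan above.
    println!("{:?}", spread(21, 4)); // prints [6, 5, 5, 5]
}
```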
Results
We now perform a better task assignment and a better scale-up of leaf nodes. If there's a stage that can only run in one task and is going to bottleneck the query, there's no point in distributing the stages above it, as that would just introduce latency from sending data over the wire without benefiting from all the available compute in the machines. We also make sure leaf nodes scale up to the appropriate size based on the final task count assigned to the stage containing them.
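The bottleneck argument can be sketched as a capping pass over per-stage task counts. This is an illustrative policy under the assumptions above, not necessarily the PR's exact algorithm: each stage is capped by the minimum desired count of the stages below it, so the problematic 100 -> 1 -> 80 shape from the old planner collapses to 100 -> 1 -> 1:

```rust
// Sketch: stages ordered leaf -> head. Each stage's task count is capped by
// the running minimum of the stages below it, since a single-task stage
// bottlenecks everything above it anyway.
fn cap_by_bottleneck(desired: &[usize]) -> Vec<usize> {
    let mut cap = usize::MAX;
    desired
        .iter()
        .map(|&d| {
            cap = cap.min(d); // running minimum from the leaves upward
            cap
        })
        .collect()
}

fn main() {
    // Leaf wants 100, the middle stage collapses to 1, the head wants 80:
    // capping avoids spending 80 tasks above the single-task bottleneck.
    println!("{:?}", cap_by_bottleneck(&[100, 1, 80])); // prints [100, 1, 1]
}
```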
Comparison against `main` with TPCH SF1 on a local machine

Comparison against `main` with TPCH SF1 in a remote cluster of 4 EC2 machines

Comparison against `main` with TPCH SF10 in a remote cluster of 4 EC2 machines

Comparison against Trino with TPCH SF10 in a remote cluster of 4 EC2 machines