Push down InList or hash table references from HashJoinExec depending on the size of the build side #18393
Conversation
Leaving my review comments, will post benchmarks afterwards.

Although this PR is large I think there's a clear path to split it up into independent smaller PRs:

1. Refactor `create_hashes` to accept references (changes only to `hash_utils.rs`).
2. Refactor `InListExpr` to store arrays and support structs, re-using `create_hashes_from_arrays` from (1) (changes only to `in_list.rs`).
3. Refactor the data structures used to track pushdown data in HashJoinExec (changes only to files in `datafusion/physical-plan/src/joins/hash_join/`).
4. Introduce the CASE statement structure into the filter pushdown of `HashJoinExec` and the repartition hash PhysicalExpr (changes only to files in `datafusion/physical-plan/src/joins/hash_join/`, adds `HashExpr` in `datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs`).
5. Add hash table pushdown (adds `HashTableLookupExpr` in `datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs`, adds `HashTable` to `PushdownStrategy`, adds `create_membership_predicate`, etc.).
6. Add `InListExpr` pushdown (adds `PushdownStrategy::InList`, etc.).
There is also potential to remove the barrier expression by filtering only on the partitions we know about (`CASE ... ELSE true`), i.e. if we don't yet have information for all partitions, only filter out data that we know won't match in our partition, and then update the filter to `ELSE false` once we have information from all partitions. This might be useful for the distributed case, not sure though.
I think we could somehow unify the hashing / inner data structures of the join hash table and the InList expression - they are very similar - to at least eliminate one round of hashing. I wonder if there's a version of an InList expression that avoids building a Vec<ScalarValue> altogether and instead just wraps an ArrayRef + metadata (data types etc) + an optional hash lookup. That would be quite versatile, we could essentially replace the join hash tables with that structure.
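To make that idea concrete, here is a minimal sketch (names and fields are mine, not code from this PR) of a value set that wraps an `ArrayRef` plus an optional hash lookup instead of materializing a `Vec<ScalarValue>`:

```rust
use std::collections::HashMap;

use arrow::array::ArrayRef;
use arrow::datatypes::DataType;

/// Hypothetical array-backed value set: keeps the build-side keys in Arrow
/// form and (optionally) a precomputed hash -> row-index lookup. Candidates
/// returned by the lookup still need an equality check to rule out collisions.
struct ArrayValueSet {
    values: ArrayRef,
    data_type: DataType,
    hash_lookup: Option<HashMap<u64, Vec<usize>>>,
}

impl ArrayValueSet {
    fn new(values: ArrayRef, hash_lookup: Option<HashMap<u64, Vec<usize>>>) -> Self {
        let data_type = values.data_type().clone();
        Self { values, data_type, hash_lookup }
    }

    /// Row indices in `values` whose hash matches `hash` (collision candidates).
    fn candidates(&self, hash: u64) -> &[usize] {
        self.hash_lookup
            .as_ref()
            .and_then(|m| m.get(&hash))
            .map(|v| v.as_slice())
            .unwrap_or(&[])
    }
}
```

Something of this shape is roughly what the join hash table already stores, which is why unifying the two structures seems plausible.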
datafusion/common/src/hash_utils.rs (outdated diff):

```diff
 #[cfg(not(feature = "force_hash_collisions"))]
-pub fn create_hashes<'a>(
-    arrays: &[ArrayRef],
+pub fn create_hashes_from_arrays<'a>(
+    arrays: &[&dyn Array],
```
Recommend this for its own PR.
I think this is a nice refactor for this function; however, I decided not to deprecate / replace it to avoid churn. If this were its own PR I think it would be worth it to just make a new version and deprecate the old one, replacing all references in DataFusion (which I imagine is most users; even if this is `pub` I think it's mostly pub to be used in other crates within this repo). I did not investigate whether this can help avoid clones in any other call sites.
```text
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb OR a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba ]
- RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 4 WHEN 0 THEN a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}]) WHEN 2 THEN a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:ab,c1:bb}]) ELSE false END ]
```
Note that this automatically excludes empty partitions and defaults to false, which works with inner joins. I'm not sure how we'd structure this for other join types.
```text
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ]
- RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ]
```
In this case there's only 1 partition with data (because of hash collisions) -> we optimize away the CASE expression. This is relevant because the same thing would happen with a point lookup primary key join.
```rust
/// Specialized Set implementation for StructArray
struct StructArraySet {
    array: Arc<StructArray>,
    hash_set: ArrayHashSet,
}
```
I think this is a nice improvement / feature that can easily be its own PR.
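For intuition, here is a simplified sketch of the hash-then-verify membership idea behind such a set, using plain Rust tuples instead of Arrow struct rows (the real implementation presumably hashes the StructArray columns):

```rust
use std::collections::HashMap;

/// Membership set over "rows": bucket build-side row indices by hash, then
/// confirm candidates with a full comparison to handle hash collisions.
struct RowSet {
    rows: Vec<(i64, String)>,
    by_hash: HashMap<u64, Vec<usize>>,
}

impl RowSet {
    fn new(rows: Vec<(i64, String)>, hash: impl Fn(&(i64, String)) -> u64) -> Self {
        let mut by_hash: HashMap<u64, Vec<usize>> = HashMap::new();
        for (i, row) in rows.iter().enumerate() {
            by_hash.entry(hash(row)).or_default().push(i);
        }
        Self { rows, by_hash }
    }

    fn contains(&self, probe: &(i64, String), hash: impl Fn(&(i64, String)) -> u64) -> bool {
        self.by_hash
            .get(&hash(probe))
            .map_or(false, |idxs| idxs.iter().any(|&i| &self.rows[i] == probe))
    }
}
```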
```rust
let has_nulls = self.array.null_count() != 0;

// Compute hashes for all rows in the input array
let mut input_hashes = vec![0u64; v.len()];
```
Could we use a thread local to avoid repeated re-allocations here? I imagine that would work quite well.
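For illustration, a sketch of the thread-local idea (not code from this PR; `hash_rows` stands in for the real hashing call):

```rust
use std::cell::RefCell;

thread_local! {
    // One reusable scratch buffer of hash values per thread.
    static HASH_BUFFER: RefCell<Vec<u64>> = RefCell::new(Vec::new());
}

/// Run `hash_rows` against a thread-local buffer sized to `len`, instead of
/// allocating `vec![0u64; len]` for every probe batch.
fn with_hash_buffer<R>(len: usize, hash_rows: impl FnOnce(&mut [u64]) -> R) -> R {
    HASH_BUFFER.with(|buf| {
        let mut buf = buf.borrow_mut();
        buf.clear();
        buf.resize(len, 0);
        hash_rows(&mut buf)
    })
}
```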
```rust
/// Create a new `JoinLeftData` from its parts
pub(super) fn new(
```
Clippy was complaining about too many arguments; a constructor was not necessary anyway.
```rust
let membership = if num_rows == 0 {
    PushdownStrategy::Empty
} else {
    // If the build side is small enough we can use IN list pushdown.
    // If it's too big we fall back to pushing down a reference to the hash table.
    // See `PushdownStrategy` for more details.
    if let Some(in_list_values) =
        build_struct_inlist_values(&left_values, max_inlist_size)?
    {
        PushdownStrategy::InList(in_list_values)
    } else {
        PushdownStrategy::HashTable(Arc::clone(&hash_map))
    }
};
```
I did some remodeling of the data structures we use to track state. I think the new structure is much better - even if we didn't move forward with the rest of the PR.
```rust
// Size check using built-in method
// This is not 1:1 with the actual size of ScalarValues, but it is a good approximation
// and at this point is basically "free" to compute since we have the arrays already.
let estimated_size = join_key_arrays
    .iter()
    .map(|arr| arr.get_array_memory_size())
    .sum::<usize>();

if estimated_size > max_size_bytes {
    return Ok(None);
}
```
Note: this is where we check the estimated size against the size limit.
```text
@@ -0,0 +1,292 @@
// Licensed to the Apache Software Foundation (ASF) under one
```
This is the part used to handle the cases where we push down the entire hash table.
```rust
/// Build-side data reported by a single partition
pub(crate) enum PartitionBuildData {
    Partitioned {
        partition_id: usize,
        pushdown: PushdownStrategy,
        bounds: PartitionBounds,
    },
    CollectLeft {
        pushdown: PushdownStrategy,
        bounds: PartitionBounds,
    },
}

/// Per-partition accumulated data (Partitioned mode)
#[derive(Clone)]
struct PartitionData {
    bounds: PartitionBounds,
    pushdown: PushdownStrategy,
}

/// Build-side data organized by partition mode
enum AccumulatedBuildData {
    Partitioned {
        partitions: Vec<Option<PartitionData>>,
    },
    CollectLeft {
        data: Option<PartitionData>,
    },
}
```
This is the refactoring of the data structures we store to track state. It's now much cleaner, e.g. avoids comparing partitions by id.
Here's one interesting benchmark:

```sql
COPY (SELECT uuid() as k, uuid() as v FROM generate_series(1, 5) t(i))
TO 'small_table_uuids.parquet'
OPTIONS (
    'MAX_ROW_GROUP_SIZE' '50000',
    'BLOOM_FILTER_ENABLED::k' 'true'
);
COPY (SELECT random()::text as v1, random()::text as v2, uuid() as k FROM generate_series(1, 100000000) t(i))
TO 'large_table_uuids.parquet'
OPTIONS (
    'MAX_ROW_GROUP_SIZE' '50000',
    'BLOOM_FILTER_ENABLED::k' 'true'
);
CREATE EXTERNAL TABLE small_table STORED AS PARQUET LOCATION 'small_table_uuids.parquet';
CREATE EXTERNAL TABLE large_table STORED AS PARQUET LOCATION 'large_table_uuids.parquet';
SET datafusion.execution.parquet.pushdown_filters = true;
SET datafusion.execution.parquet.reorder_filters = true;
SET datafusion.runtime.metadata_cache_limit = 0;
-- Join the two tables, with a filter on small_table
SELECT *
FROM small_table s JOIN large_table l ON s.k = l.k;
```

This is similar to the query we benchmarked in our recent blog post but using UUIDs instead of …

Full benchmark script:

```python
#!/usr/bin/env python3
"""
Benchmark script comparing DataFusion with/without inlist pushdown vs DuckDB.
Groups:
1. branch (no inlist): hash_join_inlist_pushdown_max_size = 0
2. branch (w/ inlist): hash_join_inlist_pushdown_max_size = default (999999)
3. main: using datafusion-cli-main
4. duckdb: using duckdb CLI
"""
import subprocess
import tempfile
import time
import os
import sys
from pathlib import Path
# Configuration
DATAFUSION_CLI = "./target/release/datafusion-cli"
DATAFUSION_CLI_MAIN = "./datafusion-cli-main"
DUCKDB_CLI = "duckdb"
NUM_RUNS = 5 # Number of times to run each benchmark
# Data generation settings
SMALL_TABLE_SIZE = 5
LARGE_TABLE_SIZE = 100_000_000
SMALL_TABLE_FILE = "small_table_uuids.parquet"
LARGE_TABLE_FILE = "large_table_uuids.parquet"
def run_command(cmd, input_sql=None, description=""):
"""Run a command and measure execution time."""
print(f" Running: {description}...", end=" ", flush=True)
start = time.time()
try:
if input_sql:
result = subprocess.run(
cmd,
input=input_sql,
capture_output=True,
text=True,
timeout=600 # 10 minute timeout
)
else:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=600
)
elapsed = time.time() - start
if result.returncode != 0:
print(f"FAILED (exit code {result.returncode})")
print(f" stderr: {result.stderr}")
return None
print(f"{elapsed:.3f}s")
return elapsed
except subprocess.TimeoutExpired:
print("TIMEOUT")
return None
except Exception as e:
print(f"ERROR: {e}")
return None
def create_data():
"""Create test data files if they don't exist."""
if os.path.exists(SMALL_TABLE_FILE) and os.path.exists(LARGE_TABLE_FILE):
print(f"Data files already exist, skipping creation.")
return True
print(f"Creating test data...")
data_gen_sql = f"""
COPY (SELECT uuid() as k, uuid() as v FROM generate_series(1, {SMALL_TABLE_SIZE}) t(i))
TO '{SMALL_TABLE_FILE}'
OPTIONS (
'MAX_ROW_GROUP_SIZE' '50000',
'BLOOM_FILTER_ENABLED::k' 'true'
);
COPY (SELECT random()::text as v1, random()::text as v2, uuid() as k FROM generate_series(1, {LARGE_TABLE_SIZE}) t(i))
TO '{LARGE_TABLE_FILE}'
OPTIONS (
'MAX_ROW_GROUP_SIZE' '50000',
'BLOOM_FILTER_ENABLED::k' 'true'
);
"""
result = subprocess.run(
[DATAFUSION_CLI],
input=data_gen_sql,
capture_output=True,
text=True
)
if result.returncode != 0:
print(f"Failed to create data: {result.stderr}")
return False
print(f"Data created successfully.")
return True
def create_datafusion_sql(inlist_size):
"""Create SQL for DataFusion with specified inlist pushdown size."""
return f"""
CREATE EXTERNAL TABLE small_table STORED AS PARQUET LOCATION '{SMALL_TABLE_FILE}';
CREATE EXTERNAL TABLE large_table STORED AS PARQUET LOCATION '{LARGE_TABLE_FILE}';
SET datafusion.execution.parquet.pushdown_filters = true;
SET datafusion.execution.parquet.reorder_filters = true;
SET datafusion.optimizer.hash_join_inlist_pushdown_max_size = {inlist_size};
SET datafusion.runtime.metadata_cache_limit = '0M';
SELECT *
FROM small_table s JOIN large_table l ON s.k = l.k;
"""
def create_duckdb_sql():
"""Create SQL for DuckDB."""
return f"""
SELECT *
FROM '{SMALL_TABLE_FILE}' s JOIN '{LARGE_TABLE_FILE}' l ON s.k = l.k;
"""
def run_benchmark_group(name, cmd, sql_content, num_runs=NUM_RUNS):
"""Run a benchmark group multiple times and collect results."""
print(f"\n{name}:")
times = []
for i in range(num_runs):
elapsed = run_command(cmd, input_sql=sql_content, description=f"Run {i+1}/{num_runs}")
if elapsed is not None:
times.append(elapsed)
if times:
avg = sum(times) / len(times)
min_time = min(times)
max_time = max(times)
print(f" Results: min={min_time:.3f}s, avg={avg:.3f}s, max={max_time:.3f}s")
return times
else:
print(f" No successful runs")
return []
def main():
print("=" * 60)
print("DataFusion Inlist Pushdown Benchmark")
print("=" * 60)
# Verify executables exist
if not os.path.exists(DATAFUSION_CLI):
print(f"Error: {DATAFUSION_CLI} not found")
sys.exit(1)
if not os.path.exists(DATAFUSION_CLI_MAIN):
print(f"Error: {DATAFUSION_CLI_MAIN} not found")
sys.exit(1)
try:
subprocess.run([DUCKDB_CLI, "--version"], capture_output=True, check=True)
except (subprocess.CalledProcessError, FileNotFoundError):
print(f"Error: duckdb CLI not found or not working")
sys.exit(1)
# Create data
if not create_data():
sys.exit(1)
# Run benchmarks
results = {}
# 1. Branch without inlist pushdown
results["branch_no_inlist"] = run_benchmark_group(
"Branch (no inlist, size=0)",
[DATAFUSION_CLI],
create_datafusion_sql(0)
)
# 2. Branch with inlist pushdown
results["branch_with_inlist"] = run_benchmark_group(
"Branch (w/ inlist, size=999999)",
[DATAFUSION_CLI],
create_datafusion_sql(999999)
)
# 3. Main branch
results["main"] = run_benchmark_group(
"Main branch",
[DATAFUSION_CLI_MAIN],
create_datafusion_sql(999999)
)
# 4. DuckDB
results["duckdb"] = run_benchmark_group(
"DuckDB",
[DUCKDB_CLI],
create_duckdb_sql()
)
# Summary
print("\n" + "=" * 60)
print("SUMMARY")
print("=" * 60)
for name, times in results.items():
if times:
avg = sum(times) / len(times)
print(f"{name:25s}: {avg:.3f}s avg over {len(times)} runs")
else:
print(f"{name:25s}: No successful runs")
print("\nAll times (seconds):")
for name, times in results.items():
if times:
times_str = ", ".join(f"{t:.3f}" for t in times)
print(f" {name}: [{times_str}]")
if __name__ == "__main__":
main() |
Surprised that DuckDB takes so long, I was thinking it could also push down an InList for hash joins.
As far as I know they only do min/max stats: https://duckdb.org/2024/09/09/announcing-duckdb-110#dynamic-filter-pushdown-from-joins
Here are some new numbers after a couple more optimizations to InListExpr:

I'd like to clarify why the InListExpr makes such a difference: I think it's mainly that latter optimization that provides the win.

And TPCH:
Marking as draft again. Looking at the changes, while they're mostly in the right spirit there are a lot of details that need more manual care. I'll do that before marking as ready for review.
I've done some cleanup, the first three PRs are ready for review:

That will be most of the changes; the only two follow-up PRs will be to push down the hash table references and InListExpr.
## Background

This PR is part of an EPIC to push down hash table references from HashJoinExec into scans. The EPIC is tracked in apache#17171. A "target state" is tracked in apache#18393.

There is a series of PRs to get us to this target state in smaller more reviewable changes that are still valuable on their own:

- (This PR): apache#18448
- apache#18449 (depends on apache#18448)
- apache#18451

## Changes in this PR

Change create_hashes and related functions to work with &dyn Array references instead of requiring ArrayRef (Arc-wrapped arrays). This avoids unnecessary Arc::clone() calls and enables calls that only have an &dyn Array to use the hashing utilities.

- Add create_hashes_from_arrays(&[&dyn Array]) function
- Refactor hash_dictionary, hash_list_array, hash_fixed_list_array to use references instead of cloning
- Extract hash_single_array() helper for common logic

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
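As a usage sketch of the reference-based API this commit describes (only the `&[&dyn Array]` input is stated above; the random-state and output-buffer parameters are assumed to mirror the existing `create_hashes`):

```rust
use ahash::RandomState;
use arrow::array::{Array, Int32Array, StringArray};
use datafusion_common::hash_utils::create_hashes_from_arrays;
use datafusion_common::Result;

/// Hash two borrowed columns row-by-row without wrapping them in Arc.
fn hash_borrowed_columns(a: &dyn Array, b: &dyn Array) -> Result<Vec<u64>> {
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let mut hashes = vec![0u64; a.len()];
    // Assumed to mirror create_hashes: (&[&dyn Array], &RandomState, &mut Vec<u64>)
    create_hashes_from_arrays(&[a, b], &random_state, &mut hashes)?;
    Ok(hashes)
}

fn main() -> Result<()> {
    let ints = Int32Array::from(vec![1, 2, 3]);
    let strs = StringArray::from(vec!["x", "y", "z"]);
    let hashes = hash_borrowed_columns(&ints, &strs)?;
    assert_eq!(hashes.len(), 3);
    Ok(())
}
```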
…nfrastructure (#18449)

## Background

This PR is part of an EPIC to push down hash table references from HashJoinExec into scans. The EPIC is tracked in #17171. A "target state" is tracked in #18393.

There is a series of PRs to get us to this target state in smaller more reviewable changes that are still valuable on their own:

- #18448
- (This PR): #18449 (depends on #18448)
- #18451

## Changes in this PR

- Enhance InListExpr to efficiently store homogeneous lists as arrays and avoid a conversion to Vec<PhysicalExpr> by adding an internal InListStorage enum with Array and Exprs variants
- Re-use existing hashing and comparison utilities to support Struct arrays and other complex types
- Add public function `in_list_from_array(expr, list_array, negated)` for creating InList from arrays

Although the diff looks large most of it is actually tests and docs. I think the actual code change is a negative LOC change, or at least negative complexity (eliminates a trait, a macro, matching on data types).

---------

Co-authored-by: David Hewitt <mail@davidhewitt.dev>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
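The storage split described above could look roughly like this (illustrative names, not necessarily the exact shape merged in #18449):

```rust
use std::sync::Arc;

use arrow::array::ArrayRef;
use datafusion_physical_expr::PhysicalExpr;

/// Keep homogeneous IN lists as a single Arrow array (enabling hashing and
/// struct support), and fall back to per-expression evaluation otherwise.
enum InListStorage {
    /// Homogeneous values kept in Arrow form; membership can be hash-based.
    Array { values: ArrayRef },
    /// Arbitrary expressions evaluated against each probe batch.
    Exprs { list: Vec<Arc<dyn PhysicalExpr>> },
}
```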
This is getting close! The main blocker is a review on #18451, which will bring the size of this PR down to only a couple hundred LOC.
…for more precise filters (#18451)

## Background

This PR is part of an EPIC to push down hash table references from HashJoinExec into scans. The EPIC is tracked in #17171. A "target state" is tracked in #18393.

There is a series of PRs to get us to this target state in smaller more reviewable changes that are still valuable on their own:

- #18448
- #18449 (depends on #18448)
- (This PR): #18451

## Changes in this PR

This PR refactors state management in HashJoinExec to make filter pushdown more efficient and prepare for pushing down membership tests.

- Refactor internal data structures to clean up state management and make usage more idiomatic (use `Option` instead of comparing integers, etc.)
- Uses CASE expressions to evaluate pushed-down filters selectively by partition

Example: `CASE hash_repartition % N WHEN partition_id THEN condition ELSE false END`

---------

Co-authored-by: Lía Adriana <lia.castaneda@datadoghq.com>
I think this is now ready for review. @rkrishn7, @LiaCastaneda or @Dandandan would one of you like to review? @alamb could you kick off some benchmarks please?
I can take a look later today 👍
```rust
/// Note that this will not deduplicate values, that will happen later when building an InList expression from this array.
///
/// Returns `None` if the estimated size exceeds `max_size_bytes`.
/// Performs deduplication to ensure unique values only.
```
🤔 I think it doesn't dedup here
It was a bad comment, updated
```rust
/// The default is 128kB per partition.
/// This should allow point lookup joins (e.g. joining on a unique primary key) to use InList pushdown in most cases
/// but avoids excessive memory usage or overhead for larger joins.
pub hash_join_inlist_pushdown_max_size: usize, default = 128 * 1024
```
Maybe a future improvement could be to also expose an option to limit the number of distinct values that can be inside an IN LIST. For instance, we could end up with a very large list like x IN (1, 2, 3, ..., 1000000) that fits in 128KB but is still inefficient because we'd be duplicating values and performance might decrease.
I got this idea from trino:
https://trino.io/docs/current/admin/dynamic-filtering.html#dynamic-filter-collection-thresholds
> For instance, we could end up with a very large list like x IN (1, 2, 3, ..., 1000000) that fits in 128KB but is still inefficient because we'd be duplicating values and performance might decrease
Could you elaborate? In my mind a large InList is not any more or less efficient than pushing down the hash table itself, but if it's big it loses access to the bloom filter pushdown optimization, so it's probably not faster than the hash table itself. That said, there are still reasons to push it down instead, namely that custom execution nodes that downcast match on a PhysicalExpr can recognize it.
So the idea with the 128kB is to balance how much CPU we burn upfront building the filter. But I agree it could be in terms of rows as well.
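For example, a combined guard could look like this (a sketch; `max_rows` is a hypothetical knob, not a config added in this PR):

```rust
/// Decide whether to build an InList filter or fall back to pushing a
/// reference to the hash table itself.
fn use_in_list_pushdown(
    estimated_bytes: usize,
    num_rows: usize,
    max_bytes: usize, // e.g. hash_join_inlist_pushdown_max_size
    max_rows: usize,  // hypothetical row-count limit
) -> bool {
    estimated_bytes <= max_bytes && num_rows <= max_rows
}
```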
Added a config
So if the IN LIST is very large, it loses the pruning advantage on the probe side - like you say, bloom filters become ineffective with that many values - so once we lose that, we might as well use the hash table instead and avoid having to copy the data for the IN LIST.
Not sure if I'm missing anything here; anyway, this is just a thought and it would be better to check this with the benchmarks.
Yeah that's the idea. As the build side gets larger:
- It becomes more expensive to build the InListExpr (I think we can make it cheaper but it will probably always be more expensive than copying a reference)
- It's less likely optimizations like bloom filters will help. In fact, bloom filters will only be hit with < 20 items (this is set deep in the PruningPredicate code)
So at some point it makes sense to cut the losses and go through each row with the hash table.
The InListExpr approach is going to shine when there is a point lookup type query (i.e. one row from the build side) that can hit a bloom filter on the probe side.
Naive question: is the size (in bits) of the bloom filter tunable at the moment?
In principle you could use NDV to tune its size at build time, and lift the limitation on the number of elements (within a reasonable limit, of course).
I think the PR is already very useful as is, this could be tackled in a follow-up PR if the point raised makes sense (possibly another addition to the list of places where NDV can help, as tracked in #18628)
🤔 As a Parquet write property, I don't think it can be tuned; you can control some write properties like NDV or FPP, which in theory help control the size of the bloom filter as well.
Yeah I think there may be a bit of confusion:
- The bloom filters I am referring to here are written into the parquet file like Lia said.
- The thing we are pushing down is an expression like `col IN (1, 2, 3)`, which inside of the pruning logic gets converted to `col = 1 OR col = 2 OR col = 3` as long as there are <20 elements in the list; then `col = 1` gets evaluated against any bloom filters on `col`, then `col = 2`, etc. for each row group.
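To illustrate that rewrite (a sketch using the logical Expr API, not the actual PruningPredicate code):

```rust
use datafusion_expr::{col, lit, Expr};

/// Expand a short IN list into OR-of-equalities so each equality can be
/// checked against Parquet bloom filters during pruning; longer lists are
/// left alone (returning None here).
fn expand_in_list_for_pruning(column: &str, values: &[i64], max_len: usize) -> Option<Expr> {
    if values.is_empty() || values.len() >= max_len {
        return None;
    }
    values
        .iter()
        .map(|v| col(column).eq(lit(*v)))
        .reduce(|acc, e| acc.or(e))
}

// expand_in_list_for_pruning("col", &[1, 2, 3], 20)
// => Some(col = 1 OR col = 2 OR col = 3)
```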
```rust
// Combine membership and bounds expressions
let filter_expr = match (membership_expr, bounds_expr) {
```
I probably missed the explanation somewhere in previous threads, but is there a special benefit of pushing both bounds and IN LIST filters into the consumer?
I think so because:
- You have to calculate the bounds anyway in case you need to fall back to that.
- Downstream operators may be able to do things with bounds that they can't with InListExpr (e.g. stats pruning).
- The bounds are going to be cheaper to evaluate and thus may short circuit the InListExpr if they are false.
ah, makes sense!
```rust
        as Arc<dyn PhysicalExpr>
    }
    (Some(membership), None) => membership,
    (None, Some(bounds)) => bounds,
```
Are `(None, Some(bounds))` and `(Some(membership), None)` actually reachable? If we have no data, we shouldn't have any bounds either, right?
Yes, I added a note explaining that it's easier to handle it defensively, so we might as well (as opposed to using `unreachable!`).
```rust
// Optimize for single partition: skip CASE expression entirely
let filter_expr = if when_then_branches.is_empty() {
    // All partitions are empty: no rows can match
    lit(false)
} else if when_then_branches.len() == 1 {
    // Single partition: just use the condition directly
    // since hash % 1 == 0 always, the WHEN 0 branch will always match
    Arc::clone(&when_then_branches[0].1)
} else {
```
This makes sense, so we avoid calling create_hashes for every single row on the probe side if it's going to end up landing on the same branch. Can we add a comment to the tests that changed in datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs about this?
Code comments or review comments? I added a review comment for now.
As a code comment. Before looking into this file I was looking at the diff in test_dynamic_filter_pushdown_through_hash_join_with_topk and wondered why there's no CASE structure if it's a partitioned join. Super nit -- just mentioning it for clarity.
Okay will add!
```text
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 1 WHEN 0 THEN d@0 >= aa AND d@0 <= ab ELSE false END ] AND DynamicFilter [ e@1 IS NULL OR e@1 < bb ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ d@0 >= aa AND d@0 <= ab AND d@0 IN (SET) ([aa, ab]) ] AND DynamicFilter [ e@1 IS NULL OR e@1 < bb ]
```
Note that we dropped the CASE expression here because we now optimize that away if there's only 1 partition
```rust
// See `PushdownStrategy` for more details.
let estimated_size = left_values
    .iter()
    .map(|arr| arr.get_array_memory_size())
```
If we have a query like SELECT * FROM t1 JOIN t2 ON t1.a = t2.x AND t1.a = t2.y, left_values would have t1.a twice (same ArrayRef). Since both are references to the same underlying data, estimated_size would double count the memory. However, I guess this overaccounting is acceptable because we are estimating CPU cost?
Makes sense. But maybe it's still okay since we would end up duplicating the values in the InListExpr? Either way like you say I think it's not a big deal, it's just a ballpark estimate...
Yeah, since estimated_size is used to estimate CPU time spent building the filter (rather than actual memory), it makes sense to 'double account' because in theory it's ~double the CPU work for building the filter, I guess.
@adriangb I think you will have to update the branch to include
LiaCastaneda left a comment:
This lgtm! It would be interesting to know the benchmark results
@gabotechs, @2010YOUY01 or @comphead would you be willing to review this?
Hi @adriangb, thanks for the PR, the numbers look good. I'll check it out this week!
I'm planning to introduce TPCDS benchmarks in addition to TPCH and see how this PR performs.
This PR is part of an EPIC to push down hash table references from HashJoinExec into scans. The EPIC is tracked in #17171.
A "target state" is tracked in #18393 (this PR).
There is a series of PRs to get us to this target state in smaller, more reviewable changes that are still valuable on their own:

- #18448
- #18449
- Refactor state management in HashJoinExec and use CASE expressions for more precise filters #18451

As those are merged I will rebase this PR to keep track of the "remaining work", and we can use this PR to explore big picture ideas or benchmarks of the final state.