Push Dynamic Join Predicates into Scan ("Sideways Information Passing", etc) #7955

Open
@alamb

Is your feature request related to a problem or challenge?

If we want to make DataFusion the engine of choice for fast OLAP processing, eventually we will need to make joins faster. In addition to making sure the join order is not disastrous (e.g. #7949), we can consider other advanced OLAP techniques to improve joins (especially for queries with multiple joins).

Describe the solution you'd like

I would like to propose we look into pushing join predicates into scans (which I know of as "sideways information passing").

As an example, consider the joins from TPCH Q17

select
    sum(l_extendedprice) / 7.0 as avg_yearly
from
    part, lineitem
where
    p_partkey = l_partkey
    and p_brand = 'Brand#23'
    and p_container = 'MED BOX'
    and l_quantity < (
        select 0.2 * avg(l_quantity)
        from lineitem
        where l_partkey = p_partkey
    );

The first join should look like the plan below. Note that there are no predicates on the lineitem table (the big one), so all of the filtering happens in the join. This is bad because the scan can't apply optimizations like "late materialization" and instead must decode all 60M values of the selected columns, even though very few (2044!) are actually used.

                          │                                                         
                          │                                                         
           2044 Rows      │                                                         
                          │                                                         
                          ▼                                                         
                 ┌────────────────┐                                                 
                 │    HashJoin    │                                                 
                 │   p_partkey =  │                                                 
                 │   l_partkey    │                                                 
                 └──┬─────────┬───┘                     This scan decodes 60M values
   2M Rows          │         │             60M Rows         of l_quantity and      
           ┌────────┘         └─────────┐               l_extendedprice, even though
           │                            │               all but 2044 are filtered by
           ▼                            ▼                         the join          
 ┌──────────────────┐        ┌─────────────────────┐                                
 │Scan: part        │        │Scan: lineitem       │                  │             
 │projection:       │        │projection:          │                                
 │  p_partkey       │        │  l_quantity,        │                  │             
 │filters:          │        │  l_extendedprice,   │◀─ ─ ─ ─ ─ ─ ─ ─ ─              
 │  p_brand = ..    │        │  l_partkey          │                                
 │  p_container = ..│        │filters:             │                                
 │                  │        │  NONE               │                                
 └──────────────────┘        └─────────────────────┘                                

The idea is to push the join predicate into the scan, by creating something that acts like l_partkey IN (...) and can be applied during the scan.


                               1. The HashJoin completely reads the build                        
                               side before starting the probe side.                              
                                                                                                 
                               Thus, all 2M known matching values of                             
                         │     l_partkey are in a hash table prior to                            
                         │     scanning lineitem                                                 
          2044 Rows      │                                                                       
                         │                           │                                           
                         ▼                                                                       
                ┌────────────────┐                   │                                           
                │    HashJoin    │                                                               
                │   p_partkey =  │◀─ ─ ─ ─ ─ ─ ─ ─ ─ ┘                                           
                │   l_partkey    │                                                               
                └──┬─────────┬───┘                                                               
                   │         │             60M Rows                                              
          ┌────────┘         └────────────┐                  The idea is to introduce a filter   
          │                               │                  that is effectively "l_partkey IN   
          ▼                               ▼                  (HASH TABLE)" or something similar  
┌──────────────────┐        ┌──────────────────────────┐     that is applied during the scan     
│Scan: part        │        │Scan: lineitem            │┌ ─ ─                                    
│projection:       │        │projection:               │     If the scan can avoid decoding      
│  p_partkey       │        │  l_quantity,             ││    l_quantity and l_extended that do   
│filters:          │        │  l_extendedprice,        │     not match, there is significant     
│  p_brand = ..    │        │  l_partkey               ││    savings                             
│  p_container = ..│        │filters:                  │                                         
│                  │        │  l_partkey IN (....)   ◀─│┘                                        
└──────────────────┘        └──────────────────────────┘                                         
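To make the mechanism in the diagram concrete, here is a minimal sketch in plain Rust. It is not DataFusion's API: `LineitemColumns`, `build_dynamic_filter`, and `scan_with_filter` are hypothetical names invented for illustration, and a `HashSet` stands in for the join's hash table. The scan evaluates the key filter first and only "decodes" the other columns for rows that survive, which is the late-materialization saving described above.

```rust
use std::collections::HashSet;

/// Hypothetical column-oriented stand-in for the `lineitem` scan.
struct LineitemColumns {
    l_partkey: Vec<i64>,
    l_quantity: Vec<f64>,
    l_extendedprice: Vec<f64>,
}

/// Step 1: the join has fully read the build side (`part`), so every
/// join key that can possibly match is known before the probe side starts.
fn build_dynamic_filter(build_keys: &[i64]) -> HashSet<i64> {
    build_keys.iter().copied().collect()
}

/// Step 2: the scan applies "l_partkey IN (build keys)" on the key column
/// first, then reads the remaining columns only for the selected rows.
fn scan_with_filter(
    cols: &LineitemColumns,
    filter: &HashSet<i64>,
) -> Vec<(i64, f64, f64)> {
    // Evaluate the pushed-down filter on the key column alone.
    let selected: Vec<usize> = cols
        .l_partkey
        .iter()
        .enumerate()
        .filter(|(_, key)| filter.contains(*key))
        .map(|(row, _)| row)
        .collect();

    // Late materialization: touch the other columns only for matching rows.
    selected
        .into_iter()
        .map(|row| {
            (
                cols.l_partkey[row],
                cols.l_quantity[row],
                cols.l_extendedprice[row],
            )
        })
        .collect()
}
```

In a real scan the saving comes from skipping the decode of whole pages or row groups rather than from indexing into in-memory vectors, so this only illustrates the order of operations.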

In a query with a single selective join (one that filters out many values), the savings are likely modest, because they depend on how much materialization (decoding) work the scan can skip. At the time of writing, the only scan in DataFusion that does late materialization is ParquetExec.

However, in a query with multiple selective joins the savings become much more pronounced, because we also avoid creating intermediate join outputs that are filtered out by joins later in the plan.

For example:

    Pass down in multiple joins                                                                 
                                                                                                
 While this doesn't happen in TPCH                                                              
Q17 (the subquery has no predicates)                                                            
 the SIPS approach can be even more                                                             
 effective with multiple selective                                                              
               joins                  │                                                         
                                      │                                                         
                                      │             Filters on both join keys can be applied    
                                      │             at this level, which can be even more       
                                      ▼             effective as it avoids the work to create   
                             ┌────────────────┐     the intermediate output of HashJoin(2)   ─ ┐
                             │  HashJoin (1)  │     which is then filtered by HashJoin(1)       
                             │     d1.key =   │                                                │
                             │    f.d1_key    │                                                 
                             └──┬─────────┬───┘                                                │
                                │         │                                                     
                     ┌──────────┘         └────────────┐                                       │
                     │                                 │                                        
                     ▼                                 ▼                                       │
           ┌──────────────────┐               ┌────────────────┐                                
           │Scan: D1          │               │  HashJoin (2)  │                               │
           │filters:          │               │     d2.key =   │                                
           │  ...             │               │    f.d2_key    │                               │
           └──────────────────┘               └───┬─────────┬──┘                                
                                                  │         │                                  │
                                      ┌───────────┘         └─────────────┐                     
                                      │                                   │                    │
                                      ▼                                   ▼                     
                             ┌────────────────┐                ┌─────────────────────┐         │
                             │Scan: D2        │                │Scan: F              │          
                             │filters:        │                │filters:             │         │
                             │  ...           │                │  f.d1_key IN (...)  │◀ ─ ─ ─ ─ 
                             └────────────────┘                │  f.d2_key IN (...)  │          
                                                               │                     │          
                                                               └─────────────────────┘          
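The multi-join case in the diagram can be sketched the same way. This is an illustration only, with hypothetical names (`FactRow`, `scan_with_d2_filter`, `scan_with_both_filters`), and it assumes both build sides (D1 and D2) have been fully read before the scan of F starts. Comparing the two functions shows the rows that would otherwise be produced by HashJoin(2) only to be discarded by HashJoin(1).

```rust
use std::collections::HashSet;

/// Hypothetical row of the fact table `F` from the diagram.
#[derive(Debug, Clone, PartialEq)]
struct FactRow {
    d1_key: i64,
    d2_key: i64,
    measure: f64,
}

/// Scan `F` using only the filter derived from HashJoin(2)'s build side (D2).
/// If D2's keys are unique, this is the intermediate output of HashJoin(2).
fn scan_with_d2_filter<'a>(
    fact: &'a [FactRow],
    d2_keys: &HashSet<i64>,
) -> Vec<&'a FactRow> {
    fact.iter()
        .filter(|row| d2_keys.contains(&row.d2_key))
        .collect()
}

/// Scan `F` with the filters from both joins applied at the scan, so rows
/// that HashJoin(1) would reject never reach HashJoin(2) at all.
fn scan_with_both_filters<'a>(
    fact: &'a [FactRow],
    d1_keys: &HashSet<i64>,
    d2_keys: &HashSet<i64>,
) -> Vec<&'a FactRow> {
    fact.iter()
        .filter(|row| d1_keys.contains(&row.d1_key) && d2_keys.contains(&row.d2_key))
        .collect()
}
```

The difference between the two result sizes is the intermediate join output that the second variant never has to build.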

Describe alternatives you've considered

Some version of this technique is described in "Bloom Filter Joins" in Spark: https://issues.apache.org/jira/browse/SPARK-32268

Building a separate Bloom filter has the nice property that it can be distributed across a networked cluster. However, the overhead of creating the Bloom filter would likely be non-trivial.
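For comparison, a Bloom filter over the build-side keys could look roughly like the sketch below. This is a generic textbook construction, not Spark's or DataFusion's implementation: the `BloomFilter` type, its sizing parameters, and the double-hashing scheme built on the standard library's `DefaultHasher` are all assumptions made for illustration. It shows both the appeal (a small, fixed-size bit vector that is easy to ship around) and the cost (an extra pass that hashes every build-side key several times).

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Minimal Bloom filter: may report false positives, never false negatives.
struct BloomFilter {
    bits: Vec<u64>,
    num_bits: u64,
    num_hashes: u64,
}

impl BloomFilter {
    /// `num_bits` is rounded up to a multiple of 64 (at least 64).
    fn new(num_bits: usize, num_hashes: u64) -> Self {
        let words = ((num_bits + 63) / 64).max(1);
        BloomFilter {
            bits: vec![0u64; words],
            num_bits: (words as u64) * 64,
            num_hashes,
        }
    }

    /// Derive two hash values; the second is forced odd so the probe
    /// sequence below does not collapse onto a single bit.
    fn hash_pair<T: Hash>(item: &T) -> (u64, u64) {
        let mut first = DefaultHasher::new();
        item.hash(&mut first);
        let h1 = first.finish();
        let mut second = DefaultHasher::new();
        h1.hash(&mut second);
        (h1, second.finish() | 1)
    }

    /// Double hashing: bit_i = (h1 + i * h2) mod num_bits.
    fn bit_positions<T: Hash>(&self, item: &T) -> Vec<u64> {
        let (h1, h2) = Self::hash_pair(item);
        (0..self.num_hashes)
            .map(|i| h1.wrapping_add(i.wrapping_mul(h2)) % self.num_bits)
            .collect()
    }

    /// Called once per build-side join key.
    fn insert<T: Hash>(&mut self, item: &T) {
        for bit in self.bit_positions(item) {
            self.bits[(bit / 64) as usize] |= 1u64 << (bit % 64);
        }
    }

    /// Called per probe-side key during the scan; `false` means the row
    /// definitely has no join partner and can be skipped.
    fn may_contain<T: Hash>(&self, item: &T) -> bool {
        self.bit_positions(item)
            .into_iter()
            .all(|bit| (self.bits[(bit / 64) as usize] & (1u64 << (bit % 64))) != 0)
    }
}
```

Because `may_contain` can return false positives, the join itself must still check every row that passes; the filter only reduces how many rows reach it.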

Additional context

See a description of how DataFusion HashJoins work here: #7953

Here is a paper that describes industrial experience with SIPS techniques: https://15721.courses.cs.cmu.edu/spring2020/papers/13-execution/shrinivas-icde2013.pdf
