take kernel that works across multiple RecordBatches #1523

@alamb

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
For several operations in data processing, it is important to be able to select some subset of the rows (for sorting or filtering, for example).

For example, the current take kernel works like this:

┌─────────────────┐      ┌─────────┐                              ┌─────────────────┐
│        A        │      │    0    │                              │        A        │
├─────────────────┤      ├─────────┤                              ├─────────────────┤
│        D        │      │    2    │                              │        B        │
├─────────────────┤      ├─────────┤   take(values, indices)      ├─────────────────┤
│        B        │      │    3    │ ─────────────────────────▶   │        C        │
├─────────────────┤      ├─────────┤                              ├─────────────────┤
│        C        │      │    1    │                              │        D        │
├─────────────────┤      └─────────┘                              └─────────────────┘
│        E        │                                                                  
└─────────────────┘                                                                  
   values array            indices array                               result        
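
The gather semantics in the diagram can be modelled in a few lines. This is only a sketch: plain slices stand in for Arrow arrays, and `take_model` is a hypothetical name, not the arrow crate's `take` kernel.

```rust
/// Simplified model of `take`: for each `i` in `indices`, copy `values[i]`
/// into the output, in order. Plain slices stand in for Arrow arrays
/// (an assumption for illustration). Panics if an index is out of bounds.
fn take_model<T: Clone>(values: &[T], indices: &[usize]) -> Vec<T> {
    indices.iter().map(|&i| values[i].clone()).collect()
}
```

Called with the values `A, D, B, C, E` and the indices `0, 2, 3, 1` from the diagram, this model yields `A, B, C, D`.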
                                                                                     

In DataFusion, our operators receive multiple record batches at a time, and we would like to do things such as sorting them without first combining them into a single record batch. For example:

┌─────────────────┐                                                        
│        A        │                                                        
├─────────────────┤                                                        
│        D        │                                     ┌─────────────────┐
└─────────────────┘                                     │        A        │
  values array 0                                        ├─────────────────┤
                                                        │        B        │
                                     ?                  ├─────────────────┤
                                                        │        C        │
┌─────────────────┐     ─────────────────────────▶      ├─────────────────┤
│        B        │                                     │        D        │
├─────────────────┤                                     └─────────────────┘
│        C        │                                                        
├─────────────────┤                                                        
│        E        │                                      desired result    
└─────────────────┘                                                        
  values array 1                                                           

Describe the solution you'd like

I would like a function something like batch_take that takes a vector of RecordBatches and a list of (record_batch_index, offset_in_the_record_batch) tuples and produces the resulting array, like:

┌─────────────────┐      ┌─────────┐                                  ┌─────────────────┐
│        A        │      │ (0, 0)  │        batch_take(               │        A        │
├─────────────────┤      ├─────────┤          [values0, values1],     ├─────────────────┤
│        D        │      │ (1, 0)  │          batch_indices           │        B        │
└─────────────────┘      ├─────────┤        )                         ├─────────────────┤
  values array 0         │ (1, 1)  │      ─────────────────────────▶  │        C        │
                         ├─────────┤                                  ├─────────────────┤
                         │ (0, 1)  │                                  │        D        │
                         └─────────┘                                  └─────────────────┘
┌─────────────────┐                                                                      
│        B        │                                                                      
├─────────────────┤   batch_indices                                        result        
│        C        │        array                                                         
├─────────────────┤                                                                      
│        E        │                                                                      
└─────────────────┘                                                                      
  values array 1                                                                                                                                                                                                                    
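
A rough sketch of what such a function might look like for a single column, again over plain slices rather than `RecordBatch`es. The name `batch_take_model` and its signature are assumptions for illustration only; a real kernel would work on Arrow arrays and handle nulls and data types.

```rust
/// Hypothetical single-column model of the proposed `batch_take`.
/// `batches[b]` is the values array of source batch `b`; each index is a
/// `(record_batch_index, offset_in_the_record_batch)` tuple.
/// Panics if either component of an index is out of bounds.
fn batch_take_model<T: Clone>(batches: &[&[T]], indices: &[(usize, usize)]) -> Vec<T> {
    indices
        .iter()
        .map(|&(batch, offset)| batches[batch][offset].clone())
        .collect()
}
```

With `values0 = [A, D]` and `values1 = [B, C, E]`, the indices `(0, 0), (1, 0), (1, 1), (0, 1)` select `A, B, C, D`: the sorted output is produced without first concatenating the two inputs.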

Over time, I would expect these to be optimized in the same way as we have optimized the take kernel.

This will come up in Grouping and Join operators as well.

Describe alternatives you've considered
There are two more features that @yjshen added in apache/datafusion#2132 that we might contemplate:

  1. Take a list of (record_batch_index, offset_in_the_record_batch, num_records) to optimize the common case of copying multiple rows from each source batch.
  2. Provide an iterator interface so that the results can be formed a batch at a time, rather than in one large array.
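
Both alternatives can be sketched over plain slices. The function names, signatures, and the `batch_size` parameter below are hypothetical and are not taken from apache/datafusion#2132; they only illustrate the shape of the two ideas.

```rust
/// Hypothetical variant 1: each entry is
/// `(record_batch_index, offset_in_the_record_batch, num_records)` and copies
/// a contiguous run of rows from one source batch.
fn batch_take_ranges<T: Clone>(batches: &[&[T]], ranges: &[(usize, usize, usize)]) -> Vec<T> {
    // Pre-size the output from the total number of requested rows.
    let mut out = Vec::with_capacity(ranges.iter().map(|r| r.2).sum());
    for &(batch, offset, len) in ranges {
        // One slice copy per run instead of one lookup per row.
        out.extend_from_slice(&batches[batch][offset..offset + len]);
    }
    out
}

/// Hypothetical variant 2: yield the result in chunks of at most
/// `batch_size` rows instead of one large array.
/// `batch_size` must be non-zero (`chunks` panics on zero).
fn batch_take_chunks<'a, T: Clone>(
    batches: &'a [&'a [T]],
    indices: &'a [(usize, usize)],
    batch_size: usize,
) -> impl Iterator<Item = Vec<T>> + 'a {
    indices.chunks(batch_size).map(move |chunk| {
        chunk.iter().map(|&(b, o)| batches[b][o].clone()).collect()
    })
}
```

The range form pays off when consecutive output rows come from adjacent rows of the same source batch. The chunked form bounds the size of each output allocation, since no single array has to hold the full result.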

Additional context
This came up while @yjshen was implementing a more memory-efficient sort in DataFusion (apache/datafusion#2132) and was suggested by @Dandandan in apache/datafusion#2132 (comment).

We can probably move a bunch of the implementation from that PR to this one.

Labels

arrow (Changes to the arrow crate), enhancement (Any new improvement worthy of an entry in the changelog), performance
