Description
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Several data processing operations, such as sorting and filtering, need to select some subset of the values in an array.
For example, the current take kernel works like this:
┌─────────────────┐      ┌─────────┐                              ┌─────────────────┐
│        A        │      │    0    │                              │        A        │
├─────────────────┤      ├─────────┤                              ├─────────────────┤
│        D        │      │    2    │                              │        B        │
├─────────────────┤      ├─────────┤   take(values, indices)      ├─────────────────┤
│        B        │      │    3    │ ─────────────────────────▶   │        C        │
├─────────────────┤      ├─────────┤                              ├─────────────────┤
│        C        │      │    1    │                              │        D        │
├─────────────────┤      └─────────┘                              └─────────────────┘
│        E        │
└─────────────────┘
   values array         indices array                                   result
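To make the semantics in the diagram concrete, here is a minimal sketch of what `take` does, written against plain slices rather than Arrow arrays. The name `take_sketch` and the `Option` return for out-of-bounds indices are assumptions made for illustration; this is not the signature of the real kernel.

```rust
/// Illustrative stand-in for the `take` kernel on a plain slice
/// (hypothetical helper, not the arrow-rs API).
/// For each index, copy the value at that position.
/// Returns `None` if any index is out of bounds.
fn take_sketch<T: Clone>(values: &[T], indices: &[usize]) -> Option<Vec<T>> {
    indices.iter().map(|&i| values.get(i).cloned()).collect()
}
```

With the values `[A, D, B, C, E]` and the indices `[0, 2, 3, 1]` from the diagram, this yields `[A, B, C, D]`.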
In DataFusion, operators receive multiple record batches at a time, and we would like to perform operations such as sorting across them without first combining them into a single record batch. For example:
┌─────────────────┐
│        A        │
├─────────────────┤
│        D        │                              ┌─────────────────┐
└─────────────────┘                              │        A        │
  values array 0                                 ├─────────────────┤
                                                 │        B        │
                                 ?               ├─────────────────┤
                                                 │        C        │
┌─────────────────┐   ─────────────────────────▶ ├─────────────────┤
│        B        │                              │        D        │
├─────────────────┤                              └─────────────────┘
│        C        │
├─────────────────┤
│        E        │                                desired result
└─────────────────┘
  values array 1
Describe the solution you'd like
I would like a function, something like batch_take, that takes a vector of RecordBatches and a list of (record_batch_index, offset_in_the_record_batch) tuples and produces the resulting array, like:
┌─────────────────┐      ┌─────────┐                              ┌─────────────────┐
│        A        │      │ (0, 0)  │      batch_take(             │        A        │
├─────────────────┤      ├─────────┤        [values0, values1],   ├─────────────────┤
│        D        │      │ (1, 0)  │        batch_indices         │        B        │
└─────────────────┘      ├─────────┤      )                       ├─────────────────┤
  values array 0         │ (1, 1)  │ ─────────────────────────▶   │        C        │
                         ├─────────┤                              ├─────────────────┤
                         │ (0, 1)  │                              │        D        │
                         └─────────┘                              └─────────────────┘
┌─────────────────┐
│        B        │
├─────────────────┤     batch_indices                                   result
│        C        │         array
├─────────────────┤
│        E        │
└─────────────────┘
  values array 1
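As a rough illustration of the requested behavior, the sketch below implements the same lookup over plain slices. Everything here is hypothetical: `batch_take_sketch`, its slice-based signature, and the `Option` return are stand-ins chosen for the example, and a real kernel would operate on Arrow arrays or RecordBatches and likely return a `Result`.

```rust
/// Hypothetical sketch of `batch_take` over plain slices (not an arrow-rs API).
/// Each entry in `batch_indices` is (record_batch_index, offset_in_the_record_batch).
/// Returns `None` if a batch index or an offset is out of bounds.
fn batch_take_sketch<T: Clone>(
    batches: &[&[T]],
    batch_indices: &[(usize, usize)],
) -> Option<Vec<T>> {
    batch_indices
        .iter()
        // Look up the source batch first, then the row inside that batch.
        .map(|&(batch, offset)| batches.get(batch)?.get(offset).cloned())
        .collect()
}
```

With `values0 = [A, D]` and `values1 = [B, C, E]`, the pairs `(0, 0), (1, 0), (1, 1), (0, 1)` select `A, B, C, D` without concatenating the two inputs first.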
Over time, I would expect this to be optimized in the same way we have optimized the take kernel.
This will come up in Grouping and Join operators as well.
Describe alternatives you've considered
There are two more features that @yjshen added in apache/datafusion#2132 that we might contemplate:
- Take a list of (record_batch_index, offset_in_the_record_batch, num_records) to optimize the common case of copying multiple rows from each source batch.
- Provide an iterator interface so that the results can be formed a batch at a time, rather than in one large array.
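The two ideas above could be combined roughly as follows. This is only a sketch over plain slices: `batch_take_runs`, the `batch_size` parameter, and the panic-on-out-of-bounds behavior are assumptions made for the example and do not reflect the code in the DataFusion PR.

```rust
/// Hypothetical sketch: each run is
/// (record_batch_index, offset_in_the_record_batch, num_records),
/// i.e. `num_records` consecutive rows from one source batch.
/// Output is produced lazily, in chunks of at most `batch_size` rows.
/// Panics if `batch_size` is zero or a run falls outside its source batch.
fn batch_take_runs<'a, T: Clone + 'a>(
    batches: &'a [&'a [T]],
    runs: &'a [(usize, usize, usize)],
    batch_size: usize,
) -> impl Iterator<Item = Vec<T>> + 'a {
    assert!(batch_size > 0, "batch_size must be non-zero");
    // Each run addresses one contiguous slice of a source batch.
    let mut rows = runs.iter().flat_map(move |&(batch, offset, len)| {
        batches[batch][offset..offset + len].iter().cloned()
    });
    // Pull up to `batch_size` rows per call; stop once nothing is left.
    std::iter::from_fn(move || {
        let chunk: Vec<T> = rows.by_ref().take(batch_size).collect();
        if chunk.is_empty() {
            None
        } else {
            Some(chunk)
        }
    })
}
```

For example, with `values0 = [A, D]`, `values1 = [B, C, E]`, the runs `(0, 0, 1), (1, 0, 2), (0, 1, 1)`, and a `batch_size` of 3, the iterator yields `[A, B, C]` and then `[D]`. Describing `B, C` as a single run is what would let an optimized kernel copy a contiguous range instead of handling each row separately.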
Additional context
This came up while @yjshen was implementing a more memory-efficient sort in DataFusion (apache/datafusion#2132), and it was suggested by @Dandandan in apache/datafusion#2132 (comment).
We can probably move a bunch of the implementation from that PR to this one.