Reduce copying in CoalesceBatchesExec for StringViews #11628

Open
Tracked by #11752
alamb opened this issue Jul 23, 2024 · 3 comments
Labels
enhancement New feature or request

Comments

@alamb

alamb commented Jul 23, 2024

Is your feature request related to a problem or challenge?

In pictures, what #11587 does looks like this (the gc step ensures that lots of unreachable "garbage" does not accumulate in the output batch):

┌────────────────────┐
│    RecordBatch     │                ┌────────────────────┐
│   num_rows = 23    │                │    RecordBatch     │
└────────────────────┘                │   num_rows = 23    │              ┌────────────────────┐
                                      └────────────────────┘              │                    │
┌────────────────────┐                                        Coalesce    │                    │
│                    │ StringView::gc ┌────────────────────┐   Batches    │                    │
│    RecordBatch     │                │    RecordBatch     │              │                    │
│   num_rows = 50    │                │   num_rows = 50    │  ─ ─ ─ ─ ─▶  │                    │
│                    │    ─ ─ ─ ─ ─▶  │                    │              │    RecordBatch     │
│                    │                └────────────────────┘              │   num_rows = 106   │
└────────────────────┘                                                    │                    │
                                                                          │                    │
┌────────────────────┐                ┌────────────────────┐              │                    │
│                    │                │    RecordBatch     │              │                    │
│    RecordBatch     │                │   num_rows = 33    │              │                    │
│   num_rows = 33    │                │                    │              └────────────────────┘
│                    │                └────────────────────┘
└────────────────────┘

However, as @2010YOUY01 pointed out in https://github.com/apache/datafusion/pull/11587/files#r1686678665

So here, inside gc, the string buffer will be copied once; then (below) in
concat_batches() the string buffer will be copied again. It seems possible to copy
only once by changing the internal implementation of concat_batches()

This implementation effectively copies the data twice: once for the call to
gc and once for the call to coalesce batches.

Due to the nature of StringView, the actual string values are only copied once, but the u128 view values are copied twice.
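
To make the "view" part concrete: in the Arrow columnar format each row of a StringView array is a 16-byte view, which is why it fits in a u128. A minimal std-only sketch of that layout follows. The function names (`make_view`, `view_len`, `is_inline`, `buffer_location`) are hypothetical and are not part of arrow-rs; this is a toy encoding for illustration, not the library's implementation.

```rust
/// Toy encoding of one 16-byte string view, packed little-endian into a u128.
/// Layout (per the Arrow columnar format):
///   len <= 12: [len: u32][up to 12 bytes of inline string data]
///   len  > 12: [len: u32][4-byte prefix][buffer_index: u32][offset: u32]
/// Hypothetical helper, not an arrow-rs API.
fn make_view(data: &[u8], buffer_index: u32, offset: u32) -> u128 {
    let mut bytes = [0u8; 16];
    bytes[0..4].copy_from_slice(&(data.len() as u32).to_le_bytes());
    if data.len() <= 12 {
        // Short strings live entirely inside the view; no data buffer is needed.
        bytes[4..4 + data.len()].copy_from_slice(data);
    } else {
        // Long strings keep a 4-byte prefix and point into a separate data buffer.
        bytes[4..8].copy_from_slice(&data[0..4]);
        bytes[8..12].copy_from_slice(&buffer_index.to_le_bytes());
        bytes[12..16].copy_from_slice(&offset.to_le_bytes());
    }
    u128::from_le_bytes(bytes)
}

/// The length is stored in the lowest 32 bits of the view.
fn view_len(view: u128) -> u32 {
    view as u32
}

/// True when the string data is stored inside the view itself.
fn is_inline(view: u128) -> bool {
    view_len(view) <= 12
}

/// For long strings, returns (buffer_index, offset); `None` for inline strings.
fn buffer_location(view: u128) -> Option<(u32, u32)> {
    if is_inline(view) {
        None
    } else {
        Some(((view >> 64) as u32, (view >> 96) as u32))
    }
}
```

Copying a view therefore costs 16 bytes per row regardless of string length, while the string bytes themselves are only touched when a data buffer is rewritten.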

Describe the solution you'd like

Somehow structure the code to avoid copying the views again. Like this

┌────────────────────┐                                       
│    RecordBatch     │                                       
│   num_rows = 23    │                 ┌────────────────────┐
└────────────────────┘                 │                    │
                       StringView::gc  │                    │
┌────────────────────┐  and Coalesce   │                    │
│                    │ Batches in same │                    │
│    RecordBatch     │    operation    │                    │
│   num_rows = 50    │                 │    RecordBatch     │
│                    │    ─ ─ ─ ─ ─▶   │   num_rows = 106   │
│                    │                 │                    │
└────────────────────┘                 │                    │
                                       │                    │
┌────────────────────┐                 │                    │
│                    │                 │                    │
│    RecordBatch     │                 └────────────────────┘
│   num_rows = 33    │                                       
│                    │                                       
└────────────────────┘                                       

Describe alternatives you've considered

https://github.com/apache/datafusion/pull/11587/files#r1687099239

I think given how concat is implemented for StringView it will only copy the fixed parts (not the actual string data)

Perhaps what we could do is implement a wrapper around arrow::concat_batches that has the datafusion specific GC trigger for sparse arrays, and falls back to concat for other types: https://docs.rs/arrow-select/52.1.0/src/arrow_select/concat.rs.html#150

/// Wrapper around [`arrow::compute::concat`] that handles StringView arrays specially
pub fn concat(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
    // loop over columns here and handle StringView specially,
    // or fall back to arrow::compute::concat
}

Additional context

#7957 is another related idea for avoiding copies

@alamb alamb added the enhancement New feature or request label Jul 23, 2024
@XiangpengHao

XiangpengHao commented Jul 23, 2024

Got some time to think about this and want to share my thoughts here:

Implementation

The goal is to reduce string copying: specifically, to copy each string only once and to construct the view array only once.

I implemented the gc in concat_batches on my local branch. The code looks like this:

    for i in 0..field_num {
        let data_type = schema.field(i).data_type();
        match data_type {
            &arrow_schema::DataType::Utf8View => {
                let mut string_view_builder = StringViewBuilder::with_capacity(row_count)
                    .with_block_size(1024 * 1024 * 2);
                for b in batches.iter() {
                    let array = b.column(i).as_string_view();
                    for v in array.iter() {
                        string_view_builder.append_option(v);
                    }
                }
                let array = string_view_builder.finish();
                arrays.push(Arc::new(array) as ArrayRef);
            }
            _ => {
                let array = arrow::compute::concat(
                    &batches
                        .iter()
                        .map(|batch| batch.column(i).as_ref())
                        .collect::<Vec<_>>(),
                )?;
                arrays.push(array);
            }
        }
    }

I benchmarked this implementation on ClickBench Q20:

SELECT COUNT(*) FROM hits WHERE "URL" LIKE '%google%';

The new implementation is about 20% slower.

Profiling

I checked the flamegraph and found that the new implementation spends significantly more time on page faults.
(image: flamegraph)

Then I ran heaptrack (which requires disabling mimalloc) and found that the peak RSS (resident set size) increased from 1.3GB to 5.2GB.

I believe the performance regression is due to late GC. Previously, we called GC immediately after running the filter. Now, we call GC only after accumulating enough values in the buffer, which can keep the underlying buffers alive for an excessively long time. Because the StringView buffers are not released promptly, memory consumption is high.
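
The retention effect can be illustrated with a std-only toy model. This is a sketch under stated assumptions: `ToyViewArray` and its methods are hypothetical stand-ins for a StringViewArray and its filter/gc operations, with a single shared `Arc<Vec<u8>>` in place of Arrow data buffers.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a StringViewArray: views are (offset, len)
/// pairs pointing into one shared, reference-counted data buffer.
struct ToyViewArray {
    buffer: Arc<Vec<u8>>,
    views: Vec<(usize, usize)>,
}

impl ToyViewArray {
    fn new(buffer: Vec<u8>, views: Vec<(usize, usize)>) -> Self {
        Self { buffer: Arc::new(buffer), views }
    }

    /// Number of rows.
    fn len(&self) -> usize {
        self.views.len()
    }

    /// Bytes of row `i`.
    fn value(&self, i: usize) -> &[u8] {
        let (offset, len) = self.views[i];
        &self.buffer[offset..offset + len]
    }

    /// Filtering keeps only the selected views but shares the whole buffer,
    /// so no string bytes are copied and none are released.
    fn filter(&self, keep: &[bool]) -> ToyViewArray {
        let views = self
            .views
            .iter()
            .zip(keep)
            .filter(|(_, k)| **k)
            .map(|(v, _)| *v)
            .collect();
        ToyViewArray { buffer: Arc::clone(&self.buffer), views }
    }

    /// gc copies only the referenced bytes into a new compact buffer, so the
    /// old buffer can be freed once nothing else points at it.
    fn gc(&self) -> ToyViewArray {
        let mut buffer = Vec::new();
        let mut views = Vec::with_capacity(self.views.len());
        for &(offset, len) in &self.views {
            views.push((buffer.len(), len));
            buffer.extend_from_slice(&self.buffer[offset..offset + len]);
        }
        ToyViewArray { buffer: Arc::new(buffer), views }
    }

    /// Size of the data buffer this array keeps alive.
    fn retained_bytes(&self) -> usize {
        self.buffer.len()
    }
}
```

In this model a filtered array that keeps one row out of ten still retains the full input buffer until gc runs, so delaying gc until many batches have accumulated multiplies the memory held.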

Solution?

I think the discussion around reducing copying can be divided into two sub-questions:

  1. What is the current overhead of StringView in CoalesceBatchesExec? The current implementation does not copy string data. The overhead (extra steps) comes from constructing the views three times (once in the filter step, once in the coalesce gc, and once in concat_batches). The implementation above gets rid of the second one, but does so at the wrong time.
  2. Should we refactor filter-then-coalesce into one operator? That way, we don't have intermediate small batches, which reduces copying. This is a bigger project and could potentially solve the first problem along the way.

I think this is another example of how making StringView fast in practice requires a lot of careful analysis and implementation!

cc @alamb @2010YOUY01

@alamb

alamb commented Jul 25, 2024

I believe the performance regression is due to late GC. Previously, we called GC immediately after we ran the filter. Now, we call GC only after we accumulate enough values in the buffer,

This makes sense to me and I think your analysis is very clear. Thank you

should we refactor filter-then-coalesce into one operator? In that way, we don't have intermediate small batches, thus reduce copy. This is a bigger project and can potentially solve the first problem along the way.

I think this is what we should pursue, and I think it is what #7957 covers. As you say, it is likely the approach that will perform best.

Maybe we could explore a solution that builds the output StringViewArray as data comes in, rather than waiting for enough data to accumulate. The code might look like:

while let Some(batch) = input.read_batch() {
  // append new rows to inprogress output, producing a complete batch if ready
  if let Some(output_batch) = coalescer.push_batch(batch) {
    output.emit(output_batch)
  }
}

The idea would be that the coalescer stores an in-progress StringViewBuilder, so that the data is copied as batches are pushed:

struct Coalescer {
  in_progress: StringViewBuilder,
  // and similar things for other types 🤔
}

impl Coalescer {
  fn push_batch(&mut self, batch: RecordBatch) -> Option<RecordBatch> {
    // copy relevant values to self.in_progress
    // if in_progress.len is greater than threshold emit a batch
  }
}
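
A runnable toy version of the same push-based shape, as a sketch only: it uses `Vec<String>` in place of RecordBatch and StringViewBuilder so that it compiles with the standard library alone. `ToyCoalescer`, `target_rows`, and `finish` are hypothetical names introduced for illustration and are not taken from DataFusion.

```rust
/// Hypothetical push-based coalescer over plain strings.
/// Rows are copied into `in_progress` as soon as a batch arrives, so the
/// input batch (and whatever buffers it holds) can be dropped immediately.
struct ToyCoalescer {
    target_rows: usize,
    in_progress: Vec<String>,
}

impl ToyCoalescer {
    fn new(target_rows: usize) -> Self {
        Self {
            target_rows,
            in_progress: Vec::with_capacity(target_rows),
        }
    }

    /// Copy the rows of `batch` into the in-progress output. Returns a
    /// completed batch once at least `target_rows` rows are buffered.
    /// Note: this sketch does not split batches, so an emitted batch may
    /// contain more than `target_rows` rows.
    fn push_batch(&mut self, batch: &[&str]) -> Option<Vec<String>> {
        self.in_progress.extend(batch.iter().map(|s| s.to_string()));
        if self.in_progress.len() >= self.target_rows {
            Some(std::mem::take(&mut self.in_progress))
        } else {
            None
        }
    }

    /// Flush any remaining rows at end of input.
    fn finish(&mut self) -> Option<Vec<String>> {
        if self.in_progress.is_empty() {
            None
        } else {
            Some(std::mem::take(&mut self.in_progress))
        }
    }
}
```

The design choice being illustrated is that each input row is copied exactly once, at push time, rather than once in a gc pass and again in a later concat.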

You might recognize this high level structure from #11610 :)

I think this is another example of getting StringView fast in practice requires a lot of careful analysis and implementation!

100% agree

@2010YOUY01

Benchmark this implement on ClickBench Q20:

SELECT COUNT(*) FROM hits WHERE "URL" LIKE '%google%';

The performance is slower by about 20%.

This benchmark is a great inspiration. I think this query has low selectivity and the processed strings are longer, so early GC is preferable and the cost of the extra view copies is not dominant.
The goal is to find a strategy that works for all possible cases: low/high selectivity filter + short/long strings.
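
One way to frame such a strategy is as a per-batch decision based on how much of the data buffers is still referenced. The following is a hypothetical heuristic, not something proposed in this thread: the function name `should_gc` and the `max_waste_factor` parameter are assumptions made for illustration.

```rust
/// Hypothetical heuristic: compact a batch when its data buffers hold more
/// than `max_waste_factor` times the bytes that the views still reference.
/// With a low-selectivity filter over long strings, `buffer_bytes` is far
/// larger than `referenced_bytes`, so gc runs early; when most rows survive
/// or strings are short enough to be inlined, gc is skipped.
fn should_gc(referenced_bytes: usize, buffer_bytes: usize, max_waste_factor: usize) -> bool {
    buffer_bytes > referenced_bytes.saturating_mul(max_waste_factor)
}
```

Any real threshold would need to be validated against benchmarks such as the ClickBench query above.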
