Merged
arrow-array/src/array/byte_view_array.rs (102 changes: 58 additions & 44 deletions)
@@ -512,18 +512,36 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
};
}

struct GcCopyGroup {
total_buffer_bytes: usize,
total_len: usize,
}
let (views_buf, data_blocks) = if total_large < i32::MAX as usize {
// fast path, the entire data fits in a single buffer
// 3) Allocate exactly capacity for all non-inline data
Author: this restores the original code and then only drops into the slow path when needed
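For context on what the copied helper does: in Arrow's byte-view layout each element is a 16-byte view; payloads of at most 12 bytes (MAX_INLINE_VIEW_LEN) live inline in the view itself, while longer payloads are referenced through a buffer index and an offset into a shared data buffer. The sketch below models what a helper like copy_view_to_buffer conceptually does during gc; the struct and function names are illustrative stand-ins, not the crate's actual packed-u128 implementation.

```rust
// Conceptual sketch only: Arrow packs all of this into a u128 view;
// plain fields are used here for readability.
const MAX_INLINE_VIEW_LEN: u32 = 12;

#[derive(Clone, Copy)]
struct View {
    len: u32,          // payload length in bytes
    buffer_index: i32, // which data buffer holds the payload (non-inline only)
    offset: u32,       // byte offset into that buffer (non-inline only)
}

/// Copy one view's payload into `data_buf` and return a view pointing
/// at the new buffer and offset. Inline views (len <= 12) carry their
/// bytes inside the view itself, so they are returned unchanged.
fn copy_view(view: View, payload: &[u8], new_buffer_index: i32, data_buf: &mut Vec<u8>) -> View {
    if view.len <= MAX_INLINE_VIEW_LEN {
        view
    } else {
        let new_offset = data_buf.len() as u32;
        data_buf.extend_from_slice(payload); // compact the payload
        View {
            len: view.len,
            buffer_index: new_buffer_index,
            offset: new_offset,
        }
    }
}

fn main() {
    let mut data_buf = Vec::new();
    let old = View { len: 20, buffer_index: 0, offset: 64 };
    let payload = [0u8; 20];
    let moved = copy_view(old, &payload, 1, &mut data_buf);
    assert_eq!((moved.buffer_index, moved.offset), (1, 0));
}
```

This is also why the fast path can pass buffer index 0 unconditionally (there is only one destination buffer), while the slow path below passes group_idx as i32.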

let mut data_buf = Vec::with_capacity(total_large);

// 4) Iterate over views and process each inline/non-inline view
let views_buf: Vec<u128> = (0..len)
.map(|i| unsafe { self.copy_view_to_buffer(i, 0, &mut data_buf) })
.collect();
let data_block = Buffer::from_vec(data_buf);
let data_blocks = vec![data_block];
(views_buf, data_blocks)
} else {
// slow path, need to split into multiple buffers

struct GcCopyGroup {
total_buffer_bytes: usize,
total_len: usize,
}

impl GcCopyGroup {
fn new(total_buffer_bytes: u32, total_len: usize) -> Self {
Self {
total_buffer_bytes: total_buffer_bytes as usize,
total_len,
}
}
}

let mut groups = vec![];
let one_group = [GcCopyGroup {
total_buffer_bytes: total_large,
total_len: len,
}];
let gc_copy_groups = if total_large > i32::MAX as usize {
// Slow-path: need to split into multiple copy groups
let mut groups = Vec::with_capacity(total_large / (i32::MAX as usize) + 1);
Owner: @alamb assume a bad case where every string view is 1.5 GiB; this estimate would be smaller than expected. (This is why I didn't use an estimate previously: I think re-allocation here would not be the bottleneck, and the actual size might be more than total_large / (i32::MAX as usize) + 1.)

Author:

> assume a bad case where every string view is 1.5 GiB, this would be smaller than expected

I don't quite follow this logic. For example:

  • if we had 3 string views, each of 1.5 GiB
  • total_large would be 4.5 GiB
  • then the capacity would be (4.5G / 2G + 1) = 3

Which is how many buffers are needed 🤔

> (this is why I don't use an estimate previously; I think re-allocation here would not be the bottleneck, and the actual size might be more than total_large / (i32::MAX as usize) + 1)

I agree reallocation is likely not to be the bottleneck.

Owner: I mean, the case of 100 bytes per BinaryView and 1.5 GiB per BinaryView would be different. Assume the destination size is 15 GiB:

  1. at 100 bytes each, the estimate gives (15G / 2G + 1) = 8 buffers
  2. at 1.5 GiB each, it would require 10 buffers, since a 2 GiB buffer can hold at most 1 such binary view

Owner: (A better example: every BinaryView.len() is 1 GiB plus 1 byte, or strings interleaving as 1 GiB, 1 byte, 1 GiB, 1 byte, ...; then each 2 GiB buffer holds only one of the large views.)
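To make this concrete, here is a small self-contained sketch (illustrative only, not code from the PR) comparing the Vec::with_capacity estimate total_large / (i32::MAX as usize) + 1 against the number of groups the greedy grouping loop actually produces for the "1 GiB plus 1 byte" case:

```rust
// Demonstrates the point above: the capacity estimate is a lower bound,
// not the exact group count, so `groups` may still reallocate.
fn main() {
    const LIMIT: u64 = i32::MAX as u64; // per-buffer byte limit (~2 GiB)
    let gib: u64 = 1 << 30;

    // Adversarial case from the review: every view is 1 GiB + 1 byte.
    let lens = vec![gib + 1; 10];
    let total_large: u64 = lens.iter().sum();

    // The estimate used for Vec::with_capacity.
    let estimate = (total_large / LIMIT + 1) as usize;

    // The greedy grouping loop: start a new group whenever the next
    // view would push the current group past the limit.
    let (mut actual, mut current) = (0usize, 0u64);
    for &len in &lens {
        if current + len > LIMIT {
            actual += 1; // close the current group
            current = 0;
        }
        current += len;
    }
    if current > 0 {
        actual += 1;
    }

    // No two (1 GiB + 1 B) payloads fit in one ~2 GiB buffer, so every
    // view gets its own group: prints "estimate = 6, actual = 10".
    println!("estimate = {estimate}, actual = {actual}");
}
```

Since the capacity is only a starting hint, the cost of underestimating is a reallocation, which both reviewers agree is unlikely to be the bottleneck.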

let mut current_length = 0;
let mut current_elements = 0;

@@ -532,10 +550,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
if len > MAX_INLINE_VIEW_LEN {
if current_length + len > i32::MAX as u32 {
// Start a new group
groups.push(GcCopyGroup {
total_buffer_bytes: current_length as usize,
total_len: current_elements,
});
groups.push(GcCopyGroup::new(current_length, current_elements));
current_length = 0;
current_elements = 0;
}
@@ -544,38 +559,37 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
}
}
if current_elements != 0 {
groups.push(GcCopyGroup {
total_buffer_bytes: current_length as usize,
total_len: current_elements,
});
groups.push(GcCopyGroup::new(current_length, current_elements));
}
&groups
} else {
one_group.as_slice()
};
debug_assert!(gc_copy_groups.len() <= i32::MAX as usize);

// 3) Copy the buffers group by group
let mut views_buf = Vec::with_capacity(len);
let mut data_blocks = Vec::with_capacity(gc_copy_groups.len());

let mut current_view_idx = 0;

for (group_idx, gc_copy_group) in gc_copy_groups.iter().enumerate() {
let mut data_buf = Vec::with_capacity(gc_copy_group.total_buffer_bytes);

// Directly push views to avoid intermediate Vec allocation
for view_idx in current_view_idx..current_view_idx + gc_copy_group.total_len {
let view =
unsafe { self.copy_view_to_buffer(view_idx, group_idx as i32, &mut data_buf) };
views_buf.push(view);
debug_assert!(groups.len() <= i32::MAX as usize);

// 3) Copy the buffers group by group
let mut views_buf = Vec::with_capacity(len);
let mut data_blocks = Vec::with_capacity(groups.len());

let mut current_view_idx = 0;

for (group_idx, gc_copy_group) in groups.iter().enumerate() {
let mut data_buf = Vec::with_capacity(gc_copy_group.total_buffer_bytes);

// Directly push views to avoid intermediate Vec allocation
let new_views = (current_view_idx..current_view_idx + gc_copy_group.total_len).map(
|view_idx| {
// safety: the view index came from iterating over valid range
unsafe {
self.copy_view_to_buffer(view_idx, group_idx as i32, &mut data_buf)
}
},
);
views_buf.extend(new_views);
Author: I also tried to make this faster using extend rather than push, which might help
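For reference, the difference between the two shapes: push pays a length/capacity check on every call, while extend with an exact-size iterator lets Vec reserve once from the iterator's size_hint and, for TrustedLen iterators like the range-plus-map above, skip per-element checks. A minimal sketch (the make_view helper is a hypothetical stand-in for copy_view_to_buffer, and this is not a benchmark):

```rust
// Stand-in for the real copy helper; purely illustrative.
fn make_view(i: usize) -> u128 {
    i as u128
}

fn main() {
    let n = 1_000_000;

    // push: capacity reserved up front, but each push still performs
    // a length/capacity check.
    let mut a: Vec<u128> = Vec::with_capacity(n);
    for i in 0..n {
        a.push(make_view(i));
    }

    // extend with an exact-size iterator: Vec can reserve from
    // size_hint and write elements without per-push checks.
    let mut b: Vec<u128> = Vec::with_capacity(n);
    b.extend((0..n).map(make_view));

    assert_eq!(a, b);
}
```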


data_blocks.push(Buffer::from_vec(data_buf));
current_view_idx += gc_copy_group.total_len;
}
(views_buf, data_blocks)
};

data_blocks.push(Buffer::from_vec(data_buf));
current_view_idx += gc_copy_group.total_len;
}

// 4) Wrap up buffers
// 5) Wrap up buffers
let views_scalar = ScalarBuffer::from(views_buf);

// SAFETY: views_scalar, data_blocks, and nulls are correctly aligned and sized