Skip to content

Commit

Permalink
perf: Optimize chunked-id gather for binaryviews (#14878)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Mar 6, 2024
1 parent 6a244ef commit ac0131a
Show file tree
Hide file tree
Showing 4 changed files with 259 additions and 8 deletions.
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/array/binview/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::array::iterator::NonNullValuesIter;
use crate::bitmap::utils::{BitmapIter, ZipValidity};
pub type BinaryViewArray = BinaryViewArrayGeneric<[u8]>;
pub type Utf8ViewArray = BinaryViewArrayGeneric<str>;
pub use view::View;
pub use view::{View, INLINE_VIEW_SIZE};

pub type MutablePlString = MutableBinaryViewArray<str>;
pub type MutablePlBinary = MutableBinaryViewArray<[u8]>;
Expand Down
6 changes: 4 additions & 2 deletions crates/polars-arrow/src/array/binview/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use crate::buffer::Buffer;
use crate::datatypes::PrimitiveType;
use crate::types::NativeType;

pub const INLINE_VIEW_SIZE: u32 = 12;

// We use this instead of u128 because we want alignment of <= 8 bytes.
#[derive(Debug, Copy, Clone, Default)]
#[repr(C)]
Expand Down Expand Up @@ -148,8 +150,8 @@ where
{
for view in views {
let len = view.length;
if len <= 12 {
if len < 12 && view.as_u128() >> (32 + len * 8) != 0 {
if len <= INLINE_VIEW_SIZE {
if len < INLINE_VIEW_SIZE && view.as_u128() >> (32 + len * 8) != 0 {
polars_bail!(ComputeError: "view contained non-zero padding in prefix");
}

Expand Down
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ mod values;
pub use binary::{BinaryArray, BinaryValueIter, MutableBinaryArray, MutableBinaryValuesArray};
pub use binview::{
BinaryViewArray, BinaryViewArrayGeneric, MutableBinaryViewArray, MutablePlBinary,
MutablePlString, Utf8ViewArray, View, ViewType,
MutablePlString, Utf8ViewArray, View, ViewType, INLINE_VIEW_SIZE,
};
pub use boolean::{BooleanArray, MutableBooleanArray};
pub use dictionary::{DictionaryArray, DictionaryKey, MutableDictionaryArray};
Expand Down
257 changes: 253 additions & 4 deletions crates/polars-ops/src/chunked_array/gather/chunked.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use std::borrow::Cow;
use std::fmt::Debug;

use arrow::array::{Array, BinaryViewArray, View, INLINE_VIEW_SIZE};
use arrow::bitmap::MutableBitmap;
use arrow::buffer::Buffer;
use arrow::legacy::trusted_len::TrustedLenPush;
use polars_core::prelude::gather::_update_gather_sorted_flag;
use polars_core::prelude::*;
use polars_core::series::IsSorted;
Expand Down Expand Up @@ -102,11 +106,14 @@ impl TakeChunked for Series {
},
Binary => {
let ca = phys.binary().unwrap();
ca.take_chunked_unchecked(by, sorted).into_series()
let out = take_unchecked_binview(ca, by, sorted);
out.into_series()
},
String => {
let ca = phys.str().unwrap();
ca.take_chunked_unchecked(by, sorted).into_series()
let ca = ca.as_binary();
let out = take_unchecked_binview(&ca, by, sorted);
out.to_string().into_series()
},
List(_) => {
let ca = phys.list().unwrap();
Expand Down Expand Up @@ -155,11 +162,14 @@ impl TakeChunked for Series {
},
Binary => {
let ca = phys.binary().unwrap();
ca.take_opt_chunked_unchecked(by).into_series()
let out = take_unchecked_binview_opt(ca, by);
out.into_series()
},
String => {
let ca = phys.str().unwrap();
ca.take_opt_chunked_unchecked(by).into_series()
let ca = ca.as_binary();
let out = take_unchecked_binview_opt(&ca, by);
out.to_string().into_series()
},
List(_) => {
let ca = phys.list().unwrap();
Expand Down Expand Up @@ -308,3 +318,242 @@ unsafe fn take_opt_unchecked_object(s: &Series, by: &[NullableChunkId]) -> Serie
});
builder.to_series()
}

#[allow(clippy::unnecessary_cast)]
#[inline(always)]
fn rewrite_view(mut view: View, chunk_idx: IdxSize) -> View {
let chunk_idx = chunk_idx as u32;
let offset = [0, chunk_idx][(view.length > INLINE_VIEW_SIZE) as usize];
view.buffer_idx += offset;
view
}

#[allow(clippy::unnecessary_cast)]
unsafe fn take_unchecked_binview(
ca: &BinaryChunked,
by: &[ChunkId],
sorted: IsSorted,
) -> BinaryChunked {
let views = ca
.downcast_iter()
.map(|arr| arr.views().as_slice())
.collect::<Vec<_>>();
let buffers: Arc<[Buffer<u8>]> = ca
.downcast_iter()
.flat_map(|arr| arr.data_buffers().as_ref())
.cloned()
.collect();

let (views, validity) = if ca.null_count() == 0 {
let views = by
.iter()
.map(|chunk_id| {
let (chunk_idx, array_idx) = chunk_id.extract();
let array_idx = array_idx as usize;

let target = *views.get_unchecked_release(chunk_idx as usize);
let view = *target.get_unchecked_release(array_idx);

rewrite_view(view, chunk_idx)
})
.collect::<Vec<_>>();

(views, None)
} else {
let targets = ca.downcast_iter().collect::<Vec<_>>();

let mut mut_views = Vec::with_capacity(by.len());
let mut validity = MutableBitmap::with_capacity(by.len());

for id in by.iter() {
let (chunk_idx, array_idx) = id.extract();
let array_idx = array_idx as usize;

let target = *targets.get_unchecked_release(chunk_idx as usize);
if target.is_null_unchecked(array_idx) {
mut_views.push_unchecked(View::default());
validity.push_unchecked(false)
} else {
let target = *views.get_unchecked_release(chunk_idx as usize);
let view = *target.get_unchecked_release(array_idx);
let view = rewrite_view(view, chunk_idx);
mut_views.push_unchecked(view);
validity.push_unchecked(true)
}
}

(mut_views, Some(validity.freeze()))
};

let arr = BinaryViewArray::new_unchecked_unknown_md(
ArrowDataType::BinaryView,
views.into(),
buffers,
validity,
None,
)
.maybe_gc();

let mut out = BinaryChunked::with_chunk(ca.name(), arr);
let sorted_flag = _update_gather_sorted_flag(ca.is_sorted_flag(), sorted);
out.set_sorted_flag(sorted_flag);
out
}

unsafe fn take_unchecked_binview_opt(ca: &BinaryChunked, by: &[NullableChunkId]) -> BinaryChunked {
let views = ca
.downcast_iter()
.map(|arr| arr.views().as_slice())
.collect::<Vec<_>>();
let buffers: Arc<[Buffer<u8>]> = ca
.downcast_iter()
.flat_map(|arr| arr.data_buffers().as_ref())
.cloned()
.collect();

let targets = ca.downcast_iter().collect::<Vec<_>>();

let mut mut_views = Vec::with_capacity(by.len());
let mut validity = MutableBitmap::with_capacity(by.len());

let (views, validity) = if ca.null_count() == 0 {
for id in by.iter() {
if id.is_null() {
mut_views.push_unchecked(View::default());
validity.push_unchecked(false)
} else {
let (chunk_idx, array_idx) = id.extract();
let array_idx = array_idx as usize;

let target = *views.get_unchecked_release(chunk_idx as usize);
let view = *target.get_unchecked_release(array_idx);
let view = rewrite_view(view, chunk_idx);

mut_views.push_unchecked(view);
validity.push_unchecked(true)
}
}
(mut_views, Some(validity.freeze()))
} else {
for id in by.iter() {
if id.is_null() {
mut_views.push_unchecked(View::default());
validity.push_unchecked(false)
} else {
let (chunk_idx, array_idx) = id.extract();
let array_idx = array_idx as usize;

let target = *targets.get_unchecked_release(chunk_idx as usize);
if target.is_null_unchecked(array_idx) {
mut_views.push_unchecked(View::default());
validity.push_unchecked(false)
} else {
let target = *views.get_unchecked_release(chunk_idx as usize);
let view = *target.get_unchecked_release(array_idx);
let view = rewrite_view(view, chunk_idx);
mut_views.push_unchecked(view);
validity.push_unchecked(true);
}
}
}

(mut_views, Some(validity.freeze()))
};

let arr = BinaryViewArray::new_unchecked_unknown_md(
ArrowDataType::BinaryView,
views.into(),
buffers,
validity,
None,
)
.maybe_gc();

BinaryChunked::with_chunk(ca.name(), arr)
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn test_binview_chunked_gather() {
unsafe {
// # Series without nulls;
let mut s_1 = Series::new(
"a",
&["1 loooooooooooong string 1", "2 loooooooooooong string 2"],
);
let s_2 = Series::new(
"a",
&[
"11 loooooooooooong string 11",
"22 loooooooooooong string 22",
],
);
s_1.append(&s_2).unwrap();

assert_eq!(s_1.n_chunks(), 2);

// ## Ids without nulls;
let by = [
ChunkId::store(0, 0),
ChunkId::store(0, 1),
ChunkId::store(1, 1),
ChunkId::store(1, 0),
];

let out = s_1.take_chunked_unchecked(&by, IsSorted::Not);
let idx = IdxCa::new("", [0, 1, 3, 2]);
let expected = s_1.rechunk().take(&idx).unwrap();
assert!(out.equals(&expected));

// ## Ids with nulls;
let by: [ChunkId; 4] = [
ChunkId::null(),
ChunkId::store(0, 1),
ChunkId::store(1, 1),
ChunkId::store(1, 0),
];
let out = s_1.take_opt_chunked_unchecked(&by);

let idx = IdxCa::new("", [None, Some(1), Some(3), Some(2)]);
let expected = s_1.rechunk().take(&idx).unwrap();
assert!(out.equals_missing(&expected));

// # Series with nulls;
let mut s_1 = Series::new(
"a",
&["1 loooooooooooong string 1", "2 loooooooooooong string 2"],
);
let s_2 = Series::new("a", &[Some("11 loooooooooooong string 11"), None]);
s_1.append(&s_2).unwrap();

// ## Ids without nulls;
let by = [
ChunkId::store(0, 0),
ChunkId::store(0, 1),
ChunkId::store(1, 1),
ChunkId::store(1, 0),
];

let out = s_1.take_chunked_unchecked(&by, IsSorted::Not);
let idx = IdxCa::new("", [0, 1, 3, 2]);
let expected = s_1.rechunk().take(&idx).unwrap();
assert!(out.equals_missing(&expected));

// ## Ids with nulls;
let by: [ChunkId; 4] = [
ChunkId::null(),
ChunkId::store(0, 1),
ChunkId::store(1, 1),
ChunkId::store(1, 0),
];
let out = s_1.take_opt_chunked_unchecked(&by);

let idx = IdxCa::new("", [None, Some(1), Some(3), Some(2)]);
let expected = s_1.rechunk().take(&idx).unwrap();
assert!(out.equals_missing(&expected));
}
}
}

0 comments on commit ac0131a

Please sign in to comment.