Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Use split_at instead of double slice in chunk splits. #16856

Merged
merged 2 commits into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions crates/polars-arrow/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ pub trait Array: Send + Sync + dyn_clone::DynClone + 'static {
/// This function panics iff `offset + length > self.len()`.
#[must_use]
fn sliced(&self, offset: usize, length: usize) -> Box<dyn Array> {
if length == 0 {
return new_empty_array(self.data_type().clone());
}
let mut new = self.to_boxed();
new.slice(offset, length);
new
Expand Down
98 changes: 96 additions & 2 deletions crates/polars-core/src/chunked_array/ops/chunkops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,44 @@ use crate::chunked_array::metadata::MetadataProperties;
use crate::chunked_array::object::builder::ObjectChunkedBuilder;
use crate::utils::slice_offsets;

/// Partition `chunks` into the chunk lists for the left and right side of a split.
///
/// `offset` and `own_length` are passed to `slice_offsets` (with a length of `0`) and the
/// first value it returns is used as the split position.
/// NOTE(review): presumably `slice_offsets` resolves negative offsets and clamps to
/// `own_length` — confirm against its definition.
///
/// Chunks lying entirely before the split position are cloned into the left list, the chunk
/// containing the position is divided with `split_at_boxed`, and every chunk after it is
/// cloned into the right list. A side that would otherwise be empty receives a zero-length
/// slice of `chunks[0]`, so both returned vectors always hold at least one chunk.
///
/// # Panics
///
/// Panics when `chunks` is empty, because the fallback indexes `chunks[0]`.
#[inline]
pub(crate) fn split_at(
    chunks: &[ArrayRef],
    offset: i64,
    own_length: usize,
) -> (Vec<ArrayRef>, Vec<ArrayRef>) {
    let (split_pos, _) = slice_offsets(offset, 0, own_length);

    let mut left = Vec::with_capacity(1);
    let mut right = Vec::with_capacity(1);

    // Number of elements that still belong to the left side.
    let mut to_skip = split_pos;
    let mut pending = chunks.iter();

    // `by_ref` keeps `pending` usable afterwards for the chunks right of the split.
    for chunk in pending.by_ref() {
        let len = chunk.len();
        if to_skip == 0 || to_skip < len {
            // The split position falls inside (or at the start of) this chunk.
            let (head, tail) = chunk.split_at_boxed(to_skip);
            left.push(head);
            right.push(tail);
            break;
        }
        // The whole chunk lies before the split position.
        to_skip -= len;
        left.push(chunk.clone());
    }

    right.extend(pending.cloned());

    if left.is_empty() {
        left.push(chunks[0].sliced(0, 0));
    }
    if right.is_empty() {
        right.push(chunks[0].sliced(0, 0));
    }
    (left, right)
}

pub(crate) fn slice(
chunks: &[ArrayRef],
offset: i64,
Expand Down Expand Up @@ -136,12 +173,69 @@ impl<T: PolarsDataType> ChunkedArray<T> {
}
}

/// Split the array in two at `offset`, returning `(left, right)`.
///
/// The chunks are reallocated, the underlying data slices are zero copy.
///
/// When offset is negative it will be counted from the end of the array.
/// This method will never error,
/// and will split at the best match when offset is out of bounds.
///
/// Sortedness and fast-explode metadata are carried over to both halves. When the array is
/// flagged as sorted, the min/max value metadata is additionally carried over to the half
/// that keeps the corresponding end of the array, provided that end is not null.
pub fn split_at(&self, offset: i64) -> (Self, Self) {
    // A normal split slices the buffers and thus keeps the whole memory allocated.
    let (l, r) = split_at(&self.chunks, offset, self.len());
    // SAFETY: NOTE(review) — assumes `copy_with_chunks` only requires chunks of the same
    // dtype as `self`; both lists are derived from `self.chunks`. Confirm against its contract.
    let mut out_l = unsafe { self.copy_with_chunks(l) };
    let mut out_r = unsafe { self.copy_with_chunks(r) };

    use MetadataProperties as P;
    let mut properties_l = P::SORTED | P::FAST_EXPLODE_LIST;
    let mut properties_r = P::SORTED | P::FAST_EXPLODE_LIST;

    let is_ascending = self.is_sorted_ascending_flag();
    let is_descending = self.is_sorted_descending_flag();

    if is_ascending || is_descending {
        // A validity bit that is set marks a valid (non-null) value, so the first value is
        // null only when the bit is unset. A missing bit (empty first chunk) is treated
        // conservatively as "null" so that no metadata is copied on uncertain ground.
        let has_nulls_at_start = self.null_count() != 0
            && self
                .chunks()
                .first()
                .unwrap()
                .as_ref()
                .validity()
                .map_or(false, |bm| bm.get(0) != Some(true));

        if !has_nulls_at_start {
            // The left half keeps the start of the array: that is where the minimum of an
            // ascending array and the maximum of a descending array live.
            properties_l.set(P::MIN_VALUE, is_ascending);
            properties_l.set(P::MAX_VALUE, is_descending);
        }

        // Same reasoning for the last value; `checked_sub` avoids underflow on an empty
        // last chunk.
        let has_nulls_at_end = self.null_count() != 0
            && self
                .chunks()
                .last()
                .unwrap()
                .as_ref()
                .validity()
                .map_or(false, |bm| {
                    bm.len().checked_sub(1).and_then(|i| bm.get(i)) != Some(true)
                });

        if !has_nulls_at_end {
            // The right half keeps the end of the array: the maximum of an ascending array
            // and the minimum of a descending array.
            properties_r.set(P::MIN_VALUE, is_descending);
            properties_r.set(P::MAX_VALUE, is_ascending);
        }
    }
    out_l.copy_metadata(self, properties_l);
    out_r.copy_metadata(self, properties_r);

    (out_l, out_r)
}

/// Slice the array. The chunks are reallocated the underlying data slices are zero copy.
///
/// When offset is negative it will be counted from the end of the array.
/// This method will never error,
/// and will slice the best match when offset, or length is out of bounds
#[inline]
pub fn slice(&self, offset: i64, length: usize) -> Self {
// The len: 0 special cases ensure we release memory.
// A normal slice, slice the buffers and thus keep the whole memory allocated.
Expand Down
8 changes: 8 additions & 0 deletions crates/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2233,6 +2233,14 @@ impl DataFrame {
unsafe { DataFrame::new_no_checks(col) }
}

/// Split this [`DataFrame`] at the given `offset`.
///
/// Every column is split at the same `offset`; the first halves form the first returned
/// frame and the second halves form the second one.
pub fn split_at(&self, offset: i64) -> (Self, Self) {
    let (left_cols, right_cols) = self
        .columns
        .iter()
        .map(|column| column.split_at(offset))
        .unzip();
    // Both frames are built without validation, as every column was split at the same offset.
    let left = unsafe { DataFrame::new_no_checks(left_cols) };
    let right = unsafe { DataFrame::new_no_checks(right_cols) };
    (left, right)
}

pub fn clear(&self) -> Self {
let col = self.columns.iter().map(|s| s.clear()).collect::<Vec<_>>();
unsafe { DataFrame::new_no_checks(col) }
Expand Down
5 changes: 5 additions & 0 deletions crates/polars-core/src/series/implementations/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ impl SeriesTrait for SeriesWrap<ArrayChunked> {
self.0.slice(offset, length).into_series()
}

/// Splits the wrapped array at `offset` and converts each half into a `Series`.
fn split_at(&self, offset: i64) -> (Series, Series) {
    let (head, tail) = self.0.split_at(offset);
    let head = head.into_series();
    let tail = tail.into_series();
    (head, tail)
}

fn append(&mut self, other: &Series) -> PolarsResult<()> {
polars_ensure!(self.0.dtype() == other.dtype(), append);
let other = other.array()?;
Expand Down
4 changes: 4 additions & 0 deletions crates/polars-core/src/series/implementations/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ impl SeriesTrait for SeriesWrap<BinaryChunked> {
/// Slices the wrapped binary array with `offset` and `length` and returns it as a `Series`.
fn slice(&self, offset: i64, length: usize) -> Series {
    let window = self.0.slice(offset, length);
    window.into_series()
}
/// Splits the wrapped binary array at `offset` and converts each half into a `Series`.
fn split_at(&self, offset: i64) -> (Series, Series) {
    let (head, tail) = self.0.split_at(offset);
    let head = head.into_series();
    let tail = tail.into_series();
    (head, tail)
}

fn append(&mut self, other: &Series) -> PolarsResult<()> {
polars_ensure!(self.0.dtype() == other.dtype(), append);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ impl SeriesTrait for SeriesWrap<BinaryOffsetChunked> {
/// Slices the wrapped binary-offset array with `offset` and `length` and returns it as a
/// `Series`.
fn slice(&self, offset: i64, length: usize) -> Series {
    let window = self.0.slice(offset, length);
    window.into_series()
}
/// Splits the wrapped binary-offset array at `offset` and converts each half into a `Series`.
fn split_at(&self, offset: i64) -> (Series, Series) {
    let (head, tail) = self.0.split_at(offset);
    let head = head.into_series();
    let tail = tail.into_series();
    (head, tail)
}

fn append(&mut self, other: &Series) -> PolarsResult<()> {
polars_ensure!(self.0.dtype() == other.dtype(), append);
Expand Down
4 changes: 4 additions & 0 deletions crates/polars-core/src/series/implementations/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ impl SeriesTrait for SeriesWrap<BooleanChunked> {
/// Slices the wrapped boolean array with `offset` and `length` and returns it as a `Series`.
fn slice(&self, offset: i64, length: usize) -> Series {
    let window = self.0.slice(offset, length);
    window.into_series()
}
/// Splits the wrapped boolean array at `offset` and converts each half into a `Series`.
fn split_at(&self, offset: i64) -> (Series, Series) {
    let (head, tail) = self.0.split_at(offset);
    let head = head.into_series();
    let tail = tail.into_series();
    (head, tail)
}

fn append(&mut self, other: &Series) -> PolarsResult<()> {
polars_ensure!(self.0.dtype() == other.dtype(), append);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,12 @@ impl SeriesTrait for SeriesWrap<CategoricalChunked> {
self.with_state(false, |cats| cats.slice(offset, length))
.into_series()
}
/// Splits the physical representation at `offset` and rebuilds a categorical `Series` from
/// each half via `finish_with_state(false, ..)`.
fn split_at(&self, offset: i64) -> (Series, Series) {
    let (head, tail) = self.0.physical().split_at(offset);
    (
        self.finish_with_state(false, head).into_series(),
        self.finish_with_state(false, tail).into_series(),
    )
}

fn append(&mut self, other: &Series) -> PolarsResult<()> {
polars_ensure!(self.0.dtype() == other.dtype(), append);
Expand Down
4 changes: 4 additions & 0 deletions crates/polars-core/src/series/implementations/date.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ impl SeriesTrait for SeriesWrap<DateChunked> {
/// Slices the wrapped array with `offset` and `length`, converts the result back into a date
/// array and returns it as a `Series`.
fn slice(&self, offset: i64, length: usize) -> Series {
    let window = self.0.slice(offset, length);
    window.into_date().into_series()
}
/// Splits the wrapped array at `offset`, converting each half back into a date array before
/// wrapping it in a `Series`.
fn split_at(&self, offset: i64) -> (Series, Series) {
    let (head, tail) = self.0.split_at(offset);
    let head = head.into_date().into_series();
    let tail = tail.into_date().into_series();
    (head, tail)
}

fn mean(&self) -> Option<f64> {
self.0.mean()
Expand Down
9 changes: 9 additions & 0 deletions crates/polars-core/src/series/implementations/datetime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,15 @@ impl SeriesTrait for SeriesWrap<DatetimeChunked> {
.into_datetime(self.0.time_unit(), self.0.time_zone().clone())
.into_series()
}
/// Splits the wrapped array at `offset`, converting each half back into a datetime array
/// with this series' time unit and time zone before wrapping it in a `Series`.
fn split_at(&self, offset: i64) -> (Series, Series) {
    let (head, tail) = self.0.split_at(offset);
    let head = head
        .into_datetime(self.0.time_unit(), self.0.time_zone().clone())
        .into_series();
    let tail = tail
        .into_datetime(self.0.time_unit(), self.0.time_zone().clone())
        .into_series();
    (head, tail)
}

fn mean(&self) -> Option<f64> {
self.0.mean()
Expand Down
11 changes: 11 additions & 0 deletions crates/polars-core/src/series/implementations/decimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,17 @@ impl SeriesTrait for SeriesWrap<DecimalChunked> {
self.apply_physical_to_s(|ca| ca.slice(offset, length))
}

/// Splits the wrapped array at `offset`, converting each half back into a decimal array
/// with this series' precision and scale before wrapping it in a `Series`.
fn split_at(&self, offset: i64) -> (Series, Series) {
    let (head, tail) = self.0.split_at(offset);
    (
        head.into_decimal_unchecked(self.0.precision(), self.0.scale())
            .into_series(),
        tail.into_decimal_unchecked(self.0.precision(), self.0.scale())
            .into_series(),
    )
}

fn append(&mut self, other: &Series) -> PolarsResult<()> {
polars_ensure!(self.0.dtype() == other.dtype(), append);
let other = other.decimal()?;
Expand Down
7 changes: 7 additions & 0 deletions crates/polars-core/src/series/implementations/duration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,13 @@ impl SeriesTrait for SeriesWrap<DurationChunked> {
.into_series()
}

/// Splits the wrapped array at `offset`, converting each half back into a duration array
/// with this series' time unit before wrapping it in a `Series`.
fn split_at(&self, offset: i64) -> (Series, Series) {
    let (head, tail) = self.0.split_at(offset);
    (
        head.into_duration(self.0.time_unit()).into_series(),
        tail.into_duration(self.0.time_unit()).into_series(),
    )
}

/// Returns whatever the wrapped array's `mean` returns.
fn mean(&self) -> Option<f64> {
self.0.mean()
}
Expand Down
5 changes: 5 additions & 0 deletions crates/polars-core/src/series/implementations/floats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ macro_rules! impl_dyn_series {
return self.0.slice(offset, length).into_series();
}

/// Splits the wrapped float array at `offset` and converts each half into a `Series`.
fn split_at(&self, offset: i64) -> (Series, Series) {
    let (head, tail) = self.0.split_at(offset);
    let head = head.into_series();
    let tail = tail.into_series();
    (head, tail)
}

fn append(&mut self, other: &Series) -> PolarsResult<()> {
polars_ensure!(self.0.dtype() == other.dtype(), append);
self.0.append(other.as_ref().as_ref());
Expand Down
5 changes: 5 additions & 0 deletions crates/polars-core/src/series/implementations/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ impl SeriesTrait for SeriesWrap<ListChunked> {
self.0.slice(offset, length).into_series()
}

/// Splits the wrapped list array at `offset` and converts each half into a `Series`.
fn split_at(&self, offset: i64) -> (Series, Series) {
    let (head, tail) = self.0.split_at(offset);
    let head = head.into_series();
    let tail = tail.into_series();
    (head, tail)
}

fn append(&mut self, other: &Series) -> PolarsResult<()> {
polars_ensure!(self.0.dtype() == other.dtype(), append);
self.0.append(other.as_ref().as_ref())
Expand Down
7 changes: 6 additions & 1 deletion crates/polars-core/src/series/implementations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,12 @@ macro_rules! impl_dyn_series {
}

fn slice(&self, offset: i64, length: usize) -> Series {
return self.0.slice(offset, length).into_series();
self.0.slice(offset, length).into_series()
}

/// Splits the wrapped array at `offset` and converts each half into a `Series`.
fn split_at(&self, offset: i64) -> (Series, Series) {
    let (head, tail) = self.0.split_at(offset);
    let head = head.into_series();
    let tail = tail.into_series();
    (head, tail)
}

fn append(&mut self, other: &Series) -> PolarsResult<()> {
Expand Down
18 changes: 18 additions & 0 deletions crates/polars-core/src/series/implementations/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,24 @@ impl SeriesTrait for NullChunked {
.into_series()
}

/// Splits the chunks at `offset` with `chunkops::split_at` and wraps each side in a new
/// `NullChunked` that keeps this series' name and whose length is the sum of its chunk
/// lengths.
fn split_at(&self, offset: i64) -> (Series, Series) {
    let (left_chunks, right_chunks) = chunkops::split_at(self.chunks(), offset, self.len());

    // Lengths are computed before the chunk vectors are moved into the new values.
    let left_len = left_chunks.iter().map(|arr| arr.len() as IdxSize).sum();
    let right_len = right_chunks.iter().map(|arr| arr.len() as IdxSize).sum();

    let left = NullChunked {
        name: self.name.clone(),
        length: left_len,
        chunks: left_chunks,
    };
    let right = NullChunked {
        name: self.name.clone(),
        length: right_len,
        chunks: right_chunks,
    };
    (left.into_series(), right.into_series())
}

/// Returns a clone of this series wrapped in `Ok`; `_options` is not used.
///
/// NOTE(review): presumably sorting is a no-op here because every value is null — confirm.
fn sort_with(&self, _options: SortOptions) -> PolarsResult<Series> {
Ok(self.clone().into_series())
}
Expand Down
5 changes: 5 additions & 0 deletions crates/polars-core/src/series/implementations/object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ where
ObjectChunked::slice(&self.0, offset, length).into_series()
}

/// Splits the wrapped object array at `offset` via `ObjectChunked::split_at` and converts
/// each half into a `Series`.
fn split_at(&self, offset: i64) -> (Series, Series) {
    let (head, tail) = ObjectChunked::split_at(&self.0, offset);
    let head = head.into_series();
    let tail = tail.into_series();
    (head, tail)
}

fn append(&mut self, other: &Series) -> PolarsResult<()> {
if self.dtype() != other.dtype() {
polars_bail!(append);
Expand Down
4 changes: 4 additions & 0 deletions crates/polars-core/src/series/implementations/string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ impl SeriesTrait for SeriesWrap<StringChunked> {
/// Slices the wrapped string array with `offset` and `length` and returns it as a `Series`.
fn slice(&self, offset: i64, length: usize) -> Series {
    let window = self.0.slice(offset, length);
    window.into_series()
}
/// Splits the wrapped string array at `offset` and converts each half into a `Series`.
fn split_at(&self, offset: i64) -> (Series, Series) {
    let (head, tail) = self.0.split_at(offset);
    let head = head.into_series();
    let tail = tail.into_series();
    (head, tail)
}

fn append(&mut self, other: &Series) -> PolarsResult<()> {
polars_ensure!(
Expand Down
8 changes: 8 additions & 0 deletions crates/polars-core/src/series/implementations/struct_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,14 @@ impl SeriesTrait for SeriesWrap<StructChunked> {
out.into_series()
}

/// Splits every field at `offset` and builds one struct series from the first halves and
/// another from the second halves, both named like this series.
///
/// # Panics
///
/// Panics if `StructChunked::new` returns an error for either set of halves.
fn split_at(&self, offset: i64) -> (Series, Series) {
    let (left_fields, right_fields): (Vec<_>, Vec<_>) = self
        .0
        .fields()
        .iter()
        .map(|field| field.split_at(offset))
        .unzip();

    let left = StructChunked::new(self.name(), &left_fields).unwrap();
    let right = StructChunked::new(self.name(), &right_fields).unwrap();
    (left.into_series(), right.into_series())
}

fn append(&mut self, other: &Series) -> PolarsResult<()> {
let other = other.struct_()?;
if self.is_empty() {
Expand Down
4 changes: 4 additions & 0 deletions crates/polars-core/src/series/implementations/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ impl SeriesTrait for SeriesWrap<TimeChunked> {
/// Slices the wrapped array with `offset` and `length`, converts the result back into a time
/// array and returns it as a `Series`.
fn slice(&self, offset: i64, length: usize) -> Series {
    let window = self.0.slice(offset, length);
    window.into_time().into_series()
}
/// Splits the wrapped array at `offset`, converting each half back into a time array before
/// wrapping it in a `Series`.
fn split_at(&self, offset: i64) -> (Series, Series) {
    let (a, b) = self.0.split_at(offset);
    // `into_time` restores the logical type, mirroring `slice`; without it the halves would
    // be returned as series of the physical representation.
    (a.into_time().into_series(), b.into_time().into_series())
}

fn mean(&self) -> Option<f64> {
self.0.mean()
Expand Down
6 changes: 6 additions & 0 deletions crates/polars-core/src/series/series_trait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,12 @@ pub trait SeriesTrait:
/// end of the array
fn slice(&self, _offset: i64, _length: usize) -> Series;

/// Split the data at `_offset` and return the two resulting parts as separate series,
/// each a zero copy view of the data.
///
/// When offset is negative the offset is counted from the
/// end of the array
fn split_at(&self, _offset: i64) -> (Series, Series);

#[doc(hidden)]
fn append(&mut self, _other: &Series) -> PolarsResult<()>;

Expand Down
Loading