Skip to content

Commit

Permalink
depr(python,rust!): Rename `row_count_name`/`row_count_offset` parameters in IO functions to `row_index_*` (pola-rs#13563)
Browse files Browse the repository at this point in the history
  • Loading branch information
stinodego authored Jan 9, 2024
1 parent 8730ced commit 130c48f
Show file tree
Hide file tree
Showing 57 changed files with 485 additions and 455 deletions.
4 changes: 2 additions & 2 deletions crates/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,8 @@ impl DataFrame {
DataFrame::new(columns)
}

/// Add a row count in place.
pub fn with_row_count_mut(&mut self, name: &str, offset: Option<IdxSize>) -> &mut Self {
/// Add a row index column in place.
pub fn with_row_index_mut(&mut self, name: &str, offset: Option<IdxSize>) -> &mut Self {
let offset = offset.unwrap_or(0);
let mut ca = IdxCa::from_vec(
name,
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-io/src/csv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,4 @@ use crate::csv::read_impl::CoreReader;
use crate::mmap::MmapBytesReader;
use crate::predicates::PhysicalIoExpr;
use crate::utils::{get_reader_bytes, resolve_homedir};
use crate::{RowCount, SerReader, SerWriter};
use crate::{RowIndex, SerReader, SerWriter};
12 changes: 6 additions & 6 deletions crates/polars-io/src/csv/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ where
quote_char: Option<u8>,
skip_rows_after_header: usize,
try_parse_dates: bool,
row_count: Option<RowCount>,
row_index: Option<RowIndex>,
/// Aggregates chunk afterwards to a single chunk.
rechunk: bool,
raise_if_empty: bool,
Expand All @@ -173,9 +173,9 @@ where
self
}

/// Add a `row_count` column.
pub fn with_row_count(mut self, rc: Option<RowCount>) -> Self {
self.row_count = rc;
/// Add a row index column.
pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
self.row_index = row_index;
self
}

Expand Down Expand Up @@ -417,7 +417,7 @@ impl<'a, R: MmapBytesReader + 'a> CsvReader<'a, R> {
std::mem::take(&mut self.predicate),
to_cast,
self.skip_rows_after_header,
std::mem::take(&mut self.row_count),
std::mem::take(&mut self.row_index),
self.try_parse_dates,
self.raise_if_empty,
self.truncate_ragged_lines,
Expand Down Expand Up @@ -603,7 +603,7 @@ where
quote_char: Some(b'"'),
skip_rows_after_header: 0,
try_parse_dates: false,
row_count: None,
row_index: None,
raise_if_empty: true,
truncate_ragged_lines: false,
}
Expand Down
10 changes: 5 additions & 5 deletions crates/polars-io/src/csv/read_impl/batched_mmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ impl<'a> CoreReader<'a> {
str_columns,
projection,
starting_point_offset,
row_count: self.row_count,
row_index: self.row_index,
comment_prefix: self.comment_prefix,
quote_char: self.quote_char,
eol_char: self.eol_char,
Expand Down Expand Up @@ -181,7 +181,7 @@ pub struct BatchedCsvReaderMmap<'a> {
str_columns: StringColumns,
projection: Vec<usize>,
starting_point_offset: Option<usize>,
row_count: Option<RowCount>,
row_index: Option<RowIndex>,
comment_prefix: Option<CommentPrefix>,
quote_char: Option<u8>,
eol_char: u8,
Expand Down Expand Up @@ -255,16 +255,16 @@ impl<'a> BatchedCsvReaderMmap<'a> {
cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;

update_string_stats(&self.str_capacities, &self.str_columns, &df)?;
if let Some(rc) = &self.row_count {
df.with_row_count_mut(&rc.name, Some(rc.offset));
if let Some(rc) = &self.row_index {
df.with_row_index_mut(&rc.name, Some(rc.offset));
}
Ok(df)
})
.collect::<PolarsResult<Vec<_>>>()
})?;
self.file_chunks.clear();

if self.row_count.is_some() {
if self.row_index.is_some() {
update_row_counts2(&mut chunks, self.rows_read)
}
for df in &chunks {
Expand Down
10 changes: 5 additions & 5 deletions crates/polars-io/src/csv/read_impl/batched_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ impl<'a> CoreReader<'a> {
str_columns,
projection,
starting_point_offset,
row_count: self.row_count,
row_index: self.row_index,
comment_prefix: self.comment_prefix,
quote_char: self.quote_char,
eol_char: self.eol_char,
Expand Down Expand Up @@ -264,7 +264,7 @@ pub struct BatchedCsvReaderRead<'a> {
str_columns: StringColumns,
projection: Vec<usize>,
starting_point_offset: Option<usize>,
row_count: Option<RowCount>,
row_index: Option<RowIndex>,
comment_prefix: Option<CommentPrefix>,
quote_char: Option<u8>,
eol_char: u8,
Expand Down Expand Up @@ -352,16 +352,16 @@ impl<'a> BatchedCsvReaderRead<'a> {
cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;

update_string_stats(&self.str_capacities, &self.str_columns, &df)?;
if let Some(rc) = &self.row_count {
df.with_row_count_mut(&rc.name, Some(rc.offset));
if let Some(rc) = &self.row_index {
df.with_row_index_mut(&rc.name, Some(rc.offset));
}
Ok(df)
})
.collect::<PolarsResult<Vec<_>>>()
})?;
self.file_chunks.clear();

if self.row_count.is_some() {
if self.row_index.is_some() {
update_row_counts2(&mut chunks, self.rows_read)
}
for df in &chunks {
Expand Down
28 changes: 14 additions & 14 deletions crates/polars-io/src/csv/read_impl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::csv::{CsvEncoding, NullValues};
use crate::mmap::ReaderBytes;
use crate::predicates::PhysicalIoExpr;
use crate::utils::update_row_counts;
use crate::RowCount;
use crate::RowIndex;

pub(crate) fn cast_columns(
df: &mut DataFrame,
Expand Down Expand Up @@ -116,7 +116,7 @@ pub(crate) struct CoreReader<'a> {
missing_is_null: bool,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
to_cast: Vec<Field>,
row_count: Option<RowCount>,
row_index: Option<RowIndex>,
truncate_ragged_lines: bool,
}

Expand Down Expand Up @@ -206,7 +206,7 @@ impl<'a> CoreReader<'a> {
predicate: Option<Arc<dyn PhysicalIoExpr>>,
to_cast: Vec<Field>,
skip_rows_after_header: usize,
row_count: Option<RowCount>,
row_index: Option<RowIndex>,
try_parse_dates: bool,
raise_if_empty: bool,
truncate_ragged_lines: bool,
Expand Down Expand Up @@ -306,7 +306,7 @@ impl<'a> CoreReader<'a> {
missing_is_null,
predicate,
to_cast,
row_count,
row_index,
truncate_ragged_lines,
})
}
Expand Down Expand Up @@ -552,8 +552,8 @@ impl<'a> CoreReader<'a> {
// An empty file with a schema should return an empty DataFrame with that schema
if bytes.is_empty() {
let mut df = DataFrame::from(self.schema.as_ref());
if let Some(ref row_count) = self.row_count {
df.insert_column(0, Series::new_empty(&row_count.name, &IDX_DTYPE))?;
if let Some(ref row_index) = self.row_index {
df.insert_column(0, Series::new_empty(&row_index.name, &IDX_DTYPE))?;
}
return Ok(df);
}
Expand Down Expand Up @@ -618,8 +618,8 @@ impl<'a> CoreReader<'a> {
.collect::<PolarsResult<_>>()?,
);
let current_row_count = local_df.height() as IdxSize;
if let Some(rc) = &self.row_count {
local_df.with_row_count_mut(&rc.name, Some(rc.offset));
if let Some(rc) = &self.row_index {
local_df.with_row_index_mut(&rc.name, Some(rc.offset));
};

cast_columns(&mut local_df, &self.to_cast, false, self.ignore_errors)?;
Expand All @@ -638,7 +638,7 @@ impl<'a> CoreReader<'a> {
.collect::<PolarsResult<Vec<_>>>()
})?;
let mut dfs = flatten(&dfs, None);
if self.row_count.is_some() {
if self.row_index.is_some() {
update_row_counts(&mut dfs, 0)
}
accumulate_dataframes_vertical(dfs.into_iter().map(|t| t.0))
Expand Down Expand Up @@ -687,8 +687,8 @@ impl<'a> CoreReader<'a> {
}

cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;
if let Some(rc) = &self.row_count {
df.with_row_count_mut(&rc.name, Some(rc.offset));
if let Some(rc) = &self.row_index {
df.with_row_index_mut(&rc.name, Some(rc.offset));
}
let n_read = df.height() as IdxSize;
Ok((df, n_read))
Expand Down Expand Up @@ -738,15 +738,15 @@ impl<'a> CoreReader<'a> {
};

cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;
if let Some(rc) = &self.row_count {
df.with_row_count_mut(&rc.name, Some(rc.offset));
if let Some(rc) = &self.row_index {
df.with_row_index_mut(&rc.name, Some(rc.offset));
}
let n_read = df.height() as IdxSize;
(df, n_read)
});
}
}
if self.row_count.is_some() {
if self.row_index.is_some() {
update_row_counts(&mut dfs, 0)
}
accumulate_dataframes_vertical(dfs.into_iter().map(|t| t.0))
Expand Down
16 changes: 8 additions & 8 deletions crates/polars-io/src/ipc/ipc_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use super::{finish_reader, ArrowReader};
use crate::mmap::MmapBytesReader;
use crate::predicates::PhysicalIoExpr;
use crate::prelude::*;
use crate::RowCount;
use crate::RowIndex;

/// Read Arrows IPC format into a DataFrame
///
Expand All @@ -71,7 +71,7 @@ pub struct IpcReader<R: MmapBytesReader> {
pub(super) n_rows: Option<usize>,
pub(super) projection: Option<Vec<usize>>,
pub(crate) columns: Option<Vec<String>>,
pub(super) row_count: Option<RowCount>,
pub(super) row_index: Option<RowIndex>,
memmap: bool,
metadata: Option<read::FileMetadata>,
schema: Option<ArrowSchemaRef>,
Expand Down Expand Up @@ -127,9 +127,9 @@ impl<R: MmapBytesReader> IpcReader<R> {
self
}

/// Add a `row_count` column.
pub fn with_row_count(mut self, row_count: Option<RowCount>) -> Self {
self.row_count = row_count;
/// Add a row index column.
pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
self.row_index = row_index;
self
}

Expand Down Expand Up @@ -173,7 +173,7 @@ impl<R: MmapBytesReader> IpcReader<R> {

let reader = read::FileReader::new(self.reader, metadata, self.projection, self.n_rows);

finish_reader(reader, rechunk, None, predicate, &schema, self.row_count)
finish_reader(reader, rechunk, None, predicate, &schema, self.row_index)
}
}

Expand All @@ -194,7 +194,7 @@ impl<R: MmapBytesReader> SerReader<R> for IpcReader<R> {
n_rows: None,
columns: None,
projection: None,
row_count: None,
row_index: None,
memmap: true,
metadata: None,
schema: None,
Expand Down Expand Up @@ -230,6 +230,6 @@ impl<R: MmapBytesReader> SerReader<R> for IpcReader<R> {

let ipc_reader =
read::FileReader::new(self.reader, metadata.clone(), self.projection, self.n_rows);
finish_reader(ipc_reader, rechunk, None, None, &schema, self.row_count)
finish_reader(ipc_reader, rechunk, None, None, &schema, self.row_index)
}
}
28 changes: 16 additions & 12 deletions crates/polars-io/src/ipc/ipc_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub struct IpcStreamReader<R> {
n_rows: Option<usize>,
projection: Option<Vec<usize>>,
columns: Option<Vec<String>>,
row_count: Option<RowCount>,
row_index: Option<RowIndex>,
metadata: Option<StreamMetadata>,
}

Expand All @@ -95,9 +95,9 @@ impl<R: Read> IpcStreamReader<R> {
self
}

/// Add a `row_count` column.
pub fn with_row_count(mut self, row_count: Option<RowCount>) -> Self {
self.row_count = row_count;
/// Add a row index column.
pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
self.row_index = row_index;
self
}

Expand Down Expand Up @@ -146,7 +146,7 @@ where
n_rows: None,
columns: None,
projection: None,
row_count: None,
row_index: None,
metadata: None,
}
}
Expand Down Expand Up @@ -177,7 +177,7 @@ where
metadata.schema.clone()
};

let include_row_count = self.row_count.is_some();
let include_row_index = self.row_index.is_some();
let ipc_reader =
read::StreamReader::new(&mut self.reader, metadata.clone(), sorted_projection);
finish_reader(
Expand All @@ -186,23 +186,27 @@ where
self.n_rows,
None,
&schema,
self.row_count,
self.row_index,
)
.map(|df| fix_column_order(df, self.projection, include_row_count))
.map(|df| fix_column_order(df, self.projection, include_row_index))
}
}

fn fix_column_order(df: DataFrame, projection: Option<Vec<usize>>, row_count: bool) -> DataFrame {
fn fix_column_order(
df: DataFrame,
projection: Option<Vec<usize>>,
include_row_index: bool,
) -> DataFrame {
if let Some(proj) = projection {
let offset = usize::from(row_count);
let offset = usize::from(include_row_index);
let mut args = (0..proj.len()).zip(proj).collect::<Vec<_>>();
// first el of tuple is argument index
// second el is the projection index
args.sort_unstable_by_key(|tpl| tpl.1);
let cols = df.get_columns();

let iter = args.iter().map(|tpl| cols[tpl.0 + offset].clone());
let cols = if row_count {
let cols = if include_row_index {
let mut new_cols = vec![df.get_columns()[0].clone()];
new_cols.extend(iter);
new_cols
Expand Down Expand Up @@ -242,7 +246,7 @@ pub struct IpcStreamWriter<W> {

use polars_core::frame::ArrowChunk;

use crate::RowCount;
use crate::RowIndex;

impl<W> IpcStreamWriter<W> {
/// Set the compression used. Defaults to None.
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-io/src/ipc/mmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl<R: MmapBytesReader> IpcReader<R> {
self.n_rows,
predicate,
&schema,
self.row_count.clone(),
self.row_index.clone(),
)
},
None => polars_bail!(ComputeError: "cannot memory-map, you must provide a file"),
Expand Down
6 changes: 3 additions & 3 deletions crates/polars-io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ pub(crate) fn finish_reader<R: ArrowReader>(
n_rows: Option<usize>,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
arrow_schema: &ArrowSchema,
row_count: Option<RowCount>,
row_index: Option<RowIndex>,
) -> PolarsResult<DataFrame> {
use polars_core::utils::accumulate_dataframes_vertical;

Expand All @@ -106,8 +106,8 @@ pub(crate) fn finish_reader<R: ArrowReader>(
num_rows += batch.len();
let mut df = DataFrame::try_from((batch, arrow_schema.fields.as_slice()))?;

if let Some(rc) = &row_count {
df.with_row_count_mut(&rc.name, Some(current_num_rows + rc.offset));
if let Some(rc) = &row_index {
df.with_row_index_mut(&rc.name, Some(current_num_rows + rc.offset));
}

if let Some(predicate) = &predicate {
Expand Down
Loading

0 comments on commit 130c48f

Please sign in to comment.