Skip to content

Commit

Permalink
refactor(rust!): Move IO-related options structs to polars-io (#15806)
Browse files — browse the repository at this point in the history
  • Loading branch information
stinodego authored Apr 21, 2024
1 parent 53c8ec9 commit d58b912
Show file tree
Hide file tree
Showing 21 changed files with 138 additions and 131 deletions.
4 changes: 2 additions & 2 deletions crates/polars-io/src/csv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use std::fs::File;
use std::io::Write;
use std::path::PathBuf;

pub use parser::count_rows;
pub use parser::{count_rows, CsvParserOptions};
use polars_core::prelude::*;
#[cfg(feature = "temporal")]
use polars_time::prelude::*;
Expand All @@ -63,7 +63,7 @@ use rayon::prelude::*;
pub use read::{CommentPrefix, CsvEncoding, CsvReader, NullValues};
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
pub use write::{BatchedWriter, CsvWriter, QuoteStyle};
pub use write::{BatchedWriter, CsvWriter, CsvWriterOptions, QuoteStyle};
pub use write_impl::SerializeOptions;

use crate::csv::read_impl::CoreReader;
Expand Down
27 changes: 27 additions & 0 deletions crates/polars-io/src/csv/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,41 @@ use polars_core::POOL;
use polars_utils::index::Bounded;
use polars_utils::slice::GetSaferUnchecked;
use rayon::prelude::*;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

use super::buffer::*;
use super::read::{CsvEncoding, NullValues};
use crate::csv::read::NullValuesCompiled;
use crate::csv::splitfields::SplitFields;
use crate::csv::utils::get_file_chunks;
use crate::csv::CommentPrefix;
use crate::utils::get_reader_bytes;

/// Options that control how the CSV parser reads and decodes a file.
///
/// NOTE(review): mirrors the builder settings on `CsvReader`; moved into
/// polars-io so IO-related option structs live next to the readers.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct CsvParserOptions {
    /// Lines starting with this prefix are skipped as comments.
    pub comment_prefix: Option<CommentPrefix>,
    /// Character used to quote fields; `None` disables quote handling.
    pub quote_char: Option<u8>,
    /// Number of lines to skip before parsing starts (presumably before the
    /// header line -- confirm against `CsvReader`).
    pub skip_rows: usize,
    /// Text encoding of the input (see `CsvEncoding`).
    pub encoding: CsvEncoding,
    /// Number of rows to skip after the header line.
    pub skip_rows_after_header: usize,
    /// How many rows to scan when inferring the schema; semantics of `None`
    /// (full scan vs. default) -- TODO confirm against the reader.
    pub infer_schema_length: Option<usize>,
    /// Thread-count override; `None` lets the reader decide.
    pub n_threads: Option<usize>,
    /// Try to parse date/datetime-looking columns as temporal types.
    pub try_parse_dates: bool,
    /// Error on an empty file instead of returning an empty frame.
    pub raise_if_empty: bool,
    /// Truncate lines that contain more fields than the schema.
    pub truncate_ragged_lines: bool,
    /// Trade speed for lower memory usage.
    pub low_memory: bool,
    /// Skip rows that fail to parse instead of erroring.
    pub ignore_errors: bool,
    /// Whether the first (non-skipped) line is a header row.
    pub has_header: bool,
    /// End-of-line byte (typically `b'\n'`).
    pub eol_char: u8,
    /// Field separator byte (typically `b','`).
    pub separator: u8,
    /// Schema to overlay on top of the inferred schema.
    pub schema_overwrite: Option<SchemaRef>,
    /// Full schema to use as-is, presumably skipping inference -- TODO confirm.
    pub schema: Option<SchemaRef>,
    /// Values to interpret as nulls while parsing.
    pub null_values: Option<NullValues>,
    /// NOTE(review): assumed to mean "parse floats with a decimal comma",
    /// based on the name only -- verify against the parser implementation.
    pub decimal_float: bool,
}

/// Read the number of rows without parsing columns
/// useful for count(*) queries
pub fn count_rows(
Expand Down
23 changes: 23 additions & 0 deletions crates/polars-io/src/csv/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,29 @@ use serde::{Deserialize, Serialize};

use super::*;

/// Options controlling CSV serialization (see `CsvWriter`).
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct CsvWriterOptions {
    /// Write a UTF-8 byte-order mark at the start of the output.
    pub include_bom: bool,
    /// Write a header row with the column names.
    pub include_header: bool,
    /// Number of rows serialized per batch; non-zero by construction.
    pub batch_size: NonZeroUsize,
    /// Keep the order in which the data was processed -- NOTE(review):
    /// presumably consumed by the streaming sinks; confirm.
    pub maintain_order: bool,
    /// Low-level value-serialization settings (separator, quoting, ...).
    pub serialize_options: SerializeOptions,
}

#[cfg(feature = "csv")]
impl Default for CsvWriterOptions {
    /// Defaults: no BOM, header row on, 1024-row batches, unordered output,
    /// default serialization settings.
    fn default() -> Self {
        // 1024 is a non-zero literal, so this unwrap cannot fail.
        let batch_size = NonZeroUsize::new(1024).unwrap();
        CsvWriterOptions {
            batch_size,
            include_bom: false,
            include_header: true,
            maintain_order: false,
            serialize_options: SerializeOptions::default(),
        }
    }
}

#[derive(Copy, Clone, Default, Eq, Hash, PartialEq, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum QuoteStyle {
Expand Down
8 changes: 8 additions & 0 deletions crates/polars-io/src/ipc/ipc_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,21 @@ use arrow::datatypes::ArrowSchemaRef;
use arrow::io::ipc::read;
use polars_core::frame::ArrowChunk;
use polars_core::prelude::*;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

use super::{finish_reader, ArrowReader};
use crate::mmap::MmapBytesReader;
use crate::predicates::PhysicalIoExpr;
use crate::prelude::*;
use crate::RowIndex;

/// Options for scanning Arrow IPC files.
///
/// Derives `Eq` (not just `PartialEq`) for consistency with the other IO
/// options structs in this crate; deriving `Hash` without `Eq` would also
/// prevent use as a hash-map/set key.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct IpcScanOptions {
    /// Memory-map the file instead of reading it into memory -- NOTE(review):
    /// assumption from the name; verify against `IpcReader`.
    pub memory_map: bool,
}

/// Read Arrow's IPC format into a DataFrame
///
/// # Example
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-io/src/ipc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ mod write;
mod write_async;

#[cfg(feature = "ipc")]
pub use ipc_file::IpcReader;
pub use ipc_file::{IpcReader, IpcScanOptions};
#[cfg(feature = "ipc_streaming")]
pub use ipc_stream::*;
pub use write::{BatchedWriter, IpcCompression, IpcWriter, IpcWriterOption};
pub use write::{BatchedWriter, IpcCompression, IpcWriter, IpcWriterOption, IpcWriterOptions};

#[cfg(feature = "cloud")]
mod ipc_reader_async;
Expand Down
9 changes: 9 additions & 0 deletions crates/polars-io/src/ipc/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ use serde::{Deserialize, Serialize};
use crate::prelude::*;
use crate::WriterFactory;

/// Options for writing to Arrow's IPC format (see `IpcWriter`).
#[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct IpcWriterOptions {
    /// Data page compression; `None` writes uncompressed.
    pub compression: Option<IpcCompression>,
    /// Maintain the order in which the data was processed.
    pub maintain_order: bool,
}

/// Write a DataFrame to Arrow's IPC format
///
/// # Example
Expand Down
9 changes: 9 additions & 0 deletions crates/polars-io/src/json/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,20 @@ use polars_core::error::to_compute_err;
use polars_core::prelude::*;
use polars_core::utils::try_get_supertype;
use polars_json::json::write::FallibleStreamingIterator;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use simd_json::BorrowedValue;

use crate::mmap::{MmapBytesReader, ReaderBytes};
use crate::prelude::*;

/// Options for writing a DataFrame as JSON.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct JsonWriterOptions {
    /// Maintain the order in which the data was processed.
    pub maintain_order: bool,
}

/// The format to use to write the DataFrame to JSON: `Json` (a JSON array) or `JsonLines` (each row output on a
/// separate line). In either case, each row is serialized as a JSON object whose keys are the column names and whose
/// values are the row's corresponding values.
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-io/src/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use std::borrow::Cow;

pub use polars_parquet::write::FileMetaData;
pub use read::*;
pub use write::{BrotliLevel, GzipLevel, ZstdLevel, *};
pub use write::{BrotliLevel, GzipLevel, ParquetWriteOptions, ZstdLevel, *};

use crate::parquet::read_impl::materialize_hive_partitions;
use crate::utils::apply_projection;
Expand Down
8 changes: 8 additions & 0 deletions crates/polars-io/src/parquet/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ use crate::predicates::PhysicalIoExpr;
use crate::prelude::*;
use crate::RowIndex;

/// Options for reading/scanning Parquet files.
#[derive(Clone, Debug, PartialEq, Eq, Copy, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct ParquetOptions {
    /// Strategy for parallelizing the read (see `ParallelStrategy`).
    pub parallel: ParallelStrategy,
    /// Trade speed for lower memory usage.
    pub low_memory: bool,
    /// Use file statistics -- NOTE(review): presumably row-group statistics
    /// for predicate-based skipping; confirm against the reader.
    pub use_statistics: bool,
}

#[derive(Copy, Clone, Debug, Eq, PartialEq, Default, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum ParallelStrategy {
Expand Down
15 changes: 15 additions & 0 deletions crates/polars-io/src/parquet/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,21 @@ use write::{

use crate::prelude::chunk_df_for_writing;

/// Options for writing Parquet files (see `ParquetWriter`).
#[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct ParquetWriteOptions {
    /// Data page compression.
    pub compression: ParquetCompression,
    /// Compute and write column statistics.
    pub statistics: bool,
    /// If `None`, all data is written to a single row group.
    pub row_group_size: Option<usize>,
    /// If `None`, the limit defaults to 1024^2 bytes.
    pub data_pagesize_limit: Option<usize>,
    /// Maintain the order in which the data was processed.
    pub maintain_order: bool,
}

#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct GzipLevel(u8);
Expand Down
16 changes: 8 additions & 8 deletions crates/polars-lazy/src/prelude.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
#[cfg(feature = "csv")]
pub use polars_io::csv::CsvWriterOptions;
#[cfg(feature = "ipc")]
pub use polars_io::ipc::IpcWriterOptions;
#[cfg(feature = "json")]
pub use polars_io::json::JsonWriterOptions;
#[cfg(feature = "parquet")]
pub use polars_io::parquet::ParquetWriteOptions;
pub use polars_ops::prelude::{JoinArgs, JoinType, JoinValidation};
#[cfg(feature = "rank")]
pub use polars_ops::prelude::{RankMethod, RankOptions};
pub use polars_plan::logical_plan::{
AnonymousScan, AnonymousScanArgs, AnonymousScanOptions, DslPlan, Literal, LiteralValue, Null,
NULL,
};
#[cfg(feature = "csv")]
pub use polars_plan::prelude::CsvWriterOptions;
#[cfg(feature = "ipc")]
pub use polars_plan::prelude::IpcWriterOptions;
#[cfg(feature = "json")]
pub use polars_plan::prelude::JsonWriterOptions;
#[cfg(feature = "parquet")]
pub use polars_plan::prelude::ParquetWriteOptions;
pub(crate) use polars_plan::prelude::*;
#[cfg(feature = "rolling_window")]
pub use polars_time::{prelude::RollingOptions, Duration};
Expand Down
1 change: 1 addition & 0 deletions crates/polars-lazy/src/scan/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::path::{Path, PathBuf};

use polars_core::prelude::*;
use polars_io::cloud::CloudOptions;
use polars_io::ipc::IpcScanOptions;
use polars_io::RowIndex;

use crate::prelude::*;
Expand Down
3 changes: 1 addition & 2 deletions crates/polars-pipe/src/executors/sinks/output/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ use std::path::Path;

use crossbeam_channel::bounded;
use polars_core::prelude::*;
use polars_io::csv::CsvWriter;
use polars_io::csv::{CsvWriter, CsvWriterOptions};
use polars_io::SerWriter;
use polars_plan::prelude::CsvWriterOptions;

use crate::executors::sinks::output::file_sink::{init_writer_thread, FilesSink, SinkWriter};
use crate::pipeline::morsels_per_sink;
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-pipe/src/executors/sinks/output/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use std::path::Path;

use crossbeam_channel::bounded;
use polars_core::prelude::*;
use polars_io::ipc::IpcWriterOptions;
use polars_io::prelude::*;
use polars_plan::prelude::IpcWriterOptions;

use crate::executors::sinks::output::file_sink::{init_writer_thread, FilesSink, SinkWriter};
use crate::pipeline::morsels_per_sink;
Expand Down
3 changes: 1 addition & 2 deletions crates/polars-pipe/src/executors/sinks/output/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ use std::path::Path;

use crossbeam_channel::bounded;
use polars_core::prelude::*;
use polars_io::json::BatchedWriter;
use polars_plan::prelude::JsonWriterOptions;
use polars_io::json::{BatchedWriter, JsonWriterOptions};

use crate::executors::sinks::output::file_sink::{init_writer_thread, FilesSink, SinkWriter};
use crate::pipeline::morsels_per_sink;
Expand Down
3 changes: 1 addition & 2 deletions crates/polars-pipe/src/executors/sinks/output/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ use std::thread::JoinHandle;

use crossbeam_channel::{bounded, Receiver, Sender};
use polars_core::prelude::*;
use polars_io::parquet::{BatchedWriter, ParquetWriter, RowGroupIter};
use polars_plan::prelude::ParquetWriteOptions;
use polars_io::parquet::{BatchedWriter, ParquetWriteOptions, ParquetWriter, RowGroupIter};

use crate::executors::sinks::output::file_sink::{init_writer_thread, FilesSink, SinkWriter};
use crate::operators::{DataChunk, FinalizedSink, PExecutionContext, Sink, SinkResult};
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-pipe/src/executors/sources/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use std::path::PathBuf;
use polars_core::export::arrow::Either;
use polars_core::POOL;
use polars_io::csv::read_impl::{BatchedCsvReaderMmap, BatchedCsvReaderRead};
use polars_io::csv::{CsvEncoding, CsvReader};
use polars_io::csv::{CsvEncoding, CsvParserOptions, CsvReader};
use polars_plan::global::_set_n_rows_for_scan;
use polars_plan::prelude::{CsvParserOptions, FileScanOptions};
use polars_plan::prelude::FileScanOptions;
use polars_utils::iter::EnumerateIdxTrait;

use super::*;
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-pipe/src/executors/sources/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use polars_core::error::*;
use polars_core::prelude::Series;
use polars_core::POOL;
use polars_io::cloud::CloudOptions;
use polars_io::parquet::{BatchedParquetReader, FileMetaData, ParquetReader};
use polars_io::parquet::{BatchedParquetReader, FileMetaData, ParquetOptions, ParquetReader};
use polars_io::pl_async::get_runtime;
use polars_io::predicates::PhysicalIoExpr;
use polars_io::prelude::materialize_projection;
Expand All @@ -18,7 +18,7 @@ use polars_io::prelude::ParquetAsyncReader;
use polars_io::utils::check_projected_arrow_schema;
use polars_io::{is_cloud_url, SerReader};
use polars_plan::logical_plan::FileInfo;
use polars_plan::prelude::{FileScanOptions, ParquetOptions};
use polars_plan::prelude::FileScanOptions;
use polars_utils::iter::EnumerateIdxTrait;
use polars_utils::IdxSize;

Expand Down
8 changes: 6 additions & 2 deletions crates/polars-plan/src/logical_plan/builder_dsl.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
use polars_core::prelude::*;
#[cfg(feature = "parquet")]
use polars_io::cloud::CloudOptions;
#[cfg(feature = "csv")]
use polars_io::csv::{CommentPrefix, CsvEncoding, CsvParserOptions, NullValues};
#[cfg(feature = "ipc")]
use polars_io::ipc::IpcScanOptions;
#[cfg(feature = "parquet")]
use polars_io::parquet::ParquetOptions;
use polars_io::HiveOptions;
#[cfg(any(
feature = "parquet",
Expand All @@ -9,8 +15,6 @@ use polars_io::HiveOptions;
feature = "ipc"
))]
use polars_io::RowIndex;
#[cfg(feature = "csv")]
use polars_io::{csv::CommentPrefix, csv::CsvEncoding, csv::NullValues};

use crate::constants::UNLIMITED_CACHE;
use crate::logical_plan::expr_expansion::rewrite_projections;
Expand Down
6 changes: 6 additions & 0 deletions crates/polars-plan/src/logical_plan/file_scan.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
use std::hash::{Hash, Hasher};

#[cfg(feature = "csv")]
use polars_io::csv::CsvParserOptions;
#[cfg(feature = "ipc")]
use polars_io::ipc::IpcScanOptions;
#[cfg(feature = "parquet")]
use polars_io::parquet::ParquetOptions;
#[cfg(feature = "parquet")]
use polars_parquet::write::FileMetaData;

Expand Down
Loading

0 comments on commit d58b912

Please sign in to comment.