Skip to content

Commit 28fab0d

Browse files
committed
parquet2 implementation backed by parquet2 feature gate
decouple core from arrow
1 parent abd0aaf commit 28fab0d

File tree

21 files changed

+1796
-686
lines changed

21 files changed

+1796
-686
lines changed

Cargo.lock

Lines changed: 82 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/Cargo.toml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,13 @@ async-stream = { version = "0.3.2", default-features = true, optional = true }
5454
# High-level writer
5555
parquet-format = "~2.6.1"
5656

57-
arrow = { git = "https://github.com/apache/arrow-rs", rev = "615d7830fdaf994ebd32ca1e349daf68b18c99b0" }
58-
parquet = { git = "https://github.com/apache/arrow-rs", rev = "615d7830fdaf994ebd32ca1e349daf68b18c99b0" }
57+
arrow = { git = "https://github.com/apache/arrow-rs", rev = "615d7830fdaf994ebd32ca1e349daf68b18c99b0", optional = true }
58+
parquet = { git = "https://github.com/apache/arrow-rs", rev = "615d7830fdaf994ebd32ca1e349daf68b18c99b0", optional = true }
5959
datafusion = { git = "https://github.com/houqp/arrow-datafusion", rev = "af34cec956c8d67b2520a266e74683d7bdcb3099", optional = true }
6060

61+
arrow2 = { version = "0", optional = true }
62+
parquet2 = { version = "0", optional = true}
63+
6164
crossbeam = { version = "0", optional = true }
6265

6366
cfg-if = "1"
@@ -68,6 +71,7 @@ async-trait = "0.1"
6871
# rust-dataframe = {version = "0.*", optional = true }
6972

7073
[features]
74+
default = ["arrow", "parquet"]
7175
rust-dataframe-ext = []
7276
datafusion-ext = ["datafusion"]
7377
azure = ["azure_core", "azure_storage", "reqwest"]

rust/src/action.rs renamed to rust/src/action/mod.rs

Lines changed: 71 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,12 @@
22
33
#![allow(non_snake_case, non_camel_case_types)]
44

5+
#[cfg(feature = "parquet")]
56
use parquet::record::{ListAccessor, MapAccessor, RowAccessor};
7+
8+
#[cfg(feature = "parquet2")]
9+
pub mod parquet2_read;
10+
611
use percent_encoding::percent_decode;
712
use serde::{Deserialize, Serialize};
813
use serde_json::{Map, Value};
@@ -24,8 +29,17 @@ pub enum ActionError {
2429
/// A generic action error. The wrapped error string describes the details.
2530
#[error("Generic action error: {0}")]
2631
Generic(String),
32+
/// Error returned when parsing checkpoint parquet using parquet2 crate.
33+
#[cfg(feature = "parquet2")]
34+
#[error("Failed to parse parquet checkpoint: {}", .source)]
35+
ParquetParseError {
36+
/// Parquet error details returned when parsing the checkpoint parquet
37+
#[from]
38+
source: parquet2_read::ParseError,
39+
},
2740
}
2841

42+
#[cfg(feature = "parquet")]
2943
fn populate_hashmap_with_option_from_parquet_map(
3044
map: &mut HashMap<String, Option<String>>,
3145
pmap: &parquet::record::Map,
@@ -49,6 +63,7 @@ fn populate_hashmap_with_option_from_parquet_map(
4963
Ok(())
5064
}
5165

66+
#[cfg(feature = "parquet")]
5267
fn gen_action_type_error(action: &str, field: &str, expected_type: &str) -> ActionError {
5368
ActionError::InvalidField(format!(
5469
"type for {} in {} action should be {}",
@@ -145,9 +160,18 @@ pub struct StatsParsed {
145160

146161
// start of per column stats
147162
/// Contains a value smaller than all values present in the file for all columns.
163+
#[cfg(feature = "parquet")]
148164
pub min_values: HashMap<String, parquet::record::Field>,
165+
/// Contains a value smaller than all values present in the file for all columns.
166+
#[cfg(feature = "parquet2")]
167+
pub min_values: HashMap<String, String>,
168+
/// Contains a value larger than all values present in the file for all columns.
169+
#[cfg(feature = "parquet")]
149170
/// Contains a value larger than all values present in the file for all columns.
150171
pub max_values: HashMap<String, parquet::record::Field>,
172+
#[cfg(feature = "parquet2")]
173+
/// Contains a value larger than all values present in the file for all columns.
174+
pub max_values: HashMap<String, String>,
151175
/// The number of null values for all columns.
152176
pub null_count: HashMap<String, DeltaDataTypeLong>,
153177
}
@@ -169,8 +193,19 @@ pub struct Add {
169193
/// column can be omitted.
170194
///
171195
/// This field is only available in add action records read from checkpoints
196+
#[cfg(feature = "parquet")]
172197
#[serde(skip_serializing, skip_deserializing)]
173198
pub partition_values_parsed: Option<parquet::record::Row>,
199+
/// Partition values stored in raw parquet struct format. In this struct, the column names
200+
/// correspond to the partition columns and the values are stored in their corresponding data
201+
/// type. This is a required field when the table is partitioned and the table property
202+
/// delta.checkpoint.writeStatsAsStruct is set to true. If the table is not partitioned, this
203+
/// column can be omitted.
204+
///
205+
/// This field is only available in add action records read from checkpoints
206+
#[cfg(feature = "parquet2")]
207+
#[serde(skip_serializing, skip_deserializing)]
208+
pub partition_values_parsed: Option<String>,
174209
/// The time this file was created, as milliseconds since the epoch
175210
pub modification_time: DeltaDataTypeTimestamp,
176211
/// When false the file must already be present in the table or the records in the added file
@@ -186,13 +221,23 @@ pub struct Add {
186221
/// table property: delta.checkpoint.writeStatsAsStruct is set to true.
187222
///
188223
/// This field is only available in add action records read from checkpoints
224+
#[cfg(feature = "parquet")]
189225
#[serde(skip_serializing, skip_deserializing)]
190226
pub stats_parsed: Option<parquet::record::Row>,
227+
/// Contains statistics (e.g., count, min/max values for columns) about the data in this file in
228+
/// raw parquet format. This field needs to be written when statistics are available and the
229+
/// table property: delta.checkpoint.writeStatsAsStruct is set to true.
230+
///
231+
/// This field is only available in add action records read from checkpoints
232+
#[cfg(feature = "parquet2")]
233+
#[serde(skip_serializing, skip_deserializing)]
234+
pub stats_parsed: Option<String>,
191235
/// Map containing metadata about this file
192236
pub tags: Option<HashMap<String, Option<String>>>,
193237
}
194238

195239
impl Add {
240+
#[cfg(feature = "parquet")]
196241
fn from_parquet_record(record: &parquet::record::Row) -> Result<Self, ActionError> {
197242
let mut re = Self {
198243
..Default::default()
@@ -306,6 +351,7 @@ impl Add {
306351

307352
/// Returns the composite HashMap representation of stats contained in the action if present.
308353
/// Since stats are defined as optional in the protocol, this may be None.
354+
#[cfg(feature = "parquet")]
309355
pub fn get_stats_parsed(&self) -> Result<Option<StatsParsed>, parquet::errors::ParquetError> {
310356
self.stats_parsed.as_ref().map_or(Ok(None), |record| {
311357
let mut stats = StatsParsed::default();
@@ -428,6 +474,7 @@ pub struct MetaData {
428474
}
429475

430476
impl MetaData {
477+
#[cfg(feature = "parquet")]
431478
fn from_parquet_record(record: &parquet::record::Row) -> Result<Self, ActionError> {
432479
let mut re = Self {
433480
..Default::default()
@@ -548,7 +595,7 @@ impl MetaData {
548595

549596
/// Represents a tombstone (deleted file) in the Delta log.
550597
/// This is a top-level action in Delta log entries.
551-
#[derive(Serialize, Deserialize, Clone, Eq, Debug, Default)]
598+
#[derive(Serialize, Deserialize, Clone, Eq, Debug)]
552599
#[serde(rename_all = "camelCase")]
553600
pub struct Remove {
554601
/// The path of the file that is removed from the table.
@@ -597,13 +644,24 @@ impl PartialEq for Remove {
597644
}
598645
}
599646

600-
impl Remove {
601-
fn from_parquet_record(record: &parquet::record::Row) -> Result<Self, ActionError> {
602-
let mut re = Self {
647+
impl Default for Remove {
648+
fn default() -> Self {
649+
Remove {
650+
path: String::default(),
651+
deletion_timestamp: None,
603652
data_change: true,
604653
extended_file_metadata: Some(false),
605-
..Default::default()
606-
};
654+
partition_values: None,
655+
size: None,
656+
tags: None,
657+
}
658+
}
659+
}
660+
661+
impl Remove {
662+
#[cfg(feature = "parquet")]
663+
fn from_parquet_record(record: &parquet::record::Row) -> Result<Self, ActionError> {
664+
let mut re = Self::default();
607665

608666
for (i, (name, _)) in record.get_column_iter().enumerate() {
609667
match name.as_str() {
@@ -698,6 +756,7 @@ pub struct Txn {
698756
}
699757

700758
impl Txn {
759+
#[cfg(feature = "parquet")]
701760
fn from_parquet_record(record: &parquet::record::Row) -> Result<Self, ActionError> {
702761
let mut re = Self {
703762
..Default::default()
@@ -747,6 +806,7 @@ pub struct Protocol {
747806
}
748807

749808
impl Protocol {
809+
#[cfg(feature = "parquet")]
750810
fn from_parquet_record(record: &parquet::record::Row) -> Result<Self, ActionError> {
751811
let mut re = Self {
752812
..Default::default()
@@ -801,6 +861,7 @@ pub enum Action {
801861
impl Action {
802862
/// Returns an action from the given parquet Row. Used when deserializing delta log parquet
803863
/// checkpoints.
864+
#[cfg(feature = "parquet")]
804865
pub fn from_parquet_record(
805866
schema: &parquet::schema::types::Type,
806867
record: &parquet::record::Row,
@@ -903,11 +964,13 @@ pub enum OutputMode {
903964
#[cfg(test)]
904965
mod tests {
905966
use super::*;
906-
use parquet::file::reader::{FileReader, SerializedFileReader};
907-
use std::fs::File;
908967

968+
#[cfg(feature = "parquet")]
909969
#[test]
910970
fn test_add_action_without_partition_values_and_stats() {
971+
use parquet::file::reader::{FileReader, SerializedFileReader};
972+
use std::fs::File;
973+
911974
let path = "./tests/data/delta-0.2.0/_delta_log/00000000000000000003.checkpoint.parquet";
912975
let preader = SerializedFileReader::new(File::open(path).unwrap()).unwrap();
913976

0 commit comments

Comments
 (0)