Skip to content

Commit 0c33129

Browse files
authored
parquet-rewrite: add write_batch_size and compression_level config (#8642)
# Which issue does this PR close? - Closes #8639 # Rationale for this change Add a write_batch_size config and change compression to use parquet Compression. # What changes are included in this PR? Add a write_batch_size config and change compression to use parquet Compression. # Are these changes tested? I've tried these commands myself. # Are there any user-facing changes? Yes: 1. The zstd level previously defaulted to 1; it is now changed to 3. 2. The string `zStd` might no longer be accepted.
1 parent 2f96204 commit 0c33129

File tree

1 file changed

+40
-14
lines changed

1 file changed

+40
-14
lines changed

parquet/src/bin/parquet-rewrite.rs

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use arrow_array::RecordBatchReader;
3939
use clap::{Parser, ValueEnum, builder::PossibleValue};
4040
use parquet::{
4141
arrow::{ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder},
42-
basic::{Compression, Encoding},
42+
basic::{BrotliLevel, Compression, Encoding, GzipLevel, ZstdLevel},
4343
file::{
4444
properties::{BloomFilterPosition, EnabledStatistics, WriterProperties, WriterVersion},
4545
reader::FileReader,
@@ -74,18 +74,31 @@ enum CompressionArgs {
7474
Lz4Raw,
7575
}
7676

77-
impl From<CompressionArgs> for Compression {
78-
fn from(value: CompressionArgs) -> Self {
79-
match value {
80-
CompressionArgs::None => Self::UNCOMPRESSED,
81-
CompressionArgs::Snappy => Self::SNAPPY,
82-
CompressionArgs::Gzip => Self::GZIP(Default::default()),
83-
CompressionArgs::Lzo => Self::LZO,
84-
CompressionArgs::Brotli => Self::BROTLI(Default::default()),
85-
CompressionArgs::Lz4 => Self::LZ4,
86-
CompressionArgs::Zstd => Self::ZSTD(Default::default()),
87-
CompressionArgs::Lz4Raw => Self::LZ4_RAW,
88-
}
77+
fn compression_from_args(codec: CompressionArgs, level: Option<u32>) -> Compression {
78+
match codec {
79+
CompressionArgs::None => Compression::UNCOMPRESSED,
80+
CompressionArgs::Snappy => Compression::SNAPPY,
81+
CompressionArgs::Gzip => match level {
82+
Some(lvl) => {
83+
Compression::GZIP(GzipLevel::try_new(lvl).expect("invalid gzip compression level"))
84+
}
85+
None => Compression::GZIP(Default::default()),
86+
},
87+
CompressionArgs::Lzo => Compression::LZO,
88+
CompressionArgs::Brotli => match level {
89+
Some(lvl) => Compression::BROTLI(
90+
BrotliLevel::try_new(lvl).expect("invalid brotli compression level"),
91+
),
92+
None => Compression::BROTLI(Default::default()),
93+
},
94+
CompressionArgs::Lz4 => Compression::LZ4,
95+
CompressionArgs::Zstd => match level {
96+
Some(lvl) => Compression::ZSTD(
97+
ZstdLevel::try_new(lvl as i32).expect("invalid zstd compression level"),
98+
),
99+
None => Compression::ZSTD(Default::default()),
100+
},
101+
CompressionArgs::Lz4Raw => Compression::LZ4_RAW,
89102
}
90103
}
91104

@@ -219,6 +232,10 @@ struct Args {
219232
#[clap(long, value_enum)]
220233
compression: Option<CompressionArgs>,
221234

235+
/// Compression level for gzip/brotli/zstd.
236+
#[clap(long)]
237+
compression_level: Option<u32>,
238+
222239
/// Encoding used for all columns, if dictionary is not enabled.
223240
#[clap(long, value_enum)]
224241
encoding: Option<EncodingArgs>,
@@ -286,6 +303,10 @@ struct Args {
286303
#[clap(long)]
287304
writer_version: Option<WriterVersionArgs>,
288305

306+
/// Sets write batch size.
307+
#[clap(long)]
308+
write_batch_size: Option<usize>,
309+
289310
/// Sets whether to coerce Arrow types to match Parquet specification
290311
#[clap(long)]
291312
coerce_types: Option<bool>,
@@ -313,8 +334,10 @@ fn main() {
313334
.expect("parquet open");
314335

315336
let mut writer_properties_builder = WriterProperties::builder().set_key_value_metadata(kv_md);
337+
316338
if let Some(value) = args.compression {
317-
writer_properties_builder = writer_properties_builder.set_compression(value.into());
339+
let compression = compression_from_args(value, args.compression_level);
340+
writer_properties_builder = writer_properties_builder.set_compression(compression);
318341
}
319342

320343
// setup encoding
@@ -382,6 +405,9 @@ fn main() {
382405
if let Some(value) = args.coerce_types {
383406
writer_properties_builder = writer_properties_builder.set_coerce_types(value);
384407
}
408+
if let Some(value) = args.write_batch_size {
409+
writer_properties_builder = writer_properties_builder.set_write_batch_size(value);
410+
}
385411
let writer_properties = writer_properties_builder.build();
386412
let mut parquet_writer = ArrowWriter::try_new(
387413
File::create(&args.output).expect("Unable to open output file"),

0 commit comments

Comments
 (0)