
Commit 56e9c86

paleolimbot and etseidl authored

Support writing GeospatialStatistics in Parquet writer (#8524)
# Which issue does this PR close?

- Closes #8523.

# Rationale for this change

One of the primary reasons the GeoParquet community was excited about first-class Parquet Geometry/Geography support was the built-in column chunk statistics (we had a workaround that involved adding a struct column, but it was difficult for non-spatial readers to use and very difficult for non-spatial writers to write). This PR ensures it is possible for arrow-rs to write files that include those statistics.

# What changes are included in this PR?

This PR makes the minimum change required to enable this support.

# Are these changes tested?

Yes!

# Are there any user-facing changes?

There are several new functions (which include documentation). Previously it was difficult or impossible to actually write the Geometry or Geography logical types, so it is unlikely any previous usage would be affected.

---------

Co-authored-by: Ed Seidl <etseidl@users.noreply.github.com>
1 parent: 02fa779

File tree

15 files changed: +935 −59 lines

parquet-geospatial/Cargo.toml

Lines changed: 1 addition & 1 deletion

```diff
@@ -33,7 +33,7 @@ rust-version = { workspace = true }
 [dependencies]
 arrow-schema = { workspace = true }
 geo-traits = { version = "0.3" }
-wkb = { version = "0.9" }
+wkb = { version = "0.9.1" }

 [dev-dependencies]
 wkt = { version = "0.14" }
```

parquet-geospatial/src/lib.rs

Lines changed: 1 addition & 0 deletions

```diff
@@ -29,3 +29,4 @@

 pub mod bounding;
 pub mod interval;
+pub mod testing;
```

parquet-geospatial/src/testing.rs (new file)

Lines changed: 66 additions & 0 deletions

```rust
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Testing utilities for geospatial Parquet types

/// Build well-known binary representing a point with the given XY coordinate
pub fn wkb_point_xy(x: f64, y: f64) -> Vec<u8> {
    let mut item: [u8; 21] = [0; 21];
    item[0] = 0x01;
    item[1] = 0x01;
    item[5..13].copy_from_slice(x.to_le_bytes().as_slice());
    item[13..21].copy_from_slice(y.to_le_bytes().as_slice());
    item.to_vec()
}

/// Build well-known binary representing a point with the given XYZM coordinate
pub fn wkb_point_xyzm(x: f64, y: f64, z: f64, m: f64) -> Vec<u8> {
    let mut item: [u8; 37] = [0; 37];
    item[0] = 0x01;
    item[1..5].copy_from_slice(3001_u32.to_le_bytes().as_slice());
    item[5..13].copy_from_slice(x.to_le_bytes().as_slice());
    item[13..21].copy_from_slice(y.to_le_bytes().as_slice());
    item[21..29].copy_from_slice(z.to_le_bytes().as_slice());
    item[29..37].copy_from_slice(m.to_le_bytes().as_slice());
    item.to_vec()
}

#[cfg(test)]
mod test {

    use wkb::reader::Wkb;

    use super::*;

    #[test]
    fn test_wkb_item() {
        let bytes = wkb_point_xy(1.0, 2.0);
        let geometry = Wkb::try_new(&bytes).unwrap();
        let mut wkt = String::new();
        wkt::to_wkt::write_geometry(&mut wkt, &geometry).unwrap();
        assert_eq!(wkt, "POINT(1 2)");
    }

    #[test]
    fn test_wkb_point_xyzm() {
        let bytes = wkb_point_xyzm(1.0, 2.0, 3.0, 4.0);
        let geometry = Wkb::try_new(&bytes).unwrap();
        let mut wkt = String::new();
        wkt::to_wkt::write_geometry(&mut wkt, &geometry).unwrap();
        assert_eq!(wkt, "POINT ZM(1 2 3 4)");
    }
}
```
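A note on the magic numbers in these helpers: in WKB, byte 0 is the byte-order flag (`0x01` = little-endian), bytes 1..5 hold the geometry type code as a little-endian `u32`, and the coordinates follow as 8-byte floats. `wkb_point_xy` writes `item[1] = 0x01` because the type code for POINT is 1 and the other three bytes of the code are already zero; `wkb_point_xyzm` writes 3001, following the ISO WKB convention of adding 1000 for a Z dimension and 2000 for an M dimension. A hypothetical XYZ-only companion (not part of this commit) would follow the same pattern with type code 1001:

```rust
/// Hypothetical helper, not in this commit: WKB for a POINT Z under the same
/// little-endian ISO WKB layout as wkb_point_xy / wkb_point_xyzm above.
pub fn wkb_point_xyz(x: f64, y: f64, z: f64) -> Vec<u8> {
    let mut item: [u8; 29] = [0; 29]; // 1 byte-order byte + 4 type bytes + 3 * 8 coordinate bytes
    item[0] = 0x01; // little-endian
    item[1..5].copy_from_slice(1001_u32.to_le_bytes().as_slice()); // 1000 + 1 = POINT Z
    item[5..13].copy_from_slice(x.to_le_bytes().as_slice());
    item[13..21].copy_from_slice(y.to_le_bytes().as_slice());
    item[21..29].copy_from_slice(z.to_le_bytes().as_slice());
    item.to_vec()
}
```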

parquet/Cargo.toml

Lines changed: 3 additions & 0 deletions

```diff
@@ -45,6 +45,7 @@ arrow-data = { workspace = true, optional = true }
 arrow-schema = { workspace = true, optional = true }
 arrow-select = { workspace = true, optional = true }
 arrow-ipc = { workspace = true, optional = true }
+parquet-geospatial = { workspace = true, optional = true }
 parquet-variant = { workspace = true, optional = true }
 parquet-variant-json = { workspace = true, optional = true }
 parquet-variant-compute = { workspace = true, optional = true }
@@ -131,6 +132,8 @@ flate2-rust_backened = ["flate2/rust_backend"]
 flate2-zlib-rs = ["flate2/zlib-rs"]
 # Enable parquet variant support
 variant_experimental = ["arrow", "parquet-variant", "parquet-variant-json", "parquet-variant-compute"]
+# Enable geospatial support
+geospatial = ["parquet-geospatial"]


 [[example]]
```

parquet/README.md

Lines changed: 1 addition & 0 deletions

```diff
@@ -65,6 +65,7 @@ The `parquet` crate provides the following features which may be enabled in your
 - `simdutf8` (default) - Use the [`simdutf8`] crate for SIMD-accelerated UTF-8 validation
 - `encryption` - support for reading / writing encrypted Parquet files
 - `variant_experimental` - ⚠️ Experimental [Parquet Variant] support, which may change, even between minor releases.
+- `geospatial` - ⚠️ Experimental geospatial support, which may change, even between minor releases.

 [`arrow`]: https://crates.io/crates/arrow
 [`simdutf8`]: https://crates.io/crates/simdutf8
```
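Like `variant_experimental`, the new feature is opt-in. A minimal sketch of enabling it from a downstream crate's Cargo.toml (the version number is illustrative only; use a release that actually includes this change):

```toml
[dependencies]
parquet = { version = "57", features = ["geospatial"] }
```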

parquet/src/arrow/arrow_writer/byte_array.rs

Lines changed: 30 additions & 1 deletion

```diff
@@ -23,6 +23,8 @@ use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder};
 use crate::encodings::rle::RleEncoder;
 use crate::errors::{ParquetError, Result};
 use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
+use crate::geospatial::accumulator::{try_new_geo_stats_accumulator, GeoStatsAccumulator};
+use crate::geospatial::statistics::GeospatialStatistics;
 use crate::schema::types::ColumnDescPtr;
 use crate::util::bit_util::num_required_bits;
 use crate::util::interner::{Interner, Storage};
@@ -421,6 +423,7 @@ pub struct ByteArrayEncoder {
     min_value: Option<ByteArray>,
     max_value: Option<ByteArray>,
     bloom_filter: Option<Sbbf>,
+    geo_stats_accumulator: Option<Box<dyn GeoStatsAccumulator>>,
 }

 impl ColumnValueEncoder for ByteArrayEncoder {
@@ -447,13 +450,16 @@ impl ColumnValueEncoder for ByteArrayEncoder {

         let statistics_enabled = props.statistics_enabled(descr.path());

+        let geo_stats_accumulator = try_new_geo_stats_accumulator(descr);
+
         Ok(Self {
             fallback,
             statistics_enabled,
             bloom_filter,
             dict_encoder: dictionary,
             min_value: None,
             max_value: None,
+            geo_stats_accumulator,
         })
     }

@@ -536,6 +542,10 @@ impl ColumnValueEncoder for ByteArrayEncoder {
             _ => self.fallback.flush_data_page(min_value, max_value),
         }
     }
+
+    fn flush_geospatial_statistics(&mut self) -> Option<Box<GeospatialStatistics>> {
+        self.geo_stats_accumulator.as_mut().map(|a| a.finish())?
+    }
 }

 /// Encodes the provided `values` and `indices` to `encoder`
@@ -547,7 +557,9 @@ where
     T::Item: Copy + Ord + AsRef<[u8]>,
 {
     if encoder.statistics_enabled != EnabledStatistics::None {
-        if let Some((min, max)) = compute_min_max(values, indices.iter().cloned()) {
+        if let Some(accumulator) = encoder.geo_stats_accumulator.as_mut() {
+            update_geo_stats_accumulator(accumulator.as_mut(), values, indices.iter().cloned());
+        } else if let Some((min, max)) = compute_min_max(values, indices.iter().cloned()) {
             if encoder.min_value.as_ref().is_none_or(|m| m > &min) {
                 encoder.min_value = Some(min);
             }
@@ -595,3 +607,20 @@ where
     }
     Some((min.as_ref().to_vec().into(), max.as_ref().to_vec().into()))
 }
+
+/// Updates geospatial statistics for the provided array and indices
+fn update_geo_stats_accumulator<T>(
+    bounder: &mut dyn GeoStatsAccumulator,
+    array: T,
+    valid: impl Iterator<Item = usize>,
+) where
+    T: ArrayAccessor,
+    T::Item: Copy + Ord + AsRef<[u8]>,
+{
+    if bounder.is_valid() {
+        for idx in valid {
+            let val = array.value(idx);
+            bounder.update_wkb(val.as_ref());
+        }
+    }
+}
```
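The `GeoStatsAccumulator` trait itself lives in `parquet/src/geospatial/accumulator.rs` and is not part of this diff. Inferred purely from the call sites above (`try_new_geo_stats_accumulator`, which presumably returns `Some` only for columns annotated as Geometry or Geography, plus `is_valid`, `update_wkb`, and `finish`), its shape is approximately the following sketch; the real definition may differ:

```rust
// Sketch inferred from usage in this diff -- not the actual definition.
pub trait GeoStatsAccumulator {
    /// Whether the accumulator can still produce valid statistics;
    /// callers skip updates entirely once this returns false.
    fn is_valid(&self) -> bool;

    /// Fold one well-known-binary geometry value into the running bounds.
    fn update_wkb(&mut self, wkb: &[u8]);

    /// Produce the accumulated statistics (None if none could be computed)
    /// and reset for the next column chunk.
    fn finish(&mut self) -> Option<Box<GeospatialStatistics>>;
}
```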

parquet/src/arrow/arrow_writer/mod.rs

Lines changed: 89 additions & 6 deletions

```diff
@@ -228,11 +228,18 @@ impl<W: Write + Send> ArrowWriter<W> {
         options: ArrowWriterOptions,
     ) -> Result<Self> {
         let mut props = options.properties;
-        let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
-        if let Some(schema_root) = &options.schema_root {
-            converter = converter.schema_root(schema_root);
-        }
-        let schema = converter.convert(&arrow_schema)?;
+
+        let schema = if let Some(parquet_schema) = options.schema_descr {
+            parquet_schema.clone()
+        } else {
+            let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
+            if let Some(schema_root) = &options.schema_root {
+                converter = converter.schema_root(schema_root);
+            }
+
+            converter.convert(&arrow_schema)?
+        };
+
         if !options.skip_arrow_metadata {
             // add serialized arrow schema
             add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
@@ -457,6 +464,7 @@ pub struct ArrowWriterOptions {
     properties: WriterProperties,
     skip_arrow_metadata: bool,
     schema_root: Option<String>,
+    schema_descr: Option<SchemaDescriptor>,
 }

 impl ArrowWriterOptions {
@@ -490,6 +498,18 @@ impl ArrowWriterOptions {
             ..self
         }
     }
+
+    /// Explicitly specify the Parquet schema to be used
+    ///
+    /// If omitted (the default), the [`ArrowSchemaConverter`] is used to compute the
+    /// Parquet [`SchemaDescriptor`]. This may be used when the [`SchemaDescriptor`] is
+    /// already known or must be calculated using custom logic.
+    pub fn with_parquet_schema(self, schema_descr: SchemaDescriptor) -> Self {
+        Self {
+            schema_descr: Some(schema_descr),
+            ..self
+        }
+    }
 }

 /// A single column chunk produced by [`ArrowColumnWriter`]
@@ -1508,7 +1528,7 @@ mod tests {
     use crate::file::page_index::column_index::ColumnIndexMetaData;
     use crate::file::reader::SerializedPageReader;
     use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
-    use crate::schema::types::ColumnPath;
+    use crate::schema::types::{ColumnPath, Type};
     use arrow::datatypes::ToByteSlice;
     use arrow::datatypes::{DataType, Schema};
     use arrow::error::Result as ArrowResult;
@@ -4132,6 +4152,69 @@ mod tests {
         }
     }

+    #[test]
+    fn test_arrow_writer_explicit_schema() {
+        // Write an int32 array using explicit int64 storage
+        let batch_schema = Arc::new(Schema::new(vec![Field::new(
+            "integers",
+            DataType::Int32,
+            true,
+        )]));
+        let parquet_schema = Type::group_type_builder("root")
+            .with_fields(vec![Type::primitive_type_builder(
+                "integers",
+                crate::basic::Type::INT64,
+            )
+            .build()
+            .unwrap()
+            .into()])
+            .build()
+            .unwrap();
+        let parquet_schema_descr = SchemaDescriptor::new(parquet_schema.into());
+
+        let batch = RecordBatch::try_new(
+            batch_schema.clone(),
+            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
+        )
+        .unwrap();
+
+        let explicit_schema_options =
+            ArrowWriterOptions::new().with_parquet_schema(parquet_schema_descr);
+        let mut buf = Vec::with_capacity(1024);
+        let mut writer = ArrowWriter::try_new_with_options(
+            &mut buf,
+            batch_schema.clone(),
+            explicit_schema_options,
+        )
+        .unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+
+        let bytes = Bytes::from(buf);
+        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
+
+        let expected_schema = Arc::new(Schema::new(vec![Field::new(
+            "integers",
+            DataType::Int64,
+            true,
+        )]));
+        assert_eq!(reader_builder.schema(), &expected_schema);
+
+        let batches = reader_builder
+            .build()
+            .unwrap()
+            .collect::<Result<Vec<_>, ArrowError>>()
+            .unwrap();
+        assert_eq!(batches.len(), 1);
+
+        let expected_batch = RecordBatch::try_new(
+            expected_schema.clone(),
+            vec![Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as _],
+        )
+        .unwrap();
+        assert_eq!(batches[0], expected_batch);
+    }
+
     #[test]
     fn mismatched_schemas() {
         let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
```
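`with_parquet_schema` is what makes the Geometry/Geography logical types practically writable: Arrow has no native geometry type for the `ArrowSchemaConverter` to convert from, so the caller supplies a Parquet schema in which a `BYTE_ARRAY` column carries the geometry annotation, then writes WKB-encoded `Binary` data into it. Below is a minimal sketch under two assumptions not confirmed by this diff: that the `LogicalType::Geometry` variant carries an optional CRS (check the actual definition in `parquet::basic`), and the helper name `write_geometry_column`, which is hypothetical:

```rust
use std::sync::Arc;

use arrow::array::{BinaryArray, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema};
use parquet::arrow::arrow_writer::{ArrowWriter, ArrowWriterOptions};
use parquet::basic::LogicalType;
use parquet::errors::Result;
use parquet::schema::types::{SchemaDescriptor, Type};

/// Hypothetical sketch: write WKB values as a Parquet BYTE_ARRAY column
/// annotated as Geometry, via an explicitly supplied Parquet schema.
fn write_geometry_column(wkb_values: Vec<&[u8]>) -> Result<Vec<u8>> {
    // The Arrow side is plain Binary; the geospatial meaning lives only in
    // the explicit Parquet schema.
    let arrow_schema = Arc::new(Schema::new(vec![Field::new(
        "geometry",
        DataType::Binary,
        true,
    )]));

    // Assumed variant shape: LogicalType::Geometry with an optional CRS.
    let geometry = Type::primitive_type_builder("geometry", parquet::basic::Type::BYTE_ARRAY)
        .with_logical_type(Some(LogicalType::Geometry { crs: None }))
        .build()?;
    let root = Type::group_type_builder("root")
        .with_fields(vec![geometry.into()])
        .build()?;
    let schema_descr = SchemaDescriptor::new(root.into());

    let options = ArrowWriterOptions::new().with_parquet_schema(schema_descr);
    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new_with_options(&mut buf, arrow_schema.clone(), options)?;

    let batch = RecordBatch::try_new(
        arrow_schema,
        vec![Arc::new(BinaryArray::from(wkb_values)) as _],
    )
    .expect("schema and column match");
    writer.write(&batch)?;
    writer.close()?;
    Ok(buf)
}
```

With such a schema in place, the accumulator plumbing from the earlier hunks engages automatically and the resulting column chunks carry `GeospatialStatistics`.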

parquet/src/column/writer/encoder.rs

Lines changed: 30 additions & 2 deletions

```diff
@@ -28,6 +28,8 @@ use crate::data_type::private::ParquetValueType;
 use crate::encodings::encoding::{DictEncoder, Encoder, get_encoder};
 use crate::errors::{ParquetError, Result};
 use crate::file::properties::{EnabledStatistics, WriterProperties};
+use crate::geospatial::accumulator::{try_new_geo_stats_accumulator, GeoStatsAccumulator};
+use crate::geospatial::statistics::GeospatialStatistics;
 use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};

 /// A collection of [`ParquetValueType`] encoded by a [`ColumnValueEncoder`]
@@ -121,6 +123,10 @@ pub trait ColumnValueEncoder {
     /// will *not* be tracked by the bloom filter as it is empty since. This should be called once
     /// near the end of encoding.
     fn flush_bloom_filter(&mut self) -> Option<Sbbf>;
+
+    /// Computes [`GeospatialStatistics`], if any, and resets internal state such that any internal
+    /// accumulator is prepared to accumulate statistics for the next column chunk.
+    fn flush_geospatial_statistics(&mut self) -> Option<Box<GeospatialStatistics>>;
 }

 pub struct ColumnValueEncoderImpl<T: DataType> {
@@ -133,6 +139,7 @@ pub struct ColumnValueEncoderImpl<T: DataType> {
     max_value: Option<T::T>,
     bloom_filter: Option<Sbbf>,
     variable_length_bytes: Option<i64>,
+    geo_stats_accumulator: Option<Box<dyn GeoStatsAccumulator>>,
 }

 impl<T: DataType> ColumnValueEncoderImpl<T> {
@@ -145,10 +152,12 @@ impl<T: DataType> ColumnValueEncoderImpl<T> {

     fn write_slice(&mut self, slice: &[T::T]) -> Result<()> {
         if self.statistics_enabled != EnabledStatistics::None
-            // INTERVAL has undefined sort order, so don't write min/max stats for it
+            // INTERVAL, Geometry, and Geography have undefined sort order, so don't write min/max stats for them
             && self.descr.converted_type() != ConvertedType::INTERVAL
         {
-            if let Some((min, max)) = self.min_max(slice, None) {
+            if let Some(accumulator) = self.geo_stats_accumulator.as_deref_mut() {
+                update_geo_stats_accumulator(accumulator, slice.iter());
+            } else if let Some((min, max)) = self.min_max(slice, None) {
                 update_min(&self.descr, &min, &mut self.min_value);
                 update_max(&self.descr, &max, &mut self.max_value);
             }
@@ -201,6 +210,8 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
             .map(|props| Sbbf::new_with_ndv_fpp(props.ndv, props.fpp))
             .transpose()?;

+        let geo_stats_accumulator = try_new_geo_stats_accumulator(descr);
+
         Ok(Self {
             encoder,
             dict_encoder,
@@ -211,6 +222,7 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
             min_value: None,
             max_value: None,
             variable_length_bytes: None,
+            geo_stats_accumulator,
         })
     }

@@ -307,6 +319,10 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
             variable_length_bytes: self.variable_length_bytes.take(),
         })
     }
+
+    fn flush_geospatial_statistics(&mut self) -> Option<Box<GeospatialStatistics>> {
+        self.geo_stats_accumulator.as_mut().map(|a| a.finish())?
+    }
 }

 fn get_min_max<'a, T, I>(descr: &ColumnDescriptor, mut iter: I) -> Option<(T, T)>
@@ -367,3 +383,15 @@ fn replace_zero<T: ParquetValueType>(val: &T, descr: &ColumnDescriptor, replace:
         _ => val.clone(),
     }
 }
+
+fn update_geo_stats_accumulator<'a, T, I>(bounder: &mut dyn GeoStatsAccumulator, iter: I)
+where
+    T: ParquetValueType + 'a,
+    I: Iterator<Item = &'a T>,
+{
+    if bounder.is_valid() {
+        for val in iter {
+            bounder.update_wkb(val.as_bytes());
+        }
+    }
+}
```
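The body of `flush_geospatial_statistics` is terse: `.map(|a| a.finish())` over the `Option`-wrapped accumulator yields a nested `Option<Option<Box<GeospatialStatistics>>>`, and the trailing `?` flattens it by returning early when no accumulator exists. An equivalent, more explicit spelling of the same logic:

```rust
// Equivalent spelling of the flush body above. There are two distinct
// "no statistics" cases: the column has no geospatial accumulator at all,
// or the accumulator could not produce valid statistics for this chunk.
fn flush_geospatial_statistics(&mut self) -> Option<Box<GeospatialStatistics>> {
    match self.geo_stats_accumulator.as_mut() {
        None => None,
        Some(accumulator) => accumulator.finish(),
    }
}
```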
