
Commit 84be336

Parquet: clear metadata of ParquetRecordBatchStream::schema

1 parent df69ef5 · commit 84be336
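For context, the change relies on a detail of `arrow_schema`: `Schema::new` builds a schema with an empty metadata map, so rebuilding a schema from its existing fields strips the key/value metadata while leaving the columns untouched. A minimal standalone sketch of that behavior (not part of this commit):

```rust
use std::collections::HashMap;
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema, SchemaRef};

fn main() {
    let metadata = HashMap::from([("writer.model.name".to_string(), "avro".to_string())]);
    let with_meta: SchemaRef = Arc::new(Schema::new_with_metadata(
        vec![Field::new("String", DataType::Utf8, true)],
        metadata,
    ));

    // Rebuilding from the same fields yields the same columns
    // but an empty metadata map, which is exactly what the commit does.
    let stripped: SchemaRef = Arc::new(Schema::new(with_meta.fields.clone()));

    assert_eq!(with_meta.metadata.len(), 1);
    assert!(stripped.metadata.is_empty());
    assert_eq!(with_meta.fields, stripped.fields);
}
```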

File tree

  • parquet/src/arrow/async_reader/mod.rs

1 file changed (+44 -2 lines)

parquet/src/arrow/async_reader/mod.rs

Lines changed: 44 additions & 2 deletions
@@ -90,7 +90,7 @@ use futures::stream::Stream;
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

 use arrow_array::RecordBatch;
-use arrow_schema::SchemaRef;
+use arrow_schema::{Schema, SchemaRef};

 use crate::arrow::array_reader::{build_array_reader, RowGroups};
 use crate::arrow::arrow_reader::{
@@ -385,13 +385,20 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
             offset: self.offset,
         };

+        // Clear metadata from schema for ParquetRecordBatchStream
+        let schema = if self.schema.metadata.is_empty() {
+            self.schema
+        } else {
+            Arc::new(Schema::new(self.schema.fields.clone()))
+        };
+
         Ok(ParquetRecordBatchStream {
             metadata: self.metadata,
             batch_size,
             row_groups,
             projection: self.projection,
             selection: self.selection,
-            schema: self.schema,
+            schema,
             reader: Some(reader),
             state: StreamState::Init,
         })
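Note the fast path above: a schema with no key/value metadata is passed through as-is, so the common case avoids rebuilding the `Schema` and allocating a new `Arc`; only schemas that actually carry metadata are reconstructed from their fields.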
@@ -1584,6 +1591,41 @@ mod tests {
         test_get_row_group_column_bloom_filter(data, false).await;
     }

+    #[tokio::test]
+    async fn test_parquet_record_batch_stream_schema() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
+        let data = Bytes::from(std::fs::read(path).unwrap());
+        let metadata = Arc::new(parse_metadata(&data).unwrap());
+        let async_reader = TestReader {
+            data,
+            metadata,
+            requests: Default::default(),
+        };
+        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
+            .await
+            .unwrap();
+        let builder_schema = builder.schema().clone();
+        let stream = builder.build().unwrap();
+        let stream_schema = stream.schema().clone();
+        let batches = stream.try_collect::<Vec<_>>().await.unwrap();
+        let batch_schema = batches[0].schema();
+
+        // ParquetRecordBatchStreamBuilder::schema should preserve metadata
+        assert_eq!(builder_schema.metadata.len(), 2);
+        assert_eq!(
+            builder_schema.metadata["parquet.avro.schema"],
+            "{\"type\":\"record\",\"name\":\"data\",\"fields\":[{\"name\":\"String\",\"type\":[\"null\",\"string\"],\"doc\":\"Type inferred from 'Hello'\",\"default\":null}]}"
+        );
+        assert_eq!(builder_schema.metadata["writer.model.name"], "avro");
+
+        // ParquetRecordBatchStream::schema should not have metadata
+        assert!(stream_schema.metadata.is_empty());
+
+        // RecordBatches from ParquetRecordBatchStream should not have metadata in their schema
+        assert!(batch_schema.metadata.is_empty());
+    }
+
     #[tokio::test]
     async fn test_get_row_group_column_bloom_filter_with_length() {
         // convert to new parquet file with bloom_filter_length
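The net effect is that `ParquetRecordBatchStream::schema` now agrees with the schema of every `RecordBatch` the stream yields, while `ParquetRecordBatchStreamBuilder::schema` still exposes the file's key/value metadata. A minimal usage sketch of that user-visible behavior, assuming a local file `data.parquet` (hypothetical path) that contains key/value metadata and at least one batch:

```rust
use futures::TryStreamExt;
use parquet::arrow::ParquetRecordBatchStreamBuilder;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = tokio::fs::File::open("data.parquet").await?;

    // The builder's schema still carries any key/value metadata
    // (e.g. "writer.model.name") read from the Parquet footer.
    let builder = ParquetRecordBatchStreamBuilder::new(file).await?;
    println!("builder metadata: {:?}", builder.schema().metadata);

    // The stream's schema has that metadata cleared, so it compares
    // equal to the schema of the batches the stream produces.
    let stream = builder.build()?;
    let schema = stream.schema().clone();
    let batches: Vec<_> = stream.try_collect().await?;

    assert!(schema.metadata.is_empty());
    assert_eq!(batches[0].schema(), schema); // holds after this commit
    Ok(())
}
```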

0 commit comments