
Commit fd662ad

Revert "Parquet: clear metadata of ParquetRecordBatchStream::schema"
This reverts commit 84be336.
1 parent 84be336

File tree

1 file changed: +2, -44 lines
  • parquet/src/arrow/async_reader

parquet/src/arrow/async_reader/mod.rs

Lines changed: 2 additions & 44 deletions
@@ -90,7 +90,7 @@ use futures::stream::Stream;
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
 
 use arrow_array::RecordBatch;
-use arrow_schema::{Schema, SchemaRef};
+use arrow_schema::SchemaRef;
 
 use crate::arrow::array_reader::{build_array_reader, RowGroups};
 use crate::arrow::arrow_reader::{
@@ -385,20 +385,13 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
             offset: self.offset,
         };
 
-        // Clear metadata from schema for ParquetRecordBatchStream
-        let schema = if self.schema.metadata.is_empty() {
-            self.schema
-        } else {
-            Arc::new(Schema::new(self.schema.fields.clone()))
-        };
-
         Ok(ParquetRecordBatchStream {
             metadata: self.metadata,
             batch_size,
             row_groups,
             projection: self.projection,
             selection: self.selection,
-            schema,
+            schema: self.schema,
             reader: Some(reader),
             state: StreamState::Init,
         })
@@ -1591,41 +1584,6 @@ mod tests {
         test_get_row_group_column_bloom_filter(data, false).await;
     }
 
-    #[tokio::test]
-    async fn test_parquet_record_batch_stream_schema() {
-        let testdata = arrow::util::test_util::parquet_test_data();
-        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
-        let data = Bytes::from(std::fs::read(path).unwrap());
-        let metadata = Arc::new(parse_metadata(&data).unwrap());
-        let async_reader = TestReader {
-            data,
-            metadata,
-            requests: Default::default(),
-        };
-        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
-            .await
-            .unwrap();
-        let builder_schema = builder.schema().clone();
-        let stream = builder.build().unwrap();
-        let stream_schema = stream.schema().clone();
-        let batches = stream.try_collect::<Vec<_>>().await.unwrap();
-        let batch_schema = batches[0].schema();
-
-        // ParquetRecordBatchStreamBuilder::schema should preserve metadata
-        assert_eq!(builder_schema.metadata.len(), 2);
-        assert_eq!(
-            builder_schema.metadata["parquet.avro.schema"],
-            "{\"type\":\"record\",\"name\":\"data\",\"fields\":[{\"name\":\"String\",\"type\":[\"null\",\"string\"],\"doc\":\"Type inferred from 'Hello'\",\"default\":null}]}"
-        );
-        assert_eq!(builder_schema.metadata["writer.model.name"], "avro");
-
-        // ParquetRecordBatchStream::schema should not have metadata
-        assert!(stream_schema.metadata.is_empty());
-
-        // RecordBatches from ParquetRecordBatchStream should not have metadata in their schema
-        assert!(batch_schema.metadata.is_empty());
-    }
-
     #[tokio::test]
     async fn test_get_row_group_column_bloom_filter_with_length() {
         // convert to new parquet file with bloom_filter_length
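
The net effect of the revert: ParquetRecordBatchStreamBuilder::build passes self.schema through unchanged again, so ParquetRecordBatchStream::schema (and the batches the stream yields) once more expose any key-value metadata embedded in the Parquet file, such as the parquet.avro.schema and writer.model.name entries checked by the deleted test. Below is a minimal sketch of observing this through the public async API. It assumes the parquet crate's "async" feature is enabled, and the file name data.parquet is hypothetical, standing in for a file written with embedded metadata (e.g. by parquet-avro).

use std::sync::Arc;

use arrow_schema::Schema;
use futures::TryStreamExt;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use tokio::fs::File;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical input file, assumed to carry key-value metadata.
    let file = File::open("data.parquet").await?;

    let builder = ParquetRecordBatchStreamBuilder::new(file).await?;
    // The builder's schema has always carried the file's metadata.
    println!("builder metadata: {:?}", builder.schema().metadata);

    // After this revert, the stream's schema is the builder's schema
    // verbatim, metadata included.
    let stream = builder.build()?;
    let stream_schema = stream.schema().clone();
    println!("stream metadata: {:?}", stream_schema.metadata);

    // Batches produced by the stream share that same schema.
    let batches: Vec<_> = stream.try_collect().await?;
    if let Some(batch) = batches.first() {
        assert_eq!(batch.schema().metadata, stream_schema.metadata);
    }

    // A consumer that depended on the reverted (metadata-free) behavior
    // can strip the metadata itself, mirroring the code removed above.
    let _no_metadata = Arc::new(Schema::new(stream_schema.fields().clone()));
    Ok(())
}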
