Skip to content

Commit 7910765

Browse files
committed
added projections for avro columns
1 parent b2cfe2b commit 7910765

File tree

3 files changed

+12
-14
lines changed

3 files changed

+12
-14
lines changed

datafusion/src/avro_to_arrow/arrow_array_reader.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ impl<'a, R: Read> AvroBatchReader<R> {
3838
avro_schemas: Vec<avro_schema::Schema>,
3939
codec: Option<Compression>,
4040
file_marker: [u8; 16],
41+
projection: Option<Vec<bool>>,
4142
) -> Result<Self> {
4243
let reader = AvroReader::new(
4344
read::Decompressor::new(
@@ -46,6 +47,7 @@ impl<'a, R: Read> AvroBatchReader<R> {
4647
),
4748
avro_schemas,
4849
schema.fields.clone(),
50+
projection,
4951
);
5052
Ok(Self { reader, schema })
5153
}

datafusion/src/avro_to_arrow/reader.rs

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -108,22 +108,16 @@ impl ReaderBuilder {
108108

109109
// check if schema should be inferred
110110
source.seek(SeekFrom::Start(0))?;
111-
let (mut avro_schemas, mut schema, codec, file_marker) =
111+
let (avro_schemas, schema, codec, file_marker) =
112112
read::read_metadata(&mut source)?;
113-
if let Some(proj) = self.projection {
114-
let mut indices: Vec<usize> = schema
113+
114+
let projection = self.projection.map(|proj| {
115+
schema
115116
.fields
116117
.iter()
117-
.filter(|f| !proj.contains(&f.name))
118-
.enumerate()
119-
.map(|(i, _)| i)
120-
.collect();
121-
indices.sort_by(|i1, i2| i2.cmp(i1));
122-
for i in indices {
123-
avro_schemas.remove(i);
124-
schema.fields.remove(i);
125-
}
126-
}
118+
.map(|f| proj.contains(&f.name))
119+
.collect::<Vec<bool>>()
120+
});
127121

128122
Reader::try_new(
129123
source,
@@ -132,6 +126,7 @@ impl ReaderBuilder {
132126
avro_schemas,
133127
codec,
134128
file_marker,
129+
projection,
135130
)
136131
}
137132
}
@@ -155,6 +150,7 @@ impl<'a, R: Read> Reader<R> {
155150
avro_schemas: Vec<avro_schema::Schema>,
156151
codec: Option<Compression>,
157152
file_marker: [u8; 16],
153+
projection: Option<Vec<bool>>,
158154
) -> Result<Self> {
159155
Ok(Self {
160156
array_reader: AvroBatchReader::try_new(
@@ -163,6 +159,7 @@ impl<'a, R: Read> Reader<R> {
163159
avro_schemas,
164160
codec,
165161
file_marker,
162+
projection,
166163
)?,
167164
schema,
168165
batch_size,

datafusion/src/avro_to_arrow/schema.rs

Lines changed: 0 additions & 1 deletion
This file was deleted.

0 commit comments

Comments
 (0)