Skip to content

Commit b1a9db2

Browse files
committed
[wip]
1 parent 3913979 commit b1a9db2

File tree

1 file changed

+65
-3
lines changed

1 file changed

+65
-3
lines changed

datafusion/src/physical_plan/file_format/file_stream.rs

Lines changed: 65 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,15 @@ use crate::{
2525
datasource::{object_store::ObjectStore, PartitionedFile},
2626
error::Result as DataFusionResult,
2727
physical_plan::RecordBatchStream,
28+
scalar::ScalarValue,
2829
};
2930
use arrow::{
30-
datatypes::SchemaRef,
31+
array::{
32+
make_array, Array, ArrayData, ArrayRef, DictionaryArray, StringArray,
33+
UInt8BufferBuilder, UInt8Builder,
34+
},
35+
buffer::Buffer,
36+
datatypes::{DataType, SchemaRef, UInt8Type},
3137
error::{ArrowError, Result as ArrowResult},
3238
record_batch::RecordBatch,
3339
};
@@ -61,8 +67,12 @@ impl<T> FormatReaderOpener for T where
6167

6268
/// A stream that iterates record batch by record batch, file over file.
6369
pub struct FileStream<F: FormatReaderOpener> {
70+
/// Partitioning column names for this stream
71+
partition_names: Vec<String>,
6472
/// An iterator over record batches of the last file returned by file_iter
6573
batch_iter: BatchIter,
74+
/// Partitioning column values for the current batch_iter
75+
partition_values: Vec<ScalarValue>,
6676
/// An iterator over input files
6777
file_iter: FileIter,
6878
/// The stream schema (file schema after projection)
@@ -74,6 +84,10 @@ pub struct FileStream<F: FormatReaderOpener> {
7484
/// is not capable of limiting the number of records in the last batch, the file
7585
/// stream will take care of truncating it.
7686
file_reader: F,
87+
/// A buffer initialized to zeros that represents the key array of all partition
88+
/// columns (partition columns are materialized by dictionary arrays with only one
89+
/// value in the dictionary, thus all the keys are equal to zero)
90+
key_buffer: Option<Buffer>,
7791
}
7892

7993
impl<F: FormatReaderOpener> FileStream<F> {
@@ -83,6 +97,7 @@ impl<F: FormatReaderOpener> FileStream<F> {
8397
file_reader: F,
8498
schema: SchemaRef,
8599
limit: Option<usize>,
100+
partition_names: Vec<String>,
86101
) -> Self {
87102
let read_iter = files.into_iter().map(move |f| -> DataFusionResult<_> {
88103
object_store
@@ -93,16 +108,31 @@ impl<F: FormatReaderOpener> FileStream<F> {
93108
Self {
94109
file_iter: Box::new(read_iter),
95110
batch_iter: Box::new(iter::empty()),
111+
partition_values: vec![],
96112
remain: limit,
97113
schema,
98114
file_reader,
115+
partition_names,
116+
key_buffer: None,
99117
}
100118
}
101119

102-
/// Acts as a flat_map of record batches over files.
120+
/// Acts as a flat_map of record batches over files. Adds the partitioning
121+
/// Columns to the returned record batches.
103122
fn next_batch(&mut self) -> Option<ArrowResult<RecordBatch>> {
104123
match self.batch_iter.next() {
105-
Some(batch) => Some(batch),
124+
Some(Ok(batch)) => {
125+
// let mut cols = batch.columns().to_vec();
126+
// let partitions = self.partition_names.iter().zip(&self.partition_values);
127+
// for p in partitions {
128+
// cols.push(
129+
// Int8Array::from(std::iter::repeat(0u8).take(batch.num_rows()))
130+
// .finish_dict,
131+
// )
132+
// }
133+
Some(Ok(batch))
134+
}
135+
Some(Err(e)) => Some(Err(e)),
106136
None => match self.file_iter.next() {
107137
Some(Ok(f)) => {
108138
self.batch_iter = (self.file_reader)(f, &self.remain);
@@ -113,6 +143,38 @@ impl<F: FormatReaderOpener> FileStream<F> {
113143
},
114144
}
115145
}
146+
147+
fn init_and_cache_buf(&mut self, len: usize) -> Buffer {
148+
let mut key_buffer_builder = UInt8BufferBuilder::new(len);
149+
key_buffer_builder.advance(len); // keys are all 0
150+
self.key_buffer = Some(key_buffer_builder.finish());
151+
self.key_buffer.as_ref().unwrap().clone()
152+
}
153+
154+
fn create_dict_array(&mut self, val: &ScalarValue, len: usize) -> ArrayRef {
155+
let value_datatype = val.get_datatype();
156+
157+
// build value dictionary
158+
let dict_vals = val.to_array();
159+
160+
// build keys array
161+
let key_buffer = match self.key_buffer {
162+
None => self.init_and_cache_buf(len),
163+
Some(buf) if buf.len() < len => self.init_and_cache_buf(len),
164+
Some(buf) => buf.slice(buf.len() - len),
165+
};
166+
167+
// create data type
168+
let data_type =
169+
DataType::Dictionary(Box::new(DataType::UInt8), Box::new(value_datatype));
170+
171+
// assemble pieces together
172+
let mut builder = ArrayData::builder(data_type)
173+
.len(len)
174+
.add_buffer(key_buffer);
175+
builder = builder.add_child_data(dict_vals.data().clone());
176+
Arc::new(DictionaryArray::<UInt8Type>::from(builder.build().unwrap()))
177+
}
116178
}
117179

118180
impl<F: FormatReaderOpener> Stream for FileStream<F> {

0 commit comments

Comments
 (0)