-
Couldn't load subscription status.
- Fork 1k
Implement projection for arrow IPC Reader file / streams
#1339
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
792c2e6
Implement projection for arrow file / streams
Dandandan e2984e3
Tests
Dandandan b2cbf8e
Fix
Dandandan 541036e
Fix
Dandandan 431aa56
Add test
Dandandan d218081
Add test
Dandandan bdf4eea
Add link
Dandandan 427fa08
Undo change to existing test
Dandandan b94ef98
Update arrow/src/ipc/reader.rs
Dandandan 169cdda
Use project
Dandandan 04b133c
Merge branch 'arrow_ipc_projection' of github.com:Dandandan/arrow-rs …
Dandandan File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -69,6 +69,7 @@ pub fn flight_data_to_arrow_batch( | |
| batch, | ||
| schema, | ||
| dictionaries_by_field, | ||
| None, | ||
| ) | ||
| })? | ||
| } | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -465,6 +465,7 @@ pub fn read_record_batch( | |
| batch: ipc::RecordBatch, | ||
| schema: SchemaRef, | ||
| dictionaries: &[Option<ArrayRef>], | ||
| projection: Option<&[usize]>, | ||
| ) -> Result<RecordBatch> { | ||
| let buffers = batch.buffers().ok_or_else(|| { | ||
| ArrowError::IoError("Unable to get buffers from IPC RecordBatch".to_string()) | ||
|
|
@@ -477,23 +478,43 @@ pub fn read_record_batch( | |
| let mut node_index = 0; | ||
| let mut arrays = vec![]; | ||
|
|
||
| // keep track of index as lists require more than one node | ||
| for field in schema.fields() { | ||
| let triple = create_array( | ||
| field_nodes, | ||
| field.data_type(), | ||
| buf, | ||
| buffers, | ||
| dictionaries, | ||
| node_index, | ||
| buffer_index, | ||
| )?; | ||
| node_index = triple.1; | ||
| buffer_index = triple.2; | ||
| arrays.push(triple.0); | ||
| } | ||
| if let Some(projection) = projection { | ||
| let fields = schema.fields(); | ||
| for &index in projection { | ||
| let field = &fields[index]; | ||
| let triple = create_array( | ||
| field_nodes, | ||
| field.data_type(), | ||
| buf, | ||
| buffers, | ||
| dictionaries, | ||
| node_index, | ||
| buffer_index, | ||
| )?; | ||
| node_index = triple.1; | ||
| buffer_index = triple.2; | ||
| arrays.push(triple.0); | ||
| } | ||
|
|
||
| RecordBatch::try_new(schema, arrays) | ||
| RecordBatch::try_new(Arc::new(schema.project(projection)?), arrays) | ||
| } else { | ||
| // keep track of index as lists require more than one node | ||
| for field in schema.fields() { | ||
| let triple = create_array( | ||
| field_nodes, | ||
| field.data_type(), | ||
| buf, | ||
| buffers, | ||
| dictionaries, | ||
| node_index, | ||
| buffer_index, | ||
| )?; | ||
| node_index = triple.1; | ||
| buffer_index = triple.2; | ||
| arrays.push(triple.0); | ||
| } | ||
| RecordBatch::try_new(schema, arrays) | ||
| } | ||
| } | ||
|
|
||
| /// Read the dictionary from the buffer and provided metadata, | ||
|
|
@@ -532,6 +553,7 @@ pub fn read_dictionary( | |
| batch.data().unwrap(), | ||
| Arc::new(schema), | ||
| dictionaries_by_field, | ||
| None, | ||
| )?; | ||
| Some(record_batch.column(0).clone()) | ||
| } | ||
|
|
@@ -581,14 +603,17 @@ pub struct FileReader<R: Read + Seek> { | |
|
|
||
| /// Metadata version | ||
| metadata_version: ipc::MetadataVersion, | ||
|
|
||
| /// Optional projection and projected_schema | ||
| projection: Option<(Vec<usize>, Schema)>, | ||
| } | ||
|
|
||
| impl<R: Read + Seek> FileReader<R> { | ||
| /// Try to create a new file reader | ||
| /// | ||
| /// Returns errors if the file does not meet the Arrow Format header and footer | ||
| /// requirements | ||
| pub fn try_new(reader: R) -> Result<Self> { | ||
| pub fn try_new(reader: R, projection: Option<Vec<usize>>) -> Result<Self> { | ||
| let mut reader = BufReader::new(reader); | ||
| // check if header and footer contain correct magic bytes | ||
| let mut magic_buffer: [u8; 6] = [0; 6]; | ||
|
|
@@ -671,6 +696,13 @@ impl<R: Read + Seek> FileReader<R> { | |
| } | ||
| }; | ||
| } | ||
| let projection = match projection { | ||
| Some(projection_indices) => { | ||
| let schema = schema.project(&projection_indices)?; | ||
| Some((projection_indices, schema)) | ||
| } | ||
| _ => None, | ||
| }; | ||
|
|
||
| Ok(Self { | ||
| reader, | ||
|
|
@@ -680,6 +712,7 @@ impl<R: Read + Seek> FileReader<R> { | |
| total_blocks, | ||
| dictionaries_by_field, | ||
| metadata_version: footer.version(), | ||
| projection, | ||
| }) | ||
| } | ||
|
|
||
|
|
@@ -760,6 +793,8 @@ impl<R: Read + Seek> FileReader<R> { | |
| batch, | ||
| self.schema(), | ||
| &self.dictionaries_by_field, | ||
| self.projection.as_ref().map(|x| x.0.as_ref()), | ||
|
|
||
| ).map(Some) | ||
| } | ||
| ipc::MessageHeader::NONE => { | ||
|
|
@@ -808,6 +843,9 @@ pub struct StreamReader<R: Read> { | |
| /// | ||
| /// This value is set to `true` the first time the reader's `next()` returns `None`. | ||
| finished: bool, | ||
|
|
||
| /// Optional projection | ||
| projection: Option<(Vec<usize>, Schema)>, | ||
| } | ||
|
|
||
| impl<R: Read> StreamReader<R> { | ||
|
|
@@ -816,7 +854,7 @@ impl<R: Read> StreamReader<R> { | |
| /// The first message in the stream is the schema, the reader will fail if it does not | ||
| /// encounter a schema. | ||
| /// To check if the reader is done, use `is_finished(self)` | ||
| pub fn try_new(reader: R) -> Result<Self> { | ||
| pub fn try_new(reader: R, projection: Option<Vec<usize>>) -> Result<Self> { | ||
| let mut reader = BufReader::new(reader); | ||
| // determine metadata length | ||
| let mut meta_size: [u8; 4] = [0; 4]; | ||
|
|
@@ -845,11 +883,19 @@ impl<R: Read> StreamReader<R> { | |
| // Create an array of optional dictionary value arrays, one per field. | ||
| let dictionaries_by_field = vec![None; schema.fields().len()]; | ||
|
|
||
| let projection = match projection { | ||
| Some(projection_indices) => { | ||
| let schema = schema.project(&projection_indices)?; | ||
| Some((projection_indices, schema)) | ||
| } | ||
| _ => None, | ||
| }; | ||
| Ok(Self { | ||
| reader, | ||
| schema: Arc::new(schema), | ||
| finished: false, | ||
| dictionaries_by_field, | ||
| projection, | ||
| }) | ||
| } | ||
|
|
||
|
|
@@ -922,7 +968,7 @@ impl<R: Read> StreamReader<R> { | |
| let mut buf = vec![0; message.bodyLength() as usize]; | ||
| self.reader.read_exact(&mut buf)?; | ||
|
|
||
| read_record_batch(&buf, batch, self.schema(), &self.dictionaries_by_field).map(Some) | ||
| read_record_batch(&buf, batch, self.schema(), &self.dictionaries_by_field, self.projection.as_ref().map(|x| x.0.as_ref())).map(Some) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe it is worth adding something like impl<R: Read> StreamReader<R> {
...
/// get projected schema, if any
pub fn projected_schema(&self) -> Option<&Schema> {
...
} |
||
| } | ||
| ipc::MessageHeader::DictionaryBatch => { | ||
| let batch = message.header_as_dictionary_batch().ok_or_else(|| { | ||
|
|
@@ -998,7 +1044,7 @@ mod tests { | |
| )) | ||
| .unwrap(); | ||
|
|
||
| let mut reader = FileReader::try_new(file).unwrap(); | ||
| let mut reader = FileReader::try_new(file, None).unwrap(); | ||
|
|
||
| // read expected JSON output | ||
| let arrow_json = read_gzip_json(version, path); | ||
|
|
@@ -1015,7 +1061,7 @@ mod tests { | |
| testdata | ||
| )) | ||
| .unwrap(); | ||
| FileReader::try_new(file).unwrap(); | ||
| FileReader::try_new(file, None).unwrap(); | ||
| } | ||
|
|
||
| #[test] | ||
|
|
@@ -1031,7 +1077,7 @@ mod tests { | |
| testdata | ||
| )) | ||
| .unwrap(); | ||
| FileReader::try_new(file).unwrap(); | ||
| FileReader::try_new(file, None).unwrap(); | ||
| } | ||
|
|
||
| #[test] | ||
|
|
@@ -1056,7 +1102,39 @@ mod tests { | |
| )) | ||
| .unwrap(); | ||
|
|
||
| FileReader::try_new(file).unwrap(); | ||
| FileReader::try_new(file, None).unwrap(); | ||
| }); | ||
| } | ||
|
|
||
| #[test] | ||
| fn projection_should_work() { | ||
| // complementary to the previous test | ||
| let testdata = crate::util::test_util::arrow_test_data(); | ||
| let paths = vec![ | ||
| "generated_interval", | ||
| "generated_datetime", | ||
| // "generated_map", Err: Last offset 872415232 of Utf8 is larger than values length 52 (https://github.com/apache/arrow-rs/issues/859) | ||
| "generated_nested", | ||
| "generated_null_trivial", | ||
| "generated_null", | ||
| "generated_primitive_no_batches", | ||
| "generated_primitive_zerolength", | ||
| "generated_primitive", | ||
| ]; | ||
| paths.iter().for_each(|path| { | ||
| let file = File::open(format!( | ||
| "{}/arrow-ipc-stream/integration/1.0.0-bigendian/{}.arrow_file", | ||
| testdata, path | ||
| )) | ||
| .unwrap(); | ||
|
|
||
| let reader = FileReader::try_new(file, Some(vec![0])).unwrap(); | ||
| let datatype_0 = reader.schema().fields()[0].data_type().clone(); | ||
| reader.for_each(|batch| { | ||
| let batch = batch.unwrap(); | ||
| assert_eq!(batch.columns().len(), 1); | ||
| assert_eq!(datatype_0, batch.schema().fields()[0].data_type().clone()); | ||
| }); | ||
| }); | ||
| } | ||
|
|
||
|
|
@@ -1083,7 +1161,7 @@ mod tests { | |
| )) | ||
| .unwrap(); | ||
|
|
||
| let mut reader = StreamReader::try_new(file).unwrap(); | ||
| let mut reader = StreamReader::try_new(file, None).unwrap(); | ||
|
|
||
| // read expected JSON output | ||
| let arrow_json = read_gzip_json(version, path); | ||
|
|
@@ -1120,7 +1198,7 @@ mod tests { | |
| )) | ||
| .unwrap(); | ||
|
|
||
| let mut reader = FileReader::try_new(file).unwrap(); | ||
| let mut reader = FileReader::try_new(file, None).unwrap(); | ||
|
|
||
| // read expected JSON output | ||
| let arrow_json = read_gzip_json(version, path); | ||
|
|
@@ -1153,7 +1231,7 @@ mod tests { | |
| )) | ||
| .unwrap(); | ||
|
|
||
| let mut reader = StreamReader::try_new(file).unwrap(); | ||
| let mut reader = StreamReader::try_new(file, None).unwrap(); | ||
|
|
||
| // read expected JSON output | ||
| let arrow_json = read_gzip_json(version, path); | ||
|
|
@@ -1189,7 +1267,7 @@ mod tests { | |
|
|
||
| // read stream back | ||
| let file = File::open("target/debug/testdata/float.stream").unwrap(); | ||
| let reader = StreamReader::try_new(file).unwrap(); | ||
| let reader = StreamReader::try_new(file, None).unwrap(); | ||
|
|
||
| reader.for_each(|batch| { | ||
| let batch = batch.unwrap(); | ||
|
|
@@ -1211,7 +1289,19 @@ mod tests { | |
| .value(0) | ||
| != 0.0 | ||
| ); | ||
| }) | ||
| }); | ||
|
|
||
| let file = File::open("target/debug/testdata/float.stream").unwrap(); | ||
|
|
||
| // Read with projection | ||
| let reader = StreamReader::try_new(file, Some(vec![0, 3])).unwrap(); | ||
|
|
||
| reader.for_each(|batch| { | ||
| let batch = batch.unwrap(); | ||
| assert_eq!(batch.schema().fields().len(), 2); | ||
| assert_eq!(batch.schema().fields()[0].data_type(), &DataType::Float32); | ||
| assert_eq!(batch.schema().fields()[1].data_type(), &DataType::Int32); | ||
| }); | ||
| } | ||
|
|
||
| fn roundtrip_ipc(rb: &RecordBatch) -> RecordBatch { | ||
|
|
@@ -1223,7 +1313,7 @@ mod tests { | |
| drop(writer); | ||
|
|
||
| let mut reader = | ||
| ipc::reader::FileReader::try_new(std::io::Cursor::new(buf)).unwrap(); | ||
| ipc::reader::FileReader::try_new(std::io::Cursor::new(buf), None).unwrap(); | ||
| reader.next().unwrap().unwrap() | ||
| } | ||
|
|
||
|
|
||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.