-
Notifications
You must be signed in to change notification settings - Fork 796
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add IPC StreamDecoder #5531
Add IPC StreamDecoder #5531
Conversation
4428c51
to
f391d74
Compare
f391d74
to
3f83468
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @tustvold -- I went through this PR carefully and it looks good to me. I had some stylistic suggestions but nothing that would prevent this PR from merging, in my opinion.
I do think a truncated stream error is important
Reading through the code I noticed it has non trivial overlap with the FileReader
-- I wonder if it would make sense to use the StreamReader
within the FileReader
and reduce the implementation redundancy 🤔
assert!(arrow_json.equals_reader(&mut reader).unwrap()); | ||
// the next batch must be empty | ||
assert!(reader.next().is_none()); | ||
// the stream must indicate that it's finished | ||
assert!(reader.is_finished()); | ||
} | ||
|
||
// Test stream decoder | ||
let expected = arrow_json.get_record_batches().unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
✅ this is a very nice test
|
||
/// Return the [`Message`] | ||
#[inline] | ||
pub fn as_ref(&self) -> Message<'_> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be more standard to call this function as_message
and then provide an impl AsRef<Message> for MessageBuffer
and that way the compiler can automatically do the deref.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately this isn't possible because of the lifetime on Message
/// Return the [`Message`] | ||
#[inline] | ||
pub fn as_ref(&self) -> Message<'_> { | ||
// SAFETY: Run verifier on construction |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was worried that this is a not a safe API as since this is a public struct and it could be created directly
let buf = MessageBuffer(my_unsafe_buffer);
// cast to message without checks being called:
let my unsafe_message = buf.as_ref();
However, I convinced myself it was actually safe as long as MessageBuffer
is used from some other module:
/// Ok(()) | ||
/// } | ||
/// ``` | ||
pub fn decode(&mut self, buffer: &mut Buffer) -> Result<Option<RecordBatch>, ArrowError> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I double checked decode
is consistent with the names used by the csv reader
} | ||
} | ||
decoder.finish().unwrap(); | ||
assert_eq!(expected, actual); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we also add tests for error conditions?
At the very least I think we should have a test for truncated stream which I think would be the most likely error in practice.
It would be nice to have a test for things like missing dictionaries, but I think that the value of those tests is lower
For the most part the shared logic is already broken out into separate functions already, it is just they require quite a few arguments and are fairly verbose... I agree this could probably be improved |
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Which issue does this PR close?
Closes #1207
Rationale for this change
Following on from #5249 this adds a StreamDecoder.
This is the final piece to close off the async support for IPC, in particular we now provide:
What changes are included in this PR?
Are there any user-facing changes?