I would like to modify this approach to be fully streaming, so that arrays are written as they become ready, but I've run into a bit of a snag with something like this:

```rust
let source_stream = futures::stream::select_all(streams);
let limited_stream = ByteLimitedStream::new(source_stream, dtype, byte_threshold);
while stream_not_exhausted {
    limited_stream.reset_bytes();
    // `limited_stream` is passed by value, so it is moved on the first iteration
    let output_buffer = VortexWriteOptions::default()
        .write(Vec::new(), limited_stream)
        .await?;
}
```

However, see lines 103 to 107 of `vortex/vortex-file/src/writer.rs` (at commit `4f6ea56`). I realize this might just be me hitting the limits of my Rust knowledge, but I feel like I should be able to do something like this, and I'm not sure if I need to somehow modify
I had a think about whether we should add a push-based […]. I think you may just need to implement the stream logic you want, which is roughly: take an `ArrayStream` and turn it into a `Stream` that starts a new stream whenever some threshold has been reached. I'm about to push a change that makes the writer return a `Stream`, which may make your accounting a little easier: you could terminate the stream and start a new one when the file reaches a certain size. Am I understanding the problem correctly?
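A minimal sketch of the splitting logic suggested above, assuming a chunk's byte size is simply its length. It uses std's `Iterator` as a synchronous stand-in for `futures::Stream` so it runs without extra crates, and `SplitByBytes` / `split_by_bytes` are hypothetical names, not Vortex APIs. For simplicity each group is collected into a `Vec`, so unlike a true stream of streams it buffers one group at a time.

```rust
/// Hypothetical adapter (not a Vortex API): groups consecutive chunks,
/// closing a group once it holds at least `byte_threshold` bytes.
/// `Iterator` stands in for `futures::Stream` to keep the sketch std-only.
struct SplitByBytes<I> {
    inner: I,
    byte_threshold: usize,
}

fn split_by_bytes<I: Iterator<Item = Vec<u8>>>(inner: I, byte_threshold: usize) -> SplitByBytes<I> {
    SplitByBytes { inner, byte_threshold }
}

impl<I: Iterator<Item = Vec<u8>>> Iterator for SplitByBytes<I> {
    // Each group is materialised as a Vec; a real stream of streams
    // would hand the chunks out lazily instead.
    type Item = Vec<Vec<u8>>;

    fn next(&mut self) -> Option<Self::Item> {
        // Every group takes at least one chunk, so a zero threshold cannot loop forever.
        let first = self.inner.next()?;
        let mut bytes = first.len();
        let mut group = vec![first];
        // Keep pulling until the threshold is reached or the source runs dry.
        while bytes < self.byte_threshold {
            match self.inner.next() {
                Some(chunk) => {
                    bytes += chunk.len();
                    group.push(chunk);
                }
                None => break,
            }
        }
        Some(group)
    }
}

fn main() {
    let chunks = vec![vec![1u8; 4], vec![2; 4], vec![3; 4], vec![4; 4], vec![5; 2]];
    for (i, group) in split_by_bytes(chunks.into_iter(), 6).enumerate() {
        let bytes: usize = group.iter().map(Vec::len).sum();
        // In the real pipeline, each group would become one output file.
        println!("file {i}: {} chunks, {bytes} bytes", group.len());
    }
}
```

A group may overshoot the threshold by up to one chunk, since the check happens after each chunk is added; checking before pulling would require peeking at the next chunk's size.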
My approach to get this working with the current code has been to wrap `source_stream` in an `Arc<Mutex>` so that I can clone it and therefore create a new `ByteLimitedStream` for each `write` I need to do. It doesn't feel very pretty, because this code is not parallel, but it seems like my only alternative was using an `UnsafeCell` and implementing `Send`.
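As a possible alternative to the `Arc<Mutex>` workaround, here is a minimal sketch, assuming the limiter can borrow the source mutably instead of owning it. It uses std's `Iterator` as a synchronous stand-in for `futures::Stream`; `ByteLimited`, `write_file`, and `write_in_chunks` are hypothetical names, and `write_file` only mimics how the snippet in the question passes its stream to `write` by value. Because each limiter holds `&mut source`, the borrow ends when `write_file` returns, and the next iteration can build a fresh limiter over the same source.

```rust
/// Hypothetical stand-in for `ByteLimitedStream` (not a Vortex API).
/// It borrows the source mutably instead of owning it, so one source can
/// feed several consumers in turn without `Arc<Mutex<..>>`.
struct ByteLimited<'a, I> {
    inner: &'a mut I,
    byte_threshold: usize,
    bytes_seen: usize,
}

impl<I: Iterator<Item = Vec<u8>>> Iterator for ByteLimited<'_, I> {
    type Item = Vec<u8>;

    fn next(&mut self) -> Option<Vec<u8>> {
        // Stop once the threshold is reached; remaining chunks stay in the
        // borrowed source. The `> 0` check guarantees progress if the threshold is 0.
        if self.bytes_seen > 0 && self.bytes_seen >= self.byte_threshold {
            return None;
        }
        let chunk = self.inner.next()?;
        self.bytes_seen += chunk.len();
        Some(chunk)
    }
}

/// Hypothetical stand-in for the write call: takes the stream by value
/// and drains it into an in-memory buffer.
fn write_file(chunks: impl Iterator<Item = Vec<u8>>) -> Vec<u8> {
    let mut buf = Vec::new();
    for chunk in chunks {
        buf.extend_from_slice(&chunk);
    }
    buf
}

fn write_in_chunks(source: impl Iterator<Item = Vec<u8>>, byte_threshold: usize) -> Vec<Vec<u8>> {
    // `Peekable` plays the role of `stream_not_exhausted` in the question's loop.
    let mut source = source.peekable();
    let mut files = Vec::new();
    while source.peek().is_some() {
        // A fresh limiter each round; it only borrows `source`, so nothing is moved.
        let limited = ByteLimited { inner: &mut source, byte_threshold, bytes_seen: 0 };
        files.push(write_file(limited));
    }
    files
}

fn main() {
    let chunks = vec![vec![1u8; 4], vec![2; 4], vec![3; 4], vec![4; 4], vec![5; 2]];
    for (i, file) in write_in_chunks(chunks.into_iter(), 6).iter().enumerate() {
        println!("file {i}: {} bytes", file.len());
    }
}
```

In `futures`, `&mut S` also implements `Stream` when `S: Stream + Unpin` (and `StreamExt::by_ref` hands out that borrow), so the same shape should carry over to the async code. I haven't checked whether `write` places a `'static` bound on its stream argument, which would rule out a borrowed limiter.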