Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions sdk/core/src/seekable_stream.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,35 @@
use bytes::Bytes;
use futures::io::AsyncRead;
use futures::stream::Stream;
use futures::task::Poll;
use dyn_clone::DynClone;
use futures::{io::AsyncRead, stream::Stream, task::Poll};
use std::{pin::Pin, task::Context};

/// Amount of the stream to buffer in memory during streaming uploads
pub(crate) const DEFAULT_BUFFER_SIZE: usize = 1024 * 64;

/// Enable a type implementing `AsyncRead` to be consumed as if it were
/// a `Stream` of `Bytes`.
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
pub trait SeekableStream:
AsyncRead + Unpin + std::fmt::Debug + Send + Sync + dyn_clone::DynClone
{
pub trait SeekableStream: AsyncRead + Unpin + std::fmt::Debug + Send + Sync + DynClone {
async fn reset(&mut self) -> crate::error::Result<()>;
fn len(&self) -> usize;

fn is_empty(&self) -> bool {
self.len() == 0
}

fn buffer_size(&self) -> usize {
DEFAULT_BUFFER_SIZE
}
}

dyn_clone::clone_trait_object!(SeekableStream);

impl Stream for dyn SeekableStream {
type Item = crate::error::Result<Bytes>;

fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let mut buffer = vec![0_u8; 1024 * 64];
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut buffer = vec![0_u8; self.buffer_size()];

match self.poll_read(cx, &mut buffer) {
Poll::Ready(Ok(0)) => Poll::Ready(None),
Expand Down