Skip to content

Commit d46810d

Browse files
Frandomatheus23
authored andcommitted
feat: verified payload streaming
1 parent 2337e46 commit d46810d

File tree

1 file changed

+96
-2
lines changed

1 file changed

+96
-2
lines changed

src/store/traits.rs

Lines changed: 96 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,18 @@ use std::{collections::BTreeSet, future::Future, io, path::PathBuf, time::Durati
33

44
pub use bao_tree;
55
use bao_tree::{
6-
io::fsm::{BaoContentItem, Outboard},
6+
io::{
7+
fsm::{
8+
encode_ranges_validated, BaoContentItem, Outboard, ResponseDecoder, ResponseDecoderNext,
9+
},
10+
DecodeError,
11+
},
712
BaoTree, ChunkRanges,
813
};
914
use bytes::Bytes;
1015
use futures_lite::{Stream, StreamExt};
1116
use genawaiter::rc::{Co, Gen};
12-
use iroh_io::AsyncSliceReader;
17+
use iroh_io::{AsyncSliceReader, AsyncStreamReader, AsyncStreamWriter};
1318
pub use range_collections;
1419
use serde::{Deserialize, Serialize};
1520
use tokio::io::AsyncRead;
@@ -90,6 +95,26 @@ pub trait MapEntry: std::fmt::Debug + Clone + Send + Sync + 'static {
9095
fn outboard(&self) -> impl Future<Output = io::Result<impl Outboard>> + Send;
9196
/// A future that resolves to a reader that can be used to read the data
9297
fn data_reader(&self) -> impl Future<Output = io::Result<impl AsyncSliceReader>> + Send;
98+
99+
/// Encodes data and outboard into a stream which can be imported with [`Store::import_verifiable_stream`].
100+
///
101+
/// Returns immediately without error if `start` is equal or larger than the entry's size.
102+
fn write_verifiable_stream<'a>(
103+
&'a self,
104+
start: u64,
105+
writer: impl AsyncStreamWriter + 'a,
106+
) -> impl Future<Output = io::Result<()>> + 'a {
107+
async move {
108+
let size = self.size().value();
109+
if start >= size {
110+
return Ok(());
111+
}
112+
let ranges = range_from_offset_and_length(start, size - start);
113+
let (outboard, data) = tokio::try_join!(self.outboard(), self.data_reader())?;
114+
encode_ranges_validated(data, outboard, &ranges, writer).await?;
115+
Ok(())
116+
}
117+
}
93118
}
94119

95120
/// A generic map from hashes to bao blobs (blobs with bao outboards).
@@ -341,6 +366,70 @@ pub trait Store: ReadableStore + MapMut + std::fmt::Debug {
341366
self.import_stream(stream, format, progress)
342367
}
343368

369+
/// Import a blob from a verified stream, as emitted by [`MapEntry::write_verifiable_stream`];
370+
fn import_verifiable_stream<'a>(
371+
&'a self,
372+
hash: Hash,
373+
total_size: u64,
374+
stream_offset: u64,
375+
reader: impl AsyncStreamReader + 'a,
376+
) -> impl Future<Output = io::Result<()>> + 'a {
377+
async move {
378+
if stream_offset >= total_size {
379+
return Err(io::Error::new(
380+
io::ErrorKind::InvalidInput,
381+
"offset must not be greater than total_size",
382+
));
383+
}
384+
let entry = self.get_or_create(hash, total_size).await?;
385+
let mut bw = entry.batch_writer().await?;
386+
387+
let ranges = range_from_offset_and_length(stream_offset, total_size - stream_offset);
388+
let mut decoder = ResponseDecoder::new(
389+
hash.into(),
390+
ranges,
391+
BaoTree::new(total_size, IROH_BLOCK_SIZE),
392+
reader,
393+
);
394+
let size = decoder.tree().size();
395+
let mut buf = Vec::new();
396+
let is_complete = loop {
397+
decoder = match decoder.next().await {
398+
ResponseDecoderNext::More((decoder, item)) => {
399+
let item = match item {
400+
Err(DecodeError::LeafNotFound(_) | DecodeError::ParentNotFound(_)) => {
401+
break false
402+
}
403+
Err(err) => return Err(err.into()),
404+
Ok(item) => item,
405+
};
406+
match &item {
407+
BaoContentItem::Parent(_) => {
408+
buf.push(item);
409+
}
410+
BaoContentItem::Leaf(_) => {
411+
buf.push(item);
412+
let batch = std::mem::take(&mut buf);
413+
bw.write_batch(size, batch).await?;
414+
}
415+
}
416+
decoder
417+
}
418+
ResponseDecoderNext::Done(_reader) => {
419+
debug_assert!(buf.is_empty(), "last node of bao tree must be leaf node");
420+
break true;
421+
}
422+
};
423+
};
424+
bw.sync().await?;
425+
drop(bw);
426+
if is_complete {
427+
self.insert_complete(entry).await?;
428+
}
429+
Ok(())
430+
}
431+
}
432+
344433
/// Set a tag
345434
fn set_tag(
346435
&self,
@@ -386,6 +475,11 @@ pub trait Store: ReadableStore + MapMut + std::fmt::Debug {
386475
}
387476
}
388477

478+
fn range_from_offset_and_length(offset: u64, length: u64) -> bao_tree::ChunkRanges {
479+
let ranges = bao_tree::ByteRanges::from(offset..(offset + length));
480+
bao_tree::io::round_up_to_chunks(&ranges)
481+
}
482+
389483
async fn validate_impl(
390484
store: &impl Store,
391485
repair: bool,

0 commit comments

Comments
 (0)