Skip to content

Commit

Permalink
correctly report error on compaction failure
Browse files Browse the repository at this point in the history
  • Loading branch information
MarinPostma committed Oct 2, 2024
1 parent 54ff421 commit eb59a04
Showing 1 changed file with 46 additions and 27 deletions.
73 changes: 46 additions & 27 deletions libsql-wal/src/storage/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ impl<B> Compactor<B> {
out_index: Option<&'a mut MapBuilder<Vec<u8>>>,
mut progress: impl FnMut(u32, u32) + 'a,
) -> (
impl Stream<Item = Result<(CompactedFrameHeader, Bytes)>> + 'a,
impl Stream<Item = crate::storage::Result<(CompactedFrameHeader, Bytes)>> + 'a,
CompactedSegmentHeader,
)
where
Expand Down Expand Up @@ -363,7 +363,7 @@ impl<B> Compactor<B> {
let mut builder = MapBuilder::new(Vec::new()).unwrap();

let (sender, mut receiver) = tokio::sync::mpsc::channel::<crate::storage::Result<Bytes>>(1);
let handle: JoinHandle<Result<()>> = match out_path {
let mut handle: JoinHandle<Result<()>> = match out_path {
Some(path) => {
let path = path.join(&format!("{new_key}.seg"));
let mut data_file = tokio::fs::File::create(path).await?;
Expand Down Expand Up @@ -397,37 +397,56 @@ impl<B> Compactor<B> {
}
};

let (stream, segment_header) = self
.dedup_stream(set.clone(), Some(&mut builder), progress)
.await;
let send_fut = async {
let (stream, segment_header) = self
.dedup_stream(set.clone(), Some(&mut builder), progress)
.await;

sender
.send(Ok(Bytes::copy_from_slice(segment_header.as_bytes())))
.await
.unwrap();
if sender
.send(Ok(Bytes::copy_from_slice(segment_header.as_bytes())))
.await
.is_err()
{
return;
}

{
tokio::pin!(stream);
loop {
match stream.next().await {
Some(Ok((frame_header, frame_data))) => {
sender
.send(Ok(Bytes::copy_from_slice(frame_header.as_bytes())))
.await
.unwrap();
sender.send(Ok(frame_data)).await.unwrap();
}
Some(Err(_e)) => {
panic!()
// sender.send(Err(e.into())).await.unwrap();
{
tokio::pin!(stream);
loop {
match stream.next().await {
Some(Ok((frame_header, frame_data))) => {
if sender
.send(Ok(Bytes::copy_from_slice(frame_header.as_bytes())))
.await
.is_err()
{
return;
}
if sender.send(Ok(frame_data)).await.is_err() {
return;
}
}
Some(Err(e)) => {
sender.send(Err(e.into())).await.unwrap();
return;
}
None => break,
}
None => break,
}

drop(sender);
}
drop(sender);
}
};

tokio::select! {
res = &mut handle => {
res.unwrap()?;
},
_ = send_fut => {
handle.await.unwrap()?;

handle.await.unwrap()?;
}
}

let index = builder.into_inner().unwrap();
match out_path {
Expand Down

0 comments on commit eb59a04

Please sign in to comment.