Skip to content

feat: Pluggable gc with exemptions #17

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Nov 28, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add garbage collection to blobs
also add ability to add excepmtions for gc before gc is started.
  • Loading branch information
rklaehn committed Nov 25, 2024
commit 048538f4827a149bfa0044198c1d4c502aa3836b
119 changes: 66 additions & 53 deletions src/net_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
#![allow(missing_docs)]

use std::{
collections::BTreeMap,
collections::{BTreeMap, BTreeSet},
fmt::Debug,
ops::DerefMut,
sync::{Arc, OnceLock},
};

use anyhow::{anyhow, Result};
use anyhow::{anyhow, bail, Result};
use futures_lite::future::Boxed as BoxedFuture;
use futures_util::future::BoxFuture;
use iroh_base::hash::{BlobFormat, Hash};
use iroh_net::{endpoint::Connecting, Endpoint, NodeAddr};
use iroh_router::ProtocolHandler;
Expand All @@ -24,27 +26,32 @@ use crate::{
Stats,
},
provider::EventSender,
store::GcConfig,
util::{
local_pool::LocalPoolHandle,
local_pool::{self, LocalPoolHandle},
progress::{AsyncChannelProgressSender, ProgressSender},
SetTagOption,
},
HashAndFormat, TempTag,
};

// pub type ProtectCb = Box<dyn Fn(&mut BTreeSet<Hash>) -> BoxFuture<()> + Send + Sync>;
//
// #[derive(derive_more::Debug)]
// enum GcState {
// Initial(#[debug(skip)] Vec<ProtectCb>),
// Started(#[allow(dead_code)] Option<local_pool::Run<()>>),
// }
//
// impl Default for GcState {
// fn default() -> Self {
// Self::Initial(Vec::new())
// }
// }
/// A callback that blobs can ask about a set of hashes that should not be garbage collected.
pub type ProtectCb = Box<dyn Fn(&mut BTreeSet<Hash>) -> BoxFuture<()> + Send + Sync>;

/// The state of the gc loop.
#[derive(derive_more::Debug)]
enum GcState {
// Gc loop is not yet running. Other protcols can add protect callbacks
Initial(#[debug(skip)] Vec<ProtectCb>),
// Gc loop is running. No more protect callbacks can be added.
Started(#[allow(dead_code)] Option<local_pool::Run<()>>),
}

impl Default for GcState {
fn default() -> Self {
Self::Initial(Vec::new())
}
}

#[derive(Debug)]
pub struct Blobs<S> {
Expand All @@ -54,6 +61,7 @@ pub struct Blobs<S> {
downloader: Downloader,
batches: tokio::sync::Mutex<BlobBatches>,
endpoint: Endpoint,
gc_state: Arc<std::sync::Mutex<GcState>>,
#[cfg(feature = "rpc")]
pub(crate) rpc_handler: Arc<OnceLock<crate::rpc::RpcHandler>>,
}
Expand Down Expand Up @@ -185,6 +193,7 @@ impl<S: crate::store::Store> Blobs<S> {
downloader,
endpoint,
batches: Default::default(),
gc_state: Default::default(),
#[cfg(feature = "rpc")]
rpc_handler: Arc::new(OnceLock::new()),
}
Expand All @@ -206,43 +215,47 @@ impl<S: crate::store::Store> Blobs<S> {
&self.endpoint
}

// pub fn add_protected(&self, cb: ProtectCb) -> Result<()> {
// let mut state = self.gc_state.lock().unwrap();
// match &mut *state {
// GcState::Initial(cbs) => {
// cbs.push(cb);
// }
// GcState::Started(_) => {
// anyhow::bail!("cannot add protected blobs after gc has started");
// }
// }
// Ok(())
// }
//
// pub fn start_gc(&self, config: GcConfig) -> Result<()> {
// let mut state = self.gc_state.lock().unwrap();
// let protected = match state.deref_mut() {
// GcState::Initial(items) => std::mem::take(items),
// GcState::Started(_) => anyhow::bail!("gc already started"),
// };
// let protected = Arc::new(protected);
// let protected_cb = move || {
// let protected = protected.clone();
// async move {
// let mut set = BTreeSet::new();
// for cb in protected.iter() {
// cb(&mut set).await;
// }
// set
// }
// };
// let store = self.store.clone();
// let run = self
// .rt
// .spawn(move || async move { store.gc_run(config, protected_cb).await });
// *state = GcState::Started(Some(run));
// Ok(())
// }
/// Add a callback that will be called before the garbage collector runs.
///
/// This can only be called before the garbage collector has started, otherwise it will return an error.
pub fn add_protected(&self, cb: ProtectCb) -> Result<()> {
let mut state = self.gc_state.lock().unwrap();
match &mut *state {
GcState::Initial(cbs) => {
cbs.push(cb);
}
GcState::Started(_) => {
anyhow::bail!("cannot add protected blobs after gc has started");
}
}
Ok(())
}

/// Start garbage collection with the given settings.
pub fn start_gc(&self, config: GcConfig) -> Result<()> {
let mut state = self.gc_state.lock().unwrap();
let protected = match state.deref_mut() {
GcState::Initial(items) => std::mem::take(items),
GcState::Started(_) => bail!("gc already started"),
};
let protected = Arc::new(protected);
let protected_cb = move || {
let protected = protected.clone();
async move {
let mut set = BTreeSet::new();
for cb in protected.iter() {
cb(&mut set).await;
}
set
}
};
let store = self.store.clone();
let run = self
.rt
.spawn(move || async move { store.gc_run(config, protected_cb).await });
*state = GcState::Started(Some(run));
Ok(())
}

pub(crate) async fn batches(&self) -> tokio::sync::MutexGuard<'_, BlobBatches> {
self.batches.lock().await
Expand Down
Loading