Skip to content

Async KV Store #3778

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 76 additions & 69 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use lightning::sign::ChangeDestinationSource;
#[cfg(feature = "std")]
use lightning::sign::ChangeDestinationSourceSync;
use lightning::sign::OutputSpender;
use lightning::util::async_poll::FutureSpawner;
use lightning::util::logger::Logger;
use lightning::util::persist::{KVStore, Persister};
use lightning::util::sweep::OutputSweeper;
Expand Down Expand Up @@ -374,7 +375,7 @@ macro_rules! define_run_body {

if $channel_manager.get_cm().get_and_clear_needs_persistence() {
log_trace!($logger, "Persisting ChannelManager...");
$persister.persist_manager(&$channel_manager)?;
$persister.persist_manager(&$channel_manager).await?;
log_trace!($logger, "Done persisting ChannelManager.");
}
if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
Expand Down Expand Up @@ -435,7 +436,7 @@ macro_rules! define_run_body {
log_trace!($logger, "Persisting network graph.");
}

if let Err(e) = $persister.persist_graph(network_graph) {
if let Err(e) = $persister.persist_graph(network_graph).await {
log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
}

Expand Down Expand Up @@ -463,7 +464,7 @@ macro_rules! define_run_body {
} else {
log_trace!($logger, "Persisting scorer");
}
if let Err(e) = $persister.persist_scorer(&scorer) {
if let Err(e) = $persister.persist_scorer(&scorer).await {
log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
}
}
Expand All @@ -486,16 +487,16 @@ macro_rules! define_run_body {
// After we exit, ensure we persist the ChannelManager one final time - this avoids
// some races where users quit while channel updates were in-flight, with
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
$persister.persist_manager(&$channel_manager)?;
$persister.persist_manager(&$channel_manager).await?;

// Persist Scorer on exit
if let Some(ref scorer) = $scorer {
$persister.persist_scorer(&scorer)?;
$persister.persist_scorer(&scorer).await?;
}

// Persist NetworkGraph on exit
if let Some(network_graph) = $gossip_sync.network_graph() {
$persister.persist_graph(network_graph)?;
$persister.persist_graph(network_graph).await?;
}

Ok(())
Expand Down Expand Up @@ -780,8 +781,9 @@ pub async fn process_events_async<
EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
EventHandler: Fn(Event) -> EventHandlerFuture,
PS: 'static + Deref + Send,
FS: FutureSpawner,
M: 'static
+ Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>>
+ Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, FS>>
+ Send
+ Sync,
CM: 'static + Deref,
Expand Down Expand Up @@ -838,7 +840,7 @@ where
if let Some(duration_since_epoch) = fetch_time() {
if update_scorer(scorer, &event, duration_since_epoch) {
log_trace!(logger, "Persisting scorer after update");
if let Err(e) = persister.persist_scorer(&*scorer) {
if let Err(e) = persister.persist_scorer(&*scorer).await {
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
// We opt not to abort early on persistence failure here as persisting
// the scorer is non-critical and we still hope that it will have
Expand Down Expand Up @@ -977,7 +979,9 @@ impl BackgroundProcessor {
EH: 'static + EventHandler + Send,
PS: 'static + Deref + Send,
M: 'static
+ Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>>
+ Deref<
Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, FS>,
>
+ Send
+ Sync,
CM: 'static + Deref + Send,
Expand All @@ -992,6 +996,7 @@ impl BackgroundProcessor {
O: 'static + Deref,
K: 'static + Deref,
OS: 'static + Deref<Target = OutputSweeperSync<T, D, F, CF, K, L, O>> + Send,
FS: FutureSpawner,
>(
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
Expand Down Expand Up @@ -1028,70 +1033,72 @@ impl BackgroundProcessor {
.expect("Time should be sometime after 1970");
if update_scorer(scorer, &event, duration_since_epoch) {
log_trace!(logger, "Persisting scorer after update");
if let Err(e) = persister.persist_scorer(&scorer) {
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
}
// if let Err(e) = persister.persist_scorer(&scorer).await {
// log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
// }
}
}
event_handler.handle_event(event)
};
define_run_body!(
persister,
chain_monitor,
chain_monitor.process_pending_events(&event_handler),
channel_manager,
channel_manager.get_cm().process_pending_events(&event_handler),
onion_messenger,
if let Some(om) = &onion_messenger {
om.get_om().process_pending_events(&event_handler)
},
peer_manager,
gossip_sync,
{
if let Some(ref sweeper) = sweeper {
let _ = sweeper.regenerate_and_broadcast_spend_if_necessary();
}
},
logger,
scorer,
stop_thread.load(Ordering::Acquire),
{
let sleeper = match (onion_messenger.as_ref(), liquidity_manager.as_ref()) {
(Some(om), Some(lm)) => Sleeper::from_four_futures(
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
&chain_monitor.get_update_future(),
&om.get_om().get_update_future(),
&lm.get_lm().get_pending_msgs_future(),
),
(Some(om), None) => Sleeper::from_three_futures(
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
&chain_monitor.get_update_future(),
&om.get_om().get_update_future(),
),
(None, Some(lm)) => Sleeper::from_three_futures(
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
&chain_monitor.get_update_future(),
&lm.get_lm().get_pending_msgs_future(),
),
(None, None) => Sleeper::from_two_futures(
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
&chain_monitor.get_update_future(),
),
};
sleeper.wait_timeout(Duration::from_millis(100));
},
|_| Instant::now(),
|time: &Instant, dur| time.elapsed().as_secs() > dur,
false,
|| {
use std::time::SystemTime;
Some(
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Time should be sometime after 1970"),
)
},
)
// define_run_body!(
// persister,
// chain_monitor,
// chain_monitor.process_pending_events(&event_handler),
// channel_manager,
// channel_manager.get_cm().process_pending_events(&event_handler),
// onion_messenger,
// if let Some(om) = &onion_messenger {
// om.get_om().process_pending_events(&event_handler)
// },
// peer_manager,
// gossip_sync,
// {
// if let Some(ref sweeper) = sweeper {
// let _ = sweeper.regenerate_and_broadcast_spend_if_necessary();
// }
// },
// logger,
// scorer,
// stop_thread.load(Ordering::Acquire),
// {
// let sleeper = match (onion_messenger.as_ref(), liquidity_manager.as_ref()) {
// (Some(om), Some(lm)) => Sleeper::from_four_futures(
// &channel_manager.get_cm().get_event_or_persistence_needed_future(),
// &chain_monitor.get_update_future(),
// &om.get_om().get_update_future(),
// &lm.get_lm().get_pending_msgs_future(),
// ),
// (Some(om), None) => Sleeper::from_three_futures(
// &channel_manager.get_cm().get_event_or_persistence_needed_future(),
// &chain_monitor.get_update_future(),
// &om.get_om().get_update_future(),
// ),
// (None, Some(lm)) => Sleeper::from_three_futures(
// &channel_manager.get_cm().get_event_or_persistence_needed_future(),
// &chain_monitor.get_update_future(),
// &lm.get_lm().get_pending_msgs_future(),
// ),
// (None, None) => Sleeper::from_two_futures(
// &channel_manager.get_cm().get_event_or_persistence_needed_future(),
// &chain_monitor.get_update_future(),
// ),
// };
// sleeper.wait_timeout(Duration::from_millis(100));
// },
// |_| Instant::now(),
// |time: &Instant, dur| time.elapsed().as_secs() > dur,
// false,
// || {
// use std::time::SystemTime;
// Some(
// SystemTime::now()
// .duration_since(SystemTime::UNIX_EPOCH)
// .expect("Time should be sometime after 1970"),
// )
// },
// )

Ok(())
});
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
}
Expand Down
22 changes: 20 additions & 2 deletions lightning-persister/src/fs_store.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Objects related to [`FilesystemStore`] live here.
use crate::utils::{check_namespace_key_validity, is_valid_kvstore_str};

use lightning::util::async_poll::{AsyncResult, AsyncResultType};
use lightning::util::persist::{KVStore, MigratableKVStore};
use lightning::util::string::PrintableString;

Expand Down Expand Up @@ -92,7 +93,7 @@ impl FilesystemStore {
}
}

impl KVStore for FilesystemStore {
impl FilesystemStore {
fn read(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
) -> lightning::io::Result<Vec<u8>> {
Expand Down Expand Up @@ -120,7 +121,7 @@ impl KVStore for FilesystemStore {

fn write(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
) -> lightning::io::Result<()> {
) -> Result<(), lightning::io::Error> {
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?;

let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
Expand Down Expand Up @@ -204,6 +205,23 @@ impl KVStore for FilesystemStore {

res
}
}

impl KVStore for FilesystemStore {
fn read(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
) -> AsyncResultType<'static, Vec<u8>, lightning::io::Error> {
let res = self.read(primary_namespace, secondary_namespace, key);
Box::pin(async move { res })
}

fn write(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
) -> AsyncResultType<'static, (), lightning::io::Error> {
let res = self.write(primary_namespace, secondary_namespace, key, buf);

Box::pin(async move { res })
}

fn remove(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
Expand Down
Loading
Loading