Skip to content

Async KV Store #3778

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 35 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
c09d41c
Separate sweeper persistent state
joostjager Jun 9, 2025
84ce2f2
Persist sweeper state as part of background process
joostjager Jun 9, 2025
f7d86d8
Async kv store in sweeper
joostjager Jun 11, 2025
a016c2a
Rustfmt chainmonitor.rs
joostjager Jun 12, 2025
543aaed
Formatting improvements chainmonitor.rs
joostjager Jun 12, 2025
bcf4932
Use new in chainmonitor tests
joostjager Jun 13, 2025
d04fce5
Async KVStore
joostjager Jun 13, 2025
8cd1e99
persistsync in bg processor syn
joostjager Jun 13, 2025
3a0923b
fix archive_fully_resolved_channel_monitors
joostjager Jun 13, 2025
969574f
all
joostjager Jun 13, 2025
88787dc
wip
joostjager Jun 13, 2025
789faaf
fixes wip
joostjager Jun 13, 2025
72226d5
more wrapping
joostjager Jun 13, 2025
e47e7ad
more wrapping
joostjager Jun 13, 2025
abaaa85
more pass through
joostjager Jun 13, 2025
492526a
update sigs
joostjager Jun 13, 2025
5deb3ba
fix ret
joostjager Jun 16, 2025
a236f25
tests commented out
joostjager Jun 16, 2025
82cf608
todos removed
joostjager Jun 16, 2025
72047fc
try to get tests working, borrow problems
joostjager Jun 23, 2025
c72c883
one shot channels for TestPersisteR
joostjager Jun 23, 2025
0685410
test fixed
joostjager Jun 23, 2025
80f0189
re-rename channel_monitor_updated
joostjager Jun 23, 2025
4490f37
wip
joostjager Jun 23, 2025
8f66fb4
wip 2
joostjager Jun 23, 2025
5e844b9
test fixed
joostjager Jun 23, 2025
113970b
fix test
joostjager Jun 24, 2025
8f16fe0
wip filesystemstore
joostjager Jun 24, 2025
35f79b4
more wrappers
joostjager Jun 24, 2025
978e720
fixes
joostjager Jun 24, 2025
01fb154
wip
joostjager Jun 24, 2025
a760c3f
fix order
joostjager Jun 24, 2025
cfcad99
fix fsstore test
joostjager Jun 24, 2025
adda973
lightning-persister tests fixed
joostjager Jun 24, 2025
e953e8a
try fix bg proc
joostjager Jun 25, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 49 additions & 20 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use lightning::chain;
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
#[cfg(feature = "std")]
use lightning::chain::chainmonitor::{ChainMonitorSync, PersistSync};
#[cfg(feature = "std")]
use lightning::events::EventHandler;
#[cfg(feature = "std")]
use lightning::events::EventsProvider;
Expand All @@ -39,8 +41,9 @@ use lightning::sign::ChangeDestinationSource;
use lightning::sign::ChangeDestinationSourceSync;
use lightning::sign::EntropySource;
use lightning::sign::OutputSpender;
use lightning::util::async_poll::FutureSpawner;
use lightning::util::logger::Logger;
use lightning::util::persist::{KVStore, Persister};
use lightning::util::persist::{KVStore, KVStoreSync, Persister, PersisterSync};
use lightning::util::sweep::OutputSweeper;
#[cfg(feature = "std")]
use lightning::util::sweep::OutputSweeperSync;
Expand Down Expand Up @@ -239,7 +242,8 @@ impl<
G,
&'a (dyn UtxoLookup + Send + Sync),
L,
> where
>
where
L::Target: Logger,
{
/// Initializes a new [`GossipSync::Rapid`] variant.
Expand All @@ -256,7 +260,8 @@ impl<'a, L: Deref>
&'a NetworkGraph<L>,
&'a (dyn UtxoLookup + Send + Sync),
L,
> where
>
where
L::Target: Logger,
{
/// Initializes a new [`GossipSync::None`] variant.
Expand Down Expand Up @@ -311,6 +316,15 @@ fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + Wri
true
}

macro_rules! maybe_await {
(true, $e:expr) => {
$e.await
};
(false, $e:expr) => {
$e
};
}

macro_rules! define_run_body {
(
$persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
Expand All @@ -319,7 +333,7 @@ macro_rules! define_run_body {
$peer_manager: ident, $gossip_sync: ident,
$process_sweeper: expr,
$logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr, $async: tt,
) => { {
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
$channel_manager.get_cm().timer_tick_occurred();
Expand Down Expand Up @@ -375,7 +389,7 @@ macro_rules! define_run_body {

if $channel_manager.get_cm().get_and_clear_needs_persistence() {
log_trace!($logger, "Persisting ChannelManager...");
$persister.persist_manager(&$channel_manager)?;
maybe_await!($async, $persister.persist_manager(&$channel_manager))?;
log_trace!($logger, "Done persisting ChannelManager.");
}
if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
Expand Down Expand Up @@ -436,7 +450,7 @@ macro_rules! define_run_body {
log_trace!($logger, "Persisting network graph.");
}

if let Err(e) = $persister.persist_graph(network_graph) {
if let Err(e) = maybe_await!($async, $persister.persist_graph(network_graph)) {
log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
}

Expand Down Expand Up @@ -464,7 +478,7 @@ macro_rules! define_run_body {
} else {
log_trace!($logger, "Persisting scorer");
}
if let Err(e) = $persister.persist_scorer(&scorer) {
if let Err(e) = maybe_await!($async, $persister.persist_scorer(&scorer)) {
log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
}
}
Expand All @@ -487,16 +501,16 @@ macro_rules! define_run_body {
// After we exit, ensure we persist the ChannelManager one final time - this avoids
// some races where users quit while channel updates were in-flight, with
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
$persister.persist_manager(&$channel_manager)?;
maybe_await!($async, $persister.persist_manager(&$channel_manager))?;

// Persist Scorer on exit
if let Some(ref scorer) = $scorer {
$persister.persist_scorer(&scorer)?;
maybe_await!($async, $persister.persist_scorer(&scorer))?;
}

// Persist NetworkGraph on exit
if let Some(network_graph) = $gossip_sync.network_graph() {
$persister.persist_graph(network_graph)?;
maybe_await!($async, $persister.persist_graph(network_graph))?;
}

Ok(())
Expand Down Expand Up @@ -782,8 +796,11 @@ pub async fn process_events_async<
EventHandler: Fn(Event) -> EventHandlerFuture,
PS: 'static + Deref + Send,
ES: 'static + Deref + Send,
FS: FutureSpawner,
M: 'static
+ Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>
+ Deref<
Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES, FS>,
>
+ Send
+ Sync,
CM: 'static + Deref,
Expand Down Expand Up @@ -841,7 +858,7 @@ where
if let Some(duration_since_epoch) = fetch_time() {
if update_scorer(scorer, &event, duration_since_epoch) {
log_trace!(logger, "Persisting scorer after update");
if let Err(e) = persister.persist_scorer(&*scorer) {
if let Err(e) = persister.persist_scorer(&*scorer).await {
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
// We opt not to abort early on persistence failure here as persisting
// the scorer is non-critical and we still hope that it will have
Expand Down Expand Up @@ -919,6 +936,7 @@ where
},
mobile_interruptable_platform,
fetch_time,
true,
)
}

Expand Down Expand Up @@ -982,7 +1000,15 @@ impl BackgroundProcessor {
ES: 'static + Deref + Send,
M: 'static
+ Deref<
Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>,
Target = ChainMonitorSync<
<CM::Target as AChannelManager>::Signer,
CF,
T,
F,
L,
P,
ES,
>,
>
+ Send
+ Sync,
Expand All @@ -1009,16 +1035,16 @@ impl BackgroundProcessor {
T::Target: 'static + BroadcasterInterface,
F::Target: 'static + FeeEstimator,
L::Target: 'static + Logger,
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
PS::Target: 'static + Persister<'a, CM, L, S>,
P::Target: 'static + PersistSync<<CM::Target as AChannelManager>::Signer>,
PS::Target: 'static + PersisterSync<'a, CM, L, S>,
ES::Target: 'static + EntropySource,
CM::Target: AChannelManager,
OM::Target: AOnionMessenger,
PM::Target: APeerManager,
LM::Target: ALiquidityManager,
D::Target: ChangeDestinationSourceSync,
O::Target: 'static + OutputSpender,
K::Target: 'static + KVStore,
K::Target: 'static + KVStoreSync,
{
let stop_thread = Arc::new(AtomicBool::new(false));
let stop_thread_clone = stop_thread.clone();
Expand Down Expand Up @@ -1098,6 +1124,7 @@ impl BackgroundProcessor {
.expect("Time should be sometime after 1970"),
)
},
false,
)
});
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
Expand Down Expand Up @@ -1186,15 +1213,16 @@ mod tests {
use lightning::types::payment::PaymentHash;
use lightning::util::config::UserConfig;
use lightning::util::persist::{
KVStore, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY,
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY,
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
};
use lightning::util::ser::Writeable;
use lightning::util::sweep::{OutputSpendStatus, OutputSweeperSync, PRUNE_DELAY_BLOCKS};
use lightning::util::test_utils;
use lightning::util::test_utils::{self, TokioSpawner};
use lightning::{get_event, get_event_msg};
use lightning_liquidity::LiquidityManager;
use lightning_persister::fs_store::FilesystemStore;
Expand Down Expand Up @@ -1258,6 +1286,7 @@ mod tests {
Arc<test_utils::TestLogger>,
Arc<FilesystemStore>,
Arc<KeysManager>,
TokioSpawner,
>;

type PGS = Arc<
Expand Down Expand Up @@ -1420,7 +1449,7 @@ mod tests {
}
}

impl KVStore for Persister {
impl KVStoreSync for Persister {
fn read(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
) -> lightning::io::Result<Vec<u8>> {
Expand Down Expand Up @@ -1665,7 +1694,7 @@ mod tests {
Arc::new(FilesystemStore::new(format!("{}_persister_{}", &persist_dir, i).into()));
let now = Duration::from_secs(genesis_block.header.time as u64);
let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(
let chain_monitor = Arc::new(chainmonitor::ChainMonitorSync::new(
Some(chain_source.clone()),
tx_broadcaster.clone(),
logger.clone(),
Expand Down
32 changes: 16 additions & 16 deletions lightning-persister/src/fs_store.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
//! Objects related to [`FilesystemStore`] live here.
use crate::utils::{check_namespace_key_validity, is_valid_kvstore_str};

use lightning::util::persist::{KVStore, MigratableKVStore};
use lightning::util::async_poll::{AsyncResult, AsyncResultType};
use lightning::util::persist::{KVStore, KVStoreSync, MigratableKVStore, MigratableKVStoreSync};
use lightning::util::string::PrintableString;

use std::collections::HashMap;
Expand Down Expand Up @@ -92,7 +93,7 @@ impl FilesystemStore {
}
}

impl KVStore for FilesystemStore {
impl KVStoreSync for FilesystemStore {
fn read(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
) -> lightning::io::Result<Vec<u8>> {
Expand Down Expand Up @@ -120,7 +121,7 @@ impl KVStore for FilesystemStore {

fn write(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
) -> lightning::io::Result<()> {
) -> Result<(), lightning::io::Error> {
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?;

let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
Expand Down Expand Up @@ -425,7 +426,7 @@ fn get_key_from_dir_entry(p: &Path, base_path: &Path) -> Result<String, lightnin
}
}

impl MigratableKVStore for FilesystemStore {
impl MigratableKVStoreSync for FilesystemStore {
fn list_all_keys(&self) -> Result<Vec<(String, String, String)>, lightning::io::Error> {
let prefixed_dest = &self.data_dir;
if !prefixed_dest.exists() {
Expand Down Expand Up @@ -498,13 +499,12 @@ mod tests {
do_read_write_remove_list_persist, do_test_data_migration, do_test_store,
};

use lightning::chain::chainmonitor::Persist;
use lightning::chain::ChannelMonitorUpdateStatus;
use lightning::chain::chainmonitor::PersistSync;
use lightning::check_closed_event;
use lightning::events::ClosureReason;
use lightning::ln::functional_test_utils::*;
use lightning::ln::msgs::BaseMessageHandler;
use lightning::util::persist::read_channel_monitors;
use lightning::util::persist::{read_channel_monitors_sync, KVStoreSyncWrapper};
use lightning::util::test_utils;

impl Drop for FilesystemStore {
Expand Down Expand Up @@ -541,7 +541,8 @@ mod tests {

#[test]
fn test_if_monitors_is_not_dir() {
let store = FilesystemStore::new("test_monitors_is_not_dir".into());
let store = Arc::new(FilesystemStore::new("test_monitors_is_not_dir".into()));
let store_async = Arc::new(KVStoreSyncWrapper(Arc::clone(&store)));

fs::create_dir_all(&store.get_data_dir()).unwrap();
let mut path = std::path::PathBuf::from(&store.get_data_dir());
Expand All @@ -555,7 +556,7 @@ mod tests {
&chanmon_cfgs[0].tx_broadcaster,
&chanmon_cfgs[0].logger,
&chanmon_cfgs[0].fee_estimator,
&store,
&store_async,
node_cfgs[0].keys_manager,
);
node_cfgs[0].chain_monitor = chain_mon_0;
Expand All @@ -564,17 +565,16 @@ mod tests {

// Check that read_channel_monitors() returns error if monitors/ is not a
// directory.
assert!(
read_channel_monitors(&store, nodes[0].keys_manager, nodes[0].keys_manager).is_err()
);
assert!(read_channel_monitors_sync(store, nodes[0].keys_manager, nodes[0].keys_manager)
.is_err());
}

#[test]
fn test_filesystem_store() {
// Create the nodes, giving them FilesystemStores for data stores.
let store_0 = FilesystemStore::new("test_filesystem_store_0".into());
let store_1 = FilesystemStore::new("test_filesystem_store_1".into());
do_test_store(&store_0, &store_1)
let store_0 = Arc::new(FilesystemStore::new("test_filesystem_store_0".into()));
let store_1 = Arc::new(FilesystemStore::new("test_filesystem_store_1".into()));
do_test_store(store_0, store_1)
}

// Test that if the store's path to channel data is read-only, writing a
Expand Down Expand Up @@ -621,7 +621,7 @@ mod tests {

let monitor_name = added_monitors[0].1.persistence_key();
match store.persist_new_channel(monitor_name, &added_monitors[0].1) {
ChannelMonitorUpdateStatus::UnrecoverableError => {},
Err(()) => {},
_ => panic!("unexpected result from persisting new channel"),
}

Expand Down
Loading