Skip to content

Commit dc6171a

Browse files
committed
Shutdown: Wait for event processing to fully stop
.. before initiating the Runtime shutdown.
1 parent 52ee854 commit dc6171a

File tree

2 files changed

+63
-1
lines changed

2 files changed

+63
-1
lines changed

src/builder.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -972,6 +972,7 @@ fn build_with_store_internal(
972972
};
973973

974974
let (stop_sender, _) = tokio::sync::watch::channel(());
975+
let (event_handling_stopped_sender, _) = tokio::sync::watch::channel(());
975976

976977
let is_listening = Arc::new(AtomicBool::new(false));
977978
let latest_wallet_sync_timestamp = Arc::new(RwLock::new(None));
@@ -983,6 +984,7 @@ fn build_with_store_internal(
983984
Ok(Node {
984985
runtime,
985986
stop_sender,
987+
event_handling_stopped_sender,
986988
config,
987989
wallet,
988990
tx_sync,

src/lib.rs

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ uniffi::include_scaffolding!("ldk_node");
171171
pub struct Node {
172172
runtime: Arc<RwLock<Option<tokio::runtime::Runtime>>>,
173173
stop_sender: tokio::sync::watch::Sender<()>,
174+
event_handling_stopped_sender: tokio::sync::watch::Sender<()>,
174175
config: Arc<Config>,
175176
wallet: Arc<Wallet>,
176177
tx_sync: Arc<EsploraSyncClient<Arc<FilesystemLogger>>>,
@@ -714,6 +715,7 @@ impl Node {
714715
};
715716

716717
let background_stop_logger = Arc::clone(&self.logger);
718+
let event_handling_stopped_sender = self.event_handling_stopped_sender.clone();
717719
runtime.spawn(async move {
718720
process_events_async(
719721
background_persister,
@@ -734,6 +736,18 @@ impl Node {
734736
panic!("Failed to process events");
735737
});
736738
log_trace!(background_stop_logger, "Events processing stopped.",);
739+
740+
match event_handling_stopped_sender.send(()) {
741+
Ok(_) => (),
742+
Err(e) => {
743+
log_error!(
744+
background_stop_logger,
745+
"Failed to send 'events handling stopped' signal. This should never happen: {}",
746+
e
747+
);
748+
debug_assert!(false);
749+
},
750+
}
737751
});
738752

739753
if let Some(liquidity_source) = self.liquidity_source.as_ref() {
@@ -783,9 +797,55 @@ impl Node {
783797
},
784798
}
785799

786-
// Stop disconnect peers.
800+
// Disconnect all peers.
787801
self.peer_manager.disconnect_all_peers();
788802

803+
// Wait until event handling stopped, at least until a timeout is reached.
804+
let event_handling_stopped_logger = Arc::clone(&self.logger);
805+
let mut event_handling_stopped_receiver = self.event_handling_stopped_sender.subscribe();
806+
807+
// FIXME: For now, we wait up to 100 secs (BDK_WALLET_SYNC_TIMEOUT_SECS + 10) to allow
808+
// event handling to exit gracefully even if it was blocked on the BDK wallet syncing. We
809+
// should drop this considerably post upgrading to BDK 1.0.
810+
let timeout_res = runtime.block_on(async {
811+
tokio::time::timeout(
812+
Duration::from_secs(100),
813+
event_handling_stopped_receiver.changed(),
814+
)
815+
.await
816+
});
817+
818+
match timeout_res {
819+
Ok(stop_res) => match stop_res {
820+
Ok(()) => {},
821+
Err(e) => {
822+
log_error!(
823+
event_handling_stopped_logger,
824+
"Stopping event handling failed. This should never happen: {}",
825+
e
826+
);
827+
panic!("Stopping event handling failed. This should never happen.");
828+
},
829+
},
830+
Err(e) => {
831+
log_error!(
832+
event_handling_stopped_logger,
833+
"Stopping event handling timed out: {}",
834+
e
835+
);
836+
},
837+
}
838+
839+
#[cfg(tokio_unstable)]
840+
{
841+
log_trace!(
842+
self.logger,
843+
"Active runtime tasks left prior to shutdown: {}",
844+
runtime.metrics().active_tasks_count()
845+
);
846+
}
847+
848+
// Shutdown our runtime. By now ~no or only very few tasks should be left.
789849
runtime.shutdown_timeout(Duration::from_secs(10));
790850

791851
log_info!(self.logger, "Shutdown complete.");

0 commit comments

Comments
 (0)