1212use bdk_core:: { BlockId , CheckPoint } ;
1313use bitcoin:: { Block , BlockHash , Transaction , Txid } ;
1414use bitcoincore_rpc:: { bitcoincore_rpc_json, RpcApi } ;
15- use std:: { collections:: HashSet , ops:: Deref } ;
15+ use std:: {
16+ collections:: { HashMap , HashSet } ,
17+ ops:: Deref ,
18+ sync:: Arc ,
19+ } ;
1620
1721pub mod bip158;
1822
@@ -37,30 +41,23 @@ pub struct Emitter<C> {
3741 /// gives us an opportunity to re-fetch this result.
3842 last_block : Option < bitcoincore_rpc_json:: GetBlockResult > ,
3943
40- /// The latest first-seen epoch of emitted mempool transactions. This is used to determine
41- /// whether a mempool transaction is already emitted.
42- last_mempool_time : usize ,
43-
44- /// The last emitted block during our last mempool emission. This is used to determine whether
45- /// there has been a reorg since our last mempool emission.
46- last_mempool_tip : Option < u32 > ,
47-
48- /// A set of txids currently assumed to still be in the mempool.
44+ /// The last snapshot of mempool transactions.
4945 ///
50- /// This is used to detect mempool evictions by comparing the set against the latest mempool
51- /// snapshot from bitcoind. Any txid in this set that is missing from the snapshot is
52- /// considered evicted.
46+ /// This is used to detect mempool evictions and as a cache for transactions to emit.
5347 ///
54- /// When the emitter emits a block, confirmed txids are removed from this set. This prevents
55- /// confirmed transactions from being mistakenly marked with an `evicted_at` timestamp.
56- expected_mempool_txids : HashSet < Txid > ,
48+ /// For mempool evictions, the latest call to `getrawmempool` is compared against this field.
49+ /// Any transaction that is missing from this field is considered evicted. The exception is if
50+ /// the transaction is confirmed into a block - therefore, we only emit evictions when we are
51+ /// sure the tip block is already emitted. When a block is emitted, the transactions in the
52+ /// block are removed from this field.
53+ mempool_snapshot : HashMap < Txid , Arc < Transaction > > ,
5754}
5855
5956/// Indicates that there are no initially expected mempool transactions.
6057///
6158/// Pass this to the `expected_mempool_txids` field of [`Emitter::new`] when the wallet is known
6259/// to start empty (i.e. with no unconfirmed transactions).
63- pub const NO_EXPECTED_MEMPOOL_TXIDS : core:: iter:: Empty < Txid > = core:: iter:: empty ( ) ;
60+ pub const NO_EXPECTED_MEMPOOL_TXIDS : core:: iter:: Empty < Arc < Transaction > > = core:: iter:: empty ( ) ;
6461
6562impl < C > Emitter < C >
6663where
@@ -75,23 +72,27 @@ where
7572 /// `start_height` starts emission from a given height (if there are no conflicts with the
7673 /// original chain).
7774 ///
78- /// `expected_mempool_txids ` is the initial set of unconfirmed txids provided by the wallet.
79- /// This allows the [`Emitter`] to inform the wallet about relevant mempool evictions. If it is
80- /// known that the wallet is empty, [`NO_EXPECTED_MEMPOOL_TXIDS`] can be used.
75+ /// `expected_mempool_txs ` is the initial set of unconfirmed transactions provided by the
76+ /// wallet. This allows the [`Emitter`] to inform the wallet about relevant mempool evictions.
77+ /// If it is known that the wallet is empty, [`NO_EXPECTED_MEMPOOL_TXIDS`] can be used.
8178 pub fn new (
8279 client : C ,
8380 last_cp : CheckPoint ,
8481 start_height : u32 ,
85- expected_mempool_txids : impl IntoIterator < Item = impl Into < Txid > > ,
82+ expected_mempool_txs : impl IntoIterator < Item = impl Into < Arc < Transaction > > > ,
8683 ) -> Self {
8784 Self {
8885 client,
8986 start_height,
9087 last_cp,
9188 last_block : None ,
92- last_mempool_time : 0 ,
93- last_mempool_tip : None ,
94- expected_mempool_txids : expected_mempool_txids. into_iter ( ) . map ( Into :: into) . collect ( ) ,
89+ mempool_snapshot : expected_mempool_txs
90+ . into_iter ( )
91+ . map ( |tx| {
92+ let tx: Arc < Transaction > = tx. into ( ) ;
93+ ( tx. compute_txid ( ) , tx)
94+ } )
95+ . collect ( ) ,
9596 }
9697 }
9798
@@ -115,110 +116,89 @@ where
115116 pub fn mempool ( & mut self ) -> Result < MempoolEvent , bitcoincore_rpc:: Error > {
116117 let client = & * self . client ;
117118
118- // This is the emitted tip height during the last mempool emission.
119- let prev_mempool_tip = self
120- . last_mempool_tip
121- // We use `start_height - 1` as we cannot guarantee that the block at
122- // `start_height` has been emitted.
123- . unwrap_or ( self . start_height . saturating_sub ( 1 ) ) ;
124-
125- // Loop to make sure that the fetched mempool content and the fetched tip are consistent
126- // with one another.
127- let ( raw_mempool, raw_mempool_txids, rpc_height, rpc_block_hash) = loop {
128- // Determine if height and hash matches the best block from the RPC. Evictions are
129- // deferred if we are not at the best block.
130- let height = client. get_block_count ( ) ?;
131- let hash = client. get_block_hash ( height) ?;
132-
133- // Get the raw mempool result from the RPC client which will be used to determine if any
134- // transactions have been evicted.
135- let mp = client. get_raw_mempool_verbose ( ) ?;
136- let mp_txids: HashSet < Txid > = mp. keys ( ) . copied ( ) . collect ( ) ;
137-
138- if height == client. get_block_count ( ) ? && hash == client. get_block_hash ( height) ? {
139- break ( mp, mp_txids, height, hash) ;
119+ let mut rpc_tip_height;
120+ let mut rpc_tip_hash;
121+ let mut rpc_mempool;
122+ let mut rpc_mempool_txids;
123+
124+ // Ensure we get a mempool snapshot consistent with `rpc_tip_hash` as the tip.
125+ loop {
126+ rpc_tip_height = client. get_block_count ( ) ?;
127+ rpc_tip_hash = client. get_block_hash ( rpc_tip_height) ?;
128+ rpc_mempool = client. get_raw_mempool_verbose ( ) ?;
129+ rpc_mempool_txids = rpc_mempool. keys ( ) . copied ( ) . collect :: < HashSet < Txid > > ( ) ;
130+ let is_still_at_tip = rpc_tip_hash == client. get_block_hash ( rpc_tip_height) ?
131+ && rpc_tip_height == client. get_block_count ( ) ?;
132+ if is_still_at_tip {
133+ break ;
140134 }
141- } ;
142-
143- let at_tip =
144- rpc_height == self . last_cp . height ( ) as u64 && rpc_block_hash == self . last_cp . hash ( ) ;
145-
146- // If at tip, any expected txid missing from raw mempool is considered evicted;
147- // if not at tip, we don't evict anything.
148- let evicted_txids: HashSet < Txid > = if at_tip {
149- self . expected_mempool_txids
150- . difference ( & raw_mempool_txids)
151- . copied ( )
152- . collect ( )
153- } else {
154- HashSet :: new ( )
155- } ;
135+ }
156136
157- // Mempool txs come with a timestamp of when the tx is introduced to the mempool. We keep
158- // track of the latest mempool tx's timestamp to determine whether we have seen a tx
159- // before. `prev_mempool_time` is the previous timestamp and `last_time` records what will
160- // be the new latest timestamp.
161- let prev_mempool_time = self . last_mempool_time ;
162- let mut latest_time = prev_mempool_time;
137+ let mut mempool_event = MempoolEvent :: default ( ) ;
138+ let update_time = & mut 0_u64 ;
163139
164- let new_txs = raw_mempool
140+ mempool_event . update = rpc_mempool
165141 . into_iter ( )
166142 . filter_map ( {
167- let latest_time = & mut latest_time;
168- move |( txid, tx_entry) | -> Option < Result < _ , bitcoincore_rpc:: Error > > {
169- let tx_time = tx_entry. time as usize ;
170- if tx_time > * latest_time {
171- * latest_time = tx_time;
172- }
173- // Best-effort check to avoid re-emitting transactions we've already emitted.
174- //
175- // Complete suppression isn't possible, since a transaction may spend outputs
176- // owned by the wallet. To determine if such a transaction is relevant, we must
177- // have already seen its ancestor(s) that contain the spent prevouts.
178- //
179- // Fortunately, bitcoind provides the block height at which the transaction
180- // entered the mempool. If we've already emitted that block height, we can
181- // reasonably assume the receiver has seen all ancestor transactions.
182- let is_already_emitted = tx_time <= prev_mempool_time;
183- let is_within_height = tx_entry. height <= prev_mempool_tip as _ ;
184- if is_already_emitted && is_within_height {
185- return None ;
186- }
187- let tx = match client. get_raw_transaction ( & txid, None ) {
188- Ok ( tx) => tx,
189- Err ( err) if err. is_not_found_error ( ) => return None ,
190- Err ( err) => return Some ( Err ( err) ) ,
143+ |( txid, tx_entry) | -> Option < Result < _ , bitcoincore_rpc:: Error > > {
144+ * update_time = u64:: max ( * update_time, tx_entry. time ) ;
145+ let tx = match self . mempool_snapshot . get ( & txid) {
146+ Some ( tx) => tx. clone ( ) ,
147+ None => match client. get_raw_transaction ( & txid, None ) {
148+ Ok ( tx) => {
149+ let tx = Arc :: new ( tx) ;
150+ self . mempool_snapshot . insert ( txid, tx. clone ( ) ) ;
151+ tx
152+ }
153+ Err ( err) if err. is_not_found_error ( ) => return None ,
154+ Err ( err) => return Some ( Err ( err) ) ,
155+ } ,
191156 } ;
192- Some ( Ok ( ( tx, tx_time as u64 ) ) )
157+ Some ( Ok ( ( tx, tx_entry . time ) ) )
193158 }
194159 } )
195160 . collect :: < Result < Vec < _ > , _ > > ( ) ?;
196161
197- self . last_mempool_time = latest_time ;
198- self . last_mempool_tip = Some ( self . last_cp . height ( ) ) ;
162+ let at_tip =
163+ rpc_tip_height == self . last_cp . height ( ) as u64 && rpc_tip_hash == self . last_cp . hash ( ) ;
199164
200- // If at tip, we replace `expected_mempool_txids` with just the new txids. Otherwise, we’re
201- // still catching up to the tip and keep accumulating.
202165 if at_tip {
203- self . expected_mempool_txids = new_txs. iter ( ) . map ( |( tx, _) | tx. compute_txid ( ) ) . collect ( ) ;
166+ // We only emit evicted transactions when we have already emitted the RPC tip. This is
167+ // because we cannot differenciate between transactions that are confirmed and
168+ // transactions that are evicted, so we rely on emitted blocks to remove
169+ // transactions from the `mempool_snapshot`.
170+ mempool_event. evicted = self
171+ . mempool_snapshot
172+ . keys ( )
173+ . filter ( |& txid| !rpc_mempool_txids. contains ( txid) )
174+ . map ( |& txid| ( txid, * update_time) )
175+ . collect ( ) ;
176+ self . mempool_snapshot = mempool_event
177+ . update
178+ . iter ( )
179+ . map ( |( tx, _) | ( tx. compute_txid ( ) , tx. clone ( ) ) )
180+ . collect ( ) ;
204181 } else {
205- self . expected_mempool_txids
206- . extend ( new_txs. iter ( ) . map ( |( tx, _) | tx. compute_txid ( ) ) ) ;
207- }
182+ // Since we are still catching up to the tip (a.k.a tip has not been emitted), we
183+ // accumulate more transactions in `mempool_snapshot` so that we can emit evictions in
184+ // a batch once we catch up.
185+ self . mempool_snapshot . extend (
186+ mempool_event
187+ . update
188+ . iter ( )
189+ . map ( |( tx, _) | ( tx. compute_txid ( ) , tx. clone ( ) ) ) ,
190+ ) ;
191+ } ;
208192
209- Ok ( MempoolEvent {
210- new_txs,
211- evicted_txids,
212- latest_update_time : latest_time as u64 ,
213- } )
193+ Ok ( mempool_event)
214194 }
215195
216196 /// Emit the next block height and block (if any).
217197 pub fn next_block ( & mut self ) -> Result < Option < BlockEvent < Block > > , bitcoincore_rpc:: Error > {
218198 if let Some ( ( checkpoint, block) ) = poll ( self , move |hash, client| client. get_block ( hash) ) ? {
219199 // Stop tracking unconfirmed transactions that have been confirmed in this block.
220200 for tx in & block. txdata {
221- self . expected_mempool_txids . remove ( & tx. compute_txid ( ) ) ;
201+ self . mempool_snapshot . remove ( & tx. compute_txid ( ) ) ;
222202 }
223203 return Ok ( Some ( BlockEvent { block, checkpoint } ) ) ;
224204 }
@@ -227,32 +207,13 @@ where
227207}
228208
229209/// A new emission from mempool.
230- #[ derive( Debug ) ]
210+ #[ derive( Debug , Default ) ]
231211pub struct MempoolEvent {
232- /// Unemitted transactions or transactions with ancestors that are unseen by the receiver.
233- ///
234- /// To understand the second condition, consider a receiver which filters transactions based on
235- /// whether it alters the UTXO set of tracked script pubkeys. If an emitted mempool transaction
236- /// spends a tracked UTXO which is confirmed at height `h`, but the receiver has only seen up
237- /// to block of height `h-1`, we want to re-emit this transaction until the receiver has
238- /// seen the block at height `h`.
239- pub new_txs : Vec < ( Transaction , u64 ) > ,
240-
241- /// [`Txid`]s of all transactions that have been evicted from mempool.
242- pub evicted_txids : HashSet < Txid > ,
212+ /// Transactions currently in the mempool alongside their seen-at timestamp.
213+ pub update : Vec < ( Arc < Transaction > , u64 ) > ,
243214
244- /// The latest timestamp of when a transaction entered the mempool.
245- ///
246- /// This is useful for setting the timestamp for evicted transactions.
247- pub latest_update_time : u64 ,
248- }
249-
250- impl MempoolEvent {
251- /// Returns an iterator of `(txid, evicted_at)` pairs for all evicted transactions.
252- pub fn evicted_ats ( & self ) -> impl ExactSizeIterator < Item = ( Txid , u64 ) > + ' _ {
253- let time = self . latest_update_time ;
254- self . evicted_txids . iter ( ) . map ( move |& txid| ( txid, time) )
255- }
215+ /// Transactions evicted from the mempool alongside their evicted-at timestamp.
216+ pub evicted : Vec < ( Txid , u64 ) > ,
256217}
257218
258219/// A newly emitted block from [`Emitter`].
@@ -396,16 +357,6 @@ where
396357 continue ;
397358 }
398359 PollResponse :: AgreementFound ( res, cp) => {
399- let agreement_h = res. height as u32 ;
400-
401- // The tip during the last mempool emission needs to in the best chain, we reduce
402- // it if it is not.
403- if let Some ( h) = emitter. last_mempool_tip . as_mut ( ) {
404- if * h > agreement_h {
405- * h = agreement_h;
406- }
407- }
408-
409360 // get rid of evicted blocks
410361 emitter. last_cp = cp;
411362 emitter. last_block = Some ( res) ;
@@ -479,7 +430,7 @@ mod test {
479430
480431 for txid in & mempool_txids {
481432 assert ! (
482- emitter. expected_mempool_txids . contains ( txid) ,
433+ emitter. mempool_snapshot . contains_key ( txid) ,
483434 "Expected txid {txid:?} missing"
484435 ) ;
485436 }
@@ -500,19 +451,19 @@ mod test {
500451 . collect :: < HashSet < _ > > ( ) ;
501452 for txid in confirmed_txids {
502453 assert ! (
503- !emitter. expected_mempool_txids . contains ( & txid) ,
454+ !emitter. mempool_snapshot . contains_key ( & txid) ,
504455 "Expected txid {txid:?} should have been removed"
505456 ) ;
506457 }
507458 for txid in & mempool_txids {
508459 assert ! (
509- emitter. expected_mempool_txids . contains ( txid) ,
460+ emitter. mempool_snapshot . contains_key ( txid) ,
510461 "Expected txid {txid:?} missing"
511462 ) ;
512463 }
513464 }
514465
515- assert ! ( emitter. expected_mempool_txids . is_empty( ) ) ;
466+ assert ! ( emitter. mempool_snapshot . is_empty( ) ) ;
516467
517468 Ok ( ( ) )
518469 }
0 commit comments