16
16
17
17
//! Helper for tracking transaction invalidation events.
18
18
19
- use crate :: { Chain , HashOf , Subscription , TransactionStatusOf } ;
19
+ use crate :: { Chain , Client , Error , HashOf , HeaderIdOf , Subscription , TransactionStatusOf } ;
20
20
21
21
use async_trait:: async_trait;
22
22
use futures:: { future:: Either , Future , FutureExt , Stream , StreamExt } ;
23
- use relay_utils:: TrackedTransactionStatus ;
23
+ use relay_utils:: { HeaderId , TrackedTransactionStatus } ;
24
+ use sp_runtime:: traits:: Header as _;
24
25
use std:: time:: Duration ;
25
26
27
+ /// Transaction tracker environment.
28
+ #[ async_trait]
29
+ pub trait Environment < C : Chain > : Send + Sync {
30
+ /// Returns header id by its hash.
31
+ async fn header_id_by_hash ( & self , hash : HashOf < C > ) -> Result < HeaderIdOf < C > , Error > ;
32
+ }
33
+
34
+ #[ async_trait]
35
+ impl < C : Chain > Environment < C > for Client < C > {
36
+ async fn header_id_by_hash ( & self , hash : HashOf < C > ) -> Result < HeaderIdOf < C > , Error > {
37
+ self . header_by_hash ( hash) . await . map ( |h| HeaderId ( * h. number ( ) , hash) )
38
+ }
39
+ }
40
+
26
41
/// Substrate transaction tracker implementation.
27
42
///
28
43
/// Substrate node provides RPC API to submit and watch for transaction events. This way
@@ -43,20 +58,22 @@ use std::time::Duration;
43
58
/// it is lost.
44
59
///
45
60
/// This struct implements third option as it seems to be the most optimal.
46
- pub struct TransactionTracker < C : Chain > {
61
+ pub struct TransactionTracker < C : Chain , E > {
62
+ environment : E ,
47
63
transaction_hash : HashOf < C > ,
48
64
stall_timeout : Duration ,
49
65
subscription : Subscription < TransactionStatusOf < C > > ,
50
66
}
51
67
52
- impl < C : Chain > TransactionTracker < C > {
68
+ impl < C : Chain , E : Environment < C > > TransactionTracker < C , E > {
53
69
/// Create transaction tracker.
54
70
pub fn new (
71
+ environment : E ,
55
72
stall_timeout : Duration ,
56
73
transaction_hash : HashOf < C > ,
57
74
subscription : Subscription < TransactionStatusOf < C > > ,
58
75
) -> Self {
59
- Self { stall_timeout, transaction_hash, subscription }
76
+ Self { environment , stall_timeout, transaction_hash, subscription }
60
77
}
61
78
62
79
/// Wait for final transaction status and return it along with last known internal invalidation
@@ -65,10 +82,11 @@ impl<C: Chain> TransactionTracker<C> {
65
82
self ,
66
83
wait_for_stall_timeout : impl Future < Output = ( ) > ,
67
84
wait_for_stall_timeout_rest : impl Future < Output = ( ) > ,
68
- ) -> ( TrackedTransactionStatus , Option < InvalidationStatus > ) {
85
+ ) -> ( TrackedTransactionStatus < HeaderIdOf < C > > , Option < InvalidationStatus < HeaderIdOf < C > > > ) {
69
86
// sometimes we want to wait for the rest of the stall timeout even if
70
87
// `wait_for_invalidation` has been "select"ed first => it is shared
71
- let wait_for_invalidation = watch_transaction_status :: < C , _ > (
88
+ let wait_for_invalidation = watch_transaction_status :: < _ , C , _ > (
89
+ self . environment ,
72
90
self . transaction_hash ,
73
91
self . subscription . into_stream ( ) ,
74
92
) ;
@@ -86,8 +104,8 @@ impl<C: Chain> TransactionTracker<C> {
86
104
( TrackedTransactionStatus :: Lost , None )
87
105
} ,
88
106
Either :: Right ( ( invalidation_status, _) ) => match invalidation_status {
89
- InvalidationStatus :: Finalized =>
90
- ( TrackedTransactionStatus :: Finalized , Some ( invalidation_status) ) ,
107
+ InvalidationStatus :: Finalized ( at_block ) =>
108
+ ( TrackedTransactionStatus :: Finalized ( at_block ) , Some ( invalidation_status) ) ,
91
109
InvalidationStatus :: Invalid =>
92
110
( TrackedTransactionStatus :: Lost , Some ( invalidation_status) ) ,
93
111
InvalidationStatus :: Lost => {
@@ -111,8 +129,10 @@ impl<C: Chain> TransactionTracker<C> {
111
129
}
112
130
113
131
#[ async_trait]
114
- impl < C : Chain > relay_utils:: TransactionTracker for TransactionTracker < C > {
115
- async fn wait ( self ) -> TrackedTransactionStatus {
132
+ impl < C : Chain , E : Environment < C > > relay_utils:: TransactionTracker for TransactionTracker < C , E > {
133
+ type HeaderId = HeaderIdOf < C > ;
134
+
135
+ async fn wait ( self ) -> TrackedTransactionStatus < HeaderIdOf < C > > {
116
136
let wait_for_stall_timeout = async_std:: task:: sleep ( self . stall_timeout ) . shared ( ) ;
117
137
let wait_for_stall_timeout_rest = wait_for_stall_timeout. clone ( ) ;
118
138
self . do_wait ( wait_for_stall_timeout, wait_for_stall_timeout_rest) . await . 0
@@ -125,20 +145,25 @@ impl<C: Chain> relay_utils::TransactionTracker for TransactionTracker<C> {
125
145
/// ignored - relay loops are detecting the mining/finalization using their own
126
146
/// techniques. That's why we're using `InvalidationStatus` here.
127
147
#[ derive( Debug , PartialEq ) ]
128
- enum InvalidationStatus {
129
- /// Transaction has been included into block and finalized.
130
- Finalized ,
148
+ enum InvalidationStatus < BlockId > {
149
+ /// Transaction has been included into block and finalized at given block .
150
+ Finalized ( BlockId ) ,
131
151
/// Transaction has been invalidated.
132
152
Invalid ,
133
153
/// We have lost track of transaction status.
134
154
Lost ,
135
155
}
136
156
137
157
/// Watch for transaction status until transaction is finalized or we lose track of its status.
138
- async fn watch_transaction_status < C : Chain , S : Stream < Item = TransactionStatusOf < C > > > (
158
+ async fn watch_transaction_status <
159
+ E : Environment < C > ,
160
+ C : Chain ,
161
+ S : Stream < Item = TransactionStatusOf < C > > ,
162
+ > (
163
+ environment : E ,
139
164
transaction_hash : HashOf < C > ,
140
165
subscription : S ,
141
- ) -> InvalidationStatus {
166
+ ) -> InvalidationStatus < HeaderIdOf < C > > {
142
167
futures:: pin_mut!( subscription) ;
143
168
144
169
loop {
@@ -153,7 +178,23 @@ async fn watch_transaction_status<C: Chain, S: Stream<Item = TransactionStatusOf
153
178
transaction_hash,
154
179
block_hash,
155
180
) ;
156
- return InvalidationStatus :: Finalized
181
+
182
+ let header_id = match environment. header_id_by_hash ( block_hash) . await {
183
+ Ok ( header_id) => header_id,
184
+ Err ( e) => {
185
+ log:: error!(
186
+ target: "bridge" ,
187
+ "Failed to read header {:?} when watching for {} transaction {:?}: {:?}" ,
188
+ block_hash,
189
+ C :: NAME ,
190
+ transaction_hash,
191
+ e,
192
+ ) ;
193
+ // that's the best option we have here
194
+ return InvalidationStatus :: Lost
195
+ } ,
196
+ } ;
197
+ return InvalidationStatus :: Finalized ( header_id)
157
198
} ,
158
199
Some ( TransactionStatusOf :: < C > :: Invalid ) => {
159
200
// if node says that the transaction is invalid, there are still chances that
@@ -247,11 +288,27 @@ mod tests {
247
288
use futures:: { FutureExt , SinkExt } ;
248
289
use sc_transaction_pool_api:: TransactionStatus ;
249
290
291
+ struct TestEnvironment ( Result < HeaderIdOf < TestChain > , Error > ) ;
292
+
293
+ #[ async_trait]
294
+ impl Environment < TestChain > for TestEnvironment {
295
+ async fn header_id_by_hash (
296
+ & self ,
297
+ _hash : HashOf < TestChain > ,
298
+ ) -> Result < HeaderIdOf < TestChain > , Error > {
299
+ self . 0 . as_ref ( ) . map_err ( |_| Error :: UninitializedBridgePallet ) . cloned ( )
300
+ }
301
+ }
302
+
250
303
async fn on_transaction_status (
251
304
status : TransactionStatus < HashOf < TestChain > , HashOf < TestChain > > ,
252
- ) -> Option < ( TrackedTransactionStatus , InvalidationStatus ) > {
305
+ ) -> Option < (
306
+ TrackedTransactionStatus < HeaderIdOf < TestChain > > ,
307
+ InvalidationStatus < HeaderIdOf < TestChain > > ,
308
+ ) > {
253
309
let ( mut sender, receiver) = futures:: channel:: mpsc:: channel ( 1 ) ;
254
- let tx_tracker = TransactionTracker :: < TestChain > :: new (
310
+ let tx_tracker = TransactionTracker :: < TestChain , TestEnvironment > :: new (
311
+ TestEnvironment ( Ok ( HeaderId ( 0 , Default :: default ( ) ) ) ) ,
255
312
Duration :: from_secs ( 0 ) ,
256
313
Default :: default ( ) ,
257
314
Subscription ( async_std:: sync:: Mutex :: new ( receiver) ) ,
@@ -270,7 +327,23 @@ mod tests {
270
327
async fn returns_finalized_on_finalized ( ) {
271
328
assert_eq ! (
272
329
on_transaction_status( TransactionStatus :: Finalized ( Default :: default ( ) ) ) . await ,
273
- Some ( ( TrackedTransactionStatus :: Finalized , InvalidationStatus :: Finalized ) ) ,
330
+ Some ( (
331
+ TrackedTransactionStatus :: Finalized ( Default :: default ( ) ) ,
332
+ InvalidationStatus :: Finalized ( Default :: default ( ) )
333
+ ) ) ,
334
+ ) ;
335
+ }
336
+
337
+ #[ async_std:: test]
338
+ async fn returns_lost_on_finalized_and_environment_error ( ) {
339
+ assert_eq ! (
340
+ watch_transaction_status:: <_, TestChain , _>(
341
+ TestEnvironment ( Err ( Error :: UninitializedBridgePallet ) ) ,
342
+ Default :: default ( ) ,
343
+ futures:: stream:: iter( [ TransactionStatus :: Finalized ( Default :: default ( ) ) ] )
344
+ )
345
+ . now_or_never( ) ,
346
+ Some ( InvalidationStatus :: Lost ) ,
274
347
) ;
275
348
}
276
349
@@ -343,16 +416,21 @@ mod tests {
343
416
#[ async_std:: test]
344
417
async fn lost_on_subscription_error ( ) {
345
418
assert_eq ! (
346
- watch_transaction_status:: <TestChain , _>( Default :: default ( ) , futures:: stream:: iter( [ ] ) )
347
- . now_or_never( ) ,
419
+ watch_transaction_status:: <_, TestChain , _>(
420
+ TestEnvironment ( Ok ( HeaderId ( 0 , Default :: default ( ) ) ) ) ,
421
+ Default :: default ( ) ,
422
+ futures:: stream:: iter( [ ] )
423
+ )
424
+ . now_or_never( ) ,
348
425
Some ( InvalidationStatus :: Lost ) ,
349
426
) ;
350
427
}
351
428
352
429
#[ async_std:: test]
353
430
async fn lost_on_timeout_when_waiting_for_invalidation_status ( ) {
354
431
let ( _sender, receiver) = futures:: channel:: mpsc:: channel ( 1 ) ;
355
- let tx_tracker = TransactionTracker :: < TestChain > :: new (
432
+ let tx_tracker = TransactionTracker :: < TestChain , TestEnvironment > :: new (
433
+ TestEnvironment ( Ok ( HeaderId ( 0 , Default :: default ( ) ) ) ) ,
356
434
Duration :: from_secs ( 0 ) ,
357
435
Default :: default ( ) ,
358
436
Subscription ( async_std:: sync:: Mutex :: new ( receiver) ) ,
0 commit comments