1- use std:: { cmp:: Reverse , collections:: BinaryHeap , fmt} ;
1+ use std:: {
2+ cmp:: Reverse ,
3+ collections:: BinaryHeap ,
4+ fmt,
5+ time:: { Duration , Instant } ,
6+ } ;
27
38use at2_node:: {
49 proto:: { self , * } ,
@@ -22,11 +27,13 @@ use tonic::Response;
2227use tracing:: { info, warn} ;
2328
2429use super :: {
25- accounts:: { self , account , Accounts } ,
30+ accounts:: { self , Accounts } ,
2631 config,
2732 recent_transactions:: { self , RecentTransactions } ,
2833} ;
2934
35+ const TRANSACTION_TTL : Duration = Duration :: from_secs ( 60 ) ;
36+
3037#[ derive( Snafu , Debug ) ]
3138pub enum ProtoError {
3239 #[ snafu( display( "deserialize: {}" , source) ) ]
@@ -155,9 +162,12 @@ impl Service {
155162 Ok ( batch) => {
156163 batch. iter ( ) . for_each ( |msg| {
157164 to_process. push ( Reverse ( (
158- msg. sequence ( ) ,
159- msg. sender ( ) . to_owned ( ) ,
160- msg. payload ( ) . to_owned ( ) ,
165+ (
166+ msg. sequence ( ) ,
167+ msg. sender ( ) . to_owned ( ) ,
168+ msg. payload ( ) . to_owned ( ) ,
169+ ) ,
170+ Instant :: now ( ) ,
161171 ) ) )
162172 } ) ;
163173 }
@@ -168,24 +178,33 @@ impl Service {
168178 while to_process. len ( ) < previous_len {
169179 previous_len = to_process. len ( ) ;
170180
171- let mut remaining_to_process = BinaryHeap :: new ( ) ;
172- while let Some ( msg) = to_process. pop ( ) {
173- if let Err ( err) = service. process_payload ( msg. 0 . clone ( ) ) . await {
174- // retry only inconsecutive sequence
181+ let mut remaining_to_process = BinaryHeap :: with_capacity ( to_process. len ( ) ) ;
182+ for Reverse ( ( msg, when_added) ) in to_process. into_sorted_vec ( ) {
183+ if when_added. elapsed ( ) > TRANSACTION_TTL {
184+ warn ! ( "dropping too old: {:?}" , msg) ;
185+
186+ if let Err ( err) = service
187+ . recent_transactions
188+ . update ( Box :: new ( msg. 1 ) , msg. 0 , TransactionState :: Failure )
189+ . await
190+ {
191+ warn ! ( "unable to process: {}" , err) ;
192+ }
193+ }
194+
195+ if let Err ( err) = service. process_payload ( msg. clone ( ) ) . await {
196+ // retry only account async failures
175197 if let ProcessTransactionError :: ProcessTxForAccounts {
176- source :
177- accounts:: Error :: AccountModification {
178- source : account:: Error :: InconsecutiveSequence ,
179- } ,
198+ source : accounts:: Error :: AccountModification { .. } ,
180199 } = err
181200 {
182- remaining_to_process. push ( msg) ;
201+ remaining_to_process. push ( Reverse ( ( msg, when_added ) ) ) ;
183202 } else {
184- warn ! ( "{}" , err) ;
203+ warn ! ( "unable to process: {}" , err) ;
185204 }
186205 }
187206 }
188- to_process. append ( & mut remaining_to_process) ;
207+ to_process = remaining_to_process;
189208 }
190209 }
191210 } ) ;
@@ -199,31 +218,22 @@ impl Service {
199218
200219 let sender = Box :: new ( sender) ;
201220
202- let processed = self
203- . accounts
221+ self . accounts
204222 . transfer (
205223 sender. clone ( ) ,
206224 sequence,
207225 Box :: new ( payload. recipient ) ,
208226 payload. amount ,
209227 )
210228 . await
211- . context ( ProcessTxForAccounts ) ;
229+ . context ( ProcessTxForAccounts ) ? ;
212230
213231 self . recent_transactions
214- . update (
215- sender,
216- sequence,
217- if processed. is_ok ( ) {
218- TransactionState :: Success
219- } else {
220- TransactionState :: Failure
221- } ,
222- )
232+ . update ( sender, sequence, TransactionState :: Success )
223233 . await
224234 . context ( ProcessTxForRecent ) ?;
225235
226- processed
236+ Ok ( ( ) )
227237 }
228238}
229239
0 commit comments