@@ -5,8 +5,10 @@ use crate::engine_api::{
5
5
} ;
6
6
use crate :: HttpJsonRpc ;
7
7
use lru:: LruCache ;
8
- use slog:: { crit , debug, info , warn , Logger } ;
8
+ use slog:: { debug, error , info , Logger } ;
9
9
use std:: future:: Future ;
10
+ use std:: sync:: Arc ;
11
+ use task_executor:: TaskExecutor ;
10
12
use tokio:: sync:: { Mutex , RwLock } ;
11
13
use types:: { Address , ExecutionBlockHash , Hash256 } ;
12
14
@@ -16,7 +18,7 @@ use types::{Address, ExecutionBlockHash, Hash256};
16
18
const PAYLOAD_ID_LRU_CACHE_SIZE : usize = 512 ;
17
19
18
20
/// Stores the remembered state of a engine.
19
- #[ derive( Copy , Clone , PartialEq ) ]
21
+ #[ derive( Copy , Clone , PartialEq , Debug ) ]
20
22
enum EngineState {
21
23
Synced ,
22
24
Offline ,
@@ -31,22 +33,6 @@ pub struct ForkChoiceState {
31
33
pub finalized_block_hash : ExecutionBlockHash ,
32
34
}
33
35
34
- /// Used to enable/disable logging on some tasks.
35
- #[ derive( Copy , Clone , PartialEq ) ]
36
- pub enum Logging {
37
- Enabled ,
38
- Disabled ,
39
- }
40
-
41
- impl Logging {
42
- pub fn is_enabled ( & self ) -> bool {
43
- match self {
44
- Logging :: Enabled => true ,
45
- Logging :: Disabled => false ,
46
- }
47
- }
48
- }
49
-
50
36
#[ derive( Hash , PartialEq , std:: cmp:: Eq ) ]
51
37
struct PayloadIdCacheKey {
52
38
pub head_block_hash : ExecutionBlockHash ,
@@ -69,17 +55,19 @@ pub struct Engine {
69
55
payload_id_cache : Mutex < LruCache < PayloadIdCacheKey , PayloadId > > ,
70
56
state : RwLock < EngineState > ,
71
57
pub latest_forkchoice_state : RwLock < Option < ForkChoiceState > > ,
58
+ pub executor : TaskExecutor ,
72
59
pub log : Logger ,
73
60
}
74
61
75
62
impl Engine {
76
63
/// Creates a new, offline engine.
77
- pub fn new ( api : HttpJsonRpc , log : & Logger ) -> Self {
64
+ pub fn new ( api : HttpJsonRpc , executor : TaskExecutor , log : & Logger ) -> Self {
78
65
Self {
79
66
api,
80
67
payload_id_cache : Mutex :: new ( LruCache :: new ( PAYLOAD_ID_LRU_CACHE_SIZE ) ) ,
81
68
state : RwLock :: new ( EngineState :: Offline ) ,
82
69
latest_forkchoice_state : Default :: default ( ) ,
70
+ executor,
83
71
log : log. clone ( ) ,
84
72
}
85
73
}
@@ -179,164 +167,117 @@ impl Engine {
179
167
pub async fn is_synced ( & self ) -> bool {
180
168
* self . state . read ( ) . await == EngineState :: Synced
181
169
}
170
+
182
171
/// Run the `EngineApi::upcheck` function if the node's last known state is not synced. This
183
172
/// might be used to recover the node if offline.
184
- pub async fn upcheck_not_synced ( & self , logging : Logging ) {
185
- let mut state_lock = self . state . write ( ) . await ;
186
- if * state_lock != EngineState :: Synced {
187
- match self . api . upcheck ( ) . await {
188
- Ok ( ( ) ) => {
189
- if logging . is_enabled ( ) {
190
- info ! (
191
- self . log,
192
- "Execution engine online" ;
193
- ) ;
194
- }
173
+ pub async fn upcheck ( & self ) {
174
+ let state : EngineState = match self . api . upcheck ( ) . await {
175
+ Ok ( ( ) ) => {
176
+ let mut state = self . state . write ( ) . await ;
177
+
178
+ if * state != EngineState :: Synced {
179
+ info ! (
180
+ self . log,
181
+ "Execution engine online" ;
182
+ ) ;
183
+
195
184
// Send the node our latest forkchoice_state.
196
185
self . send_latest_forkchoice_state ( ) . await ;
197
-
198
- * state_lock = EngineState :: Synced
186
+ } else {
187
+ debug ! (
188
+ self . log,
189
+ "Execution engine online" ;
190
+ ) ;
199
191
}
200
- Err ( EngineApiError :: IsSyncing ) => {
201
- if logging. is_enabled ( ) {
202
- warn ! (
203
- self . log,
204
- "Execution engine syncing" ;
205
- )
206
- }
207
-
208
- // Send the node our latest forkchoice_state, it may assist with syncing.
209
- self . send_latest_forkchoice_state ( ) . await ;
210
192
211
- * state_lock = EngineState :: Syncing
212
- }
213
- Err ( EngineApiError :: Auth ( err) ) => {
214
- if logging. is_enabled ( ) {
215
- warn ! (
216
- self . log,
217
- "Failed jwt authorization" ;
218
- "error" => ?err,
219
- ) ;
220
- }
221
-
222
- * state_lock = EngineState :: AuthFailed
223
- }
224
- Err ( e) => {
225
- if logging. is_enabled ( ) {
226
- warn ! (
227
- self . log,
228
- "Execution engine offline" ;
229
- "error" => ?e,
230
- )
231
- }
232
- }
193
+ * state = EngineState :: Synced ;
194
+ * state
233
195
}
234
- }
235
-
236
- if * state_lock != EngineState :: Synced && logging. is_enabled ( ) {
237
- crit ! (
238
- self . log,
239
- "No synced execution engines" ;
240
- )
241
- }
242
- }
196
+ Err ( EngineApiError :: IsSyncing ) => {
197
+ let mut state = self . state . write ( ) . await ;
198
+ * state = EngineState :: Syncing ;
199
+ * state
200
+ }
201
+ Err ( EngineApiError :: Auth ( err) ) => {
202
+ error ! (
203
+ self . log,
204
+ "Failed jwt authorization" ;
205
+ "error" => ?err,
206
+ ) ;
243
207
244
- /// Run `func` on the node.
245
- ///
246
- /// This function might try to run `func` twice. If the node returns an error it will try to
247
- /// upcheck it and then run the function again.
248
- pub async fn first_success < ' a , F , G , H > ( & ' a self , func : F ) -> Result < H , EngineError >
249
- where
250
- F : Fn ( & ' a Engine ) -> G + Copy ,
251
- G : Future < Output = Result < H , EngineApiError > > ,
252
- {
253
- match self . first_success_without_retry ( func) . await {
254
- Ok ( result) => Ok ( result) ,
208
+ let mut state = self . state . write ( ) . await ;
209
+ * state = EngineState :: AuthFailed ;
210
+ * state
211
+ }
255
212
Err ( e) => {
256
- debug ! ( self . log, "First engine call failed. Retrying" ; "err" => ?e) ;
257
- // Try to recover the node.
258
- self . upcheck_not_synced ( Logging :: Enabled ) . await ;
259
- // Try again.
260
- self . first_success_without_retry ( func) . await
213
+ error ! (
214
+ self . log,
215
+ "Error during execution engine upcheck" ;
216
+ "error" => ?e,
217
+ ) ;
218
+
219
+ let mut state = self . state . write ( ) . await ;
220
+ * state = EngineState :: Offline ;
221
+ * state
261
222
}
262
- }
223
+ } ;
224
+
225
+ debug ! (
226
+ self . log,
227
+ "Execution engine upcheck complete" ;
228
+ "state" => ?state,
229
+ ) ;
263
230
}
264
231
265
- /// Run `func` on the node.
266
- pub async fn first_success_without_retry < ' a , F , G , H > (
267
- & ' a self ,
268
- func : F ,
269
- ) -> Result < H , EngineError >
232
+ /// Run `func` on the node regardless of the node's current state.
233
+ ///
234
+ /// ## Note
235
+ ///
236
+ /// This function takes locks on `self.state`, holding a conflicting lock might cause a
237
+ /// deadlock.
238
+ pub async fn request < ' a , F , G , H > ( self : & ' a Arc < Self > , func : F ) -> Result < H , EngineError >
270
239
where
271
240
F : Fn ( & ' a Engine ) -> G ,
272
241
G : Future < Output = Result < H , EngineApiError > > ,
273
242
{
274
- let ( engine_synced , engine_auth_failed ) = {
275
- let state = self . state . read ( ) . await ;
276
- (
277
- * state == EngineState :: Synced ,
278
- * state == EngineState :: AuthFailed ,
279
- )
280
- } ;
281
- if engine_synced {
282
- match func ( self ) . await {
283
- Ok ( result ) => Ok ( result ) ,
284
- Err ( error ) => {
285
- debug ! (
286
- self . log ,
287
- "Execution engine call failed" ;
288
- "error" => ?error ,
243
+ match func ( self ) . await {
244
+ Ok ( result ) => {
245
+ // Take a clone *without* holding the read-lock since the `upcheck` function will
246
+ // take a write-lock.
247
+ let state: EngineState = * self . state . read ( ) . await ;
248
+
249
+ // If this request just returned successfully but we don't think this node is
250
+ // synced, check to see if it just became synced. This helps to ensure that the
251
+ // networking stack can get fast feedback about a synced engine.
252
+ if state != EngineState :: Synced {
253
+ // Spawn the upcheck in another task to avoid slowing down this request.
254
+ let inner_self = self . clone ( ) ;
255
+ self . executor . spawn (
256
+ async move { inner_self . upcheck ( ) . await } ,
257
+ "upcheck_after_success" ,
289
258
) ;
290
- * self . state . write ( ) . await = EngineState :: Offline ;
291
- Err ( EngineError :: Api { error } )
292
259
}
293
- }
294
- } else if engine_auth_failed {
295
- Err ( EngineError :: Auth )
296
- } else {
297
- Err ( EngineError :: Offline )
298
- }
299
- }
300
260
301
- /// Runs `func` on the node.
302
- ///
303
- /// This function might try to run `func` twice. If all nodes return an error on the first time
304
- /// it runs, it will try to upcheck all offline nodes and then run the function again.
305
- pub async fn broadcast < ' a , F , G , H > ( & ' a self , func : F ) -> Result < H , EngineError >
306
- where
307
- F : Fn ( & ' a Engine ) -> G + Copy ,
308
- G : Future < Output = Result < H , EngineApiError > > ,
309
- {
310
- match self . broadcast_without_retry ( func) . await {
311
- Err ( EngineError :: Offline { .. } ) => {
312
- self . upcheck_not_synced ( Logging :: Enabled ) . await ;
313
- self . broadcast_without_retry ( func) . await
261
+ Ok ( result)
314
262
}
315
- other => other,
316
- }
317
- }
263
+ Err ( error) => {
264
+ error ! (
265
+ self . log,
266
+ "Execution engine call failed" ;
267
+ "error" => ?error,
268
+ ) ;
318
269
319
- /// Runs `func` on the node if it's last state is not offline.
320
- pub async fn broadcast_without_retry < ' a , F , G , H > ( & ' a self , func : F ) -> Result < H , EngineError >
321
- where
322
- F : Fn ( & ' a Engine ) -> G ,
323
- G : Future < Output = Result < H , EngineApiError > > ,
324
- {
325
- let func = & func;
326
- if * self . state . read ( ) . await == EngineState :: Offline {
327
- Err ( EngineError :: Offline )
328
- } else {
329
- match func ( self ) . await {
330
- Ok ( res) => Ok ( res) ,
331
- Err ( error) => {
332
- debug ! (
333
- self . log,
334
- "Execution engine call failed" ;
335
- "error" => ?error,
336
- ) ;
337
- * self . state . write ( ) . await = EngineState :: Offline ;
338
- Err ( EngineError :: Api { error } )
339
- }
270
+ // The node just returned an error, run an upcheck so we can update the endpoint
271
+ // state.
272
+ //
273
+ // Spawn the upcheck in another task to avoid slowing down this request.
274
+ let inner_self = self . clone ( ) ;
275
+ self . executor . spawn (
276
+ async move { inner_self. upcheck ( ) . await } ,
277
+ "upcheck_after_error" ,
278
+ ) ;
279
+
280
+ Err ( EngineError :: Api { error } )
340
281
}
341
282
}
342
283
}
0 commit comments