indexer: let handlers keep retrying (MystenLabs#7807)
### problem and solution

Before this PR, retries started with an interval of 1 second, multiplied by 1.5 each time up to a cap of 10 minutes; if the retry at the 10-minute interval also failed, the handlers gave up.

On testnet wave 2, the full node (FN) was down for a while and the handlers eventually gave up. This PR makes the handlers keep retrying, so that whenever the FN comes back up, the indexer catches up automatically.
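
For illustration, here is a minimal, self-contained sketch of the new strategy (hypothetical stand-ins only: `start_handler` plays the role of a handler's fallible `start()` call; the actual change is in `handler_orchestrator.rs` in the diff below):

```
use std::time::Duration;

const HANDLER_RETRY_INTERVAL_IN_SECS: u64 = 10;

// Hypothetical stand-in for e.g. `checkpoint_handler.start().await`:
// fails a few times, then succeeds, to mimic a full node coming back.
async fn start_handler(attempt: u32) -> Result<(), String> {
    if attempt < 3 {
        Err("FullNodeReadingError: connection refused".to_string())
    } else {
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let mut attempt = 0;
    // Keep retrying at a fixed interval, never giving up.
    loop {
        match start_handler(attempt).await {
            Ok(()) => break,
            Err(e) => {
                eprintln!(
                    "handler failed with error: {e:?}, retrying after {HANDLER_RETRY_INTERVAL_IN_SECS} secs..."
                );
                tokio::time::sleep(Duration::from_secs(HANDLER_RETRY_INTERVAL_IN_SECS)).await;
                attempt += 1;
            }
        }
    }
    println!("handler recovered after {attempt} retries");
}
```

Compared with the old exponential backoff, a fixed 10-second interval means a handler resumes within at most 10 seconds of the FN becoming reachable again, instead of waiting out a backoff interval of up to 10 minutes.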


### testing
1. Run locally with a testnet wave 2 RPC to make sure that the indexer built from this PR on main is still compatible with testnet wave 2.
2. Test with a local validator to make sure that the handlers do keep retrying:
```
2023-01-29T06:08:44.785328Z  WARN sui_indexer::handlers::handler_orchestrator: Indexer object event handler failed with error: FullNodeReadingError("Failed reading event page with cursor None and error: RpcError(Transport(HTTP error: connection error: Connection reset by peer (os error 54)))"), retrying after 10 secs...
2023-01-29T06:08:44.785328Z  WARN sui_indexer::handlers::handler_orchestrator: Indexer publish event handler failed with error: FullNodeReadingError("Failed reading event page with cursor None and error: RpcError(Transport(HTTP error: connection closed before message completed))"), retrying after 10 secs...
2023-01-29T06:08:44.785328Z  WARN sui_indexer::handlers::handler_orchestrator: Indexer move event handler failed with error: FullNodeReadingError("Failed reading Move event page with cursor None and error: RpcError(Transport(HTTP error: connection closed before message completed))"), retrying after 10 secs...
2023-01-29T06:08:44.828469Z  WARN sui_indexer::handlers::handler_orchestrator: Indexer transaction handler failed with error: FullNodeReadingError("Failed reading transaction page with cursor None and err: RpcError(Transport(HTTP error: error trying to connect: tcp connect error: Connection refused (os error 61)))"), retrying after 10 secs...
2023-01-29T06:08:44.845842Z  WARN sui_indexer::handlers::handler_orchestrator: Indexer event handler failed with error: FullNodeReadingError("Failed reading transaction page with cursor None and err: RpcError(Transport(HTTP error: error trying to connect: tcp connect error: Connection refused (os error 61)))"), retrying after 10 secs...
```
gegaowp authored Jan 30, 2023
1 parent 22f71d2 commit 0a5b3a7
201 changes: 66 additions & 135 deletions crates/sui-indexer/src/handlers/handler_orchestrator.rs
@@ -5,8 +5,6 @@ use std::sync::Arc;
use sui_indexer::PgConnectionPool;
use sui_sdk::SuiClient;

use backoff::future::retry;
use backoff::ExponentialBackoffBuilder;
use futures::future::try_join_all;
use prometheus::Registry;
use tracing::{error, info, warn};
@@ -18,7 +16,8 @@ use crate::handlers::transaction_handler::TransactionHandler;
use crate::handlers::move_event_handler::MoveEventHandler;
use crate::handlers::object_event_handler::ObjectEventHandler;
use crate::handlers::publish_event_handler::PublishEventHandler;
const BACKOFF_MAX_INTERVAL_IN_SECS: u64 = 600;

const HANDLER_RETRY_INTERVAL_IN_SECS: u64 = 10;

#[derive(Clone)]
pub struct HandlerOrchestrator {
@@ -74,159 +73,87 @@ impl HandlerOrchestrator {
);

let checkpoint_handle = tokio::task::spawn(async move {
let backoff_config = ExponentialBackoffBuilder::new()
.with_max_interval(std::time::Duration::from_secs(BACKOFF_MAX_INTERVAL_IN_SECS))
.build();
let checkpoint_res = retry(backoff_config, || async {
let checkpoint_handler_exec_res = checkpoint_handler.start().await;
if let Err(e) = checkpoint_handler_exec_res.clone() {
checkpoint_handler
.checkpoint_handler_metrics
.total_checkpoint_handler_error
.inc();
warn!(
"Indexer checkpoint handler failed with error: {:?}, retrying...",
e
);
}
Ok(checkpoint_handler_exec_res?)
})
.await;
if let Err(e) = checkpoint_res {
error!(
"Indexer checkpoint handler failed after retrials with error: {:?}!",
e
let mut checkpoint_handler_exec_res = checkpoint_handler.start().await;
while let Err(e) = checkpoint_handler_exec_res.clone() {
warn!(
"Indexer checkpoint handler failed with error: {:?}, retrying after {:?} secs...",
e, HANDLER_RETRY_INTERVAL_IN_SECS
);
tokio::time::sleep(std::time::Duration::from_secs(
HANDLER_RETRY_INTERVAL_IN_SECS,
))
.await;
checkpoint_handler_exec_res = checkpoint_handler.start().await;
}
});
let txn_handle = tokio::task::spawn(async move {
let backoff_config = ExponentialBackoffBuilder::new()
.with_max_interval(std::time::Duration::from_secs(BACKOFF_MAX_INTERVAL_IN_SECS))
.build();
let txn_res = retry(backoff_config, || async {
let txn_handler_exec_res = txn_handler.start().await;
if let Err(e) = txn_handler_exec_res.clone() {
txn_handler
.transaction_handler_metrics
.total_transaction_handler_error
.inc();
warn!(
"Indexer transaction handler failed with error: {:?}, retrying...",
e
);
}
Ok(txn_handler_exec_res?)
})
.await;
if let Err(e) = txn_res {
error!(
"Indexer transaction handler failed after retrials with error: {:?}!",
e
let mut txn_handler_exec_res = txn_handler.start().await;
while let Err(e) = txn_handler_exec_res.clone() {
warn!(
"Indexer transaction handler failed with error: {:?}, retrying after {:?} secs...",
e, HANDLER_RETRY_INTERVAL_IN_SECS
);
tokio::time::sleep(std::time::Duration::from_secs(
HANDLER_RETRY_INTERVAL_IN_SECS,
))
.await;
txn_handler_exec_res = txn_handler.start().await;
}
});
let event_handle = tokio::task::spawn(async move {
let backoff_config = ExponentialBackoffBuilder::new()
.with_max_interval(std::time::Duration::from_secs(BACKOFF_MAX_INTERVAL_IN_SECS))
.build();
let event_res = retry(backoff_config, || async {
let event_handler_exec_res = event_handler.start().await;
if let Err(e) = event_handler_exec_res.clone() {
event_handler
.event_handler_metrics
.total_event_handler_error
.inc();
warn!(
"Indexer event handler failed with error: {:?}, retrying...",
e
);
}
Ok(event_handler_exec_res?)
})
.await;
if let Err(e) = event_res {
error!(
"Indexer event handler failed after retrials with error: {:?}",
e
let mut event_handler_exec_res = event_handler.start().await;
while let Err(e) = event_handler_exec_res.clone() {
warn!(
"Indexer event handler failed with error: {:?}, retrying after {:?} secs...",
e, HANDLER_RETRY_INTERVAL_IN_SECS
);
tokio::time::sleep(std::time::Duration::from_secs(
HANDLER_RETRY_INTERVAL_IN_SECS,
))
.await;
event_handler_exec_res = event_handler.start().await;
}
});
let object_event_handle = tokio::task::spawn(async move {
let backoff_config = ExponentialBackoffBuilder::new()
.with_max_interval(std::time::Duration::from_secs(BACKOFF_MAX_INTERVAL_IN_SECS))
.build();
let object_event_res = retry(backoff_config, || async {
let object_event_handler_exec_res = obj_event_handler.start().await;
if let Err(e) = object_event_handler_exec_res.clone() {
obj_event_handler
.event_handler_metrics
.total_object_event_handler_error
.inc();
warn!(
"Indexer object event handler failed with error: {:?}, retrying...",
e
);
}
Ok(object_event_handler_exec_res?)
})
.await;
if let Err(e) = object_event_res {
error!(
"Indexer object event handler failed after retrials with error: {:?}",
e
let mut obj_event_handler_exec_res = obj_event_handler.start().await;
while let Err(e) = obj_event_handler_exec_res.clone() {
warn!(
"Indexer object event handler failed with error: {:?}, retrying after {:?} secs...",
e, HANDLER_RETRY_INTERVAL_IN_SECS
);
tokio::time::sleep(std::time::Duration::from_secs(
HANDLER_RETRY_INTERVAL_IN_SECS,
))
.await;
obj_event_handler_exec_res = obj_event_handler.start().await;
}
});
let publish_event_handle = tokio::task::spawn(async move {
let backoff_config = ExponentialBackoffBuilder::new()
.with_max_interval(std::time::Duration::from_secs(BACKOFF_MAX_INTERVAL_IN_SECS))
.build();
let publish_event_res = retry(backoff_config, || async {
let publish_event_handler_exec_res = publish_event_handler.start().await;
if let Err(e) = publish_event_handler_exec_res.clone() {
publish_event_handler
.event_handler_metrics
.total_publish_event_handler_error
.inc();
warn!(
"Indexer publish event handler failed with error: {:?}, retrying...",
e
);
}
Ok(publish_event_handler_exec_res?)
})
.await;
if let Err(e) = publish_event_res {
error!(
"Indexer publish event handler failed after retrials with error: {:?}",
e
let mut publish_event_handler_exec_res = publish_event_handler.start().await;
while let Err(e) = publish_event_handler_exec_res.clone() {
warn!(
"Indexer publish event handler failed with error: {:?}, retrying after {:?} secs...",
e, HANDLER_RETRY_INTERVAL_IN_SECS
);
tokio::time::sleep(std::time::Duration::from_secs(
HANDLER_RETRY_INTERVAL_IN_SECS,
))
.await;
publish_event_handler_exec_res = publish_event_handler.start().await;
}
});
let move_event_handle = tokio::task::spawn(async move {
let backoff_config = ExponentialBackoffBuilder::new()
.with_max_interval(std::time::Duration::from_secs(BACKOFF_MAX_INTERVAL_IN_SECS))
.build();
let move_event_res = retry(backoff_config, || async {
let move_event_handler_exec_res = move_handler.start().await;
if let Err(e) = move_event_handler_exec_res.clone() {
move_handler
.event_handler_metrics
.total_move_event_handler_error
.inc();
warn!(
"Indexer move event handler failed with error: {:?}, retrying...",
e
);
}
Ok(move_event_handler_exec_res?)
})
.await;
if let Err(e) = move_event_res {
error!(
"Indexer move event handler failed after retrials with error: {:?}",
e
let mut move_event_handler_exec_res = move_handler.start().await;
while let Err(e) = move_event_handler_exec_res.clone() {
warn!(
"Indexer move event handler failed with error: {:?}, retrying after {:?} secs...",
e, HANDLER_RETRY_INTERVAL_IN_SECS
);
tokio::time::sleep(std::time::Duration::from_secs(
HANDLER_RETRY_INTERVAL_IN_SECS,
))
.await;
move_event_handler_exec_res = move_handler.start().await;
}
});
try_join_all(vec![
@@ -238,6 +165,10 @@ impl HandlerOrchestrator {
move_event_handle,
])
.await
.map_err(|e| {
error!("Indexer handler orchestrator failed with error: {:?}", e);
e
})
.expect("Handler orchestrator should not run into errors.");
}
}
