Skip to content

Commit

Permalink
client: Resend transactions using same mechanism as initial send (#915)
Browse files Browse the repository at this point in the history
* client: Timeout tpu sends

* Resend transactions one at a time, same as the first send

* Simplify resends to happen right after initial sends

* Skip already processed transaction errors
  • Loading branch information
joncinque authored Apr 26, 2024
1 parent 196626d commit ed11b72
Showing 1 changed file with 75 additions and 62 deletions.
137 changes: 75 additions & 62 deletions client/src/send_and_confirm_transactions_in_parallel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use {
},
bincode::serialize,
dashmap::DashMap,
futures_util::future::{join_all, FutureExt},
futures_util::future::join_all,
solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
solana_rpc_client::spinner::{self, SendTransactionProgress},
solana_rpc_client_api::{
Expand All @@ -28,12 +28,16 @@ use {
},
time::Duration,
},
tokio::{sync::RwLock, task::JoinHandle, time::Instant},
tokio::{sync::RwLock, task::JoinHandle},
};

const BLOCKHASH_REFRESH_RATE: Duration = Duration::from_secs(5);
const TPU_RESEND_REFRESH_RATE: Duration = Duration::from_secs(2);
const SEND_INTERVAL: Duration = Duration::from_millis(10);
// This is a "reasonable" constant for how long it should
// take to fan the transactions out, taken from
// `solana_tpu_client::nonblocking::tpu_client::send_wire_transaction_futures`
const SEND_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5);

type QuicTpuClient = TpuClient<QuicPool, QuicConnectionManager, QuicConfig>;

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -141,8 +145,11 @@ fn create_transaction_confirmation_task(
})
{
num_confirmed_transactions.fetch_add(1, Ordering::Relaxed);
if let Some(error) = status.err {
errors_map.insert(data.index, error);
match status.err {
Some(TransactionError::AlreadyProcessed) | None => {}
Some(error) => {
errors_map.insert(data.index, error);
}
}
};
}
Expand Down Expand Up @@ -190,9 +197,12 @@ async fn send_transaction_with_rpc_fallback(
index: usize,
) -> Result<()> {
let send_over_rpc = if let Some(tpu_client) = tpu_client {
!tpu_client
.send_wire_transaction(serialized_transaction.clone())
.await
!tokio::time::timeout(
SEND_TIMEOUT_INTERVAL,
tpu_client.send_wire_transaction(serialized_transaction.clone()),
)
.await
.unwrap_or(false)
} else {
true
};
Expand Down Expand Up @@ -341,41 +351,21 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction
let block_height = current_block_height.load(Ordering::Relaxed);

if let Some(tpu_client) = tpu_client {
let instant = Instant::now();
// retry sending transaction only over TPU port
// any transactions sent over RPC will be automatically rebroadcast by the RPC server
let txs_to_resend_over_tpu = unconfirmed_transaction_map
.iter()
.filter(|x| block_height < x.last_valid_block_height)
.map(|x| x.serialized_transaction.clone())
.collect::<Vec<_>>();
let num_txs_to_resend = txs_to_resend_over_tpu.len();
// This is a "reasonable" constant for how long it should
// take to fan the transactions out, taken from
// `solana_tpu_client::nonblocking::tpu_client::send_wire_transaction_futures`
const SEND_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5);
let message = if tokio::time::timeout(
SEND_TIMEOUT_INTERVAL,
tpu_client.try_send_wire_transaction_batch(txs_to_resend_over_tpu),
send_staggered_transactions(
progress_bar,
tpu_client,
txs_to_resend_over_tpu,
max_valid_block_height,
context,
)
.await
.is_err()
{
format!("Timed out resending {num_txs_to_resend} transactions...")
} else {
format!("Resent {num_txs_to_resend} transactions...")
};

if let Some(progress_bar) = progress_bar {
let progress =
progress_from_context_and_block_height(context, max_valid_block_height);
progress.set_message_for_confirmed_transactions(progress_bar, &message);
}

let elapsed = instant.elapsed();
if elapsed < TPU_RESEND_REFRESH_RATE {
tokio::time::sleep(TPU_RESEND_REFRESH_RATE - elapsed).await;
}
.await;
} else {
tokio::time::sleep(Duration::from_millis(100)).await;
}
Expand All @@ -391,6 +381,41 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction
}
}

async fn send_staggered_transactions(
progress_bar: &Option<indicatif::ProgressBar>,
tpu_client: &QuicTpuClient,
wire_transactions: Vec<Vec<u8>>,
last_valid_block_height: u64,
context: &SendingContext,
) {
let current_transaction_count = wire_transactions.len();
let futures = wire_transactions
.into_iter()
.enumerate()
.map(|(counter, transaction)| async move {
tokio::time::sleep(SEND_INTERVAL.saturating_mul(counter as u32)).await;
if let Some(progress_bar) = progress_bar {
let progress =
progress_from_context_and_block_height(context, last_valid_block_height);
progress.set_message_for_confirmed_transactions(
progress_bar,
&format!(
"Resending {}/{} transactions",
counter + 1,
current_transaction_count,
),
);
}
tokio::time::timeout(
SEND_TIMEOUT_INTERVAL,
tpu_client.send_wire_transaction(transaction),
)
.await
})
.collect::<Vec<_>>();
join_all(futures).await;
}

/// Sends and confirms transactions concurrently
///
/// The sending and confirmation of transactions is done in parallel tasks
Expand Down Expand Up @@ -483,33 +508,21 @@ pub async fn send_and_confirm_transactions_in_parallel<T: Signers + ?Sized>(
// clear the map so that we can start resending
unconfirmed_transasction_map.clear();

let futures = [
sign_all_messages_and_send(
&progress_bar,
&rpc_client,
&tpu_client,
messages_with_index,
signers,
&context,
)
.boxed_local(),
async {
// Give the signing and sending a head start before trying to
// confirm and resend
tokio::time::sleep(TPU_RESEND_REFRESH_RATE).await;
confirm_transactions_till_block_height_and_resend_unexpired_transaction_over_tpu(
&progress_bar,
&tpu_client,
&context,
)
.await;
// Infallible, but required to have the same return type as
// `sign_all_messages_and_send`
Ok(())
}
.boxed_local(),
];
join_all(futures).await.into_iter().collect::<Result<_>>()?;
sign_all_messages_and_send(
&progress_bar,
&rpc_client,
&tpu_client,
messages_with_index,
signers,
&context,
)
.await?;
confirm_transactions_till_block_height_and_resend_unexpired_transaction_over_tpu(
&progress_bar,
&tpu_client,
&context,
)
.await;

if unconfirmed_transasction_map.is_empty() {
break;
Expand Down

0 comments on commit ed11b72

Please sign in to comment.