Skip to content

Commit

Permalink
[Test] for transaction completion, wait on effects availability inste…
Browse files Browse the repository at this point in the history
…ad of batch service (MystenLabs#6647)
  • Loading branch information
mwtian authored Dec 12, 2022
1 parent abe8d8f commit 2c8aeaf
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 78 deletions.
115 changes: 37 additions & 78 deletions crates/sui-core/src/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,100 +1,59 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::authority::AuthorityState;
use crate::authority::{AuthorityState, EffectsNotifyRead};
use signature::Signer;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;

use sui_types::{
base_types::{dbg_addr, ObjectID, TransactionDigest},
batch::UpdateItem,
base_types::{
dbg_addr, random_object_ref, AuthorityName, ExecutionDigests, ObjectID, TransactionDigest,
},
committee::Committee,
crypto::{get_key_pair, AccountKeyPair, AuthoritySignInfo, AuthoritySignature, Signature},
gas::GasCostSummary,
intent::{Intent, IntentMessage},
message_envelope::Message,
messages::{
BatchInfoRequest, BatchInfoResponseItem, Transaction, TransactionData, VerifiedTransaction,
CertifiedTransaction, ExecutionStatus, Transaction, TransactionData, TransactionEffects,
VerifiedTransaction,
},
object::Object,
object::{Object, Owner},
};
use tokio::time::timeout;
use tracing::{info, warn};

use futures::StreamExt;
use sui_types::base_types::{random_object_ref, AuthorityName, ExecutionDigests};
use sui_types::committee::Committee;
use sui_types::gas::GasCostSummary;
use sui_types::message_envelope::Message;
use sui_types::messages::{CertifiedTransaction, ExecutionStatus, TransactionEffects};
use sui_types::object::Owner;
use tokio::time::sleep;
use tracing::info;
const WAIT_FOR_TX_TIMEOUT: Duration = Duration::from_secs(15);

pub async fn wait_for_tx(wait_digest: TransactionDigest, state: Arc<AuthorityState>) {
wait_for_all_txes(vec![wait_digest], state).await
pub async fn wait_for_tx(digest: TransactionDigest, state: Arc<AuthorityState>) {
match timeout(
WAIT_FOR_TX_TIMEOUT,
state.database.notify_read_effects(vec![digest]),
)
.await
{
Ok(_) => info!(?digest, "digest found"),
Err(e) => {
warn!(?digest, "digest not found!");
panic!("timed out waiting for effects of digest! {e}");
}
}
}

pub async fn wait_for_all_txes(wait_digests: Vec<TransactionDigest>, state: Arc<AuthorityState>) {
let mut wait_digests: HashSet<_> = wait_digests.iter().collect();

let mut timeout = Box::pin(sleep(Duration::from_millis(15_000)));

let mut max_seq = Some(0);

let mut stream = Box::pin(
state
.handle_batch_streaming(BatchInfoRequest {
start: max_seq,
length: 1000,
})
.await
.unwrap(),
);

loop {
tokio::select! {
_ = &mut timeout => panic!("wait_for_tx timed out"),

items = &mut stream.next() => {
match items {
// Upon receiving a batch
Some(Ok(BatchInfoResponseItem(UpdateItem::Batch(batch)) )) => {
max_seq = Some(batch.data().next_sequence_number);
info!(?max_seq, "Received Batch");
}
// Upon receiving a transaction digest we store it, if it is not processed already.
Some(Ok(BatchInfoResponseItem(UpdateItem::Transaction((_seq, digest))))) => {
info!(?digest, "Received Transaction");
if wait_digests.remove(&digest.transaction) {
info!(?digest, "Digest found");
}
if wait_digests.is_empty() {
info!(?digest, "all digests found");
break;
}
},

Some(Err( err )) => panic!("{}", err),
None => {
info!(?max_seq, "Restarting Batch");
stream = Box::pin(
state
.handle_batch_streaming(BatchInfoRequest {
start: max_seq,
length: 1000,
})
.await
.unwrap(),
);

}
}
},
pub async fn wait_for_all_txes(digests: Vec<TransactionDigest>, state: Arc<AuthorityState>) {
match timeout(
WAIT_FOR_TX_TIMEOUT,
state.database.notify_read_effects(digests.clone()),
)
.await
{
Ok(_) => info!(?digests, "all digests found"),
Err(e) => {
warn!(?digests, "some digests not found!");
panic!("timed out waiting for effects of digests! {e}");
}
}

// A small delay is needed so that the batch process can finish notifying other subscribers,
// which tests may depend on. Otherwise tests can pass or fail depending on whether the
// subscriber in this function was notified first or last.
sleep(Duration::from_millis(10)).await;
}

// Creates a fake sender-signed transaction for testing. This transaction will
Expand Down
3 changes: 3 additions & 0 deletions crates/sui/tests/full_node_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ async fn test_full_node_follows_txes() -> Result<(), anyhow::Error> {

wait_for_tx(digest, node.state().clone()).await;

// A small delay is needed for post processing operations following the transaction to finish.
sleep(Duration::from_secs(1)).await;

// verify that the intermediate sync data is cleared.
let sync_store = node.state().node_sync_store.clone();
let epoch_id = 0;
Expand Down

0 comments on commit 2c8aeaf

Please sign in to comment.