Skip to content

Commit

Permalink
[sui-node] Retry get_transaction_envelope() (MystenLabs#2356)
Browse files Browse the repository at this point in the history
  • Loading branch information
Evan Chan authored Jun 20, 2022
1 parent b4dbb67 commit 5aa9295
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 17 deletions.
21 changes: 17 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/sui-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ colored = "2.0.0"
curve25519-dalek = { version = "3", default-features = false }
thiserror = "1.0.30"
arc-swap = "1.5.0"
tokio-retry = "0.3"

sui-adapter = { path = "../sui-adapter" }
sui-framework = { path = "../sui-framework" }
Expand Down
39 changes: 30 additions & 9 deletions crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use sui_types::batch::{SignedBatch, TxSequenceNumber};
use sui_types::committee::EpochId;
use sui_types::crypto::{AuthoritySignInfo, EmptySignInfo};
use sui_types::object::{Owner, OBJECT_START_VERSION};
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tracing::{debug, error, info, trace};
use typed_store::rocks::{DBBatch, DBMap};
use typed_store::{reopen, traits::Map};
Expand Down Expand Up @@ -240,6 +241,8 @@ impl<const ALL_OBJ_VER: bool, S: Eq + Serialize + for<'de> Deserialize<'de>>
}
}

// TODO: Async retry method, using tokio-retry crate.

/// Await a new pending certificate to be added
pub async fn wait_for_new_pending(&self) {
self.pending_notifier.notified().await
Expand Down Expand Up @@ -427,17 +430,35 @@ impl<const ALL_OBJ_VER: bool, S: Eq + Serialize + for<'de> Deserialize<'de>>
pub async fn get_transaction_envelope(
&self,
object_ref: &ObjectRef,
) -> Result<Option<TransactionEnvelope<S>>, SuiError> {
) -> SuiResult<Option<TransactionEnvelope<S>>> {
let transaction_option = self
.lock_service
.get_lock(*object_ref)
.await?
.ok_or(SuiError::TransactionLockDoesNotExist)?;

// Returns None if either no TX with the lock, or TX present but no entry in transactions table.
// However we retry a couple times because the TX is written after the lock is acquired, so it might
// just be a race.
match transaction_option {
Some(tx_digest) => {
return Ok(self.transactions.get(&tx_digest)?);
// .expect("Stored a lock without storing transaction?"),
let mut retry_strategy = ExponentialBackoff::from_millis(2)
.factor(10)
.map(jitter)
.take(3);
let mut tx_option = self.transactions.get(&tx_digest)?;
while tx_option.is_none() {
if let Some(duration) = retry_strategy.next() {
// Wait to retry
tokio::time::sleep(duration).await;
trace!(?tx_digest, "Retrying getting pending transaction");
} else {
// No more retries, just quit
break;
}
tx_option = self.transactions.get(&tx_digest)?;
}
Ok(tx_option)
}
None => Ok(None),
}
Expand Down Expand Up @@ -716,12 +737,6 @@ impl<const ALL_OBJ_VER: bool, S: Eq + Serialize + for<'de> Deserialize<'de>>
std::iter::once((transaction_digest, &certificate)),
)?;

// Once a transaction is done processing and effects committed, we no longer
// need it in the transactions table. This also allows us to track pending
// transactions.
write_batch =
write_batch.delete_batch(&self.transactions, std::iter::once(transaction_digest))?;

self.sequence_tx(
write_batch,
temporary_store,
Expand Down Expand Up @@ -851,6 +866,12 @@ impl<const ALL_OBJ_VER: bool, S: Eq + Serialize + for<'de> Deserialize<'de>>
}),
)?;

// Once a transaction is done processing and effects committed, we no longer
// need it in the transactions table. This also allows us to track pending
// transactions.
write_batch =
write_batch.delete_batch(&self.transactions, std::iter::once(transaction_digest))?;

if ALL_OBJ_VER {
// Keep all versions of every object if ALL_OBJ_VER is true.
write_batch = write_batch.insert_batch(
Expand Down
10 changes: 6 additions & 4 deletions crates/workspace-hack/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ getrandom-c65f7effa3be6d31 = { package = "getrandom", version = "0.1", default-f
getrandom-6f8ce4dd05d13bba = { package = "getrandom", version = "0.2", default-features = false, features = ["std"] }
gimli = { version = "0.26", default-features = false, features = ["read", "read-core"] }
glob = { version = "0.3", default-features = false }
globset = { version = "0.4", default-features = false }
globset = { version = "0.4", features = ["log"] }
globwalk = { version = "0.8", default-features = false }
gloo-net = { version = "0.2", default-features = false, features = ["futures-channel", "futures-core", "futures-sink", "json", "pin-project", "serde", "serde_json", "websocket"] }
gloo-timers = { version = "0.2", features = ["futures", "futures-channel", "futures-core"] }
Expand Down Expand Up @@ -431,7 +431,7 @@ target-spec = { version = "1", default-features = false, features = ["serde", "s
telemetry-subscribers = { git = "https://github.com/MystenLabs/mysten-infra", rev = "ff5c1d69057fe93be658377462ca2875a57a0223", features = ["chrome", "console-subscriber", "jaeger", "opentelemetry", "opentelemetry-jaeger", "tokio-console", "tracing-chrome", "tracing-opentelemetry"] }
temp_testdir = { version = "0.2", default-features = false }
tempfile = { version = "3", default-features = false }
tera = { version = "1", features = ["builtins", "chrono", "chrono-tz", "humansize", "percent-encoding", "rand", "slug"] }
tera = { version = "1", features = ["builtins", "chrono", "chrono-tz", "humansize", "percent-encoding", "rand", "slug", "urlencode"] }
termcolor = { version = "1", default-features = false }
test-fuzz = { version = "3", default-features = false }
test-fuzz-internal = { version = "3", default-features = false }
Expand All @@ -450,6 +450,7 @@ tinyvec = { version = "1", features = ["alloc", "tinyvec_macros"] }
tinyvec_macros = { version = "0.1", default-features = false }
tokio = { version = "1", features = ["bytes", "fs", "full", "io-std", "io-util", "libc", "macros", "memchr", "mio", "net", "num_cpus", "once_cell", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "signal-hook-registry", "socket2", "sync", "test-util", "time", "tokio-macros", "tracing"] }
tokio-io-timeout = { version = "1", default-features = false }
tokio-retry = { version = "0.3", default-features = false }
tokio-rustls-3c51e837cfc5589a = { package = "tokio-rustls", version = "0.22", default-features = false }
tokio-rustls-2b5c6dc72f624058 = { package = "tokio-rustls", version = "0.23", features = ["logging", "tls12"] }
tokio-stream = { version = "0.1", features = ["fs", "net", "sync", "time", "tokio-util"] }
Expand Down Expand Up @@ -703,7 +704,7 @@ getrandom-c65f7effa3be6d31 = { package = "getrandom", version = "0.1", default-f
getrandom-6f8ce4dd05d13bba = { package = "getrandom", version = "0.2", default-features = false, features = ["std"] }
gimli = { version = "0.26", default-features = false, features = ["read", "read-core"] }
glob = { version = "0.3", default-features = false }
globset = { version = "0.4", default-features = false }
globset = { version = "0.4", features = ["log"] }
globwalk = { version = "0.8", default-features = false }
gloo-net = { version = "0.2", default-features = false, features = ["futures-channel", "futures-core", "futures-sink", "json", "pin-project", "serde", "serde_json", "websocket"] }
gloo-timers = { version = "0.2", features = ["futures", "futures-channel", "futures-core"] }
Expand Down Expand Up @@ -1010,7 +1011,7 @@ target-spec = { version = "1", default-features = false, features = ["serde", "s
telemetry-subscribers = { git = "https://github.com/MystenLabs/mysten-infra", rev = "ff5c1d69057fe93be658377462ca2875a57a0223", features = ["chrome", "console-subscriber", "jaeger", "opentelemetry", "opentelemetry-jaeger", "tokio-console", "tracing-chrome", "tracing-opentelemetry"] }
temp_testdir = { version = "0.2", default-features = false }
tempfile = { version = "3", default-features = false }
tera = { version = "1", features = ["builtins", "chrono", "chrono-tz", "humansize", "percent-encoding", "rand", "slug"] }
tera = { version = "1", features = ["builtins", "chrono", "chrono-tz", "humansize", "percent-encoding", "rand", "slug", "urlencode"] }
termcolor = { version = "1", default-features = false }
test-fuzz = { version = "3", default-features = false }
test-fuzz-internal = { version = "3", default-features = false }
Expand All @@ -1033,6 +1034,7 @@ tinyvec_macros = { version = "0.1", default-features = false }
tokio = { version = "1", features = ["bytes", "fs", "full", "io-std", "io-util", "libc", "macros", "memchr", "mio", "net", "num_cpus", "once_cell", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "signal-hook-registry", "socket2", "sync", "test-util", "time", "tokio-macros", "tracing"] }
tokio-io-timeout = { version = "1", default-features = false }
tokio-macros = { version = "1", default-features = false }
tokio-retry = { version = "0.3", default-features = false }
tokio-rustls-3c51e837cfc5589a = { package = "tokio-rustls", version = "0.22", default-features = false }
tokio-rustls-2b5c6dc72f624058 = { package = "tokio-rustls", version = "0.23", features = ["logging", "tls12"] }
tokio-stream = { version = "0.1", features = ["fs", "net", "sync", "time", "tokio-util"] }
Expand Down

0 comments on commit 5aa9295

Please sign in to comment.