Detect future timestamp on queue push (#1570)
* Add utility function to get the current system time

Signed-off-by: s8sato <49983831+s8sato@users.noreply.github.com>

* Detect future timestamp on queue push

Signed-off-by: s8sato <49983831+s8sato@users.noreply.github.com>
s8sato authored Nov 2, 2021
1 parent 09fa17a commit 53190df
Showing 7 changed files with 107 additions and 61 deletions.
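
The substance of the change: on `push`, the queue now compares a transaction's client-supplied creation timestamp against the node's current time and rejects the transaction with a new `Error::InFuture` if the timestamp lies more than `future_threshold_ms` (1000 ms by default) in the future. The predicate itself is a one-liner; here is a minimal, standalone sketch of that logic with simplified names (not the exact types from the diff below):

use std::time::Duration;

/// `true` if `tx_timestamp` lies more than `threshold` beyond `now`.
/// `saturating_sub` clamps to zero for past timestamps, so only genuinely
/// future timestamps can exceed the threshold.
fn is_in_future(tx_timestamp: Duration, now: Duration, threshold: Duration) -> bool {
    tx_timestamp.saturating_sub(now) > threshold
}

fn main() {
    let now = Duration::from_millis(1_000_000);
    let threshold = Duration::from_millis(1_000);
    assert!(!is_in_future(Duration::from_millis(1_000_500), now, threshold)); // 500 ms ahead: tolerated
    assert!(is_in_future(Duration::from_millis(1_002_000), now, threshold)); // 2 s ahead: rejected
    println!("future-timestamp check ok");
}
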
9 changes: 3 additions & 6 deletions core/src/block.rs
@@ -3,12 +3,12 @@

#![allow(clippy::module_name_repetitions)]

use std::{collections::BTreeSet, iter, marker::PhantomData, time::SystemTime};
use std::{collections::BTreeSet, iter, marker::PhantomData};

use dashmap::{iter::Iter as MapIter, mapref::one::Ref as MapRef, DashMap};
use eyre::{Context, Result};
use iroha_crypto::{HashOf, KeyPair, SignatureOf, SignaturesOf};
use iroha_data_model::{events::prelude::*, transaction::prelude::*};
use iroha_data_model::{current_time, events::prelude::*, transaction::prelude::*};
use iroha_derive::Io;
use iroha_version::{declare_versioned_with_scale, version_with_scale};
use parity_scale_codec::{Decode, Encode};
@@ -100,10 +100,7 @@ impl PendingBlock {
/// Create a new `PendingBlock` from transactions.
pub fn new(transactions: Vec<VersionedAcceptedTransaction>) -> PendingBlock {
#[allow(clippy::expect_used)]
let timestamp = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Failed to get System Time.")
.as_millis();
let timestamp = current_time().as_millis();
PendingBlock {
timestamp,
transactions,
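
The `current_time()` helper that replaces the repeated `SystemTime` boilerplate lives in `iroha_data_model`; its definition is not among the file diffs shown on this page. Judging from the code it replaces, it is presumably little more than the following (a sketch of the assumed shape, not the verbatim definition):

use std::time::{Duration, SystemTime};

/// Current system time as a `Duration` since the Unix epoch.
#[allow(clippy::expect_used)]
pub fn current_time() -> Duration {
    SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .expect("Failed to get the current system time")
}
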
42 changes: 41 additions & 1 deletion core/src/queue.rs
@@ -26,6 +26,7 @@ pub struct Queue {
txs_in_block: usize,
max_txs: usize,
ttl: Duration,
future_threshold: Duration,
}

/// Queue push error
@@ -34,6 +35,9 @@ pub enum Error {
/// Queue is full
#[error("Queue is full")]
Full,
/// Transaction is regarded to have been tampered with, as it carries a future timestamp
#[error("Transaction is regarded to have been tampered with, as it carries a future timestamp")]
InFuture,
/// Transaction expired
#[error("Transaction is expired")]
Expired,
@@ -58,6 +62,7 @@ impl Queue {
max_txs: cfg.maximum_transactions_in_queue as usize,
txs_in_block: cfg.maximum_transactions_in_block as usize,
ttl: Duration::from_millis(cfg.transaction_time_to_live_ms),
future_threshold: Duration::from_millis(cfg.future_threshold_ms),
}
}

@@ -110,6 +115,9 @@ impl Queue {
tx: VersionedAcceptedTransaction,
wsv: &WorldStateView<W>,
) -> Result<(), (VersionedAcceptedTransaction, Error)> {
if tx.is_in_future(self.future_threshold) {
return Err((tx, Error::InFuture));
}
if let Err(e) = self.check_tx(&tx, wsv) {
return Err((tx, e));
}
@@ -213,9 +221,10 @@ pub mod config {
use serde::{Deserialize, Serialize};

const DEFAULT_MAXIMUM_TRANSACTIONS_IN_BLOCK: u32 = 2_u32.pow(13);
const DEFAULT_MAXIMUM_TRANSACTIONS_IN_QUEUE: u32 = 2_u32.pow(16);
// 24 hours
const DEFAULT_TRANSACTION_TIME_TO_LIVE_MS: u64 = 24 * 60 * 60 * 1000;
const DEFAULT_MAXIMUM_TRANSACTIONS_IN_QUEUE: u32 = 2_u32.pow(16);
const DEFAULT_FUTURE_THRESHOLD_MS: u64 = 1000;

/// Configuration for `Queue`.
#[derive(Copy, Clone, Deserialize, Serialize, Debug, Configurable, PartialEq, Eq)]
@@ -229,6 +238,8 @@ pub mod config {
pub maximum_transactions_in_queue: u32,
/// The transaction will be dropped after this time if it is still in a `Queue`.
pub transaction_time_to_live_ms: u64,
/// The threshold used to determine whether a transaction has been tampered with to carry a future timestamp.
pub future_threshold_ms: u64,
}

impl Default for QueueConfiguration {
@@ -237,6 +248,7 @@ pub mod config {
maximum_transactions_in_block: DEFAULT_MAXIMUM_TRANSACTIONS_IN_BLOCK,
maximum_transactions_in_queue: DEFAULT_MAXIMUM_TRANSACTIONS_IN_QUEUE,
transaction_time_to_live_ms: DEFAULT_TRANSACTION_TIME_TO_LIVE_MS,
future_threshold_ms: DEFAULT_FUTURE_THRESHOLD_MS,
}
}
}
@@ -300,6 +312,7 @@ mod tests {
maximum_transactions_in_block: 2,
transaction_time_to_live_ms: 100_000,
maximum_transactions_in_queue: 100,
..QueueConfiguration::default()
});
let wsv = WorldStateView::new(world_with_test_domains(
KeyPair::generate().unwrap().public_key,
@@ -317,6 +330,7 @@
maximum_transactions_in_block: 2,
transaction_time_to_live_ms: 100_000,
maximum_transactions_in_queue: max_txs_in_queue,
..QueueConfiguration::default()
});
let wsv = WorldStateView::new(world_with_test_domains(
KeyPair::generate().unwrap().public_key,
@@ -342,6 +356,7 @@
maximum_transactions_in_block: 2,
transaction_time_to_live_ms: 100_000,
maximum_transactions_in_queue: max_txs_in_queue,
..QueueConfiguration::default()
});
let wsv = WorldStateView::new(world_with_test_domains(
KeyPair::generate().unwrap().public_key,
@@ -366,6 +381,7 @@
maximum_transactions_in_block: 2,
transaction_time_to_live_ms: 100_000,
maximum_transactions_in_queue: 100,
..QueueConfiguration::default()
});
let tx = Transaction::new(
Vec::new(),
@@ -408,6 +424,7 @@ mod tests {
maximum_transactions_in_block: max_block_tx,
transaction_time_to_live_ms: 100_000,
maximum_transactions_in_queue: 100,
..QueueConfiguration::default()
});
for _ in 0..5 {
queue
@@ -434,6 +451,7 @@
maximum_transactions_in_block: max_block_tx,
transaction_time_to_live_ms: 100_000,
maximum_transactions_in_queue: 100,
..QueueConfiguration::default()
});
assert!(matches!(
queue.push(tx, &wsv),
@@ -452,6 +470,7 @@
maximum_transactions_in_block: max_block_tx,
transaction_time_to_live_ms: 100_000,
maximum_transactions_in_queue: 100,
..QueueConfiguration::default()
});
queue.push(tx.clone(), &wsv).unwrap();
wsv.transactions.insert(tx.hash());
@@ -468,6 +487,7 @@
maximum_transactions_in_block: max_block_tx,
transaction_time_to_live_ms: 200,
maximum_transactions_in_queue: 100,
..QueueConfiguration::default()
});
for _ in 0..(max_block_tx - 1) {
queue
@@ -510,6 +530,7 @@ mod tests {
maximum_transactions_in_block: 2,
transaction_time_to_live_ms: 100_000,
maximum_transactions_in_queue: 100,
..QueueConfiguration::default()
});
queue
.push(
@@ -543,6 +564,7 @@ mod tests {
maximum_transactions_in_block: max_block_tx,
transaction_time_to_live_ms: 100_000,
maximum_transactions_in_queue: 100_000_000,
..QueueConfiguration::default()
}));

let start_time = Instant::now();
@@ -593,4 +615,22 @@
assert!(queue.txs.contains_key(&tx));
}
}

#[test]
fn push_tx_in_future() {
let future_threshold_ms = 1000;
let queue = Queue::from_configuration(&QueueConfiguration {
future_threshold_ms,
..QueueConfiguration::default()
});
let alice_key = KeyPair::generate().expect("Failed to generate keypair.");
let wsv = WorldStateView::new(world_with_test_domains(alice_key.public_key.clone()));

let mut tx = accepted_tx("alice", "wonderland", 100_000, Some(&alice_key));
assert!(queue.push(tx.clone(), &wsv).is_ok());
// Tamper with the timestamp, pushing it past the future threshold
tx.as_mut_inner_v1().payload.creation_time += 2 * future_threshold_ms;
assert!(matches!(queue.push(tx, &wsv), Err((_, Error::InFuture))));
assert_eq!(queue.txs.len(), 1);
}
}
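
The `..QueueConfiguration::default()` lines added to each existing test use struct-update syntax so that a test only spells out the fields it cares about and picks up the new `future_threshold_ms` (and any field added later) from `Default`. The pattern, demonstrated on a stand-in struct with hypothetical names rather than the real configuration type:

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct QueueCfg {
    maximum_transactions_in_block: u32,
    transaction_time_to_live_ms: u64,
    future_threshold_ms: u64,
}

impl Default for QueueCfg {
    fn default() -> Self {
        Self {
            maximum_transactions_in_block: 2_u32.pow(13),
            transaction_time_to_live_ms: 24 * 60 * 60 * 1000,
            future_threshold_ms: 1000,
        }
    }
}

fn main() {
    // Override only what a test cares about; everything else, including
    // fields added in later commits, comes from `Default`.
    let cfg = QueueCfg {
        transaction_time_to_live_ms: 100_000,
        ..QueueCfg::default()
    };
    assert_eq!(cfg.future_threshold_ms, 1000);
    assert_eq!(cfg.maximum_transactions_in_block, 8192);
    println!("{:?}", cfg);
}
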
32 changes: 10 additions & 22 deletions core/src/sumeragi/mod.rs
@@ -6,15 +6,17 @@ use std::{
collections::{BTreeMap, HashSet},
fmt::{self, Debug, Formatter},
sync::Arc,
time::{Duration, Instant, SystemTime},
time::{Duration, Instant},
};

use dashmap::{DashMap, DashSet};
use eyre::{eyre, Result};
use futures::{future, prelude::*, stream::futures_unordered::FuturesUnordered};
use iroha_actor::{broker::*, prelude::*};
use iroha_crypto::{HashOf, KeyPair};
use iroha_data_model::{events::Event, peer::Id as PeerId, transaction::VersionedTransaction};
use iroha_data_model::{
current_time, events::Event, peer::Id as PeerId, transaction::VersionedTransaction,
};
use iroha_logger::Instrument;
use iroha_p2p::ConnectPeer;
use network_topology::{Role, Topology};
@@ -914,9 +916,7 @@ impl VotingBlock {
#[allow(clippy::expect_used)]
pub fn new(block: VersionedValidBlock) -> VotingBlock {
VotingBlock {
voted_at: SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Failed to get System Time."),
voted_at: current_time(),
block,
}
}
@@ -929,7 +929,7 @@ pub mod message {
use std::{
collections::HashSet,
sync::Arc,
time::{Duration, Instant, SystemTime},
time::{Duration, Instant},
};

use eyre::{Result, WrapErr};
@@ -1121,13 +1121,9 @@ pub mod message {
sumeragi: &Sumeragi<G, K, W>,
) -> bool {
let voting_block = sumeragi.voting_block.read().await.clone();
#[allow(clippy::expect_used)]
let current_time = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Failed to get System Time.");
voting_block.map_or(false, |voting_block| {
voting_block.block.hash() == reason.hash
&& (current_time - voting_block.voted_at) >= sumeragi.commit_time
&& (current_time() - voting_block.voted_at) >= sumeragi.commit_time
})
}

@@ -1525,9 +1521,7 @@ pub mod message {
let signature = SignatureOf::from_hash(key_pair.clone(), &hash)?;
Ok(TransactionReceipt {
hash,
received_at: SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Failed to get System Time."),
received_at: current_time(),
signature,
})
}
@@ -1541,11 +1535,7 @@

/// Checks whether the block should already have been created by the `Leader`.
pub fn is_block_should_be_created(&self, block_time: Duration) -> bool {
#[allow(clippy::expect_used)]
let current_time = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Failed to get System Time.");
(current_time - self.received_at) >= block_time
(current_time() - self.received_at) >= block_time
}

/// Handles this message as part of `Sumeragi` consensus.
@@ -1557,9 +1547,7 @@
&self,
sumeragi: &mut Sumeragi<G, K, W>,
) -> Result<()> {
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.wrap_err("Failed to get System Time.")?;
let now = current_time();

// An implausibly future time means that the leader is lying
if sumeragi.topology.role(&sumeragi.peer_id) == Role::Leader
27 changes: 15 additions & 12 deletions core/src/tx.rs
@@ -2,11 +2,7 @@
//!
//! `Transaction` is the start of the Transaction lifecycle.

use std::{
cmp::min,
collections::BTreeSet,
time::{Duration, SystemTime},
};
use std::{cmp::min, collections::BTreeSet, time::Duration};

use eyre::{Result, WrapErr};
use iroha_crypto::{HashOf, SignaturesOf};
@@ -69,6 +65,11 @@ impl VersionedAcceptedTransaction {
self.as_inner_v1().is_expired(transaction_time_to_live)
}

/// Returns `true` if this transaction is regarded to have been tampered with, i.e. its timestamp lies more than `threshold` in the future.
pub fn is_in_future(&self, threshold: Duration) -> bool {
self.as_inner_v1().is_in_future(threshold)
}

/// Move the transaction lifecycle forward by checking whether the instructions can be applied to the
/// `WorldStateView<W>`.
/// # Errors
@@ -172,18 +173,20 @@ impl AcceptedTransaction {
/// Checks whether this transaction has been waiting longer than the `transaction_time_to_live` from `QueueConfiguration` or this transaction's own `time_to_live_ms`.
/// In other words, the transaction expires as soon as the lesser of the two TTLs is reached.
pub fn is_expired(&self, transaction_time_to_live: Duration) -> bool {
#[allow(clippy::expect_used)]
let current_time = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Failed to get System Time.");

current_time.saturating_sub(Duration::from_millis(self.payload.creation_time))
let tx_timestamp = Duration::from_millis(self.payload.creation_time);
current_time().saturating_sub(tx_timestamp)
> min(
Duration::from_millis(self.payload.time_to_live_ms),
transaction_time_to_live,
Duration::from_millis(self.payload.time_to_live_ms),
)
}

/// Returns `true` if this transaction is regarded to have been tampered with, i.e. its timestamp lies more than `threshold` in the future.
pub fn is_in_future(&self, threshold: Duration) -> bool {
let tx_timestamp = Duration::from_millis(self.payload.creation_time);
tx_timestamp.saturating_sub(current_time()) > threshold
}

#[allow(clippy::unwrap_in_result)]
#[allow(clippy::expect_used)]
fn validate_internal<W: WorldTrait>(
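
Both `is_expired` and `is_in_future` use `Duration::saturating_sub` rather than plain subtraction. This matters because `Duration - Duration` panics when the result would be negative, whereas `saturating_sub` clamps to zero, so a timestamp on the "wrong" side of either comparison can never crash the node. A small illustration of the difference, independent of the Iroha types:

use std::time::Duration;

fn main() {
    let earlier = Duration::from_millis(500);
    let later = Duration::from_millis(1_500);

    // Clamps to zero instead of panicking when the result would be negative.
    assert_eq!(earlier.saturating_sub(later), Duration::ZERO);
    assert_eq!(later.saturating_sub(earlier), Duration::from_millis(1_000));

    // By contrast, `earlier - later` would panic with
    // "overflow when subtracting durations".
    println!("saturating_sub never underflows");
}
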
