test: chunk validator kickout (#11672)
Demonstrate the power of TestLoop by implementing a scenario where a chunk
validator-only node is kicked out due to low endorsement stats.

The logic is simply to prevent all chunks validated by the selected
accounts from appearing on chain. Implementing this without adding extra
logic to `Client`, `ShardsManagerActor`, and `Network` is a challenge,
however. TestLoop can register overrides on itself that are generic
enough to be reused for other scenarios, such as the ones in the
`chunks_management` tests. The new components are:

* `TestLoopChunksStorage` - a global storage of all chunks ever observed,
populated by monitoring client messages. Alternatives would be to extend
the content of network messages or to work out how to query
`ShardsManagerActor` from a network message override. A global test loop
storage makes sense to me, however, because it can also serve to measure
the health of the whole chain when testing other disruption scenarios.
* `partial_encoded_chunks_dropper` - overrides the processing of
chunk-related network messages; if a chunk is validated by the given
account, the message is dropped. The logic is generic enough to be
extended to kick out specific block/chunk producers, drop chunks for
specific heights/shards, etc. A sketch of how a test can combine these
pieces is shown below.
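
A minimal sketch of how a kickout test might wire these pieces together.
Only `kickouts_for_chunk_validators_only` and `drop_chunks_validated_by`
are confirmed by this diff; `TestGenesisBuilder::new`, `genesis`,
`clients`, `build`, the import paths, the account names, and the
run/assert steps are assumptions made for illustration:

    use near_chain_configs::test_genesis::TestGenesisBuilder;
    use near_primitives::types::AccountId;

    fn chunk_validator_kickout_sketch() {
        // Six accounts; suppose "account5" ends up as a chunk validator-only node.
        let accounts: Vec<AccountId> =
            (0..6).map(|i| format!("account{}", i).parse().unwrap()).collect();

        // Genesis where only chunk validator-only nodes can be kicked out
        // (50% endorsement threshold, see test_genesis.rs below).
        let mut genesis_builder = TestGenesisBuilder::new();
        genesis_builder.kickouts_for_chunk_validators_only();
        let genesis = genesis_builder.build();

        // Drop every chunk endorsed by "account5" so its endorsement stats
        // fall below the threshold and it is kicked out.
        let env = TestLoopBuilder::new()
            .genesis(genesis)
            .clients(accounts)
            .drop_chunks_validated_by("account5")
            .build();

        // Run the test loop past an epoch boundary, then assert that
        // "account5" appears in the epoch's validator kickout list, and
        // shut the environment down.
    }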

---------

Co-authored-by: Bowen Wang <bowen@near.org>
Longarithm and bowenwang1996 authored Jun 28, 2024
1 parent 80f08d7 commit a134185
Showing 7 changed files with 332 additions and 19 deletions.
4 changes: 2 additions & 2 deletions chain/client/src/client.rs
@@ -246,7 +246,7 @@ impl Client {
state_sync_adapter: Arc<RwLock<SyncAdapter>>,
runtime_adapter: Arc<dyn RuntimeAdapter>,
network_adapter: PeerManagerAdapter,
shards_manager_adapter: Sender<ShardsManagerRequestFromClient>,
shards_manager_sender: Sender<ShardsManagerRequestFromClient>,
validator_signer: MutableValidatorSigner,
enable_doomslug: bool,
rng_seed: RngSeed,
@@ -390,7 +390,7 @@ impl Client {
epoch_manager,
shard_tracker,
runtime_adapter,
shards_manager_adapter,
shards_manager_adapter: shards_manager_sender,
sharded_tx_pool,
network_adapter,
validator_signer,
20 changes: 16 additions & 4 deletions core/chain-configs/src/test_genesis.rs
@@ -238,11 +238,23 @@ impl TestGenesisBuilder {
self
}

pub fn kickouts_standard_90_percent(&mut self) -> &mut Self {
/// Validators with performance below 80% are kicked out, similarly to
/// mainnet as of 28 Jun 2024.
pub fn kickouts_standard_80_percent(&mut self) -> &mut Self {
self.kickouts_config = Some(KickoutsConfig {
block_producer_kickout_threshold: 90,
chunk_producer_kickout_threshold: 90,
chunk_validator_only_kickout_threshold: 90,
block_producer_kickout_threshold: 80,
chunk_producer_kickout_threshold: 80,
chunk_validator_only_kickout_threshold: 80,
});
self
}

/// Only chunk validator-only nodes can be kicked out.
pub fn kickouts_for_chunk_validators_only(&mut self) -> &mut Self {
self.kickouts_config = Some(KickoutsConfig {
block_producer_kickout_threshold: 0,
chunk_producer_kickout_threshold: 0,
chunk_validator_only_kickout_threshold: 50,
});
self
}
137 changes: 125 additions & 12 deletions integration-tests/src/test_loop/builder.rs
@@ -1,4 +1,4 @@
use std::sync::{Arc, RwLock};
use std::sync::{Arc, Mutex, RwLock};

use near_async::futures::FutureSpawner;
use near_async::messaging::{noop, IntoMultiSender, IntoSender, LateBoundSender};
@@ -23,8 +23,9 @@ use near_client::sync_jobs_actor::SyncJobsActor;
use near_client::test_utils::test_loop::test_loop_sync_actor_maker;
use near_client::{Client, PartialWitnessActor, SyncAdapter};
use near_epoch_manager::shard_tracker::{ShardTracker, TrackedConfig};
use near_epoch_manager::EpochManager;
use near_epoch_manager::{EpochManager, EpochManagerAdapter};
use near_network::test_loop::TestLoopPeerManagerActor;
use near_network::types::NetworkRequests;
use near_primitives::network::PeerId;
use near_primitives::test_utils::create_test_signer;
use near_primitives::types::AccountId;
@@ -36,18 +37,29 @@ use near_vm_runner::{ContractRuntimeCache, FilesystemContractRuntimeCache};
use nearcore::state_sync::StateSyncDumper;
use tempfile::TempDir;

use super::env::{TestData, TestLoopEnv};
use super::env::{ClientToShardsManagerSender, TestData, TestLoopChunksStorage, TestLoopEnv};

pub struct TestLoopBuilder {
test_loop: TestLoopV2,
genesis: Option<Genesis>,
clients: Vec<AccountId>,
/// Will store all chunks produced within the test loop.
chunks_storage: Arc<Mutex<TestLoopChunksStorage>>,
/// Whether test loop should drop all chunks validated by the given account.
drop_chunks_validated_by: Option<AccountId>,
gc: bool,
}

impl TestLoopBuilder {
pub fn new() -> Self {
Self { test_loop: TestLoopV2::new(), genesis: None, clients: vec![], gc: true }
Self {
test_loop: TestLoopV2::new(),
genesis: None,
clients: vec![],
chunks_storage: Default::default(),
drop_chunks_validated_by: None,
gc: true,
}
}

/// Get the clock for the test loop.
@@ -67,6 +79,11 @@ impl TestLoopBuilder {
self
}

pub fn drop_chunks_validated_by(mut self, account_id: &str) -> Self {
self.drop_chunks_validated_by = Some(account_id.parse().unwrap());
self
}

/// Disable garbage collection for the nodes.
/// TODO(#11605): should always be enabled, if it doesn't work, it's a bug.
pub fn disable_gc(mut self) -> Self {
@@ -92,13 +109,15 @@ impl TestLoopBuilder {
fn build_impl(mut self) -> TestLoopEnv {
let mut datas = Vec::new();
let mut network_adapters = Vec::new();
let mut epoch_manager_adapters = Vec::new();
let tempdir = tempfile::tempdir().unwrap();
for idx in 0..self.clients.len() {
let (data, network_adapter) = self.setup_client(idx, &tempdir);
let (data, network_adapter, epoch_manager_adapter) = self.setup_client(idx, &tempdir);
datas.push(data);
network_adapters.push(network_adapter);
epoch_manager_adapters.push(epoch_manager_adapter);
}
self.setup_network(&datas, &network_adapters);
self.setup_network(&datas, &network_adapters, &epoch_manager_adapters);

let env = TestLoopEnv { test_loop: self.test_loop, datas };
env.warmup()
@@ -108,11 +127,14 @@
&mut self,
idx: usize,
tempdir: &TempDir,
) -> (TestData, Arc<LateBoundSender<TestLoopSender<TestLoopPeerManagerActor>>>) {
) -> (
TestData,
Arc<LateBoundSender<TestLoopSender<TestLoopPeerManagerActor>>>,
Arc<dyn EpochManagerAdapter>,
) {
let client_adapter = LateBoundSender::new();
let network_adapter = LateBoundSender::new();
let state_snapshot_adapter = LateBoundSender::new();
let shards_manager_adapter = LateBoundSender::new();
let partial_witness_adapter = LateBoundSender::new();
let sync_jobs_adapter = LateBoundSender::new();

@@ -194,6 +216,13 @@
Some(Arc::new(create_test_signer(self.clients[idx].as_str()))),
"validator_signer",
);

let shards_manager_adapter = LateBoundSender::new();
let client_to_shards_manager_sender = Arc::new(ClientToShardsManagerSender {
sender: shards_manager_adapter.clone(),
chunks_storage: self.chunks_storage.clone(),
});

let client = Client::new(
self.test_loop.clock(),
client_config.clone(),
Expand All @@ -203,7 +232,7 @@ impl TestLoopBuilder {
state_sync_adapter,
runtime_adapter.clone(),
network_adapter.as_multi_sender(),
shards_manager_adapter.as_sender(),
client_to_shards_manager_sender.as_sender(),
validator_signer.clone(),
true,
[0; 32],
@@ -269,7 +298,7 @@
clock: self.test_loop.clock(),
client_config,
chain_genesis,
epoch_manager,
epoch_manager: epoch_manager.clone(),
shard_tracker,
runtime: runtime_adapter,
validator: validator_signer,
@@ -312,17 +341,29 @@
partial_witness_sender,
state_sync_dumper_handle,
};
(data, network_adapter)
(data, network_adapter, epoch_manager)
}

// TODO: we assume that all `Vec`s have the same length, consider
// joining them into one structure.
fn setup_network(
&mut self,
datas: &Vec<TestData>,
network_adapters: &Vec<Arc<LateBoundSender<TestLoopSender<TestLoopPeerManagerActor>>>>,
epoch_manager_adapters: &Vec<Arc<dyn EpochManagerAdapter>>,
) {
for (idx, data) in datas.iter().enumerate() {
let peer_manager_actor =
let mut peer_manager_actor =
TestLoopPeerManagerActor::new(self.test_loop.clock(), &data.account_id, datas);

if let Some(account_id) = &self.drop_chunks_validated_by {
peer_manager_actor.register_override_handler(partial_encoded_chunks_dropper(
self.chunks_storage.clone(),
epoch_manager_adapters[idx].clone(),
account_id.clone(),
));
}

self.test_loop.register_actor_for_index(
idx,
peer_manager_actor,
@@ -331,3 +372,75 @@
}
}
}

/// Handler to drop all network messages relevant to chunk validated by
/// `validator_of_chunks_to_drop`. If number of nodes on chain is significant
/// enough (at least three?), this is enough to prevent chunk from being
/// included.
///
/// This logic can be easily extended to dropping chunk based on any rule.
pub fn partial_encoded_chunks_dropper(
chunks_storage: Arc<Mutex<TestLoopChunksStorage>>,
epoch_manager_adapter: Arc<dyn EpochManagerAdapter>,
validator_of_chunks_to_drop: AccountId,
) -> Arc<dyn Fn(NetworkRequests) -> Option<NetworkRequests>> {
Arc::new(move |request| {
// Filter out only messages related to distributing chunk in the
// network; extract `chunk_hash` from the message.
let chunk_hash = match &request {
NetworkRequests::PartialEncodedChunkRequest { request, .. } => {
Some(request.chunk_hash.clone())
}
NetworkRequests::PartialEncodedChunkResponse { response, .. } => {
Some(response.chunk_hash.clone())
}
NetworkRequests::PartialEncodedChunkMessage { partial_encoded_chunk, .. } => {
Some(partial_encoded_chunk.header.chunk_hash())
}
NetworkRequests::PartialEncodedChunkForward { forward, .. } => {
Some(forward.chunk_hash.clone())
}
_ => None,
};

let Some(chunk_hash) = chunk_hash else {
return Some(request);
};

let chunk = {
let chunks_storage = chunks_storage.lock().unwrap();
let chunk = chunks_storage.get(&chunk_hash).unwrap().clone();
let can_drop_chunk = chunks_storage.can_drop_chunk(&chunk);

if !can_drop_chunk {
return Some(request);
}

chunk
};

let prev_block_hash = chunk.prev_block_hash();
let shard_id = chunk.shard_id();
let height_created = chunk.height_created();

// If we don't have block on top of which chunk is built, we can't
// retrieve epoch id.
// This case appears to be too rare to interfere with the goal of
// dropping chunk.
let Ok(epoch_id) = epoch_manager_adapter.get_epoch_id_from_prev_block(prev_block_hash)
else {
return Some(request);
};

// Finally, we drop chunk if the given account is present in the list
// of its validators.
let chunk_validators = epoch_manager_adapter
.get_chunk_validator_assignments(&epoch_id, shard_id, height_created)
.unwrap();
if !chunk_validators.contains(&validator_of_chunks_to_drop) {
return Some(request);
}

return None;
})
}
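
For illustration, a hypothetical variation of the handler above (not part
of this commit) showing how the same override mechanism could drop chunks
by a different rule, here by shard id; `ShardId` is assumed to be imported
from `near_primitives::types`:

    pub fn partial_encoded_chunks_dropper_by_shard(
        chunks_storage: Arc<Mutex<TestLoopChunksStorage>>,
        shard_to_drop: ShardId,
    ) -> Arc<dyn Fn(NetworkRequests) -> Option<NetworkRequests>> {
        Arc::new(move |request| {
            // Same filtering as above: extract the chunk hash from chunk
            // distribution messages, pass every other message through.
            let chunk_hash = match &request {
                NetworkRequests::PartialEncodedChunkRequest { request, .. } => {
                    Some(request.chunk_hash.clone())
                }
                NetworkRequests::PartialEncodedChunkResponse { response, .. } => {
                    Some(response.chunk_hash.clone())
                }
                NetworkRequests::PartialEncodedChunkMessage { partial_encoded_chunk, .. } => {
                    Some(partial_encoded_chunk.header.chunk_hash())
                }
                NetworkRequests::PartialEncodedChunkForward { forward, .. } => {
                    Some(forward.chunk_hash.clone())
                }
                _ => None,
            };
            let Some(chunk_hash) = chunk_hash else {
                return Some(request);
            };

            let chunks_storage = chunks_storage.lock().unwrap();
            let Some(chunk) = chunks_storage.get(&chunk_hash) else {
                return Some(request);
            };
            // Drop the message only if the chunk belongs to the targeted
            // shard and the chain has warmed up enough to miss chunks.
            if chunk.shard_id() == shard_to_drop && chunks_storage.can_drop_chunk(chunk) {
                None
            } else {
                Some(request)
            }
        })
    }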
65 changes: 64 additions & 1 deletion integration-tests/src/test_loop/env.rs
@@ -1,16 +1,21 @@
use near_async::messaging::{IntoMultiSender, IntoSender, Sender};
use near_async::messaging::{CanSend, IntoMultiSender, IntoSender, LateBoundSender, Sender};
use near_async::test_loop::data::{TestLoopData, TestLoopDataHandle};
use near_async::test_loop::sender::TestLoopSender;
use near_async::test_loop::TestLoopV2;
use near_async::time::Duration;
use near_chunks::adapter::ShardsManagerRequestFromClient;
use near_chunks::shards_manager_actor::ShardsManagerActor;
use near_client::client_actor::ClientActorInner;
use near_client::PartialWitnessActor;
use near_network::shards_manager::ShardsManagerRequestFromNetwork;
use near_network::state_witness::PartialWitnessSenderForNetwork;
use near_network::test_loop::ClientSenderForTestLoopNetwork;
use near_primitives::sharding::{ChunkHash, ShardChunkHeader};
use near_primitives::types::AccountId;
use near_primitives_core::types::BlockHeight;
use nearcore::state_sync::StateSyncDumper;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

const NETWORK_DELAY: Duration = Duration::milliseconds(10);

@@ -67,6 +72,64 @@ impl TestLoopEnv {
}
}

/// Stores all chunks ever observed on chain. Determines if a chunk can be
/// dropped within a test loop.
///
/// Needed to intercept network messages storing chunk hash only, while
/// interception requires more detailed information like shard id.
#[derive(Default)]
pub struct TestLoopChunksStorage {
/// Mapping from chunk hashes to headers.
storage: HashMap<ChunkHash, ShardChunkHeader>,
/// Minimal chunk height ever observed.
min_chunk_height: Option<BlockHeight>,
}

impl TestLoopChunksStorage {
pub fn insert(&mut self, chunk_header: ShardChunkHeader) {
let chunk_height = chunk_header.height_created();
self.min_chunk_height = Some(
self.min_chunk_height
.map_or(chunk_height, |current_height| current_height.min(chunk_height)),
);
self.storage.insert(chunk_header.chunk_hash(), chunk_header);
}

pub fn get(&self, chunk_hash: &ChunkHash) -> Option<&ShardChunkHeader> {
self.storage.get(chunk_hash)
}

/// If chunk height is too low, don't drop chunk, allow the chain to warm
/// up.
pub fn can_drop_chunk(&self, chunk_header: &ShardChunkHeader) -> bool {
self.min_chunk_height
.is_some_and(|min_height| chunk_header.height_created() >= min_height + 3)
}
}

/// Custom implementation of `Sender` for messages from `Client` to
/// `ShardsManagerActor` that allows to intercept all messages indicating
/// any chunk production and storing all chunks.
pub struct ClientToShardsManagerSender {
pub sender: Arc<LateBoundSender<TestLoopSender<ShardsManagerActor>>>,
/// Storage of chunks shared between all test loop nodes.
pub chunks_storage: Arc<Mutex<TestLoopChunksStorage>>,
}

impl CanSend<ShardsManagerRequestFromClient> for ClientToShardsManagerSender {
fn send(&self, message: ShardsManagerRequestFromClient) {
// `DistributeEncodedChunk` indicates that a certain chunk was produced.
if let ShardsManagerRequestFromClient::DistributeEncodedChunk { partial_chunk, .. } =
&message
{
let mut chunks_storage = self.chunks_storage.lock().unwrap();
chunks_storage.insert(partial_chunk.cloned_header());
}
// After maybe storing the chunk, send the message as usual.
self.sender.send(message);
}
}

pub struct TestData {
pub account_id: AccountId,
pub client_sender: TestLoopSender<ClientActorInner>,