feat: Shadow tracking (#11689)
Part of: near/near-one-project-tracking#65
Adds an option for a non-validator node to track the shards of a given validator.

During the stateful -> stateless protocol upgrade a node will track all
shards and will therefore require a lot of RAM. After the migration we can
move the validator key to a new, smaller node that does not track all
shards.
To do this with minimal downtime, the new node needs to have the appropriate
shards in place and memtries loaded in memory; then we hot-swap the
validator key without stopping the new node.
But before that happens the new node is not a validator, so we need a way
to tell it which validator's shards it should track.
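
For illustration, here is a minimal sketch of what enabling this looks like on the non-validator node, written as the dotted-key config overrides used by this repo's pytest helpers (see shadow_tracking.py below); the validator account id is a hypothetical example:

# Sketch only: config overrides for a non-validator node that should shadow a validator.
shadow_tracking_overrides = {
    # Do not pin any shards directly...
    "tracked_shards": [],
    # ...instead track whatever shards this validator is assigned to in each
    # epoch (the field added by this commit).
    "tracked_shadow_validator": "shadowed-validator.near",
    # Keep memtries loaded for the tracked shards so the later validator-key
    # hot swap finds them ready in memory.
    "store.load_mem_tries_for_tracked_shards": True,
}

Once such a node has caught up, the validator key can be hot-swapped onto it without stopping the node, which is what the updated validator_switch_key_quick.py below exercises.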
staffik authored Jul 5, 2024
1 parent ac23d14 commit 03a8b5d
Showing 10 changed files with 188 additions and 24 deletions.
19 changes: 19 additions & 0 deletions chain/chain/src/test_utils/kv_runtime.rs
@@ -957,6 +957,25 @@ impl EpochManagerAdapter for MockEpochManager {
Ok(true)
}

fn cares_about_shard_in_epoch(
&self,
epoch_id: EpochId,
account_id: &AccountId,
shard_id: ShardId,
) -> Result<bool, EpochError> {
// This `unwrap` here tests that in all code paths we check that the epoch exists before
// we check if we care about a shard. Please do not remove the unwrap; fix the logic of
// the calling function instead.
let epoch_valset = self.get_valset_for_epoch(&epoch_id).unwrap();
let chunk_producers = self.get_chunk_producers(epoch_valset, shard_id);
for validator in chunk_producers {
if validator.account_id() == account_id {
return Ok(true);
}
}
Ok(false)
}

fn cares_about_shard_from_prev_block(
&self,
parent_hash: &CryptoHash,
20 changes: 18 additions & 2 deletions chain/epoch-manager/src/adapter.rs
@@ -8,8 +8,7 @@ use near_primitives::block::Tip;
use near_primitives::block_header::{Approval, ApprovalInner, BlockHeader};
use near_primitives::epoch_manager::block_info::BlockInfo;
use near_primitives::epoch_manager::epoch_info::EpochInfo;
use near_primitives::epoch_manager::EpochConfig;
use near_primitives::epoch_manager::ShardConfig;
use near_primitives::epoch_manager::{EpochConfig, ShardConfig};
use near_primitives::errors::EpochError;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::{account_id_to_shard_id, ShardLayout, ShardLayoutError};
@@ -422,6 +421,13 @@ pub trait EpochManagerAdapter: Send + Sync {
partial_witness: &PartialEncodedStateWitness,
) -> Result<bool, Error>;

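/// Returns true if `account_id` is assigned as a chunk producer for `shard_id` in the
/// epoch `epoch_id`. Shadow tracking uses this to decide which shards to track.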
fn cares_about_shard_in_epoch(
&self,
epoch_id: EpochId,
account_id: &AccountId,
shard_id: ShardId,
) -> Result<bool, EpochError>;

fn cares_about_shard_from_prev_block(
&self,
parent_hash: &CryptoHash,
@@ -1142,4 +1148,14 @@ impl EpochManagerAdapter for EpochManagerHandle {
let epoch_manager = self.read();
Ok(epoch_manager.get_epoch_info(epoch_id)?.validators_iter().collect::<Vec<_>>())
}

fn cares_about_shard_in_epoch(
&self,
epoch_id: EpochId,
account_id: &AccountId,
shard_id: ShardId,
) -> Result<bool, EpochError> {
let epoch_manager = self.read();
epoch_manager.cares_about_shard_in_epoch(epoch_id, account_id, shard_id)
}
}
38 changes: 19 additions & 19 deletions chain/epoch-manager/src/lib.rs
@@ -1138,6 +1138,25 @@ impl EpochManager {
self.get_epoch_info(&epoch_id)
}

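/// Checks whether `account_id` is one of the chunk producers assigned to `shard_id`
/// in the epoch given by `epoch_id`.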
pub fn cares_about_shard_in_epoch(
&self,
epoch_id: EpochId,
account_id: &AccountId,
shard_id: ShardId,
) -> Result<bool, EpochError> {
let epoch_info = self.get_epoch_info(&epoch_id)?;
let chunk_producers_settlement = epoch_info.chunk_producers_settlement();
let chunk_producers = chunk_producers_settlement
.get(shard_id as usize)
.ok_or_else(|| EpochError::ShardingError(format!("invalid shard id {shard_id}")))?;
for validator_id in chunk_producers.iter() {
if epoch_info.validator_account_id(*validator_id) == account_id {
return Ok(true);
}
}
Ok(false)
}

pub fn cares_about_shard_from_prev_block(
&self,
parent_hash: &CryptoHash,
@@ -1630,25 +1649,6 @@ impl EpochManager {

/// Private utilities for EpochManager.
impl EpochManager {
fn cares_about_shard_in_epoch(
&self,
epoch_id: EpochId,
account_id: &AccountId,
shard_id: ShardId,
) -> Result<bool, EpochError> {
let epoch_info = self.get_epoch_info(&epoch_id)?;
let chunk_producers_settlement = epoch_info.chunk_producers_settlement();
let chunk_producers = chunk_producers_settlement
.get(shard_id as usize)
.ok_or_else(|| EpochError::ShardingError(format!("invalid shard id {shard_id}")))?;
for validator_id in chunk_producers.iter() {
if epoch_info.validator_account_id(*validator_id) == account_id {
return Ok(true);
}
}
Ok(false)
}

#[inline]
pub(crate) fn block_producer_from_info(
epoch_info: &EpochInfo,
11 changes: 10 additions & 1 deletion chain/epoch-manager/src/shard_tracker.rs
@@ -10,9 +10,13 @@ use near_primitives::types::{AccountId, EpochId, ShardId};

#[derive(Clone)]
pub enum TrackedConfig {
/// Tracks shards that contain one of the given accounts.
Accounts(Vec<AccountId>),
/// Tracks shards that are assigned to the given validator account.
ShadowValidator(AccountId),
/// Tracks all shards.
AllShards,
// Rotates between sets of shards to track.
/// Rotates between sets of shards to track.
Schedule(Vec<Vec<ShardId>>),
}

@@ -26,6 +30,8 @@ impl TrackedConfig {
TrackedConfig::AllShards
} else if !config.tracked_shard_schedule.is_empty() {
TrackedConfig::Schedule(config.tracked_shard_schedule.clone())
} else if let Some(account_id) = config.tracked_shadow_validator.as_ref() {
TrackedConfig::ShadowValidator(account_id.clone())
} else {
TrackedConfig::Accounts(config.tracked_accounts.clone())
}
@@ -90,6 +96,9 @@ impl ShardTracker {
let subset = &schedule[index as usize];
Ok(subset.contains(&shard_id))
}
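// Shadow tracking: track exactly the shards that the shadowed validator is
// assigned to as a chunk producer in this epoch.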
TrackedConfig::ShadowValidator(account_id) => {
self.epoch_manager.cares_about_shard_in_epoch(*epoch_id, account_id, shard_id)
}
}
}

3 changes: 3 additions & 0 deletions core/chain-configs/src/client_config.rs
@@ -410,6 +410,8 @@ pub struct ClientConfig {
pub gc: GCConfig,
/// Accounts that this client tracks.
pub tracked_accounts: Vec<AccountId>,
/// If set, track the shards that the given validator account is assigned to (shadow tracking).
pub tracked_shadow_validator: Option<AccountId>,
/// Shards that this client tracks.
pub tracked_shards: Vec<ShardId>,
/// Rotate between these sets of tracked shards.
@@ -539,6 +541,7 @@ impl ClientConfig {
block_header_fetch_horizon: 50,
gc: GCConfig { gc_blocks_limit: 100, ..GCConfig::default() },
tracked_accounts: vec![],
tracked_shadow_validator: None,
tracked_shards: vec![],
tracked_shard_schedule: vec![],
archive,
3 changes: 3 additions & 0 deletions nearcore/src/config.rs
@@ -222,6 +222,7 @@ pub struct Config {
pub network: near_network::config_json::Config,
pub consensus: Consensus,
pub tracked_accounts: Vec<AccountId>,
pub tracked_shadow_validator: Option<AccountId>,
pub tracked_shards: Vec<ShardId>,
#[serde(skip_serializing_if = "Option::is_none")]
pub tracked_shard_schedule: Option<Vec<Vec<ShardId>>>,
@@ -337,6 +338,7 @@ impl Default for Config {
network: Default::default(),
consensus: Consensus::default(),
tracked_accounts: vec![],
tracked_shadow_validator: None,
tracked_shards: vec![],
tracked_shard_schedule: None,
archive: false,
@@ -557,6 +559,7 @@ impl NearConfig {
doosmslug_step_period: config.consensus.doomslug_step_period,
tracked_accounts: config.tracked_accounts,
tracked_shards: config.tracked_shards,
tracked_shadow_validator: config.tracked_shadow_validator,
tracked_shard_schedule: config.tracked_shard_schedule.unwrap_or(vec![]),
archive: config.archive,
save_trie_changes: config.save_trie_changes.unwrap_or(!config.archive),
4 changes: 4 additions & 0 deletions nearcore/src/config_duration_test.rs
@@ -1,7 +1,10 @@
use std::str::FromStr;

use crate::config::Config;
use near_jsonrpc::RpcConfig;
use near_network::config_json::{ExperimentalConfig, NetworkConfigOverrides};
use near_o11y::testonly::init_test_logger;
use near_primitives::types::AccountId;
use near_store::StoreConfig;
use serde::ser::{
SerializeMap, SerializeSeq, SerializeStruct, SerializeStructVariant, SerializeTuple,
@@ -40,6 +43,7 @@ fn test_config_duration_all_std() {
rosetta_rpc: Some(Default::default()),
save_trie_changes: Some(Default::default()),
split_storage: Some(Default::default()),
tracked_shadow_validator: Some(AccountId::from_str("test").unwrap()),
tracked_shard_schedule: Some(Default::default()),
transaction_pool_size_limit: Some(Default::default()),
state_sync: Some(Default::default()),
2 changes: 2 additions & 0 deletions nightly/pytest-sanity.txt
@@ -114,6 +114,8 @@ pytest --timeout=240 sanity/validator_switch_key.py
pytest --timeout=240 sanity/validator_switch_key.py --features nightly
pytest --timeout=120 sanity/validator_switch_key_quick.py
pytest --timeout=120 sanity/validator_switch_key_quick.py --features nightly
pytest --timeout=60 sanity/shadow_tracking.py
pytest --timeout=60 sanity/shadow_tracking.py --features nightly
pytest sanity/proxy_simple.py
pytest sanity/proxy_simple.py --features nightly
pytest sanity/proxy_restart.py
108 changes: 108 additions & 0 deletions pytest/tests/sanity/shadow_tracking.py
@@ -0,0 +1,108 @@
#!/usr/bin/env python3
# Starts two validating nodes, one RPC node, and one dumper node.
# Set the RPC node to shadow track one of the validators.
# Stop the RPC node for 1 epoch so that shard assignment changes.
# Restart the RPC node, wait for state sync.
# Ensure the RPC node has chunks for the shards it is supposed to track as a shadow validator.
# Wait for 1 epoch so that the shard assignment changes, then repeat the check; do this 3 times in total.

import unittest
import sys
import pathlib

sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib'))

from configured_logger import logger
from cluster import start_cluster
import state_sync_lib
from utils import wait_for_blocks

EPOCH_LENGTH = 10
TIMEOUT = 100


class ShadowTrackingTest(unittest.TestCase):

def _get_final_block_height(self, nodes):
height_per_node = [node.get_latest_block().height for node in nodes]
min_height = min(height_per_node)
max_height = max(height_per_node)
self.assertGreaterEqual(min_height + 1, max_height, height_per_node)
return min_height

def _get_block_hash(self, block_height, node):
result = node.get_block_by_height(block_height)
self.assertNotIn('error', result, result)
self.assertIn('result', result, result)
return result['result']['header']['hash']

def _get_shard_assignment(self, rpc_node):
result = rpc_node.json_rpc('validators', 'latest')
self.assertNotIn('error', result, result)
self.assertIn('result', result, result)
validators = result['result']['current_validators']
shard_assignment = {}
for validator in validators:
shard_assignment[validator['account_id']] = validator['shards']
return shard_assignment

def _has_chunk(self, block_hash, shard_id, node):
result = node.json_rpc("chunk", {
"block_id": block_hash,
"shard_id": shard_id
})
if 'error' in result:
return False
self.assertIn('result', result, result)
return True

def test_shadow_tracking(self):
node_config_dump, node_config_sync = state_sync_lib.get_state_sync_configs_pair(
)
node_config_sync["tracked_shards"] = []
node_config_sync["store.load_mem_tries_for_tracked_shards"] = True
# Give each node its own config copy so the shadow-tracking override below only applies to the RPC node.
configs = {x: dict(node_config_sync) for x in range(3)}
configs[3] = node_config_dump

# Set RPC node to shadow track "test0".
configs[2]["tracked_shadow_validator"] = "test0"

nodes = start_cluster(
2, 2, 3, None,
[["epoch_length", EPOCH_LENGTH],
["shuffle_shard_assignment_for_chunk_producers", True],
["block_producer_kickout_threshold", 20],
["chunk_producer_kickout_threshold", 20]], configs)

for node in nodes:
node.stop_checking_store()

# Wait for 1 epoch so that shard shuffling kicks in.
wait_for_blocks(nodes[3], count=EPOCH_LENGTH)
logger.info('## Initial shard assignment: {}'.format(
self._get_shard_assignment(nodes[3])))

# Stop RPC node for 1 epoch, so that it has to state sync to a new shard tracked by "test0".
nodes[2].kill()
wait_for_blocks(nodes[3], count=EPOCH_LENGTH)
nodes[2].start(boot_node=nodes[3])
# Give it some time to catch up.
wait_for_blocks(nodes[3], count=EPOCH_LENGTH // 2)

round = 0
while True:
round += 1
shards = self._get_shard_assignment(nodes[3])
logger.info(f'## Round {round} shard assignment: {shards}')
block_height = self._get_final_block_height(nodes)
block_hash = self._get_block_hash(block_height, nodes[3])
for shard in shards['test0']:
# The RPC node should have chunk from a shard tracked by "test0".
self.assertTrue(self._has_chunk(block_hash, shard, nodes[2]))
if round == 3:
break
wait_for_blocks(nodes[3], count=EPOCH_LENGTH)


if __name__ == '__main__':
unittest.main()
4 changes: 2 additions & 2 deletions pytest/tests/sanity/validator_switch_key_quick.py
@@ -26,7 +26,7 @@ def test_validator_switch_key_quick(self):
# that it will be assigned to when becoming a validator.
config_map = {
2: {
"tracked_shards": [0],
"tracked_shadow_validator": "test0",
"store.load_mem_tries_for_tracked_shards": True,
}
}
@@ -42,7 +42,7 @@ def test_validator_switch_key_quick(self):
["block_producer_kickout_threshold", 10],
["chunk_producer_kickout_threshold", 10]],
config_map)
wait_for_blocks(old_validator, count=2)
wait_for_blocks(old_validator, count=5)

new_validator.reset_validator_key(other_validator.validator_key)
other_validator.kill()