Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/4950 signers terminate tenure early #4952

Merged
merged 7 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/bitcoin-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ jobs:
- tests::signer::v0::block_proposal_rejection
- tests::signer::v0::miner_gather_signatures
- tests::signer::v0::mine_2_nakamoto_reward_cycles
- tests::signer::v0::end_of_tenure
- tests::nakamoto_integrations::stack_stx_burn_op_integration_test
- tests::nakamoto_integrations::check_block_heights
- tests::nakamoto_integrations::clarity_burn_state
Expand Down
2 changes: 2 additions & 0 deletions stacks-signer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ pub trait Signer<T: SignerEventTrait>: Debug + Display {
current_reward_cycle: u64,
command: Option<RunLoopCommand>,
);
/// Check if the signer is in the middle of processing blocks.
///
/// Returns `true` while the signer still has blocks it has not finished
/// processing. The run loop's stale-signer cleanup consults this for a signer
/// whose reward cycle immediately precedes the current one, and keeps that
/// signer alive for as long as this returns `true`.
fn has_pending_blocks(&self) -> bool;
}

/// A wrapper around the running signer type for the signer
Expand Down
34 changes: 27 additions & 7 deletions stacks-signer/src/runloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,19 @@ use crate::client::{retry_with_exponential_backoff, ClientError, SignerSlotID, S
use crate::config::{GlobalConfig, SignerConfig};
use crate::Signer as SignerTrait;

/// The internal signer state info.
///
/// This is the payload carried by `SignerResult::StatusCheck`: the run loop
/// builds one from its own fields when it answers a status check event.
#[derive(PartialEq, Clone, Debug)]
pub struct StateInfo {
/// The run loop state at the time the status check was answered.
pub runloop_state: State,
/// The reward cycle info the run loop currently holds.
/// NOTE(review): presumably `None` until the run loop has fetched reward
/// cycle info for the first time — confirm against the run loop init code.
pub reward_cycle_info: Option<RewardCycleInfo>,
}

/// The signer result that can be sent across threads
pub enum SignerResult {
/// The signer has received a status check
StatusCheck(State),
StatusCheck(StateInfo),
/// The signer has completed an operation
OperationResult(OperationResult),
}
Expand All @@ -48,9 +57,9 @@ impl From<OperationResult> for SignerResult {
}
}

impl From<State> for SignerResult {
fn from(state: State) -> Self {
SignerResult::StatusCheck(state)
impl From<StateInfo> for SignerResult {
fn from(state_info: StateInfo) -> Self {
SignerResult::StatusCheck(state_info)
}
}

Expand Down Expand Up @@ -375,10 +384,16 @@ impl<Signer: SignerTrait<T>, T: StacksMessageCodec + Clone + Send + Debug> RunLo
fn cleanup_stale_signers(&mut self, current_reward_cycle: u64) {
let mut to_delete = Vec::new();
for (idx, signer) in &mut self.stacks_signers {
if signer.reward_cycle() < current_reward_cycle {
let reward_cycle = signer.reward_cycle();
let next_reward_cycle = reward_cycle.wrapping_add(1);
let stale = match next_reward_cycle.cmp(&current_reward_cycle) {
std::cmp::Ordering::Less => true, // We are more than one reward cycle behind, so we are stale
std::cmp::Ordering::Equal => !signer.has_pending_blocks(), // We are the next reward cycle, so check if we have any pending blocks to process
std::cmp::Ordering::Greater => false, // We are the current reward cycle, so we are not stale
};
if stale {
debug!("{signer}: Signer's tenure has completed.");
to_delete.push(*idx);
continue;
}
}
for idx in to_delete {
Expand Down Expand Up @@ -452,7 +467,12 @@ impl<Signer: SignerTrait<T>, T: StacksMessageCodec + Clone + Send + Debug>
// This is the only event that we respond to from the outer signer runloop
if let Some(SignerEvent::StatusCheck) = event {
info!("Signer status check requested: {:?}.", self.state);
if let Err(e) = res.send(vec![self.state.into()]) {
if let Err(e) = res.send(vec![StateInfo {
runloop_state: self.state,
reward_cycle_info: self.current_reward_cycle_info,
}
.into()])
{
error!("Failed to send status check result: {e}.");
}
}
Expand Down
38 changes: 38 additions & 0 deletions stacks-signer/src/signerdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ CREATE TABLE IF NOT EXISTS blocks (
const CREATE_INDEXES: &str = "
CREATE INDEX IF NOT EXISTS blocks_signed_over ON blocks (signed_over);
CREATE INDEX IF NOT EXISTS blocks_consensus_hash ON blocks (consensus_hash);
CREATE INDEX IF NOT EXISTS blocks_valid ON blocks ((json_extract(block_info, '$.valid')));
";

const CREATE_SIGNER_STATE_TABLE: &str = "
Expand Down Expand Up @@ -230,6 +231,15 @@ impl SignerDb {

Ok(())
}

/// Determine if there are any pending blocks that have not yet been processed by checking the block_info.valid field
pub fn has_pending_blocks(&self, reward_cycle: u64) -> Result<bool, DBError> {
let query = "SELECT block_info FROM blocks WHERE reward_cycle = ? AND json_extract(block_info, '$.valid') IS NULL LIMIT 1";
let result: Option<String> =
query_row(&self.db, query, params!(&u64_to_sql(reward_cycle)?))?;

Ok(result.is_some())
}
}

fn try_deserialize<T>(s: Option<String>) -> Result<Option<T>, DBError>
Expand Down Expand Up @@ -420,6 +430,34 @@ mod tests {
.is_none());
}

// Exercises `SignerDb::has_pending_blocks`: a reward cycle reports pending
// blocks while a stored block has no `valid` verdict, and stops reporting
// them once the block is re-inserted with `valid = Some(true)`.
#[test]
fn test_has_pending_blocks() {
// NOTE(review): the next two lines are review-page residue captured with the
// diff, not Rust source.
jferrant marked this conversation as resolved.
Show resolved Hide resolved
let db_path = tmp_db_path();
let mut db = SignerDb::new(db_path).expect("Failed to create signer db");
// NOTE(review): the final assertion only holds if these four `create_block()`
// results map to the same row (or to different reward cycles). If they were
// four distinct rows in one reward cycle, blocks 2-4 would still be pending
// after block 1 is marked valid — confirm what `create_block()` returns.
let (mut block_info_1, _block_proposal) = create_block();
let (block_info_2, _block_proposal) = create_block();
let (block_info_3, _block_proposal) = create_block();
let (block_info_4, _block_proposal) = create_block();

db.insert_block(&block_info_1)
.expect("Unable to insert block into db");
db.insert_block(&block_info_2)
.expect("Unable to insert block into db");
db.insert_block(&block_info_3)
.expect("Unable to insert block into db");
db.insert_block(&block_info_4)
.expect("Unable to insert block into db");

// Nothing has been validated yet, so the cycle must report pending work.
assert!(db.has_pending_blocks(block_info_1.reward_cycle).unwrap());

// Record a validation verdict and write the block back to the db.
block_info_1.valid = Some(true);

db.insert_block(&block_info_1)
.expect("Unable to update block in db");

assert!(!db.has_pending_blocks(block_info_1.reward_cycle).unwrap());
}

#[test]
fn test_sqlite_version() {
let db_path = tmp_db_path();
Expand Down
17 changes: 13 additions & 4 deletions stacks-signer/src/v0/signer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ impl SignerTrait<SignerMessage> for Signer {
sortition_state: &mut Option<SortitionsView>,
event: Option<&SignerEvent<SignerMessage>>,
_res: Sender<Vec<SignerResult>>,
current_reward_cycle: u64,
_current_reward_cycle: u64,
) {
let event_parity = match event {
Some(SignerEvent::BlockValidationResponse(_)) => Some(current_reward_cycle % 2),
// Block proposal events do have reward cycles, but each proposal has its own cycle,
// and the vec could be heterogeneous, so don't differentiate.
Some(SignerEvent::MinerMessages(..))
Some(SignerEvent::BlockValidationResponse(_))
| Some(SignerEvent::MinerMessages(..))
| Some(SignerEvent::NewBurnBlock(_))
| Some(SignerEvent::StatusCheck)
| None => None,
Expand Down Expand Up @@ -163,6 +163,16 @@ impl SignerTrait<SignerMessage> for Signer {
warn!("{self}: Received a command: {command:?}. V0 Signers do not support commands. Ignoring...")
}
}

fn has_pending_blocks(&self) -> bool {
self.signer_db
.has_pending_blocks(self.reward_cycle)
.unwrap_or_else(|e| {
error!("{self}: Failed to check for pending blocks: {e:?}",);
// Assume we have pending blocks to prevent premature cleanup
true
})
}
}

impl From<SignerConfig> for Signer {
Expand Down Expand Up @@ -381,7 +391,6 @@ impl Signer {
}
};
block_info.valid = Some(true);
// TODO: do not sign the block if it fails signer state checks (forks, etc.)
let signature = self
.private_key
.sign(&signer_signature_hash.0)
Expand Down
10 changes: 10 additions & 0 deletions stacks-signer/src/v1/signer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,16 @@ impl SignerTrait<SignerMessage> for Signer {
}
self.process_next_command(stacks_client, current_reward_cycle);
}

/// Ask the signer db whether this signer's reward cycle still has blocks
/// awaiting processing.
///
/// A failed lookup is logged and reported as `true`, so a database error can
/// never cause the signer to be cleaned up early.
fn has_pending_blocks(&self) -> bool {
    match self.signer_db.has_pending_blocks(self.reward_cycle) {
        Ok(pending) => pending,
        Err(e) => {
            error!("{self}: Failed to check if there are pending blocks: {e:?}");
            // Err on the side of keeping the signer alive.
            true
        }
    }
}
}

impl Signer {
Expand Down
24 changes: 17 additions & 7 deletions testnet/stacks-node/src/tests/signer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use stacks_common::types::StacksEpochId;
use stacks_common::util::hash::{hex_bytes, Sha512Trunc256Sum};
use stacks_signer::client::{SignerSlotID, StacksClient};
use stacks_signer::config::{build_signer_config_tomls, GlobalConfig as SignerConfig, Network};
use stacks_signer::runloop::{SignerResult, State};
use stacks_signer::runloop::{SignerResult, StateInfo};
use stacks_signer::{Signer, SpawnedSigner};
use wsts::state_machine::PublicKeys;

Expand Down Expand Up @@ -100,7 +100,11 @@ pub struct SignerTest<S> {
}

impl<S: Signer<T> + Send + 'static, T: SignerEventTrait + 'static> SignerTest<SpawnedSigner<S, T>> {
fn new(num_signers: usize, initial_balances: Vec<(StacksAddress, u64)>) -> Self {
fn new(
num_signers: usize,
initial_balances: Vec<(StacksAddress, u64)>,
wait_on_signers: Option<Duration>,
) -> Self {
// Generate Signer Data
let signer_stacks_private_keys = (0..num_signers)
.map(|_| StacksPrivateKey::new())
Expand All @@ -118,7 +122,11 @@ impl<S: Signer<T> + Send + 'static, T: SignerEventTrait + 'static> SignerTest<Sp
// That's the kind of thing an idiot would have on his luggage!
let password = "12345";
naka_conf.connection_options.block_proposal_token = Some(password.to_string());
naka_conf.miner.wait_on_signers = Duration::from_secs(10);
if let Some(wait_on_signers) = wait_on_signers {
naka_conf.miner.wait_on_signers = wait_on_signers;
} else {
naka_conf.miner.wait_on_signers = Duration::from_secs(10);
}

let run_stamp = rand::random();

Expand Down Expand Up @@ -160,7 +168,8 @@ impl<S: Signer<T> + Send + 'static, T: SignerEventTrait + 'static> SignerTest<Sp
}
}

fn send_status_request(&self) {
/// Send a status request to each spawned signer
pub fn send_status_request(&self) {
for port in 3000..3000 + self.spawned_signers.len() {
let endpoint = format!("http://localhost:{}", port);
let path = format!("{endpoint}/status");
Expand All @@ -174,7 +183,7 @@ impl<S: Signer<T> + Send + 'static, T: SignerEventTrait + 'static> SignerTest<Sp
}

/// Wait for the signers to respond to a status check
fn wait_for_states(&mut self, timeout: Duration) -> Vec<State> {
pub fn wait_for_states(&mut self, timeout: Duration) -> Vec<StateInfo> {
debug!("Waiting for Status...");
let now = std::time::Instant::now();
let mut states = Vec::with_capacity(self.spawned_signers.len());
Expand All @@ -194,8 +203,8 @@ impl<S: Signer<T> + Send + 'static, T: SignerEventTrait + 'static> SignerTest<Sp
SignerResult::OperationResult(_operation) => {
panic!("Recieved an operation result.");
}
SignerResult::StatusCheck(state) => {
states.push(state);
SignerResult::StatusCheck(state_info) => {
states.push(state_info);
}
}
}
Expand Down Expand Up @@ -547,6 +556,7 @@ fn setup_stx_btc_node(
EventKeyType::StackerDBChunks,
EventKeyType::BlockProposal,
EventKeyType::MinedBlocks,
EventKeyType::BurnchainBlocks,
],
});

Expand Down
Loading