Skip to content

Commit

Permalink
Merge pull request #4952 from stacks-network/fix/4950-signers-termina…
Browse files Browse the repository at this point in the history
…te-tenure-early

Fix/4950 signers terminate tenure early
  • Loading branch information
wileyj authored Jul 10, 2024
2 parents 3c0bb38 + 25ca77e commit 108bf0e
Show file tree
Hide file tree
Showing 9 changed files with 242 additions and 66 deletions.
1 change: 1 addition & 0 deletions .github/workflows/bitcoin-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ jobs:
- tests::signer::v0::block_proposal_rejection
- tests::signer::v0::miner_gather_signatures
- tests::signer::v0::mine_2_nakamoto_reward_cycles
- tests::signer::v0::end_of_tenure
- tests::nakamoto_integrations::stack_stx_burn_op_integration_test
- tests::nakamoto_integrations::check_block_heights
- tests::nakamoto_integrations::clarity_burn_state
Expand Down
2 changes: 2 additions & 0 deletions stacks-signer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ pub trait Signer<T: SignerEventTrait>: Debug + Display {
current_reward_cycle: u64,
command: Option<RunLoopCommand>,
);
/// Check if the signer is in the middle of processing blocks
fn has_pending_blocks(&self) -> bool;
}

/// A wrapper around the running signer type for the signer
Expand Down
34 changes: 27 additions & 7 deletions stacks-signer/src/runloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,19 @@ use crate::client::{retry_with_exponential_backoff, ClientError, SignerSlotID, S
use crate::config::{GlobalConfig, SignerConfig};
use crate::Signer as SignerTrait;

/// The internal signer state info
#[derive(PartialEq, Clone, Debug)]
pub struct StateInfo {
/// the runloop state
pub runloop_state: State,
/// the current reward cycle info
pub reward_cycle_info: Option<RewardCycleInfo>,
}

/// The signer result that can be sent across threads
pub enum SignerResult {
/// The signer has received a status check
StatusCheck(State),
StatusCheck(StateInfo),
/// The signer has completed an operation
OperationResult(OperationResult),
}
Expand All @@ -48,9 +57,9 @@ impl From<OperationResult> for SignerResult {
}
}

impl From<State> for SignerResult {
fn from(state: State) -> Self {
SignerResult::StatusCheck(state)
impl From<StateInfo> for SignerResult {
fn from(state_info: StateInfo) -> Self {
SignerResult::StatusCheck(state_info)
}
}

Expand Down Expand Up @@ -375,10 +384,16 @@ impl<Signer: SignerTrait<T>, T: StacksMessageCodec + Clone + Send + Debug> RunLo
fn cleanup_stale_signers(&mut self, current_reward_cycle: u64) {
let mut to_delete = Vec::new();
for (idx, signer) in &mut self.stacks_signers {
if signer.reward_cycle() < current_reward_cycle {
let reward_cycle = signer.reward_cycle();
let next_reward_cycle = reward_cycle.wrapping_add(1);
let stale = match next_reward_cycle.cmp(&current_reward_cycle) {
std::cmp::Ordering::Less => true, // We are more than one reward cycle behind, so we are stale
std::cmp::Ordering::Equal => !signer.has_pending_blocks(), // We are the next reward cycle, so check if we have any pending blocks to process
std::cmp::Ordering::Greater => false, // We are the current reward cycle, so we are not stale
};
if stale {
debug!("{signer}: Signer's tenure has completed.");
to_delete.push(*idx);
continue;
}
}
for idx in to_delete {
Expand Down Expand Up @@ -452,7 +467,12 @@ impl<Signer: SignerTrait<T>, T: StacksMessageCodec + Clone + Send + Debug>
// This is the only event that we respond to from the outer signer runloop
if let Some(SignerEvent::StatusCheck) = event {
info!("Signer status check requested: {:?}.", self.state);
if let Err(e) = res.send(vec![self.state.into()]) {
if let Err(e) = res.send(vec![StateInfo {
runloop_state: self.state,
reward_cycle_info: self.current_reward_cycle_info,
}
.into()])
{
error!("Failed to send status check result: {e}.");
}
}
Expand Down
48 changes: 47 additions & 1 deletion stacks-signer/src/signerdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ CREATE TABLE IF NOT EXISTS blocks (
const CREATE_INDEXES: &str = "
CREATE INDEX IF NOT EXISTS blocks_signed_over ON blocks (signed_over);
CREATE INDEX IF NOT EXISTS blocks_consensus_hash ON blocks (consensus_hash);
CREATE INDEX IF NOT EXISTS blocks_valid ON blocks ((json_extract(block_info, '$.valid')));
";

const CREATE_SIGNER_STATE_TABLE: &str = "
Expand Down Expand Up @@ -191,7 +192,7 @@ impl SignerDb {
tenure: &ConsensusHash,
) -> Result<Option<BlockInfo>, DBError> {
let query = "SELECT block_info FROM blocks WHERE consensus_hash = ? AND signed_over = 1 ORDER BY stacks_height DESC LIMIT 1";
let result: Option<String> = query_row(&self.db, query, &[tenure])?;
let result: Option<String> = query_row(&self.db, query, [tenure])?;

try_deserialize(result)
}
Expand Down Expand Up @@ -230,6 +231,15 @@ impl SignerDb {

Ok(())
}

/// Determine if there are any pending blocks that have not yet been processed by checking the block_info.valid field
pub fn has_pending_blocks(&self, reward_cycle: u64) -> Result<bool, DBError> {
let query = "SELECT block_info FROM blocks WHERE reward_cycle = ? AND json_extract(block_info, '$.valid') IS NULL LIMIT 1";
let result: Option<String> =
query_row(&self.db, query, params!(&u64_to_sql(reward_cycle)?))?;

Ok(result.is_some())
}
}

fn try_deserialize<T>(s: Option<String>) -> Result<Option<T>, DBError>
Expand Down Expand Up @@ -260,6 +270,7 @@ mod tests {
use blockstack_lib::chainstate::nakamoto::{
NakamotoBlock, NakamotoBlockHeader, NakamotoBlockVote,
};
use clarity::util::secp256k1::MessageSignature;
use libsigner::BlockProposal;

use super::*;
Expand Down Expand Up @@ -420,6 +431,41 @@ mod tests {
.is_none());
}

#[test]
fn test_has_pending_blocks() {
let db_path = tmp_db_path();
let mut db = SignerDb::new(db_path).expect("Failed to create signer db");
let (mut block_info_1, _block_proposal) = create_block_override(|b| {
b.block.header.miner_signature = MessageSignature([0x01; 65]);
b.burn_height = 1;
});
let (mut block_info_2, _block_proposal) = create_block_override(|b| {
b.block.header.miner_signature = MessageSignature([0x02; 65]);
b.burn_height = 2;
});

db.insert_block(&block_info_1)
.expect("Unable to insert block into db");
db.insert_block(&block_info_2)
.expect("Unable to insert block into db");

assert!(db.has_pending_blocks(block_info_1.reward_cycle).unwrap());

block_info_1.valid = Some(true);

db.insert_block(&block_info_1)
.expect("Unable to update block in db");

assert!(db.has_pending_blocks(block_info_1.reward_cycle).unwrap());

block_info_2.valid = Some(true);

db.insert_block(&block_info_2)
.expect("Unable to update block in db");

assert!(!db.has_pending_blocks(block_info_1.reward_cycle).unwrap());
}

#[test]
fn test_sqlite_version() {
let db_path = tmp_db_path();
Expand Down
17 changes: 13 additions & 4 deletions stacks-signer/src/v0/signer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ impl SignerTrait<SignerMessage> for Signer {
sortition_state: &mut Option<SortitionsView>,
event: Option<&SignerEvent<SignerMessage>>,
_res: Sender<Vec<SignerResult>>,
current_reward_cycle: u64,
_current_reward_cycle: u64,
) {
let event_parity = match event {
Some(SignerEvent::BlockValidationResponse(_)) => Some(current_reward_cycle % 2),
// Block proposal events do have reward cycles, but each proposal has its own cycle,
// and the vec could be heterogenous, so, don't differentiate.
Some(SignerEvent::MinerMessages(..))
Some(SignerEvent::BlockValidationResponse(_))
| Some(SignerEvent::MinerMessages(..))
| Some(SignerEvent::NewBurnBlock(_))
| Some(SignerEvent::StatusCheck)
| None => None,
Expand Down Expand Up @@ -163,6 +163,16 @@ impl SignerTrait<SignerMessage> for Signer {
warn!("{self}: Received a command: {command:?}. V0 Signers do not support commands. Ignoring...")
}
}

fn has_pending_blocks(&self) -> bool {
self.signer_db
.has_pending_blocks(self.reward_cycle)
.unwrap_or_else(|e| {
error!("{self}: Failed to check for pending blocks: {e:?}",);
// Assume we have pending blocks to prevent premature cleanup
true
})
}
}

impl From<SignerConfig> for Signer {
Expand Down Expand Up @@ -381,7 +391,6 @@ impl Signer {
}
};
block_info.valid = Some(true);
// TODO: do not sign the block if it fails signer state checks (forks, etc.)
let signature = self
.private_key
.sign(&signer_signature_hash.0)
Expand Down
10 changes: 10 additions & 0 deletions stacks-signer/src/v1/signer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,16 @@ impl SignerTrait<SignerMessage> for Signer {
}
self.process_next_command(stacks_client, current_reward_cycle);
}

fn has_pending_blocks(&self) -> bool {
self.signer_db
.has_pending_blocks(self.reward_cycle)
.unwrap_or_else(|e| {
error!("{self}: Failed to check if there are pending blocks: {e:?}");
// Assume there are pending blocks to prevent premature cleanup
true
})
}
}

impl Signer {
Expand Down
24 changes: 17 additions & 7 deletions testnet/stacks-node/src/tests/signer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use stacks_common::types::StacksEpochId;
use stacks_common::util::hash::{hex_bytes, Sha512Trunc256Sum};
use stacks_signer::client::{SignerSlotID, StacksClient};
use stacks_signer::config::{build_signer_config_tomls, GlobalConfig as SignerConfig, Network};
use stacks_signer::runloop::{SignerResult, State};
use stacks_signer::runloop::{SignerResult, StateInfo};
use stacks_signer::{Signer, SpawnedSigner};
use wsts::state_machine::PublicKeys;

Expand Down Expand Up @@ -100,7 +100,11 @@ pub struct SignerTest<S> {
}

impl<S: Signer<T> + Send + 'static, T: SignerEventTrait + 'static> SignerTest<SpawnedSigner<S, T>> {
fn new(num_signers: usize, initial_balances: Vec<(StacksAddress, u64)>) -> Self {
fn new(
num_signers: usize,
initial_balances: Vec<(StacksAddress, u64)>,
wait_on_signers: Option<Duration>,
) -> Self {
// Generate Signer Data
let signer_stacks_private_keys = (0..num_signers)
.map(|_| StacksPrivateKey::new())
Expand All @@ -118,7 +122,11 @@ impl<S: Signer<T> + Send + 'static, T: SignerEventTrait + 'static> SignerTest<Sp
// That's the kind of thing an idiot would have on his luggage!
let password = "12345";
naka_conf.connection_options.block_proposal_token = Some(password.to_string());
naka_conf.miner.wait_on_signers = Duration::from_secs(10);
if let Some(wait_on_signers) = wait_on_signers {
naka_conf.miner.wait_on_signers = wait_on_signers;
} else {
naka_conf.miner.wait_on_signers = Duration::from_secs(10);
}

let run_stamp = rand::random();

Expand Down Expand Up @@ -160,7 +168,8 @@ impl<S: Signer<T> + Send + 'static, T: SignerEventTrait + 'static> SignerTest<Sp
}
}

fn send_status_request(&self) {
/// Send a status request to each spawned signer
pub fn send_status_request(&self) {
for port in 3000..3000 + self.spawned_signers.len() {
let endpoint = format!("http://localhost:{}", port);
let path = format!("{endpoint}/status");
Expand All @@ -174,7 +183,7 @@ impl<S: Signer<T> + Send + 'static, T: SignerEventTrait + 'static> SignerTest<Sp
}

/// Wait for the signers to respond to a status check
fn wait_for_states(&mut self, timeout: Duration) -> Vec<State> {
pub fn wait_for_states(&mut self, timeout: Duration) -> Vec<StateInfo> {
debug!("Waiting for Status...");
let now = std::time::Instant::now();
let mut states = Vec::with_capacity(self.spawned_signers.len());
Expand All @@ -194,8 +203,8 @@ impl<S: Signer<T> + Send + 'static, T: SignerEventTrait + 'static> SignerTest<Sp
SignerResult::OperationResult(_operation) => {
panic!("Recieved an operation result.");
}
SignerResult::StatusCheck(state) => {
states.push(state);
SignerResult::StatusCheck(state_info) => {
states.push(state_info);
}
}
}
Expand Down Expand Up @@ -547,6 +556,7 @@ fn setup_stx_btc_node(
EventKeyType::StackerDBChunks,
EventKeyType::BlockProposal,
EventKeyType::MinedBlocks,
EventKeyType::BurnchainBlocks,
],
});

Expand Down
Loading

0 comments on commit 108bf0e

Please sign in to comment.