Skip to content

Commit 0214517

Browse files
committed
fix(bft/sync): pause bft block advancement during sync
1 parent 61b3357 commit 0214517

File tree

14 files changed

+481
-368
lines changed

14 files changed

+481
-368
lines changed

Cargo.lock

Lines changed: 74 additions & 72 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ default-features = false
4848
[workspace.dependencies.snarkvm]
4949
#path = "../snarkVM"
5050
git = "https://github.com/ProvableHQ/snarkVM.git"
51-
rev = "d8e736636dd"
51+
rev = "cf10c9df8"
5252
#version = "=4.4.0"
5353
default-features = false
5454

build.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -98,21 +98,22 @@ fn check_locktick_imports<P: AsRef<Path>>(path: P) {
9898

9999
// Modify the lock balance based on the type of the relevant import.
100100
if [ImportOfInterest::ParkingLot, ImportOfInterest::Tokio].contains(&ioi) {
101-
if line.contains("Mutex") {
102-
lock_balance += 1;
103-
}
104-
if line.contains("RwLock") {
105-
lock_balance += 1;
101+
lock_balance += line.matches("Mutex").count() as i8;
102+
lock_balance += line.matches("RwLock").count() as i8;
103+
104+
// Correct for `tokio::sync::MutexGuard` not having a locktick counterpart.
105+
if line.contains("MutexGuard") {
106+
lock_balance -= 1;
106107
}
108+
109+
// Correction for `use tokio::Mutex as TMutex` and `use tokio::MutexGuard as TMutexGuard`.
110+
lock_balance -= line.matches("TMutex").count() as i8;
107111
} else if ioi == ImportOfInterest::Locktick {
108112
// Use `matches` instead of just `contains` here, as more than a single
109113
// lock type entry is possible in a locktick import.
110-
for _hit in line.matches("Mutex") {
111-
lock_balance -= 1;
112-
}
113-
for _hit in line.matches("RwLock") {
114-
lock_balance -= 1;
115-
}
114+
lock_balance -= line.matches("Mutex").count() as i8;
115+
lock_balance -= line.matches("RwLock").count() as i8;
116+
116117
// A correction in case of the `use tokio::Mutex as TMutex` convention.
117118
if line.contains("TMutex") {
118119
lock_balance += 1;

node/bft/ledger-service/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@ workspace = true
7878
features = [ "macros", "rt-multi-thread" ]
7979
optional = true
8080

81+
[dependencies.thiserror]
82+
workspace = true
83+
8184
[dependencies.tracing]
8285
workspace = true
8386
optional = true
87+
features = [ "log" ]

node/bft/ledger-service/src/ledger.rs

Lines changed: 84 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,14 @@
1313
// See the License for the specific language governing permissions and
1414
// limitations under the License.
1515

16-
use crate::{LedgerService, fmt_id, spawn_blocking};
16+
use crate::{BeginLedgerUpdateError, LedgerService, LedgerUpdateService, fmt_id, spawn_blocking};
1717

1818
use snarkos_utilities::Stoppable;
1919

2020
use snarkvm::{
2121
ledger::{
2222
Block,
23+
CheckBlockError,
2324
Ledger,
2425
PendingBlock,
2526
Transaction,
@@ -48,9 +49,13 @@ use snarkvm::{
4849
use anyhow::ensure;
4950
use indexmap::IndexMap;
5051
#[cfg(feature = "locktick")]
51-
use locktick::parking_lot::RwLock;
52+
use locktick::{
53+
LockGuard,
54+
parking_lot::{Mutex, RwLock},
55+
};
56+
use parking_lot::MutexGuard;
5257
#[cfg(not(feature = "locktick"))]
53-
use parking_lot::RwLock;
58+
use parking_lot::{Mutex, RwLock};
5459
#[cfg(not(feature = "serial"))]
5560
use rayon::prelude::*;
5661

@@ -62,12 +67,73 @@ pub struct CoreLedgerService<N: Network, C: ConsensusStorage<N>> {
6267
ledger: Ledger<N, C>,
6368
latest_leader: Arc<RwLock<Option<(u64, Address<N>)>>>,
6469
stoppable: Arc<dyn Stoppable>,
70+
update_lock: Arc<Mutex<()>>,
71+
}
72+
73+
/// A transactional update to the ledger.
74+
#[cfg(feature = "ledger-write")]
75+
pub struct LedgerUpdate<'a, N: Network, C: ConsensusStorage<N>> {
76+
ledger: Ledger<N, C>,
77+
#[cfg(feature = "locktick")]
78+
_lock: LockGuard<MutexGuard<'a, ()>>,
79+
#[cfg(not(feature = "locktick"))]
80+
_lock: MutexGuard<'a, ()>,
81+
}
82+
83+
#[cfg(feature = "ledger-write")]
84+
impl<'a, N: Network, C: ConsensusStorage<N>> LedgerUpdateService<N> for LedgerUpdate<'a, N, C> {
85+
fn check_block_subdag(
86+
&self,
87+
block: Block<N>,
88+
prefix: &[PendingBlock<N>],
89+
) -> Result<PendingBlock<N>, CheckBlockError<N>> {
90+
self.ledger.check_block_subdag(block, prefix)
91+
}
92+
93+
fn check_block_content(&self, block: PendingBlock<N>) -> Result<Block<N>, CheckBlockError<N>> {
94+
self.ledger.check_block_content(block, &mut rand::thread_rng())
95+
}
96+
97+
/// Checks the given block is valid next block.
98+
fn check_next_block(&self, block: &Block<N>) -> Result<()> {
99+
self.ledger.check_next_block(block, &mut rand::thread_rng())
100+
}
101+
102+
/// Returns a candidate for the next block in the ledger, using a committed subdag and its transmissions.
103+
fn prepare_advance_to_next_quorum_block(
104+
&self,
105+
subdag: Subdag<N>,
106+
transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
107+
) -> Result<Block<N>> {
108+
self.ledger.prepare_advance_to_next_quorum_block(subdag, transmissions, &mut rand::thread_rng())
109+
}
110+
111+
/// Adds the given block as the next block in the ledger.
112+
fn advance_to_next_block(&self, block: &Block<N>) -> Result<()> {
113+
// Advance to the next block.
114+
self.ledger.advance_to_next_block(block)?;
115+
// Update BFT metrics.
116+
#[cfg(feature = "metrics")]
117+
{
118+
let num_sol = block.solutions().len();
119+
let num_tx = block.transactions().len();
120+
121+
metrics::gauge(metrics::bft::HEIGHT, block.height() as f64);
122+
metrics::gauge(metrics::bft::LAST_COMMITTED_ROUND, block.round() as f64);
123+
metrics::increment_gauge(metrics::blocks::SOLUTIONS, num_sol as f64);
124+
metrics::increment_gauge(metrics::blocks::TRANSACTIONS, num_tx as f64);
125+
metrics::update_block_metrics(block);
126+
}
127+
128+
tracing::info!("\n\nAdvanced to block {} at round {} - {}\n", block.height(), block.round(), block.hash());
129+
Ok(())
130+
}
65131
}
66132

67133
impl<N: Network, C: ConsensusStorage<N>> CoreLedgerService<N, C> {
68134
/// Initializes a new core ledger service.
69135
pub fn new(ledger: Ledger<N, C>, stoppable: Arc<dyn Stoppable>) -> Self {
70-
Self { ledger, latest_leader: Default::default(), stoppable }
136+
Self { ledger, latest_leader: Default::default(), stoppable, update_lock: Default::default() }
71137
}
72138
}
73139

@@ -338,53 +404,26 @@ impl<N: Network, C: ConsensusStorage<N>> LedgerService<N> for CoreLedgerService<
338404
spawn_blocking!(ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()))
339405
}
340406

341-
fn check_block_subdag(&self, block: Block<N>, prefix: &[PendingBlock<N>]) -> Result<PendingBlock<N>> {
342-
self.ledger.check_block_subdag(block, prefix)
343-
}
344-
345-
fn check_block_content(&self, block: PendingBlock<N>) -> Result<Block<N>> {
346-
self.ledger.check_block_content(block, &mut rand::thread_rng())
347-
}
348-
349-
/// Checks the given block is valid next block.
350-
fn check_next_block(&self, block: &Block<N>) -> Result<()> {
351-
self.ledger.check_next_block(block, &mut rand::thread_rng())
352-
}
353-
354-
/// Returns a candidate for the next block in the ledger, using a committed subdag and its transmissions.
355-
#[cfg(feature = "ledger-write")]
356-
fn prepare_advance_to_next_quorum_block(
407+
fn check_block_subdag(
357408
&self,
358-
subdag: Subdag<N>,
359-
transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
360-
) -> Result<Block<N>> {
361-
self.ledger.prepare_advance_to_next_quorum_block(subdag, transmissions, &mut rand::thread_rng())
409+
block: Block<N>,
410+
prefix: &[PendingBlock<N>],
411+
) -> Result<PendingBlock<N>, CheckBlockError<N>> {
412+
self.ledger.check_block_subdag(block, prefix)
362413
}
363414

364-
/// Adds the given block as the next block in the ledger.
365-
#[cfg(feature = "ledger-write")]
366-
fn advance_to_next_block(&self, block: &Block<N>) -> Result<()> {
367-
// If the Ctrl-C handler registered the signal, then skip advancing to the next block.
415+
/// Begins a ledger update.i
416+
///
417+
/// # Returns
418+
/// - `Ok(Some(LedgerUpdate<N, C>))` if the ledger update was successfully started.
419+
/// - `Ok(None)` if the node is stopped.
420+
/// - `Err(anyhow::Error)` if we failed to acquire the update lock.
421+
fn begin_ledger_update<'a>(&'a self) -> Result<Box<dyn LedgerUpdateService<N> + 'a>, BeginLedgerUpdateError> {
368422
if self.stoppable.is_stopped() {
369-
bail!("Skipping advancing to block {} - The node is shutting down", block.height());
370-
}
371-
// Advance to the next block.
372-
self.ledger.advance_to_next_block(block)?;
373-
// Update BFT metrics.
374-
#[cfg(feature = "metrics")]
375-
{
376-
let num_sol = block.solutions().len();
377-
let num_tx = block.transactions().len();
378-
379-
metrics::gauge(metrics::bft::HEIGHT, block.height() as f64);
380-
metrics::gauge(metrics::bft::LAST_COMMITTED_ROUND, block.round() as f64);
381-
metrics::increment_gauge(metrics::blocks::SOLUTIONS, num_sol as f64);
382-
metrics::increment_gauge(metrics::blocks::TRANSACTIONS, num_tx as f64);
383-
metrics::update_block_metrics(block);
423+
return Err(BeginLedgerUpdateError::ShuttingDown);
384424
}
385425

386-
tracing::info!("\n\nAdvanced to block {} at round {} - {}\n", block.height(), block.round(), block.hash());
387-
Ok(())
426+
Ok(Box::new(LedgerUpdate { ledger: self.ledger.clone(), _lock: self.update_lock.lock() }))
388427
}
389428

390429
/// Returns the spend for a transaction in microcredits.

node/bft/ledger-service/src/mock.rs

Lines changed: 11 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,20 @@
1313
// See the License for the specific language governing permissions and
1414
// limitations under the License.
1515

16-
use crate::{LedgerService, fmt_id};
16+
use crate::{BeginLedgerUpdateError, LedgerService, LedgerUpdateService, fmt_id};
1717
use snarkvm::{
1818
ledger::{
1919
Block,
20+
CheckBlockError,
2021
PendingBlock,
2122
Transaction,
2223
committee::Committee,
23-
narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
24+
narwhal::{BatchCertificate, Data, Transmission, TransmissionID},
2425
puzzle::{Solution, SolutionID},
2526
},
26-
prelude::{Address, ConsensusVersion, Field, Network, Result, Zero, bail, consensus_config_value, ensure},
27+
prelude::{Address, ConsensusVersion, Field, Network, Result, Zero, bail, consensus_config_value},
2728
};
2829

29-
use indexmap::IndexMap;
3030
#[cfg(feature = "locktick")]
3131
use locktick::parking_lot::Mutex;
3232
#[cfg(not(feature = "locktick"))]
@@ -213,42 +213,19 @@ impl<N: Network> LedgerService<N> for MockLedgerService<N> {
213213
Ok(())
214214
}
215215

216-
fn check_block_subdag(&self, _block: Block<N>, _prefix: &[PendingBlock<N>]) -> Result<PendingBlock<N>> {
216+
fn check_block_subdag(
217+
&self,
218+
_block: Block<N>,
219+
_prefix: &[PendingBlock<N>],
220+
) -> Result<PendingBlock<N>, CheckBlockError<N>> {
217221
unimplemented!();
218222
}
219223

220-
fn check_block_content(&self, _block: PendingBlock<N>) -> Result<Block<N>> {
224+
/// Begins a ledger update.
225+
fn begin_ledger_update<'a>(&'a self) -> Result<Box<dyn LedgerUpdateService<N> + 'a>, BeginLedgerUpdateError> {
221226
unimplemented!();
222227
}
223228

224-
/// Checks the given block is valid next block.
225-
fn check_next_block(&self, _block: &Block<N>) -> Result<()> {
226-
Ok(())
227-
}
228-
229-
/// Returns a candidate for the next block in the ledger, using a committed subdag and its transmissions.
230-
#[cfg(feature = "ledger-write")]
231-
fn prepare_advance_to_next_quorum_block(
232-
&self,
233-
_subdag: Subdag<N>,
234-
_transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
235-
) -> Result<Block<N>> {
236-
unreachable!("MockLedgerService does not support prepare_advance_to_next_quorum_block")
237-
}
238-
239-
/// Adds the given block as the next block in the ledger.
240-
#[cfg(feature = "ledger-write")]
241-
fn advance_to_next_block(&self, block: &Block<N>) -> Result<()> {
242-
ensure!(
243-
block.height() == self.latest_block_height() + 1,
244-
"Tried to advance to block {} from block {}",
245-
block.height(),
246-
self.latest_block_height()
247-
);
248-
self.height_to_round_and_hash.lock().insert(block.height(), (block.round(), block.hash()));
249-
Ok(())
250-
}
251-
252229
/// Returns the spent cost for a transaction in microcredits.
253230
fn transaction_spend_in_microcredits(
254231
&self,

node/bft/ledger-service/src/prover.rs

Lines changed: 11 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -13,24 +13,24 @@
1313
// See the License for the specific language governing permissions and
1414
// limitations under the License.
1515

16-
use crate::LedgerService;
16+
use crate::{BeginLedgerUpdateError, LedgerService, LedgerUpdateService};
1717
use snarkvm::{
1818
ledger::{
1919
Block,
20+
CheckBlockError,
2021
PendingBlock,
2122
Transaction,
2223
committee::Committee,
23-
narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
24+
narwhal::{BatchCertificate, Data, Transmission, TransmissionID},
2425
puzzle::{Solution, SolutionID},
2526
},
2627
prelude::{Address, ConsensusVersion, Field, Network, Result, Zero, bail},
2728
};
2829

29-
use indexmap::IndexMap;
3030
use std::ops::Range;
3131

3232
/// A ledger service for a prover.
33-
#[derive(Clone, Debug, Default)]
33+
#[derive(Clone, Default, Debug)]
3434
pub struct ProverLedgerService<N: Network> {
3535
_network: std::marker::PhantomData<N>,
3636
}
@@ -168,33 +168,18 @@ impl<N: Network> LedgerService<N> for ProverLedgerService<N> {
168168
Ok(())
169169
}
170170

171-
fn check_block_subdag(&self, _block: Block<N>, _prefix: &[PendingBlock<N>]) -> Result<PendingBlock<N>> {
172-
bail!("Cannot check block subDAG in prover")
173-
}
174-
175-
fn check_block_content(&self, _pending_block: PendingBlock<N>) -> Result<Block<N>> {
176-
bail!("Cannot check block content in prover")
177-
}
178-
179-
/// Checks the given block is valid next block.
180-
fn check_next_block(&self, _block: &Block<N>) -> Result<()> {
181-
Ok(())
182-
}
183-
184-
/// Returns a candidate for the next block in the ledger, using a committed subdag and its transmissions.
185-
#[cfg(feature = "ledger-write")]
186-
fn prepare_advance_to_next_quorum_block(
171+
fn check_block_subdag(
187172
&self,
188-
_subdag: Subdag<N>,
189-
_transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
190-
) -> Result<Block<N>> {
191-
bail!("Cannot prepare advance to next quorum block in prover")
173+
_block: Block<N>,
174+
_prefix: &[PendingBlock<N>],
175+
) -> Result<PendingBlock<N>, CheckBlockError<N>> {
176+
unimplemented!()
192177
}
193178

194179
/// Adds the given block as the next block in the ledger.
195180
#[cfg(feature = "ledger-write")]
196-
fn advance_to_next_block(&self, block: &Block<N>) -> Result<()> {
197-
bail!("Cannot advance to next block in prover - {block}")
181+
fn begin_ledger_update<'a>(&'a self) -> Result<Box<dyn LedgerUpdateService<N> + 'a>, BeginLedgerUpdateError> {
182+
Err(BeginLedgerUpdateError::NotSupportedForProver)
198183
}
199184

200185
/// Returns the spent cost for a transaction in microcredits.

0 commit comments

Comments
 (0)