Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated block importer to allow more blocks to be queue #2010

Merged
merged 55 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
fea24a3
Extract `HistoricalView` trait from the `AtomicView`
xgreenx Jun 26, 2024
4c20273
Updated changelog
xgreenx Jun 26, 2024
5119dbe
Removed not related code to the chagne
xgreenx Jun 26, 2024
4b3aecb
Fixed tests
xgreenx Jun 26, 2024
b06c622
Fixed the column
xgreenx Jun 26, 2024
61f1059
Move all functionality from the `Database` to the views
xgreenx Jun 26, 2024
c5ec5bf
Moved `latest_height` to `HistoricalView` trait
xgreenx Jun 26, 2024
db90520
Merge branch 'refs/heads/feature/historical-view-trait' into feature/…
xgreenx Jun 26, 2024
2b9eadc
Improved readability
xgreenx Jun 26, 2024
c61477f
Merge branch 'refs/heads/master' into feature/iteratable-view
xgreenx Jun 26, 2024
c3d6f66
Merge branch 'master' into feature/iteratable-view
xgreenx Jun 26, 2024
29d543e
Updated CHANGELOG.md
xgreenx Jun 26, 2024
682d256
Simplify imports
xgreenx Jun 26, 2024
11d1d28
Make CI happy
xgreenx Jun 26, 2024
187ec91
Added the actual implementation for the `AtomicView::latest_view`
xgreenx Jun 26, 2024
ec86d61
Updated CHANGELOG.md
xgreenx Jun 26, 2024
22f6a63
Merge branch 'master' into feature/iteratable-view
xgreenx Jun 27, 2024
46d0eca
Merge branch 'master' into feature/iteratable-view
xgreenx Jun 27, 2024
19f7433
Merge branch 'feature/iteratable-view' into feature/atomic-latest-view
xgreenx Jun 27, 2024
f642a4a
Simplify signature of the constracutor
xgreenx Jun 27, 2024
c9f4ca2
Merge branch 'refs/heads/feature/iteratable-view' into feature/atomic…
xgreenx Jun 27, 2024
7ad0907
The actual implementation of the historical view for RocksDB
xgreenx Jun 27, 2024
58c698e
Renamed `IterableView` into `IterableKeyValueView`
xgreenx Jun 27, 2024
e06ca6e
Merge branch 'refs/heads/feature/atomic-latest-view' into feature/his…
xgreenx Jun 27, 2024
e6a02d6
Merged base branch
xgreenx Jun 27, 2024
d5b6535
Renamed `IterableView` to `IterableKeyValueView`
xgreenx Jun 27, 2024
0bf402b
Merge branch 'refs/heads/feature/iteratable-view' into feature/atomic…
xgreenx Jun 27, 2024
ffc6481
Merge branch 'refs/heads/feature/atomic-latest-view' into feature/his…
xgreenx Jun 27, 2024
97983c5
Merge branch 'refs/heads/master' into feature/atomic-latest-view
xgreenx Jun 28, 2024
ccc0a2c
Added more tets and rollback fucntionality
xgreenx Jun 28, 2024
8513901
Use explicit getter instead of deref
xgreenx Jun 28, 2024
f4e8bbf
Merge branch 'feature/atomic-latest-view' into feature/historical-vie…
xgreenx Jun 28, 2024
c733a48
Merge branch 'refs/heads/master' into feature/historical-view-impleme…
xgreenx Jun 28, 2024
eeb75ac
Added rollback command and e2e tests to verify the whole behaviour.
xgreenx Jun 29, 2024
c43006a
Updated CHANGELOG.md
xgreenx Jun 29, 2024
f06508c
Merge branch 'master' into feature/historical-view-implementation
xgreenx Jul 1, 2024
5bd9e44
Fixed the comment
xgreenx Jul 2, 2024
0020b2f
Merge branch 'master' into feature/historical-view-implementation
xgreenx Jul 2, 2024
af88cc9
Merge branch 'master' into feature/historical-view-implementation
xgreenx Jul 2, 2024
4d56031
Merge branch 'master' into feature/historical-view-implementation
xgreenx Jul 2, 2024
ba5dd8e
Updated block importer to wait for more than one block
xgreenx Jul 3, 2024
c5c5e39
Updated CHANGELOG.md
xgreenx Jul 3, 2024
c466c13
Merge branch 'master' into feature/historical-view-implementation
xgreenx Jul 3, 2024
f5fce1f
Merge branch 'feature/historical-view-implementation' into feature/im…
xgreenx Jul 3, 2024
ee634e7
Use better naming for variable=)
xgreenx Jul 3, 2024
642b93d
Apply suggestions form PR
xgreenx Jul 4, 2024
930e224
Merge branch 'feature/historical-view-implementation' into feature/im…
Dentosal Jul 4, 2024
2633cec
Merge branch 'master' into feature/historical-view-implementation
xgreenx Jul 4, 2024
562171c
Merge `history` and original databases into one
xgreenx Jul 4, 2024
608bf0a
Merge branch 'feature/historical-view-implementation' into feature/im…
xgreenx Jul 4, 2024
6885cb9
Return error instead of panic
xgreenx Jul 4, 2024
2abe7a3
Merge branch 'refs/heads/feature/historical-view-implementation' into…
xgreenx Jul 5, 2024
c7baf15
Merge branch 'refs/heads/master' into feature/import-more-blocks-in-p…
xgreenx Jul 5, 2024
7131cb8
Merge branch 'master' into feature/import-more-blocks-in-parallel-tha…
xgreenx Jul 5, 2024
b37ef88
Merge branch 'master' into feature/import-more-blocks-in-parallel-tha…
xgreenx Jul 5, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- [#1948](https://github.com/FuelLabs/fuel-core/pull/1948): Add new `AlgorithmV1` and `AlgorithmUpdaterV1` for the gas price. Include tools for analysis

### Changed
- [#2010](https://github.com/FuelLabs/fuel-core/pull/2010): Updated the block importer to allow more blocks to be in the queue. It improves synchronization speed and mitigate the impact of other services on synchronization speed.
- [#2006](https://github.com/FuelLabs/fuel-core/pull/2006): Process block importer events first under P2P pressure.
- [#2002](https://github.com/FuelLabs/fuel-core/pull/2002): Adapted the block producer to react to checked transactions that were using another version of consensus parameters during validation in the TxPool. After an upgrade of the consensus parameters of the network, TxPool could store invalid `Checked` transactions. This change fixes that by tracking the version that was used to validate the transactions.
- [#1999](https://github.com/FuelLabs/fuel-core/pull/1999): Minimize the number of panics in the codebase.
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/services/importer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ derive_more = { workspace = true }
fuel-core-metrics = { workspace = true }
fuel-core-storage = { workspace = true }
fuel-core-types = { workspace = true }
parking_lot = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tokio-rayon = { workspace = true }
tracing = { workspace = true }
Expand Down
2 changes: 0 additions & 2 deletions crates/services/importer/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
#[derive(Debug, Clone)]
pub struct Config {
pub max_block_notify_buffer: usize,
pub metrics: bool,
}

impl Config {
pub fn new() -> Self {
Self {
max_block_notify_buffer: 1 << 10,
metrics: false,
}
}
}
Expand Down
120 changes: 52 additions & 68 deletions crates/services/importer/src/importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,13 @@ use fuel_core_types::{
Uncommitted,
},
};
use parking_lot::Mutex;
use std::{
ops::{
Deref,
DerefMut,
},
sync::{
Arc,
Mutex,
},
sync::Arc,
time::{
Instant,
SystemTime,
Expand All @@ -56,7 +54,8 @@ use std::{
};
use tokio::sync::{
broadcast,
oneshot,
OwnedSemaphorePermit,
Semaphore,
TryAcquireError,
};

Expand Down Expand Up @@ -97,6 +96,7 @@ pub enum Error {
#[from]
StorageError(StorageError),
UnsupportedConsensusVariant(String),
ActiveBlockResultsSemaphoreClosed(tokio::sync::AcquireError),
}

impl From<Error> for anyhow::Error {
Expand All @@ -118,11 +118,12 @@ pub struct Importer<D, E, V> {
verifier: Arc<V>,
chain_id: ChainId,
broadcast: broadcast::Sender<ImporterResult>,
/// The channel to notify about the end of the processing of the previous block by all listeners.
/// It is used to await until all receivers of the notification process the `SharedImportResult`
/// before starting committing a new block.
prev_block_process_result: Mutex<Option<oneshot::Receiver<()>>>,
guard: tokio::sync::Semaphore,
guard: Semaphore,
/// The semaphore tracks the number of unprocessed `SharedImportResult`.
/// If the number of unprocessed results is more than the threshold,
/// the block importer stops committing new blocks and waits for
/// the resolution of the previous one.
active_import_results: Arc<Semaphore>,
}

impl<D, E, V> Importer<D, E, V> {
Expand All @@ -133,16 +134,20 @@ impl<D, E, V> Importer<D, E, V> {
executor: E,
verifier: V,
) -> Self {
let (broadcast, _) = broadcast::channel(config.max_block_notify_buffer);
// We use semaphore as a back pressure mechanism instead of a `broadcast`
// channel because we want to prevent committing to the database results
// that will not be processed.
let max_block_notify_buffer = config.max_block_notify_buffer;
let (broadcast, _) = broadcast::channel(max_block_notify_buffer);

Self {
database: Mutex::new(database),
executor: Arc::new(executor),
verifier: Arc::new(verifier),
chain_id,
broadcast,
prev_block_process_result: Default::default(),
guard: tokio::sync::Semaphore::new(1),
active_import_results: Arc::new(Semaphore::new(max_block_notify_buffer)),
guard: Semaphore::new(1),
}
}

Expand Down Expand Up @@ -198,35 +203,31 @@ where
result: UncommittedResult<Changes>,
) -> Result<(), Error> {
let _guard = self.lock()?;
// It is safe to unwrap the channel because we have the `_guard`.
let previous_block_result = self
.prev_block_process_result
.lock()
.expect("poisoned")
.take();

// Await until all receivers of the notification process the result.
if let Some(channel) = previous_block_result {
const TIMEOUT: u64 = 20;
let result =
tokio::time::timeout(tokio::time::Duration::from_secs(TIMEOUT), channel)
.await;
const TIMEOUT: u64 = 20;
let await_result = tokio::time::timeout(
tokio::time::Duration::from_secs(TIMEOUT),
self.active_import_results.clone().acquire_owned(),
)
.await;

if result.is_err() {
tracing::error!(
"The previous block processing \
let Ok(permit) = await_result else {
tracing::error!(
"The previous block processing \
was not finished for {TIMEOUT} seconds."
);
return Err(Error::PreviousBlockProcessingNotFinished)
}
}
);
return Err(Error::PreviousBlockProcessingNotFinished)
};
let permit = permit.map_err(Error::ActiveBlockResultsSemaphoreClosed)?;

let mut guard = self
.database
.try_lock()
.expect("Semaphore prevents concurrent access to the database");
let database = guard.deref_mut();

self._commit_result(result, database)
self._commit_result(result, permit, database)
}

/// The method commits the result of the block execution and notifies about a new imported block.
Expand All @@ -242,6 +243,7 @@ where
fn _commit_result(
&self,
result: UncommittedResult<Changes>,
permit: OwnedSemaphorePermit,
database: &mut D,
) -> Result<(), Error> {
let (result, changes) = result.into();
Expand Down Expand Up @@ -327,16 +329,12 @@ where

tracing::info!("Committed block {:#x}", result.sealed_block.entity.id());

// The `tokio::sync::oneshot::Sender` is used to notify about the end
// of the processing of a new block by all listeners.
let (sender, receiver) = oneshot::channel();
let result = ImporterResult {
shared_result: Arc::new(Awaiter::new(result, sender)),
shared_result: Arc::new(Awaiter::new(result, permit)),
#[cfg(feature = "test-helpers")]
changes: Arc::new(changes_clone),
};
let _ = self.broadcast.send(result);
*self.prev_block_process_result.lock().expect("poisoned") = Some(receiver);

Ok(())
}
Expand Down Expand Up @@ -467,28 +465,22 @@ where

let result = result?;

// It is safe to unwrap the channel because we have the `_guard`.
let previous_block_result = self
.prev_block_process_result
.lock()
.expect("poisoned")
.take();

// Await until all receivers of the notification process the result.
if let Some(channel) = previous_block_result {
const TIMEOUT: u64 = 20;
let result =
tokio::time::timeout(tokio::time::Duration::from_secs(TIMEOUT), channel)
.await;
const TIMEOUT: u64 = 20;
let await_result = tokio::time::timeout(
tokio::time::Duration::from_secs(TIMEOUT),
self.active_import_results.clone().acquire_owned(),
)
.await;

if result.is_err() {
tracing::error!(
"The previous block processing \
let Ok(permit) = await_result else {
tracing::error!(
"The previous block processing \
was not finished for {TIMEOUT} seconds."
);
return Err(Error::PreviousBlockProcessingNotFinished)
}
}
);
return Err(Error::PreviousBlockProcessingNotFinished)
};
let permit = permit.map_err(Error::ActiveBlockResultsSemaphoreClosed)?;

let start = Instant::now();

Expand All @@ -497,7 +489,7 @@ where
.try_lock()
.expect("Semaphore prevents concurrent access to the database");
let database = guard.deref_mut();
let commit_result = self._commit_result(result, database);
let commit_result = self._commit_result(result, permit, database);
let commit_time = start.elapsed().as_secs_f64();
let time = execute_time + commit_time;
importer_metrics().execute_and_commit_duration.observe(time);
Expand All @@ -509,15 +501,7 @@ where
/// The wrapper around `ImportResult` to notify about the end of the processing of a new block.
struct Awaiter {
result: ImportResult,
release_channel: Option<oneshot::Sender<()>>,
}

impl Drop for Awaiter {
fn drop(&mut self) {
if let Some(release_channel) = core::mem::take(&mut self.release_channel) {
let _ = release_channel.send(());
}
}
_permit: OwnedSemaphorePermit,
}

impl Deref for Awaiter {
Expand All @@ -529,10 +513,10 @@ impl Deref for Awaiter {
}

impl Awaiter {
fn new(result: ImportResult, channel: oneshot::Sender<()>) -> Self {
fn new(result: ImportResult, permit: OwnedSemaphorePermit) -> Self {
Self {
result,
release_channel: Some(channel),
_permit: permit,
}
}
}
Loading