fix(starknet_l1_provider): catch_up_height from sync client #4967

Merged
9 changes: 7 additions & 2 deletions config/sequencer/default_config.json
@@ -1149,11 +1149,16 @@
"privacy": "TemporaryValue",
"value": true
},
"l1_provider_config.bootstrap_catch_up_height": {
"description": "Height at which the provider should catch up to the bootstrapper.",
"l1_provider_config.bootstrap_catch_up_height_override": {
"description": "Override height at which the provider should catch up to the bootstrapper.",
"privacy": "Public",
"value": 0
},
"l1_provider_config.bootstrap_catch_up_height_override.#is_none": {
"description": "Flag for an optional field.",
"privacy": "TemporaryValue",
"value": true
},
"l1_provider_config.provider_startup_height_override": {
"description": "Override height at which the provider should start",
"privacy": "Public",
@@ -207,7 +207,8 @@
"l1_gas_price_scraper_config.polling_interval": 1,
"l1_gas_price_scraper_config.starting_block": 0,
"l1_gas_price_scraper_config.starting_block.#is_none": true,
"l1_provider_config.bootstrap_catch_up_height": 0,
"l1_provider_config.bootstrap_catch_up_height_override": 0,
"l1_provider_config.bootstrap_catch_up_height_override.#is_none": true,
"l1_provider_config.provider_startup_height_override": 1,
"l1_provider_config.provider_startup_height_override.#is_none": false,
"l1_provider_config.startup_sync_sleep_retry_interval": 0.0,
@@ -173,7 +173,6 @@ fn modify_height_configs_idle_nodes(
// function accordingly.
config.consensus_manager_config.immediate_active_height = node_start_height;
config.consensus_manager_config.cende_config.skip_write_height = Some(node_start_height);
config.l1_provider_config.bootstrap_catch_up_height = node_start_height.prev().unwrap();
// TODO(Gilad): remove once we add support to updating the StarknetContract on Anvil.
// This will require mocking the required permissions in the contract that typically
// forbid one from updating the state through an API call.
55 changes: 50 additions & 5 deletions crates/starknet_l1_provider/src/bootstrapper.rs
@@ -5,12 +5,19 @@ use starknet_api::block::BlockNumber;
use starknet_api::transaction::TransactionHash;
use starknet_l1_provider_types::SharedL1ProviderClient;
use starknet_state_sync_types::communication::SharedStateSyncClient;
use tracing::debug;
use tokio::sync::OnceCell;
use tracing::{debug, info};

pub type LazyCatchUpHeight = Arc<OnceCell<BlockNumber>>;

/// Caches commits to be applied later. This flow is only relevant while the node is starting up.
#[derive(Clone)]
pub struct Bootstrapper {
pub catch_up_height: BlockNumber,
/// The catch-up height for the bootstrapper is the sync height (unless overridden explicitly).
/// Due to current infra constraints, this value can only be fetched _after_ the provider is
/// running, not during its initialization, so it must be fetched lazily at runtime.
pub catch_up_height: LazyCatchUpHeight,
pub sync_retry_interval: Duration,
pub commit_block_backlog: Vec<CommitBlockBacklog>,
pub l1_provider_client: SharedL1ProviderClient,
@@ -20,9 +27,27 @@ pub struct Bootstrapper {
}

impl Bootstrapper {
pub fn new(
l1_provider_client: SharedL1ProviderClient,
sync_client: SharedStateSyncClient,
sync_retry_interval: Duration,
catch_up_height: LazyCatchUpHeight,
) -> Self {
Self {
sync_retry_interval,
commit_block_backlog: Default::default(),
l1_provider_client,
sync_client,
sync_task_handle: SyncTaskHandle::NotStartedYet,
catch_up_height,
}
}

/// Check if the caller has caught up with the bootstrapper.
/// If catch_up_height is unset, the sync isn't even ready yet.
pub fn is_caught_up(&self, current_provider_height: BlockNumber) -> bool {
self.catch_up_height == current_provider_height
self.catch_up_height()
.is_some_and(|catch_up_height| current_provider_height >= catch_up_height)
// TODO(Gilad): add health_check here, making sure that the sync task isn't stuck, which is
// `handle dropped && backlog empty && not caught up`.
}
@@ -54,12 +79,16 @@ impl Bootstrapper {
self.l1_provider_client.clone(),
self.sync_client.clone(),
current_provider_height,
self.catch_up_height,
self.catch_up_height.clone(),
self.sync_retry_interval,
));

self.sync_task_handle = SyncTaskHandle::Started(sync_task_handle.into());
}

pub fn catch_up_height(&self) -> Option<BlockNumber> {
self.catch_up_height.get().copied()
}
}

impl PartialEq for Bootstrapper {
@@ -85,9 +114,25 @@ async fn l2_sync_task(
l1_provider_client: SharedL1ProviderClient,
sync_client: SharedStateSyncClient,
mut current_height: BlockNumber,
catch_up_height: BlockNumber,
catch_up_height: LazyCatchUpHeight,
retry_interval: Duration,
) {
// The infra doesn't currently support starting the provider only after sync is ready.
while !catch_up_height.initialized() {
info!("Try fetching sync height to initialize catch up point");
let Some(sync_height) = sync_client
.get_latest_block_number()
.await
.expect("network error handling not supported yet")
else {
info!("Sync state not ready yet, trying again later");
tokio::time::sleep(retry_interval).await;
continue;
};
catch_up_height.set(sync_height).expect("This is the only write-point, cannot fail");
}
let catch_up_height = *catch_up_height.get().expect("Initialized above");

while current_height <= catch_up_height {
// TODO(Gilad): add tracing instrument.
debug!("Try syncing L1Provider with L2 height: {}", current_height);
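
The heart of this change is the lazy catch-up cell: exactly one writer sets it once the sync height is known, and every reader treats an empty cell as "sync not ready yet". Below is a minimal, self-contained sketch of that pattern using `tokio::sync::OnceCell`; the `u64` height, the stubbed fetch, and all names are illustrative stand-ins, not this crate's API.

```rust
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::OnceCell;

// Stand-in for LazyCatchUpHeight, with u64 in place of BlockNumber.
type LazyHeight = Arc<OnceCell<u64>>;

// Stub for the state-sync query; the real task asks the sync client for its
// latest block number, which is None while sync is still starting up.
async fn fetch_sync_height() -> Option<u64> {
    Some(42)
}

// Poll until a sync height is available, then initialize the cell exactly once.
async fn sync_task(height: LazyHeight, retry: Duration) {
    while !height.initialized() {
        match fetch_sync_height().await {
            Some(h) => height.set(h).expect("only write-point, cannot fail"),
            None => tokio::time::sleep(retry).await,
        }
    }
}

#[tokio::main]
async fn main() {
    let height: LazyHeight = Arc::new(OnceCell::new());
    // Readers see None until the task writes: "catch-up height unknown yet".
    assert!(height.get().is_none());
    sync_task(height.clone(), Duration::from_millis(10)).await;
    assert_eq!(height.get().copied(), Some(42));
}
```

This is also why `is_caught_up` uses `is_some_and`: an uninitialized cell means the catch-up target is unknown, so the provider conservatively reports that it has not caught up.
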
38 changes: 27 additions & 11 deletions crates/starknet_l1_provider/src/l1_provider.rs
@@ -1,4 +1,5 @@
use std::cmp::Ordering::{Equal, Greater, Less};
use std::sync::Arc;

use starknet_api::block::BlockNumber;
use starknet_api::executable_transaction::L1HandlerTransaction;
@@ -15,7 +16,7 @@ use starknet_sequencer_infra::component_definitions::ComponentStarter;
use starknet_state_sync_types::communication::SharedStateSyncClient;
use tracing::{instrument, trace, warn};

use crate::bootstrapper::{Bootstrapper, SyncTaskHandle};
use crate::bootstrapper::Bootstrapper;
use crate::transaction_manager::TransactionManager;
use crate::{L1ProviderConfig, ProviderState};

@@ -27,6 +28,7 @@ pub mod l1_provider_tests;
// here is compatible with it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct L1Provider {
/// Represents the L2 block height being built.
pub current_height: BlockNumber,
pub tx_manager: TransactionManager,
// TODO(Gilad): consider transitioning to a generic phantom state once the infra is stabilized
@@ -149,13 +151,13 @@ impl L1Provider {
let backlog = std::mem::take(&mut bootstrapper.commit_block_backlog);
assert!(
backlog.is_empty()
|| bootstrapper.catch_up_height == backlog.first().unwrap().height
|| self.current_height == backlog.first().unwrap().height
&& backlog
.windows(2)
.all(|height| height[1].height == height[0].height.unchecked_next()),
"Backlog must have sequential heights starting sequentially after \
catch_up_height: {}, backlog: {:?}",
bootstrapper.catch_up_height,
"Backlog must have sequential heights starting sequentially after current height: \
{}, backlog: {:?}",
self.current_height,
backlog.iter().map(|commit_block| commit_block.height).collect::<Vec<_>>()
);
for commit_block in backlog {
@@ -203,6 +205,9 @@ impl L1Provider {

impl ComponentStarter for L1Provider {}

/// Initializes L1Provider at specified height (≤ scraper's last state update height).
/// Bootstrap catch-up height defaults to current sync height.
#[instrument(skip(l1_provider_client, sync_client, config))]
pub fn create_l1_provider(
config: L1ProviderConfig,
l1_provider_client: SharedL1ProviderClient,
@@ -225,14 +230,25 @@ pub fn create_l1_provider(
})
.unwrap_or(scraper_synced_startup_height);

let bootstrapper = Bootstrapper {
catch_up_height: config.bootstrap_catch_up_height,
commit_block_backlog: Default::default(),
let catch_up_height = config
.bootstrap_catch_up_height_override
.map(|catch_up_height_override| {
warn!(
"Initializing L1Provider with OVERRIDDEN catch-up height: \
{catch_up_height_override}; this MUST be greater than or equal to the default \
non-overridden value, which is the current sync height, or the sync will never \
complete!"
);
Arc::new(catch_up_height_override.into())
})
.unwrap_or_default();

let bootstrapper = Bootstrapper::new(
l1_provider_client,
sync_client,
sync_task_handle: SyncTaskHandle::NotStartedYet,
sync_retry_interval: config.startup_sync_sleep_retry_interval,
};
config.startup_sync_sleep_retry_interval,
catch_up_height,
);

L1Provider {
current_height: l1_provider_startup_height,
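
For reference, here is a hedged sketch of how the override above resolves into the lazy cell (`u64` in place of `BlockNumber`, and `resolve_catch_up_height` is a made-up helper name): an explicit override yields a pre-initialized `OnceCell`, so the sync task's fetch loop short-circuits, while `None` leaves the cell empty for the sync task to fill.

```rust
use std::sync::Arc;

use tokio::sync::OnceCell;

// Hypothetical helper mirroring the override resolution in create_l1_provider,
// with u64 standing in for BlockNumber.
fn resolve_catch_up_height(catch_up_override: Option<u64>) -> Arc<OnceCell<u64>> {
    match catch_up_override {
        // Pre-initialized cell: the sync task sees it as set and skips fetching.
        Some(height) => Arc::new(OnceCell::from(height)),
        // Empty cell: the sync task fills it with the sync height at runtime.
        None => Arc::new(OnceCell::new()),
    }
}

fn main() {
    assert_eq!(resolve_catch_up_height(Some(7)).get().copied(), Some(7));
    assert!(resolve_catch_up_height(None).get().is_none());
}
```

Hence the `warn!` above: per its message, an override below the current sync height means the sync will never complete.
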
2 changes: 1 addition & 1 deletion crates/starknet_l1_provider/src/l1_provider_tests.rs
@@ -28,7 +28,7 @@ macro_rules! bootstrapper {
committed_txs: vec![$(tx_hash!($tx)),*]
}),*
].into_iter().collect(),
catch_up_height: BlockNumber($catch),
catch_up_height: Arc::new(BlockNumber($catch).into()),
l1_provider_client: Arc::new(FakeL1ProviderClient::default()),
sync_client: Arc::new(MockStateSyncClient::default()),
sync_task_handle: SyncTaskHandle::default(),
32 changes: 17 additions & 15 deletions crates/starknet_l1_provider/src/lib.rs
@@ -95,34 +95,36 @@ pub struct L1ProviderConfig {
/// **WARNING**: Take care when setting this value, it must be no higher than the
/// LastStateUpdate height at the L1 Height that the L1Scraper is initialized on.
pub provider_startup_height_override: Option<BlockNumber>,
pub bootstrap_catch_up_height: BlockNumber,
/// In most cases this can remain None: the provider defaults to using the sync height at
/// startup.
pub bootstrap_catch_up_height_override: Option<BlockNumber>,
#[serde(deserialize_with = "deserialize_float_seconds_to_duration")]
pub startup_sync_sleep_retry_interval: Duration,
}

impl SerializeConfig for L1ProviderConfig {
fn dump(&self) -> BTreeMap<ParamPath, SerializedParam> {
let mut dump = BTreeMap::from([
ser_param(
"bootstrap_catch_up_height",
&self.bootstrap_catch_up_height,
"Height at which the provider should catch up to the bootstrapper.",
ParamPrivacyInput::Public,
),
ser_param(
"startup_sync_sleep_retry_interval",
&self.startup_sync_sleep_retry_interval.as_secs_f64(),
"Interval in seconds between each retry of syncing with L2 during startup.",
ParamPrivacyInput::Public,
),
]);
let mut dump = BTreeMap::from([ser_param(
"startup_sync_sleep_retry_interval",
&self.startup_sync_sleep_retry_interval.as_secs_f64(),
"Interval in seconds between each retry of syncing with L2 during startup.",
ParamPrivacyInput::Public,
)]);

dump.extend(ser_optional_param(
&self.provider_startup_height_override,
Default::default(),
"provider_startup_height_override",
"Override height at which the provider should start",
ParamPrivacyInput::Public,
));
dump.extend(ser_optional_param(
&self.bootstrap_catch_up_height_override,
Default::default(),
"bootstrap_catch_up_height_override",
"Override height at which the provider should catch up to the bootstrapper.",
ParamPrivacyInput::Public,
));
dump
}
}
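
The switch to `ser_optional_param` is what produces the paired entries in `default_config.json` shown at the top of this diff: an `Option` param dumps as a default-valued entry plus a `.#is_none` flag, so `None` survives the round trip through the flat JSON config. A sketch of that convention under stated assumptions (this is not the real `papyrus_config` implementation):

```rust
use serde_json::{json, Value};

// Illustrative sketch only; dump_optional is a made-up name, not the
// papyrus_config API. The value entry falls back to a default when the option
// is None, and the "#is_none" flag records whether it was actually set.
fn dump_optional(name: &str, value: Option<u64>) -> Vec<(String, Value)> {
    vec![
        (name.to_string(), json!(value.unwrap_or_default())),
        (format!("{name}.#is_none"), json!(value.is_none())),
    ]
}

fn main() {
    let name = "l1_provider_config.bootstrap_catch_up_height_override";
    let dumped = dump_optional(name, None);
    // Matches the pairing in default_config.json: value 0, #is_none true.
    assert_eq!(dumped[0].1, json!(0));
    assert_eq!(dumped[1].0, format!("{name}.#is_none"));
    assert_eq!(dumped[1].1, json!(true));
}
```
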
72 changes: 71 additions & 1 deletion crates/starknet_l1_provider/tests/startup_flows.rs
@@ -39,7 +39,7 @@ async fn bootstrap_e2e() {
sync_client.expect_get_block().returning(move |input| Ok(sync_response_clone.remove(&input)));

let config = L1ProviderConfig {
bootstrap_catch_up_height: catch_up_height,
bootstrap_catch_up_height_override: Some(catch_up_height),
startup_sync_sleep_retry_interval: Duration::from_millis(10),
..Default::default()
};
@@ -132,6 +132,76 @@ async fn bootstrap_e2e() {
assert!(!l1_provider.state.is_bootstrapping());
}

#[tokio::test]
async fn bootstrap_delayed_sync_state_with_trivial_catch_up() {
if !in_ci() {
return;
}
configure_tracing().await;

// Setup.

let l1_provider_client = Arc::new(FakeL1ProviderClient::default());
let startup_height = BlockNumber(2);

let mut sync_client = MockStateSyncClient::default();
// Mock the sync response for an arbitrary number of calls to get_latest_block_number.
// The response starts as None; later in the test we set it to a concrete height.
let sync_height_response = Arc::new(Mutex::new(None));
let sync_response_clone = sync_height_response.clone();
sync_client
.expect_get_latest_block_number()
.returning(move || Ok(*sync_response_clone.lock().unwrap()));

let config = L1ProviderConfig {
startup_sync_sleep_retry_interval: Duration::from_millis(10),
..Default::default()
};
let mut l1_provider = create_l1_provider(
config,
l1_provider_client.clone(),
Arc::new(sync_client),
startup_height,
);

// Test.

// Start the sync sequence, should busy-wait until the sync height is sent.
let scraped_l1_handler_txs = []; // No txs to scrape in this test.
l1_provider.initialize(scraped_l1_handler_txs.into()).await.unwrap();

// **Commit** a few blocks. The height starts from the provider's current height, since this
// is a trivial catch-up scenario (nothing to catch up).
// This checks that the trivial catch_up_height doesn't mess up this flow.
let no_txs_committed = []; // Not testing txs in this test.
l1_provider.commit_block(&no_txs_committed, startup_height).unwrap();
tokio::time::sleep(config.startup_sync_sleep_retry_interval).await;
l1_provider.commit_block(&no_txs_committed, startup_height.unchecked_next()).unwrap();
tokio::time::sleep(config.startup_sync_sleep_retry_interval).await;
// Commit blocks should have been applied.
let start_height_plus_2 = startup_height.unchecked_next().unchecked_next();
assert_eq!(l1_provider.current_height, start_height_plus_2);
// Should still be bootstrapping, since the catch-up height isn't determined yet.
// Technically we could end bootstrapping at this point, but it's simpler to let it
// terminate gracefully once the sync is ready.
assert!(l1_provider.state.is_bootstrapping());

*sync_height_response.lock().unwrap() = Some(BlockNumber(2));

// Let the sync task continue; it should short-circuit.
tokio::time::sleep(config.startup_sync_sleep_retry_interval).await;
// Assert height is unchanged from last time; no commit block was called from the sync task.
assert_eq!(l1_provider.current_height, start_height_plus_2);
// Finally, commit a new block to trigger the bootstrapping check; this should switch to
// steady state.
l1_provider.commit_block(&no_txs_committed, start_height_plus_2).unwrap();
assert_eq!(l1_provider.current_height, start_height_plus_2.unchecked_next());
// Bootstrapping should now be complete, since the catch-up height was determined and
// the provider has caught up to it.
assert!(!l1_provider.state.is_bootstrapping());
}

#[test]
#[ignore = "similar to backlog_happy_flow, only shorter, and sprinkle some start_block/get_txs \
attempts while its bootstrapping (and assert failure on height), then assert that they \
Expand Down
1 change: 1 addition & 0 deletions crates/starknet_sequencer_node/src/components.rs
@@ -253,6 +253,7 @@ pub async fn create_node_components(
height that is at least as old as L2 genesis, or override provider startup \
height (read its docstring before using)",
);

Some(create_l1_provider(
config.l1_provider_config,
clients.get_l1_provider_shared_client().unwrap(),
2 changes: 1 addition & 1 deletion crates/starknet_state_sync_types/src/errors.rs
@@ -6,7 +6,7 @@ use starknet_api::core::{ClassHash, ContractAddress};
use starknet_api::StarknetApiError;
use thiserror::Error;

#[derive(Debug, Error, Serialize, Deserialize, Clone, PartialEq)]
#[derive(Debug, Error, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub enum StateSyncError {
#[error("Communication error between StateSync and StateSyncRunner")]
RunnerCommunicationError,
3 changes: 1 addition & 2 deletions scripts/run_tests.py
@@ -23,8 +23,7 @@
# List of sequencer node integration test binary names. Stored as a list to maintain order.
SEQUENCER_INTEGRATION_TEST_NAMES: List[str] = [
"integration_test_positive_flow",
# TODO(Shahak/Noam.s): enable these when required
# "integration_test_restart_flow",
"integration_test_restart_flow",
"integration_test_revert_flow",
"integration_test_central_and_p2p_sync_flow",
]