Skip to content

chore(starknet_l1_provider): add lazy sync height test #5188

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion crates/starknet_l1_provider/src/bootstrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,11 @@ async fn l2_sync_task(
.await
.expect("network error handling not supported yet")
else {
info!("Sync state not ready yet, trying again later");
debug!("Sync state not ready yet, trying again later");
tokio::time::sleep(retry_interval).await;
continue;
};
info!("Catch up height set: {sync_height}");
catch_up_height.set(sync_height).expect("This is the only write-point, cannot fail")
}
let catch_up_height = *catch_up_height.get().expect("Initialized above");
Expand Down
12 changes: 12 additions & 0 deletions crates/starknet_l1_provider/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ impl TransactionManagerContentBuilder {
}
}

/// A fake L1 provider client that buffers all received messages, allowing assertions on the order
/// in which they were received, and forwarding them to the L1 provider when flushed.
#[derive(Default)]
pub struct FakeL1ProviderClient {
// Interior mutability needed since this is modifying during client API calls, which are all
Expand All @@ -188,6 +190,16 @@ pub struct FakeL1ProviderClient {
}

impl FakeL1ProviderClient {
/// Apply all messages received to the l1 provider.
pub async fn flush_messages(&self, l1_provider: &mut L1Provider) {
let commit_blocks = self.commit_blocks_received.lock().unwrap().drain(..).collect_vec();
for CommitBlockBacklog { height, committed_txs } in commit_blocks {
l1_provider.commit_block(&committed_txs, height).unwrap();
}

// TODO(gilad): flush other buffers if necessary.
}

#[track_caller]
pub fn assert_add_events_received_with(&self, expected: &[Event]) {
let events_received = mem::take(&mut *self.events_received.lock().unwrap());
Expand Down
106 changes: 93 additions & 13 deletions crates/starknet_l1_provider/tests/startup_flows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::time::Duration;
use itertools::Itertools;
use mempool_test_utils::in_ci;
use starknet_api::block::BlockNumber;
use starknet_l1_provider::bootstrapper::CommitBlockBacklog;
use starknet_l1_provider::l1_provider::create_l1_provider;
use starknet_l1_provider::test_utils::FakeL1ProviderClient;
use starknet_l1_provider::L1ProviderConfig;
Expand All @@ -14,6 +13,10 @@ use starknet_sequencer_infra::trace_util::configure_tracing;
use starknet_state_sync_types::communication::MockStateSyncClient;
use starknet_state_sync_types::state_sync_types::SyncBlock;

const fn height_add(block_number: BlockNumber, k: u64) -> BlockNumber {
BlockNumber(block_number.0 + k)
}

// TODO(Gilad): figure out how to set up anvil on a specific L1 block (through genesis.json?) and
// with a specified L2 block logged to L1 (hopefully without having to use a real backup).
/// This test simulates a bootstrapping flow, in which 3 blocks are synced from L2, during which two
Expand Down Expand Up @@ -71,14 +74,14 @@ async fn bootstrap_e2e() {
l1_provider_client.commit_block(no_txs_committed.clone(), catch_up_height).await.unwrap();
tokio::time::sleep(config.startup_sync_sleep_retry_interval).await;
l1_provider_client
.commit_block(no_txs_committed, catch_up_height.unchecked_next())
.commit_block(no_txs_committed, height_add(catch_up_height, 1))
.await
.unwrap();
tokio::time::sleep(config.startup_sync_sleep_retry_interval).await;

// Feed sync task the remaining blocks, will be received after the commits above.
sync_response.lock().unwrap().insert(BlockNumber(startup_height.0 + 1), SyncBlock::default());
sync_response.lock().unwrap().insert(BlockNumber(startup_height.0 + 2), SyncBlock::default());
sync_response.lock().unwrap().insert(height_add(startup_height, 1), SyncBlock::default());
sync_response.lock().unwrap().insert(height_add(startup_height, 2), SyncBlock::default());
tokio::time::sleep(2 * config.startup_sync_sleep_retry_interval).await;

// Assert that initializer task has received the stubbed responses from the sync client and sent
Expand Down Expand Up @@ -177,19 +180,15 @@ async fn bootstrap_delayed_sync_state_with_trivial_catch_up() {
let no_txs_committed = []; // Not testing txs in this test.
l1_provider_client.commit_block(no_txs_committed.to_vec(), startup_height).await.unwrap();
l1_provider_client
.commit_block(no_txs_committed.to_vec(), startup_height.unchecked_next())
.commit_block(no_txs_committed.to_vec(), height_add(startup_height, 1))
.await
.unwrap();

// Flush txs from the fake client to the provider, acts like the `recv()` in a channel.
let commit_blocks =
l1_provider_client.commit_blocks_received.lock().unwrap().drain(..).collect_vec();
for CommitBlockBacklog { height, committed_txs } in commit_blocks {
l1_provider.commit_block(&committed_txs, height).unwrap();
}
// Forward all messages buffered in the client to the provider.
l1_provider_client.flush_messages(&mut l1_provider).await;

// Commit blocks should have been applied.
let start_height_plus_2 = startup_height.unchecked_next().unchecked_next();
let start_height_plus_2 = height_add(startup_height, 2);
assert_eq!(l1_provider.current_height, start_height_plus_2);
// Should still be bootstrapping, since catchup height isn't determined yet.
// Technically we could end bootstrapping at this point, but its simpler to let it
Expand All @@ -205,11 +204,92 @@ async fn bootstrap_delayed_sync_state_with_trivial_catch_up() {
// Finally, commit a new block to trigger the bootstrapping check, should switch to steady
// state.
l1_provider.commit_block(&no_txs_committed, start_height_plus_2).unwrap();
assert_eq!(l1_provider.current_height, start_height_plus_2.unchecked_next());
assert_eq!(l1_provider.current_height, height_add(start_height_plus_2, 1));
// The new commit block triggered the catch-up check, which ended the bootstrapping phase.
assert!(!l1_provider.state.is_bootstrapping());
}

#[tokio::test]
async fn bootstrap_delayed_sync_state_with_sync_behind_batcher() {
if !in_ci() {
return;
}
configure_tracing().await;

// Setup.

let l1_provider_client = Arc::new(FakeL1ProviderClient::default());
let startup_height = BlockNumber(1);
let sync_height = BlockNumber(3);

let mut sync_client = MockStateSyncClient::default();
// Mock sync response for an arbitrary number of calls to get_latest_block_number.
// Later in the test we modify it to become something else.
let sync_height_response = Arc::new(Mutex::new(None));
let sync_response_clone = sync_height_response.clone();
sync_client
.expect_get_latest_block_number()
.returning(move || Ok(*sync_response_clone.lock().unwrap()));
sync_client.expect_get_block().returning(|_| Ok(Some(SyncBlock::default())));

let config = L1ProviderConfig {
startup_sync_sleep_retry_interval: Duration::from_millis(10),
..Default::default()
};
let mut l1_provider = create_l1_provider(
config,
l1_provider_client.clone(),
Arc::new(sync_client),
startup_height,
);

// Test.

// Start the sync sequence, should busy-wait until the sync height is sent.
let scraped_l1_handler_txs = []; // No txs to scrape in this test.
l1_provider.initialize(scraped_l1_handler_txs.into()).await.unwrap();

// **Commit** a few blocks. These should get backlogged since they are post-sync-height.
// Sleeps are sprinkled in to give the async task a couple shots at attempting to get the sync
// height (see DEBUG log).
let no_txs_committed = []; // Not testing txs in this test.
l1_provider_client
.commit_block(no_txs_committed.to_vec(), sync_height.unchecked_next())
.await
.unwrap();
tokio::time::sleep(config.startup_sync_sleep_retry_interval).await;
l1_provider_client
.commit_block(no_txs_committed.to_vec(), sync_height.unchecked_next().unchecked_next())
.await
.unwrap();

// Forward all messages buffered in the client to the provider.
l1_provider_client.flush_messages(&mut l1_provider).await;
tokio::time::sleep(config.startup_sync_sleep_retry_interval).await;

// Assert commit blocks are backlogged (didn't affect start height).
assert_eq!(l1_provider.current_height, startup_height);
// Should still be bootstrapping, since catchup height isn't determined yet.
assert!(l1_provider.state.is_bootstrapping());

// Simulate the state sync service finally being ready, and give the async task enough time to
// pick this up and sync up the provider.
*sync_height_response.lock().unwrap() = Some(sync_height);
tokio::time::sleep(config.startup_sync_sleep_retry_interval).await;
// Forward all messages buffered in the client to the provider.
l1_provider_client.flush_messages(&mut l1_provider).await;

// Two things happened here: the async task sent 2 commit blocks it got from the sync_client,
// which bumped the provider height to sync_height+1, then the backlog was applied which bumped
// it twice again.
assert_eq!(
l1_provider.current_height,
sync_height.unchecked_next().unchecked_next().unchecked_next()
);
// Sync height was reached, bootstrapping was completed.
assert!(!l1_provider.state.is_bootstrapping());
}

#[test]
#[ignore = "similar to backlog_happy_flow, only shorter, and sprinkle some start_block/get_txs \
attempts while its bootstrapping (and assert failure on height), then assert that they \
Expand Down
Loading