Skip to content

Commit

Permalink
fix(snapshots_creator): Fix snapshot generation query (#1289)
Browse files Browse the repository at this point in the history
## What ❔

- Fixes the DB query returning storage log chunks, so that it doesn't
incorrectly include storage logs for keys that are initially written
only in future L1 batches.
- Adds basic sanity checks to Postgres recovery.

## Why ❔

These extra logs lead to incorrect tree recovery.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
- [x] Linkcheck has been run via `zk linkcheck`.
  • Loading branch information
slowli authored Feb 29, 2024
1 parent eaf5a50 commit e279456
Show file tree
Hide file tree
Showing 7 changed files with 310 additions and 14 deletions.
2 changes: 1 addition & 1 deletion core/bin/snapshots_creator/src/creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl SnapshotCreator {
METRICS.storage_logs_processing_duration[&StorageChunkStage::LoadFromPostgres].start();
let logs = conn
.snapshots_creator_dal()
.get_storage_logs_chunk(miniblock_number, hashed_keys_range)
.get_storage_logs_chunk(miniblock_number, l1_batch_number, hashed_keys_range)
.await
.context("Error fetching storage logs count")?;
drop(conn);
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

215 changes: 210 additions & 5 deletions core/lib/dal/src/snapshots_creator_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,41 @@ impl SnapshotsCreatorDal<'_, '_> {
Ok(count as u64)
}

/// Counts the rows of the `storage_logs` table whose `miniblock_number` does not exceed
/// `at_miniblock` (i.e., rows persisted before and at the specified miniblock).
///
/// If the database reports no count, `0` is returned.
///
/// **Warning.** This method is slow (requires a full table scan).
pub async fn get_storage_logs_row_count(
    &mut self,
    at_miniblock: MiniblockNumber,
) -> sqlx::Result<u64> {
    // Postgres integers are signed, hence the widening cast for the bind parameter.
    let max_miniblock = at_miniblock.0 as i64;
    let counted = sqlx::query!(
        r#"
SELECT
COUNT(*) AS COUNT
FROM
storage_logs
WHERE
miniblock_number <= $1
"#,
        max_miniblock
    )
    .instrument("get_storage_logs_row_count")
    .with_arg("miniblock_number", &at_miniblock)
    .report_latency()
    .fetch_one(self.storage)
    .await?;

    let total = counted.count.unwrap_or(0);
    Ok(total as u64)
}

pub async fn get_storage_logs_chunk(
&mut self,
miniblock_number: MiniblockNumber,
l1_batch_number: L1BatchNumber,
hashed_keys_range: std::ops::RangeInclusive<H256>,
) -> sqlx::Result<Vec<SnapshotStorageLog>> {
// We need to filter the returned logs by `l1_batch_number` in order to not return "phantom writes", i.e.,
// logs that have deduplicated writes (e.g., a write to a non-zero value and back to zero in the same L1 batch)
// which are actually written to in future L1 batches.
let storage_logs = sqlx::query!(
r#"
SELECT
Expand All @@ -62,8 +92,8 @@ impl SnapshotsCreatorDal<'_, '_> {
storage_logs
WHERE
miniblock_number <= $1
AND hashed_key >= $2
AND hashed_key < $3
AND hashed_key >= $3
AND hashed_key <= $4
GROUP BY
hashed_key
ORDER BY
Expand All @@ -72,11 +102,14 @@ impl SnapshotsCreatorDal<'_, '_> {
INNER JOIN storage_logs ON keys.hashed_key = storage_logs.hashed_key
AND storage_logs.miniblock_number = keys.op[1]
AND storage_logs.operation_number = keys.op[2]
INNER JOIN initial_writes ON keys.hashed_key = initial_writes.hashed_key;
INNER JOIN initial_writes ON keys.hashed_key = initial_writes.hashed_key
WHERE
initial_writes.l1_batch_number <= $2
"#,
miniblock_number.0 as i64,
hashed_keys_range.start().0.as_slice(),
hashed_keys_range.end().0.as_slice(),
l1_batch_number.0 as i64,
hashed_keys_range.start().as_bytes(),
hashed_keys_range.end().as_bytes()
)
.instrument("get_storage_logs_chunk")
.with_arg("miniblock_number", &miniblock_number)
Expand Down Expand Up @@ -127,3 +160,175 @@ impl SnapshotsCreatorDal<'_, '_> {
.collect())
}
}

#[cfg(test)]
mod tests {
use zksync_types::StorageLog;

use super::*;
use crate::ConnectionPool;

/// Checks the basic behavior of `get_storage_logs_row_count()` and `get_storage_logs_chunk()`:
/// logs persisted in miniblock 1 / L1 batch 1 are returned as is, and logs added in a later
/// miniblock do not influence the results for the earlier one.
#[tokio::test]
async fn getting_storage_log_chunks_basics() {
    let pool = ConnectionPool::test_pool().await;
    let mut conn = pool.access_storage().await.unwrap();

    // 100 writes to distinct random accounts, ordered by hashed key so that they can be compared
    // with the query output element by element.
    let mut logs: Vec<_> = (0..100)
        .map(|i| {
            let key = StorageKey::new(
                AccountTreeId::new(Address::random()),
                H256::from_low_u64_be(i),
            );
            StorageLog::new_write_log(key, H256::repeat_byte(1))
        })
        .collect();
    logs.sort_unstable_by_key(|log| log.key.hashed_key());

    conn.storage_logs_dal()
        .insert_storage_logs(MiniblockNumber(1), &[(H256::zero(), logs.clone())])
        .await
        .unwrap();
    let mut written_keys: Vec<_> = logs.iter().map(|log| log.key).collect();
    written_keys.sort_unstable();
    conn.storage_logs_dedup_dal()
        .insert_initial_writes(L1BatchNumber(1), &written_keys)
        .await
        .unwrap();

    let log_row_count = conn
        .snapshots_creator_dal()
        .get_storage_logs_row_count(MiniblockNumber(1))
        .await
        .unwrap();
    assert_eq!(log_row_count, logs.len() as u64);
    assert_logs_for_snapshot(&mut conn, MiniblockNumber(1), L1BatchNumber(1), &logs).await;

    // Add some inserts / updates in the next miniblock. They should be ignored.
    //
    // The new logs are materialized *before* their keys are extracted: the generator uses random
    // addresses, so re-running a cloned lazy iterator would yield keys unrelated to the logs
    // that are actually inserted below.
    let new_logs: Vec<_> = (100..150)
        .map(|i| {
            let key = StorageKey::new(
                AccountTreeId::new(Address::random()),
                H256::from_low_u64_be(i),
            );
            StorageLog::new_write_log(key, H256::repeat_byte(1))
        })
        .collect();
    let new_written_keys: Vec<_> = new_logs.iter().map(|log| log.key).collect();
    // Every third existing key gets overwritten with a new value.
    let updated_logs = logs.iter().step_by(3).map(|&log| StorageLog {
        value: H256::repeat_byte(23),
        ..log
    });
    let all_new_logs: Vec<_> = new_logs.into_iter().chain(updated_logs).collect();
    let all_new_logs_len = all_new_logs.len();
    conn.storage_logs_dal()
        .insert_storage_logs(MiniblockNumber(2), &[(H256::zero(), all_new_logs)])
        .await
        .unwrap();
    conn.storage_logs_dedup_dal()
        .insert_initial_writes(L1BatchNumber(2), &new_written_keys)
        .await
        .unwrap();

    // The row count for miniblock 1 must be unchanged...
    let log_row_count = conn
        .snapshots_creator_dal()
        .get_storage_logs_row_count(MiniblockNumber(1))
        .await
        .unwrap();
    assert_eq!(log_row_count, logs.len() as u64);
    // ...while the count for miniblock 2 includes both the inserts and the updates.
    let log_row_count = conn
        .snapshots_creator_dal()
        .get_storage_logs_row_count(MiniblockNumber(2))
        .await
        .unwrap();
    assert_eq!(log_row_count, (logs.len() + all_new_logs_len) as u64);
    assert_logs_for_snapshot(&mut conn, MiniblockNumber(1), L1BatchNumber(1), &logs).await;
}

/// Asserts that `get_storage_logs_chunk()` returns exactly `expected_logs` for the given
/// miniblock / L1 batch, both when queried over the full key range and when queried in
/// smaller key ranges.
///
/// `expected_logs` must be listed in the same order as the query returns them (callers in this
/// module sort them by hashed key). Every returned log is also expected to have
/// `l1_batch_number` as its initial write batch.
///
/// # Panics
///
/// Panics if any of the assertions fails or if a DB query returns an error.
async fn assert_logs_for_snapshot(
    conn: &mut StorageProcessor<'_>,
    miniblock_number: MiniblockNumber,
    l1_batch_number: L1BatchNumber,
    expected_logs: &[StorageLog],
) {
    // First, fetch everything in a single chunk covering the entire key space.
    let all_logs = conn
        .snapshots_creator_dal()
        .get_storage_logs_chunk(
            miniblock_number,
            l1_batch_number,
            H256::zero()..=H256::repeat_byte(0xff),
        )
        .await
        .unwrap();
    assert_eq!(all_logs.len(), expected_logs.len());
    for (log, expected_log) in all_logs.iter().zip(expected_logs) {
        assert_eq!(log.key, expected_log.key);
        assert_eq!(log.value, expected_log.value);
        assert_eq!(log.l1_batch_number_of_initial_write, l1_batch_number);
    }

    // Then, check that splitting the key space into ranges yields the same logs.
    // `slice::chunks()` panics on a zero chunk size, so the derived size is clamped to 1
    // to keep the helper usable with fewer than 3 expected logs.
    let third = (expected_logs.len() / 3).max(1);
    for chunk_size in [2, 5, third] {
        for chunk in expected_logs.chunks(chunk_size) {
            // Chunks are never empty, so both range bounds are always available.
            let range = chunk[0].key.hashed_key()..=chunk.last().unwrap().key.hashed_key();
            let logs = conn
                .snapshots_creator_dal()
                .get_storage_logs_chunk(miniblock_number, l1_batch_number, range)
                .await
                .unwrap();
            assert_eq!(logs.len(), chunk.len());
            for (log, expected_log) in logs.iter().zip(chunk) {
                assert_eq!(log.key, expected_log.key);
                assert_eq!(log.value, expected_log.value);
            }
        }
    }
}

/// Verifies that a key which only has deduplicated ("phantom") writes in miniblock 1 is not
/// returned for L1 batch 1, and is returned once its initial write is recorded for L1 batch 2.
#[tokio::test]
async fn phantom_writes_are_filtered_out() {
    let pool = ConnectionPool::test_pool().await;
    let mut storage = pool.access_storage().await.unwrap();
    // Key range covering the entire hashed key space.
    let all_keys = H256::zero()..=H256::repeat_byte(0xff);

    let key = StorageKey::new(AccountTreeId::default(), H256::repeat_byte(1));

    // Miniblock 1: write a non-zero value and reset it back to zero.
    let write_then_reset = vec![
        StorageLog::new_write_log(key, H256::repeat_byte(1)),
        StorageLog::new_write_log(key, H256::zero()),
    ];
    storage
        .storage_logs_dal()
        .insert_storage_logs(MiniblockNumber(1), &[(H256::zero(), write_then_reset)])
        .await
        .unwrap();
    // initial writes are intentionally not inserted.

    // Miniblock 2: the key receives a real value, and its initial write lands in L1 batch 2.
    let real_write = StorageLog::new_write_log(key, H256::repeat_byte(2));
    storage
        .storage_logs_dal()
        .insert_storage_logs(MiniblockNumber(2), &[(H256::zero(), vec![real_write])])
        .await
        .unwrap();
    storage
        .storage_logs_dedup_dal()
        .insert_initial_writes(L1BatchNumber(2), &[key])
        .await
        .unwrap();

    // Nothing must be returned for the first batch.
    let first_batch_logs = storage
        .snapshots_creator_dal()
        .get_storage_logs_chunk(MiniblockNumber(1), L1BatchNumber(1), all_keys.clone())
        .await
        .unwrap();
    assert_eq!(first_batch_logs, []);

    // The second batch must contain the single real write.
    let second_batch_logs = storage
        .snapshots_creator_dal()
        .get_storage_logs_chunk(MiniblockNumber(2), L1BatchNumber(2), all_keys)
        .await
        .unwrap();
    assert_eq!(second_batch_logs.len(), 1);
    let returned = &second_batch_logs[0];
    assert_eq!(returned.key, key);
    assert_eq!(returned.value, real_write.value);
    assert_eq!(returned.l1_batch_number_of_initial_write, L1BatchNumber(2));
}
}
Loading

0 comments on commit e279456

Please sign in to comment.