Move Merkle proof verification to warp syncing module (#3054)
Before this PR, the warp syncing code asks "hey I'd like to know the
runtime", and the upper layers download a Merkle proof, verify it, and
give the storage values to the warp syncing code.

This PR modifies this: the warp syncing code now asks for a Merkle proof
of arbitrary keys, and the upper layers download that proof and give it
to the warp syncing code. It is the warp syncing code that verifies said
proof.

The objective of this PR is to simplify the API surface of the syncing
code. It will also make it easier to implement #1769, as it is now clear
who is responsible for first downloading only the hash.
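
Concretely, the request shape exchanged with the sync state machine changes. A rough before/after sketch, with field lists simplified from the actual definitions in src/sync/all.rs (the enum names `Before`/`After` are illustrative, not real identifiers; see the diffs below for the real code):

    // Before: the state machine asked for specific storage values, which forced
    // the upper layers to download a Merkle proof, verify it against the trie
    // root, and hand back one `Option<Vec<u8>>` per requested key.
    enum DesiredRequestBefore {
        StorageGet {
            block_hash: [u8; 32],
            state_trie_root: [u8; 32],
            keys: Vec<Vec<u8>>,
        },
    }

    // After: the state machine asks for the Merkle proof itself. The upper
    // layers forward the raw, unverified proof bytes as a `Vec<u8>`, and the
    // warp syncing code verifies the proof internally.
    enum DesiredRequestAfter {
        StorageGetMerkleProof {
            block_hash: [u8; 32],
            state_trie_root: [u8; 32],
            keys: Vec<Vec<u8>>,
        },
    }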

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
tomaka and mergify[bot] authored Nov 30, 2022
1 parent 7750c22 commit 4bf957a
Showing 4 changed files with 138 additions and 109 deletions.
2 changes: 1 addition & 1 deletion bin/full-node/src/run/consensus_service.rs
@@ -997,7 +997,7 @@ impl SyncBackground {
                     .push(request.map(move |r| (request_id, source_id, r)).boxed());
             }
             all::DesiredRequest::GrandpaWarpSync { .. }
-            | all::DesiredRequest::StorageGet { .. }
+            | all::DesiredRequest::StorageGetMerkleProof { .. }
             | all::DesiredRequest::RuntimeCallMerkleProof { .. } => {
                 // Not used in "full" mode.
                 unreachable!()
39 changes: 5 additions & 34 deletions bin/light-base/src/sync_service/standalone.rs
@@ -33,7 +33,6 @@ use smoldot::{
     libp2p,
     network::{self, protocol},
     sync::all,
-    trie::proof_decode,
 };

 /// Starts a sync service background task to synchronize a standalone chain (relay chain or not).
@@ -281,11 +280,7 @@ pub(super) async fn start_standalone_chain<TPlat: Platform>(
                 // `result` is an error if the request got cancelled by the sync state machine.
                 if let Ok(result) = result {
                     // Inject the result of the request into the sync state machine.
-                    task.sync.storage_get_response(
-                        request_id,
-                        result.map(|list| list.into_iter()),
-                    ).1
-
+                    task.sync.storage_get_response(request_id, result).1
                 } else {
                     // The sync state machine has emitted a `Action::Cancel` earlier, and is
                     // thus no longer interested in the response.
@@ -439,13 +434,7 @@ struct Task<TPlat: Platform> {

     /// List of storage requests currently in progress.
     pending_storage_requests: stream::FuturesUnordered<
-        future::BoxFuture<
-            'static,
-            (
-                all::RequestId,
-                Result<Result<Vec<Option<Vec<u8>>>, ()>, future::Aborted>,
-            ),
-        >,
+        future::BoxFuture<'static, (all::RequestId, Result<Result<Vec<u8>, ()>, future::Aborted>)>,
     >,

     /// List of call proof requests currently in progress.
@@ -560,10 +549,10 @@ impl<TPlat: Platform> Task<TPlat> {
                     .push(async move { (request_id, grandpa_request.await) }.boxed());
             }

-            all::DesiredRequest::StorageGet {
+            all::DesiredRequest::StorageGetMerkleProof {
                 block_hash,
-                state_trie_root,
                 ref keys,
+                ..
             } => {
                 let peer_id = self.sync[source_id].0.clone(); // TODO: why does this require cloning? weird borrow chk issue

@@ -577,28 +566,10 @@ impl<TPlat: Platform> Task<TPlat> {
                     Duration::from_secs(16),
                 );

-                let keys = keys.clone();
                 let storage_request = async move {
                     if let Ok(outcome) = storage_request.await {
-                        // TODO: lots of copying around
-                        // TODO: log what happens
-                        if let Ok(decoded) =
-                            proof_decode::decode_and_verify_proof(proof_decode::Config {
-                                proof: outcome.decode(),
-                                trie_root_hash: &state_trie_root,
-                            })
-                        {
-                            keys.iter()
-                                .map(|key| {
-                                    decoded
-                                        .storage_value(key)
-                                        .ok_or(())
-                                        .map(|v| v.map(|v| v.to_vec()))
-                                })
-                                .collect::<Result<Vec<_>, ()>>()
-                        } else {
-                            Err(())
-                        }
+                        Ok(outcome.decode().to_vec()) // TODO: no to_vec() here, needs some API change on the networking
                     } else {
                         Err(())
                     }
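
For context, the verification logic deleted above is what moved into the warp syncing module. Below is a minimal sketch of that logic, reconstructed from the removed lines; the free-standing helper and its exact signature are illustrative, not the actual code inside the warp sync state machine (which this diff does not show):

    use smoldot::trie::proof_decode;

    /// Decode and verify a Merkle proof against the expected trie root, then
    /// look up each requested key in the verified proof. Mirrors the code
    /// removed above.
    fn verify_storage_proof(
        proof: &[u8],
        state_trie_root: &[u8; 32],
        keys: &[Vec<u8>],
    ) -> Result<Vec<Option<Vec<u8>>>, ()> {
        let decoded = proof_decode::decode_and_verify_proof(proof_decode::Config {
            proof,
            trie_root_hash: state_trie_root,
        })
        .map_err(|_| ())?;

        keys.iter()
            .map(|key| {
                // `storage_value` returns `None` when the proof doesn't cover
                // the key at all, which makes the whole response invalid.
                decoded
                    .storage_value(key)
                    .ok_or(())
                    .map(|value| value.map(|v| v.to_vec()))
            })
            .collect()
    }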
38 changes: 15 additions & 23 deletions src/sync/all.rs
@@ -38,7 +38,7 @@ use crate::{
     verify,
 };

-use alloc::{borrow::Cow, vec, vec::Vec};
+use alloc::{borrow::Cow, vec::Vec};
 use core::{
     cmp, iter, marker, mem,
     num::{NonZeroU32, NonZeroU64},
@@ -892,13 +892,14 @@ impl<TRq, TSrc, TBl> AllSync<TRq, TSrc, TBl> {
                     sync_start_block_hash: block_hash,
                 }
             }
-            warp_sync::DesiredRequest::RuntimeParametersGet {
+            warp_sync::DesiredRequest::StorageGetMerkleProof {
                 block_hash,
                 state_trie_root,
-            } => DesiredRequest::StorageGet {
+                keys,
+            } => DesiredRequest::StorageGetMerkleProof {
                 block_hash,
                 state_trie_root,
-                keys: vec![b":code".to_vec(), b":heappages".to_vec()],
+                keys,
             },
             warp_sync::DesiredRequest::RuntimeCallMerkleProof {
                 block_hash,
@@ -1038,7 +1039,7 @@ impl<TRq, TSrc, TBl> AllSync<TRq, TSrc, TBl> {
             (
                 AllSyncInner::GrandpaWarpSync { inner },
                 RequestDetail::StorageGet { block_hash, keys },
-            ) if keys == &[&b":code"[..], &b":heappages"[..]] => {
+            ) => {
                 let inner_source_id = match self.shared.sources.get(source_id.0).unwrap() {
                     SourceMapping::GrandpaWarpSync(inner_source_id) => *inner_source_id,
                     _ => unreachable!(),
@@ -1053,8 +1054,9 @@ impl<TRq, TSrc, TBl> AllSync<TRq, TSrc, TBl> {
                         outer_request_id,
                         user_data,
                     },
-                    warp_sync::RequestDetail::RuntimeParametersGet {
+                    warp_sync::RequestDetail::StorageGetMerkleProof {
                         block_hash: *block_hash,
+                        keys: keys.clone(), // TODO: clone?
                     },
                 );

@@ -1579,7 +1581,7 @@ impl<TRq, TSrc, TBl> AllSync<TRq, TSrc, TBl> {
     pub fn storage_get_response(
         &mut self,
         request_id: RequestId,
-        response: Result<impl Iterator<Item = Option<impl AsRef<[u8]>>>, ()>,
+        response: Result<Vec<u8>, ()>,
     ) -> (TRq, ResponseOutcome) {
         debug_assert!(self.shared.requests.contains(request_id.0));
         let request = self.shared.requests.remove(request_id.0);
@@ -1591,17 +1593,10 @@ impl<TRq, TSrc, TBl> AllSync<TRq, TSrc, TBl> {
         ) {
             (
                 AllSyncInner::GrandpaWarpSync { inner: mut sync },
-                Ok(mut response),
+                Ok(response),
                 RequestMapping::WarpSync(request_id),
             ) => {
-                // In this state, we expect the response to be one value for `:code` and one for
-                // `:heappages`. As documented, we panic if the number of items isn't 2.
-                let code = response.next().unwrap();
-                let heap_pages = response.next().unwrap();
-                assert!(response.next().is_none());
-
-                let user_data = sync.runtime_parameters_get_success(request_id, code, heap_pages);
-
+                let user_data = sync.storage_get_success(request_id, response);
                 self.inner = AllSyncInner::GrandpaWarpSync { inner: sync };
                 (user_data.user_data, ResponseOutcome::Queued)
             }
@@ -1783,7 +1778,7 @@ pub enum DesiredRequest {
     },

     /// Sending a storage query is requested.
-    StorageGet {
+    StorageGetMerkleProof {
         /// Hash of the block whose storage is requested.
         block_hash: [u8; 32],
         /// Merkle value of the root of the storage trie of the block.
@@ -1914,7 +1909,7 @@ impl From<DesiredRequest> for RequestDetail {
             } => RequestDetail::GrandpaWarpSync {
                 sync_start_block_hash,
             },
-            DesiredRequest::StorageGet {
+            DesiredRequest::StorageGetMerkleProof {
                 block_hash, keys, ..
             } => RequestDetail::StorageGet { block_hash, keys },
             DesiredRequest::RuntimeCallMerkleProof {
@@ -2811,11 +2806,8 @@ impl<TRq> Shared<TRq> {
                     sync_start_block_hash: block_hash,
                 }
             }
-            warp_sync::RequestDetail::RuntimeParametersGet { block_hash } => {
-                RequestDetail::StorageGet {
-                    block_hash,
-                    keys: vec![b":code".to_vec(), b":heappages".to_vec()],
-                }
+            warp_sync::RequestDetail::StorageGetMerkleProof { block_hash, keys } => {
+                RequestDetail::StorageGet { block_hash, keys }
             }
             warp_sync::RequestDetail::RuntimeCallMerkleProof {
                 block_hash,
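
On the response side, the caller now passes the raw proof bytes through untouched, as the new `storage_get_response` signature above shows. A hedged usage sketch (the driver-loop context and the `proof_bytes` variable are illustrative):

    // `sync` is the `AllSync` state machine; `proof_bytes: Vec<u8>` is the
    // raw, still-unverified Merkle proof received from a peer. The caller no
    // longer produces per-key values; verification happens inside the state
    // machine, which reports the result via `ResponseOutcome`.
    let (user_data, outcome) = sync.storage_get_response(request_id, Ok(proof_bytes));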