chainHead/storage: Fix storage iteration using the query key (paritytech#1665)

This PR ensures that all storage keys under a prefix are returned by the
`chainHead_storage` method.

Before this PR, the `storage_keys` API was called with just the `start_key`.
Before the pagination event was generated, the last reported key
(`last_key`) was saved internally.
When pagination resumed, that `last_key` served as the next
`start_key` for the `storage_keys` API.

However, this behavior does not work correctly when the saved key is
not a prefix of the keys that follow it.

The entry keys `a`, `ab`, and `abc` share a common prefix, so resuming
from `ab` still leads to `abc`.
However, for `a`, `ab`, `aB`, and `abc`, resuming from `aB` does not
lead to `abc`, because `abc` does not start with `aB`.
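
A minimal standalone sketch of that failure mode (not part of this commit):
`storage_keys` below is a simplified stand-in for the client's key iteration,
a `BTreeSet` stands in for storage, and `page_size` plays the role of
`operation_max_storage_items`. Reusing the key where pagination stopped as the
prefix of the next query silently skips keys that sort after it but do not
start with it.

use std::collections::BTreeSet;

// Stand-in for the client's key iteration: all keys with the given prefix,
// in lexicographic byte order.
fn storage_keys(keys: &BTreeSet<String>, prefix: &str) -> Vec<String> {
    keys.iter().filter(|k| k.starts_with(prefix)).cloned().collect()
}

fn main() {
    let keys: BTreeSet<String> =
        ["a", "ab", "aB", "abc"].iter().map(|s| s.to_string()).collect();
    let page_size = 1;

    // Old behaviour: after each page, the key where iteration stopped becomes
    // the *prefix* of the next query.
    let mut reported = Vec::new();
    let mut prefix = "a".to_string();
    loop {
        let page = storage_keys(&keys, &prefix);
        reported.extend(page.iter().take(page_size).cloned());
        match page.get(page_size) {
            Some(next_key) => prefix = next_key.clone(),
            None => break,
        }
    }

    // Byte order is `a` < `aB` < `ab` < `abc`, so once the prefix becomes
    // `aB`, the keys `ab` and `abc` no longer match it and are never reported.
    assert_eq!(reported, vec!["a", "aB"]);
}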

To mitigate this, the PR saves the start key of the query together with
the next pagination key (a simplified model of this follows below).
Testing is improved to ensure there is a key entry that is not a prefix
of the descendant keys that follow it.
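
The fix can be modelled with the same stand-ins (again a sketch, not the
patched Substrate code): the prefix stays pinned to the original query key for
the whole operation, and a separate cursor only records the key after which
the next page should resume.

use std::collections::BTreeSet;

// Stand-in for `storage_keys(prefix, start_key)`: keys with the given prefix,
// in lexicographic order, strictly after `start_key` when one is provided.
fn storage_keys(keys: &BTreeSet<String>, prefix: &str, start_key: Option<&str>) -> Vec<String> {
    keys.iter()
        .filter(|k| k.starts_with(prefix))
        .filter(|k| start_key.map_or(true, |s| k.as_str() > s))
        .cloned()
        .collect()
}

fn main() {
    let keys: BTreeSet<String> =
        ["a", "ab", "aB", "abc"].iter().map(|s| s.to_string()).collect();
    let page_size = 1;

    // Fixed behaviour: keep the original query key as the prefix and only
    // advance the pagination cursor between pages.
    let query_key = "a";
    let mut pagination_start_key: Option<String> = None;
    let mut reported = Vec::new();
    loop {
        let page = storage_keys(&keys, query_key, pagination_start_key.as_deref());
        if page.is_empty() {
            break;
        }
        for key in page.into_iter().take(page_size) {
            pagination_start_key = Some(key.clone());
            reported.push(key);
        }
    }

    // Every key under the `a` prefix is reported exactly once.
    assert_eq!(reported, vec!["a", "aB", "ab", "abc"]);
}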

@paritytech/subxt-team

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
lexnv authored Sep 26, 2023
1 parent 3f7f9cc commit 022c7cd
Showing 2 changed files with 50 additions and 13 deletions.
43 changes: 30 additions & 13 deletions substrate/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs
@@ -71,8 +71,10 @@ impl<Client, Block, BE> ChainHeadStorage<Client, Block, BE> {

/// Query to iterate over storage.
struct QueryIter {
- /// The next key from which the iteration should continue.
- next_key: StorageKey,
+ /// The key from which the iteration was started.
+ query_key: StorageKey,
+ /// The key after which pagination should resume.
+ pagination_start_key: Option<StorageKey>,
/// The type of the query (either value or hash).
ty: IterQueryType,
}
@@ -184,20 +186,27 @@ where
hash: Block::Hash,
child_key: Option<&ChildInfo>,
) -> QueryIterResult {
- let QueryIter { next_key, ty } = query;
+ let QueryIter { ty, query_key, pagination_start_key } = query;

let mut keys_iter = if let Some(child_key) = child_key {
- self.client
- .child_storage_keys(hash, child_key.to_owned(), Some(&next_key), None)
+ self.client.child_storage_keys(
+ hash,
+ child_key.to_owned(),
+ Some(&query_key),
+ pagination_start_key.as_ref(),
+ )
} else {
- self.client.storage_keys(hash, Some(&next_key), None)
+ self.client.storage_keys(hash, Some(&query_key), pagination_start_key.as_ref())
}
.map_err(|err| err.to_string())?;

let mut ret = Vec::with_capacity(self.operation_max_storage_items);
+ let mut next_pagination_key = None;
for _ in 0..self.operation_max_storage_items {
let Some(key) = keys_iter.next() else { break };

+ next_pagination_key = Some(key.clone());

let result = match ty {
IterQueryType::Value => self.query_storage_value(hash, &key, child_key),
IterQueryType::Hash => self.query_storage_hash(hash, &key, child_key),
@@ -209,7 +218,11 @@ where
}

// Save the next key if any to continue the iteration.
- let maybe_next_query = keys_iter.next().map(|next_key| QueryIter { next_key, ty });
+ let maybe_next_query = keys_iter.next().map(|_| QueryIter {
+ ty,
+ query_key,
+ pagination_start_key: next_pagination_key,
+ });
Ok((ret, maybe_next_query))
}

@@ -325,12 +338,16 @@ where
return
},
},
- StorageQueryType::DescendantsValues => self
- .iter_operations
- .push_back(QueryIter { next_key: item.key, ty: IterQueryType::Value }),
- StorageQueryType::DescendantsHashes => self
- .iter_operations
- .push_back(QueryIter { next_key: item.key, ty: IterQueryType::Hash }),
+ StorageQueryType::DescendantsValues => self.iter_operations.push_back(QueryIter {
+ query_key: item.key,
+ ty: IterQueryType::Value,
+ pagination_start_key: None,
+ }),
+ StorageQueryType::DescendantsHashes => self.iter_operations.push_back(QueryIter {
+ query_key: item.key,
+ ty: IterQueryType::Hash,
+ pagination_start_key: None,
+ }),
};
}

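For readers following the diff above, here is a compact in-memory model of the
patched pagination step. It is only a sketch: the names mirror `QueryIter` and
`query_iter_pagination`, but the `BTreeMap` storage, value lookup, and driver
loop are simplified stand-ins rather than the real `ChainHeadStorage` code.

use std::collections::BTreeMap;

// Keeps the original query prefix for the whole operation; only the
// pagination cursor advances between pages.
struct QueryIter {
    query_key: String,
    pagination_start_key: Option<String>,
}

// One pagination step: report at most `max_items` key/value pairs and, if
// more keys remain under the prefix, hand back a follow-up query that
// resumes strictly after the last reported key.
fn query_iter_pagination(
    storage: &BTreeMap<String, String>,
    query: QueryIter,
    max_items: usize,
) -> (Vec<(String, String)>, Option<QueryIter>) {
    let QueryIter { query_key, pagination_start_key } = query;

    // All keys under the original prefix, strictly after the cursor.
    let keys: Vec<String> = storage
        .keys()
        .filter(|k| k.starts_with(query_key.as_str()))
        .filter(|k| pagination_start_key.as_ref().map_or(true, |s| *k > s))
        .cloned()
        .collect();
    let mut keys_iter = keys.into_iter();

    let mut ret = Vec::with_capacity(max_items);
    let mut next_pagination_key = None;
    for _ in 0..max_items {
        let Some(key) = keys_iter.next() else { break };
        next_pagination_key = Some(key.clone());
        let value = storage[&key].clone();
        ret.push((key, value));
    }

    let maybe_next_query = keys_iter
        .next()
        .map(|_| QueryIter { query_key, pagination_start_key: next_pagination_key });
    (ret, maybe_next_query)
}

fn main() {
    // Same key shape as the test below: `:moD` is not a prefix of `:moc`.
    let storage: BTreeMap<String, String> = [":m", ":mo", ":moD", ":moc", ":mock"]
        .iter()
        .map(|k| (k.to_string(), format!("value of {k}")))
        .collect();

    let mut query = Some(QueryIter { query_key: ":m".into(), pagination_start_key: None });
    while let Some(q) = query {
        let (page, next) = query_iter_pagination(&storage, q, 1);
        println!("{page:?}");
        query = next;
    }
    // Prints five single-item pages; every `:m*` key is reported exactly once.
}

The essential property is that `query_key` never changes for the lifetime of
the operation; only `pagination_start_key` advances, so no descendant key can
fall outside the prefix filter.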
20 changes: 20 additions & 0 deletions substrate/client/rpc-spec-v2/src/chain_head/tests.rs
@@ -2352,6 +2352,7 @@ async fn check_continue_operation() {
builder.push_storage_change(b":m".to_vec(), Some(b"a".to_vec())).unwrap();
builder.push_storage_change(b":mo".to_vec(), Some(b"ab".to_vec())).unwrap();
builder.push_storage_change(b":moc".to_vec(), Some(b"abc".to_vec())).unwrap();
+ builder.push_storage_change(b":moD".to_vec(), Some(b"abcmoD".to_vec())).unwrap();
builder.push_storage_change(b":mock".to_vec(), Some(b"abcd".to_vec())).unwrap();
let block = builder.build().unwrap().block;
let block_hash = format!("{:?}", block.header.hash());
@@ -2430,6 +2431,25 @@ async fn check_continue_operation() {
res.items[0].result == StorageResultType::Value(hex_string(b"ab"))
);

+ // Pagination event.
+ assert_matches!(
+ get_next_event::<FollowEvent<String>>(&mut sub).await,
+ FollowEvent::OperationWaitingForContinue(res) if res.operation_id == operation_id
+ );
+ does_not_produce_event::<FollowEvent<String>>(
+ &mut sub,
+ std::time::Duration::from_secs(DOES_NOT_PRODUCE_EVENTS_SECONDS),
+ )
+ .await;
+ let _res: () = api.call("chainHead_unstable_continue", [&sub_id, &operation_id]).await.unwrap();
+ assert_matches!(
+ get_next_event::<FollowEvent<String>>(&mut sub).await,
+ FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id &&
+ res.items.len() == 1 &&
+ res.items[0].key == hex_string(b":moD") &&
+ res.items[0].result == StorageResultType::Value(hex_string(b"abcmoD"))
+ );

// Pagination event.
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
