Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
chainHead_storage: Iterate over keys (#14628)
Browse files Browse the repository at this point in the history
* chainHead: Iterate over key,values and key,hashes

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/tests: Multi query with iteration over keys

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/events: Fix typo in StorageQuery

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Take 10 from key iterator

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: parity-processbot <>
  • Loading branch information
lexnv authored Jul 25, 2023
1 parent d38d176 commit b0178d8
Show file tree
Hide file tree
Showing 4 changed files with 217 additions and 67 deletions.
6 changes: 2 additions & 4 deletions client/rpc-spec-v2/src/chain_head/chain_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,9 +300,7 @@ where
let items = items
.into_iter()
.map(|query| {
if query.queue_type != StorageQueryType::Value &&
query.queue_type != StorageQueryType::Hash
{
if query.query_type == StorageQueryType::ClosestDescendantMerkleValue {
// Note: remove this once all types are implemented.
let _ = sink.reject(ChainHeadRpcError::InvalidParam(
"Storage query type not supported".into(),
Expand All @@ -312,7 +310,7 @@ where

Ok(StorageQuery {
key: StorageKey(parse_hex_param(&mut sink, query.key)?),
queue_type: query.queue_type,
query_type: query.query_type,
})
})
.collect::<Result<Vec<_>, _>>()?;
Expand Down
147 changes: 105 additions & 42 deletions client/rpc-spec-v2/src/chain_head/chain_head_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,18 @@ use super::{
hex_string, ErrorEvent,
};

/// The maximum number of items the `chainHead_storage` can return
/// before paginations is required.
const MAX_ITER_ITEMS: usize = 10;

/// The query type of an interation.
enum IterQueryType {
/// Iterating over (key, value) pairs.
Value,
/// Iterating over (key, hash) pairs.
Hash,
}

/// Generates the events of the `chainHead_storage` method.
pub struct ChainHeadStorage<Client, Block, BE> {
/// Substrate client.
Expand All @@ -58,7 +70,10 @@ fn is_key_queryable(key: &[u8]) -> bool {
}

/// The result of making a query call.
type QueryResult = Result<StorageResult<String>, ChainHeadStorageEvent<String>>;
type QueryResult = Result<Option<StorageResult<String>>, ChainHeadStorageEvent<String>>;

/// The result of iterating over keys.
type QueryIterResult = Result<Vec<StorageResult<String>>, ChainHeadStorageEvent<String>>;

impl<Client, Block, BE> ChainHeadStorage<Client, Block, BE>
where
Expand All @@ -72,7 +87,7 @@ where
hash: Block::Hash,
key: &StorageKey,
child_key: Option<&ChildInfo>,
) -> Option<QueryResult> {
) -> QueryResult {
let result = if let Some(child_key) = child_key {
self.client.child_storage(hash, child_key, key)
} else {
Expand All @@ -81,17 +96,15 @@ where

result
.map(|opt| {
opt.map(|storage_data| {
QueryResult::Ok(StorageResult::<String> {
key: hex_string(&key.0),
result: StorageResultType::Value(hex_string(&storage_data.0)),
})
})
QueryResult::Ok(opt.map(|storage_data| StorageResult::<String> {
key: hex_string(&key.0),
result: StorageResultType::Value(hex_string(&storage_data.0)),
}))
})
.unwrap_or_else(|err| {
Some(QueryResult::Err(ChainHeadStorageEvent::<String>::Error(ErrorEvent {
QueryResult::Err(ChainHeadStorageEvent::<String>::Error(ErrorEvent {
error: err.to_string(),
})))
}))
})
}

Expand All @@ -101,7 +114,7 @@ where
hash: Block::Hash,
key: &StorageKey,
child_key: Option<&ChildInfo>,
) -> Option<QueryResult> {
) -> QueryResult {
let result = if let Some(child_key) = child_key {
self.client.child_storage_hash(hash, child_key, key)
} else {
Expand All @@ -110,36 +123,49 @@ where

result
.map(|opt| {
opt.map(|storage_data| {
QueryResult::Ok(StorageResult::<String> {
key: hex_string(&key.0),
result: StorageResultType::Hash(hex_string(&storage_data.as_ref())),
})
})
QueryResult::Ok(opt.map(|storage_data| StorageResult::<String> {
key: hex_string(&key.0),
result: StorageResultType::Hash(hex_string(&storage_data.as_ref())),
}))
})
.unwrap_or_else(|err| {
Some(QueryResult::Err(ChainHeadStorageEvent::<String>::Error(ErrorEvent {
QueryResult::Err(ChainHeadStorageEvent::<String>::Error(ErrorEvent {
error: err.to_string(),
})))
}))
})
}

/// Make the storage query.
fn query_storage(
/// Handle iterating over (key, value) or (key, hash) pairs.
fn query_storage_iter(
&self,
hash: Block::Hash,
query: &StorageQuery<StorageKey>,
key: &StorageKey,
child_key: Option<&ChildInfo>,
) -> Option<QueryResult> {
if !is_key_queryable(&query.key.0) {
return None
ty: IterQueryType,
) -> QueryIterResult {
let keys_iter = if let Some(child_key) = child_key {
self.client.child_storage_keys(hash, child_key.to_owned(), Some(key), None)
} else {
self.client.storage_keys(hash, Some(key), None)
}

match query.queue_type {
StorageQueryType::Value => self.query_storage_value(hash, &query.key, child_key),
StorageQueryType::Hash => self.query_storage_hash(hash, &query.key, child_key),
_ => None,
.map_err(|err| {
ChainHeadStorageEvent::<String>::Error(ErrorEvent { error: err.to_string() })
})?;

let mut ret = Vec::with_capacity(MAX_ITER_ITEMS);
let mut keys_iter = keys_iter.take(MAX_ITER_ITEMS);
while let Some(key) = keys_iter.next() {
let result = match ty {
IterQueryType::Value => self.query_storage_value(hash, &key, child_key),
IterQueryType::Hash => self.query_storage_hash(hash, &key, child_key),
}?;

if let Some(result) = result {
ret.push(result);
}
}

QueryIterResult::Ok(ret)
}

/// Generate the block events for the `chainHead_storage` method.
Expand All @@ -159,19 +185,56 @@ where

let mut storage_results = Vec::with_capacity(items.len());
for item in items {
let Some(result) = self.query_storage(hash, &item, child_key.as_ref()) else {
continue
};

match result {
QueryResult::Ok(storage_result) => storage_results.push(storage_result),
QueryResult::Err(event) => {
let _ = sink.send(&event);
// If an error is encountered for any of the query items
// do not produce any other events.
return
},
if !is_key_queryable(&item.key.0) {
continue
}

match item.query_type {
StorageQueryType::Value => {
match self.query_storage_value(hash, &item.key, child_key.as_ref()) {
Ok(Some(value)) => storage_results.push(value),
Ok(None) => continue,
Err(err) => {
let _ = sink.send(&err);
return
},
}
},
StorageQueryType::Hash =>
match self.query_storage_hash(hash, &item.key, child_key.as_ref()) {
Ok(Some(value)) => storage_results.push(value),
Ok(None) => continue,
Err(err) => {
let _ = sink.send(&err);
return
},
},
StorageQueryType::DescendantsValues => match self.query_storage_iter(
hash,
&item.key,
child_key.as_ref(),
IterQueryType::Value,
) {
Ok(values) => storage_results.extend(values),
Err(err) => {
let _ = sink.send(&err);
return
},
},
StorageQueryType::DescendantsHashes => match self.query_storage_iter(
hash,
&item.key,
child_key.as_ref(),
IterQueryType::Hash,
) {
Ok(values) => storage_results.extend(values),
Err(err) => {
let _ = sink.send(&err);
return
},
},
_ => continue,
};
}

if !storage_results.is_empty() {
Expand Down
12 changes: 6 additions & 6 deletions client/rpc-spec-v2/src/chain_head/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ pub struct StorageQuery<Key> {
pub key: Key,
/// The type of the storage query.
#[serde(rename = "type")]
pub queue_type: StorageQueryType,
pub query_type: StorageQueryType,
}

/// The type of the storage query.
Expand Down Expand Up @@ -558,7 +558,7 @@ mod tests {
#[test]
fn chain_head_storage_query() {
// Item with Value.
let item = StorageQuery { key: "0x1", queue_type: StorageQueryType::Value };
let item = StorageQuery { key: "0x1", query_type: StorageQueryType::Value };
// Encode
let ser = serde_json::to_string(&item).unwrap();
let exp = r#"{"key":"0x1","type":"value"}"#;
Expand All @@ -568,7 +568,7 @@ mod tests {
assert_eq!(dec, item);

// Item with Hash.
let item = StorageQuery { key: "0x1", queue_type: StorageQueryType::Hash };
let item = StorageQuery { key: "0x1", query_type: StorageQueryType::Hash };
// Encode
let ser = serde_json::to_string(&item).unwrap();
let exp = r#"{"key":"0x1","type":"hash"}"#;
Expand All @@ -578,7 +578,7 @@ mod tests {
assert_eq!(dec, item);

// Item with DescendantsValues.
let item = StorageQuery { key: "0x1", queue_type: StorageQueryType::DescendantsValues };
let item = StorageQuery { key: "0x1", query_type: StorageQueryType::DescendantsValues };
// Encode
let ser = serde_json::to_string(&item).unwrap();
let exp = r#"{"key":"0x1","type":"descendants-values"}"#;
Expand All @@ -588,7 +588,7 @@ mod tests {
assert_eq!(dec, item);

// Item with DescendantsHashes.
let item = StorageQuery { key: "0x1", queue_type: StorageQueryType::DescendantsHashes };
let item = StorageQuery { key: "0x1", query_type: StorageQueryType::DescendantsHashes };
// Encode
let ser = serde_json::to_string(&item).unwrap();
let exp = r#"{"key":"0x1","type":"descendants-hashes"}"#;
Expand All @@ -599,7 +599,7 @@ mod tests {

// Item with Merkle.
let item =
StorageQuery { key: "0x1", queue_type: StorageQueryType::ClosestDescendantMerkleValue };
StorageQuery { key: "0x1", query_type: StorageQueryType::ClosestDescendantMerkleValue };
// Encode
let ser = serde_json::to_string(&item).unwrap();
let exp = r#"{"key":"0x1","type":"closest-descendant-merkle-value"}"#;
Expand Down
Loading

0 comments on commit b0178d8

Please sign in to comment.