Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use state_getKeys for historical prefix querying #442

Merged
merged 1 commit into from
Sep 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,28 @@ import jp.co.soramitsu.fearless_utils.wsrpc.request.runtime.RuntimeRequest
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.withContext
import kotlin.coroutines.coroutineContext

class GetKeysPagedRequest(
keyPrefix: String,
pageSize: Int,
fullKeyOffset: String?,
at: BlockHash?
) : RuntimeRequest(
method = "state_getKeysPaged",
params = listOfNotNull(
keyPrefix,
pageSize,
fullKeyOffset,
)
)

class GetKeys(
keyPrefix: String,
at: BlockHash?
) : RuntimeRequest(
method = "state_getKeys",
params = listOfNotNull(
keyPrefix,
at
)
)
Expand Down Expand Up @@ -51,30 +61,20 @@ class BulkRetriever(
private val pageSize: Int = DEFAULT_PAGE_SIZE
) {

/**
* Retrieves all keys starting with [keyPrefix] from [at] block
* Returns only first [DEFAULT_PAGE_SIZE] elements in case historical querying is used ([at] is not null)
*/
suspend fun retrieveAllKeys(
socketService: SocketService,
keyPrefix: String,
at: BlockHash? = null
): List<String> = withContext(Dispatchers.IO) {
val result = mutableListOf<String>()

var currentOffset: String? = null

while (true) {
ensureActive()

val request = GetKeysPagedRequest(keyPrefix, DEFAULT_PAGE_SIZE, currentOffset, at)

val page = socketService.executeAsync(request, mapper = pojoList<String>().nonNull())

result += page

if (isLastPage(page)) break

currentOffset = page.last()
if (at != null) {
queryKeysByPrefixHistorical(socketService, keyPrefix, at)
} else {
queryKeysByPrefixCurrent(socketService, keyPrefix)
}

result
}

suspend fun queryKeys(
Expand All @@ -98,6 +98,48 @@ class BulkRetriever(
}
}

/**
* Note: the amount of keys returned by this method is limited by [DEFAULT_PAGE_SIZE]
* So it is should not be used for storages with big amount of entries
*/
private suspend fun queryKeysByPrefixHistorical(
socketService: SocketService,
prefix: String,
at: BlockHash
): List<String> {
// We use `state_getKeys` for historical prefix queries instead of `state_getKeysPaged`
// since most of the chains always return empty list when the same is requested via `state_getKeysPaged`
// Thus, we can only request up to 1000 first historical keys
val request = GetKeys(prefix, at)

return socketService.executeAsync(request, mapper = pojoList<String>().nonNull())
}

private suspend fun queryKeysByPrefixCurrent(
socketService: SocketService,
prefix: String
): List<String> {
val result = mutableListOf<String>()

var currentOffset: String? = null

while (true) {
coroutineContext.ensureActive()

val request = GetKeysPagedRequest(prefix, DEFAULT_PAGE_SIZE, currentOffset)

val page = socketService.executeAsync(request, mapper = pojoList<String>().nonNull())

result += page

if (isLastPage(page)) break

currentOffset = page.last()
}

return result
}

private fun isLastPage(page: List<String>) = page.size < pageSize
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,8 @@ import io.novafoundation.nova.runtime.multiNetwork.ChainRegistry
import io.novafoundation.nova.runtime.multiNetwork.getRuntime
import io.novafoundation.nova.runtime.state.chain
import io.novafoundation.nova.runtime.storage.source.StorageDataSource
import io.novafoundation.nova.runtime.storage.source.query.StorageQueryContext
import jp.co.soramitsu.fearless_utils.runtime.AccountId
import jp.co.soramitsu.fearless_utils.runtime.metadata.storage
import jp.co.soramitsu.fearless_utils.runtime.metadata.storageKey
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.mapLatest
Expand All @@ -43,16 +40,10 @@ class TuringAutomationTasksUpdater(
return storageSubscriptionBuilder.subscribe(accountKey)
.withIndex()
.mapLatest { (index, change) ->
val expectedAccountValue = change.value
val isChange = index > 0
val at = if (isChange) change.block else null

remoteStorageSource.query(chain.id) {
if (isChange) {
// Due to parachain consensus querying right after change arrived does not guarantee that the change will be in storage right away
// So we wait until account storage matches with the change
awaitSameAccountValue(accountId, expectedAccountValue)
}

remoteStorageSource.query(chain.id, at) {
val storageEntry = runtime.metadata.automationTime().storage("AccountTasks")
val entries = storageEntry.entriesRaw(accountId)
val storagePrefix = storageEntry.storageKey(runtime, accountId)
Expand All @@ -61,21 +52,4 @@ class TuringAutomationTasksUpdater(
}
}.noSideAffects()
}

private suspend fun StorageQueryContext.awaitSameAccountValue(accountId: AccountId, expected: String?) {
suspend fun checkSame(): Boolean {
val actual = runtime.metadata.system().storage("Account").queryRaw(accountId)

return actual == expected
}

// no delay at the first try
if (checkSame()) {
return
}

do {
delay(500)
} while (!checkSame())
}
}