Skip to content

Commit

Permalink
Merge pull request #34 from aleph-im/feat/clean-responses-from-indexer
Browse files Browse the repository at this point in the history
Clean up responses in indexer instances
  • Loading branch information
MHHukiewitz authored Apr 25, 2023
2 parents 088a55a + 05d1a24 commit 6df3609
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,21 +56,26 @@ export function createEntityRequestResponseDAL<T extends ParsedEntity<unknown>>(
newEntity: EntityRequestResponse<T>,
): Promise<EntityUpdateOp> {
if (oldEntity) {
const nonceIndexes = {
...oldEntity.nonceIndexes,
...newEntity.nonceIndexes,
}

if (!('parsed' in newEntity) && 'parsed' in oldEntity) {
Object.assign(newEntity, oldEntity)
}

newEntity.nonceIndexes = nonceIndexes
if (
('parsed' in newEntity)
&& !newEntity.nonceIndexes
) {
return EntityUpdateOp.Delete
}

// console.log(
// 'updated entity [entity_request_responses]',
// newEntity.timestampIndexes.length,
// )
// @note: This is a hack to make sure that the nonce indexes are
// not overwritten by the new entity. This is usually the case when
// the entity contains the actual transaction data, at which point we
// do not have the actual nonce indexes, but still need to pass in a
// nonce index object to the entity storage.
newEntity.nonceIndexes = {
...newEntity.nonceIndexes,
...oldEntity.nonceIndexes,
}
}

return EntityUpdateOp.Update
Expand Down
42 changes: 22 additions & 20 deletions packages/framework/src/services/indexer/src/entityFetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,23 +189,20 @@ export abstract class BaseIndexerEntityFetcher<

async isRequestComplete(nonce: number): Promise<boolean> {
const request = await this.entityRequestDAL.get(nonce.toString())
if (!request) throw new Error(`Request with nonce ${nonce} does not exists`)
if (!request) throw new Error(`Request with nonce ${nonce} does not exist`)

return !!request.complete
}

async awaitRequestComplete(nonce: number): Promise<void> {
const request = await this.entityRequestDAL.get(nonce.toString())
if (!request) throw new Error(`Request with nonce ${nonce} does not exists`)

if (!request.complete) {
if (!await this.isRequestComplete(nonce)) {
await this.getFuture(nonce).promise
}
}

async getResponse(nonce: number): Promise<TransactionResponse<T>> {
const request = await this.entityRequestDAL.get(nonce.toString())
if (!request) throw new Error(`Request with nonce ${nonce} does not exists`)
if (!request) throw new Error(`Request with nonce ${nonce} does not exist`)

if (!request.complete) {
await this.getFuture(nonce).promise
Expand All @@ -223,7 +220,13 @@ export abstract class BaseIndexerEntityFetcher<
remove: async () => {
this.log('----> REMOVE REQ 🎈', request.nonce)
await this.entityRequestDAL.remove(request)
// @todo: Update nonceIndexes / Remove from entityRequestResponseDAL
// remove nonce from nonceIndex and update response
const items = []
for await (const item of response) {
delete item.nonceIndexes[request.nonce]
items.push(item)
}
await this.entityRequestResponseDAL.save(items)
},
}
}
Expand Down Expand Up @@ -253,10 +256,9 @@ export abstract class BaseIndexerEntityFetcher<
const requestsNonces = []
let lastFilteredTxs = 0

let filteredTxs: T[] = []
let filteredTxs: EntityRequestResponse<T>[] = []
let remainingTxs: T[] = chunk
let requestCount = 0
const requestCountId = []

for await (const request of requests) {
const { nonce, complete } = request
Expand All @@ -267,10 +269,13 @@ export abstract class BaseIndexerEntityFetcher<
request,
)

filteredTxs = filteredTxs.concat(result.filteredEntities)
const requestResponses = result.filteredEntities as EntityRequestResponse<T>[]
for (const responses of requestResponses) {
(responses as EntityRequestResponse<T>).nonceIndexes = { [nonce]: 0 }
}
filteredTxs = filteredTxs.concat(requestResponses)
remainingTxs = result.remainingEntities
requestCount++
requestCountId.push(nonce)

lastFilteredTxs = filteredTxs.length - lastFilteredTxs
requestsNonces.push([nonce, lastFilteredTxs])
Expand All @@ -282,14 +287,11 @@ export abstract class BaseIndexerEntityFetcher<

if (filteredTxs.length === 0) return

const requestResponse =
filteredTxs as unknown as EntityRequestResponse<T>[]

const pendingIds = filteredTxs as unknown as EntityRequestPendingEntity[]

this.log(`Removing pendingIds`, pendingIds.map((p) => p.id).join('\n'))
//this.log(`Removing pendingIds`, pendingIds.map((p) => p.id).join('\n'))

await this.entityRequestResponseDAL.save(requestResponse)
await this.entityRequestResponseDAL.save(filteredTxs)
await this.entityRequestPendingEntityDAL.remove(pendingIds)

this.checkCompletionJob.run().catch(() => 'ignore')
Expand Down Expand Up @@ -347,7 +349,7 @@ export abstract class BaseIndexerEntityFetcher<
const future = this.getFuture(nonce)
let count = 0

// @note: Sometimes we receive the responses before inserting the pendings signatures on
// @note: Sometimes we receive the responses before inserting the pending signatures on
// the db, the purpose of this mutex is to avoid this
const now1 = Date.now()
const release = await this.requestMutex.acquire()
Expand Down Expand Up @@ -397,7 +399,7 @@ export abstract class BaseIndexerEntityFetcher<
const elapsed2 = Date.now() - now2
this.log(`onRequest time => ${elapsed1 / 1000} | ${elapsed2 / 1000}`)

this.log(`🟡 Request ${nonce} inited`)
this.log(`🟡 Request ${nonce} initialized with ${count} entities`)

if (!count) {
this.log(`🟢 Request ${nonce} complete`)
Expand Down Expand Up @@ -513,7 +515,7 @@ export abstract class BaseIndexerEntityFetcher<
`[Retry] Check ${tx?.id}`,
!!tx,
tx && 'parsed' in (tx || {}),
tx && tx.nonceIndexes[nonce] >= 0,
tx?.nonceIndexes[nonce],
request.nonce,
)

Expand Down Expand Up @@ -564,7 +566,7 @@ export abstract class BaseIndexerEntityFetcher<
): Promise<void> {
const ids = pendings.map(({ id }) => id)

this.log(`Retrying ${ids.length} ${this.blockchainId} ids`, ids)
this.log(`Retrying ${ids.length} ${this.blockchainId} ids`)

return this.fetcherMsClient
.useBlockchain(this.blockchainId as Blockchain)
Expand Down

0 comments on commit 6df3609

Please sign in to comment.