From abd7d91c8df99d8ba9473a228aadb6348ca22693 Mon Sep 17 00:00:00 2001 From: Mike Hukiewitz Date: Thu, 23 Feb 2023 13:22:29 +0100 Subject: [PATCH 1/8] Problem: indexer instances would get too fat with requestResponseDAL data Solution: remove responses when all assigned request nonces have been cleared (when the response has been processed, it removes its nonce from the response) --- .../indexer/src/dal/entityRequestResponse.ts | 4 + .../src/services/indexer/src/entityFetcher.ts | 73 ++++++++++--------- 2 files changed, 43 insertions(+), 34 deletions(-) diff --git a/packages/framework/src/services/indexer/src/dal/entityRequestResponse.ts b/packages/framework/src/services/indexer/src/dal/entityRequestResponse.ts index bd919734..81df4f17 100644 --- a/packages/framework/src/services/indexer/src/dal/entityRequestResponse.ts +++ b/packages/framework/src/services/indexer/src/dal/entityRequestResponse.ts @@ -1,5 +1,6 @@ import { EntityStorage, EntityUpdateOp } from '@aleph-indexer/core' import { IndexableEntityType, ParsedEntity } from '../../../../types.js' +import * as console from 'console' export type EntitySignatureResponse = { id: string @@ -56,6 +57,9 @@ export function createEntityRequestResponseDAL>( newEntity: EntityRequestResponse, ): Promise { if (oldEntity) { + if (!newEntity.nonceIndexes) { + return EntityUpdateOp.Delete + } const nonceIndexes = { ...oldEntity.nonceIndexes, ...newEntity.nonceIndexes, diff --git a/packages/framework/src/services/indexer/src/entityFetcher.ts b/packages/framework/src/services/indexer/src/entityFetcher.ts index 39f0d404..5ec573c9 100644 --- a/packages/framework/src/services/indexer/src/entityFetcher.ts +++ b/packages/framework/src/services/indexer/src/entityFetcher.ts @@ -54,14 +54,14 @@ export interface TransactionResponse { export abstract class BaseIndexerEntityFetcher< T extends ParsedEntity, > { - protected checkPendingRetriesJob!: Utils.JobRunner + protected retryPendingSignaturesJob!: Utils.JobRunner protected checkCompletionJob!: Utils.DebouncedJob protected requestFutures: Record> = {} protected requestMutex = new Mutex() protected events: EventEmitter = new EventEmitter() protected incomingEntities: PendingWorkPool - protected toRetryBuffer: Utils.BufferExec - protected toRemoveBuffer: Utils.BufferExec + protected pendingRequestToRetryBuffer: Utils.BufferExec + protected pendingRequestToRemoveBuffer: Utils.BufferExec constructor( protected type: IndexableEntityType, @@ -83,22 +83,32 @@ export abstract class BaseIndexerEntityFetcher< checkComplete: async (): Promise => true, }) - this.checkPendingRetriesJob = new JobRunner({ - name: `${type}-indexer-pending-retries`, + this.incomingEntities = new PendingWorkPool({ + id: `${type}-indexer-incoming-entities`, + interval: 0, + chunkSize: 1000, + concurrency: 1, + dal: this.entityRequestIncomingEntityDAL, + handleWork: this.handleIncomingEntities.bind(this), + checkComplete: async (): Promise => true, + }) + + this.retryPendingSignaturesJob = new JobRunner({ + name: `${type}-indexer-retry-pending-signatures`, interval: 1000 * 60 * 10, - intervalFn: this.handlePendingRetries.bind(this), + intervalFn: this.retryPendingSignatures.bind(this), }) this.checkCompletionJob = new DebouncedJob( this.checkAllRequestCompletion.bind(this), ) - this.toRetryBuffer = new BufferExec( + this.pendingRequestToRetryBuffer = new BufferExec( this.handleRetryPendingEntities.bind(this), 1000, ) - this.toRemoveBuffer = new BufferExec( + this.pendingRequestToRemoveBuffer = new BufferExec( this.handleRemovePendingTransactions.bind(this), 1000, ) @@ -106,12 +116,12 @@ export abstract class BaseIndexerEntityFetcher< async start(): Promise { await this.incomingEntities.start() - this.checkPendingRetriesJob.start().catch(() => 'ignore') + this.retryPendingSignaturesJob.start().catch(() => 'ignore') } async stop(): Promise { await this.incomingEntities.stop() - this.checkPendingRetriesJob.stop().catch(() => 'ignore') + this.retryPendingSignaturesJob.stop().catch(() => 'ignore') } async fetchEntitiesById(params: IdRange): Promise { @@ -189,23 +199,20 @@ export abstract class BaseIndexerEntityFetcher< async isRequestComplete(nonce: number): Promise { const request = await this.entityRequestDAL.get(nonce.toString()) - if (!request) throw new Error(`Request with nonce ${nonce} does not exists`) + if (!request) throw new Error(`Request with nonce ${nonce} does not exist`) return !!request.complete } async awaitRequestComplete(nonce: number): Promise { - const request = await this.entityRequestDAL.get(nonce.toString()) - if (!request) throw new Error(`Request with nonce ${nonce} does not exists`) - - if (!request.complete) { + if (!await this.isRequestComplete(nonce)) { await this.getFuture(nonce).promise } } async getResponse(nonce: number): Promise> { const request = await this.entityRequestDAL.get(nonce.toString()) - if (!request) throw new Error(`Request with nonce ${nonce} does not exists`) + if (!request) throw new Error(`Request with nonce ${nonce} does not exist`) if (!request.complete) { await this.getFuture(nonce).promise @@ -223,7 +230,11 @@ export abstract class BaseIndexerEntityFetcher< remove: async () => { this.log('----> REMOVE REQ 🎈', request.nonce) await this.entityRequestDAL.remove(request) - // @todo: Update nonceIndexes / Remove from entityRequestResponseDAL + // remove nonce from nonceIndex and update response + for await (const item of response) { + delete item.nonceIndexes[request.nonce] + await this.entityRequestResponseDAL.save(item) + } }, } } @@ -484,17 +495,7 @@ export abstract class BaseIndexerEntityFetcher< this.resolveFuture(nonce) } - protected async checkAllPendingSignatures(): Promise { - const requests = await this.entityRequestDAL.getAllValues() - - for await (const request of requests) { - await this.checkPendingSignatures(request, false) - } - - await this.drainPendingSignaturesBuffer() - } - - protected async checkPendingSignatures( + protected async checkPendingSignature( request: EntityRequest, drain = true, ): Promise { @@ -520,8 +521,8 @@ export abstract class BaseIndexerEntityFetcher< const hasResponse = !!tx && 'parsed' in tx && tx.nonceIndexes[nonce] >= 0 hasResponse - ? await this.toRemoveBuffer.add(pending) - : await this.toRetryBuffer.add(pending) + ? await this.pendingRequestToRemoveBuffer.add(pending) + : await this.pendingRequestToRetryBuffer.add(pending) } if (drain) { @@ -530,12 +531,16 @@ export abstract class BaseIndexerEntityFetcher< } protected async drainPendingSignaturesBuffer(): Promise { - await this.toRemoveBuffer.drain() - await this.toRetryBuffer.drain() + await this.pendingRequestToRemoveBuffer.drain() + await this.pendingRequestToRetryBuffer.drain() } - protected async handlePendingRetries(): Promise { - await this.checkAllPendingSignatures() + protected async retryPendingSignatures(): Promise { + const requests = await this.entityRequestDAL.getAllValues() + for await (const request of requests) { + await this.checkPendingSignature(request, false) + } + await this.drainPendingSignaturesBuffer() await this.checkCompletionJob.run() } From e4fee6731264614031292c39f62871369f20ac8f Mon Sep 17 00:00:00 2001 From: Mike Hukiewitz Date: Thu, 23 Feb 2023 13:26:49 +0100 Subject: [PATCH 2/8] Remove unneeded import --- .../src/services/indexer/src/dal/entityRequestResponse.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/framework/src/services/indexer/src/dal/entityRequestResponse.ts b/packages/framework/src/services/indexer/src/dal/entityRequestResponse.ts index 81df4f17..543da081 100644 --- a/packages/framework/src/services/indexer/src/dal/entityRequestResponse.ts +++ b/packages/framework/src/services/indexer/src/dal/entityRequestResponse.ts @@ -1,6 +1,5 @@ import { EntityStorage, EntityUpdateOp } from '@aleph-indexer/core' import { IndexableEntityType, ParsedEntity } from '../../../../types.js' -import * as console from 'console' export type EntitySignatureResponse = { id: string From 56259f122d69f7e1be4f6b8ea8a191cea2f2dc44 Mon Sep 17 00:00:00 2001 From: Mike Hukiewitz Date: Thu, 23 Feb 2023 13:38:08 +0100 Subject: [PATCH 3/8] batch write to DB --- packages/framework/src/services/indexer/src/entityFetcher.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/framework/src/services/indexer/src/entityFetcher.ts b/packages/framework/src/services/indexer/src/entityFetcher.ts index 5ec573c9..702d93f8 100644 --- a/packages/framework/src/services/indexer/src/entityFetcher.ts +++ b/packages/framework/src/services/indexer/src/entityFetcher.ts @@ -231,10 +231,12 @@ export abstract class BaseIndexerEntityFetcher< this.log('----> REMOVE REQ 🎈', request.nonce) await this.entityRequestDAL.remove(request) // remove nonce from nonceIndex and update response + const items = [] for await (const item of response) { delete item.nonceIndexes[request.nonce] - await this.entityRequestResponseDAL.save(item) + items.push(item) } + await this.entityRequestResponseDAL.save(items) }, } } From a8f918a6e6898b8f44511ecac44377594ca53124 Mon Sep 17 00:00:00 2001 From: Mike Hukiewitz Date: Thu, 23 Feb 2023 13:44:48 +0100 Subject: [PATCH 4/8] remove duplicate PendingWorkPool initialization --- .../src/services/indexer/src/entityFetcher.ts | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/packages/framework/src/services/indexer/src/entityFetcher.ts b/packages/framework/src/services/indexer/src/entityFetcher.ts index 702d93f8..6b7a5a90 100644 --- a/packages/framework/src/services/indexer/src/entityFetcher.ts +++ b/packages/framework/src/services/indexer/src/entityFetcher.ts @@ -83,16 +83,6 @@ export abstract class BaseIndexerEntityFetcher< checkComplete: async (): Promise => true, }) - this.incomingEntities = new PendingWorkPool({ - id: `${type}-indexer-incoming-entities`, - interval: 0, - chunkSize: 1000, - concurrency: 1, - dal: this.entityRequestIncomingEntityDAL, - handleWork: this.handleIncomingEntities.bind(this), - checkComplete: async (): Promise => true, - }) - this.retryPendingSignaturesJob = new JobRunner({ name: `${type}-indexer-retry-pending-signatures`, interval: 1000 * 60 * 10, From e99483c22b287052afc19c05e4532ce9c523312c Mon Sep 17 00:00:00 2001 From: Mike Hukiewitz Date: Thu, 23 Feb 2023 15:14:02 +0100 Subject: [PATCH 5/8] only delete response entities that have already had a parsed transaction in them --- .../indexer/src/dal/entityRequestResponse.ts | 20 +++++++------------ 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/packages/framework/src/services/indexer/src/dal/entityRequestResponse.ts b/packages/framework/src/services/indexer/src/dal/entityRequestResponse.ts index 543da081..8bb83a2f 100644 --- a/packages/framework/src/services/indexer/src/dal/entityRequestResponse.ts +++ b/packages/framework/src/services/indexer/src/dal/entityRequestResponse.ts @@ -56,24 +56,18 @@ export function createEntityRequestResponseDAL>( newEntity: EntityRequestResponse, ): Promise { if (oldEntity) { - if (!newEntity.nonceIndexes) { - return EntityUpdateOp.Delete - } - const nonceIndexes = { - ...oldEntity.nonceIndexes, - ...newEntity.nonceIndexes, - } - if (!('parsed' in newEntity) && 'parsed' in oldEntity) { Object.assign(newEntity, oldEntity) } - newEntity.nonceIndexes = nonceIndexes + if (('parsed' in newEntity) && !newEntity.nonceIndexes) { + return EntityUpdateOp.Delete + } - // console.log( - // 'updated entity [entity_request_responses]', - // newEntity.timestampIndexes.length, - // ) + newEntity.nonceIndexes = { + ...oldEntity.nonceIndexes, + ...newEntity.nonceIndexes, + } } return EntityUpdateOp.Update From 9203036c927c8900fae7164b628187e934551aff Mon Sep 17 00:00:00 2001 From: Mike Hukiewitz Date: Thu, 23 Feb 2023 17:18:18 +0100 Subject: [PATCH 6/8] fix premature deletion of RequestResponses --- .../indexer/src/dal/entityRequestResponse.ts | 5 +++- .../src/services/indexer/src/entityFetcher.ts | 25 +++++++++---------- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/packages/framework/src/services/indexer/src/dal/entityRequestResponse.ts b/packages/framework/src/services/indexer/src/dal/entityRequestResponse.ts index 8bb83a2f..ed6b2bd6 100644 --- a/packages/framework/src/services/indexer/src/dal/entityRequestResponse.ts +++ b/packages/framework/src/services/indexer/src/dal/entityRequestResponse.ts @@ -60,7 +60,10 @@ export function createEntityRequestResponseDAL>( Object.assign(newEntity, oldEntity) } - if (('parsed' in newEntity) && !newEntity.nonceIndexes) { + if ( + ('parsed' in newEntity) + && !newEntity.nonceIndexes + ) { return EntityUpdateOp.Delete } diff --git a/packages/framework/src/services/indexer/src/entityFetcher.ts b/packages/framework/src/services/indexer/src/entityFetcher.ts index 6b7a5a90..f9ec3b4e 100644 --- a/packages/framework/src/services/indexer/src/entityFetcher.ts +++ b/packages/framework/src/services/indexer/src/entityFetcher.ts @@ -256,10 +256,9 @@ export abstract class BaseIndexerEntityFetcher< const requestsNonces = [] let lastFilteredTxs = 0 - let filteredTxs: T[] = [] + let filteredTxs: EntityRequestResponse[] = [] let remainingTxs: T[] = chunk let requestCount = 0 - const requestCountId = [] for await (const request of requests) { const { nonce, complete } = request @@ -270,10 +269,13 @@ export abstract class BaseIndexerEntityFetcher< request, ) - filteredTxs = filteredTxs.concat(result.filteredEntities) + const requestResponses = result.filteredEntities as EntityRequestResponse[] + for (const responses of requestResponses) { + (responses as EntityRequestResponse).nonceIndexes = { [nonce]: 0 } + } + filteredTxs = filteredTxs.concat(requestResponses) remainingTxs = result.remainingEntities requestCount++ - requestCountId.push(nonce) lastFilteredTxs = filteredTxs.length - lastFilteredTxs requestsNonces.push([nonce, lastFilteredTxs]) @@ -285,14 +287,11 @@ export abstract class BaseIndexerEntityFetcher< if (filteredTxs.length === 0) return - const requestResponse = - filteredTxs as unknown as EntityRequestResponse[] - const pendingIds = filteredTxs as unknown as EntityRequestPendingEntity[] - this.log(`Removing pendingIds`, pendingIds.map((p) => p.id).join('\n')) + //this.log(`Removing pendingIds`, pendingIds.map((p) => p.id).join('\n')) - await this.entityRequestResponseDAL.save(requestResponse) + await this.entityRequestResponseDAL.save(filteredTxs) await this.entityRequestPendingEntityDAL.remove(pendingIds) this.checkCompletionJob.run().catch(() => 'ignore') @@ -350,7 +349,7 @@ export abstract class BaseIndexerEntityFetcher< const future = this.getFuture(nonce) let count = 0 - // @note: Sometimes we receive the responses before inserting the pendings signatures on + // @note: Sometimes we receive the responses before inserting the pending signatures on // the db, the purpose of this mutex is to avoid this const now1 = Date.now() const release = await this.requestMutex.acquire() @@ -400,7 +399,7 @@ export abstract class BaseIndexerEntityFetcher< const elapsed2 = Date.now() - now2 this.log(`onRequest time => ${elapsed1 / 1000} | ${elapsed2 / 1000}`) - this.log(`🟡 Request ${nonce} inited`) + this.log(`🟡 Request ${nonce} initialized with ${count} entities`) if (!count) { this.log(`🟢 Request ${nonce} complete`) @@ -506,7 +505,7 @@ export abstract class BaseIndexerEntityFetcher< `[Retry] Check ${tx?.id}`, !!tx, tx && 'parsed' in (tx || {}), - tx && tx.nonceIndexes[nonce] >= 0, + tx?.nonceIndexes[nonce], request.nonce, ) @@ -561,7 +560,7 @@ export abstract class BaseIndexerEntityFetcher< ): Promise { const ids = pendings.map(({ id }) => id) - this.log(`Retrying ${ids.length} ${this.blockchainId} ids`, ids) + this.log(`Retrying ${ids.length} ${this.blockchainId} ids`) return this.fetcherMsClient .useBlockchain(this.blockchainId as Blockchain) From 3dd431b35a5bbb2a4db791dac8e44d66702c5b27 Mon Sep 17 00:00:00 2001 From: Mike Hukiewitz Date: Fri, 24 Feb 2023 10:34:59 +0100 Subject: [PATCH 7/8] do not overwrite nonceIndexes of existing request nonces --- .../src/services/indexer/src/dal/entityRequestResponse.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/packages/framework/src/services/indexer/src/dal/entityRequestResponse.ts b/packages/framework/src/services/indexer/src/dal/entityRequestResponse.ts index ed6b2bd6..2cf236bd 100644 --- a/packages/framework/src/services/indexer/src/dal/entityRequestResponse.ts +++ b/packages/framework/src/services/indexer/src/dal/entityRequestResponse.ts @@ -67,9 +67,14 @@ export function createEntityRequestResponseDAL>( return EntityUpdateOp.Delete } + // @note: This is a hack to make sure that the nonce indexes are + // not overwritten by the new entity. This is usually the case when + // the entity contains the actual transaction data, at which point we + // do not have the actual nonce indexes, but still need to pass in a + // nonce index object to the entity storage. newEntity.nonceIndexes = { - ...oldEntity.nonceIndexes, ...newEntity.nonceIndexes, + ...oldEntity.nonceIndexes, } } From 05d1a241937e6d8f4dec9d88bbe035e081086140 Mon Sep 17 00:00:00 2001 From: Mike Hukiewitz Date: Fri, 24 Feb 2023 10:41:10 +0100 Subject: [PATCH 8/8] revert unneeded changes to more clarity of PR --- .../src/services/indexer/src/entityFetcher.ts | 48 +++++++++++-------- 1 file changed, 27 insertions(+), 21 deletions(-) diff --git a/packages/framework/src/services/indexer/src/entityFetcher.ts b/packages/framework/src/services/indexer/src/entityFetcher.ts index f9ec3b4e..bbcbd79c 100644 --- a/packages/framework/src/services/indexer/src/entityFetcher.ts +++ b/packages/framework/src/services/indexer/src/entityFetcher.ts @@ -54,14 +54,14 @@ export interface TransactionResponse { export abstract class BaseIndexerEntityFetcher< T extends ParsedEntity, > { - protected retryPendingSignaturesJob!: Utils.JobRunner + protected checkPendingRetriesJob!: Utils.JobRunner protected checkCompletionJob!: Utils.DebouncedJob protected requestFutures: Record> = {} protected requestMutex = new Mutex() protected events: EventEmitter = new EventEmitter() protected incomingEntities: PendingWorkPool - protected pendingRequestToRetryBuffer: Utils.BufferExec - protected pendingRequestToRemoveBuffer: Utils.BufferExec + protected toRetryBuffer: Utils.BufferExec + protected toRemoveBuffer: Utils.BufferExec constructor( protected type: IndexableEntityType, @@ -83,22 +83,22 @@ export abstract class BaseIndexerEntityFetcher< checkComplete: async (): Promise => true, }) - this.retryPendingSignaturesJob = new JobRunner({ - name: `${type}-indexer-retry-pending-signatures`, + this.checkPendingRetriesJob = new JobRunner({ + name: `${type}-indexer-pending-retries`, interval: 1000 * 60 * 10, - intervalFn: this.retryPendingSignatures.bind(this), + intervalFn: this.handlePendingRetries.bind(this), }) this.checkCompletionJob = new DebouncedJob( this.checkAllRequestCompletion.bind(this), ) - this.pendingRequestToRetryBuffer = new BufferExec( + this.toRetryBuffer = new BufferExec( this.handleRetryPendingEntities.bind(this), 1000, ) - this.pendingRequestToRemoveBuffer = new BufferExec( + this.toRemoveBuffer = new BufferExec( this.handleRemovePendingTransactions.bind(this), 1000, ) @@ -106,12 +106,12 @@ export abstract class BaseIndexerEntityFetcher< async start(): Promise { await this.incomingEntities.start() - this.retryPendingSignaturesJob.start().catch(() => 'ignore') + this.checkPendingRetriesJob.start().catch(() => 'ignore') } async stop(): Promise { await this.incomingEntities.stop() - this.retryPendingSignaturesJob.stop().catch(() => 'ignore') + this.checkPendingRetriesJob.stop().catch(() => 'ignore') } async fetchEntitiesById(params: IdRange): Promise { @@ -486,7 +486,17 @@ export abstract class BaseIndexerEntityFetcher< this.resolveFuture(nonce) } - protected async checkPendingSignature( + protected async checkAllPendingSignatures(): Promise { + const requests = await this.entityRequestDAL.getAllValues() + + for await (const request of requests) { + await this.checkPendingSignatures(request, false) + } + + await this.drainPendingSignaturesBuffer() + } + + protected async checkPendingSignatures( request: EntityRequest, drain = true, ): Promise { @@ -512,8 +522,8 @@ export abstract class BaseIndexerEntityFetcher< const hasResponse = !!tx && 'parsed' in tx && tx.nonceIndexes[nonce] >= 0 hasResponse - ? await this.pendingRequestToRemoveBuffer.add(pending) - : await this.pendingRequestToRetryBuffer.add(pending) + ? await this.toRemoveBuffer.add(pending) + : await this.toRetryBuffer.add(pending) } if (drain) { @@ -522,16 +532,12 @@ export abstract class BaseIndexerEntityFetcher< } protected async drainPendingSignaturesBuffer(): Promise { - await this.pendingRequestToRemoveBuffer.drain() - await this.pendingRequestToRetryBuffer.drain() + await this.toRemoveBuffer.drain() + await this.toRetryBuffer.drain() } - protected async retryPendingSignatures(): Promise { - const requests = await this.entityRequestDAL.getAllValues() - for await (const request of requests) { - await this.checkPendingSignature(request, false) - } - await this.drainPendingSignaturesBuffer() + protected async handlePendingRetries(): Promise { + await this.checkAllPendingSignatures() await this.checkCompletionJob.run() }