Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/indexer backpressure #36

Merged
merged 4 commits into from
Apr 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 23 additions & 7 deletions packages/core/src/utils/concurrence/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import { sleep } from '../time.js'
/**
* It keeps a concurrent fixed size buffer of pending promises.
* When some of them finish, it takes another one from the provided iterator
* @param {Iterator} iterator - A iterator function that returns promises
* @param {number} concurrency - The max number of concurrent pending promises
 * @param it - An iterator function that returns promises
* @param concurrency - The max number of concurrent pending promises
*/
export async function concurrentPromises<T>(
it: Iterator<Promise<unknown>, T>,
concurrency = 20,
concurrency: number = 20,
): Promise<T> {
let done
let lastValue!: T
Expand Down Expand Up @@ -52,7 +52,7 @@ export async function concurrentPromises<T>(
* In other frameworks they usually call it "Deferred" too.
*
* Example:
*
* ```ts
* function sleep(ms) {
* const future = new Future()
* setTimeout(() => future.resolve(), ms)
Expand All @@ -62,6 +62,7 @@ export async function concurrentPromises<T>(
* async function main() {
* await sleep(1000)
* }
* ```
*/
export class Future<T> {
public resolve!: (value: T | PromiseLike<T>) => void
Expand All @@ -81,7 +82,7 @@ export class Future<T> {
* to some region of the code
*
* Example:
*
* ```ts
* // Waits for the lock to be free and returns the releaser function
* const release = await mutex.acquire()
*
Expand All @@ -92,6 +93,7 @@ export class Future<T> {
* // Ensures that the lock is always released, even if there are errors
* release()
* }
* ```
*/
export class Mutex {
protected queue = Promise.resolve()
Expand All @@ -114,8 +116,22 @@ export class Mutex {
}

/**
* An util for retaining an unique snapshot of data while
* the previous snapshot is being processed
 * A util for retaining a unique snapshot of data while
* the previous snapshot is being processed.
*
* Example:
* ```ts
* const job = new DebouncedJob<string>(async (data) => {
* // Do something with the data
* console.log(data)
* return data
* }, 1000)
*
* job.run('foo')
* ```
*
* @param callback - The callback function that will be called with the data
* @param throttle - The minimum time between calls
*/
export class DebouncedJob<T = void, R = unknown> {
protected pendingData: T | undefined
Expand Down
37 changes: 37 additions & 0 deletions packages/core/src/utils/concurrence/jobRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,23 @@ export interface JobRunnerOptions {
jitter?: number
}

/**
* Runs an `intervalFn` at a given `interval`. The `intervalFn` can return a
* number to change the interval for the next run. `intervalMax` can be used to
* limit the maximum interval time.
*
* The `intervalFn` can also return `JobRunnerReturnCode.Reset == -1` to reset
* the interval to the initial value, or `JobRunnerReturnCode.Stop == -2` to
* stop the runner.
*
* You can configure the `times` an interval is run, and also set a `jitter` to
* randomize the interval. This is useful to avoid stampedes.
*
* Set `startAfter` to delay the first run. `intervalInit` can be used to set a
* different interval for after the first run. `intervalAccuracy` can be used to
* make sure the `intervalFn` is called at the exact interval time, otherwise
* it will be called again after execution time + interval.
*/
export class JobRunner {
private _events = new EventEmitter()
private _isRunning = false
Expand Down Expand Up @@ -84,6 +101,13 @@ export class JobRunner {
}
}

/**
* Registers an event handler. Possible events are:
* - `beforeSleep`: Called before the runner sleeps until the next interval.
* - `firstRun`: Called after the first run.
* @param event
* @param handler
*/
on(
event: 'beforeSleep' | 'firstRun',
handler: (...args: any[]) => void | Promise<void>,
Expand All @@ -98,10 +122,17 @@ export class JobRunner {
}
}

/**
* Starts the runner.
*/
async start(): Promise<void> {
return this.run()
}

/**
* The runner function. Returns a promise that resolves when the runner is
* finished.
*/
async run(): Promise<void> {
if (this._isRunning) return
this._isRunning = true
Expand Down Expand Up @@ -202,11 +233,17 @@ export class JobRunner {
p.resolve()
}

/**
* Stops the runner.
*/
stop(): Promise<void> {
this._isRunning = false
return this.hasFinished()
}

/**
* Returns a promise that resolves when the runner is finished.
*/
hasFinished(): Promise<void> {
return this._finished.promise
}
Expand Down
49 changes: 48 additions & 1 deletion packages/core/src/utils/concurrence/pendingWork.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,35 @@ import { JobRunner } from './jobRunner.js'

export * from './dal/pendingWork.js'

/**
* A pending work item
*/
export type PendingWork<T> = {
id: string
time: number
payload: T
}

/**
* Options for the pending work pool
* @template T The type of the payload
* @property id - The id of the pool
* @property dal - The data access layer
* @property interval - The interval to run the pool
* @property concurrency - The number of concurrent jobs to run
* @property maxQueueSize - The max number of works to queue
* @property chunkSize - The number of works to fetch at once
* @property handleWork - The function to handle the works
* @property checkComplete - The function to check if a work is complete
* @property preCheckComplete - Whether to check if a work is complete before
* handling it
*/
export interface PendingWorkOptions<T> {
id: string
dal: PendingWorkDAL<T>
interval: number
concurrency: number
maxQueueSize?: number
chunkSize: number
handleWork: (
works: PendingWork<T>[],
Expand All @@ -25,12 +43,31 @@ export interface PendingWorkOptions<T> {
preCheckComplete?: boolean
}

/**
 * Error thrown when adding work to a `PendingWorkPool` would exceed its
 * configured `maxQueueSize`.
 *
 * @param pendingWork - The pool whose queue is full; its `options` provide
 * the `maxQueueSize` and pool `id` used in the error message.
 */
export class QueueFullError extends Error {
  constructor(
    protected pendingWork: PendingWorkPool<any>,
  ) {
    super(`Queue (max size: ${pendingWork.options.maxQueueSize}) is full for ${pendingWork.options.id}`)
    // Error subclasses inherit name === 'Error'; set the subclass name so
    // logs and `err.name` checks identify this error type correctly.
    this.name = 'QueueFullError'
  }
}

/**
* A pool of pending works. It will run the works in the pool at a given interval
* or when new works arrive. It will also check if the works are complete before
* running them. If they are, it will remove them from the pool.
* @note If the interval is 0, it will only run when new works arrive.
* @template T The type of the payload
* @property options The options of the pool
*/
export class PendingWorkPool<T> {
protected skipSleep = false
protected debouncedJob: DebouncedJobRunner | undefined
protected coordinatorJob: JobRunner | undefined

constructor(protected options: PendingWorkOptions<T>) {
constructor(public readonly options: PendingWorkOptions<T>) {
const name = `${this.options.id} 🔄`

// @note: If interval is 0, run it only when new items arrive
Expand Down Expand Up @@ -67,9 +104,19 @@ export class PendingWorkPool<T> {
return this.coordinatorJob && this.coordinatorJob.stop()
}

/**
* Add work to the pool
* @param work The work payload to add
* @throws QueueFullError if the queue is full
*/
async addWork(work: PendingWork<T> | PendingWork<T>[]): Promise<void> {
work = Array.isArray(work) ? work : [work]
if (!work.length) return
if (this.options.maxQueueSize) {
const count = await this.getCount()
if (count + work.length > this.options.maxQueueSize)
throw new QueueFullError(this)
}

await this.options.dal.save(work)

Expand Down
41 changes: 30 additions & 11 deletions packages/framework/src/services/fetcher/src/entityFetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import {
EntityStorage,
PendingWork,
PendingWorkPool,
Utils,
QueueFullError,
Utils
} from '@aleph-indexer/core'
import {
FetchEntitiesByIdRequestArgs,
Expand Down Expand Up @@ -33,13 +34,15 @@ export abstract class BaseEntityFetcher<RE extends RawEntity> {

/**
* Initialize the fetcher service.
* @param type
* @param blockchainId The blockchain
* @param broker The moleculer broker to assign to the service.
* @param pendingEntityDAL The pending entities' storage.
* @param pendingEntityCacheDAL
* @param pendingEntityFetchDAL
* @param pendingEntityCacheDAL The pending entity cache storage.
 * @param pendingEntityFetchDAL The pending entity fetch storage.
* @param entityCacheDAL The raw entities' storage.
*/
constructor(
protected constructor(
protected type: IndexableEntityType,
protected blockchainId: Blockchain,
protected broker: ServiceBroker,
Expand Down Expand Up @@ -73,6 +76,7 @@ export abstract class BaseEntityFetcher<RE extends RawEntity> {
id: `${blockchainId}:pending-${type}`,
interval: 0,
chunkSize: 1000,
maxQueueSize: 10000,
concurrency: 1,
dal: this.pendingEntityDAL,
handleWork: this.handlePendingEntities.bind(this),
Expand Down Expand Up @@ -106,21 +110,35 @@ export abstract class BaseEntityFetcher<RE extends RawEntity> {
/**
* Fetch entities from an account by ids.
* @param args Entity ids.
* @throws QueueFullError if fetcher queue is full
*/
async fetchEntitiesById(args: FetchEntitiesByIdRequestArgs): Promise<void> {
const { ids, indexerId } = args

this.log(
`🔗 ${ids.length} new ids added to the ${this.type} fetcher queue... [${indexerId}]`,
)

const entities = ids.filter(this.filterEntityId.bind(this)).map((id) => ({
id,
time: Date.now(),
payload: indexerId ? [indexerId] : [],
}))

await this.pendingEntities.addWork(entities)
.catch((e: Error) => {
if (e.constructor == QueueFullError) {
return Promise.reject(e)
} else {
throw e
}
})
.then(() => {
this.log(
`🔗 ${ids.length} new ids added to the ${this.type} fetcher queue... [${indexerId}]`,
)
},
() => {
this.log(
`⏯ ${ids.length} new ids waiting to be added to the ${this.type} fetcher queue... [${indexerId}]`,
)
})
}

/**
Expand Down Expand Up @@ -302,7 +320,7 @@ export abstract class BaseEntityFetcher<RE extends RawEntity> {

/**
* Returns the fetch status of certain txn signatures.
* @param signatures The txn signatures to get its state.
* @param args The txn ids to check
*/
async getEntityState(args: CheckEntityRequestArgs): Promise<EntityState[]> {
const { ids } = args
Expand Down Expand Up @@ -356,8 +374,9 @@ export abstract class BaseEntityFetcher<RE extends RawEntity> {
}

/**
* Fetch entities from a RPC Node.
* @param works Entity ids with extra properties as time and payload.
* Guard to fetch entities from an RPC Node.
 * @param ids The entity ids to fetch.
* @param isRetry Whether this request is a retry
*/
protected abstract remoteFetchIds(
ids: string[],
Expand Down
2 changes: 1 addition & 1 deletion packages/framework/src/services/fetcher/src/fetcherPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export interface FetcherPoolOptions<T>

export class FetcherPool<T> extends PendingWorkPool<T> {
protected workFetcher: Record<string, BaseHistoryFetcher<any>> = {}
public options!: FetcherPoolOptions<T> & PendingWorkOptions<T>
public readonly options!: FetcherPoolOptions<T> & PendingWorkOptions<T>

constructor(options: FetcherPoolOptions<T>) {
const { checkComplete, ...rest } = options
Expand Down