Skip to content

Commit

Permalink
feat(batch-queue): init
Browse files Browse the repository at this point in the history
* feat: add base class BatchQueue

* feat: add MemoryChecker

* fix: correct add method logic

* refactor: renamed error to be more descriptive

* fix: trigger methods in BatchQueue

* style: fix trigger lint

* test(BatchQueue): add unit tests for BatchQueue class

* feat: add base BatchQueueModule

* feat: add createCheckOnAdd method to BatchStore

* test: add tests to createCheckOnAdd

* style: batch queue and test batch queue

* chore: remove useless BatchQueueI

* fix: resolve issue BatchQueueModule not working

* fix: MemoryChecker use BatchQueue instead BatchQueueI

* feat: add consumer

* fix: add index.ts for module

* fix: export module by main index.ts

* feat: add producer

* fix: export producer

* test: init integration test

* style: BatchQueueModue providers

* feat: add BatchChecker

* feat: add StateHandler

* refactor: extract Inject tokens for BatchQueue into a separate file

* refactor: inject checker into MemoryChecker

* fix(BatchQueue): add mutexes for each queue to prevent overflow

* feat: export errors from package

* style: lint class BatchQueue

* fix: type QueueName only can be string

* style: lint Mutex

* feat: add BatchQueue integration tests

* fix: integration tests

* fix: delete queue after processing

* feat: add method createCheckOnAdd to Checker

* test: checkOnAdd integration tests

* fix: delete mutex after processing

* feat: add memory check on each defined number of additions

* style: fix lint

* fix: add MemoryCheckerOptions provider

* chore: update yarn

* fix: batch-queue integration tests

* style: add Inject.. before injection decorator

* style: batch queue provide constant to constants

* refactor: extracted provide constants into a separate file

* refactor: batch module register method

* refactor: move providers creation to separate file

* style: snake case memory checker job name

* refactor: readonly for inject checker class

* fix: one check name for on add and func check

* fix: method checkOnAddChecks in batch queue

* fix: batch queue unit test

* refactor: delete memory checker

* test: delete setTimeout from integration tests

* refactor: move mutex file top level

* refactor: moved errors from folder to a sigle file

* style: fix lint issue

* chore: delete not use dependencies

* refactor: dev dependencies fix versions

* refactor: brought out helpers in integration tests

* refactor: moved constants to root directory

* refactor: moved errors to root directory

* refactor: separate types

* refactor: unlock mutex with finally

* fix: start batch timer only if not started before

* docs: add JSDoc to BatchQueue

* docs: add JSDoc to Checker

* docs: add JSDoc to Consumer

* docs: add JSDoc to Producer

* docs: add JSDoc to StateHandler

* fix: increase batch wait time in tests

* feat: create CheckManager

* refactor: use CheckManager in BatchQueue

* refactor: add CheckManager types

* test: add unit tests to CheckManager

* test: rewrite BatchQueue unit tests

* refactor: checker depend on check manager

* refactor: state handler depend on check manager

* refactor: add check manager provider

* refactor: check failed error delete property failedChecks

* refactor: separate checker manager

* refactor: separate check manager types

* refactor: extract proxy clases

* refactor: export proxy clases from package

* refactor: delete handleChangeState method from Checker

* refactor: export check manager from package

* refactor: integration tests

* Merge branch 'master' into feat/package-batch-queue

* docs: add JSDoc to CheckManager

* docs: add JSDoc to Checker

* docs: add JSDoc to StateHandler
  • Loading branch information
OsirisAnubiz authored Oct 11, 2024
1 parent 512a526 commit 6ecbffa
Show file tree
Hide file tree
Showing 44 changed files with 2,383 additions and 562 deletions.
5 changes: 1 addition & 4 deletions .config/husky/commit-msg
Original file line number Diff line number Diff line change
@@ -1,4 +1 @@
#!/bin/sh
. "$(dirname "$0")/_/husky.sh"

yarn commit message lint
yarn commit message lint
5 changes: 1 addition & 4 deletions .config/husky/pre-commit
Original file line number Diff line number Diff line change
@@ -1,4 +1 @@
#!/bin/sh
. "$(dirname "$0")/_/husky.sh"

yarn commit staged
yarn commit staged
5 changes: 1 addition & 4 deletions .config/husky/prepare-commit-msg
Original file line number Diff line number Diff line change
@@ -1,4 +1 @@
#!/bin/sh
. "$(dirname "$0")/_/husky.sh"

yarn commit message $@
yarn commit message $@
439 changes: 439 additions & 0 deletions .pnp.cjs

Large diffs are not rendered by default.

1,022 changes: 487 additions & 535 deletions .yarn/releases/yarn.cjs

Large diffs are not rendered by default.

298 changes: 298 additions & 0 deletions packages/nestjs-batch-queue/integration/test/batch-queue.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,298 @@
import type { INestApplication } from '@nestjs/common'
import type { ChannelWrapper } from 'amqp-connection-manager'
import type { Channel } from 'amqplib'
import type { StartedTestContainer } from 'testcontainers'

import type { Producer } from '../../src/index.js'
import type { Consumer } from '../../src/index.js'
import type { Checker } from '../../src/index.js'
import type { StateHandler } from '../../src/index.js'
import type { ChangeStateCallback } from '../../src/index.js'

import { Test } from '@nestjs/testing'
import { describe } from '@jest/globals'
import { it } from '@jest/globals'
import { expect } from '@jest/globals'
import { beforeAll } from '@jest/globals'
import { afterAll } from '@jest/globals'
import { beforeEach } from '@jest/globals'
import { jest } from '@jest/globals'
import { GenericContainer } from 'testcontainers'
import { Wait } from 'testcontainers'
import amqp from 'amqp-connection-manager'

import { BatchQueueModule } from '../../src/index.js'
import { BATCH_QUEUE_CONSUMER } from '../../src/index.js'
import { BATCH_QUEUE_PRODUCER } from '../../src/index.js'
import { BATCH_QUEUE_CHECKER } from '../../src/index.js'
import { BATCH_QUEUE_STATE_HANDLER } from '../../src/index.js'
import { BaseQueueError } from '../../src/index.js'
import { waitForConsumeCount } from './helpers/index.js'

describe('external renderer', () => {
let app: INestApplication
let rabbitmq: StartedTestContainer
let channelWrapper: ChannelWrapper
let consumeBatchs: Array<[string, Array<string>]> = []
let consumeFn: (queueName: string, value: Array<string>) => Promise<void>
let succesProduceCount = 0

beforeAll(async () => {
rabbitmq = await new GenericContainer('rabbitmq:3-alpine')
.withWaitStrategy(Wait.forLogMessage('Starting broker'))
.withExposedPorts(5672)
.start()

const testingModule = await Test.createTestingModule({
imports: [
BatchQueueModule.registerAsync({
imports: [],
useFactory: () => ({
core: {
maxQueueLength: 10_000,
maxTotalQueueLength: 100_000,
maxQueues: 20,
timeoutDuration: 2_000,
},
}),
inject: [],
}),
],
}).compile()

const connection = amqp.connect([
`amqp://${rabbitmq.getHost()}:${rabbitmq.getMappedPort(5672)}`,
])
channelWrapper = connection.createChannel({
json: false,
confirm: false,
setup: async (channel: Channel) => {
await channel.assertQueue('test-queue', {
durable: true,
})
},
})

channelWrapper.consume('test-queue', (msg) => {
;(async (): Promise<void> => {
const producer: Producer<any> = app.get(BATCH_QUEUE_PRODUCER)
const parsed: { queueName: string; value: any } = JSON.parse(msg.content.toString())
try {
await producer.produce(parsed.queueName, parsed.value)
succesProduceCount += 1
channelWrapper.ack(msg)
} catch (e) {
if (e instanceof BaseQueueError) {
channelWrapper.nack(msg)
}
}
})()
})

await channelWrapper.waitForConnect()

app = testingModule.createNestApplication()
await app.init()

const batchConsumer: Consumer = app.get(BATCH_QUEUE_CONSUMER)
consumeFn = async (queueName: string, value: Array<string>): Promise<void> => {
consumeBatchs.push([queueName, value])
}
batchConsumer.consume(consumeFn)
})

afterAll(async () => {
await app.close()
await rabbitmq.stop()
})

beforeEach(async () => {
await channelWrapper.purgeQueue('test-queue')
consumeBatchs = []
succesProduceCount = 0
})

it('base test', async () => {
await channelWrapper.sendToQueue(
'test-queue',
Buffer.from(JSON.stringify({ queueName: 'batch-queue', value: 'test-0-0' }))
)
await waitForConsumeCount(1, consumeBatchs)
expect(consumeBatchs.length).toBe(1)
const result = consumeBatchs.pop()!
expect(result[0]).toBe('batch-queue')
expect(result[1]).toEqual(['test-0-0'])
})

it('fill 90% queue', async () => {
const messages = []
for (let i = 0; i < 9_000; i += 1) {
messages.push(
channelWrapper.sendToQueue(
'test-queue',
Buffer.from(JSON.stringify({ queueName: 'batch-queue', value: `test-1-${i}` }))
)
)
}
await Promise.all(messages)
await waitForConsumeCount(1, consumeBatchs)
expect(consumeBatchs.length).toBe(1)
const result = consumeBatchs.pop()!
expect(result[0]).toBe('batch-queue')
const expectMessages = []
for (let i = 0; i < 9_000; i += 1) {
expectMessages.push(`test-1-${i}`)
}
expect(result[1]).toEqual(expectMessages)
})

it('fullfill queue', async () => {
const messages = []
for (let i = 0; i < 10_000; i += 1) {
messages.push(
channelWrapper.sendToQueue(
'test-queue',
Buffer.from(JSON.stringify({ queueName: 'batch-queue', value: `test-2-${i}` }))
)
)
}
await Promise.all(messages)
await waitForConsumeCount(1, consumeBatchs)
expect(consumeBatchs.length).toBe(1)
const result = consumeBatchs.pop()!
expect(result[0]).toBe('batch-queue')
const expectMessages = []
for (let i = 0; i < 10_000; i += 1) {
expectMessages.push(`test-2-${i}`)
}
expect(result[1]).toEqual(expectMessages)
})

it('handle multiple queues', async () => {
const messages: Array<Promise<any>> = []
const queues: Array<string> = ['queue-one', 'queue-two', 'queue-three']
const expectedResults: Record<string, Array<string>> = {
'queue-one': [],
'queue-two': [],
'queue-three': [],
}

for (let i = 0; i < 3_000; i += 1) {
// eslint-disable-next-line no-loop-func
queues.forEach((queue: string) => {
messages.push(
channelWrapper.sendToQueue(
'test-queue',
Buffer.from(JSON.stringify({ queueName: queue, value: `test-3-${i}` }))
)
)
expectedResults[queue].push(`test-3-${i}`)
})
}

await Promise.all(messages)

await waitForConsumeCount(3, consumeBatchs)
expect(consumeBatchs.length).toBe(3)
consumeBatchs.forEach((result) => {
expect(result[1].length).toBe(3_000)
expect(result[1]).toEqual(expectedResults[result[0]])
})
})

it('fulfill a single queue with total of 12,000 messages, receiving batches of 10,000 and 2,000', async () => {
const messages = []

for (let i = 0; i < 12_000; i += 1) {
messages.push(
channelWrapper.sendToQueue(
'test-queue',
Buffer.from(JSON.stringify({ queueName: 'batch-queue', value: `test-4-${i}` }))
)
)
}
await Promise.all(messages)

await waitForConsumeCount(2, consumeBatchs)

expect(consumeBatchs.length).toBe(2)
expect(consumeBatchs[0][1].length).toBe(10_000)
expect(consumeBatchs[1][1].length).toBe(2_000)
})

it('two queues with 12,000 messages each', async () => {
const messages = []

for (let i = 0; i < 24_000; i += 1) {
messages.push(
channelWrapper.sendToQueue(
'test-queue',
Buffer.from(
JSON.stringify({
queueName: i % 2 === 0 ? 'queue-one' : 'queue-two',
value: `test-5-${i}`,
})
)
)
)
}
await Promise.all(messages)

await waitForConsumeCount(4, consumeBatchs)

expect(consumeBatchs.length).toBe(4)
expect(consumeBatchs[0][1].length).toBe(10_000)
expect(consumeBatchs[1][1].length).toBe(10_000)
expect(consumeBatchs[2][1].length).toBe(2_000)
expect(consumeBatchs[3][1].length).toBe(2_000)
})

it('should not consume batches when batch queue is unavailable', async () => {
const checker: Checker = app.get(BATCH_QUEUE_CHECKER)
checker.createCheck('mock-memory-1', false)
const stateHandler: StateHandler = app.get(BATCH_QUEUE_STATE_HANDLER)
const fnChangeState = jest.fn() as ChangeStateCallback
stateHandler.handleChangeState('mock-memory-1', fnChangeState)
const messages = []
for (let i = 0; i < 10_000; i += 1) {
messages.push(
channelWrapper.sendToQueue(
'test-queue',
Buffer.from(JSON.stringify({ queueName: 'batch-queue', value: `test-6-${i}` }))
)
)
}
await Promise.all(messages)
expect(succesProduceCount).toBe(0)
expect(fnChangeState).toBeCalledTimes(0)
await checker.changeState('mock-memory-1', true)
await waitForConsumeCount(1, consumeBatchs)
})

it('should not consume batches when batch queue is unavailable and then recover', async () => {
const checker: Checker = app.get(BATCH_QUEUE_CHECKER)
const stateHandler: StateHandler = app.get(BATCH_QUEUE_STATE_HANDLER)
const fnChangeState = jest.fn() as ChangeStateCallback
checker.createCheck('mock-memory-2', false)
stateHandler.handleChangeState('mock-memory-2', fnChangeState)
const messages = []
for (let i = 0; i < 12_000; i += 1) {
messages.push(
channelWrapper.sendToQueue(
'test-queue',
Buffer.from(JSON.stringify({ queueName: 'batch-queue', value: `test-${i}` }))
)
)
}
expect(succesProduceCount).toBe(0)
await Promise.all(messages)
expect(consumeBatchs.length).toBe(0)
await checker.changeState('mock-memory-2', true)
expect(fnChangeState).toBeCalledTimes(1)
expect(fnChangeState).toBeCalledWith(true)
await waitForConsumeCount(2, consumeBatchs)
expect(consumeBatchs.length).toBe(2)
expect(consumeBatchs[0][1].length).toBe(10_000)
expect(consumeBatchs[1][1].length).toBe(2_000)
})
})
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './wait-for-consume-count.js'
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
export const waitForConsumeCount = async (
expectedCount: number,
consumeBatchs: Array<any>,
timeout = 5000
): Promise<void> => {
const endTime = Date.now() + timeout
return new Promise((resolve, reject) => {
const interval = setInterval(() => {
if (consumeBatchs.length >= expectedCount) {
clearInterval(interval)
resolve()
} else if (Date.now() > endTime) {
clearInterval(interval)
reject(new Error('Timeout waiting for messages to be processed'))
}
}, 100)
})
}
49 changes: 49 additions & 0 deletions packages/nestjs-batch-queue/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
{
"name": "@atls/nestjs-batch-queue",
"version": "0.0.1",
"license": "BSD-3-Clause",
"type": "module",
"exports": {
"./package.json": "./package.json",
".": "./src/index.ts"
},
"main": "src/index.ts",
"files": [
"dist"
],
"scripts": {
"build": "yarn library build",
"prepack": "yarn run build",
"postpack": "rm -rf dist"
},
"devDependencies": {
"@jest/globals": "29.7.0",
"@nestjs/common": "10.0.5",
"@nestjs/core": "10.0.5",
"@nestjs/testing": "10.4.1",
"@types/amqplib": "0.10.1",
"amqp-connection-manager": "4.1.14",
"amqplib": "0.10.4",
"reflect-metadata": "0.1.13",
"rxjs": "7.8.1",
"testcontainers": "10.10.0"
},
"peerDependencies": {
"@nestjs/common": "^10",
"@nestjs/core": "^10",
"reflect-metadata": "^0.1",
"rxjs": "^7"
},
"publishConfig": {
"exports": {
"./package.json": "./package.json",
".": {
"import": "./dist/index.js",
"types": "./dist/index.d.ts",
"default": "./dist/index.js"
}
},
"main": "dist/index.js",
"typings": "dist/index.d.ts"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
export interface BatchQueueOptions {
maxQueueLength: number
maxTotalQueueLength: number
maxQueues: number
timeoutDuration: number
}
Loading

0 comments on commit 6ecbffa

Please sign in to comment.