test: delete setTimeout from integration tests
OsirisAnubiz committed Oct 8, 2024
1 parent 236c771 commit f91e6f6
Showing 1 changed file with 16 additions and 71 deletions.
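The change replaces fixed setTimeout pauses with a polling helper, waitForConsumeCount, which resolves once the expected number of batches has landed in consumeBatchs or rejects after a timeout. A minimal sketch of that helper, reconstructed from the hunks below; the reject branch and the 100 ms polling interval are assumptions, since the relevant hunk is truncated:

// Sketch of the polling helper introduced by this commit.
// Only the happy path is visible in the diff; the failure path and the
// polling interval below are assumed, not taken from the source.
let consumeBatchs: Array<[string, Array<string>]> = []

const waitForConsumeCount = async (expectedCount: number, timeout = 5000): Promise<void> => {
  const endTime = Date.now() + timeout
  return new Promise((resolve, reject) => {
    const interval = setInterval(() => {
      if (consumeBatchs.length >= expectedCount) {
        // Enough batches have been consumed; stop polling and let the test continue.
        clearInterval(interval)
        resolve()
      } else if (Date.now() > endTime) {
        // Assumed failure path (not shown in the truncated hunk).
        clearInterval(interval)
        reject(new Error(`Timed out waiting for ${expectedCount} consumed batches`))
      }
    }, 100) // assumed polling interval; not visible in the diff
  })
}

Tests then await waitForConsumeCount(n) instead of sleeping for a fixed 1100 ms, so each test proceeds as soon as the consumer has received the expected number of batches.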
87 changes: 16 additions & 71 deletions packages/nestjs-batch-queue/integration/test/batch-queue.test.ts
@@ -33,13 +33,13 @@ describe('external renderer', () => {
let channelWrapper: ChannelWrapper
let consumeBatchs: Array<[string, Array<string>]> = []
let consumeFn: (queueName: string, value: Array<string>) => Promise<void>
let messageProcessedCount = 0
let succesProduceCount = 0

const waitForProcessedMessages = async (expectedCount: number, timeout = 5000): Promise<void> => {
const waitForConsumeCount = async (expectedCount: number, timeout = 5000): Promise<void> => {
const endTime = Date.now() + timeout
return new Promise((resolve, reject) => {
const interval = setInterval(() => {
if (messageProcessedCount >= expectedCount) {
if (consumeBatchs.length >= expectedCount) {
clearInterval(interval)
resolve()
} else if (Date.now() > endTime) {
@@ -92,7 +92,7 @@ describe('external renderer', () => {
const parsed: { queueName: string; value: any } = JSON.parse(msg.content.toString())
try {
await producer.produce(parsed.queueName, parsed.value)
messageProcessedCount += 1
succesProduceCount += 1
channelWrapper.ack(msg)
} catch (e) {
if (e instanceof BaseQueueError) {
@@ -120,22 +120,17 @@ describe('external renderer', () => {
})

beforeEach(async () => {
messageProcessedCount = 0
await channelWrapper.purgeQueue('test-queue')
consumeBatchs = []
succesProduceCount = 0
})

it('base test', async () => {
await channelWrapper.sendToQueue(
'test-queue',
Buffer.from(JSON.stringify({ queueName: 'batch-queue', value: 'test-0-0' }))
)
await waitForProcessedMessages(1)
await new Promise((res) => {
setTimeout(() => {
res(null)
}, 1100)
})
await waitForConsumeCount(1)
expect(consumeBatchs.length).toBe(1)
const result = consumeBatchs.pop()!
expect(result[0]).toBe('batch-queue')
@@ -153,12 +148,7 @@ describe('external renderer', () => {
)
}
await Promise.all(messages)
await waitForProcessedMessages(9_000)
await new Promise((res) => {
setTimeout(() => {
res(null)
}, 1100)
})
await waitForConsumeCount(1)
expect(consumeBatchs.length).toBe(1)
const result = consumeBatchs.pop()!
expect(result[0]).toBe('batch-queue')
@@ -180,12 +170,7 @@ describe('external renderer', () => {
)
}
await Promise.all(messages)
await waitForProcessedMessages(10_000)
await new Promise((res) => {
setTimeout(() => {
res(null)
}, 1100)
})
await waitForConsumeCount(1)
expect(consumeBatchs.length).toBe(1)
const result = consumeBatchs.pop()!
expect(result[0]).toBe('batch-queue')
@@ -220,13 +205,7 @@ describe('external renderer', () => {

await Promise.all(messages)

await waitForProcessedMessages(9_000)
await new Promise((res) => {
setTimeout(() => {
res(null)
}, 1100)
})

await waitForConsumeCount(3)
expect(consumeBatchs.length).toBe(3)
consumeBatchs.forEach((result) => {
expect(result[1].length).toBe(3_000)
@@ -247,12 +226,7 @@ describe('external renderer', () => {
}
await Promise.all(messages)

await waitForProcessedMessages(12_000)
await new Promise((res) => {
setTimeout(() => {
res(null)
}, 1100)
})
await waitForConsumeCount(2)

expect(consumeBatchs.length).toBe(2)
expect(consumeBatchs[0][1].length).toBe(10_000)
@@ -277,12 +251,7 @@ describe('external renderer', () => {
}
await Promise.all(messages)

await waitForProcessedMessages(24_000)
await new Promise((res) => {
setTimeout(() => {
res(null)
}, 1100)
})
await waitForConsumeCount(4)

expect(consumeBatchs.length).toBe(4)
expect(consumeBatchs[0][1].length).toBe(10_000)
@@ -309,20 +278,10 @@ describe('external renderer', () => {
)
}
await Promise.all(messages)
await new Promise((res) => {
setTimeout(() => {
res(null)
}, 1100)
})
expect(consumeBatchs.length).toBe(0)
expect(succesProduceCount).toBe(0)
expect(fnOk).toBeCalledTimes(0)
await checks.checkOk()
await waitForProcessedMessages(10_000)
await new Promise((res) => {
setTimeout(() => {
res(null)
}, 1100)
})
await waitForConsumeCount(1)
})

it('should not consume batches when batch queue is unavailable and then recover', async () => {
@@ -342,22 +301,13 @@ describe('external renderer', () => {
)
)
}
expect(succesProduceCount).toBe(0)
await Promise.all(messages)
await new Promise((res) => {
setTimeout(() => {
res(null)
}, 1100)
})
expect(consumeBatchs.length).toBe(0)
await checks.checkOk()
expect(fnOk).toBeCalledTimes(1)
expect(fnFail).toBeCalledTimes(0)
await waitForProcessedMessages(12_000)
await new Promise((res) => {
setTimeout(() => {
res(null)
}, 1100)
})
await waitForConsumeCount(2)
expect(consumeBatchs.length).toBe(2)
expect(consumeBatchs[0][1].length).toBe(10_000)
expect(consumeBatchs[1][1].length).toBe(2_000)
@@ -377,12 +327,7 @@ describe('external renderer', () => {
)
}
await Promise.all(messages)
await waitForProcessedMessages(5)
await new Promise((res) => {
setTimeout(() => {
res(null)
}, 1100)
})
await waitForConsumeCount(1)
expect(checkFn).toHaveBeenCalledTimes(1)
})
})
