Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrent produce() sequence number fix #1050

Merged
merged 11 commits into from
Feb 9, 2022
Prev Previous commit
Next Next commit
Remove sleep() from tests
  • Loading branch information
t-d-d committed Aug 9, 2021
commit a686ebd59179930bc25c6823f80f63aa09022499
9 changes: 4 additions & 5 deletions src/producer/__tests__/idempotentProduceMessages.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ const {
createCluster,
createTopic,
waitForMessages,
waitFor,
} = require('testHelpers')
const { KafkaJSError, KafkaJSProtocolError } = require('../../errors')

const createProducer = require('../index')
const createConsumer = require('../../consumer/index')
const { describe } = require('jest-circus')
const sleep = require('../../utils/sleep')

const arrayUnique = a => [...new Set(a)]

Expand Down Expand Up @@ -78,8 +78,7 @@ describe('Producer > Idempotent producer', () => {
const broker = await cluster.findBroker({ nodeId })

const brokerProduce = jest.spyOn(broker, 'produce')
brokerProduce.mockImplementationOnce(async () => {
await sleep(5)
brokerProduce.mockImplementationOnce(() => {
throw new KafkaJSError('retriable error')
})
}
Expand Down Expand Up @@ -149,7 +148,7 @@ describe('Producer > Idempotent producer', () => {

const brokerProduce = jest.spyOn(broker, 'produce')
brokerProduce.mockImplementationOnce(async () => {
await sleep(100)
await waitFor(() => brokerProduce.mock.results.length === messages.length) // for all the other concurrent calls to have completed
throw new KafkaJSError('retriable error')
})
}
Expand All @@ -175,7 +174,7 @@ describe('Producer > Idempotent producer', () => {
const brokerProduce = jest.spyOn(broker, 'produce')
brokerProduce.mockImplementationOnce()
brokerProduce.mockImplementationOnce(async () => {
await sleep(1)
await waitFor(() => brokerProduce.mock.results.length === messages.length) // for all the other concurrent calls to have completed
throw new KafkaJSError('retriable error')
})
}
Expand Down