Skip to content

Commit

Permalink
Add option to wait for leaders when creating topics
Browse files Browse the repository at this point in the history
  • Loading branch information
tulios committed Jun 11, 2018
1 parent 65296fc commit 71814e3
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 6 deletions.
33 changes: 30 additions & 3 deletions src/admin/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,23 @@
const createRetry = require('../retry')
const waitFor = require('../utils/waitFor')
const { KafkaJSNonRetriableError } = require('../errors')

const retryOnLeaderNotAvailable = (fn, opts = {}) => {
const callback = async () => {
try {
return await fn()
} catch (e) {
console.error(e)
if (e.type !== 'LEADER_NOT_AVAILABLE') {
throw e
}
return false
}
}

return waitFor(callback, opts)
}

module.exports = ({ retry = { retries: 5 }, logger: rootLogger, cluster }) => {
const logger = rootLogger.namespace('Admin')

Expand All @@ -18,9 +35,10 @@ module.exports = ({ retry = { retries: 5 }, logger: rootLogger, cluster }) => {
* @param {array} topics
* @param {boolean} [validateOnly=false]
* @param {number} [timeout=5000]
* @param {boolean} [waitForLeaders=true]
* @return {Promise}
*/
const createTopics = async ({ topics, validateOnly, timeout }) => {
const createTopics = async ({ topics, validateOnly, timeout, waitForLeaders = true }) => {
if (!topics || !Array.isArray(topics)) {
throw new KafkaJSNonRetriableError(`Invalid topics array ${topics}`)
}
Expand All @@ -31,8 +49,8 @@ module.exports = ({ retry = { retries: 5 }, logger: rootLogger, cluster }) => {
)
}

const set = new Set(topics.map(({ topic }) => topic))
if (set.size < topics.length) {
const topicNames = new Set(topics.map(({ topic }) => topic))
if (topicNames.size < topics.length) {
throw new KafkaJSNonRetriableError(
'Invalid topics array, it cannot have multiple entries for the same topic'
)
Expand All @@ -45,6 +63,15 @@ module.exports = ({ retry = { retries: 5 }, logger: rootLogger, cluster }) => {
await cluster.refreshMetadata()
const broker = await cluster.findControllerBroker()
await broker.createTopics({ topics, validateOnly, timeout })

if (waitForLeaders) {
const topicNamesArray = Array.from(topicNames.values())
await retryOnLeaderNotAvailable(async () => await broker.metadata(topicNamesArray), {
delay: 100,
timeoutMessage: 'Timed out while waiting for topic leaders',
})
}

return true
} catch (e) {
if (e.type === 'NOT_CONTROLLER') {
Expand Down
45 changes: 42 additions & 3 deletions src/admin/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,12 @@ describe('Admin', () => {
admin = createAdmin({ cluster: createCluster(), logger: newLogger() })

await admin.connect()
await expect(admin.createTopics({ topics: [{ topic: topicName }] })).resolves.toEqual(true)
await expect(
admin.createTopics({
waitForLeaders: false,
topics: [{ topic: topicName }],
})
).resolves.toEqual(true)
})

test('retries if the controller has moved', async () => {
Expand All @@ -104,7 +109,12 @@ describe('Admin', () => {
.mockImplementationOnce(() => broker)

admin = createAdmin({ cluster, logger: newLogger() })
await expect(admin.createTopics({ topics: [{ topic: topicName }] })).resolves.toEqual(true)
await expect(
admin.createTopics({
waitForLeaders: false,
topics: [{ topic: topicName }],
})
).resolves.toEqual(true)

expect(cluster.refreshMetadata).toHaveBeenCalledTimes(2)
expect(cluster.findControllerBroker).toHaveBeenCalledTimes(2)
Expand All @@ -122,11 +132,40 @@ describe('Admin', () => {
})

admin = createAdmin({ cluster, logger: newLogger() })
await expect(admin.createTopics({ topics: [{ topic: topicName }] })).resolves.toEqual(false)
await expect(
admin.createTopics({
waitForLeaders: false,
topics: [{ topic: topicName }],
})
).resolves.toEqual(false)

expect(cluster.refreshMetadata).toHaveBeenCalledTimes(1)
expect(cluster.findControllerBroker).toHaveBeenCalledTimes(1)
expect(broker.createTopics).toHaveBeenCalledTimes(1)
})

test('query metadata if waitForLeaders is true', async () => {
const topic2 = `test-topic-${secureRandom()}`
const topic3 = `test-topic-${secureRandom()}`

const cluster = createCluster()
const broker = { createTopics: jest.fn(), metadata: jest.fn(() => true) }

cluster.refreshMetadata = jest.fn()
cluster.findControllerBroker = jest.fn(() => broker)

broker.createTopics.mockImplementationOnce(() => true)
admin = createAdmin({ cluster, logger: newLogger() })

await expect(
admin.createTopics({
waitForLeaders: true,
topics: [{ topic: topicName }, { topic: topic2 }, { topic: topic3 }],
})
).resolves.toEqual(true)

expect(broker.metadata).toHaveBeenCalledTimes(1)
expect(broker.metadata).toHaveBeenCalledWith([topicName, topic2, topic3])
})
})
})
2 changes: 2 additions & 0 deletions src/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class KafkaJSPartialMessageError extends KafkaJSNonRetriableError {}
class KafkaJSSASLAuthenticationError extends KafkaJSNonRetriableError {}
class KafkaJSGroupCoordinatorNotFound extends KafkaJSNonRetriableError {}
class KafkaJSNotImplemented extends KafkaJSNonRetriableError {}
class KafkaJSTimeout extends KafkaJSNonRetriableError {}

module.exports = {
KafkaJSError,
Expand All @@ -74,4 +75,5 @@ module.exports = {
KafkaJSNotImplemented,
KafkaJSMetadataNotLoaded,
KafkaJSTopicMetadataNotLoaded,
KafkaJSTimeout,
}
45 changes: 45 additions & 0 deletions src/utils/waitFor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
const sleep = require('./sleep')
const { KafkaJSTimeout } = require('../errors')

module.exports = (
fn,
{ delay = 50, maxWait = 10000, timeoutMessage = 'Timeout', ignoreTimeout = false } = {}
) => {
let timeoutId
let totalWait = 0
let fulfilled = false

const checkCondition = async (resolve, reject) => {
totalWait += delay
await sleep(delay)

try {
const result = await fn(totalWait)
if (result) {
fulfilled = true
clearTimeout(timeoutId)
return resolve(result)
}

checkCondition(resolve, reject)
} catch (e) {
fulfilled = true
clearTimeout(timeoutId)
reject(e)
}
}

return new Promise((resolve, reject) => {
checkCondition(resolve, reject)

if (ignoreTimeout) {
return
}

timeoutId = setTimeout(() => {
if (!fulfilled) {
return reject(new KafkaJSTimeout(timeoutMessage))
}
}, maxWait)
})
}
37 changes: 37 additions & 0 deletions src/utils/waitFor.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
const waitFor = require('./waitFor')

describe('Utils > waitFor', () => {
it('waits for the condition', async () => {
let conditionValid = false

setTimeout(() => {
conditionValid = true
}, 6)

await expect(waitFor(() => conditionValid, { delay: 5 }))
})

it('rejects the promise if the callback fail', async () => {
await expect(
waitFor(
() => {
throw new Error('callback failed!')
},
{ delay: 1 }
)
).rejects.toHaveProperty('message', 'callback failed!')
})

it('rejects the promise if the callback never succeeds', async () => {
await expect(waitFor(() => false, { delay: 1, maxWait: 2 })).rejects.toHaveProperty(
'message',
'Timeout'
)
})

it('rejects the promise with a custom timeout message', async () => {
await expect(
waitFor(() => false, { delay: 1, maxWait: 2, timeoutMessage: 'foo bar' })
).rejects.toHaveProperty('message', 'foo bar')
})
})

0 comments on commit 71814e3

Please sign in to comment.