Skip to content

Commit

Permalink
Merge pull request #81 from tulios/support-concurrent-broker-connect
Browse files Browse the repository at this point in the history
Support concurrent broker connect
  • Loading branch information
tulios authored Jun 27, 2018
2 parents 09fad59 + db7f8a0 commit 4934e90
Show file tree
Hide file tree
Showing 9 changed files with 147 additions and 16 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ Kafka has support for using SASL to authenticate clients. The `sasl` option can
new Kafka({
clientId: 'my-app',
brokers: ['kafka1:9092', 'kafka2:9092'],
// authenticationTimeout: 1000,
sasl: {
mechanism: 'plain', // scram-sha-256 or scram-sha-512
username: 'my-username',
Expand Down
21 changes: 20 additions & 1 deletion src/broker/__tests__/connect.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ describe('Broker > connect', () => {
})

test('establish the connection', async () => {
await expect(broker.connect()).resolves.toEqual(true)
await broker.connect()
expect(broker.connection.connected).toEqual(true)
})

Expand Down Expand Up @@ -64,6 +64,25 @@ describe('Broker > connect', () => {
expect(broker.authenticated).toEqual(true)
})

test('parallel calls to connect using SCRAM', async () => {
broker = new Broker({
connection: createConnection(saslSCRAM256ConnectionOpts()),
logger: newLogger(),
})

expect(broker.authenticated).toEqual(false)

await Promise.all([
broker.connect(),
broker.connect(),
broker.connect(),
broker.connect(),
broker.connect(),
])

expect(broker.authenticated).toEqual(true)
})

test('switches the authenticated flag to false', async () => {
const error = new Error('not connected')
broker.authenticated = true
Expand Down
43 changes: 31 additions & 12 deletions src/broker/index.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
const Lock = require('../utils/lock')
const { Types: Compression } = require('../protocol/message/compression')
const { requests, lookup } = require('../protocol/requests')
const apiKeys = require('../protocol/requests/apiKeys')
Expand All @@ -11,15 +12,25 @@ const SASLAuthenticator = require('./saslAuthenticator')
* @param {Object} logger
* @param {Object} [versions=null] The object with all available versions and APIs
* supported by this cluster. The output of broker#apiVersions
* @param {number} [authenticationTimeout=1000]
*/
module.exports = class Broker {
constructor({ connection, logger, allowExperimentalV011, nodeId = null, versions = null }) {
constructor({
connection,
logger,
allowExperimentalV011,
nodeId = null,
versions = null,
authenticationTimeout = 1000,
}) {
this.connection = connection
this.nodeId = nodeId
this.rootLogger = logger
this.versions = versions
this.allowExperimentalV011 = allowExperimentalV011
this.authenticationTimeout = authenticationTimeout
this.authenticated = false
this.lock = new Lock()
this.lookupRequest = () => {
throw new Error('Broker not connected')
}
Expand All @@ -39,21 +50,29 @@ module.exports = class Broker {
* @returns {Promise}
*/
async connect() {
this.authenticated = false
await this.connection.connect()
try {
await this.lock.acquire({ timeout: this.authenticationTimeout })

if (!this.versions) {
this.versions = await this.apiVersions()
}
if (this.isConnected()) {
return
}

this.lookupRequest = lookup(this.versions, this.allowExperimentalV011)
this.authenticated = false
await this.connection.connect()

if (!this.authenticated && this.connection.sasl) {
await new SASLAuthenticator(this.connection, this.rootLogger, this.versions).authenticate()
this.authenticated = true
}
if (!this.versions) {
this.versions = await this.apiVersions()
}

this.lookupRequest = lookup(this.versions, this.allowExperimentalV011)

return true
if (!this.authenticated && this.connection.sasl) {
await new SASLAuthenticator(this.connection, this.rootLogger, this.versions).authenticate()
this.authenticated = true
}
} finally {
await this.lock.release()
}
}

/**
Expand Down
8 changes: 6 additions & 2 deletions src/cluster/brokerPool.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,17 @@ module.exports = class BrokerPool {
* @param {ConnectionBuilder} connectionBuilder
* @param {Logger} logger
* @param {Object} retry
* @param {number} authenticationTimeout
*/
constructor({ connectionBuilder, logger, retry, allowExperimentalV011 }) {
constructor({ connectionBuilder, logger, retry, allowExperimentalV011, authenticationTimeout }) {
this.connectionBuilder = connectionBuilder
this.rootLogger = logger
this.logger = logger.namespace('BrokerPool')
this.retrier = createRetry(assign({}, retry))

this.createBroker = options => new Broker({ allowExperimentalV011, ...options })
this.createBroker = options =>
new Broker({ allowExperimentalV011, authenticationTimeout, ...options })

this.seedBroker = this.createBroker({
connection: this.connectionBuilder.build(),
logger: this.rootLogger,
Expand Down Expand Up @@ -202,6 +205,7 @@ module.exports = class BrokerPool {
await broker.connect()
} catch (e) {
if (e.name === 'KafkaJSConnectionError' || e.type === 'ILLEGAL_SASL_STATE') {
await broker.disconnect()
// Rebuild the connection since it can't recover from illegal SASL state
broker.connection = this.connectionBuilder.build({
host: broker.connection.host,
Expand Down
5 changes: 4 additions & 1 deletion src/cluster/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,20 @@ const mergeTopics = (obj, { topic, partitions }) =>
* @param {Object} sasl
* @param {string} clientId
* @param {number} connectionTimeout
* @param {number} authenticationTimeout
* @param {Object} retry
* @param {Object} logger
*/
module.exports = class Cluster {
constructor({
logger: rootLogger,
brokers,
ssl,
sasl,
clientId,
connectionTimeout,
authenticationTimeout,
retry,
logger: rootLogger,
allowExperimentalV011,
}) {
this.rootLogger = rootLogger
Expand All @@ -59,6 +61,7 @@ module.exports = class Cluster {
logger: this.rootLogger,
retry,
allowExperimentalV011,
authenticationTimeout,
})
}

Expand Down
2 changes: 2 additions & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ module.exports = class Client {
sasl,
clientId,
connectionTimeout,
authenticationTimeout,
retry,
logLevel = INFO,
logCreator = LoggerConsole,
Expand All @@ -28,6 +29,7 @@ module.exports = class Client {
sasl,
clientId,
connectionTimeout,
authenticationTimeout,
retry,
allowExperimentalV011,
})
Expand Down
40 changes: 40 additions & 0 deletions src/utils/lock.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
const EventEmitter = require('events')
const { KafkaJSError } = require('../errors')

module.exports = class Lock {
constructor({ timeout = 1000 } = {}) {
this.locked = false
this.timeout = timeout
this.emitter = new EventEmitter()
}

async acquire() {
return new Promise((resolve, reject) => {
if (!this.locked) {
this.locked = true
return resolve()
}

let timeoutId
const tryToAcquire = () => {
if (!this.locked) {
this.locked = true
clearTimeout(timeoutId)
this.emitter.removeListener('releaseLock', tryToAcquire)
return resolve()
}
}

this.emitter.on('releaseLock', tryToAcquire)
timeoutId = setTimeout(
() => reject(new KafkaJSError('Timeout while acquiring lock')),
this.timeout
)
})
}

async release() {
this.locked = false
setImmediate(() => this.emitter.emit('releaseLock'))
}
}
42 changes: 42 additions & 0 deletions src/utils/lock.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
const waitFor = require('./waitFor')
const flatten = require('./flatten')
const Lock = require('./lock')

const sleep = value => waitFor(delay => delay >= value)

describe('Utils > Lock', () => {
it('allows only one resource at a time', async () => {
const lock = new Lock()
const resource = jest.fn()
const callResource = async () => {
try {
await lock.acquire()
resource(Date.now())
await sleep(50)
} finally {
await lock.release()
}
}

await Promise.all([callResource(), callResource(), callResource()])
const calls = flatten(resource.mock.calls)
expect(calls.length).toEqual(3)
expect(calls[1] - calls[0]).toBeGreaterThanOrEqual(50)
expect(calls[2] - calls[1]).toBeGreaterThanOrEqual(50)
})

it('throws an error if the lock cannot be acquired within a period', async () => {
const lock = new Lock({ timeout: 60 })
const resource = jest.fn()
const callResource = async () => {
await lock.acquire()
resource(Date.now())
await sleep(50)
// it never releases the lock
}

await expect(
Promise.all([callResource(), callResource(), callResource()])
).rejects.toHaveProperty('message', 'Timeout while acquiring lock')
})
})
1 change: 1 addition & 0 deletions testHelpers/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const saslBrokers = (host = getHost()) => [`${host}:9094`, `${host}:9097`, `${ho

const connectionOpts = (opts = {}) => ({
clientId: `test-${secureRandom()}`,
connectionTimeout: 3000,
logger: newLogger(),
host: getHost(),
port: 9092,
Expand Down

0 comments on commit 4934e90

Please sign in to comment.