From ab0ae4b1762944b3c8f8bccf49251ac0bc5d04d1 Mon Sep 17 00:00:00 2001 From: tulios Date: Wed, 27 Jun 2018 10:00:00 +0200 Subject: [PATCH 1/7] Add lock primitive --- src/utils/lock.js | 40 ++++++++++++++++++++++++++++++++++++++++ src/utils/lock.spec.js | 42 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+) create mode 100644 src/utils/lock.js create mode 100644 src/utils/lock.spec.js diff --git a/src/utils/lock.js b/src/utils/lock.js new file mode 100644 index 000000000..79343b86a --- /dev/null +++ b/src/utils/lock.js @@ -0,0 +1,40 @@ +const EventEmitter = require('events') +const { KafkaJSError } = require('../errors') + +module.exports = class Lock { + constructor({ timeout = 1000 } = {}) { + this.locked = false + this.timeout = timeout + this.emitter = new EventEmitter() + } + + async acquire() { + return new Promise((resolve, reject) => { + if (!this.locked) { + this.locked = true + return resolve() + } + + let timeoutId + const tryToAcquire = () => { + if (!this.locked) { + this.locked = true + clearTimeout(timeoutId) + this.emitter.removeListener('releaseLock', tryToAcquire) + return resolve() + } + } + + this.emitter.on('releaseLock', tryToAcquire) + timeoutId = setTimeout( + () => reject(new KafkaJSError('Timeout while acquiring lock')), + this.timeout + ) + }) + } + + async release() { + this.locked = false + setImmediate(() => this.emitter.emit('releaseLock')) + } +} diff --git a/src/utils/lock.spec.js b/src/utils/lock.spec.js new file mode 100644 index 000000000..c00ceee6c --- /dev/null +++ b/src/utils/lock.spec.js @@ -0,0 +1,42 @@ +const waitFor = require('./waitFor') +const flatten = require('./flatten') +const Lock = require('./lock') + +const sleep = value => waitFor(delay => delay >= value) + +describe('Utils > Lock', () => { + it('allows only one resource at a time', async () => { + const lock = new Lock() + const resource = jest.fn() + const callResource = async () => { + try { + await lock.acquire() + resource(Date.now()) + await sleep(50) + } finally { + await lock.release() + } + } + + await Promise.all([callResource(), callResource(), callResource()]) + const calls = flatten(resource.mock.calls) + expect(calls.length).toEqual(3) + expect(calls[1] - calls[0]).toBeGreaterThanOrEqual(50) + expect(calls[2] - calls[1]).toBeGreaterThanOrEqual(50) + }) + + it('throws an error if the lock cannot be acquired within a period', async () => { + const lock = new Lock({ timeout: 60 }) + const resource = jest.fn() + const callResource = async () => { + await lock.acquire() + resource(Date.now()) + await sleep(50) + // it never releases the lock + } + + await expect( + Promise.all([callResource(), callResource(), callResource()]) + ).rejects.toHaveProperty('message', 'Timeout while acquiring lock') + }) +}) From ad0a730e8303603094f44491c22720f219a85713 Mon Sep 17 00:00:00 2001 From: tulios Date: Wed, 27 Jun 2018 10:00:32 +0200 Subject: [PATCH 2/7] Lock broker connect to prevent issues when auth with SCRAM --- src/broker/__tests__/connect.spec.js | 19 ++++++++++++++++ src/broker/index.js | 34 +++++++++++++++++++--------- 2 files changed, 42 insertions(+), 11 deletions(-) diff --git a/src/broker/__tests__/connect.spec.js b/src/broker/__tests__/connect.spec.js index 71b893674..909d2ec31 100644 --- a/src/broker/__tests__/connect.spec.js +++ b/src/broker/__tests__/connect.spec.js @@ -64,6 +64,25 @@ describe('Broker > connect', () => { expect(broker.authenticated).toEqual(true) }) + test('parallel calls to connect using SCRAM', async () => { + broker = new Broker({ + connection: createConnection(saslSCRAM256ConnectionOpts()), + logger: newLogger(), + }) + + expect(broker.authenticated).toEqual(false) + + await Promise.all([ + broker.connect(), + broker.connect(), + broker.connect(), + broker.connect(), + broker.connect(), + ]) + + expect(broker.authenticated).toEqual(true) + }) + test('switches the authenticated flag to false', async () => { const error = new Error('not connected') broker.authenticated = true diff --git a/src/broker/index.js b/src/broker/index.js index d6aa50823..0025dca16 100644 --- a/src/broker/index.js +++ b/src/broker/index.js @@ -1,3 +1,4 @@ +const Lock = require('../utils/lock') const { Types: Compression } = require('../protocol/message/compression') const { requests, lookup } = require('../protocol/requests') const apiKeys = require('../protocol/requests/apiKeys') @@ -20,6 +21,7 @@ module.exports = class Broker { this.versions = versions this.allowExperimentalV011 = allowExperimentalV011 this.authenticated = false + this.lock = new Lock() this.lookupRequest = () => { throw new Error('Broker not connected') } @@ -39,21 +41,31 @@ module.exports = class Broker { * @returns {Promise} */ async connect() { - this.authenticated = false - await this.connection.connect() + try { + await this.lock.acquire() - if (!this.versions) { - this.versions = await this.apiVersions() - } + if (this.isConnected()) { + return false + } - this.lookupRequest = lookup(this.versions, this.allowExperimentalV011) + this.authenticated = false + await this.connection.connect() - if (!this.authenticated && this.connection.sasl) { - await new SASLAuthenticator(this.connection, this.rootLogger, this.versions).authenticate() - this.authenticated = true - } + if (!this.versions) { + this.versions = await this.apiVersions() + } - return true + this.lookupRequest = lookup(this.versions, this.allowExperimentalV011) + + if (!this.authenticated && this.connection.sasl) { + await new SASLAuthenticator(this.connection, this.rootLogger, this.versions).authenticate() + this.authenticated = true + } + + return true + } finally { + await this.lock.release() + } } /** From a3a9893e82f4d4ea2ae4b8b9a81f7e0657638116 Mon Sep 17 00:00:00 2001 From: tulios Date: Wed, 27 Jun 2018 11:06:23 +0200 Subject: [PATCH 3/7] Expose the lock timeout as authorization timeout --- src/broker/index.js | 13 +++++++++++-- src/cluster/brokerPool.js | 7 +++++-- src/cluster/index.js | 5 ++++- src/index.js | 2 ++ 4 files changed, 22 insertions(+), 5 deletions(-) diff --git a/src/broker/index.js b/src/broker/index.js index 0025dca16..dc0442a23 100644 --- a/src/broker/index.js +++ b/src/broker/index.js @@ -12,14 +12,23 @@ const SASLAuthenticator = require('./saslAuthenticator') * @param {Object} logger * @param {Object} [versions=null] The object with all available versions and APIs * supported by this cluster. The output of broker#apiVersions + * @param {number} [authenticationTimeout=1000] */ module.exports = class Broker { - constructor({ connection, logger, allowExperimentalV011, nodeId = null, versions = null }) { + constructor({ + connection, + logger, + allowExperimentalV011, + nodeId = null, + versions = null, + authenticationTimeout = 1000, + }) { this.connection = connection this.nodeId = nodeId this.rootLogger = logger this.versions = versions this.allowExperimentalV011 = allowExperimentalV011 + this.authenticationTimeout = authenticationTimeout this.authenticated = false this.lock = new Lock() this.lookupRequest = () => { @@ -42,7 +51,7 @@ module.exports = class Broker { */ async connect() { try { - await this.lock.acquire() + await this.lock.acquire({ timeout: this.authenticationTimeout }) if (this.isConnected()) { return false diff --git a/src/cluster/brokerPool.js b/src/cluster/brokerPool.js index 86d21394a..02a26ad9a 100644 --- a/src/cluster/brokerPool.js +++ b/src/cluster/brokerPool.js @@ -10,14 +10,17 @@ module.exports = class BrokerPool { * @param {ConnectionBuilder} connectionBuilder * @param {Logger} logger * @param {Object} retry + * @param {number} authenticationTimeout */ - constructor({ connectionBuilder, logger, retry, allowExperimentalV011 }) { + constructor({ connectionBuilder, logger, retry, allowExperimentalV011, authenticationTimeout }) { this.connectionBuilder = connectionBuilder this.rootLogger = logger this.logger = logger.namespace('BrokerPool') this.retrier = createRetry(assign({}, retry)) - this.createBroker = options => new Broker({ allowExperimentalV011, ...options }) + this.createBroker = options => + new Broker({ allowExperimentalV011, authenticationTimeout, ...options }) + this.seedBroker = this.createBroker({ connection: this.connectionBuilder.build(), logger: this.rootLogger, diff --git a/src/cluster/index.js b/src/cluster/index.js index e95f0f9a7..1544fe00a 100644 --- a/src/cluster/index.js +++ b/src/cluster/index.js @@ -26,18 +26,20 @@ const mergeTopics = (obj, { topic, partitions }) => * @param {Object} sasl * @param {string} clientId * @param {number} connectionTimeout + * @param {number} authenticationTimeout * @param {Object} retry * @param {Object} logger */ module.exports = class Cluster { constructor({ + logger: rootLogger, brokers, ssl, sasl, clientId, connectionTimeout, + authenticationTimeout, retry, - logger: rootLogger, allowExperimentalV011, }) { this.rootLogger = rootLogger @@ -59,6 +61,7 @@ module.exports = class Cluster { logger: this.rootLogger, retry, allowExperimentalV011, + authenticationTimeout, }) } diff --git a/src/index.js b/src/index.js index bf7e61cec..6ebde0d07 100644 --- a/src/index.js +++ b/src/index.js @@ -14,6 +14,7 @@ module.exports = class Client { sasl, clientId, connectionTimeout, + authenticationTimeout, retry, logLevel = INFO, logCreator = LoggerConsole, @@ -28,6 +29,7 @@ module.exports = class Client { sasl, clientId, connectionTimeout, + authenticationTimeout, retry, allowExperimentalV011, }) From 60aad593674a49a344d02c8640e18086453b86ab Mon Sep 17 00:00:00 2001 From: tulios Date: Wed, 27 Jun 2018 11:11:02 +0200 Subject: [PATCH 4/7] Remove unnecessary boolean returns from broker connect --- src/broker/__tests__/connect.spec.js | 2 +- src/broker/index.js | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/broker/__tests__/connect.spec.js b/src/broker/__tests__/connect.spec.js index 909d2ec31..d95d9882b 100644 --- a/src/broker/__tests__/connect.spec.js +++ b/src/broker/__tests__/connect.spec.js @@ -24,7 +24,7 @@ describe('Broker > connect', () => { }) test('establish the connection', async () => { - await expect(broker.connect()).resolves.toEqual(true) + await broker.connect() expect(broker.connection.connected).toEqual(true) }) diff --git a/src/broker/index.js b/src/broker/index.js index dc0442a23..c2588d70e 100644 --- a/src/broker/index.js +++ b/src/broker/index.js @@ -54,7 +54,7 @@ module.exports = class Broker { await this.lock.acquire({ timeout: this.authenticationTimeout }) if (this.isConnected()) { - return false + return } this.authenticated = false @@ -70,8 +70,6 @@ module.exports = class Broker { await new SASLAuthenticator(this.connection, this.rootLogger, this.versions).authenticate() this.authenticated = true } - - return true } finally { await this.lock.release() } From 09df998fc50c331d3c53dc17809ff3626b9c71ec Mon Sep 17 00:00:00 2001 From: tulios Date: Wed, 27 Jun 2018 11:18:03 +0200 Subject: [PATCH 5/7] Make sure the broker is disconnected before a reconnect --- src/cluster/brokerPool.js | 1 + 1 file changed, 1 insertion(+) diff --git a/src/cluster/brokerPool.js b/src/cluster/brokerPool.js index 02a26ad9a..52717919f 100644 --- a/src/cluster/brokerPool.js +++ b/src/cluster/brokerPool.js @@ -205,6 +205,7 @@ module.exports = class BrokerPool { await broker.connect() } catch (e) { if (e.name === 'KafkaJSConnectionError' || e.type === 'ILLEGAL_SASL_STATE') { + await broker.disconnect() // Rebuild the connection since it can't recover from illegal SASL state broker.connection = this.connectionBuilder.build({ host: broker.connection.host, From 533f0dc8e799e449bd8b904bc29ddfc79535adf5 Mon Sep 17 00:00:00 2001 From: tulios Date: Wed, 27 Jun 2018 11:25:01 +0200 Subject: [PATCH 6/7] Add a reference to authentication timeout in the readme --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 03b7eef59..2741420b5 100644 --- a/README.md +++ b/README.md @@ -98,6 +98,7 @@ Kafka has support for using SASL to authenticate clients. The `sasl` option can new Kafka({ clientId: 'my-app', brokers: ['kafka1:9092', 'kafka2:9092'], + // authenticationTimeout: 1000, sasl: { mechanism: 'plain', // scram-sha-256 or scram-sha-512 username: 'my-username', From db7f8a0a2b8c5cef1e83057d42f0623425931cf7 Mon Sep 17 00:00:00 2001 From: tulios Date: Wed, 27 Jun 2018 11:32:11 +0200 Subject: [PATCH 7/7] Increase the connection timeout when running tests --- testHelpers/index.js | 1 + 1 file changed, 1 insertion(+) diff --git a/testHelpers/index.js b/testHelpers/index.js index 280ddee21..edac5edbe 100644 --- a/testHelpers/index.js +++ b/testHelpers/index.js @@ -25,6 +25,7 @@ const saslBrokers = (host = getHost()) => [`${host}:9094`, `${host}:9097`, `${ho const connectionOpts = (opts = {}) => ({ clientId: `test-${secureRandom()}`, + connectionTimeout: 3000, logger: newLogger(), host: getHost(), port: 9092,