Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Broker rediscovery with config.brokers parameter taking callback function #854

Merged
merged 19 commits into from
Sep 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/cluster/__tests__/brokerPool.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ describe('Cluster > BrokerPool', () => {
})

describe('#connect', () => {
it('connects to the seed broker when the broker pool is created', async () => {
expect(brokerPool.seedBroker.isConnected()).toEqual(false)
it('when the broker pool is created seed broker is null', async () => {
expect(brokerPool.seedBroker).toEqual(undefined)
await brokerPool.connect()
expect(brokerPool.seedBroker.isConnected()).toEqual(true)
})
Expand All @@ -42,6 +42,8 @@ describe('Cluster > BrokerPool', () => {
})

test('select a different seed broker on ILLEGAL_SASL_STATE error', async () => {
await brokerPool.createSeedBroker()

const originalSeedPort = brokerPool.seedBroker.connection.port
const illegalStateError = new KafkaJSProtocolError({
message: 'ILLEGAL_SASL_STATE',
Expand All @@ -58,6 +60,8 @@ describe('Cluster > BrokerPool', () => {
})

test('select a different seed broker on connection errors', async () => {
await brokerPool.createSeedBroker()

const originalSeedPort = brokerPool.seedBroker.connection.port
brokerPool.seedBroker.connect = jest.fn(() => {
throw new KafkaJSConnectionError('Test connection error')
Expand Down
122 changes: 95 additions & 27 deletions src/cluster/__tests__/connectionBuilder.spec.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const { newLogger } = require('testHelpers')
const connectionBuilder = require('../connectionBuilder')
const Connection = require('../../network/connection')
const { KafkaJSNonRetriableError } = require('../../errors')
const { KafkaJSConnectionError, KafkaJSNonRetriableError } = require('../../errors')

describe('Cluster > ConnectionBuilder', () => {
let builder
Expand All @@ -28,8 +28,8 @@ describe('Cluster > ConnectionBuilder', () => {
})
})

test('creates a new connection using a random broker', () => {
const connection = builder.build()
test('creates a new connection using a random broker', async () => {
const connection = await builder.build()
expect(connection).toBeInstanceOf(Connection)
expect(connection.host).toBeOneOf(['host.test', 'host2.test', 'host3.test'])
expect(connection.port).toBeOneOf([7777, 7778, 7779])
Expand All @@ -42,18 +42,18 @@ describe('Cluster > ConnectionBuilder', () => {
expect(connection.socketFactory).toBe(socketFactory)
})

test('when called without host and port iterates throught the seed brokers', () => {
const connections = Array(brokers.length)
.fill()
.map(() => {
const { host, port } = builder.build()
return `${host}:${port}`
})
test('when called without host and port iterates throught the seed brokers', async () => {
const connections = []
for (let i = 0; i < brokers.length; i++) {
const { host, port } = await builder.build()
connections.push(`${host}:${port}`)
}

expect(connections).toIncludeSameMembers(brokers)
})

test('accepts overrides for host, port and rack', () => {
const connection = builder.build({
test('accepts overrides for host, port and rack', async () => {
const connection = await builder.build({
host: 'host.another',
port: 8888,
rack: 'rack',
Expand All @@ -63,9 +63,9 @@ describe('Cluster > ConnectionBuilder', () => {
expect(connection.rack).toEqual('rack')
})

it('throws an exception if brokers list is empty', () => {
expect(() => {
builder = connectionBuilder({
it('throws an exception if brokers list is empty', async () => {
await expect(
connectionBuilder({
socketFactory,
brokers: [],
ssl,
Expand All @@ -74,16 +74,13 @@ describe('Cluster > ConnectionBuilder', () => {
connectionTimeout,
retry,
logger,
})
}).toThrow(
KafkaJSNonRetriableError,
'Failed to connect: expected brokers array and got nothing'
)
}).build()
).rejects.toEqual(new KafkaJSNonRetriableError('Failed to connect: brokers array is empty'))
})

it('throws an exception if brokers is null', () => {
expect(() => {
builder = connectionBuilder({
it('throws an exception if brokers is null', async () => {
await expect(
connectionBuilder({
socketFactory,
brokers: null,
ssl,
Expand All @@ -92,10 +89,81 @@ describe('Cluster > ConnectionBuilder', () => {
connectionTimeout,
retry,
logger,
})
}).toThrow(
KafkaJSNonRetriableError,
'Failed to connect: expected brokers array and got nothing'
}).build()
).rejects.toEqual(
new KafkaJSNonRetriableError('Failed to connect: brokers parameter should not be null')
)
})

it('throws an KafkaJSConnectionError if brokers is function and returning null', async () => {
await expect(
connectionBuilder({
socketFactory,
brokers: () => null,
ssl,
sasl,
clientId,
connectionTimeout,
retry,
logger,
}).build()
).rejects.toEqual(
new KafkaJSConnectionError('Failed to connect: brokers function returned nothing')
)
})

it('throws an KafkaJSConnectionError if brokers is function crashes', async () => {
await expect(
connectionBuilder({
socketFactory,
brokers: () => {
throw new Error('oh a crash!')
},
ssl,
sasl,
clientId,
connectionTimeout,
retry,
logger,
}).build()
).rejects.toEqual(
new KafkaJSConnectionError('Failed to connect: brokers function returned exception')
)
})

it('brokers can be function that returns array of host:port strings', async () => {
const builder = connectionBuilder({
socketFactory,
brokers: () => ['host.test:7777'],
ssl,
sasl,
clientId,
connectionTimeout,
retry,
logger,
})

const connection = await builder.build()
expect(connection).toBeInstanceOf(Connection)
expect(connection.host).toBe('host.test')
expect(connection.port).toBe(7777)
})

it('brokers can be async function that returns array of host:port strings', async () => {
const builder = connectionBuilder({
socketFactory,
brokers: async () => ['host.test:7777'],
ssl,
sasl,
clientId,
connectionTimeout,
retry,
logger,
})

const connection = await builder.build()
expect(connection).toBeInstanceOf(Connection)
expect(connection.host).toBe('host.test')
expect(connection.port).toBe(7777)
})
})
84 changes: 50 additions & 34 deletions src/cluster/brokerPool.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ module.exports = class BrokerPool {
...options,
})

this.seedBroker = this.createBroker({
connection: this.connectionBuilder.build(),
logger: this.rootLogger,
})

this.brokers = {}
this.metadata = null
this.metadataExpireAt = null
Expand All @@ -60,7 +55,21 @@ module.exports = class BrokerPool {
*/
hasConnectedBrokers() {
const brokers = values(this.brokers)
return !!brokers.find(broker => broker.isConnected()) || this.seedBroker.isConnected()
return (
!!brokers.find(broker => broker.isConnected()) ||
(this.seedBroker ? this.seedBroker.isConnected() : false)
)
}

async createSeedBroker() {
if (this.seedBroker) {
await this.seedBroker.disconnect()
}

this.seedBroker = this.createBroker({
connection: await this.connectionBuilder.build(),
logger: this.rootLogger,
})
}

/**
Expand All @@ -72,17 +81,18 @@ module.exports = class BrokerPool {
return
}

if (!this.seedBroker) {
await this.createSeedBroker()
}

return this.retrier(async (bail, retryCount, retryTime) => {
try {
await this.seedBroker.connect()
this.versions = this.seedBroker.versions
} catch (e) {
if (e.name === 'KafkaJSConnectionError' || e.type === 'ILLEGAL_SASL_STATE') {
// Connection builder will always rotate the seed broker
this.seedBroker = this.createBroker({
connection: this.connectionBuilder.build(),
logger: this.rootLogger,
})
await this.createSeedBroker()
this.logger.error(
`Failed to connect to seed broker, trying another broker from the list: ${e.message}`,
{ retryCount, retryTime }
Expand All @@ -102,7 +112,7 @@ module.exports = class BrokerPool {
* @returns {Promise}
*/
async disconnect() {
await this.seedBroker.disconnect()
this.seedBroker && (await this.seedBroker.disconnect())
await Promise.all(values(this.brokers).map(broker => broker.disconnect()))

this.brokers = {}
Expand Down Expand Up @@ -146,33 +156,39 @@ module.exports = class BrokerPool {
this.metadataExpireAt = Date.now() + this.metadataMaxAge

const replacedBrokers = []
this.brokers = this.metadata.brokers.reduce((result, { nodeId, host, port, rack }) => {
if (result[nodeId]) {
if (!hasBrokerBeenReplaced(result[nodeId], { host, port, rack })) {
return result

this.brokers = await this.metadata.brokers.reduce(
async (resultPromise, { nodeId, host, port, rack }) => {
const result = await resultPromise

if (result[nodeId]) {
if (!hasBrokerBeenReplaced(result[nodeId], { host, port, rack })) {
return result
}

replacedBrokers.push(result[nodeId])
}

replacedBrokers.push(result[nodeId])
}
if (host === seedHost && port === seedPort) {
this.seedBroker.nodeId = nodeId
this.seedBroker.connection.rack = rack
return assign(result, {
[nodeId]: this.seedBroker,
})
}

if (host === seedHost && port === seedPort) {
this.seedBroker.nodeId = nodeId
this.seedBroker.connection.rack = rack
return assign(result, {
[nodeId]: this.seedBroker,
[nodeId]: this.createBroker({
logger: this.rootLogger,
versions: this.versions,
supportAuthenticationProtocol: this.supportAuthenticationProtocol,
connection: await this.connectionBuilder.build({ host, port, rack }),
nodeId,
}),
})
}

return assign(result, {
[nodeId]: this.createBroker({
logger: this.rootLogger,
versions: this.versions,
supportAuthenticationProtocol: this.supportAuthenticationProtocol,
connection: this.connectionBuilder.build({ host, port, rack }),
nodeId,
}),
})
}, this.brokers)
},
this.brokers
)

const freshBrokerIds = this.metadata.brokers.map(({ nodeId }) => `${nodeId}`).sort()
const currentBrokerIds = keys(this.brokers).sort()
Expand Down Expand Up @@ -303,7 +319,7 @@ module.exports = class BrokerPool {
}

// Rebuild the connection since it can't recover from illegal SASL state
broker.connection = this.connectionBuilder.build({
broker.connection = await this.connectionBuilder.build({
host: broker.connection.host,
port: broker.connection.port,
rack: broker.connection.rack,
Expand Down
Loading