Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Broker rediscovery with config.brokers parameter taking callback function #854

Merged
merged 19 commits into from
Sep 21, 2020
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/cluster/__tests__/brokerPool.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ describe('Cluster > BrokerPool', () => {
})

describe('#connect', () => {
it('connects to the seed broker when the broker pool is created', async () => {
expect(brokerPool.seedBroker.isConnected()).toEqual(false)
it('when the broker pool is created seed broker is null', async () => {
expect(brokerPool.seedBroker).toEqual(undefined)
await brokerPool.connect()
expect(brokerPool.seedBroker.isConnected()).toEqual(true)
})
Expand All @@ -41,6 +41,8 @@ describe('Cluster > BrokerPool', () => {
})

test('select a different seed broker on ILLEGAL_SASL_STATE error', async () => {
await brokerPool.createSeedBroker()

const originalSeedPort = brokerPool.seedBroker.connection.port
const illegalStateError = new KafkaJSProtocolError({
message: 'ILLEGAL_SASL_STATE',
Expand All @@ -57,6 +59,8 @@ describe('Cluster > BrokerPool', () => {
})

test('select a different seed broker on connection errors', async () => {
await brokerPool.createSeedBroker()

const originalSeedPort = brokerPool.seedBroker.connection.port
brokerPool.seedBroker.connect = jest.fn(() => {
throw new KafkaJSConnectionError('Test connection error')
Expand Down
105 changes: 79 additions & 26 deletions src/cluster/__tests__/connectionBuilder.spec.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const { newLogger } = require('testHelpers')
const connectionBuilder = require('../connectionBuilder')
const Connection = require('../../network/connection')
const { KafkaJSNonRetriableError } = require('../../errors')
const { KafkaJSConnectionError } = require('../../errors')

describe('Cluster > ConnectionBuilder', () => {
let builder
Expand All @@ -28,8 +28,8 @@ describe('Cluster > ConnectionBuilder', () => {
})
})

test('creates a new connection using a random broker', () => {
const connection = builder.build()
test('creates a new connection using a random broker', async () => {
const connection = await builder.build()
expect(connection).toBeInstanceOf(Connection)
expect(connection.host).toBeOneOf(['host.test', 'host2.test', 'host3.test'])
expect(connection.port).toBeOneOf([7777, 7778, 7779])
Expand All @@ -42,18 +42,18 @@ describe('Cluster > ConnectionBuilder', () => {
expect(connection.socketFactory).toBe(socketFactory)
})

test('when called without host and port iterates throught the seed brokers', () => {
const connections = Array(brokers.length)
.fill()
.map(() => {
const { host, port } = builder.build()
return `${host}:${port}`
})
test('when called without host and port picks random seed broker', async () => {
pimpelsang marked this conversation as resolved.
Show resolved Hide resolved
const connections = []
for (let i = 0; i < brokers.length; i++) {
const { host, port } = await builder.build()
connections.push(`${host}:${port}`)
}

expect(connections).toIncludeSameMembers(brokers)
})

test('accepts overrides for host, port and rack', () => {
const connection = builder.build({
test('accepts overrides for host, port and rack', async () => {
const connection = await builder.build({
host: 'host.another',
port: 8888,
rack: 'rack',
Expand All @@ -63,9 +63,9 @@ describe('Cluster > ConnectionBuilder', () => {
expect(connection.rack).toEqual('rack')
})

it('throws an exception if brokers list is empty', () => {
expect(() => {
builder = connectionBuilder({
it('throws an exception if brokers list is empty', async () => {
await expect(
connectionBuilder({
socketFactory,
brokers: [],
ssl,
Expand All @@ -74,16 +74,15 @@ describe('Cluster > ConnectionBuilder', () => {
connectionTimeout,
retry,
logger,
})
}).toThrow(
KafkaJSNonRetriableError,
'Failed to connect: expected brokers array and got nothing'
}).build()
).rejects.toEqual(
new KafkaJSConnectionError('Failed to connect: expected brokers array and got nothing')
)
})

it('throws an exception if brokers is null', () => {
expect(() => {
builder = connectionBuilder({
it('throws an exception if brokers is null', async () => {
await expect(
connectionBuilder({
socketFactory,
brokers: null,
ssl,
Expand All @@ -92,10 +91,64 @@ describe('Cluster > ConnectionBuilder', () => {
connectionTimeout,
retry,
logger,
})
}).toThrow(
KafkaJSNonRetriableError,
'Failed to connect: expected brokers array and got nothing'
}).build()
).rejects.toEqual(
new KafkaJSConnectionError('Failed to connect: expected brokers array and got nothing')
)
})

it('brokers can be function that returns array of host:port strings', async () => {
const builder = connectionBuilder({
socketFactory,
brokers: () => ['host.test:7777'],
ssl,
sasl,
clientId,
connectionTimeout,
retry,
logger,
})

const connection = await builder.build()
expect(connection).toBeInstanceOf(Connection)
expect(connection.host).toBe('host.test')
expect(connection.port).toBe(7777)
})

it('brokers can be async function that returns array of host:port strings', async () => {
const builder = connectionBuilder({
socketFactory,
brokers: async () => ['host.test:7777'],
ssl,
sasl,
clientId,
connectionTimeout,
retry,
logger,
})

const connection = await builder.build()
expect(connection).toBeInstanceOf(Connection)
expect(connection.host).toBe('host.test')
expect(connection.port).toBe(7777)
})

it('sasl can be async function that returns sasl parameters', async () => {
const builder = connectionBuilder({
socketFactory,
brokers,
sasl: async () => ({
username: 'user',
}),
ssl,
clientId,
connectionTimeout,
retry,
logger,
})

const connection = await builder.build()
expect(connection).toBeInstanceOf(Connection)
expect(connection.sasl).toEqual({ username: 'user' })
})
})
84 changes: 50 additions & 34 deletions src/cluster/brokerPool.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ module.exports = class BrokerPool {
...options,
})

this.seedBroker = this.createBroker({
connection: this.connectionBuilder.build(),
logger: this.rootLogger,
})

this.brokers = {}
this.metadata = null
this.metadataExpireAt = null
Expand All @@ -60,7 +55,21 @@ module.exports = class BrokerPool {
*/
hasConnectedBrokers() {
const brokers = values(this.brokers)
return !!brokers.find(broker => broker.isConnected()) || this.seedBroker.isConnected()
return (
!!brokers.find(broker => broker.isConnected()) ||
(this.seedBroker ? this.seedBroker.isConnected() : false)
)
}

async createSeedBroker() {
if (this.seedBroker) {
await this.seedBroker.disconnect()
}

this.seedBroker = this.createBroker({
connection: await this.connectionBuilder.build(),
logger: this.rootLogger,
})
}

/**
Expand All @@ -72,17 +81,18 @@ module.exports = class BrokerPool {
return
}

if (!this.seedBroker) {
await this.createSeedBroker()
}

return this.retrier(async (bail, retryCount, retryTime) => {
try {
await this.seedBroker.connect()
this.versions = this.seedBroker.versions
} catch (e) {
if (e.name === 'KafkaJSConnectionError' || e.type === 'ILLEGAL_SASL_STATE') {
// Connection builder will always rotate the seed broker
this.seedBroker = this.createBroker({
connection: this.connectionBuilder.build(),
logger: this.rootLogger,
})
await this.createSeedBroker()
this.logger.error(
`Failed to connect to seed broker, trying another broker from the list: ${e.message}`,
{ retryCount, retryTime }
Expand All @@ -102,7 +112,7 @@ module.exports = class BrokerPool {
* @returns {Promise}
*/
async disconnect() {
await this.seedBroker.disconnect()
this.seedBroker && (await this.seedBroker.disconnect())
await Promise.all(values(this.brokers).map(broker => broker.disconnect()))

this.brokers = {}
Expand All @@ -126,33 +136,39 @@ module.exports = class BrokerPool {
this.metadataExpireAt = Date.now() + this.metadataMaxAge

const replacedBrokers = []
this.brokers = this.metadata.brokers.reduce((result, { nodeId, host, port, rack }) => {
if (result[nodeId]) {
if (!hasBrokerBeenReplaced(result[nodeId], { host, port, rack })) {
return result

this.brokers = await this.metadata.brokers.reduce(
async (resultPromise, { nodeId, host, port, rack }) => {
const result = await resultPromise

if (result[nodeId]) {
if (!hasBrokerBeenReplaced(result[nodeId], { host, port, rack })) {
return result
}

replacedBrokers.push(result[nodeId])
}

replacedBrokers.push(result[nodeId])
}
if (host === seedHost && port === seedPort) {
this.seedBroker.nodeId = nodeId
this.seedBroker.connection.rack = rack
return assign(result, {
[nodeId]: this.seedBroker,
})
}

if (host === seedHost && port === seedPort) {
this.seedBroker.nodeId = nodeId
this.seedBroker.connection.rack = rack
return assign(result, {
[nodeId]: this.seedBroker,
[nodeId]: this.createBroker({
logger: this.rootLogger,
versions: this.versions,
supportAuthenticationProtocol: this.supportAuthenticationProtocol,
connection: await this.connectionBuilder.build({ host, port, rack }),
nodeId,
}),
})
}

return assign(result, {
[nodeId]: this.createBroker({
logger: this.rootLogger,
versions: this.versions,
supportAuthenticationProtocol: this.supportAuthenticationProtocol,
connection: this.connectionBuilder.build({ host, port, rack }),
nodeId,
}),
})
}, this.brokers)
},
this.brokers
)

const freshBrokerIds = this.metadata.brokers.map(({ nodeId }) => `${nodeId}`).sort()
const currentBrokerIds = keys(this.brokers).sort()
Expand Down Expand Up @@ -283,7 +299,7 @@ module.exports = class BrokerPool {
}

// Rebuild the connection since it can't recover from illegal SASL state
broker.connection = this.connectionBuilder.build({
broker.connection = await this.connectionBuilder.build({
host: broker.connection.host,
port: broker.connection.port,
rack: broker.connection.rack,
Expand Down
24 changes: 11 additions & 13 deletions src/cluster/connectionBuilder.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
const Connection = require('../network/connection')
const { KafkaJSNonRetriableError } = require('../errors')
const shuffle = require('../utils/shuffle')
const { KafkaJSConnectionError } = require('../errors')

const validateBrokers = brokers => {
if (!brokers || brokers.length === 0) {
throw new KafkaJSNonRetriableError(`Failed to connect: expected brokers array and got nothing`)
throw new KafkaJSConnectionError(`Failed to connect: expected brokers array and got nothing`)
Copy link
Collaborator

@Nevon Nevon Sep 16, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this should be a KafkaJSConnectionError, since it's actually a validation error that's not retriable unless the data it's validating changes. KafkaJSConnectionError is retriable. I see three options:

  1. Keep it retriable and live with the fact that it's weird to validate the same static list N times.
  2. Make validateBrokers a bit smarter and have it throw a retriable error if brokers is a function and a non-retriable error if it's not.
  3. Wrap brokers in an async function that does retrying internally instead. Something along the lines of
const retrier = createRetry({ ...retry })
const getBrokers = () => retrier(async bail => {
  const static = Array.isArray(brokers)
  const list = static ? brokers : await brokers()
  try {
    validateBrokers(list) // Throws retriable error
    return list
  } catch (err) {
    // if it's a static list, don't bother retrying
    if (static) {
      // Important to throw non-retriable error, since it will bubble up outside this retrier
      bail(new KafkaJSNonRetriableError(err))
    } else {
      throw err
    }
  }
})

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest second option

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good!

}
}

Expand All @@ -22,27 +21,26 @@ module.exports = ({
logger,
instrumentationEmitter = null,
}) => {
validateBrokers(brokers)

const shuffledBrokers = shuffle(brokers)
const size = brokers.length
let index = 0

return {
build: ({ host, port, rack } = {}) => {
build: async ({ host, port, rack } = {}) => {
if (!host) {
// Always rotate the seed broker
const [seedHost, seedPort] = shuffledBrokers[index++ % size].split(':')
host = seedHost
port = Number(seedPort)
const list = typeof brokers === 'function' ? await brokers() : brokers
validateBrokers(list)

const randomBroker = list[index++ % list.length]

host = randomBroker.split(':')[0]
port = Number(randomBroker.split(':')[1])
}

return new Connection({
host,
port,
rack,
sasl: typeof sasl === 'function' ? await sasl() : sasl,
Nevon marked this conversation as resolved.
Show resolved Hide resolved
ssl,
sasl,
clientId,
socketFactory,
connectionTimeout,
Expand Down
7 changes: 5 additions & 2 deletions types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@ export class Kafka {
logger(): Logger
}

export type BrokersFunction = () => Promise<string[]>;
Nevon marked this conversation as resolved.
Show resolved Hide resolved
export type SaslFunction = () => Promise<SASLOptions>;

export interface KafkaConfig {
brokers: string[]
brokers: string[] | BrokersFunction
ssl?: tls.ConnectionOptions | boolean
sasl?: SASLOptions
sasl?: SASLOptions | SaslFunction
clientId?: string
connectionTimeout?: number
authenticationTimeout?: number
Expand Down