Skip to content

Commit

Permalink
Merge pull request #2 from meister/eero-broker-rediscovery
Browse files Browse the repository at this point in the history
Eero broker rediscovery
  • Loading branch information
meister authored Sep 2, 2020
2 parents 5987f27 + ccfdd7c commit 23b0df8
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 37 deletions.
1 change: 1 addition & 0 deletions src/cluster/__tests__/brokerPool.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ describe('Cluster > BrokerPool', () => {
connectionBuilder: createConnectionBuilder(),
logger: newLogger(),
})
await brokerPool.initSeedBroker()
})

afterEach(async () => {
Expand Down
80 changes: 69 additions & 11 deletions src/cluster/__tests__/connectionBuilder.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ describe('Cluster > ConnectionBuilder', () => {
})
})

test('creates a new connection using a random broker', () => {
const connection = builder.build()
test('creates a new connection using a random broker', async () => {
const connection = await builder.build()
expect(connection).toBeInstanceOf(Connection)
expect(connection.host).toBeOneOf(['host.test', 'host2.test', 'host3.test'])
expect(connection.port).toBeOneOf([7777, 7778, 7779])
Expand All @@ -42,18 +42,18 @@ describe('Cluster > ConnectionBuilder', () => {
expect(connection.socketFactory).toBe(socketFactory)
})

test('when called without host and port iterates throught the seed brokers', () => {
const connections = Array(brokers.length)
.fill()
.map(() => {
const { host, port } = builder.build()
return `${host}:${port}`
})
test('when called without host and port iterates throught the seed brokers', async () => {
const connections = []
for (let i = 0; i < brokers.length; i++) {
const { host, port } = await builder.build()
connections.push(`${host}:${port}`)
}

expect(connections).toIncludeSameMembers(brokers)
})

test('accepts overrides for host, port and rack', () => {
const connection = builder.build({
test('accepts overrides for host, port and rack', async () => {
const connection = await builder.assign({
host: 'host.another',
port: 8888,
rack: 'rack',
Expand Down Expand Up @@ -98,4 +98,62 @@ describe('Cluster > ConnectionBuilder', () => {
'Failed to connect: expected brokers array and got nothing'
)
})

it('brokers can be function that returns array of host:port strings', async () => {
const builder = connectionBuilder({
socketFactory,
brokers: () => ['host.test:7777'],
ssl,
sasl,
clientId,
connectionTimeout,
retry,
logger,
})

const connection = await builder.build()
expect(connection).toBeInstanceOf(Connection)
expect(connection.host).toBe('host.test')
expect(connection.port).toBe(7777)
})

it('brokers can be async function that returns array of host:port strings', async () => {
const builder = connectionBuilder({
socketFactory,
brokers: async () => ['host.test:7777'],
ssl,
sasl,
clientId,
connectionTimeout,
retry,
logger,
})

const connection = await builder.build()
expect(connection).toBeInstanceOf(Connection)
expect(connection.host).toBe('host.test')
expect(connection.port).toBe(7777)
})

it('brokers can be async function that returns brokers and sasl parameters', async () => {
const builder = connectionBuilder({
socketFactory,
brokers: async () => ({
brokers: ['host.test:7777'],
sasl: { username: 'user' },
}),
ssl,
sasl,
clientId,
connectionTimeout,
retry,
logger,
})

const connection = await builder.build()
expect(connection).toBeInstanceOf(Connection)
expect(connection.host).toBe('host.test')
expect(connection.port).toBe(7777)
expect(connection.sasl).toEqual({ username: 'user' })
})
})
37 changes: 24 additions & 13 deletions src/cluster/brokerPool.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,6 @@ module.exports = class BrokerPool {
...options,
})

this.seedBroker = this.createBroker({
connection: this.connectionBuilder.build(),
logger: this.rootLogger,
})

this.brokers = {}
this.metadata = null
this.metadataExpireAt = null
Expand All @@ -62,7 +57,21 @@ module.exports = class BrokerPool {
*/
hasConnectedBrokers() {
const brokers = values(this.brokers)
return !!brokers.find(broker => broker.isConnected()) || this.seedBroker.isConnected()
return (
!!brokers.find(broker => broker.isConnected()) ||
(this.seedBroker && this.seedBroker.isConnected())
)
}

async initSeedBroker() {
if (this.seedBroker) {
await this.seedBroker.disconnect()
}

this.seedBroker = this.createBroker({
connection: await this.connectionBuilder.build(),
logger: this.rootLogger,
})
}

/**
Expand All @@ -74,17 +83,18 @@ module.exports = class BrokerPool {
return
}

if (!this.seedBroker) {
await this.initSeedBroker()
}

return this.retrier(async (bail, retryCount, retryTime) => {
try {
await this.seedBroker.connect()
this.versions = this.seedBroker.versions
} catch (e) {
if (e.name === 'KafkaJSConnectionError' || e.type === 'ILLEGAL_SASL_STATE') {
// Connection builder will always rotate the seed broker
this.seedBroker = this.createBroker({
connection: this.connectionBuilder.build(),
logger: this.rootLogger,
})
await this.initSeedBroker()
this.logger.error(
`Failed to connect to seed broker, trying another broker from the list: ${e.message}`,
{ retryCount, retryTime }
Expand All @@ -104,7 +114,7 @@ module.exports = class BrokerPool {
* @returns {Promise}
*/
async disconnect() {
await this.seedBroker.disconnect()
this.seedBroker && (await this.seedBroker.disconnect())
await Promise.all(values(this.brokers).map(broker => broker.disconnect()))

this.brokers = {}
Expand All @@ -128,6 +138,7 @@ module.exports = class BrokerPool {
this.metadataExpireAt = Date.now() + this.metadataMaxAge

const replacedBrokers = []

this.brokers = this.metadata.brokers.reduce((result, { nodeId, host, port, rack }) => {
if (result[nodeId]) {
if (!hasBrokerBeenReplaced(result[nodeId], { host, port, rack })) {
Expand All @@ -150,7 +161,7 @@ module.exports = class BrokerPool {
logger: this.rootLogger,
versions: this.versions,
supportAuthenticationProtocol: this.supportAuthenticationProtocol,
connection: this.connectionBuilder.build({ host, port, rack }),
connection: this.connectionBuilder.assign({ host, port, rack }),
nodeId,
}),
})
Expand Down Expand Up @@ -285,7 +296,7 @@ module.exports = class BrokerPool {
}

// Rebuild the connection since it can't recover from illegal SASL state
broker.connection = this.connectionBuilder.build({
broker.connection = await this.connectionBuilder.build({
host: broker.connection.host,
port: broker.connection.port,
rack: broker.connection.rack,
Expand Down
72 changes: 60 additions & 12 deletions src/cluster/connectionBuilder.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
const Connection = require('../network/connection')
const { KafkaJSNonRetriableError } = require('../errors')
const { KafkaJSNonRetriableError, KafkaJSConnectionError } = require('../errors')
const shuffle = require('../utils/shuffle')

const validateBrokers = brokers => {
Expand All @@ -22,21 +22,69 @@ module.exports = ({
logger,
instrumentationEmitter = null,
}) => {
validateBrokers(brokers)
let getNext

const shuffledBrokers = shuffle(brokers)
const size = brokers.length
let index = 0
// dynamic list of brokers
if (typeof brokers === 'function') {
getNext = async () => {
try {
const discovered = await brokers()

return {
build: ({ host, port, rack } = {}) => {
if (!host) {
// Always rotate the seed broker
const [seedHost, seedPort] = shuffledBrokers[index++ % size].split(':')
host = seedHost
port = Number(seedPort)
const brokersList = Array.isArray(discovered) ? discovered : discovered.brokers

const [seedHost, seedPort] = shuffle(brokersList)[0].split(':')

return {
host: seedHost,
port: Number(seedPort),
sasl: discovered.sasl,
}
} catch (e) {
throw new KafkaJSConnectionError('dynamic brokers function crashed, retrying...')
}
}

// static list of seed brokers
} else {
validateBrokers(brokers)

const shuffledBrokers = shuffle(brokers)
const size = brokers.length
let index = 0

getNext = () => {
// Always rotate the seed broker
const [seedHost, seedPort] = shuffledBrokers[index++ % size].split(':')

return {
host: seedHost,
port: Number(seedPort),
sasl,
}
}
}

return {
build: async () => {
const broker = await getNext()

return new Connection({
host: broker.host,
port: broker.port,
sasl: broker.sasl,
ssl,
clientId,
socketFactory,
connectionTimeout,
requestTimeout,
enforceRequestTimeout,
maxInFlightRequests,
instrumentationEmitter,
retry,
logger,
})
},
assign: ({ host, port, rack }) => {
return new Connection({
host,
port,
Expand Down
8 changes: 7 additions & 1 deletion types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,14 @@ export class Kafka {
logger(): Logger
}

export interface DiscoveryResult {
brokers: KafkaConfig["brokers"]
sasl?: SASLOptions
}
export type DiscoveryFunction = () => Promise<DiscoveryResult>;

export interface KafkaConfig {
brokers: string[]
brokers: string[] | DiscoveryFunction
ssl?: tls.ConnectionOptions | boolean
sasl?: SASLOptions
clientId?: string
Expand Down

0 comments on commit 23b0df8

Please sign in to comment.