Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eero broker rediscovery #2

Merged
merged 5 commits into from
Sep 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/cluster/__tests__/brokerPool.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ describe('Cluster > BrokerPool', () => {
connectionBuilder: createConnectionBuilder(),
logger: newLogger(),
})
await brokerPool.initSeedBroker()
})

afterEach(async () => {
Expand Down
80 changes: 69 additions & 11 deletions src/cluster/__tests__/connectionBuilder.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ describe('Cluster > ConnectionBuilder', () => {
})
})

test('creates a new connection using a random broker', () => {
const connection = builder.build()
test('creates a new connection using a random broker', async () => {
const connection = await builder.build()
expect(connection).toBeInstanceOf(Connection)
expect(connection.host).toBeOneOf(['host.test', 'host2.test', 'host3.test'])
expect(connection.port).toBeOneOf([7777, 7778, 7779])
Expand All @@ -42,18 +42,18 @@ describe('Cluster > ConnectionBuilder', () => {
expect(connection.socketFactory).toBe(socketFactory)
})

test('when called without host and port iterates throught the seed brokers', () => {
const connections = Array(brokers.length)
.fill()
.map(() => {
const { host, port } = builder.build()
return `${host}:${port}`
})
test('when called without host and port iterates throught the seed brokers', async () => {
const connections = []
for (let i = 0; i < brokers.length; i++) {
const { host, port } = await builder.build()
connections.push(`${host}:${port}`)
}

expect(connections).toIncludeSameMembers(brokers)
})

test('accepts overrides for host, port and rack', () => {
const connection = builder.build({
test('accepts overrides for host, port and rack', async () => {
const connection = await builder.assign({
host: 'host.another',
port: 8888,
rack: 'rack',
Expand Down Expand Up @@ -98,4 +98,62 @@ describe('Cluster > ConnectionBuilder', () => {
'Failed to connect: expected brokers array and got nothing'
)
})

it('brokers can be function that returns array of host:port strings', async () => {
const builder = connectionBuilder({
socketFactory,
brokers: () => ['host.test:7777'],
ssl,
sasl,
clientId,
connectionTimeout,
retry,
logger,
})

const connection = await builder.build()
expect(connection).toBeInstanceOf(Connection)
expect(connection.host).toBe('host.test')
expect(connection.port).toBe(7777)
})

it('brokers can be async function that returns array of host:port strings', async () => {
const builder = connectionBuilder({
socketFactory,
brokers: async () => ['host.test:7777'],
ssl,
sasl,
clientId,
connectionTimeout,
retry,
logger,
})

const connection = await builder.build()
expect(connection).toBeInstanceOf(Connection)
expect(connection.host).toBe('host.test')
expect(connection.port).toBe(7777)
})

it('brokers can be async function that returns brokers and sasl parameters', async () => {
const builder = connectionBuilder({
socketFactory,
brokers: async () => ({
brokers: ['host.test:7777'],
sasl: { username: 'user' },
}),
ssl,
sasl,
clientId,
connectionTimeout,
retry,
logger,
})

const connection = await builder.build()
expect(connection).toBeInstanceOf(Connection)
expect(connection.host).toBe('host.test')
expect(connection.port).toBe(7777)
expect(connection.sasl).toEqual({ username: 'user' })
})
})
37 changes: 24 additions & 13 deletions src/cluster/brokerPool.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,6 @@ module.exports = class BrokerPool {
...options,
})

this.seedBroker = this.createBroker({
connection: this.connectionBuilder.build(),
logger: this.rootLogger,
})

this.brokers = {}
this.metadata = null
this.metadataExpireAt = null
Expand All @@ -62,7 +57,21 @@ module.exports = class BrokerPool {
*/
hasConnectedBrokers() {
const brokers = values(this.brokers)
return !!brokers.find(broker => broker.isConnected()) || this.seedBroker.isConnected()
return (
!!brokers.find(broker => broker.isConnected()) ||
(this.seedBroker && this.seedBroker.isConnected())
)
}

async initSeedBroker() {
if (this.seedBroker) {
await this.seedBroker.disconnect()
}

this.seedBroker = this.createBroker({
connection: await this.connectionBuilder.build(),
logger: this.rootLogger,
})
}

/**
Expand All @@ -74,17 +83,18 @@ module.exports = class BrokerPool {
return
}

if (!this.seedBroker) {
await this.initSeedBroker()
}

return this.retrier(async (bail, retryCount, retryTime) => {
try {
await this.seedBroker.connect()
this.versions = this.seedBroker.versions
} catch (e) {
if (e.name === 'KafkaJSConnectionError' || e.type === 'ILLEGAL_SASL_STATE') {
// Connection builder will always rotate the seed broker
this.seedBroker = this.createBroker({
connection: this.connectionBuilder.build(),
logger: this.rootLogger,
})
await this.initSeedBroker()
this.logger.error(
`Failed to connect to seed broker, trying another broker from the list: ${e.message}`,
{ retryCount, retryTime }
Expand All @@ -104,7 +114,7 @@ module.exports = class BrokerPool {
* @returns {Promise}
*/
async disconnect() {
await this.seedBroker.disconnect()
this.seedBroker && (await this.seedBroker.disconnect())
await Promise.all(values(this.brokers).map(broker => broker.disconnect()))

this.brokers = {}
Expand All @@ -128,6 +138,7 @@ module.exports = class BrokerPool {
this.metadataExpireAt = Date.now() + this.metadataMaxAge

const replacedBrokers = []

this.brokers = this.metadata.brokers.reduce((result, { nodeId, host, port, rack }) => {
if (result[nodeId]) {
if (!hasBrokerBeenReplaced(result[nodeId], { host, port, rack })) {
Expand All @@ -150,7 +161,7 @@ module.exports = class BrokerPool {
logger: this.rootLogger,
versions: this.versions,
supportAuthenticationProtocol: this.supportAuthenticationProtocol,
connection: this.connectionBuilder.build({ host, port, rack }),
connection: this.connectionBuilder.assign({ host, port, rack }),
nodeId,
}),
})
Expand Down Expand Up @@ -285,7 +296,7 @@ module.exports = class BrokerPool {
}

// Rebuild the connection since it can't recover from illegal SASL state
broker.connection = this.connectionBuilder.build({
broker.connection = await this.connectionBuilder.build({
host: broker.connection.host,
port: broker.connection.port,
rack: broker.connection.rack,
Expand Down
72 changes: 60 additions & 12 deletions src/cluster/connectionBuilder.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
const Connection = require('../network/connection')
const { KafkaJSNonRetriableError } = require('../errors')
const { KafkaJSNonRetriableError, KafkaJSConnectionError } = require('../errors')
const shuffle = require('../utils/shuffle')

const validateBrokers = brokers => {
Expand All @@ -22,21 +22,69 @@ module.exports = ({
logger,
instrumentationEmitter = null,
}) => {
validateBrokers(brokers)
let getNext

const shuffledBrokers = shuffle(brokers)
const size = brokers.length
let index = 0
// dynamic list of brokers
if (typeof brokers === 'function') {
getNext = async () => {
try {
const discovered = await brokers()

return {
build: ({ host, port, rack } = {}) => {
if (!host) {
// Always rotate the seed broker
const [seedHost, seedPort] = shuffledBrokers[index++ % size].split(':')
host = seedHost
port = Number(seedPort)
const brokersList = Array.isArray(discovered) ? discovered : discovered.brokers

const [seedHost, seedPort] = shuffle(brokersList)[0].split(':')

return {
host: seedHost,
port: Number(seedPort),
sasl: discovered.sasl,
}
} catch (e) {
throw new KafkaJSConnectionError('dynamic brokers function crashed, retrying...')
}
}

// static list of seed brokers
} else {
validateBrokers(brokers)

const shuffledBrokers = shuffle(brokers)
const size = brokers.length
let index = 0

getNext = () => {
// Always rotate the seed broker
const [seedHost, seedPort] = shuffledBrokers[index++ % size].split(':')

return {
host: seedHost,
port: Number(seedPort),
sasl,
}
}
}

return {
build: async () => {
const broker = await getNext()

return new Connection({
host: broker.host,
port: broker.port,
sasl: broker.sasl,
ssl,
clientId,
socketFactory,
connectionTimeout,
requestTimeout,
enforceRequestTimeout,
maxInFlightRequests,
instrumentationEmitter,
retry,
logger,
})
},
assign: ({ host, port, rack }) => {
return new Connection({
host,
port,
Expand Down
8 changes: 7 additions & 1 deletion types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,14 @@ export class Kafka {
logger(): Logger
}

export interface DiscoveryResult {
brokers: KafkaConfig["brokers"]
sasl?: SASLOptions
}
export type DiscoveryFunction = () => Promise<DiscoveryResult>;

export interface KafkaConfig {
brokers: string[]
brokers: string[] | DiscoveryFunction
ssl?: tls.ConnectionOptions | boolean
sasl?: SASLOptions
clientId?: string
Expand Down