Skip to content

Commit

Permalink
Add Admin.describeCluster
Browse files Browse the repository at this point in the history
Returns metadata for all brokers in the cluster.

Fixes #647
  • Loading branch information
Nevon committed Mar 1, 2020
1 parent 6863075 commit a1ace1b
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 3 deletions.
15 changes: 15 additions & 0 deletions docs/Admin.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,21 @@ await admin.setOffsets({
})
```

## <a name="describe-cluster"></a> Describe cluster

Allows you to get information about the broker cluster.

```javascript
await admin.describeCluster()
// {
// brokers: [
// { nodeId: 0, host: 'localhost', port: 9092 }
// ],
// controller: 0,
// clusterId: 'f8QmWTB8SQSLE6C99G4qzA'
// }
```

## <a name="describe-configs"></a> Describe configs

Get the configuration for the specified resources.
Expand Down
34 changes: 34 additions & 0 deletions src/admin/__tests__/describeCluster.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
const createAdmin = require('../index')

const { createCluster, newLogger } = require('testHelpers')

describe('Admin', () => {
let admin

afterEach(async () => {
admin && (await admin.disconnect())
})

describe('describeCluster', () => {
test('retrieves metadata for all brokers in the cluster', async () => {
const cluster = createCluster()
admin = createAdmin({ cluster, logger: newLogger() })

await admin.connect()
const { brokers, clusterId, controller } = await admin.describeCluster()

expect(brokers).toHaveLength(3)
expect(brokers).toEqual(
expect.arrayContaining([
expect.objectContaining({
nodeId: expect.any(Number),
host: expect.any(String),
port: expect.any(Number),
}),
])
)
expect(clusterId).toEqual(expect.any(String))
expect(brokers.map(({ nodeId }) => nodeId)).toContain(controller)
})
})
})
35 changes: 35 additions & 0 deletions src/admin/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ const RESOURCE_TYPES = require('../protocol/resourceTypes')

const { CONNECT, DISCONNECT } = events

const NO_CONTROLLER_ID = -1

const { values, keys } = Object
const eventNames = values(events)
const eventKeys = keys(events)
Expand Down Expand Up @@ -595,6 +597,38 @@ module.exports = ({
}
}

/**
* Describe cluster
*
* @return {Promise<ClusterMetadata>}
*
* @typedef {Object} ClusterMetadata
* @property {Array<Broker>} brokers
* @property {Number} controller Current controller id. Returns null if unknown.
* @property {String} clusterId
*
* @typedef {Object} Broker
* @property {Number} nodeId
* @property {String} host
* @property {Number} port
*/
const describeCluster = async () => {
const { brokers: nodes, clusterId, controllerId } = await cluster.metadata({ topics: [] })
const brokers = nodes.map(({ nodeId, host, port }) => ({
nodeId,
host,
port,
}))
const controller =
controllerId == null || controllerId === NO_CONTROLLER_ID ? null : controllerId

return {
brokers,
controller,
clusterId,
}
}

/**
* List groups in a broker
*
Expand Down Expand Up @@ -653,6 +687,7 @@ module.exports = ({
deleteTopics,
getTopicMetadata,
fetchTopicMetadata,
describeCluster,
events,
fetchOffsets,
fetchTopicOffsets,
Expand Down
22 changes: 22 additions & 0 deletions testHelpers/setup.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,28 @@
jest.setTimeout(90000)

require('jest-extended')

expect.extend({
toBeTypeOrNull(received, argument) {
if (received === null)
return {
message: () => `Ok`,
pass: true,
}
if (expect(received).toEqual(expect.any(argument))) {
return {
message: () => `Ok`,
pass: true,
}
} else {
return {
message: () => `expected ${received} to be ${argument} type or null`,
pass: false,
}
}
},
})

const glob = require('glob')
const path = require('path')

Expand Down
3 changes: 2 additions & 1 deletion types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ export interface ConsumerConfig {
}

export interface PartitionAssigner {
new (config: { cluster: Cluster }): Assigner
new(config: { cluster: Cluster }): Assigner
}

export interface CoordinatorMetadata {
Expand Down Expand Up @@ -306,6 +306,7 @@ export type Admin = {
fetchTopicOffsets(
topic: string
): Promise<Array<{ partition: number; offset: string; high: string; low: string }>>
describeCluster(): Promise<{ brokers: Array<{ nodeId: number; host: string; port: number }>; controller: number | null, clusterId: string }>
setOffsets(options: { groupId: string; topic: string; partitions: SeekEntry[] }): Promise<void>
resetOffsets(options: { groupId: string; topic: string; earliest: boolean }): Promise<void>
describeConfigs(configs: {
Expand Down
16 changes: 14 additions & 2 deletions types/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const kafka = new Kafka({
username: 'test',
password: 'testtest',
},
logCreator: (logLevel: logLevel) => (entry: LogEntry) => {},
logCreator: (logLevel: logLevel) => (entry: LogEntry) => { },
})

kafka.logger().error('Instantiated KafkaJS')
Expand Down Expand Up @@ -155,6 +155,18 @@ removeListener()

const runAdmin = async () => {
await admin.connect()

const { controller, brokers, clusterId } = await admin.describeCluster()
admin.logger().debug('Fetched cluster metadata', {
controller,
clusterId,
brokers: brokers.map(({ nodeId, host, port }) => ({
nodeId,
host,
port
}))
})

await admin.fetchTopicMetadata({ topics: ['string'] }).then(metadata => {
metadata.topics.forEach(topic => {
console.log(topic.name, topic.partitions)
Expand Down Expand Up @@ -216,7 +228,7 @@ new KafkaJSConnectionError('Connection error: ECONNREFUSED', {
});
new KafkaJSConnectionError('Connection error: ECONNREFUSED', { code: 'ECONNREFUSED' });

new KafkaJSRequestTimeoutError('Request requestInfo timed out', {
new KafkaJSRequestTimeoutError('Request requestInfo timed out', {
broker: `${host}:9094`,
clientId: 'example-consumer',
correlationId: 0,
Expand Down

0 comments on commit a1ace1b

Please sign in to comment.