Skip to content

Commit

Permalink
Add createPartitions
Browse files Browse the repository at this point in the history
  • Loading branch information
adalbert-homa committed Mar 13, 2020
1 parent b5ffa13 commit 9347beb
Show file tree
Hide file tree
Showing 20 changed files with 477 additions and 2 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ yarn-error.log
.vscode
.vs
build/
*.class
*.class
.idea/
30 changes: 30 additions & 0 deletions docs/Admin.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,36 @@ Topic deletion is disabled by default in Apache Kafka versions prior to `1.0.0`.
delete.topic.enable=true
```

## <a name="create-partitions"></a> Create partitions

`createPartitions` will resolve to `true` in case of success. The method will throw exceptions in case of errors.

```javascript
await admin.createPartitions({
validateOnly: <boolean>,
timeout: <Number>,
topicPartitions: <TopicPartition[]>,
})
```

`TopicPartition` structure:

```javascript
{
topic: <String>,
count: <Number>, // partition count
assignments: <Array<Array<Number>>> // Example: [[0,1],[1,2],[2,0]]
}
```

| property | description | default |
| -------------- | ----------------------------------------------------------------------------------------------------- | ------- |
| topicPartitions| Topic partition definition | |
| validateOnly | If this is `true`, the request will be validated, but the topic won't be created. | false |
| timeout | The time in ms to wait for a topic to be completely created on the controller node | 5000 |
| count | New partition count, mandatory | |
| assignments | Assigned brokers for each new partition | null |

## <a name="get-topic-metadata"></a> Get topic metadata

Deprecated, see [Fetch topic metadata](#fetch-topic-metadata)
Expand Down
162 changes: 162 additions & 0 deletions examples/admin.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
const fs = require('fs')
const ip = require('ip')

const { Kafka, logLevel } = require('../index')

const host = process.env.HOST_IP || ip.address()

const logCreator = _logLevel => {
return ({ namespace, level, label, log }) => {
const { timestamp, message, error, ...extra } = log
console.log(`${timestamp}: ${label}: ${message} ${error}`)
}
}

const kafka = new Kafka({
logLevel: logLevel.INFO,
brokers: [`${host}:9094`, `${host}:9097`, `${host}:9100`],
clientId: 'test-admin-id',
ssl: {
servername: 'localhost',
rejectUnauthorized: false,
ca: [fs.readFileSync('./testHelpers/certs/cert-signed', 'utf-8')],
},
sasl: {
mechanism: 'plain',
username: 'test',
password: 'testtest',
},
})

const topic1 = 'topic-test1'
const topic2 = 'topic-test2'
const topic3 = 'topic-test3'

const expectedTopics = [topic1, topic2, topic3]

const filterTopics = ({ name }) => expectedTopics.includes(name)
const sortTopics = (a, b) => {
if (a.name < b.name) {
return -1
}
if (a.name > b.name) {
return 1
}
return 0
}

const admin = kafka.admin()

const getMyTopics = async () => {
const md = await admin.fetchTopicMetadata({})
return md.topics.filter(filterTopics).sort(sortTopics)
}

const wait = nrOfSec => {
return new Promise((resolve, _) => {
setTimeout(resolve, nrOfSec * 1000)
})
}

const run = async () => {
await admin.connect()
// Read back topic to check if we need to delete topics
let myTopics = await getMyTopics()
// delete test topic, ignore error
if (myTopics.length > 0) {
const topicNames = myTopics.map(({ name }) => name)
console.log('We have topics to delete: %s', topicNames.join(', '))
try {
await admin.deleteTopics({ topics: topicNames })
} catch (err) {
console.log('Error deleting topic: ', err)
}
console.log('Wait for delete to take effect')
await wait(5)
}
// We need to wait 10 seconds to make sure topics get deleted
myTopics = await getMyTopics()
if (myTopics.length !== 0) {
console.log('We should not have any topics here? %j', myTopics)
} else {
console.log('We have now 0 topics')
}
console.log('(Re)creating topics')
try {
console.log('Creating topics')
await admin.createTopics({
topics: expectedTopics.map(topic => ({ topic })),
waitForLeaders: true,
})
} catch (err) {
console.log('Error creating topics: ', err)
}
// Validate that we have our topics
myTopics = await getMyTopics()
if (myTopics.length !== expectedTopics.length) {
console.log('We are missing the %s topics: %j', expectedTopics.length, myTopics)
}
console.log('Add new partitions')
try {
await admin.createPartitions({
topicPartitions: [
{
topic: topic1,
count: 3, // test with no assignments
},
{
topic: topic2,
count: 4,
// test with correct assignments
assignments: [[1], [0], [2]],
},
{
topic: topic3,
count: 5,
// test with one of the assignment null
assignments: [[1], [0], [2], [1]],
},
],
})
} catch (err) {
console.log('Error: ', err)
throw err
}
console.log('Wait for new partitions take effect')
await wait(5)
// Read now information back to check
myTopics = await getMyTopics()
console.log(
'Topics: %s',
myTopics.map(t => `${t.name} => ${t.partitions.length} part`).join(', ')
)
await admin.disconnect()
}

run().catch(e => console.error(`[example/admin] ${e.message}`, e))

const errorTypes = ['unhandledRejection', 'uncaughtException']
const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2']

errorTypes.map(type => {
process.on(type, async e => {
try {
console.log(`process.on ${type}`)
console.error(e)
await admin.disconnect()
process.exit(0)
} catch (_) {
process.exit(1)
}
})
})

signalTraps.map(type => {
process.once(type, async () => {
try {
await admin.disconnect()
} finally {
process.kill(process.pid, type)
}
})
})
40 changes: 40 additions & 0 deletions src/admin/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,45 @@ module.exports = ({
}
})
}
/**
* @param {array} topicPartitions
* @param {boolean} [validateOnly=false]
* @param {number} [timeout=5000]
* @return {Promise}
*/
const createPartitions = async ({ topicPartitions, validateOnly, timeout}) => {
if (!topicPartitions || !Array.isArray(topicPartitions)) {
throw new KafkaJSNonRetriableError(`Invalid topics partitions array ${topicPartitions}`)
}
if (topicPartitions.length === 0){
throw new KafkaJSNonRetriableError(`Empty topics partitions array`)
}

if (topicPartitions.filter(({ topic }) => typeof topic !== 'string').length > 0) {
throw new KafkaJSNonRetriableError(
'Invalid topicPartitions array, the topic names have to be a valid string'
)
}

const retrier = createRetry(retry)

return retrier(async (bail, retryCount, retryTime) => {
try {
await cluster.refreshMetadata()
const broker = await cluster.findControllerBroker()
await broker.createPartitions({ topicPartitions, validateOnly, timeout })

return true
} catch (e) {
if (e.type === 'NOT_CONTROLLER') {
logger.warn('Could not create topics', { error: e.message, retryCount, retryTime })
throw e
}

bail(e)
}
})
}

/**
* @param {string[]} topics
Expand Down Expand Up @@ -685,6 +724,7 @@ module.exports = ({
disconnect,
createTopics,
deleteTopics,
createPartitions,
getTopicMetadata,
fetchTopicMetadata,
describeCluster,
Expand Down
20 changes: 20 additions & 0 deletions src/broker/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,26 @@ module.exports = class Broker {
return await this.connection.send(createTopics({ topics, validateOnly, timeout }))
}

/**
* @public
* @param {Array} topicPartitions e.g:
* [
* {
* topic: 'topic-name',
* newAssignments: []
* }
* ]
* @param {boolean} [validateOnly=false] If this is true, the request will be validated, but the topic
* won't be created
* @param {number} [timeout=5000] The time in ms to wait for a topic to be completely created
* on the controller node
* @returns {Promise}
*/
async createPartitions({ topicPartitions, validateOnly = false, timeout = 5000 }) {
const createPartitions = this.lookupRequest(apiKeys.CreatePartitions, requests.CreatePartitions)
return await this.connection.send(createPartitions({ topicPartitions, validateOnly, timeout }))
}

/**
* @public
* @param {Array<string>} topics An array of topics to be deleted
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"type":"Buffer","data":[0,0,0,2,0,31,116,101,115,116,45,116,111,112,105,99,45,99,56,100,56,99,97,51,100,57,53,52,57,53,99,54,98,57,48,48,100,0,0,0,3,255,255,255,255,0,31,116,101,115,116,45,116,111,112,105,99,45,48,53,48,102,98,50,101,54,97,101,100,49,51,97,57,53,52,50,56,56,0,0,0,5,0,0,0,3,0,0,0,1,0,0,0,0,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,2,0,0,19,136,0]}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"type":"Buffer","data":[0,0,0,2,0,31,116,101,115,116,45,116,111,112,105,99,45,99,56,100,56,99,97,51,100,57,53,52,57,53,99,54,98,57,48,48,100,0,0,0,31,116,101,115,116,45,116,111,112,105,99,45,48,53,48,102,98,50,101,54,97,101,100,49,51,97,57,53,52,50,56,56,0,0]}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"type":"Buffer","data":[0,0,0,2,0,31,116,101,115,116,45,116,111,112,105,99,45,99,56,100,56,99,97,51,100,57,53,52,57,53,99,54,98,57,48,48,100,0,0,0,3,255,255,255,255,0,31,116,101,115,116,45,116,111,112,105,99,45,48,53,48,102,98,50,101,54,97,101,100,49,51,97,57,53,52,50,56,56,0,0,0,5,0,0,0,3,0,0,0,1,0,0,0,0,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,2,0,0,19,136,0]}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"type":"Buffer","data":[0,0,0,1,0,31,116,101,115,116,45,116,111,112,105,99,45,57,56,97,98,101,57,55,100,52,49,50,100,102,55,54,98,52,100,101,98,0,0,255,255]}
17 changes: 17 additions & 0 deletions src/protocol/requests/createPartitions/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
const versions = {
0: ({ topicPartitions, timeout, validateOnly }) => {
const request = require('./v0/request')
const response = require('./v0/response')
return { request: request({ topicPartitions, timeout, validateOnly }), response }
},
1: ({ topicPartitions, validateOnly, timeout }) => {
const request = require('./v1/request')
const response = require('./v1/response')
return { request: request({ topicPartitions, validateOnly, timeout }), response }
},
}

module.exports = {
versions: Object.keys(versions),
protocol: ({ version }) => versions[version],
}
42 changes: 42 additions & 0 deletions src/protocol/requests/createPartitions/v0/request.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
const Encoder = require('../../../encoder')
const { CreatePartitions: apiKey } = require('../../apiKeys')


/**
* CreatePartitions Request (Version: 0) => [topic_partitions] timeout validate_only
* topic_partitions => topic new_partitions
* topic => STRING
* new_partitions => count [assignment]
* count => INT32
* assignment => ARRAY(INT32)
* timeout => INT32
* validate_only => BOOLEAN
*/

module.exports = ({ topicPartitions, validateOnly = false, timeout = 5000 }) => ({
apiKey,
apiVersion: 0,
apiName: 'CreatePartitions',
encode: async () => {
return new Encoder()
.writeArray(topicPartitions.map(encodeTopicPartitions))
.writeInt32(timeout)
.writeBoolean(validateOnly)
},
})

const encodeTopicPartitions = ({
topic,
count,
assignments = [],
}) => {
return new Encoder()
.writeString(topic)
.writeInt32(count)
.writeNullableArray(assignments.map(encodeAssignments))
}

const encodeAssignments = (brokerIds) => {
return new Encoder()
.writeNullableArray(brokerIds)
}
31 changes: 31 additions & 0 deletions src/protocol/requests/createPartitions/v0/request.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
const apiKeys = require('../../apiKeys')
const RequestV0Protocol = require('./request')

describe('Protocol > Requests > CreatePartitions > v0', () => {
describe('request', () => {
test('metadata about the API', () => {
const request = RequestV0Protocol({})
expect(request.apiKey).toEqual(apiKeys.CreatePartitions)
expect(request.apiVersion).toEqual(0)
expect(request.apiName).toEqual('CreatePartitions')
})

test('encode', async () => {
const { buffer } = await RequestV0Protocol({
topicPartitions: [
{
topic: 'test-topic-c8d8ca3d95495c6b900d',
count: 3,
},
{
topic: 'test-topic-050fb2e6aed13a954288',
count: 5,
assignments: [[0], [1], [2]],
},
],
timeout: 5000,
}).encode()
expect(buffer).toEqual(Buffer.from(require('../fixtures/v0_request.json')))
})
})
})
Loading

0 comments on commit 9347beb

Please sign in to comment.