Skip to content

Commit

Permalink
Merge pull request #635 from Ben-M/roundRobinAssigner
Browse files Browse the repository at this point in the history
Improve balance in the RoundRobinAssigner.
  • Loading branch information
tulios authored Feb 5, 2020
2 parents 7fc1a14 + 52f4238 commit 987db4b
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 19 deletions.
32 changes: 18 additions & 14 deletions src/consumer/assigners/roundRobinAssigner/index.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const { MemberMetadata, MemberAssignment } = require('../../assignerProtocol')
const flatten = require('../../../utils/flatten')

/**
* RoundRobinAssigner
Expand All @@ -15,7 +16,6 @@ module.exports = ({ cluster }) => ({
* The members array contains information about each member, `memberMetadata` is the result of the
* `protocol` operation.
*
* This process can result in imbalanced assignments
* @param {array} members array of members, e.g:
[{ memberId: 'test-5f93f5a3', memberMetadata: Buffer }]
* @param {array} topics
Expand All @@ -25,14 +25,14 @@ module.exports = ({ cluster }) => ({
* memberId: 'test-5f93f5a3',
* memberAssignment: {
* 'topic-A': [0, 2, 4, 6],
* 'topic-B': [0, 2],
* 'topic-B': [1],
* },
* },
* {
* memberId: 'test-3d3d5341',
* memberAssignment: {
* 'topic-A': [1, 3, 5],
* 'topic-B': [1],
* 'topic-B': [0, 2],
* },
* }
* ]
Expand All @@ -42,20 +42,24 @@ module.exports = ({ cluster }) => ({
const sortedMembers = members.map(({ memberId }) => memberId).sort()
const assignment = {}

sortedMembers.forEach(memberId => {
assignment[memberId] = {}
const topicsPartionArrays = topics.map(topic => {
const partitionMetadata = cluster.findTopicPartitionMetadata(topic)
return partitionMetadata.map(m => ({ topic: topic, partitionId: m.partitionId }))
})
const topicsPartitions = flatten(topicsPartionArrays)

topics.forEach(topic => {
const partitionMetadata = cluster.findTopicPartitionMetadata(topic)
const partitions = partitionMetadata.map(m => m.partitionId)
sortedMembers.forEach((memberId, i) => {
if (!assignment[memberId][topic]) {
assignment[memberId][topic] = []
}
topicsPartitions.forEach((topicPartition, i) => {
const assignee = sortedMembers[i % membersCount]

if (!assignment[assignee]) {
assignment[assignee] = []
}

if (!assignment[assignee][topicPartition.topic]) {
assignment[assignee][topicPartition.topic] = []
}

assignment[memberId][topic].push(...partitions.filter(id => id % membersCount === i))
})
assignment[assignee][topicPartition.topic].push(topicPartition.partitionId)
})

return Object.keys(assignment).map(memberId => ({
Expand Down
10 changes: 5 additions & 5 deletions src/consumer/assigners/roundRobinAssigner/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ describe('Consumer > assigners > RoundRobinAssigner', () => {
})

describe('#assign', () => {
test('assign all partitions evenly', async () => {
test('assign all topic-partitions evenly', async () => {
metadata['topic-A'] = Array(14)
.fill()
.map((_, i) => ({ partitionId: i }))
Expand All @@ -37,7 +37,7 @@ describe('Consumer > assigners > RoundRobinAssigner', () => {
version: assigner.version,
assignment: {
'topic-A': [0, 4, 8, 12],
'topic-B': [0, 4],
'topic-B': [2],
},
}),
},
Expand All @@ -47,7 +47,7 @@ describe('Consumer > assigners > RoundRobinAssigner', () => {
version: assigner.version,
assignment: {
'topic-A': [1, 5, 9, 13],
'topic-B': [1],
'topic-B': [3],
},
}),
},
Expand All @@ -57,7 +57,7 @@ describe('Consumer > assigners > RoundRobinAssigner', () => {
version: assigner.version,
assignment: {
'topic-A': [2, 6, 10],
'topic-B': [2],
'topic-B': [0, 4],
},
}),
},
Expand All @@ -67,7 +67,7 @@ describe('Consumer > assigners > RoundRobinAssigner', () => {
version: assigner.version,
assignment: {
'topic-A': [3, 7, 11],
'topic-B': [3],
'topic-B': [1],
},
}),
},
Expand Down

0 comments on commit 987db4b

Please sign in to comment.