-
-
Notifications
You must be signed in to change notification settings - Fork 526
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
EoS #200 idempotent producer #203
Merged
Merged
Changes from 11 commits
Commits
Show all changes
16 commits
Select commit
Hold shift + click to select a range
ee21e6f
EoS #200 Create a transaction manager and call initProducerId for ide…
ianwsperber 154aee7
EoS #200 Support passing "firstSequence" to record batches
ianwsperber b426f49
EoS #200 Increment sequence for idempotent producers
ianwsperber 807619e
EoS #200 Test firstSequence is incremented in sendMessages
ianwsperber c48b900
EoS #200 Test broker produce with idempotent producer
ianwsperber bd321cd
EoS #200 Document transaction manager
ianwsperber 7f87ce3
EoS #200 Transaction manager should refresh metadata only if necessary
ianwsperber 8358494
EoS #200 Run producer send message tests with idempotent flag as well
ianwsperber 4c2db54
EoS #200 Test default retry value for idempotent producer
ianwsperber 3e4faab
EoS #200 Throw error if retries < 1
ianwsperber 6fd6a21
EoS #200 Document "idempotent" experimental flag in README
ianwsperber 80a4947
Cosmetic changes
tulios 3b719d9
Add default object
tulios b389772
Document and align transaction timeout with the Java client
tulios df65f8a
#203 Defensive coding for max signed int32
ianwsperber 413dc2f
EoS #173 Only init producer id once
ianwsperber File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,12 @@ | ||
module.exports = topicDataForBroker => { | ||
return topicDataForBroker.map(({ topic, partitions, messagesPerPartition }) => ({ | ||
topic, | ||
partitions: partitions.map(partition => ({ | ||
partition, | ||
messages: messagesPerPartition[partition], | ||
})), | ||
})) | ||
return topicDataForBroker.map( | ||
({ topic, partitions, messagesPerPartition, sequencePerPartition }) => ({ | ||
topic, | ||
partitions: partitions.map(partition => ({ | ||
partition, | ||
firstSequence: sequencePerPartition[partition], | ||
messages: messagesPerPartition[partition], | ||
})), | ||
}) | ||
) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
const createTopicData = require('./createTopicData') | ||
|
||
describe('Producer > createTopicData', () => { | ||
let topic, partitions, messagesPerPartition, sequencePerPartition | ||
|
||
beforeEach(() => { | ||
topic = 'test-topic' | ||
partitions = [1, 2, 3] | ||
|
||
messagesPerPartition = { | ||
1: [{ key: '1' }], | ||
2: [{ key: '2' }], | ||
3: [{ key: '3' }, { key: '4' }], | ||
} | ||
|
||
sequencePerPartition = { | ||
1: 0, | ||
2: 5, | ||
3: 10, | ||
} | ||
}) | ||
|
||
test('format data by topic and partition', () => { | ||
const result = createTopicData([ | ||
{ topic, partitions, messagesPerPartition, sequencePerPartition }, | ||
]) | ||
expect(result).toEqual([ | ||
{ | ||
topic, | ||
partitions: [ | ||
{ partition: 1, firstSequence: 0, messages: [{ key: '1' }] }, | ||
{ partition: 2, firstSequence: 5, messages: [{ key: '2' }] }, | ||
{ partition: 3, firstSequence: 10, messages: [{ key: '3' }, { key: '4' }] }, | ||
], | ||
}, | ||
]) | ||
}) | ||
}) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently I pass
transactionManager
to the factory method. I do wonder if ultimately it will make more sense to provide a transaction manager in thesendMessages
call. Could also allow us to make the transaction manager less stateful. Currently it doesn't really matter but it may be something to refactor in #173There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In what way would it make the transaction manager less stateful? Maybe I'm missing something, but passing it into the factory method seems much more appropriate than passing it with each sendMessage call.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're probably right @Nevon, it's just that once we introduce transactions the transaction manager will become a bit of a state machine and I was trying to think of strategies to minimize the state we have to manage.
One way of minimizing that would to be to have the transaction manager fetch the producer id on instantiation, so that there's no ambiguity about whether the idempotent producer has already be initialized. Since that fetch occur after connection that would probably mean passing the transaction manager into the send messages method, or lazily creating the sendMessages instance.
Ultimately I do think what I've implemented is the most straight forward solution, with the benefit that it closely mirrors how transactions are managed in other Kafka libraries. Thus far in practice I haven't had an issue working with the state machine (see preview here, just thinking through the alternatives.