EoS #200 idempotent producer #203

Merged
merged 16 commits into from
Nov 15, 2018

Contributor

@ianwsperber ianwsperber commented Nov 13, 2018

Resolves #200

Add an idempotent flag to the producer factory method to support creation of idempotent producers. Documented this as an "experimental" feature, since I haven't had a chance to observe its behavior in practice yet.

Implementation details

  • The idempotent producer initializes the PID and epoch during the producer.connect() call
  • The idempotent producer uses the transactionManager to track sequences across messages and includes them in the produce request
  • The idempotent producer enforces the config options acks=-1 and retries>0. retries defaults to MAX_SAFE_INTEGER
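
The option constraints in the last bullet can be sketched as a small validation step. This is only an illustration: `resolveIdempotentOptions` and `ACKS_ALL` are hypothetical names, not identifiers from this PR, and the actual check may be structured differently.

```javascript
// Hypothetical helper (not from the PR): applies the constraints listed above
// to the options of an idempotent producer.
const ACKS_ALL = -1

function resolveIdempotentOptions({ acks = ACKS_ALL, retries = Number.MAX_SAFE_INTEGER } = {}) {
  // Idempotence relies on acknowledgement from all in-sync replicas
  if (acks !== ACKS_ALL) {
    throw new Error(`Idempotent producer requires acks=${ACKS_ALL}, got ${acks}`)
  }

  // At least one retry must be allowed
  if (!Number.isInteger(retries) || retries < 1) {
    throw new Error(`Idempotent producer requires retries > 0, got ${retries}`)
  }

  return { idempotent: true, acks, retries }
}
```

Calling `resolveIdempotentOptions()` with no arguments yields the defaults described above, while passing `{ acks: 1 }` or `{ retries: 0 }` throws.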

This PR additionally includes a few tests verifying the recent work allowing the firstSequence option to be provided in record batches.

@ianwsperber ianwsperber self-assigned this Nov 13, 2018
  const logger = rootLogger.namespace('Producer')
- const sendMessages = createSendMessages({ logger, cluster, partitioner })
+ const transactionManager = createTransactionManager({ logger, cluster })
+ const sendMessages = createSendMessages({ logger, cluster, partitioner, transactionManager })
Contributor Author

@ianwsperber ianwsperber Nov 13, 2018

Currently I pass transactionManager to the factory method. I do wonder if ultimately it will make more sense to provide a transaction manager in the sendMessages call. Could also allow us to make the transaction manager less stateful. Currently it doesn't really matter but it may be something to refactor in #173

Collaborator

In what way would it make the transaction manager less stateful? Maybe I'm missing something, but passing it into the factory method seems much more appropriate than passing it with each sendMessage call.

Contributor Author

@ianwsperber ianwsperber Nov 14, 2018

You're probably right @Nevon, it's just that once we introduce transactions the transaction manager will become a bit of a state machine and I was trying to think of strategies to minimize the state we have to manage.

One way of minimizing that would be to have the transaction manager fetch the producer id on instantiation, so that there's no ambiguity about whether the idempotent producer has already been initialized. Since that fetch would occur after connection, that would probably mean passing the transaction manager into the send messages method, or lazily creating the sendMessages instance.

Ultimately I do think what I've implemented is the most straightforward solution, with the benefit that it closely mirrors how transactions are managed in other Kafka libraries. Thus far in practice I haven't had an issue working with the state machine (see preview here); I'm just thinking through the alternatives.

const size = topicMetadata.get(topicName).messagesPerPartition[partition].length
const previous = topicMetadata.get(topicName).sequencePerPartition[partition]

transactionManager.updateSequence(topicName, partition, previous + size)
Owner

@tulios tulios Nov 14, 2018

The sequence number is an int32; it will eventually reach the max number. We will need to rotate the number, similar to what I do with the correlation id.

nextCorrelationId() {
  if (this.correlationId === Number.MAX_VALUE) {
    this.correlationId = 0
  }
  return this.correlationId++
}

WDYT?

We should also move this logic to the transaction manager:

transactionManager.updateSequence({ topicName, partition, previousSequence })

Contributor Author

Sounds good to me! Worth noting that the Java implementation appears not to bother with this check (will implement nonetheless): https://github.com/apache/kafka/blob/9a0ea25fee85837748145d37c69cf4d9bb7f9933/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L417

Contributor Author

@tulios Also, Number.MAX_VALUE (roughly 2 ^ 1024) is significantly larger than the maximum signed Int32 value (2 ^ 31 - 1). AFAIK JS has no predefined constant for Int32, so I'll need to define my own.
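
A minimal sketch of the sequence rotation discussed in this thread, assuming the sequence should wrap to 0 after passing the maximum signed Int32 value (2 ^ 31 - 1). The names `INT_32_MAX_VALUE`, `nextSequence`, and `createSequenceTracker` are hypothetical and chosen for illustration; the code merged in this PR may differ.

```javascript
// Hypothetical constant: JS has no built-in for the maximum signed Int32
const INT_32_MAX_VALUE = 2 ** 31 - 1

// Advance a sequence by `increment`, wrapping so that MAX + 1 becomes 0
function nextSequence(current, increment) {
  if (current > INT_32_MAX_VALUE - increment) {
    return increment - (INT_32_MAX_VALUE - current) - 1
  }
  return current + increment
}

// Hypothetical tracker that keeps one sequence per topic-partition
function createSequenceTracker() {
  const sequences = new Map()
  const keyFor = (topicName, partition) => `${topicName}:${partition}`

  const getSequence = (topicName, partition) =>
    sequences.get(keyFor(topicName, partition)) || 0

  const updateSequence = (topicName, partition, increment) => {
    const next = nextSequence(getSequence(topicName, partition), increment)
    sequences.set(keyFor(topicName, partition), next)
    return next
  }

  return { getSequence, updateSequence }
}
```

With this sketch, the producer would call `updateSequence(topicName, partition, size)` after a batch of `size` messages, and the wrap-around stays inside the tracker rather than in the send path.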

const broker = await cluster.findControllerBroker()
const result = await broker.initProducerId({ transactionTimeout })

producerId = result.producerId
Owner

Shouldn't we preserve this across reconnections?

Contributor Author

@tulios Yes I think we should, good point! I'll update the producer so that it only calls initProducerId if uninitialized.
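
One way to preserve the producer id across reconnections is to guard the init call, roughly as sketched here. `createProducerIdState`, `findBroker`, `NO_PRODUCER_ID`, the `producerEpoch` field on the response, and the default timeout are assumptions made for illustration; only `initProducerId({ transactionTimeout })` and `result.producerId` appear in the snippet under review.

```javascript
// Hypothetical sentinel meaning "no producer id has been fetched yet"
const NO_PRODUCER_ID = -1

// Hypothetical state holder; `findBroker` is an injected async function that
// resolves to a broker exposing initProducerId, as in the reviewed snippet
function createProducerIdState({ findBroker, transactionTimeout = 60000 }) {
  let producerId = NO_PRODUCER_ID
  let producerEpoch = 0

  return {
    isInitialized: () => producerId !== NO_PRODUCER_ID,
    getProducerId: () => producerId,
    getProducerEpoch: () => producerEpoch,

    async initProducerId() {
      // Already initialized: keep the existing id and epoch on reconnect
      if (producerId !== NO_PRODUCER_ID) {
        return
      }

      const broker = await findBroker()
      const result = await broker.initProducerId({ transactionTimeout })

      producerId = result.producerId
      producerEpoch = result.producerEpoch // assumed response field
    },
  }
}
```

Under this sketch, producer.connect() could call `initProducerId()` on every connection and only the first call would reach the broker. Concurrent first calls are not deduplicated here.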

Owner

@ianwsperber will you do this on the transactional producer PR or this one? (If you decide for the transactional PR we can merge)

Contributor Author

@tulios Forgot about this, just pushed a commit to address

Owner

@tulios tulios left a comment

Outstanding work @ianwsperber, I pushed some cosmetic changes. I think it looks good; I just have a couple of questions regarding the sequence number and the init.

Thanks for the great work.

Contributor Author

@tulios @Nevon Thanks for the feedback! Made the requested changes for updating sequence

@tulios tulios merged commit 5cf2fff into master Nov 15, 2018
@tulios tulios deleted the eos-200-idempotent-producer branch November 29, 2018 08:43