Improve offset commit handling #775
Merged
This function "does something", and is getting called inside a tight loop inside a critical area. Essentially we can avoid the call overhead and the condition check in each invocation here.
This skips a bunch of redundant property lookups: we can grab the needed partition offsets array immediately for each topic, and then have the subtraction logic simply process each partition of that array.
KafkaJS keeps track of offsets as strings and Longs to avoid problems with 64-bit offsets. This does require frequent string->Long and Long->string conversions so that the Long parts don't leak into the interface. In the case of offsets in the offset manager we might also have undefined/null or empty offset strings, and isInvalidOffset needs to be able to detect these. But: we don't expect to have arbitrary strings here, and figuring out whether a thing is undefined/null/'' doesn't need a regular expression that checks the complete string; a simple '!offset' is just fine.
Long.compare does a lot more than what we need here: We just want to know whether the offset is negative (-1 for undefined, or potentially one of the special values).
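The two simplifications above can be sketched roughly as follows. This is an illustration, not the actual KafkaJS source: `BigInt` stands in for the Long type so the snippet runs without dependencies, and it assumes offsets arrive as strings, undefined, or null.

```javascript
// Sketch only: BigInt stands in for Long to keep this dependency-free.
// Assumes offsets are strings (or undefined/null), never plain numbers,
// so the string '0' is truthy and counts as a valid offset.
const isInvalidOffset = offset =>
  // Falsy check covers undefined, null, and ''; no regular expression needed.
  !offset ||
  // A sign test is enough: -1 and the other special values are all negative.
  BigInt(offset) < 0n

console.log(isInvalidOffset(undefined)) // true
console.log(isInvalidOffset(''))        // true
console.log(isInvalidOffset('-1'))      // true
console.log(isInvalidOffset('13435'))   // false
```

As in the description above, arbitrary non-numeric strings are not expected here; in this sketch they would make `BigInt` throw rather than be reported as invalid.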
In some cases it seems possible that the committed offset isn't '-1', but rather undefined:

~~~~
kafkajs.Runner - Error when calling eachBatch ({"topic":"carrier-events","partition":0,"offset":"13435","stack":"TypeError: Cannot read property 'low' of undefined\n at Function.fromValue (/app/node_modules/long/src/long.js:286:25)\n at subtractOffsets (/app/node_modules/@collaborne/kafkajs/src/consumer/offsetManager/index.js:110:44)\n at subtractPartitionOffsets (/app/node_modules/@collaborne/kafkajs/src/consumer/offsetManager/index.js:114:7)\n at /app/node_modules/@collaborne/kafkajs/src/consumer/offsetManager/index.js:120:44\n at Array.map (<anonymous>)\n at subtractTopicOffsets (/app/node_modules/@collaborne/kafkajs/src/consumer/offsetManager/index.js:120:27)\n at Array.map (<anonymous>)\n at OffsetManager.countResolvedOffsets (/app/node_modules/@collaborne/kafkajs/src/consumer/offsetManager/index.js:122:37)\n at OffsetManager.commitOffsetsIfNecessary (/app/node_modules/@collaborne/kafkajs/src/consumer/offsetManager/index.js:189:12)\n at ConsumerGroup.commitOffsetsIfNecessary (/app/node_modules/@collaborne/kafkajs/src/consumer/consumerGroup.js:295:30)\n at commitOffsetsIfNecessary (/app/node_modules/@collaborne/kafkajs/src/consumer/runner.js:235:34)\n at messageGenerator (/app/node_modules/@collaborne/event-stream-library/src/client/shared-consumer.ts:341:10)\n at runMicrotasks (<anonymous>)\n at runNextTicks (internal/process/task_queues.js:62:5)\n at processImmediate (internal/timers.js:429:9)\n at process.topLevelDomainCallback (domain.js:137:15)"})
~~~~

From what I can see this might be related to `seek` and `resolve` operations overlapping, which is possible: `seek` is only possible when the consumer is running, and when the consumer is running it will do resolving/committing. This is likely a bigger issue when the consumer group contains many topics.
After reading the code for quite a while I think the fix here is trivial: Instead of checking explicitly for `-1` we simply should use the helper function created for that purpose.
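A minimal sketch of the change described above, under some assumptions: `BigInt` stands in for Long so the snippet is self-contained, `subtractOffsetsBefore` is a hypothetical name for the old behaviour, and the fallback to the resolved offset for an invalid committed offset is assumed to match what the existing '-1' check did.

```javascript
// Sketch only: BigInt stands in for Long; names are illustrative.
const isInvalidOffset = offset => !offset || BigInt(offset) < 0n

// Before: only the '-1' sentinel is handled, so an undefined committed
// offset reaches the conversion and throws a TypeError.
const subtractOffsetsBefore = (resolved, committed) =>
  committed === '-1' ? BigInt(resolved) : BigInt(resolved) - BigInt(committed)

// After: the helper also catches undefined, null, and '' before converting.
const subtractOffsets = (resolved, committed) =>
  isInvalidOffset(committed) ? BigInt(resolved) : BigInt(resolved) - BigInt(committed)

console.log(subtractOffsets('13435', '13400'))   // 35n
console.log(subtractOffsets('13435', undefined)) // 13435n
```

With `subtractOffsetsBefore`, the second call would throw, which mirrors the `TypeError` in the stack trace where the conversion receives an undefined committed offset.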
goriunov
reviewed
Jun 23, 2020
Pointed-out-by: @goriunov
Nevon
approved these changes
Jun 23, 2020
The main part of this PR is fixing the problem that batch processing sometimes will fail with this or a similar stack trace:
Note that this is using @collaborne/kafkajs, our own fork of KafkaJS containing a bunch of the PRs that are still under review but that we found work well in practice. We have seen the same errors also with stock KafkaJS versions.
Basically after reviewing this error and the code in the mentioned areas the conclusion is pretty simple: This is a race between seeking and committing/resolving. The race is hard to reproduce in a controlled way, but the fix looks pretty trivial:
There might be other ways to prevent the race, for example by carefully going over the state changes of the committed and resolved offsets, but given that the code already checked for the "-1" special value, switching this out with `isInvalidOffset` (which checks undefined/null-ness as well) looks simple enough.

The other commits here are just "fall-out" from reading the code: