
Only fetch for partitions with initialized offsets #582

Merged
2 commits merged on Dec 9, 2019

Conversation

Nevon (Collaborator) commented Dec 6, 2019

Second attempt at solving the issue with concurrent recovery from "offset out of range" errors, this time without locks.

Fixes #555

Closes #578

tulios (Owner) commented Dec 6, 2019

@JaapRood this is a new approach to the problem described in the Lock PR.

Nevon (Collaborator, Author) commented Dec 6, 2019

To reproduce the issue, I ran the branch replace-fetch-promise-all-with-generator. First I created a topic with the following configuration:

{
  topic,
  numPartitions: 3,
  configEntries: [
    { name: 'delete.retention.ms', value: '100' },
    { name: 'file.delete.delay.ms', value: '200' },
    { name: 'min.cleanable.dirty.ratio', value: '0.01' },
    { name: 'retention.ms', value: '1' },
    { name: 'segment.bytes', value: '50000' },
    { name: 'max.compaction.lag.ms', value: '200' },
  ],
}

I produced a bunch of messages to that topic, and then I started a consumer and consumed a bit. Then I killed my consumer and kept producing until the broker did a cleanup of the old segments (this takes minutes, by the way...). When I started the consumer again, it would break, because the old committed offset was now invalid, as described in #578.
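For reference, the topic-creation step can be scripted with the admin client. This is only a rough sketch of that step; the broker address, clientId and topic name are placeholders:

```js
const { Kafka } = require('kafkajs')

// Placeholders: broker address, clientId and topic name are made up for this sketch
const kafka = new Kafka({ clientId: 'offset-repro', brokers: ['localhost:9092'] })
const admin = kafka.admin()

const createReproTopic = async () => {
  await admin.connect()
  await admin.createTopics({
    topics: [
      {
        topic: 'offset-out-of-range-repro',
        numPartitions: 3,
        configEntries: [
          { name: 'delete.retention.ms', value: '100' },
          { name: 'file.delete.delay.ms', value: '200' },
          { name: 'min.cleanable.dirty.ratio', value: '0.01' },
          { name: 'retention.ms', value: '1' },
          { name: 'segment.bytes', value: '50000' },
          { name: 'max.compaction.lag.ms', value: '200' },
        ],
      },
    ],
  })
  await admin.disconnect()
}

createReproTopic().catch(console.error)
```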

With this fix, what happened instead was that we detected that some partitions didn't have a valid offset and skipped fetching for them. Shortly afterwards, each partition had recovered and fetches continued as usual.
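The gist of the idea, as a simplified sketch (not the actual diff; the committedOffsets shape and the '-1' sentinel are assumptions made for illustration):

```js
// Sketch only: assumes committedOffsets is keyed by topic and then partition,
// and that a partition without a valid offset shows up as '-1' (Kafka's
// convention for "no committed offset").
const partitionsWithInitializedOffsets = (topic, partitions, committedOffsets) =>
  partitions.filter(partition => {
    const offset = (committedOffsets[topic] || {})[partition]
    return offset != null && offset !== '-1'
  })

// Partition 1 has no valid offset yet, so it's skipped for this fetch and
// picked up again on a later iteration, once its offset has been recovered.
const committedOffsets = { 'test-topic': { 0: '42', 1: '-1', 2: '7' } }
partitionsWithInitializedOffsets('test-topic', [0, 1, 2], committedOffsets) // => [0, 2]
```

Because skipped partitions are simply retried on the next pass of the fetch loop, there's no need for a lock around the recovery.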

@@ -368,13 +368,34 @@ module.exports = class ConsumerGroup {
)

const leaders = keys(partitionsPerLeader)
const committedOffsets = this.offsetManager.committedOffsets()
Collaborator:

Is there a reason why we're using committedOffsets here, instead of just resolved? An invalid offset could come from attempting to resume from a committed offset, but also from a consumer.seek. Your great comment touches on how both are cleared, so I'm not sure whether there would be any actual difference in behaviour, but as future changes are made, this subtle difference might become harder to spot while also becoming more consequential.

Collaborator Author:

Initially I was actually using resolvedOffsets, but that didn't work because OffsetManager.resolveOffsets actually just sets the initialized consumer offsets in committedOffsets, not in resolvedOffsets (did someone mention that our naming is confusing...? 😅). When the consumer first boots, it doesn't actually have any resolved offsets, so the only source of offsets is the initialized offsets in committedOffsets.

Regarding the seek behavior, I would expect it to work the same way, no? Seek would commit (potentially invalid) offsets and then clear both committedOffsets and resolvedOffsets using OffsetManager.clearOffsets. In the fetch loop we'd get the consumer offsets from the brokers and from there on it's the same.
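In code terms, the flow I have in mind is roughly the following. This is a purely illustrative stub, not the real OffsetManager; only clearOffsets and the two offset maps come from the discussion above, while the commitOffsets name and the bodies are made up:

```js
// Illustrative stub only; commitOffsets is a made-up name and the bodies
// don't reflect the real implementation.
const offsetManager = {
  committedOffsets: {},
  resolvedOffsets: {},
  async commitOffsets(offsetsByTopicPartition) {
    // the real thing would talk to the group coordinator here
    this.committedOffsets = { ...this.committedOffsets, ...offsetsByTopicPartition }
  },
  clearOffsets() {
    this.committedOffsets = {}
    this.resolvedOffsets = {}
  },
}

const seek = async ({ topic, partition, offset }) => {
  // 1. commit the (potentially invalid) target offset
  await offsetManager.commitOffsets({ [topic]: { [partition]: offset } })
  // 2. clear both committedOffsets and resolvedOffsets
  offsetManager.clearOffsets()
  // 3. on the next fetch loop iteration, the consumer offsets are fetched
  //    from the brokers again, so from there on it's the same as on startup
}
```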

Maybe I'm missing something?

Collaborator:

> Regarding the seek behavior, I would expect it to work the same way, no? Seek would commit (potentially invalid) offsets

Seeking shouldn't commit, only move the "playhead" (see #395), so relying on that behaviour is probably not what we want.

> Initially I was actually using resolvedOffsets, but that didn't work because OffsetManager.resolveOffsets actually just sets the initialized consumer offsets in committedOffsets, not in resolvedOffsets (did someone mention that our naming is confusing...? 😅).

I guess having to use committedOffsets is a symptom of there being an underlying issue, then. Conceptually, it's the resolvedOffsets (which I understand to be the "next offset to consume", or "playhead", for reading the log) that should always exist, and the committed offset that is optional, as using Kafka for committing offsets is / should be totally optional (see #395).
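To illustrate the conceptual model I mean (shapes here are assumed purely for illustration):

```js
// Illustration only: per assigned partition, the "playhead" should always
// exist, while a committed offset is optional (only there when Kafka is
// used for offset storage).
const resolvedOffsets = { 'test-topic': { 0: '43', 1: '101', 2: '8' } } // always present
const committedOffsets = { 'test-topic': { 0: '42', 2: '7' } } // optional; partition 1 never committed
```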

Since that seems like a different issue, maybe it's an idea to create a separate issue for it and tag it in a comment. Expecting our future selves (or others) to spot, outside the context of these changes, that we conceptually want the resolved offsets rather than the committed ones here might be a lot to ask 😅.

Nevon (Collaborator, Author), Dec 6, 2019:

> Since that seems like a different issue, maybe it's an idea to create a separate issue for it and tag it in a comment.

That sounds like a good idea. I would prefer to do that kind of holistic refactoring in a PR that doesn't actually change any behavior, rather than squeezing it into a bugfix. Could you create that issue?

Collaborator:

Created the issue, trying to preserve the context of this conversation properly: #585. To help with the audit suggested there, I'd suggest linking that issue in a comment above the line where committedOffsets() is called.
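Something along these lines, for example (re-using the line from the diff above; the comment wording is just a suggestion):

```js
// Conceptually we want the resolved offsets here rather than the committed
// ones; see #585 for the discussion and the planned OffsetManager audit.
const committedOffsets = this.offsetManager.committedOffsets()
```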

JaapRood (Collaborator) commented Dec 6, 2019

Pragmatic fix, no locks, 🙌.

tulios (Owner) commented Dec 9, 2019

The pre-release version 1.12.0-beta.11 was published with this fix.
