KAFKA-9169: fix standby checkpoint initialization #7681
Thanks for the fix @vvcephei! LGTM. Should we add a test for this fix?
Thanks @bbejeck, not a bad idea... I'll work on it.
@vvcephei Is this a regression in 2.4 or an existing bug from a previous release? Normally we want to treat regression issues as blockers.
Thanks, @omkreddy. It's pretty hard to tell when the regression was introduced, because a large number of code paths are involved. However, I've just ported my test to the 2.3 branch, and it looks like it is broken there, too, so it should not be a blocker. I'll downgrade the ticket.
FWIW, system tests from the original commit all passed:
Of course, I need to run them again, since I've changed a system test.
@bbejeck, can you take another look? System tests are running already.
log.trace("Checkpointable offsets updated with active acked offsets: {}", checkpointFileCache);

- log.trace("Writing checkpoint: {}", checkpointFileCache);
+ log.debug("Writing checkpoint: {}", checkpointFileCache);
Removed a redundant log line and upgraded this log to DEBUG.
System tests run at DEBUG level, so this change makes this log visible to the system tests. Ideally, I could have just turned this class's logger to TRACE, but it happens to be a real pain with the way the system tests are set up.
I think this is reasonable, though, since this is not a high-volume log message, and it's pretty handy to know this information while debugging state restoration and initialization.
@@ -195,7 +193,7 @@ public void close(final boolean clean,
     }

     Map<TopicPartition, Long> checkpointedOffsets() {
-        return checkpointedOffsets;
+        return Collections.unmodifiableMap(stateMgr.checkpointed());
The main fix for this PR. Report the current checkpointedOffsets, not the ones from when the standby task was first created.
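To make the difference concrete, here is a minimal model of the two behaviours, written in Python to match the system test code in this PR. It is a sketch, not the Kafka Streams code: StateManager, CachingStandbyTask, LiveStandbyTask and the partition name "changelog-0" are all hypothetical, and MappingProxyType stands in for Collections.unmodifiableMap as a read-only view.

```python
from types import MappingProxyType


class StateManager:
    """Hypothetical stand-in for the component that owns the checkpoints."""

    def __init__(self, offsets):
        self._checkpointed = dict(offsets)

    def checkpoint(self, partition, offset):
        self._checkpointed[partition] = offset

    def checkpointed(self):
        return self._checkpointed


class CachingStandbyTask:
    """Models the old behaviour: snapshot the checkpoints once, at creation."""

    def __init__(self, state_mgr):
        self._checkpointed_offsets = dict(state_mgr.checkpointed())

    def checkpointed_offsets(self):
        return self._checkpointed_offsets


class LiveStandbyTask:
    """Models the fix: return a read-only view of the current checkpoints."""

    def __init__(self, state_mgr):
        self._state_mgr = state_mgr

    def checkpointed_offsets(self):
        return MappingProxyType(self._state_mgr.checkpointed())


mgr = StateManager({"changelog-0": -1})
stale, live = CachingStandbyTask(mgr), LiveStandbyTask(mgr)

# A checkpoint is written after both tasks were created (e.g. during suspend).
mgr.checkpoint("changelog-0", 31)

stale_offset = stale.checkpointed_offsets()["changelog-0"]
live_offset = live.checkpointed_offsets()["changelog-0"]
```

In this model the caching task still reports the offset it saw at creation, while the live task reports the one written afterwards, which is the offset a restore should resume from.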
@@ -440,6 +440,7 @@ private void assignStandbyPartitions() {
             checkpointedOffsets.putAll(standbyTask.checkpointedOffsets());
         }

+        log.debug("Assigning and seeking restoreConsumer to {}", checkpointedOffsets);
Adding a DEBUG log. This is also in support of the new system tests, but I think it's again nice to have this information available. I actually added this log while debugging the issue, before I wrote the test, to manually verify the bug.
self.streams_sink_topic_1: {'partitions': 1,
                            'replication-factor': 1},
self.streams_sink_topic_2: {'partitions': 1,
                            'replication-factor': 1}
lines were too long.
self.assert_consume(self.client_id, "assert all messages consumed from %s" % self.streams_sink_topic_1,
                    self.streams_sink_topic_1, self.num_messages)
self.assert_consume(self.client_id, "assert all messages consumed from %s" % self.streams_sink_topic_2,
                    self.streams_sink_topic_2, self.num_messages)
lines were too long
wait_until(lambda: producer.num_acked >= self.num_messages,
           timeout_sec=60,
           err_msg="Failed to send all %s messages" % str(self.num_messages))

producer.stop()

processor_1.stop()
processor_2.stop()
processor_3.stop()
There's no reason not to stop the processors at this point.
# Validate the checkpoint/restore logs for monotonicity
# This was added to ensure that standby restoration resumes from the checkpoint
# rather than the beginning of the changelog, as part of KAFKA-9169
This comment should explain the new verification I've added.
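For readers who want a feel for what a monotonicity check over these logs might look like, here is a rough standalone sketch. It is not the code added in this PR. It relies on the two DEBUG messages shown in the diffs above and assumes the logged map is rendered in the usual Java Map.toString() form, e.g. {my-changelog-0=42}; the helper names offsets_in and first_regression are made up for illustration.

```python
import re

# Assumption: the logged map looks like "{my-changelog-0=42, my-changelog-1=17}".
ENTRY = re.compile(r"([\w.\-]+)=(-?\d+)")
MARKERS = ("Writing checkpoint: ", "Assigning and seeking restoreConsumer to ")


def offsets_in(line):
    """Return {partition: offset} for a checkpoint/restore line, else None."""
    for marker in MARKERS:
        _, found, tail = line.partition(marker)
        if found:
            return {tp: int(off) for tp, off in ENTRY.findall(tail)}
    return None


def first_regression(lines):
    """Return (partition, previous, current) for the first offset that moves
    backwards, or None if every partition's offsets are non-decreasing."""
    last_seen = {}
    for line in lines:
        for tp, offset in (offsets_in(line) or {}).items():
            if tp in last_seen and offset < last_seen[tp]:
                return tp, last_seen[tp], offset
            last_seen[tp] = offset
    return None
```

A restore that seeks back to the beginning of the changelog after a checkpoint has been written would show up as a regression: the "Assigning and seeking" line would carry a smaller offset than the preceding "Writing checkpoint" line for the same partition.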
System tests are still passing:
Filed https://issues.apache.org/jira/browse/KAFKA-9179 for the unrelated JDK8/Scala11 test failure.
Thanks for the test @vvcephei. Overall I'm ok with the change to the system test, but I'm thinking a unit test is helpful as well.
I looked at the StandbyTaskTest and I updated the testUpdate() test so it will fail without your change.
After line 257 add this
final Map<TopicPartition, Long> originalCheckpointOffsets = task.checkpointedOffsets();
assertThat(originalCheckpointOffsets.get(partition2), equalTo(-1L));
Then after line 296 add this block
final Map<TopicPartition, Long> updatedCheckpointOffsets = task.checkpointedOffsets();
final long originalCheckpoint = originalCheckpointOffsets.get(partition2);
final long updatedCheckpoint = updatedCheckpointOffsets.get(partition2);
assertTrue("Original checkpoint should be less than updated checkpoint", originalCheckpoint < updatedCheckpoint);
assertThat(updatedCheckpoint, equalTo(31L));
WDYT?
Thanks, @bbejeck, I think that unit test assertion is a good idea. I've added it. I think it still makes sense to keep the system test, though. The reasoning on the mailing list thread about this was that, although it's true the checkpoint wasn't updated on this path, it would be fine because the restore consumer wouldn't actually be re-assigned after a suspend and resume, just un-paused. Verifying that, though, requires testing almost all of Streams, since it includes the code paths of StreamThread, as well as StreamPartitionAssignor/TaskManager/etc. I know you didn't object to the system test; I just felt like presenting the argument to keep it, for posterity.
I'll wait for a green build and then merge. Thanks again for the review, @bbejeck!
Instead of caching the checkpoint map during StandbyTask initialization, use the latest checkpoints (which would have been updated during suspend).