Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-9169: fix standby checkpoint initialization #7681

Merged
merged 3 commits into from
Nov 14, 2019
Merged

KAFKA-9169: fix standby checkpoint initialization #7681

merged 3 commits into from
Nov 14, 2019

Conversation

vvcephei
Copy link
Contributor

Instead of caching the checkpoint map during StandbyTask initialization, use the latest checkpoints (which would have been updated during suspend).

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Copy link
Member

@bbejeck bbejeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the fix @vvcephei! LGTM. Should we add a test for this fix?

@vvcephei
Copy link
Contributor Author

Thanks @bbejeck , not a bad idea...

I'll work on it.

@omkreddy
Copy link
Contributor

omkreddy commented Nov 12, 2019

@vvcephei Is this regression in 2.4 or existing bug from previous release? Normally we want to treat regression issues as blockers.

@vvcephei
Copy link
Contributor Author

Thanks, @omkreddy ,

It's pretty hard to tell when the regression was introduced, because there are a large number of code paths involved.

However, I've just ported my test to the 2.3 branch, and it does look like it is broken there, too.

It sounds like it should not be a blocker, then. I'll downgrade the ticket.

Copy link
Contributor Author

@vvcephei vvcephei left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bbejeck ,

Can you take another look? System tests are running already.

log.trace("Checkpointable offsets updated with active acked offsets: {}", checkpointFileCache);

log.trace("Writing checkpoint: {}", checkpointFileCache);
log.debug("Writing checkpoint: {}", checkpointFileCache);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed a redundant log line and upgraded this log to DEBUG.

System tests run at DEBUG level, so this change makes this log visible to the system tests. Ideally, I could have just turned this class's logger to TRACE, but it happens to be a real pain with the way the system tests are set up.

I think this is reasonable, though, since this is not a high-volume log message, and it's pretty handy to know this information while debugging state restoration and initialization.

@@ -195,7 +193,7 @@ public void close(final boolean clean,
}

Map<TopicPartition, Long> checkpointedOffsets() {
return checkpointedOffsets;
return Collections.unmodifiableMap(stateMgr.checkpointed());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main fix for this PR. Report the current checkpointedOffsets, not the ones from when the standby task was first created.

@@ -440,6 +440,7 @@ private void assignStandbyPartitions() {
checkpointedOffsets.putAll(standbyTask.checkpointedOffsets());
}

log.debug("Assigning and seeking restoreConsumer to {}", checkpointedOffsets);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding a DEBUG log. This is also in support of the new system tests, but I think it's again nice to have this information available. I actually added this log while debugging the issue, before I wrote the test, to manually verify the bug.

self.streams_sink_topic_1: {'partitions': 1,
'replication-factor': 1},
self.streams_sink_topic_2: {'partitions': 1,
'replication-factor': 1}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lines were too long.

self.assert_consume(self.client_id, "assert all messages consumed from %s" % self.streams_sink_topic_1,
self.streams_sink_topic_1, self.num_messages)
self.assert_consume(self.client_id, "assert all messages consumed from %s" % self.streams_sink_topic_2,
self.streams_sink_topic_2, self.num_messages)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lines were too long


wait_until(lambda: producer.num_acked >= self.num_messages,
timeout_sec=60,
err_msg="Failed to send all %s messages" % str(self.num_messages))

producer.stop()

processor_1.stop()
processor_2.stop()
processor_3.stop()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no reason not to stop the processors at this point.


# Validate the checkpoint/restore logs for monotonicity
# This was added to ensure that standby restoration resumes from the checkpoint
# rather than the beginning of the changelog, as part of KAFKA-9169
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment should explain the new verification I've added.

@vvcephei vvcephei requested a review from bbejeck November 13, 2019 07:10
@vvcephei
Copy link
Contributor Author

Filed https://issues.apache.org/jira/browse/KAFKA-9179 for the unrelated JDK8/Scala11 test failure.

Copy link
Member

@bbejeck bbejeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the test @vvcephei. Overall I'm ok with the change to the system test, but I'm thinking a unit test is helpful as well.

I looked at the StandbyTaskTest and I updated the testUpdate() test so it will fail without your change.
After line 257 add this

final Map<TopicPartition, Long> originalCheckpointOffsets = task.checkpointedOffsets();
assertThat(originalCheckpointOffsets.get(partition2), equalTo(-1L));

Then after line 296 add this block

final Map<TopicPartition, Long> updatedCheckpointOffsets = task.checkpointedOffsets();
final long originalCheckpoint = originalCheckpointOffsets.get(partition2);
final long updatedCheckpoint = updatedCheckpointOffsets.get(partition2);
assertTrue("Original checkpoint should be less than updated checkpoint", originalCheckpoint < updatedCheckpoint);
assertThat(updatedCheckpoint, equalTo((31L)));

WDYT?

@vvcephei
Copy link
Contributor Author

Thanks, @bbejeck , I think that unit test assertion is a good idea. I've added it.

I think it still makes sense to keep the system test, though, since the reasoning on the mailing list thread about this was that, although it's true the checkpoint wasn't updated on this path, it would be fine because the restore consumer wouldn't actually be re-assigned after suspend resumes, just un-paused. Verifying that, though requires testing almost all of Streams, since it includes the code paths of StreamThread, as well as StreamPartitionAssignor/TaskManager/etc.

I know you didn't object to the system test, just felt like presenting the argument to keep it, for posterity.

@vvcephei
Copy link
Contributor Author

I'll wait for a green build and then merge. Thanks again for the review, @bbejeck !

@vvcephei vvcephei merged commit cac8560 into apache:trunk Nov 14, 2019
@vvcephei vvcephei deleted the KAFKA-9169 branch November 14, 2019 04:04
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants