KAFKA-9169: fix standby checkpoint initialization #7681
Changes from 2 commits
@@ -46,7 +46,6 @@ public class StandbyTask extends AbstractTask {
     private boolean updateOffsetLimits;
     private final Sensor closeTaskSensor;
     private final Map<TopicPartition, Long> offsetLimits = new HashMap<>();
-    private Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();

     /**
      * Create {@link StandbyTask} with its assigned partitions
@@ -86,7 +85,6 @@ public void initializeMetadata() {}
     public boolean initializeStateStores() {
         log.trace("Initializing state stores");
         registerStateStores();
-        checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointed());
         processorContext.initialize();
         taskInitialized = true;
         return true;
@@ -195,7 +193,7 @@ public List<ConsumerRecord<byte[], byte[]>> update(final TopicPartition partition
     }

     Map<TopicPartition, Long> checkpointedOffsets() {
-        return checkpointedOffsets;
+        return Collections.unmodifiableMap(stateMgr.checkpointed());
     }

     private long updateOffsetLimits(final TopicPartition partition) {

Comment: The main fix for this PR. Report the current checkpointed offsets from the state manager, rather than a copy captured when the task was initialized.
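To make the failure mode concrete: before this change, `checkpointedOffsets` was captured once in `initializeStateStores()` and never refreshed, so any checkpoints the standby wrote afterwards were invisible to callers. Below is a minimal, hypothetical sketch of that pitfall. `FakeStateManager` and `StandbyLike` are invented names, not Kafka classes, and the sketch assumes, as the fix implies, that `checkpointed()` reflects the offsets as of the time it is called.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the stale-snapshot pitfall; FakeStateManager
// and StandbyLike are invented names, not Kafka classes.
public class StaleSnapshotDemo {

    // Stands in for the state manager: checkpointed() reports a copy of the
    // offsets as of the time it is called.
    static class FakeStateManager {
        private final Map<String, Long> offsets = new HashMap<>();

        Map<String, Long> checkpointed() {
            return new HashMap<>(offsets);
        }

        void writeCheckpoint(final String partition, final long offset) {
            offsets.put(partition, offset);
        }
    }

    static class StandbyLike {
        final FakeStateManager stateMgr = new FakeStateManager();
        private Map<String, Long> cachedAtInit;

        void initialize() {
            // The removed approach: snapshot the checkpoints once, at init time.
            cachedAtInit = Collections.unmodifiableMap(stateMgr.checkpointed());
        }

        Map<String, Long> staleView() {
            return cachedAtInit;
        }

        Map<String, Long> liveView() {
            // The fixed approach: report the state manager's current view.
            return Collections.unmodifiableMap(stateMgr.checkpointed());
        }
    }

    public static void main(final String[] args) {
        final StandbyLike task = new StandbyLike();
        task.initialize();

        // The standby task later writes a newer checkpoint...
        task.stateMgr.writeCheckpoint("store-changelog-0", 42L);

        // ...but the cached view still reflects init time, so a restore
        // consumer seeded from it would replay the changelog from scratch.
        System.out.println("stale: " + task.staleView()); // {}
        System.out.println("live:  " + task.liveView());  // {store-changelog-0=42}
    }
}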
@@ -440,6 +440,7 @@ private void assignStandbyPartitions() {
             checkpointedOffsets.putAll(standbyTask.checkpointedOffsets());
         }

+        log.debug("Assigning and seeking restoreConsumer to {}", checkpointedOffsets);
         restoreConsumerAssignedStandbys = true;
         restoreConsumer.assign(checkpointedOffsets.keySet());
         for (final Map.Entry<TopicPartition, Long> entry : checkpointedOffsets.entrySet()) {

Comment: Adding a DEBUG log. This is also in support of the new system tests, but I think it's again nice to have this information available. I actually added this log while debugging the issue, before I wrote the test, to manually verify the bug.
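The hunk cuts off at the loop header. As a rough sketch of the seek step that presumably follows (an illustration using only the public Consumer API; `NO_CHECKPOINT` is a stand-in sentinel, not necessarily the constant the real code uses), the restore consumer resumes each standby changelog from its checkpoint when one exists, and from the beginning otherwise:

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

// Sketch of the seek step that presumably follows the loop header above.
// NO_CHECKPOINT is a stand-in sentinel for "no checkpoint file entry";
// the real code's constant and surrounding details may differ.
public class RestoreSeekSketch {

    private static final long NO_CHECKPOINT = -1L;

    static void seekToCheckpoints(final Consumer<byte[], byte[]> restoreConsumer,
                                  final Map<TopicPartition, Long> checkpointedOffsets) {
        restoreConsumer.assign(checkpointedOffsets.keySet());
        for (final Map.Entry<TopicPartition, Long> entry : checkpointedOffsets.entrySet()) {
            final TopicPartition partition = entry.getKey();
            final long checkpoint = entry.getValue();
            if (checkpoint != NO_CHECKPOINT) {
                // Resume from the checkpointed offset: this is exactly what the
                // bug broke, since a stale map could point at outdated offsets.
                restoreConsumer.seek(partition, checkpoint);
            } else {
                // No checkpoint recorded: replay the changelog from the start.
                restoreConsumer.seekToBeginning(Collections.singleton(partition));
            }
        }
    }
}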
@@ -35,13 +35,15 @@ class StreamsStandbyTask(BaseStreamsTest):
     def __init__(self, test_context):
         super(StreamsStandbyTask, self).__init__(test_context,
                                                  topics={
-                                                     self.streams_source_topic: {'partitions': 6, 'replication-factor': 1},
-                                                     self.streams_sink_topic_1: {'partitions': 1, 'replication-factor': 1},
-                                                     self.streams_sink_topic_2: {'partitions': 1, 'replication-factor': 1}
+                                                     self.streams_source_topic: {'partitions': 6,
+                                                                                 'replication-factor': 1},
+                                                     self.streams_sink_topic_1: {'partitions': 1,
+                                                                                 'replication-factor': 1},
+                                                     self.streams_sink_topic_2: {'partitions': 1,
+                                                                                 'replication-factor': 1}
                                                  })

     def test_standby_tasks_rebalance(self):
         configs = self.get_configs(",sourceTopic=%s,sinkTopic1=%s,sinkTopic2=%s" % (self.streams_source_topic,
                                                                                     self.streams_sink_topic_1,
                                                                                     self.streams_sink_topic_2))

Comment: Lines were too long.
@@ -108,13 +110,85 @@ def test_standby_tasks_rebalance(self):
         self.wait_for_verification(processor_3, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_3.STDOUT_FILE)
         self.wait_for_verification(processor_2, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_2.STDOUT_FILE, num_lines=2)

-        self.assert_consume(self.client_id, "assert all messages consumed from %s" % self.streams_sink_topic_1, self.streams_sink_topic_1, self.num_messages)
-        self.assert_consume(self.client_id, "assert all messages consumed from %s" % self.streams_sink_topic_2, self.streams_sink_topic_2, self.num_messages)
+        self.assert_consume(self.client_id, "assert all messages consumed from %s" % self.streams_sink_topic_1,
+                            self.streams_sink_topic_1, self.num_messages)
+        self.assert_consume(self.client_id, "assert all messages consumed from %s" % self.streams_sink_topic_2,
+                            self.streams_sink_topic_2, self.num_messages)

Comment: Lines were too long.
         wait_until(lambda: producer.num_acked >= self.num_messages,
                    timeout_sec=60,
                    err_msg="Failed to send all %s messages" % str(self.num_messages))

         producer.stop()

+        processor_1.stop()
+        processor_2.stop()
+        processor_3.stop()

Comment: There's no reason not to stop the processors at this point.
+        # Validate the checkpoint/restore logs for monotonicity
+        # This was added to ensure that standby restoration resumes from the checkpoint
+        # rather than the beginning of the changelog, as part of KAFKA-9169

Comment: This comment should explain the new verification I've added.
+        # First, process the logs to look for invariant violations
+        processor_1.node.account.ssh(validateMonotonicCheckpointsCmd(processor_1.LOG_FILE, processor_1.STDOUT_FILE))
+        processor_2.node.account.ssh(validateMonotonicCheckpointsCmd(processor_2.LOG_FILE, processor_2.STDOUT_FILE))
+        processor_3.node.account.ssh(validateMonotonicCheckpointsCmd(processor_3.LOG_FILE, processor_3.STDOUT_FILE))
+
+        # Second, check to make sure no invariant violations were reported
+        processor_1.node.account.ssh("! grep ERROR " + processor_1.STDOUT_FILE, allow_fail=False)
+        processor_2.node.account.ssh("! grep ERROR " + processor_2.STDOUT_FILE, allow_fail=False)
+        processor_3.node.account.ssh("! grep ERROR " + processor_3.STDOUT_FILE, allow_fail=False)
+
+def validateMonotonicCheckpointsCmd(log_file, stdout_file):
+    """
+    Enforces an invariant that, if we look at the offsets written to
+    checkpoint files and the offsets used to resume the restore consumer
+    for a given topic-partition, we should always observe the offsets
+    to be non-decreasing.
+
+    Note that this specifically would not hold for EOS in an unclean
+    shutdown, but outside of that, we should be able to rely on it.
+    """
+    # This script gets turned into a one-liner and executed over SSH.
+    #
+    # The idea here is to parse the logs and enforce an invariant. This
+    # should be resilient against meaningless variations in which tasks
+    # exactly get assigned to which instances, and other factors that
+    # could otherwise make this test flaky.
+    #
+    # A quick overview, which should make this easier to read:
+    #   PK is the prior key
+    #   PV is the prior value
+    #
+    # 1. Extract only the relevant lines from the log (grep).
+    # 2. The log lines contain a map of topic-partition -> offset, so the
+    #    role of the sed expressions is to extract those map entries onto
+    #    one k/v pair per line.
+    # 3. The sort is a _stable_ sort on the key, which puts all the events
+    #    for a key together while preserving their relative order.
+    # 4. Now, we use K (key), V (value), PK, and PV to make sure that the
+    #    offset (V) for each topic-partition (K) is non-decreasing.
+    # 5. If this invariant is violated, log an ERROR (so we can check for
+    #    it later).
+    return "PK=''; " \
+           "PV=-999; " \
+           "cat %s " \
+           "| grep 'Assigning and seeking\|Writing checkpoint' " \
+           "| sed -e 's/.*{\(.*\)}.*/\\1/' -e 's/, /\\n/g' -e 's/=/\\t/g' " \
+           "| sort --key=1,1 --stable " \
+           "| while read LINE; do" \
+           "  if [[ ${LINE} ]]; then" \
+           "    K=$(cut -f1 <<< ${LINE});" \
+           "    V=$(cut -f2 <<< ${LINE});" \
+           "    if [[ ${K} != ${PK} ]]; then" \
+           "      PK=${K};" \
+           "      PV=${V};" \
+           "      echo \"INFO: First occurrence of ${K}; set PV=${V}.\";" \
+           "    elif [[ ${V} -lt ${PV} ]]; then" \
+           "      echo \"ERROR: ${K} offset regressed from ${PV} to ${V}.\";" \
+           "    else" \
+           "      PV=${V};" \
+           "      echo \"INFO: Updated ${K} to ${V}.\";" \
+           "    fi;" \
+           "  fi;" \
+           " done >> %s" % (log_file, stdout_file)
Comment: Removed a redundant log line and upgraded this log to DEBUG. System tests run at DEBUG level, so this change makes the log visible to the system tests. Ideally, I could have just turned this class's logger up to TRACE, but that happens to be a real pain with the way the system tests are set up. I think this is reasonable, though, since this is not a high-volume log message, and it's pretty handy information to have while debugging state restoration and initialization.