KAFKA-9169: fix standby checkpoint initialization #7681

Merged 3 commits on Nov 14, 2019

Changes from 2 commits
@@ -348,9 +348,7 @@ public void checkpoint(final Map<TopicPartition, Long> checkpointableOffsetsFromProcessing

updateCheckpointFileCache(checkpointableOffsetsFromProcessing);

log.trace("Checkpointable offsets updated with active acked offsets: {}", checkpointFileCache);

log.trace("Writing checkpoint: {}", checkpointFileCache);
log.debug("Writing checkpoint: {}", checkpointFileCache);
Contributor Author:
Removed a redundant log line and upgraded this log to DEBUG.

System tests run at DEBUG level, so this change makes this log visible to the system tests. Ideally, I could have just turned this class's logger to TRACE, but it happens to be a real pain with the way the system tests are set up.

I think this is reasonable, though, since this is not a high-volume log message, and it's pretty handy to know this information while debugging state restoration and initialization.

try {
checkpointFile.write(checkpointFileCache);
} catch (final IOException e) {
@@ -46,7 +46,6 @@ public class StandbyTask extends AbstractTask {
private boolean updateOffsetLimits;
private final Sensor closeTaskSensor;
private final Map<TopicPartition, Long> offsetLimits = new HashMap<>();
private Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();

/**
* Create {@link StandbyTask} with its assigned partitions
@@ -86,7 +85,6 @@ public void initializeMetadata() {}
public boolean initializeStateStores() {
log.trace("Initializing state stores");
registerStateStores();
checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointed());
processorContext.initialize();
taskInitialized = true;
return true;
@@ -195,7 +193,7 @@ public List<ConsumerRecord<byte[], byte[]>> update(final TopicPartition partitio
}

Map<TopicPartition, Long> checkpointedOffsets() {
return checkpointedOffsets;
return Collections.unmodifiableMap(stateMgr.checkpointed());
Contributor Author:
The main fix for this PR: report the current checkpointedOffsets, not the ones captured when the standby task was first created.

}

private long updateOffsetLimits(final TopicPartition partition) {
@@ -440,6 +440,7 @@ private void assignStandbyPartitions() {
checkpointedOffsets.putAll(standbyTask.checkpointedOffsets());
}

log.debug("Assigning and seeking restoreConsumer to {}", checkpointedOffsets);
Contributor Author:
Adding a DEBUG log. This is also in support of the new system tests, but I think it's again nice to have this information available. I actually added this log while debugging the issue, before I wrote the test, to manually verify the bug.

restoreConsumerAssignedStandbys = true;
restoreConsumer.assign(checkpointedOffsets.keySet());
for (final Map.Entry<TopicPartition, Long> entry : checkpointedOffsets.entrySet()) {
tests/kafkatest/tests/streams/streams_standby_replica_test.py (86 changes: 80 additions & 6 deletions)
@@ -35,13 +35,15 @@ class StreamsStandbyTask(BaseStreamsTest):
def __init__(self, test_context):
super(StreamsStandbyTask, self).__init__(test_context,
topics={
self.streams_source_topic: {'partitions': 6, 'replication-factor': 1},
self.streams_sink_topic_1: {'partitions': 1, 'replication-factor': 1},
self.streams_sink_topic_2: {'partitions': 1, 'replication-factor': 1}
self.streams_source_topic: {'partitions': 6,
'replication-factor': 1},
self.streams_sink_topic_1: {'partitions': 1,
'replication-factor': 1},
self.streams_sink_topic_2: {'partitions': 1,
'replication-factor': 1}
Contributor Author:
Lines were too long.

})

def test_standby_tasks_rebalance(self):

configs = self.get_configs(",sourceTopic=%s,sinkTopic1=%s,sinkTopic2=%s" % (self.streams_source_topic,
self.streams_sink_topic_1,
self.streams_sink_topic_2))
@@ -108,13 +110,85 @@ def test_standby_tasks_rebalance(self):
self.wait_for_verification(processor_3, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_3.STDOUT_FILE)
self.wait_for_verification(processor_2, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_2.STDOUT_FILE, num_lines=2)

self.assert_consume(self.client_id, "assert all messages consumed from %s" % self.streams_sink_topic_1, self.streams_sink_topic_1, self.num_messages)
self.assert_consume(self.client_id, "assert all messages consumed from %s" % self.streams_sink_topic_2, self.streams_sink_topic_2, self.num_messages)
self.assert_consume(self.client_id, "assert all messages consumed from %s" % self.streams_sink_topic_1,
self.streams_sink_topic_1, self.num_messages)
self.assert_consume(self.client_id, "assert all messages consumed from %s" % self.streams_sink_topic_2,
self.streams_sink_topic_2, self.num_messages)
Contributor Author:
Lines were too long.


wait_until(lambda: producer.num_acked >= self.num_messages,
timeout_sec=60,
err_msg="Failed to send all %s messages" % str(self.num_messages))

producer.stop()

processor_1.stop()
processor_2.stop()
processor_3.stop()
Contributor Author:
There's no reason not to stop the processors at this point.


# Validate the checkpoint/restore logs for monotonicity
# This was added to ensure that standby restoration resumes from the checkpoint
# rather than the beginning of the changelog, as part of KAFKA-9169
Contributor Author:
This comment should explain the new verification I've added.


# First, process the logs to look for invariant violations
processor_1.node.account.ssh(validateMonotonicCheckpointsCmd(processor_1.LOG_FILE, processor_1.STDOUT_FILE))
processor_2.node.account.ssh(validateMonotonicCheckpointsCmd(processor_2.LOG_FILE, processor_2.STDOUT_FILE))
processor_3.node.account.ssh(validateMonotonicCheckpointsCmd(processor_3.LOG_FILE, processor_3.STDOUT_FILE))

# Second, check to make sure no invariant violations were reported
processor_1.node.account.ssh("! grep ERROR " + processor_1.STDOUT_FILE, allow_fail=False)
processor_2.node.account.ssh("! grep ERROR " + processor_2.STDOUT_FILE, allow_fail=False)
processor_3.node.account.ssh("! grep ERROR " + processor_3.STDOUT_FILE, allow_fail=False)


def validateMonotonicCheckpointsCmd(log_file, stdout_file):
"""
Enforces an invariant that, if we look at the offsets written to
checkpoint files and offsets used to resume the restore consumer,
for a given topic/partition, we should always observe the offsets
to be non-decreasing.
Note that this specifically would not hold for EOS in an unclean
shutdown, but outside of that, we should be able to rely on it.
"""
# This script gets turned into a one-liner and executed over SSH
#
# The idea here is to parse the logs and enforce an invariant. This
# should be resilient against meaningless variations in what tasks
# exactly get assigned to which instances and other factors that could
# make this test flaky.
#
# A quick overview, which should make this easier to read:
# PK is prior key
# PV is prior value
#
# 1. Extract only the relevant lines from the log (grep)
# 2. The log lines contain a map of topic-partition -> offset,
# so the role of the sed expressions is to extract those map values
# onto one k/v pair per line.
# 3. The sort is a _stable_ sort on the key, which puts all the
# events for a key together, while preserving their relative order
# 4. Now, we use K (key), V (value), PK, and PV to make sure that
# the offset (V) for each topic-partition (K) is non-decreasing.
# 5. If this invariant is violated, log an ERROR (so we can check for it later)
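#
# For example (hypothetical offsets), the grep/sed steps turn a log line like
#   Writing checkpoint: {topic-0=100, topic-1=250}
# into one tab-separated key/value pair per line:
#   topic-0    100
#   topic-1    250
# (A standalone Python sketch of the same check follows the function below.)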

return "PK=''; " \
"PV=-999; " \
"cat %s " \
"| grep 'Assigning and seeking\|Writing checkpoint' " \
"| sed -e 's/.*{\(.*\)}.*/\\1/' -e 's/, /\\n/g' -e 's/=/\\t/g' " \
"| sort --key=1,1 --stable " \
"| while read LINE; do" \
" if [[ ${LINE} ]]; then" \
" K=$(cut -f1 <<< ${LINE});" \
" V=$(cut -f2 <<< ${LINE});" \
" if [[ ${K} != ${PK} ]]; then" \
" PK=${K};" \
" PV=${V};" \
" echo \"INFO: First occurrence of ${K}; set PV=${V}.\";" \
" elif [[ ${V} -lt ${PV} ]]; then" \
" echo \"ERROR: ${K} offset regressed from ${PV} to ${V}.\"; " \
" else" \
" PV=${V};" \
" echo \"INFO: Updated ${K} to ${V}.\";" \
" fi;" \
" fi;" \
" done >> %s" % (log_file, stdout_file)