Skip to content

Commit

Permalink
MINOR: Tolerate limited data loss for upgrade tests with old message …
Browse files Browse the repository at this point in the history
…format (apache#7102)

To avoid transient system test failures, tolerate a small amount of data loss due to truncation in upgrade system tests that use a message format older than KIP-101, where data loss on leader change was possible.

Reviewers: Ismael Juma <ismael@juma.me.uk>
  • Loading branch information
rajinisivaram authored Jul 31, 2019
1 parent 73ed9ea commit de8ce78
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 6 deletions.
6 changes: 5 additions & 1 deletion tests/kafkatest/tests/core/upgrade_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int
from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, V_0_9_0_0, DEV_BRANCH, KafkaVersion
from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, V_0_9_0_0, V_0_11_0_0, DEV_BRANCH, KafkaVersion

class TestUpgrade(ProduceConsumeValidateTest):

Expand Down Expand Up @@ -129,6 +129,10 @@ def test_upgrade(self, from_kafka_version, to_message_format_version, compressio
if from_kafka_version <= LATEST_0_10_0:
assert self.kafka.cluster_id() is None

# With older message formats before KIP-101, message loss may occur due to truncation
# after leader change. Tolerate limited data loss for this case to avoid transient test failures.
self.may_truncate_acked_records = False if from_kafka_version >= V_0_11_0_0 else True

new_consumer = from_kafka_version >= V_0_9_0_0
# TODO - reduce the timeout
self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
Expand Down
6 changes: 5 additions & 1 deletion tests/kafkatest/tests/produce_consume_validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ def __init__(self, test_context):
self.consumer_init_timeout_sec = 0
self.enable_idempotence = False

# Allow tests to tolerate some data loss by overriding this for tests using older message formats
self.may_truncate_acked_records = False

def start_producer_and_consumer(self):
# Start background producer and consumer
self.consumer.start()
Expand Down Expand Up @@ -125,7 +128,8 @@ def check_lost_data(missing_records):
return self.kafka.search_data_files(self.topic, missing_records)

succeeded, error_msg = validate_delivery(self.producer.acked, messages_consumed,
self.enable_idempotence, check_lost_data)
self.enable_idempotence, check_lost_data,
self.may_truncate_acked_records)

# Collect all logs if validation fails
if not succeeded:
Expand Down
22 changes: 18 additions & 4 deletions tests/kafkatest/utils/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from kafkatest import __version__ as __kafkatest_version__

import math
import re
import time

Expand Down Expand Up @@ -137,7 +138,7 @@ def annotate_data_lost(data_lost, msg, number_validated):
"This suggests they were lost on their way to the consumer." % number_validated
return msg

def validate_delivery(acked, consumed, idempotence_enabled=False, check_lost_data=None):
def validate_delivery(acked, consumed, idempotence_enabled=False, check_lost_data=None, may_truncate_acked_records=False):
"""Check that each acked message was consumed."""
success = True
msg = ""
Expand All @@ -149,13 +150,26 @@ def validate_delivery(acked, consumed, idempotence_enabled=False, check_lost_dat
# Were all acked messages consumed?
if len(missing) > 0:
msg = annotate_missing_msgs(missing, acked, consumed, msg)
success = False

# Did we miss anything due to data loss?
if check_lost_data:
to_validate = list(missing)[0:1000 if len(missing) > 1000 else len(missing)]
max_truncate_count = 100 if may_truncate_acked_records else 0
max_validate_count = max(1000, max_truncate_count)

to_validate = list(missing)[0:min(len(missing), max_validate_count)]
data_lost = check_lost_data(to_validate)
msg = annotate_data_lost(data_lost, msg, len(to_validate))

# With older versions of message format before KIP-101, data loss could occur due to truncation.
# These records won't be in the data logs. Tolerate limited data loss for this case.
if len(missing) < max_truncate_count and len(data_lost) == len(missing):
msg += "The %s missing messages were not present in Kafka's data files. This suggests data loss " \
"due to truncation, which is possible with older message formats and hence are ignored " \
"by this test. The messages lost: %s\n" % (len(data_lost), str(data_lost))
else:
msg = annotate_data_lost(data_lost, msg, len(to_validate))
success = False
else:
success = False

# Are there duplicates?
if len(set(consumed)) != len(consumed):
Expand Down

0 comments on commit de8ce78

Please sign in to comment.