diff --git a/tests/docker/ducktape-deps/kgo-verifier b/tests/docker/ducktape-deps/kgo-verifier index e8b31cf8165f..2f67f5c0d527 100644 --- a/tests/docker/ducktape-deps/kgo-verifier +++ b/tests/docker/ducktape-deps/kgo-verifier @@ -2,6 +2,6 @@ set -e git -C /opt clone https://github.com/redpanda-data/kgo-verifier.git cd /opt/kgo-verifier -git reset --hard bb6953c81662237c9a8fb42ee90cc870df258907 +git reset --hard 8f4fdb77f2c6173d8e1b7020c9899601a441d0d6 go mod tidy make diff --git a/tests/rptest/services/kgo_verifier_services.py b/tests/rptest/services/kgo_verifier_services.py index 75b02fa0e8bc..fb2dc183d6f3 100644 --- a/tests/rptest/services/kgo_verifier_services.py +++ b/tests/rptest/services/kgo_verifier_services.py @@ -14,12 +14,14 @@ import signal import threading import requests -from typing import Any, Optional +from typing import Any, List, Optional from ducktape.services.service import Service from ducktape.utils.util import wait_until from ducktape.cluster.remoteaccount import RemoteCommandError +from rptest.services.redpanda import RedpandaService + # Install location, specified by Dockerfile or AMI TESTS_DIR = os.path.join("/opt", "kgo-verifier") @@ -60,7 +62,7 @@ def __init__(self, assert not self.nodes self.nodes = custom_node - self._redpanda = redpanda + self._redpanda: RedpandaService = redpanda self._topic = topic self._msg_size = msg_size self._pid = None @@ -448,7 +450,9 @@ class ValidatorStatus: differ per-worker, although at time of writing they don't. """ def __init__(self, name: str, valid_reads: int, invalid_reads: int, - out_of_scope_invalid_reads: int, max_offsets_consumed: int): + out_of_scope_invalid_reads: int, + max_offsets_consumed: Optional[int], + lost_offsets: Optional[List[int]]): # Validator name is just a unique name per worker thread in kgo-verifier: useful in logging # but we mostly don't care self.name = name @@ -457,6 +461,7 @@ def __init__(self, name: str, valid_reads: int, invalid_reads: int, self.invalid_reads = invalid_reads self.out_of_scope_invalid_reads = out_of_scope_invalid_reads self.max_offsets_consumed = max_offsets_consumed + self.lost_offsets = lost_offsets @property def total_reads(self): @@ -468,17 +473,25 @@ def merge(self, rhs: ValidatorStatus): # Clear name if we are merging multiple statuses together, to avoid confusion. self.name = "" + # Clear other fields we aren't interested in, to avoid confusion. + self.max_offsets_consumed = None + self.lost_offsets = None + self.valid_reads += rhs.valid_reads self.invalid_reads += rhs.invalid_reads self.out_of_scope_invalid_reads += rhs.out_of_scope_invalid_reads def __str__(self): - return f"ValidatorStatus<{self.valid_reads} {self.invalid_reads} {self.out_of_scope_invalid_reads}>" + return f"ValidatorStatus<" \ + f"valid_reads={self.valid_reads}, " \ + f"invalid_reads={self.invalid_reads}, " \ + f"out_of_scope_invalid_reads={self.out_of_scope_invalid_reads}, " \ + f"lost_offsets={self.lost_offsets}>" class ConsumerStatus: def __init__(self, - topic: str = None, + topic: Optional[str] = None, validator: dict[str, Any] | None = None, errors: int = 0, active: bool = True): @@ -493,7 +506,8 @@ def __init__(self, 'invalid_reads': 0, 'out_of_scope_invalid_reads': 0, 'name': "", - 'max_offsets_consumed': dict() + 'max_offsets_consumed': dict(), + 'lost_offsets': dict() } self.validator = ValidatorStatus(**validator) @@ -530,7 +544,9 @@ def __init__(self, username=None, password=None, enable_tls=False, - msgs_per_producer_id=None): + msgs_per_producer_id=None, + max_buffered_records=None, + tolerate_data_loss=False): super(KgoVerifierProducer, self).__init__(context, redpanda, topic, msg_size, custom_node, debug_logs, trace_logs, username, password, @@ -546,6 +562,8 @@ def __init__(self, self._rate_limit_bps = rate_limit_bps self._key_set_cardinality = key_set_cardinality self._msgs_per_producer_id = msgs_per_producer_id + self._max_buffered_records = max_buffered_records + self._tolerate_data_loss = tolerate_data_loss @property def produce_status(self): @@ -638,6 +656,13 @@ def start_node(self, node, clean=False): cmd += f" --key-set-cardinality {self._key_set_cardinality}" if self._msgs_per_producer_id is not None: cmd += f" --msgs-per-producer-id {self._msgs_per_producer_id}" + + if self._max_buffered_records is not None: + cmd += f" --max-buffered-records {self._max_buffered_records}" + + if self._tolerate_data_loss: + cmd += " --tolerate-data-loss" + self.spawn(cmd, node) self._status_thread = StatusThread(self, node, ProduceStatus) @@ -657,6 +682,8 @@ def __init__( debug_logs=False, trace_logs=False, loop=True, + continuous=False, + tolerate_data_loss=False, producer: Optional[KgoVerifierProducer] = None, username: Optional[str] = None, password: Optional[str] = None, @@ -669,6 +696,8 @@ def __init__( self._max_throughput_mb = max_throughput_mb self._status = ConsumerStatus() self._loop = loop + self._continuous = continuous + self._tolerate_data_loss = tolerate_data_loss self._producer = producer @property @@ -691,6 +720,10 @@ def start_node(self, node, clean=False): cmd += f" --seq_read_msgs {self._max_msgs}" if self._max_throughput_mb is not None: cmd += f" --consume-throughput-mb {self._max_throughput_mb}" + if self._continuous: + cmd += " --continuous" + if self._tolerate_data_loss: + cmd += " --tolerate-data-loss" self.spawn(cmd, node) self._status_thread = StatusThread(self, node, ConsumerStatus) @@ -784,7 +817,10 @@ def __init__(self, trace_logs=False, username=None, password=None, - enable_tls=False): + enable_tls=False, + continuous=False, + tolerate_data_loss=False, + group_name=None): super().__init__(context, redpanda, topic, msg_size, nodes, debug_logs, trace_logs, username, password, enable_tls) @@ -792,6 +828,9 @@ def __init__(self, self._loop = loop self._max_msgs = max_msgs self._max_throughput_mb = max_throughput_mb + self._group_name = group_name + self._continuous = continuous + self._tolerate_data_loss = tolerate_data_loss self._status = ConsumerStatus() @property @@ -815,6 +854,12 @@ def start_node(self, node, clean=False): cmd += f" --seq_read_msgs {self._max_msgs}" if self._max_throughput_mb is not None: cmd += f" --consume-throughput-mb {self._max_throughput_mb}" + if self._continuous: + cmd += " --continuous" + if self._tolerate_data_loss: + cmd += " --tolerate-data-loss" + if self._group_name is not None: + cmd += f" --consumer_group_name {self._group_name}" self.spawn(cmd, node) self._status_thread = StatusThread(self, node, ConsumerStatus)