Skip to content

Commit

Permalink
Merge pull request redpanda-data#17588 from nvartolomei/nv/kgo-bump-a…
Browse files Browse the repository at this point in the history
…pr-3

rptest: upgrade kgo
  • Loading branch information
nvartolomei authored Apr 5, 2024
2 parents 94df335 + 8d9d677 commit 30096ce
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 9 deletions.
2 changes: 1 addition & 1 deletion tests/docker/ducktape-deps/kgo-verifier
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
set -e
git -C /opt clone https://github.com/redpanda-data/kgo-verifier.git
cd /opt/kgo-verifier
git reset --hard bb6953c81662237c9a8fb42ee90cc870df258907
git reset --hard 8f4fdb77f2c6173d8e1b7020c9899601a441d0d6
go mod tidy
make
61 changes: 53 additions & 8 deletions tests/rptest/services/kgo_verifier_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
import signal
import threading
import requests
from typing import Any, Optional
from typing import Any, List, Optional

from ducktape.services.service import Service
from ducktape.utils.util import wait_until
from ducktape.cluster.remoteaccount import RemoteCommandError

from rptest.services.redpanda import RedpandaService

# Install location, specified by Dockerfile or AMI
TESTS_DIR = os.path.join("/opt", "kgo-verifier")

Expand Down Expand Up @@ -60,7 +62,7 @@ def __init__(self,
assert not self.nodes
self.nodes = custom_node

self._redpanda = redpanda
self._redpanda: RedpandaService = redpanda
self._topic = topic
self._msg_size = msg_size
self._pid = None
Expand Down Expand Up @@ -448,7 +450,9 @@ class ValidatorStatus:
differ per-worker, although at time of writing they don't.
"""
def __init__(self, name: str, valid_reads: int, invalid_reads: int,
out_of_scope_invalid_reads: int, max_offsets_consumed: int):
out_of_scope_invalid_reads: int,
max_offsets_consumed: Optional[int],
lost_offsets: Optional[List[int]]):
# Validator name is just a unique name per worker thread in kgo-verifier: useful in logging
# but we mostly don't care
self.name = name
Expand All @@ -457,6 +461,7 @@ def __init__(self, name: str, valid_reads: int, invalid_reads: int,
self.invalid_reads = invalid_reads
self.out_of_scope_invalid_reads = out_of_scope_invalid_reads
self.max_offsets_consumed = max_offsets_consumed
self.lost_offsets = lost_offsets

@property
def total_reads(self):
Expand All @@ -468,17 +473,25 @@ def merge(self, rhs: ValidatorStatus):
# Clear name if we are merging multiple statuses together, to avoid confusion.
self.name = ""

# Clear other fields we aren't interested in, to avoid confusion.
self.max_offsets_consumed = None
self.lost_offsets = None

self.valid_reads += rhs.valid_reads
self.invalid_reads += rhs.invalid_reads
self.out_of_scope_invalid_reads += rhs.out_of_scope_invalid_reads

def __str__(self):
return f"ValidatorStatus<{self.valid_reads} {self.invalid_reads} {self.out_of_scope_invalid_reads}>"
return f"ValidatorStatus<" \
f"valid_reads={self.valid_reads}, " \
f"invalid_reads={self.invalid_reads}, " \
f"out_of_scope_invalid_reads={self.out_of_scope_invalid_reads}, " \
f"lost_offsets={self.lost_offsets}>"


class ConsumerStatus:
def __init__(self,
topic: str = None,
topic: Optional[str] = None,
validator: dict[str, Any] | None = None,
errors: int = 0,
active: bool = True):
Expand All @@ -493,7 +506,8 @@ def __init__(self,
'invalid_reads': 0,
'out_of_scope_invalid_reads': 0,
'name': "",
'max_offsets_consumed': dict()
'max_offsets_consumed': dict(),
'lost_offsets': dict()
}

self.validator = ValidatorStatus(**validator)
Expand Down Expand Up @@ -530,7 +544,9 @@ def __init__(self,
username=None,
password=None,
enable_tls=False,
msgs_per_producer_id=None):
msgs_per_producer_id=None,
max_buffered_records=None,
tolerate_data_loss=False):
super(KgoVerifierProducer,
self).__init__(context, redpanda, topic, msg_size, custom_node,
debug_logs, trace_logs, username, password,
Expand All @@ -546,6 +562,8 @@ def __init__(self,
self._rate_limit_bps = rate_limit_bps
self._key_set_cardinality = key_set_cardinality
self._msgs_per_producer_id = msgs_per_producer_id
self._max_buffered_records = max_buffered_records
self._tolerate_data_loss = tolerate_data_loss

@property
def produce_status(self):
Expand Down Expand Up @@ -638,6 +656,13 @@ def start_node(self, node, clean=False):
cmd += f" --key-set-cardinality {self._key_set_cardinality}"
if self._msgs_per_producer_id is not None:
cmd += f" --msgs-per-producer-id {self._msgs_per_producer_id}"

if self._max_buffered_records is not None:
cmd += f" --max-buffered-records {self._max_buffered_records}"

if self._tolerate_data_loss:
cmd += " --tolerate-data-loss"

self.spawn(cmd, node)

self._status_thread = StatusThread(self, node, ProduceStatus)
Expand All @@ -657,6 +682,8 @@ def __init__(
debug_logs=False,
trace_logs=False,
loop=True,
continuous=False,
tolerate_data_loss=False,
producer: Optional[KgoVerifierProducer] = None,
username: Optional[str] = None,
password: Optional[str] = None,
Expand All @@ -669,6 +696,8 @@ def __init__(
self._max_throughput_mb = max_throughput_mb
self._status = ConsumerStatus()
self._loop = loop
self._continuous = continuous
self._tolerate_data_loss = tolerate_data_loss
self._producer = producer

@property
Expand All @@ -691,6 +720,10 @@ def start_node(self, node, clean=False):
cmd += f" --seq_read_msgs {self._max_msgs}"
if self._max_throughput_mb is not None:
cmd += f" --consume-throughput-mb {self._max_throughput_mb}"
if self._continuous:
cmd += " --continuous"
if self._tolerate_data_loss:
cmd += " --tolerate-data-loss"
self.spawn(cmd, node)

self._status_thread = StatusThread(self, node, ConsumerStatus)
Expand Down Expand Up @@ -784,14 +817,20 @@ def __init__(self,
trace_logs=False,
username=None,
password=None,
enable_tls=False):
enable_tls=False,
continuous=False,
tolerate_data_loss=False,
group_name=None):
super().__init__(context, redpanda, topic, msg_size, nodes, debug_logs,
trace_logs, username, password, enable_tls)

self._readers = readers
self._loop = loop
self._max_msgs = max_msgs
self._max_throughput_mb = max_throughput_mb
self._group_name = group_name
self._continuous = continuous
self._tolerate_data_loss = tolerate_data_loss
self._status = ConsumerStatus()

@property
Expand All @@ -815,6 +854,12 @@ def start_node(self, node, clean=False):
cmd += f" --seq_read_msgs {self._max_msgs}"
if self._max_throughput_mb is not None:
cmd += f" --consume-throughput-mb {self._max_throughput_mb}"
if self._continuous:
cmd += " --continuous"
if self._tolerate_data_loss:
cmd += " --tolerate-data-loss"
if self._group_name is not None:
cmd += f" --consumer_group_name {self._group_name}"
self.spawn(cmd, node)

self._status_thread = StatusThread(self, node, ConsumerStatus)
Expand Down

0 comments on commit 30096ce

Please sign in to comment.