Skip to content

fix(e2e): cherry pick fix e2e 1.5 #2616

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Jun 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -969,8 +969,8 @@ public static void main(final String[] args) {
switch (mode) {
case "driver": {
// this starts the driver (data generation and result verification)
final int numArticles = 1_000;
final int numComments = 10_000;
final int numArticles = 1_00;
final int numComments = 10_0;
final DataSet dataSet = DataSet.generate(numArticles, numComments);
// publish the data for at least one minute
dataSet.produce(kafka, Duration.ofMinutes(1));
Expand Down
8 changes: 4 additions & 4 deletions tests/kafkatest/automq/compaction_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def __init__(self, test_context):
self.s3_wal_upload_threshold = 50 * 1024
self.automq_stream_object_compaction_jitter_max_delay_minute = 1

def create_kafka(self, num_nodes=1, partition=1, broker_wal='file', env=None):
def create_kafka(self, num_nodes=1, partition=1, broker_wal='s3', env=None):
"""
Create and configure Kafka service.

Expand Down Expand Up @@ -84,8 +84,8 @@ def create_kafka(self, num_nodes=1, partition=1, broker_wal='file', env=None):

@cluster(num_nodes=4)
@matrix(stream_set_object_compaction=[True, False],
stream_object_compaction_type=[STREAM_OBJECT_COMPACTION_TYPE_MINOR_V1, STREAM_OBJECT_COMPACTION_TYPE_MAJOR_V1], wal=['file', 's3'])
@matrix(stream_set_object_compaction=[True], stream_object_compaction_type=['None'], wal=['file', 's3'])
stream_object_compaction_type=[STREAM_OBJECT_COMPACTION_TYPE_MINOR_V1, STREAM_OBJECT_COMPACTION_TYPE_MAJOR_V1], wal=['s3'])
@matrix(stream_set_object_compaction=[True], stream_object_compaction_type=['None'], wal=['s3'])
def test_case(self, stream_set_object_compaction, stream_object_compaction_type, wal):
'''

Expand All @@ -97,7 +97,7 @@ def test_case(self, stream_set_object_compaction, stream_object_compaction_type,
self.run0(stream_set_object_compaction, stream_object_compaction_type, wal)

def run0(self, stream_set_object_compaction=False,
stream_object_compaction_type=STREAM_OBJECT_COMPACTION_TYPE_MINOR_V1, wal=FILE_WAL):
stream_object_compaction_type=STREAM_OBJECT_COMPACTION_TYPE_MINOR_V1, wal=S3_WAL):
"""
Run the test with specified compaction type.

Expand Down
4 changes: 2 additions & 2 deletions tests/kafkatest/automq/memory_occupancy_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def __init__(self, test_context):
self.consume_group = 'test_group'
self.records_consumed = []

def create_kafka(self, num_nodes=1, partition=None, log_size=None, block_size=None, wal='file', **kwargs):
def create_kafka(self, num_nodes=1, partition=None, log_size=None, block_size=None, wal='s3', **kwargs):
"""
Create and configure Kafka service.

Expand Down Expand Up @@ -96,7 +96,7 @@ def check_the_consumption_quantity(self, records):
assert int(receive_num) == records, f"Receive count does not match the expected records count: expected {records}, but got {receive_num}"

@cluster(num_nodes=3)
@matrix(partition=[128, 512], log_size=[256 * 1024 * 1024], block_size=[128 * 1024 * 1024, 256 * 1024 * 1024], wal=['file', 's3'])
@matrix(partition=[128, 512], log_size=[256 * 1024 * 1024], block_size=[128 * 1024 * 1024, 256 * 1024 * 1024], wal=['s3'])
def test(self, partition, log_size, block_size, wal):
"""
At any time, 1/writable record in Metric<=log cache size+100MB
Expand Down
2 changes: 1 addition & 1 deletion tests/kafkatest/automq/quota_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def start_console_consumer(self):
assert len(messages) > 0, "consumer %d didn't consume any message before timeout" % idx

@cluster(num_nodes=5)
@matrix(broker_in=[2500000], broker_out=[2000000], wal=['file', 's3'])
@matrix(broker_in=[2500000], broker_out=[2000000], wal=['s3'])
def test_quota(self, broker_in, broker_out, wal):
self.create_kafka(self.test_context, broker_in, broker_out, wal)
self.kafka.start()
Expand Down
6 changes: 3 additions & 3 deletions tests/kafkatest/automq/s3_leakage_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def __init__(self, test_context):
self.s3_wal_upload_threshold = 16 * 1024 * 1024
self.automq_stream_object_compaction_jitter_max_delay = 1

def create_kafka(self, num_nodes=1, partition=1, broker_wal='file', env=None):
def create_kafka(self, num_nodes=1, partition=1, broker_wal='s3', env=None):
"""
Create and configure Kafka service.

Expand Down Expand Up @@ -143,7 +143,7 @@ def run0(self, stream_object_compaction_type, wal, env=None):
)

@cluster(num_nodes=2)
@matrix(wal=['file', 's3'])
@matrix(wal=['s3'])
def test_s3_leak_major_v1(self, wal):
"""
Test S3 leak with major V1 compaction.
Expand All @@ -153,7 +153,7 @@ def test_s3_leak_major_v1(self, wal):
self.run0(stream_object_compaction_type=STREAM_OBJECT_COMPACTION_TYPE_MAJOR_V1, wal=wal, env=[f'AUTOMQ_STREAM_COMPACTION_MINOR_V1_COMPACTION_SIZE_THRESHOLD=0'])

@cluster(num_nodes=2)
@matrix(wal=['file', 's3'])
@matrix(wal=['s3'])
def test_s3_leak_minor_v1(self, wal):
"""
Test S3 leak with minor V1 compaction.
Expand Down
3 changes: 2 additions & 1 deletion tests/kafkatest/benchmarks/core/benchmark_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from ducktape.mark import matrix
from ducktape.mark import matrix, ignore
from ducktape.mark import parametrize
from ducktape.mark.resource import cluster
from ducktape.services.service import Service
Expand Down Expand Up @@ -158,6 +158,7 @@ def test_long_term_producer_throughput(self, compression_type="none",
self.logger.info("\n".join(summary))
return data

@ignore
@cluster(num_nodes=5)
@matrix(security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2', 'TLSv1.3'], compression_type=["none", "snappy"])
@matrix(security_protocol=['PLAINTEXT'], compression_type=["none", "snappy"])
Expand Down
4 changes: 2 additions & 2 deletions tests/kafkatest/sanity_checks/test_performance_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from ducktape.mark import matrix, parametrize
from ducktape.mark import matrix, parametrize, ignore
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test

Expand All @@ -36,7 +36,7 @@ def __init__(self, test_context):
def setUp(self):
if self.zk:
self.zk.start()

@ignore #test performance send and receive in autoMQ is meaningless
@cluster(num_nodes=5)
# We are keeping 0.8.2 here so that we don't inadvertently break support for it. Since this is just a sanity check,
# the overhead should be manageable.
Expand Down
2 changes: 1 addition & 1 deletion tests/kafkatest/services/log_compaction_tester.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def start_cmd(self, node):
cmd += " export CLASSPATH;"
cmd += self.path.script("kafka-run-class.sh", node)
cmd += " %s" % self.java_class_name()
cmd += " --bootstrap-server %s --messages 1000000 --sleep 20 --duplicates 10 --percent-deletes 10" % (self.kafka.bootstrap_servers(self.security_protocol))
cmd += " --bootstrap-server %s --messages 1000 --sleep 20 --duplicates 10 --percent-deletes 10" % (self.kafka.bootstrap_servers(self.security_protocol))

cmd += " 2>> %s | tee -a %s &" % (self.logs["tool_logs"]["path"], self.logs["tool_logs"]["path"])
return cmd
Expand Down
15 changes: 5 additions & 10 deletions tests/kafkatest/tests/connect/connect_distributed_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,12 +368,7 @@ def test_pause_and_resume_source(self, exactly_once_source, connect_protocol, me
@matrix(
connect_protocol=['sessioned', 'compatible', 'eager'],
metadata_quorum=[quorum.isolated_kraft],
use_new_coordinator=[False]
)
@matrix(
connect_protocol=['sessioned', 'compatible', 'eager'],
metadata_quorum=[quorum.isolated_kraft],
use_new_coordinator=[True],
use_new_coordinator=[True, False],
group_protocol=consumer_group.all_group_protocols
)
def test_pause_and_resume_sink(self, connect_protocol, metadata_quorum, use_new_coordinator=False, group_protocol=None):
Expand Down Expand Up @@ -419,7 +414,7 @@ def test_pause_and_resume_sink(self, connect_protocol, metadata_quorum, use_new_
err_msg="Failed to see connector transition to the RUNNING state")

# after resuming, we should see records consumed again
wait_until(lambda: len(self.sink.received_messages()) > num_messages, timeout_sec=40,
wait_until(lambda: len(self.sink.received_messages()) > num_messages, timeout_sec=300,
err_msg="Failed to consume messages after resuming sink connector")

@cluster(num_nodes=5)
Expand Down Expand Up @@ -649,7 +644,7 @@ def _wait_for_loggers(self, level, request_time, namespace, workers=None):
)
@matrix(
security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL],
exactly_once_source=[True, False],
exactly_once_source=[True, False],
connect_protocol=['sessioned', 'compatible', 'eager'],
metadata_quorum=[quorum.isolated_kraft],
use_new_coordinator=[True],
Expand Down Expand Up @@ -680,15 +675,15 @@ def test_file_source_and_sink(self, security_protocol, exactly_once_source, conn
# do rebalancing of the group, etc., and b) without explicit leave group support, rebalancing takes a while
for node in self.cc.nodes:
node.account.ssh("echo -e -n " + repr(self.FIRST_INPUTS) + " >> " + self.INPUT_FILE)
wait_until(lambda: self._validate_file_output(self.FIRST_INPUT_LIST), timeout_sec=90, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.")
wait_until(lambda: self._validate_file_output(self.FIRST_INPUT_LIST), timeout_sec=300, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.")

# Restarting both should result in them picking up where they left off,
# only processing new data.
self.cc.restart()

for node in self.cc.nodes:
node.account.ssh("echo -e -n " + repr(self.SECOND_INPUTS) + " >> " + self.INPUT_FILE)
wait_until(lambda: self._validate_file_output(self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST), timeout_sec=150, err_msg="Sink output file never converged to the same state as the input file")
wait_until(lambda: self._validate_file_output(self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST), timeout_sec=300, err_msg="Sink output file never converged to the same state as the input file")

@cluster(num_nodes=6)
@matrix(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def __init__(self, test_context):
self.num_input_partitions = 9
self.num_output_partitions = 9
self.num_copiers = 3
self.num_seed_messages = 100000
self.num_seed_messages = 1000
self.transaction_size = 750
# The transaction timeout should be lower than the progress timeout, but at
# least as high as the request timeout (which is 30s by default). When the
Expand Down
10 changes: 7 additions & 3 deletions tests/kafkatest/tests/core/reassign_partitions_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def __init__(self, test_context):
},
controller_num_nodes_override=self.num_zk)
self.timeout_sec = 60
self.producer_throughput = 1000
self.producer_throughput = 10000
self.num_producers = 1
self.num_consumers = 1

Expand Down Expand Up @@ -94,6 +94,10 @@ def reassign_partitions(self, bounce_brokers):
partition_info["partitions"][i]["partition"] = shuffled_list[i]
self.logger.debug("Jumbled partitions: " + str(partition_info))

acked_partitions = self.producer.acked_by_partition
for partition in acked_partitions:
self.logger.debug("Partition acked " + str(partition))

def check_all_partitions():
acked_partitions = self.producer.acked_by_partition
for i in range(self.num_partitions):
Expand All @@ -103,8 +107,8 @@ def check_all_partitions():

# ensure all partitions have data so we don't hit OutOfOrderExceptions due to broker restarts
wait_until(check_all_partitions,
timeout_sec=60,
err_msg="Failed to produce to all partitions in 30s")
timeout_sec=120,
err_msg="Failed to produce to all partitions in 120s")

# send reassign partitions command
self.kafka.execute_reassign_partitions(partition_info)
Expand Down
8 changes: 4 additions & 4 deletions tests/kafkatest/tests/core/replica_scale_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,10 @@ def test_produce_consume(self, topic_count, partition_count, replication_factor,
produce_spec = ProduceBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
producer_workload_service.producer_node,
producer_workload_service.bootstrap_servers,
target_messages_per_sec=150000,
target_messages_per_sec=1500,
# optimize multiple partition read
# max_messages=3400000,
max_messages=1700000,
max_messages=17000,
producer_conf={},
admin_client_conf={},
common_client_conf={},
Expand All @@ -117,8 +117,8 @@ def test_produce_consume(self, topic_count, partition_count, replication_factor,
consume_spec = ConsumeBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
consumer_workload_service.consumer_node,
consumer_workload_service.bootstrap_servers,
target_messages_per_sec=150000,
max_messages=1700000,
target_messages_per_sec=1500,
max_messages=17000,
consumer_conf=consumer_conf,
admin_client_conf={},
common_client_conf={},
Expand Down
4 changes: 2 additions & 2 deletions tests/kafkatest/tests/core/round_trip_fault_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ def __init__(self, test_context):
self.round_trip_spec = RoundTripWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
self.workload_service.client_node,
self.workload_service.bootstrap_servers,
target_messages_per_sec=10000,
max_messages=100000,
target_messages_per_sec=1000,
max_messages=10000,
active_topics=active_topics)

def setUp(self):
Expand Down
20 changes: 2 additions & 18 deletions tests/kafkatest/tests/core/transactions_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def __init__(self, test_context):
# Test parameters
self.num_input_partitions = 2
self.num_output_partitions = 3
self.num_seed_messages = 100000
self.num_seed_messages = 10000
self.transaction_size = 750

# The transaction timeout should be lower than the progress timeout, but at
Expand Down Expand Up @@ -189,7 +189,7 @@ def copy_messages_transactionally(self, failure_mode, bounce_target,
elif bounce_target == "clients":
self.bounce_copiers(copiers, clean_shutdown)

copier_timeout_sec = 120
copier_timeout_sec = 600
for copier in copiers:
wait_until(lambda: copier.is_done,
timeout_sec=copier_timeout_sec,
Expand Down Expand Up @@ -218,22 +218,6 @@ def setup_topics(self):
}

@cluster(num_nodes=9)
# @matrix(
# failure_mode=["hard_bounce", "clean_bounce"],
# bounce_target=["brokers", "clients"],
# check_order=[True, False],
# use_group_metadata=[True, False],
# metadata_quorum=[quorum.zk],
# use_new_coordinator=[False]
# )
@matrix(
failure_mode=["hard_bounce", "clean_bounce"],
bounce_target=["brokers", "clients"],
check_order=[True, False],
use_group_metadata=[True, False],
metadata_quorum=quorum.all_kraft,
use_new_coordinator=[False]
)
@matrix(
failure_mode=["hard_bounce", "clean_bounce"],
bounce_target=["brokers", "clients"],
Expand Down
6 changes: 3 additions & 3 deletions tests/kafkatest/tests/produce_consume_validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ def __init__(self, test_context):
super(ProduceConsumeValidateTest, self).__init__(test_context=test_context)
# How long to wait for the producer to declare itself healthy? This can
# be overridden by inheriting classes.
self.producer_start_timeout_sec = 20
self.producer_start_timeout_sec = 600

# How long to wait for the consumer to start consuming messages?
self.consumer_start_timeout_sec = 60
self.consumer_start_timeout_sec = 600

# How long to wait for the consumer process to fork? This
# is important in the case when the consumer is starting from the end,
Expand Down Expand Up @@ -65,7 +65,7 @@ def start_producer_and_consumer(self):


self.producer.start()
wait_until(lambda: self.producer.num_acked > 5,
wait_until(lambda: self.producer.num_acked > 19,
timeout_sec=self.producer_start_timeout_sec,
err_msg="Producer failed to produce messages for %ds." %\
self.producer_start_timeout_sec)
Expand Down
2 changes: 1 addition & 1 deletion tests/kafkatest/tests/streams/base_streams_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def get_configs(extra_configs=""):

def wait_for_verification(self, processor, message, file, num_lines=1):
wait_until(lambda: self.verify_from_file(processor, message, file) >= num_lines,
timeout_sec=60,
timeout_sec=300,
err_msg="Did expect to read '%s' from %s" % (message, processor.node.account))

def verify_from_file(self, processor, message, file):
Expand Down
Loading
Loading