Skip to content

Commit dcaab53

Browse files
committed
try to fix flaky tests
1 parent 94f9b8f commit dcaab53

File tree

5 files changed

+42
-44
lines changed

5 files changed

+42
-44
lines changed

ydb/library/yql/providers/pq/async_io/dq_pq_read_actor_base.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ void TDqPqReadActorBase::SaveState(const NDqProto::TCheckpoint& /*checkpoint*/,
3636
partitionState->SetCluster(cluster);
3737
partitionState->SetPartition(partition);
3838
partitionState->SetOffset(offset);
39+
SRC_LOG_D("SessionId: " << GetSessionId() << " SaveState: partition " << partition << ", offset: " << offset);
3940
}
4041

4142
stateProto.SetStartingMessageTimestampMs(StartingMessageTimestamp.MilliSeconds());

ydb/tests/fq/yds/test_recovery.py

Lines changed: 38 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import ydb.tests.library.common.yatest_common as yatest_common
1515
from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimr
1616
from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimrConfig
17+
from ydb.tests.tools.fq_runner.kikimr_runner import TenantConfig
1718
from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
1819
from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
1920
from ydb.tests.tools.datastreams_helpers.control_plane import create_stream
@@ -33,7 +34,7 @@ def run_with_sleep(args):
3334

3435
@pytest.fixture
3536
def kikimr():
36-
kikimr_conf = StreamingOverKikimrConfig(node_count=8, cloud_mode=True)
37+
kikimr_conf = StreamingOverKikimrConfig(node_count={"/cp": TenantConfig(1), "/compute": TenantConfig(2)}, cloud_mode=True)
3738
kikimr = StreamingOverKikimr(kikimr_conf)
3839
kikimr.start_mvp_mock_server()
3940
kikimr.start()
@@ -50,8 +51,8 @@ def setup_class(cls):
5051

5152
@retry.retry_intrusive
5253
def get_graph_master_node_id(self, query_id):
53-
for node_index in self.kikimr.control_plane.kikimr_cluster.nodes:
54-
if self.kikimr.control_plane.get_task_count(node_index, query_id) > 0:
54+
for node_index in self.kikimr.compute_plane.kikimr_cluster.nodes:
55+
if self.kikimr.compute_plane.get_task_count(node_index, query_id) > 0:
5556
return node_index
5657
assert False, "No active graphs found"
5758

@@ -61,9 +62,9 @@ def dump_workers(self, worker_count, ca_count, wait_time=yatest_common.plain_or_
6162
wcs = 0
6263
ccs = 0
6364
list = []
64-
for node_index in self.kikimr.control_plane.kikimr_cluster.nodes:
65-
wc = self.kikimr.control_plane.get_worker_count(node_index)
66-
cc = self.kikimr.control_plane.get_ca_count(node_index)
65+
for node_index in self.kikimr.compute_plane.kikimr_cluster.nodes:
66+
wc = self.kikimr.compute_plane.get_worker_count(node_index)
67+
cc = self.kikimr.compute_plane.get_ca_count(node_index)
6768
wcs += wc
6869
ccs += cc
6970
list.append([node_index, wc, cc])
@@ -81,8 +82,8 @@ def dump_workers(self, worker_count, ca_count, wait_time=yatest_common.plain_or_
8182

8283
@yq_v1
8384
def test_delete(self, client, kikimr):
84-
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
85-
kikimr.control_plane.wait_bootstrap(node_index)
85+
kikimr.control_plane.wait_bootstrap()
86+
kikimr.compute_plane.wait_bootstrap()
8687

8788
self.kikimr = kikimr
8889
self.init_topics("recovery", partitions_count=2)
@@ -117,10 +118,9 @@ def test_program_state_recovery(self, client, kikimr):
117118
# [ Bucket2 ) |(emitted)
118119
# .<------------------------------------- restart
119120
# [ Bucket3 ) |(emitted)
120-
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
121-
kikimr.control_plane.wait_bootstrap(node_index)
122-
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
123-
kikimr.control_plane.wait_discovery(node_index)
121+
kikimr.control_plane.wait_bootstrap()
122+
kikimr.compute_plane.wait_bootstrap()
123+
kikimr.compute_plane.wait_discovery()
124124

125125
self.kikimr = kikimr
126126
self.init_topics("program_state_recovery", partitions_count=1)
@@ -162,18 +162,18 @@ def test_program_state_recovery(self, client, kikimr):
162162

163163
# restart node with CA
164164
node_to_restart = None
165-
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
166-
wc = kikimr.control_plane.get_worker_count(node_index)
165+
for node_index in kikimr.compute_plane.kikimr_cluster.nodes:
166+
wc = kikimr.compute_plane.get_worker_count(node_index)
167167
if wc is not None:
168168
if wc > 0 and node_index != master_node_index and node_to_restart is None:
169169
node_to_restart = node_index
170170
assert node_to_restart is not None, "Can't find any task on non master node"
171171

172172
logging.debug("Restart non-master node {}".format(node_to_restart))
173173

174-
kikimr.control_plane.kikimr_cluster.nodes[node_to_restart].stop()
175-
kikimr.control_plane.kikimr_cluster.nodes[node_to_restart].start()
176-
kikimr.control_plane.wait_bootstrap(node_to_restart)
174+
kikimr.compute_plane.kikimr_cluster.nodes[node_to_restart].stop()
175+
kikimr.compute_plane.kikimr_cluster.nodes[node_to_restart].start()
176+
kikimr.compute_plane.wait_bootstrap(node_to_restart)
177177

178178
self.write_stream([f'{{"time" = {i};}}' for i in range(116, 144, 2)])
179179

@@ -198,10 +198,9 @@ def test_program_state_recovery(self, client, kikimr):
198198
# ids=["not_master", "master"]
199199
# )
200200
def test_recovery(self, client, kikimr):
201-
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
202-
kikimr.control_plane.wait_bootstrap(node_index)
203-
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
204-
kikimr.control_plane.wait_discovery(node_index)
201+
kikimr.control_plane.wait_bootstrap()
202+
kikimr.compute_plane.wait_bootstrap()
203+
kikimr.compute_plane.wait_discovery()
205204

206205
self.init_topics("recovery", partitions_count=2)
207206

@@ -242,18 +241,18 @@ def test_recovery(self, client, kikimr):
242241
self.dump_workers(2, 4)
243242

244243
node_to_restart = None
245-
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
246-
wc = kikimr.control_plane.get_worker_count(node_index)
244+
for node_index in kikimr.compute_plane.kikimr_cluster.nodes:
245+
wc = kikimr.compute_plane.get_worker_count(node_index)
247246
if wc is not None:
248247
if wc > 0 and node_index != master_node_index and node_to_restart is None:
249248
node_to_restart = node_index
250249
assert node_to_restart is not None, "Can't find any task on non master node"
251250

252251
logging.debug("Restart non-master node {}".format(node_to_restart))
253252

254-
kikimr.control_plane.kikimr_cluster.nodes[node_to_restart].stop()
255-
kikimr.control_plane.kikimr_cluster.nodes[node_to_restart].start()
256-
kikimr.control_plane.wait_bootstrap(node_to_restart)
253+
kikimr.compute_plane.kikimr_cluster.nodes[node_to_restart].stop()
254+
kikimr.compute_plane.kikimr_cluster.nodes[node_to_restart].start()
255+
kikimr.compute_plane.wait_bootstrap(node_to_restart)
257256

258257
self.dump_workers(2, 4)
259258

@@ -273,9 +272,9 @@ def test_recovery(self, client, kikimr):
273272

274273
logging.debug("Restart Master node {}".format(master_node_index))
275274

276-
kikimr.control_plane.kikimr_cluster.nodes[master_node_index].stop()
277-
kikimr.control_plane.kikimr_cluster.nodes[master_node_index].start()
278-
kikimr.control_plane.wait_bootstrap(master_node_index)
275+
kikimr.compute_plane.kikimr_cluster.nodes[master_node_index].stop()
276+
kikimr.compute_plane.kikimr_cluster.nodes[master_node_index].start()
277+
kikimr.compute_plane.wait_bootstrap(master_node_index)
279278
master_node_index = self.get_graph_master_node_id(query_id)
280279

281280
logging.debug("New master node {}".format(master_node_index))
@@ -295,10 +294,10 @@ def test_recovery(self, client, kikimr):
295294
d[n] = 1
296295
assert len(d) == 30
297296

298-
zero_checkpoints_metric = kikimr.control_plane.get_checkpoint_coordinator_metric(
297+
zero_checkpoints_metric = kikimr.compute_plane.get_checkpoint_coordinator_metric(
299298
query_id, "StartedFromEmptyCheckpoint"
300299
)
301-
restored_metric = kikimr.control_plane.get_checkpoint_coordinator_metric(
300+
restored_metric = kikimr.compute_plane.get_checkpoint_coordinator_metric(
302301
query_id, "RestoredFromSavedCheckpoint"
303302
)
304303
assert restored_metric >= 1, "RestoredFromSavedCheckpoint: {}, StartedFromEmptyCheckpoint: {}".format(
@@ -420,10 +419,9 @@ def test_ic_disconnection(self, client):
420419

421420
@yq_v1
422421
def test_program_state_recovery_error_if_no_states(self, client, kikimr):
423-
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
424-
kikimr.control_plane.wait_bootstrap(node_index)
425-
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
426-
kikimr.control_plane.wait_discovery(node_index)
422+
kikimr.control_plane.wait_bootstrap()
423+
kikimr.compute_plane.wait_bootstrap()
424+
kikimr.compute_plane.wait_discovery()
427425
self.init_topics("error_if_no_states", partitions_count=1)
428426

429427
sql = R'''
@@ -443,17 +441,16 @@ def test_program_state_recovery_error_if_no_states(self, client, kikimr):
443441
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 1
444442
)
445443

446-
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
447-
kikimr.control_plane.kikimr_cluster.nodes[node_index].stop()
444+
for node_index in kikimr.compute_plane.kikimr_cluster.nodes:
445+
kikimr.compute_plane.kikimr_cluster.nodes[node_index].stop()
448446

449447
session = kikimr.driver.table_client.session().create()
450448
checkpoint_table_prefix = "/local/CheckpointCoordinatorStorage_" + kikimr.uuid + '/states'
451449
session.transaction().execute(f"DELETE FROM `{checkpoint_table_prefix}`", commit_tx=True)
452450

453-
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
454-
kikimr.control_plane.kikimr_cluster.nodes[node_index].start()
455-
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
456-
kikimr.control_plane.wait_bootstrap(node_index)
451+
for node_index in kikimr.compute_plane.kikimr_cluster.nodes:
452+
kikimr.compute_plane.kikimr_cluster.nodes[node_index].start()
453+
kikimr.compute_plane.wait_bootstrap()
457454

458455
client.wait_query_status(query_id, fq.QueryMeta.FAILED)
459456
describe_result = client.describe_query(query_id).result

ydb/tests/fq/yds/test_recovery_mz.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
@pytest.fixture
2323
def kikimr():
2424
kikimr_conf = StreamingOverKikimrConfig(
25-
cloud_mode=True, node_count={"/cp": TenantConfig(1), "/compute": TenantConfig(8)}
25+
cloud_mode=True, node_count={"/cp": TenantConfig(1), "/compute": TenantConfig(2)}
2626
)
2727
kikimr = StreamingOverKikimr(kikimr_conf)
2828
# control

ydb/tests/fq/yds/test_row_dispatcher.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -544,7 +544,7 @@ def test_stop_start_with_filter(self, kikimr, client):
544544
self.write_stream(data)
545545

546546
kikimr.compute_plane.wait_completed_checkpoints(
547-
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2
547+
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 10 # wait for 10 more checkpoints (acts as a long sleep) so the status can be sent from topic_session to read_actor
548548
)
549549
stop_yds_query(client, query_id)
550550
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0)

ydb/tests/tools/datastreams_helpers/data_plane.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from ydb.public.api.protos.ydb_status_codes_pb2 import StatusIds
1515

1616

17-
READ_TOOL_TIMEOUT = yatest_common.plain_or_under_sanitizer(20, 300)
17+
READ_TOOL_TIMEOUT = yatest_common.plain_or_under_sanitizer(30, 300)
1818

1919

2020
def write_stream(path, data, partition_key=None):

0 commit comments

Comments
 (0)