Skip to content

Commit 036a7ce

Browse files
authored
YQ-3933 Fix FlagTrackDelivery / to stable (#12232)
1 parent 57b3c20 commit 036a7ce

File tree

2 files changed

+44
-21
lines changed

2 files changed

+44
-21
lines changed

ydb/library/yql/dq/actors/common/retry_queue.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ class TRetryEventsQueue {
8989
void Send(THolder<T> ev, ui64 cookie = 0) {
9090
if (LocalRecipient) {
9191
LastSentDataTime = TInstant::Now();
92-
NActors::TActivationContext::Send(new NActors::IEventHandle(RecipientId, SenderId, ev.Release(), /* flags */ 0, cookie));
92+
NActors::TActivationContext::Send(new NActors::IEventHandle(RecipientId, SenderId, ev.Release(), /* flags */ NActors::IEventHandle::FlagTrackDelivery, cookie));
9393
return;
9494
}
9595

ydb/tests/fq/yds/test_row_dispatcher.py

Lines changed: 43 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
@pytest.fixture
2727
def kikimr(request):
2828
kikimr_conf = StreamingOverKikimrConfig(
29-
cloud_mode=True, node_count={"/cp": TenantConfig(1), "/compute": TenantConfig(2)}
29+
cloud_mode=True, node_count={"/cp": TenantConfig(1), "/compute": TenantConfig(3)}
3030
)
3131
kikimr = StreamingOverKikimr(kikimr_conf)
3232
kikimr.compute_plane.fq_config['row_dispatcher']['enabled'] = True
@@ -686,39 +686,41 @@ def test_restart_compute_node(self, kikimr, client):
686686
client.create_yds_connection(
687687
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
688688
)
689-
self.init_topics("test_restart_compute_node")
689+
self.init_topics("test_restart_compute_node", partitions_count=4)
690690

691691
sql = Rf'''
692692
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
693693
SELECT Cast(time as String) FROM {YDS_CONNECTION}.`{self.input_topic}`
694694
WITH (format=json_each_row, SCHEMA (time Int32 NOT NULL));'''
695695

696696
query_id = start_yds_query(kikimr, client, sql)
697-
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
697+
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 4)
698698

699-
data = ['{"time": 101, "data": "hello1"}', '{"time": 102, "data": "hello2"}']
699+
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(100, 102)], "partition_key1")
700+
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(102, 104)], "partition_key2")
701+
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(104, 106)], "partition_key3")
702+
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(106, 108)], "partition_key4")
700703

701-
self.write_stream(data)
702-
expected = ['101', '102']
703-
assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
704+
expected = [Rf'''{c}''' for c in range(100, 108)]
705+
assert sorted(self.read_stream(len(expected), topic_path=self.output_topic)) == expected
704706

705707
kikimr.compute_plane.wait_completed_checkpoints(
706-
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 1
707-
)
708-
709-
wait_actor_count(kikimr, "DQ_PQ_READ_ACTOR", 1)
710-
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
708+
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2)
711709

712710
node_index = 2
713711
logging.debug("Restart compute node {}".format(node_index))
714712
kikimr.compute_plane.kikimr_cluster.nodes[node_index].stop()
715713
kikimr.compute_plane.kikimr_cluster.nodes[node_index].start()
716714
kikimr.compute_plane.wait_bootstrap(node_index)
717715

718-
data = ['{"time": 103, "data": "hello3"}', '{"time": 104, "data": "hello4"}']
719-
self.write_stream(data)
720-
expected = ['103', '104']
721-
assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
716+
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(108, 110)], "partition_key1")
717+
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(110, 112)], "partition_key2")
718+
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(112, 114)], "partition_key3")
719+
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(114, 116)], "partition_key4")
720+
721+
expected = [Rf'''{c}''' for c in range(108, 116)]
722+
assert sorted(self.read_stream(len(expected), topic_path=self.output_topic)) == expected
723+
722724
kikimr.compute_plane.wait_completed_checkpoints(
723725
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 1
724726
)
@@ -729,10 +731,31 @@ def test_restart_compute_node(self, kikimr, client):
729731
kikimr.compute_plane.kikimr_cluster.nodes[node_index].start()
730732
kikimr.compute_plane.wait_bootstrap(node_index)
731733

732-
data = ['{"time": 105, "data": "hello5"}', '{"time": 106, "data": "hello6"}']
733-
self.write_stream(data)
734-
expected = ['105', '106']
735-
assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
734+
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(116, 118)], "partition_key1")
735+
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(118, 120)], "partition_key2")
736+
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(120, 122)], "partition_key3")
737+
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(122, 124)], "partition_key4")
738+
739+
expected = [Rf'''{c}''' for c in range(116, 124)]
740+
assert sorted(self.read_stream(len(expected), topic_path=self.output_topic)) == expected
741+
742+
kikimr.compute_plane.wait_completed_checkpoints(
743+
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2
744+
)
745+
746+
node_index = 3
747+
logging.debug("Restart compute node {}".format(node_index))
748+
kikimr.compute_plane.kikimr_cluster.nodes[node_index].stop()
749+
kikimr.compute_plane.kikimr_cluster.nodes[node_index].start()
750+
kikimr.compute_plane.wait_bootstrap(node_index)
751+
752+
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(124, 126)], "partition_key1")
753+
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(126, 128)], "partition_key2")
754+
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(128, 130)], "partition_key3")
755+
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(130, 132)], "partition_key4")
756+
757+
expected = [Rf'''{c}''' for c in range(124, 132)]
758+
assert sorted(self.read_stream(len(expected), topic_path=self.output_topic)) == expected
736759

737760
stop_yds_query(client, query_id)
738761
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0)

0 commit comments

Comments
 (0)