Skip to content

Commit d939c0f

Browse files
authored
YQ-3933 Fix FlagTrackDelivery (#12160)
1 parent d3b1607 commit d939c0f

File tree

2 files changed

+44
-21
lines changed

2 files changed

+44
-21
lines changed

ydb/library/yql/dq/actors/common/retry_queue.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ class TRetryEventsQueue {
8989
void Send(THolder<T> ev, ui64 cookie = 0) {
9090
if (LocalRecipient) {
9191
LastSentDataTime = TInstant::Now();
92-
NActors::TActivationContext::Send(new NActors::IEventHandle(RecipientId, SenderId, ev.Release(), /* flags */ 0, cookie));
92+
NActors::TActivationContext::Send(new NActors::IEventHandle(RecipientId, SenderId, ev.Release(), /* flags */ NActors::IEventHandle::FlagTrackDelivery, cookie));
9393
return;
9494
}
9595

ydb/tests/fq/yds/test_row_dispatcher.py

Lines changed: 43 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
@pytest.fixture
2727
def kikimr(request):
2828
kikimr_conf = StreamingOverKikimrConfig(
29-
cloud_mode=True, node_count={"/cp": TenantConfig(1), "/compute": TenantConfig(2)}
29+
cloud_mode=True, node_count={"/cp": TenantConfig(1), "/compute": TenantConfig(3)}
3030
)
3131
kikimr = StreamingOverKikimr(kikimr_conf)
3232
kikimr.compute_plane.fq_config['row_dispatcher']['enabled'] = True
@@ -687,39 +687,41 @@ def test_restart_compute_node(self, kikimr, client):
687687
client.create_yds_connection(
688688
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
689689
)
690-
self.init_topics("test_restart_compute_node")
690+
self.init_topics("test_restart_compute_node", partitions_count=4)
691691

692692
sql = Rf'''
693693
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
694694
SELECT Cast(time as String) FROM {YDS_CONNECTION}.`{self.input_topic}`
695695
WITH (format=json_each_row, SCHEMA (time Int32 NOT NULL));'''
696696

697697
query_id = start_yds_query(kikimr, client, sql)
698-
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
698+
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 4)
699699

700-
data = ['{"time": 101, "data": "hello1"}', '{"time": 102, "data": "hello2"}']
700+
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(100, 102)], "partition_key1")
701+
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(102, 104)], "partition_key2")
702+
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(104, 106)], "partition_key3")
703+
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(106, 108)], "partition_key4")
701704

702-
self.write_stream(data)
703-
expected = ['101', '102']
704-
assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
705+
expected = [Rf'''{c}''' for c in range(100, 108)]
706+
assert sorted(self.read_stream(len(expected), topic_path=self.output_topic)) == expected
705707

706708
kikimr.compute_plane.wait_completed_checkpoints(
707-
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2
708-
)
709-
710-
wait_actor_count(kikimr, "DQ_PQ_READ_ACTOR", 1)
711-
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
709+
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2)
712710

713711
node_index = 2
714712
logging.debug("Restart compute node {}".format(node_index))
715713
kikimr.compute_plane.kikimr_cluster.nodes[node_index].stop()
716714
kikimr.compute_plane.kikimr_cluster.nodes[node_index].start()
717715
kikimr.compute_plane.wait_bootstrap(node_index)
718716

719-
data = ['{"time": 103, "data": "hello3"}', '{"time": 104, "data": "hello4"}']
720-
self.write_stream(data)
721-
expected = ['103', '104']
722-
assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
717+
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(108, 110)], "partition_key1")
718+
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(110, 112)], "partition_key2")
719+
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(112, 114)], "partition_key3")
720+
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(114, 116)], "partition_key4")
721+
722+
expected = [Rf'''{c}''' for c in range(108, 116)]
723+
assert sorted(self.read_stream(len(expected), topic_path=self.output_topic)) == expected
724+
723725
kikimr.compute_plane.wait_completed_checkpoints(
724726
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2
725727
)
@@ -730,10 +732,31 @@ def test_restart_compute_node(self, kikimr, client):
730732
kikimr.compute_plane.kikimr_cluster.nodes[node_index].start()
731733
kikimr.compute_plane.wait_bootstrap(node_index)
732734

733-
data = ['{"time": 105, "data": "hello5"}', '{"time": 106, "data": "hello6"}']
734-
self.write_stream(data)
735-
expected = ['105', '106']
736-
assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
735+
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(116, 118)], "partition_key1")
736+
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(118, 120)], "partition_key2")
737+
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(120, 122)], "partition_key3")
738+
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(122, 124)], "partition_key4")
739+
740+
expected = [Rf'''{c}''' for c in range(116, 124)]
741+
assert sorted(self.read_stream(len(expected), topic_path=self.output_topic)) == expected
742+
743+
kikimr.compute_plane.wait_completed_checkpoints(
744+
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2
745+
)
746+
747+
node_index = 3
748+
logging.debug("Restart compute node {}".format(node_index))
749+
kikimr.compute_plane.kikimr_cluster.nodes[node_index].stop()
750+
kikimr.compute_plane.kikimr_cluster.nodes[node_index].start()
751+
kikimr.compute_plane.wait_bootstrap(node_index)
752+
753+
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(124, 126)], "partition_key1")
754+
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(126, 128)], "partition_key2")
755+
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(128, 130)], "partition_key3")
756+
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(130, 132)], "partition_key4")
757+
758+
expected = [Rf'''{c}''' for c in range(124, 132)]
759+
assert sorted(self.read_stream(len(expected), topic_path=self.output_topic)) == expected
737760

738761
stop_yds_query(client, query_id)
739762
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0)

0 commit comments

Comments
 (0)