Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/library/yql/dq/actors/common/retry_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class TRetryEventsQueue {
void Send(THolder<T> ev, ui64 cookie = 0) {
if (LocalRecipient) {
LastSentDataTime = TInstant::Now();
NActors::TActivationContext::Send(new NActors::IEventHandle(RecipientId, SenderId, ev.Release(), /* flags */ 0, cookie));
NActors::TActivationContext::Send(new NActors::IEventHandle(RecipientId, SenderId, ev.Release(), /* flags */ NActors::IEventHandle::FlagTrackDelivery, cookie));
return;
}

Expand Down
63 changes: 43 additions & 20 deletions ydb/tests/fq/yds/test_row_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
@pytest.fixture
def kikimr(request):
kikimr_conf = StreamingOverKikimrConfig(
cloud_mode=True, node_count={"/cp": TenantConfig(1), "/compute": TenantConfig(2)}
cloud_mode=True, node_count={"/cp": TenantConfig(1), "/compute": TenantConfig(3)}
)
kikimr = StreamingOverKikimr(kikimr_conf)
kikimr.compute_plane.fq_config['row_dispatcher']['enabled'] = True
Expand Down Expand Up @@ -679,39 +679,41 @@ def test_restart_compute_node(self, kikimr, client):
client.create_yds_connection(
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
)
self.init_topics("test_restart_compute_node")
self.init_topics("test_restart_compute_node", partitions_count=4)

sql = Rf'''
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
SELECT Cast(time as String) FROM {YDS_CONNECTION}.`{self.input_topic}`
WITH (format=json_each_row, SCHEMA (time Int32 NOT NULL));'''

query_id = start_yds_query(kikimr, client, sql)
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 4)

data = ['{"time": 101, "data": "hello1"}', '{"time": 102, "data": "hello2"}']
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(100, 102)], "partition_key1")
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(102, 104)], "partition_key2")
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(104, 106)], "partition_key3")
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(106, 108)], "partition_key4")

self.write_stream(data)
expected = ['101', '102']
assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
expected = [Rf'''{c}''' for c in range(100, 108)]
assert sorted(self.read_stream(len(expected), topic_path=self.output_topic)) == expected

kikimr.compute_plane.wait_completed_checkpoints(
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2
)

wait_actor_count(kikimr, "DQ_PQ_READ_ACTOR", 1)
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2)

node_index = 2
logging.debug("Restart compute node {}".format(node_index))
kikimr.compute_plane.kikimr_cluster.nodes[node_index].stop()
kikimr.compute_plane.kikimr_cluster.nodes[node_index].start()
kikimr.compute_plane.wait_bootstrap(node_index)

data = ['{"time": 103, "data": "hello3"}', '{"time": 104, "data": "hello4"}']
self.write_stream(data)
expected = ['103', '104']
assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(108, 110)], "partition_key1")
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(110, 112)], "partition_key2")
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(112, 114)], "partition_key3")
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(114, 116)], "partition_key4")

expected = [Rf'''{c}''' for c in range(108, 116)]
assert sorted(self.read_stream(len(expected), topic_path=self.output_topic)) == expected

kikimr.compute_plane.wait_completed_checkpoints(
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2
)
Expand All @@ -722,10 +724,31 @@ def test_restart_compute_node(self, kikimr, client):
kikimr.compute_plane.kikimr_cluster.nodes[node_index].start()
kikimr.compute_plane.wait_bootstrap(node_index)

data = ['{"time": 105, "data": "hello5"}', '{"time": 106, "data": "hello6"}']
self.write_stream(data)
expected = ['105', '106']
assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(116, 118)], "partition_key1")
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(118, 120)], "partition_key2")
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(120, 122)], "partition_key3")
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(122, 124)], "partition_key4")

expected = [Rf'''{c}''' for c in range(116, 124)]
assert sorted(self.read_stream(len(expected), topic_path=self.output_topic)) == expected

kikimr.compute_plane.wait_completed_checkpoints(
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2
)

node_index = 3
logging.debug("Restart compute node {}".format(node_index))
kikimr.compute_plane.kikimr_cluster.nodes[node_index].stop()
kikimr.compute_plane.kikimr_cluster.nodes[node_index].start()
kikimr.compute_plane.wait_bootstrap(node_index)

write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(124, 126)], "partition_key1")
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(126, 128)], "partition_key2")
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(128, 130)], "partition_key3")
write_stream(self.input_topic, [Rf'''{{"time": {c}}}''' for c in range(130, 132)], "partition_key4")

expected = [Rf'''{c}''' for c in range(124, 132)]
assert sorted(self.read_stream(len(expected), topic_path=self.output_topic)) == expected

stop_yds_query(client, query_id)
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0)
Expand Down
Loading