Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/library/yql/dq/tasks/dq_connection_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ void BuildStreamLookupChannels(TGraph& graph, const NNodes::TDqPhyStage& stage,
auto& originStageInfo = graph.GetStageInfo(cnStreamLookup.Output().Stage());
auto outputIndex = FromString<ui32>(cnStreamLookup.Output().Index().Value());

BuildMapChannels(graph, stageInfo, inputIndex, originStageInfo, outputIndex, false /*spilling*/, logFunc);
BuildUnionAllChannels(graph, stageInfo, inputIndex, originStageInfo, outputIndex, false /*spilling*/, logFunc);
}

template <typename TGraph>
Expand Down
205 changes: 124 additions & 81 deletions ydb/tests/fq/generic/test_join_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import os
import json
import sys
from collections import Counter
from operator import itemgetter

import ydb.public.api.protos.draft.fq_pb2 as fq
from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
Expand All @@ -11,6 +13,31 @@
from ydb.tests.fq.generic.utils.settings import Settings

DEBUG = 0


def ResequenceId(messages):
    """Renumber the ``"id"`` field of every JSON payload by group position.

    Args:
        messages: An iterable of groups (e.g. ``(input, expected)`` tuples);
            each group is an iterable of JSON-encoded object strings.

    Returns:
        A new list with one tuple per input group. Every string in the N-th
        group (counting from 1) is re-serialized with ``"id"`` set to N, so
        all payloads in a group share the same id and ids are unique across
        groups even when the input list was built by repeating a template.
    """
    resequenced = []
    for seq, group in enumerate(messages, start=1):
        rewritten = []
        for raw in group:
            payload = json.loads(raw)
            # Overwrites an existing id in place; adds the key if it is absent.
            payload["id"] = seq
            rewritten.append(json.dumps(payload))
        resequenced.append(tuple(rewritten))
    return resequenced


def freeze(json):
    """Recursively convert a decoded JSON value into a hashable equivalent.

    Mappings become a ``frozenset`` of ``(key, frozen_value)`` pairs, so key
    order does not matter; lists become tuples, so element order does.
    Any other value is returned unchanged.

    Args:
        json: A value as produced by ``json.loads``. The parameter name
            shadows the ``json`` module inside this function only; it is
            kept so existing callers are unaffected.

    Returns:
        A hashable value that can be used as a ``Counter`` or ``dict`` key
        or as a ``set`` member.
    """
    # isinstance (rather than an exact type comparison) also freezes
    # dict/list subclasses such as OrderedDict, which would otherwise be
    # returned as-is and fail when hashed.
    if isinstance(json, dict):
        return frozenset((key, freeze(item)) for key, item in json.items())
    if isinstance(json, list):
        return tuple(freeze(item) for item in json)
    return json


TESTCASES = [
# 0
(
Expand Down Expand Up @@ -96,17 +123,19 @@
insert into myyds.`{output_topic}`
select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched;
''',
[
('{"id":3,"user":5}', '{"id":3,"user_id":5,"lookup":null}'),
('{"id":9,"user":3}', '{"id":9,"user_id":3,"lookup":"ydb30"}'),
('{"id":2,"user":2}', '{"id":2,"user_id":2,"lookup":"ydb20"}'),
('{"id":1,"user":1}', '{"id":1,"user_id":1,"lookup":"ydb10"}'),
('{"id":4,"user":3}', '{"id":4,"user_id":3,"lookup":"ydb30"}'),
('{"id":5,"user":3}', '{"id":5,"user_id":3,"lookup":"ydb30"}'),
('{"id":6,"user":1}', '{"id":6,"user_id":1,"lookup":"ydb10"}'),
('{"id":7,"user":2}', '{"id":7,"user_id":2,"lookup":"ydb20"}'),
]
* 20,
ResequenceId(
[
('{"id":3,"user":5}', '{"id":3,"user_id":5,"lookup":null}'),
('{"id":9,"user":3}', '{"id":9,"user_id":3,"lookup":"ydb30"}'),
('{"id":2,"user":2}', '{"id":2,"user_id":2,"lookup":"ydb20"}'),
('{"id":1,"user":1}', '{"id":1,"user_id":1,"lookup":"ydb10"}'),
('{"id":4,"user":3}', '{"id":4,"user_id":3,"lookup":"ydb30"}'),
('{"id":5,"user":3}', '{"id":5,"user_id":3,"lookup":"ydb30"}'),
('{"id":6,"user":1}', '{"id":6,"user_id":1,"lookup":"ydb10"}'),
('{"id":7,"user":2}', '{"id":7,"user_id":2,"lookup":"ydb20"}'),
]
* 20
),
),
# 3
(
Expand Down Expand Up @@ -137,37 +166,39 @@
insert into myyds.`{output_topic}`
select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched;
''',
[
(
'{"id":2,"ts":"20240701T113344","ev_type":"foo1","user":2}',
'{"id":2,"ts":"11:33:44","user_id":2,"lookup":"ydb20"}',
),
(
'{"id":1,"ts":"20240701T112233","ev_type":"foo2","user":1}',
'{"id":1,"ts":"11:22:33","user_id":1,"lookup":"ydb10"}',
),
(
'{"id":3,"ts":"20240701T113355","ev_type":"foo3","user":5}',
'{"id":3,"ts":"11:33:55","user_id":5,"lookup":null}',
),
(
'{"id":4,"ts":"20240701T113356","ev_type":"foo4","user":3}',
'{"id":4,"ts":"11:33:56","user_id":3,"lookup":"ydb30"}',
),
(
'{"id":5,"ts":"20240701T113357","ev_type":"foo5","user":3}',
'{"id":5,"ts":"11:33:57","user_id":3,"lookup":"ydb30"}',
),
(
'{"id":6,"ts":"20240701T112238","ev_type":"foo6","user":1}',
'{"id":6,"ts":"11:22:38","user_id":1,"lookup":"ydb10"}',
),
(
'{"id":7,"ts":"20240701T113349","ev_type":"foo7","user":2}',
'{"id":7,"ts":"11:33:49","user_id":2,"lookup":"ydb20"}',
),
]
* 10,
ResequenceId(
[
(
'{"id":2,"ts":"20240701T113344","ev_type":"foo1","user":2}',
'{"id":2,"ts":"11:33:44","user_id":2,"lookup":"ydb20"}',
),
(
'{"id":1,"ts":"20240701T112233","ev_type":"foo2","user":1}',
'{"id":1,"ts":"11:22:33","user_id":1,"lookup":"ydb10"}',
),
(
'{"id":3,"ts":"20240701T113355","ev_type":"foo3","user":5}',
'{"id":3,"ts":"11:33:55","user_id":5,"lookup":null}',
),
(
'{"id":4,"ts":"20240701T113356","ev_type":"foo4","user":3}',
'{"id":4,"ts":"11:33:56","user_id":3,"lookup":"ydb30"}',
),
(
'{"id":5,"ts":"20240701T113357","ev_type":"foo5","user":3}',
'{"id":5,"ts":"11:33:57","user_id":3,"lookup":"ydb30"}',
),
(
'{"id":6,"ts":"20240701T112238","ev_type":"foo6","user":1}',
'{"id":6,"ts":"11:22:38","user_id":1,"lookup":"ydb10"}',
),
(
'{"id":7,"ts":"20240701T113349","ev_type":"foo7","user":2}',
'{"id":7,"ts":"11:33:49","user_id":2,"lookup":"ydb20"}',
),
]
* 10
),
),
# 4
(
Expand Down Expand Up @@ -200,37 +231,39 @@
insert into myyds.`{output_topic}`
select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched;
''',
[
(
'{"id":1,"ts":"20240701T113344","ev_type":"foo1","user":2}',
'{"id":1,"ts":"11:33:44","uid":2,"user_id":2,"name":"Petr","age":25}',
),
(
'{"id":2,"ts":"20240701T112233","ev_type":"foo2","user":1}',
'{"id":2,"ts":"11:22:33","uid":1,"user_id":1,"name":"Anya","age":15}',
),
(
'{"id":3,"ts":"20240701T113355","ev_type":"foo3","user":100}',
'{"id":3,"ts":"11:33:55","uid":null,"user_id":100,"name":null,"age":null}',
),
(
'{"id":4,"ts":"20240701T113356","ev_type":"foo4","user":3}',
'{"id":4,"ts":"11:33:56","uid":3,"user_id":3,"name":"Masha","age":17}',
),
(
'{"id":5,"ts":"20240701T113357","ev_type":"foo5","user":3}',
'{"id":5,"ts":"11:33:57","uid":3,"user_id":3,"name":"Masha","age":17}',
),
(
'{"id":6,"ts":"20240701T112238","ev_type":"foo6","user":1}',
'{"id":6,"ts":"11:22:38","uid":1,"user_id":1,"name":"Anya","age":15}',
),
(
'{"id":7,"ts":"20240701T113349","ev_type":"foo7","user":2}',
'{"id":7,"ts":"11:33:49","uid":2,"user_id":2,"name":"Petr","age":25}',
),
]
* 1000,
ResequenceId(
[
(
'{"id":1,"ts":"20240701T113344","ev_type":"foo1","user":2}',
'{"id":1,"ts":"11:33:44","uid":2,"user_id":2,"name":"Petr","age":25}',
),
(
'{"id":2,"ts":"20240701T112233","ev_type":"foo2","user":1}',
'{"id":2,"ts":"11:22:33","uid":1,"user_id":1,"name":"Anya","age":15}',
),
(
'{"id":3,"ts":"20240701T113355","ev_type":"foo3","user":100}',
'{"id":3,"ts":"11:33:55","uid":null,"user_id":100,"name":null,"age":null}',
),
(
'{"id":4,"ts":"20240701T113356","ev_type":"foo4","user":3}',
'{"id":4,"ts":"11:33:56","uid":3,"user_id":3,"name":"Masha","age":17}',
),
(
'{"id":5,"ts":"20240701T113357","ev_type":"foo5","user":3}',
'{"id":5,"ts":"11:33:57","uid":3,"user_id":3,"name":"Masha","age":17}',
),
(
'{"id":6,"ts":"20240701T112238","ev_type":"foo6","user":1}',
'{"id":6,"ts":"11:22:38","uid":1,"user_id":1,"name":"Anya","age":15}',
),
(
'{"id":7,"ts":"20240701T113349","ev_type":"foo7","user":2}',
'{"id":7,"ts":"11:33:49","uid":2,"user_id":2,"name":"Petr","age":25}',
),
]
* 1000
),
),
# 5
(
Expand Down Expand Up @@ -334,12 +367,23 @@ def test_simple(self, kikimr, fq_client: FederatedQueryClient, settings: Setting
@yq_v1
@pytest.mark.parametrize("mvp_external_ydb_endpoint", [{"endpoint": "tests-fq-generic-ydb:2136"}], indirect=True)
@pytest.mark.parametrize("fq_client", [{"folder_id": "my_folder_slj"}], indirect=True)
@pytest.mark.parametrize("streamlookup", [False, True])
@pytest.mark.parametrize("partitions_count", [1, 3])
@pytest.mark.parametrize("streamlookup", [False, True] if DEBUG else [True])
@pytest.mark.parametrize("testcase", [*range(len(TESTCASES))])
def test_streamlookup(
self, kikimr, testcase, streamlookup, fq_client: FederatedQueryClient, settings: Settings, yq_version
self,
kikimr,
testcase,
streamlookup,
partitions_count,
fq_client: FederatedQueryClient,
settings: Settings,
yq_version,
):
self.init_topics(f"pq_yq_streaming_test_lookup_{streamlookup}{testcase}_{yq_version}")
self.init_topics(
f"pq_yq_str_lookup_{partitions_count}{streamlookup}{testcase}_{yq_version}",
partitions_count=partitions_count,
)
fq_client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))

table_name = 'join_table'
Expand All @@ -359,7 +403,7 @@ def test_streamlookup(
)

query_id = fq_client.create_query(
f"streamlookup_{streamlookup}{testcase}", sql, type=fq.QueryContent.QueryType.STREAMING
f"streamlookup_{partitions_count}{streamlookup}{testcase}", sql, type=fq.QueryContent.QueryType.STREAMING
).result.query_id
fq_client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
kikimr.compute_plane.wait_zero_checkpoint(query_id)
Expand All @@ -375,10 +419,9 @@ def test_streamlookup(
print(streamlookup, testcase, file=sys.stderr)
print(sql, file=sys.stderr)
print(*zip(messages, read_data), file=sys.stderr, sep="\n")
for r, exp in zip(read_data, messages):
r = json.loads(r)
exp = json.loads(exp[1])
assert r == exp
read_data_ctr = Counter(map(freeze, map(json.loads, read_data)))
messages_ctr = Counter(map(freeze, map(json.loads, map(itemgetter(1), messages))))
assert read_data_ctr == messages_ctr

fq_client.abort_query(query_id)
fq_client.wait_query(query_id)
Expand Down