22import os
33import json
44import sys
5+ from collections import Counter
6+ from operator import itemgetter
57
68import ydb .public .api .protos .draft .fq_pb2 as fq
79from ydb .tests .tools .fq_runner .kikimr_utils import yq_v1
1113from ydb .tests .fq .generic .utils .settings import Settings
1214
1315DEBUG = 0
16+
17+
def ResequenceId(messages):
    """Return a copy of ``messages`` with every ``"id"`` renumbered in order.

    ``messages`` is an iterable of groups of JSON-object strings (in this
    file: ``(input, expected_output)`` tuples). Every string in the N-th
    group (counting from 1) is decoded, has its ``"id"`` key set to N, and is
    re-encoded. This keeps rows distinguishable when a fixture list is
    repeated with ``* N``, which would otherwise duplicate ids.

    Returns:
        A list with one tuple per input group, holding the re-encoded
        strings in their original order. Strings are re-serialized with
        ``json.dumps`` defaults, so whitespace may differ from the input.

    Raises:
        json.JSONDecodeError: if a string is not valid JSON.
    """

    def with_id(raw, new_id):
        # Assigning to an existing key keeps its position in the object;
        # a missing "id" is appended at the end.
        record = json.loads(raw)
        record["id"] = new_id
        return json.dumps(record)

    # enumerate(start=1) replaces the hand-maintained counter: one id per
    # group, shared by every string inside that group.
    return [
        tuple(with_id(raw, new_id) for raw in group)
        for new_id, group in enumerate(messages, start=1)
    ]
30+
31+
def freeze(json):
    """Recursively convert a decoded JSON value into a hashable equivalent.

    Dicts become a ``frozenset`` of ``(key, frozen_value)`` pairs, so key
    order is irrelevant; lists become tuples of frozen elements; any other
    value is returned unchanged. The result can be used as a ``Counter`` or
    ``set`` element to compare collections of JSON documents regardless of
    their order.

    NOTE: the parameter name shadows the ``json`` module inside this function;
    it is kept for interface compatibility and the module is not needed here.
    """
    # isinstance (rather than an exact type() comparison) also freezes
    # dict/list subclasses such as OrderedDict, which would otherwise be
    # returned as-is and remain unhashable.
    if isinstance(json, dict):
        return frozenset((key, freeze(value)) for key, value in json.items())
    if isinstance(json, list):
        return tuple(map(freeze, json))
    return json
39+
40+
1441TESTCASES = [
1542 # 0
1643 (
96123 insert into myyds.`{output_topic}`
97124 select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched;
98125 ''' ,
99- [
100- ('{"id":3,"user":5}' , '{"id":3,"user_id":5,"lookup":null}' ),
101- ('{"id":9,"user":3}' , '{"id":9,"user_id":3,"lookup":"ydb30"}' ),
102- ('{"id":2,"user":2}' , '{"id":2,"user_id":2,"lookup":"ydb20"}' ),
103- ('{"id":1,"user":1}' , '{"id":1,"user_id":1,"lookup":"ydb10"}' ),
104- ('{"id":4,"user":3}' , '{"id":4,"user_id":3,"lookup":"ydb30"}' ),
105- ('{"id":5,"user":3}' , '{"id":5,"user_id":3,"lookup":"ydb30"}' ),
106- ('{"id":6,"user":1}' , '{"id":6,"user_id":1,"lookup":"ydb10"}' ),
107- ('{"id":7,"user":2}' , '{"id":7,"user_id":2,"lookup":"ydb20"}' ),
108- ]
109- * 20 ,
126+ ResequenceId (
127+ [
128+ ('{"id":3,"user":5}' , '{"id":3,"user_id":5,"lookup":null}' ),
129+ ('{"id":9,"user":3}' , '{"id":9,"user_id":3,"lookup":"ydb30"}' ),
130+ ('{"id":2,"user":2}' , '{"id":2,"user_id":2,"lookup":"ydb20"}' ),
131+ ('{"id":1,"user":1}' , '{"id":1,"user_id":1,"lookup":"ydb10"}' ),
132+ ('{"id":4,"user":3}' , '{"id":4,"user_id":3,"lookup":"ydb30"}' ),
133+ ('{"id":5,"user":3}' , '{"id":5,"user_id":3,"lookup":"ydb30"}' ),
134+ ('{"id":6,"user":1}' , '{"id":6,"user_id":1,"lookup":"ydb10"}' ),
135+ ('{"id":7,"user":2}' , '{"id":7,"user_id":2,"lookup":"ydb20"}' ),
136+ ]
137+ * 20
138+ ),
110139 ),
111140 # 3
112141 (
137166 insert into myyds.`{output_topic}`
138167 select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched;
139168 ''' ,
140- [
141- (
142- '{"id":2,"ts":"20240701T113344","ev_type":"foo1","user":2}' ,
143- '{"id":2,"ts":"11:33:44","user_id":2,"lookup":"ydb20"}' ,
144- ),
145- (
146- '{"id":1,"ts":"20240701T112233","ev_type":"foo2","user":1}' ,
147- '{"id":1,"ts":"11:22:33","user_id":1,"lookup":"ydb10"}' ,
148- ),
149- (
150- '{"id":3,"ts":"20240701T113355","ev_type":"foo3","user":5}' ,
151- '{"id":3,"ts":"11:33:55","user_id":5,"lookup":null}' ,
152- ),
153- (
154- '{"id":4,"ts":"20240701T113356","ev_type":"foo4","user":3}' ,
155- '{"id":4,"ts":"11:33:56","user_id":3,"lookup":"ydb30"}' ,
156- ),
157- (
158- '{"id":5,"ts":"20240701T113357","ev_type":"foo5","user":3}' ,
159- '{"id":5,"ts":"11:33:57","user_id":3,"lookup":"ydb30"}' ,
160- ),
161- (
162- '{"id":6,"ts":"20240701T112238","ev_type":"foo6","user":1}' ,
163- '{"id":6,"ts":"11:22:38","user_id":1,"lookup":"ydb10"}' ,
164- ),
165- (
166- '{"id":7,"ts":"20240701T113349","ev_type":"foo7","user":2}' ,
167- '{"id":7,"ts":"11:33:49","user_id":2,"lookup":"ydb20"}' ,
168- ),
169- ]
170- * 10 ,
169+ ResequenceId (
170+ [
171+ (
172+ '{"id":2,"ts":"20240701T113344","ev_type":"foo1","user":2}' ,
173+ '{"id":2,"ts":"11:33:44","user_id":2,"lookup":"ydb20"}' ,
174+ ),
175+ (
176+ '{"id":1,"ts":"20240701T112233","ev_type":"foo2","user":1}' ,
177+ '{"id":1,"ts":"11:22:33","user_id":1,"lookup":"ydb10"}' ,
178+ ),
179+ (
180+ '{"id":3,"ts":"20240701T113355","ev_type":"foo3","user":5}' ,
181+ '{"id":3,"ts":"11:33:55","user_id":5,"lookup":null}' ,
182+ ),
183+ (
184+ '{"id":4,"ts":"20240701T113356","ev_type":"foo4","user":3}' ,
185+ '{"id":4,"ts":"11:33:56","user_id":3,"lookup":"ydb30"}' ,
186+ ),
187+ (
188+ '{"id":5,"ts":"20240701T113357","ev_type":"foo5","user":3}' ,
189+ '{"id":5,"ts":"11:33:57","user_id":3,"lookup":"ydb30"}' ,
190+ ),
191+ (
192+ '{"id":6,"ts":"20240701T112238","ev_type":"foo6","user":1}' ,
193+ '{"id":6,"ts":"11:22:38","user_id":1,"lookup":"ydb10"}' ,
194+ ),
195+ (
196+ '{"id":7,"ts":"20240701T113349","ev_type":"foo7","user":2}' ,
197+ '{"id":7,"ts":"11:33:49","user_id":2,"lookup":"ydb20"}' ,
198+ ),
199+ ]
200+ * 10
201+ ),
171202 ),
172203 # 4
173204 (
200231 insert into myyds.`{output_topic}`
201232 select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched;
202233 ''' ,
203- [
204- (
205- '{"id":1,"ts":"20240701T113344","ev_type":"foo1","user":2}' ,
206- '{"id":1,"ts":"11:33:44","uid":2,"user_id":2,"name":"Petr","age":25}' ,
207- ),
208- (
209- '{"id":2,"ts":"20240701T112233","ev_type":"foo2","user":1}' ,
210- '{"id":2,"ts":"11:22:33","uid":1,"user_id":1,"name":"Anya","age":15}' ,
211- ),
212- (
213- '{"id":3,"ts":"20240701T113355","ev_type":"foo3","user":100}' ,
214- '{"id":3,"ts":"11:33:55","uid":null,"user_id":100,"name":null,"age":null}' ,
215- ),
216- (
217- '{"id":4,"ts":"20240701T113356","ev_type":"foo4","user":3}' ,
218- '{"id":4,"ts":"11:33:56","uid":3,"user_id":3,"name":"Masha","age":17}' ,
219- ),
220- (
221- '{"id":5,"ts":"20240701T113357","ev_type":"foo5","user":3}' ,
222- '{"id":5,"ts":"11:33:57","uid":3,"user_id":3,"name":"Masha","age":17}' ,
223- ),
224- (
225- '{"id":6,"ts":"20240701T112238","ev_type":"foo6","user":1}' ,
226- '{"id":6,"ts":"11:22:38","uid":1,"user_id":1,"name":"Anya","age":15}' ,
227- ),
228- (
229- '{"id":7,"ts":"20240701T113349","ev_type":"foo7","user":2}' ,
230- '{"id":7,"ts":"11:33:49","uid":2,"user_id":2,"name":"Petr","age":25}' ,
231- ),
232- ]
233- * 1000 ,
234+ ResequenceId (
235+ [
236+ (
237+ '{"id":1,"ts":"20240701T113344","ev_type":"foo1","user":2}' ,
238+ '{"id":1,"ts":"11:33:44","uid":2,"user_id":2,"name":"Petr","age":25}' ,
239+ ),
240+ (
241+ '{"id":2,"ts":"20240701T112233","ev_type":"foo2","user":1}' ,
242+ '{"id":2,"ts":"11:22:33","uid":1,"user_id":1,"name":"Anya","age":15}' ,
243+ ),
244+ (
245+ '{"id":3,"ts":"20240701T113355","ev_type":"foo3","user":100}' ,
246+ '{"id":3,"ts":"11:33:55","uid":null,"user_id":100,"name":null,"age":null}' ,
247+ ),
248+ (
249+ '{"id":4,"ts":"20240701T113356","ev_type":"foo4","user":3}' ,
250+ '{"id":4,"ts":"11:33:56","uid":3,"user_id":3,"name":"Masha","age":17}' ,
251+ ),
252+ (
253+ '{"id":5,"ts":"20240701T113357","ev_type":"foo5","user":3}' ,
254+ '{"id":5,"ts":"11:33:57","uid":3,"user_id":3,"name":"Masha","age":17}' ,
255+ ),
256+ (
257+ '{"id":6,"ts":"20240701T112238","ev_type":"foo6","user":1}' ,
258+ '{"id":6,"ts":"11:22:38","uid":1,"user_id":1,"name":"Anya","age":15}' ,
259+ ),
260+ (
261+ '{"id":7,"ts":"20240701T113349","ev_type":"foo7","user":2}' ,
262+ '{"id":7,"ts":"11:33:49","uid":2,"user_id":2,"name":"Petr","age":25}' ,
263+ ),
264+ ]
265+ * 1000
266+ ),
234267 ),
235268 # 5
236269 (
@@ -334,12 +367,23 @@ def test_simple(self, kikimr, fq_client: FederatedQueryClient, settings: Setting
334367 @yq_v1
335368 @pytest .mark .parametrize ("mvp_external_ydb_endpoint" , [{"endpoint" : "tests-fq-generic-ydb:2136" }], indirect = True )
336369 @pytest .mark .parametrize ("fq_client" , [{"folder_id" : "my_folder_slj" }], indirect = True )
337- @pytest .mark .parametrize ("streamlookup" , [False , True ])
370+ @pytest .mark .parametrize ("partitions_count" , [1 , 3 ])
371+ @pytest .mark .parametrize ("streamlookup" , [False , True ] if DEBUG else [True ])
338372 @pytest .mark .parametrize ("testcase" , [* range (len (TESTCASES ))])
339373 def test_streamlookup (
340- self , kikimr , testcase , streamlookup , fq_client : FederatedQueryClient , settings : Settings , yq_version
374+ self ,
375+ kikimr ,
376+ testcase ,
377+ streamlookup ,
378+ partitions_count ,
379+ fq_client : FederatedQueryClient ,
380+ settings : Settings ,
381+ yq_version ,
341382 ):
342- self .init_topics (f"pq_yq_streaming_test_lookup_{ streamlookup } { testcase } _{ yq_version } " )
383+ self .init_topics (
384+ f"pq_yq_str_lookup_{ partitions_count } { streamlookup } { testcase } _{ yq_version } " ,
385+ partitions_count = partitions_count ,
386+ )
343387 fq_client .create_yds_connection ("myyds" , os .getenv ("YDB_DATABASE" ), os .getenv ("YDB_ENDPOINT" ))
344388
345389 table_name = 'join_table'
@@ -359,7 +403,7 @@ def test_streamlookup(
359403 )
360404
361405 query_id = fq_client .create_query (
362- f"streamlookup_{ streamlookup } { testcase } " , sql , type = fq .QueryContent .QueryType .STREAMING
406+ f"streamlookup_{ partitions_count } { streamlookup } { testcase } " , sql , type = fq .QueryContent .QueryType .STREAMING
363407 ).result .query_id
364408 fq_client .wait_query_status (query_id , fq .QueryMeta .RUNNING )
365409 kikimr .compute_plane .wait_zero_checkpoint (query_id )
@@ -375,10 +419,9 @@ def test_streamlookup(
375419 print (streamlookup , testcase , file = sys .stderr )
376420 print (sql , file = sys .stderr )
377421 print (* zip (messages , read_data ), file = sys .stderr , sep = "\n " )
378- for r , exp in zip (read_data , messages ):
379- r = json .loads (r )
380- exp = json .loads (exp [1 ])
381- assert r == exp
422+ read_data_ctr = Counter (map (freeze , map (json .loads , read_data )))
423+ messages_ctr = Counter (map (freeze , map (json .loads , map (itemgetter (1 ), messages ))))
424+ assert read_data_ctr == messages_ctr
382425
383426 fq_client .abort_query (query_id )
384427 fq_client .wait_query (query_id )
0 commit comments