Skip to content

Commit 9526f04

Browse files
authored
Merge 061c9c8 into 67e35be
2 parents 67e35be + 061c9c8 commit 9526f04

File tree

7 files changed

+277
-105
lines changed

7 files changed

+277
-105
lines changed

ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp

Lines changed: 167 additions & 96 deletions
Large diffs are not rendered by default.

ydb/library/yql/dq/proto/dq_tasks.proto

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,8 @@ message TDqInputTransformLookupSettings {
184184
repeated string RightJoinKeyNames = 6; //Join column names in the right hand, in the same order as previous
185185
bytes NarrowInputRowType = 7; //Serialized struct type
186186
bytes NarrowOutputRowType = 8; //Serialized struct type
187-
//TODO add lookup cache parameters
187+
uint64 CacheLimit = 9;
188+
uint64 CacheTtlSeconds = 10;
188189
}
189190

190191
message TDqTask {

ydb/library/yql/minikql/computation/mkql_key_payload_value_lru_cache.h

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ namespace NKikimr::NMiniKQL {
1919
// Never requests system time, expects monotonically increased time points in methods argument
2020
class TUnboxedKeyValueLruCacheWithTtl {
2121
struct TEntry {
22-
TEntry(NUdf::TUnboxedValue key, NUdf::TUnboxedValue value, std::chrono::time_point<std::chrono::steady_clock> expiration)
22+
TEntry(NUdf::TUnboxedValue key, NUdf::TUnboxedValue value, std::chrono::time_point<std::chrono::steady_clock> expiration)
2323
: Key(std::move(key))
2424
, Value(std::move(value))
2525
, Expiration(std::move(expiration))
@@ -73,16 +73,23 @@ class TUnboxedKeyValueLruCacheWithTtl {
7373
return std::nullopt;
7474
}
7575

76-
// Perform garbage collection.
76+
// Perform garbage collection, single step, O(1) time.
7777
// Must be called periodically
78-
void Tick(const std::chrono::time_point<std::chrono::steady_clock>& now) {
78+
bool Tick(const std::chrono::time_point<std::chrono::steady_clock>& now) {
7979
if (UsageList.empty()) {
80-
return;
80+
return false;
8181
}
8282
if (now < UsageList.front().Expiration) {
83-
return;
83+
return false;
8484
}
8585
RemoveLeastRecentlyUsedEntry();
86+
return true;
87+
}
88+
89+
// Perform garbage collection, O(1) amortized, but O(n) one-time
90+
void Prune(const std::chrono::time_point<std::chrono::steady_clock>& now) {
91+
while (Tick(now)) {
92+
}
8693
}
8794

8895
size_t Size() const {

ydb/library/yql/providers/dq/planner/execution_planner.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -604,6 +604,8 @@ namespace NYql::NDqs {
604604
const auto narrowOutputRowType = GetSeqItemType(streamLookup.Ptr()->GetTypeAnn());
605605
Y_ABORT_UNLESS(narrowOutputRowType->GetKind() == ETypeAnnotationKind::Struct);
606606
settings.SetNarrowOutputRowType(NYql::NCommon::GetSerializedTypeAnnotation(narrowOutputRowType));
607+
settings.SetCacheLimit(1'000'000); //TODO configure me
608+
settings.SetCacheTtlSeconds(60); //TODO configure me
607609

608610
const auto inputRowType = GetSeqItemType(streamLookup.Output().Stage().Program().Ref().GetTypeAnn());
609611
const auto outputRowType = GetSeqItemType(stage.Program().Args().Arg(inputIndex).Ref().GetTypeAnn());

ydb/tests/fq/generic/test_streaming_join.py

Lines changed: 86 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,15 @@
1515
DEBUG = 0
1616

1717

18-
def ResequenceId(messages):
18+
def ResequenceId(messages, field="id"):
1919
res = []
2020
i = 1
2121
for pair in messages:
2222
rpair = []
2323
for it in pair:
2424
src = json.loads(it)
25-
src["id"] = i
25+
if field in src:
26+
src[field] = i
2627
rpair += [json.dumps(src)]
2728
res += [tuple(rpair)]
2829
i += 1
@@ -310,6 +311,88 @@ def freeze(json):
310311
),
311312
],
312313
),
314+
# 6
315+
(
316+
R'''
317+
$input = SELECT * FROM myyds.`{input_topic}`
318+
WITH (
319+
FORMAT=json_each_row,
320+
SCHEMA (
321+
za Int32,
322+
yb STRING,
323+
yc Int32,
324+
zd Int32,
325+
)
326+
) ;
327+
328+
$enriched = select a, b, c, d, e, f, za, yb, yc, zd
329+
from
330+
$input as e
331+
left join {streamlookup} ydb_conn_{table_name}.db as u
332+
on(e.yb = u.b AND e.za = u.a )
333+
;
334+
335+
insert into myyds.`{output_topic}`
336+
select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched;
337+
''',
338+
ResequenceId(
339+
[
340+
(
341+
'{"id":1,"za":1,"yb":"2","yc":100,"zd":101}',
342+
'{"a":1,"b":"2","c":3,"d":4,"e":5,"f":6,"za":1,"yb":"2","yc":100,"zd":101}',
343+
),
344+
(
345+
'{"id":2,"za":7,"yb":"8","yc":106,"zd":107}',
346+
'{"a":7,"b":"8","c":9,"d":10,"e":11,"f":12,"za":7,"yb":"8","yc":106,"zd":107}',
347+
),
348+
(
349+
'{"id":3,"za":2,"yb":"1","yc":114,"zd":115}',
350+
'{"a":null,"b":null,"c":null,"d":null,"e":null,"f":null,"za":2,"yb":"1","yc":114,"zd":115}',
351+
),
352+
]
353+
),
354+
),
355+
# 7
356+
(
357+
R'''
358+
$input = SELECT * FROM myyds.`{input_topic}`
359+
WITH (
360+
FORMAT=json_each_row,
361+
SCHEMA (
362+
za Int32,
363+
yb STRING,
364+
yc Int32,
365+
zd Int32,
366+
)
367+
) ;
368+
369+
$enriched = select a, b, c, d, e, f, za, yb, yc, zd
370+
from
371+
$input as e
372+
left join {streamlookup} ydb_conn_{table_name}.db as u
373+
on(e.za = u.a AND e.yb = u.b)
374+
;
375+
376+
insert into myyds.`{output_topic}`
377+
select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched;
378+
''',
379+
ResequenceId(
380+
[
381+
(
382+
'{"id":1,"za":1,"yb":"2","yc":100,"zd":101}',
383+
'{"a":1,"b":"2","c":3,"d":4,"e":5,"f":6,"za":1,"yb":"2","yc":100,"zd":101}',
384+
),
385+
(
386+
'{"id":2,"za":7,"yb":"8","yc":106,"zd":107}',
387+
'{"a":7,"b":"8","c":9,"d":10,"e":11,"f":12,"za":7,"yb":"8","yc":106,"zd":107}',
388+
),
389+
(
390+
'{"id":3,"za":2,"yb":"1","yc":114,"zd":115}',
391+
'{"a":null,"b":null,"c":null,"d":null,"e":null,"f":null,"za":2,"yb":"1","yc":114,"zd":115}',
392+
),
393+
]
394+
),
395+
),
313396
]
314397

315398

@@ -367,7 +450,7 @@ def test_simple(self, kikimr, fq_client: FederatedQueryClient, settings: Setting
367450
@yq_v1
368451
@pytest.mark.parametrize("mvp_external_ydb_endpoint", [{"endpoint": "tests-fq-generic-ydb:2136"}], indirect=True)
369452
@pytest.mark.parametrize("fq_client", [{"folder_id": "my_folder_slj"}], indirect=True)
370-
@pytest.mark.parametrize("partitions_count", [1, 3])
453+
@pytest.mark.parametrize("partitions_count", [1, 3] if DEBUG else [3])
371454
@pytest.mark.parametrize("streamlookup", [False, True] if DEBUG else [True])
372455
@pytest.mark.parametrize("testcase", [*range(len(TESTCASES))])
373456
def test_streamlookup(

ydb/tests/fq/generic/ya.make

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,6 @@ TEST_SRCS(
7171
test_ydb.py
7272
)
7373

74+
TIMEOUT(1800)
75+
7476
END()

ydb/tests/fq/generic/ydb/01_basic.sh

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,12 @@ set -ex
3535
(56, 12, "2a02:1812:1713:4f00:517e:1d79:c88b:704", "Elena", 2),
3636
(18, 17, "ivalid ip", "newUser", 12);
3737
COMMIT;
38+
CREATE TABLE db (b STRING NOT NULL, c Int32, a Int32 NOT NULL, d Int32, f Int32, e Int32, PRIMARY KEY(b, a));
39+
COMMIT;
40+
INSERT INTO db (a, b, c, d, e, f) VALUES
41+
(1, "2", 3, 4, 5, 6),
42+
(7, "8", 9, 10, 11, 12);
43+
COMMIT;
3844
'
3945

4046
retVal=$?

0 commit comments

Comments
 (0)