Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion ydb/library/yql/dq/proto/dq_tasks.proto
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ message TDqInputTransformLookupSettings {
repeated string RightJoinKeyNames = 6; //Join column names in the right hand, in the same order as previous
bytes NarrowInputRowType = 7; //Serialized struct type
bytes NarrowOutputRowType = 8; //Serialized struct type
//TODO add lookup cache parameters
uint64 CacheLimit = 9;
uint64 CacheTtlSeconds = 10;
}

message TDqTask {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ namespace NKikimr::NMiniKQL {
// Never requests system time, expects monotonically increased time points in methods argument
class TUnboxedKeyValueLruCacheWithTtl {
struct TEntry {
TEntry(NUdf::TUnboxedValue key, NUdf::TUnboxedValue value, std::chrono::time_point<std::chrono::steady_clock> expiration)
TEntry(NUdf::TUnboxedValue key, NUdf::TUnboxedValue value, std::chrono::time_point<std::chrono::steady_clock> expiration)
: Key(std::move(key))
, Value(std::move(value))
, Expiration(std::move(expiration))
Expand Down Expand Up @@ -73,16 +73,23 @@ class TUnboxedKeyValueLruCacheWithTtl {
return std::nullopt;
}

// Perform garbage collection.
// Perform garbage collection, single step, O(1) time.
// Must be called periodically
void Tick(const std::chrono::time_point<std::chrono::steady_clock>& now) {
bool Tick(const std::chrono::time_point<std::chrono::steady_clock>& now) {
if (UsageList.empty()) {
return;
return false;
}
if (now < UsageList.front().Expiration) {
return;
return false;
}
RemoveLeastRecentlyUsedEntry();
return true;
}

// Perform garbage collection, O(1) amortized, but O(n) one-time
void Prune(const std::chrono::time_point<std::chrono::steady_clock>& now) {
while (Tick(now)) {
}
}

size_t Size() const {
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/yql/providers/dq/planner/execution_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,8 @@ namespace NYql::NDqs {
const auto narrowOutputRowType = GetSeqItemType(streamLookup.Ptr()->GetTypeAnn());
Y_ABORT_UNLESS(narrowOutputRowType->GetKind() == ETypeAnnotationKind::Struct);
settings.SetNarrowOutputRowType(NYql::NCommon::GetSerializedTypeAnnotation(narrowOutputRowType));
settings.SetCacheLimit(1'000'000); //TODO configure me
settings.SetCacheTtlSeconds(60); //TODO configure me

const auto inputRowType = GetSeqItemType(streamLookup.Output().Stage().Program().Ref().GetTypeAnn());
const auto outputRowType = GetSeqItemType(stage.Program().Args().Arg(inputIndex).Ref().GetTypeAnn());
Expand Down
89 changes: 86 additions & 3 deletions ydb/tests/fq/generic/test_streaming_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
DEBUG = 0


def ResequenceId(messages):
def ResequenceId(messages, field="id"):
res = []
i = 1
for pair in messages:
rpair = []
for it in pair:
src = json.loads(it)
src["id"] = i
if field in src:
src[field] = i
rpair += [json.dumps(src)]
res += [tuple(rpair)]
i += 1
Expand Down Expand Up @@ -310,6 +311,88 @@ def freeze(json):
),
],
),
# 6
(
R'''
$input = SELECT * FROM myyds.`{input_topic}`
WITH (
FORMAT=json_each_row,
SCHEMA (
za Int32,
yb STRING,
yc Int32,
zd Int32,
)
) ;

$enriched = select a, b, c, d, e, f, za, yb, yc, zd
from
$input as e
left join {streamlookup} ydb_conn_{table_name}.db as u
on(e.yb = u.b AND e.za = u.a )
;

insert into myyds.`{output_topic}`
select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched;
''',
ResequenceId(
[
(
'{"id":1,"za":1,"yb":"2","yc":100,"zd":101}',
'{"a":1,"b":"2","c":3,"d":4,"e":5,"f":6,"za":1,"yb":"2","yc":100,"zd":101}',
),
(
'{"id":2,"za":7,"yb":"8","yc":106,"zd":107}',
'{"a":7,"b":"8","c":9,"d":10,"e":11,"f":12,"za":7,"yb":"8","yc":106,"zd":107}',
),
(
'{"id":3,"za":2,"yb":"1","yc":114,"zd":115}',
'{"a":null,"b":null,"c":null,"d":null,"e":null,"f":null,"za":2,"yb":"1","yc":114,"zd":115}',
),
]
),
),
# 7
(
R'''
$input = SELECT * FROM myyds.`{input_topic}`
WITH (
FORMAT=json_each_row,
SCHEMA (
za Int32,
yb STRING,
yc Int32,
zd Int32,
)
) ;

$enriched = select a, b, c, d, e, f, za, yb, yc, zd
from
$input as e
left join {streamlookup} ydb_conn_{table_name}.db as u
on(e.za = u.a AND e.yb = u.b)
;

insert into myyds.`{output_topic}`
select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched;
''',
ResequenceId(
[
(
'{"id":1,"za":1,"yb":"2","yc":100,"zd":101}',
'{"a":1,"b":"2","c":3,"d":4,"e":5,"f":6,"za":1,"yb":"2","yc":100,"zd":101}',
),
(
'{"id":2,"za":7,"yb":"8","yc":106,"zd":107}',
'{"a":7,"b":"8","c":9,"d":10,"e":11,"f":12,"za":7,"yb":"8","yc":106,"zd":107}',
),
(
'{"id":3,"za":2,"yb":"1","yc":114,"zd":115}',
'{"a":null,"b":null,"c":null,"d":null,"e":null,"f":null,"za":2,"yb":"1","yc":114,"zd":115}',
),
]
),
),
]


Expand Down Expand Up @@ -367,7 +450,7 @@ def test_simple(self, kikimr, fq_client: FederatedQueryClient, settings: Setting
@yq_v1
@pytest.mark.parametrize("mvp_external_ydb_endpoint", [{"endpoint": "tests-fq-generic-ydb:2136"}], indirect=True)
@pytest.mark.parametrize("fq_client", [{"folder_id": "my_folder_slj"}], indirect=True)
@pytest.mark.parametrize("partitions_count", [1, 3])
@pytest.mark.parametrize("partitions_count", [1, 3] if DEBUG else [3])
@pytest.mark.parametrize("streamlookup", [False, True] if DEBUG else [True])
@pytest.mark.parametrize("testcase", [*range(len(TESTCASES))])
def test_streamlookup(
Expand Down
2 changes: 2 additions & 0 deletions ydb/tests/fq/generic/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,6 @@ TEST_SRCS(
test_ydb.py
)

TIMEOUT(1800)

END()
6 changes: 6 additions & 0 deletions ydb/tests/fq/generic/ydb/01_basic.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ set -ex
(56, 12, "2a02:1812:1713:4f00:517e:1d79:c88b:704", "Elena", 2),
(18, 17, "ivalid ip", "newUser", 12);
COMMIT;
CREATE TABLE db (b STRING NOT NULL, c Int32, a Int32 NOT NULL, d Int32, f Int32, e Int32, PRIMARY KEY(b, a));
COMMIT;
INSERT INTO db (a, b, c, d, e, f) VALUES
(1, "2", 3, 4, 5, 6),
(7, "8", 9, 10, 11, 12);
COMMIT;
'

retVal=$?
Expand Down