Skip to content

Commit f33f3fe

Browse files
authored
streamlookup: pass and use MaxDelayedRows parameter (#10489)
1 parent 9f8e3e6 commit f33f3fe

File tree

4 files changed

+18
-3
lines changed

4 files changed

+18
-3
lines changed

ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ class TInputTransformStreamLookupBase
4242
const NMiniKQL::TStructType* lookupPayloadType,
4343
const NMiniKQL::TMultiType* outputRowType,
4444
TOutputRowColumnOrder&& outputRowColumnOrder,
45+
size_t maxDelayedRows,
4546
size_t cacheLimit,
4647
std::chrono::seconds cacheTtl
4748
)
@@ -63,6 +64,7 @@ class TInputTransformStreamLookupBase
6364
, OutputRowColumnOrder(std::move(outputRowColumnOrder))
6465
, InputFlowFetchStatus(NUdf::EFetchStatus::Yield)
6566
, LruCache(std::make_unique<NKikimr::NMiniKQL::TUnboxedKeyValueLruCacheWithTtl>(cacheLimit, lookupKeyType))
67+
, MaxDelayedRows(maxDelayedRows)
6668
, CacheTtl(cacheTtl)
6769
, ReadyQueue(OutputRowType)
6870
{
@@ -142,8 +144,9 @@ class TInputTransformStreamLookupBase
142144
}
143145

144146
void Handle(IDqAsyncLookupSource::TEvLookupResult::TPtr ev) {
145-
if (!KeysForLookup)
147+
if (!KeysForLookup) {
146148
return;
149+
}
147150
auto guard = BindAllocator();
148151
const auto now = std::chrono::steady_clock::now();
149152
auto lookupResult = ev->Get()->Result.lock();
@@ -207,7 +210,10 @@ class TInputTransformStreamLookupBase
207210
NUdf::TUnboxedValue inputRow = HolderFactory.CreateDirectArrayHolder(InputRowType->GetElementsCount(), inputRowItems);
208211
const auto now = std::chrono::steady_clock::now();
209212
LruCache->Prune(now);
213+
size_t rowLimit = std::numeric_limits<size_t>::max();
214+
size_t row = 0;
210215
while (
216+
row < rowLimit &&
211217
(KeysForLookup->size() < MaxKeysInRequest) &&
212218
((InputFlowFetchStatus = FetchWideInputValue(inputRowItems)) == NUdf::EFetchStatus::Ok)) {
213219
NUdf::TUnboxedValue* keyItems;
@@ -223,9 +229,14 @@ class TInputTransformStreamLookupBase
223229
if (auto lookupPayload = LruCache->Get(key, now)) {
224230
AddReadyQueue(key, other, &*lookupPayload);
225231
} else {
232+
if (AwaitingQueue.empty()) {
233+
// first cache miss (AwaitingQueue is still empty): stop fetching once MaxDelayedRows rows, counting this one, have been read
234+
rowLimit = row + MaxDelayedRows;
235+
}
226236
AwaitingQueue.emplace_back(key, std::move(other));
227237
KeysForLookup->emplace(std::move(key), NUdf::TUnboxedValue{});
228238
}
239+
++row;
229240
}
230241
if (!KeysForLookup->empty()) {
231242
Send(LookupSourceId, new IDqAsyncLookupSource::TEvLookupRequest(KeysForLookup));
@@ -283,6 +294,7 @@ class TInputTransformStreamLookupBase
283294

284295
NUdf::EFetchStatus InputFlowFetchStatus;
285296
std::unique_ptr<NKikimr::NMiniKQL::TUnboxedKeyValueLruCacheWithTtl> LruCache;
297+
size_t MaxDelayedRows;
286298
std::chrono::seconds CacheTtl;
287299
using TInputKeyOtherPair = std::pair<NUdf::TUnboxedValue, NUdf::TUnboxedValue>;
288300
using TAwaitingQueue = std::deque<TInputKeyOtherPair, NKikimr::NMiniKQL::TMKQLAllocator<TInputKeyOtherPair>>; //input row split in two parts: key columns and other columns
@@ -545,6 +557,7 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateInputTransformStre
545557
lookupPayloadType,
546558
outputRowType,
547559
std::move(outputColumnsOrder),
560+
settings.GetMaxDelayedRows(),
548561
settings.GetCacheLimit(),
549562
std::chrono::seconds(settings.GetCacheTtlSeconds())
550563
) :
@@ -564,6 +577,7 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateInputTransformStre
564577
lookupPayloadType,
565578
outputRowType,
566579
std::move(outputColumnsOrder),
580+
settings.GetMaxDelayedRows(),
567581
settings.GetCacheLimit(),
568582
std::chrono::seconds(settings.GetCacheTtlSeconds())
569583
);

ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@
193193
{"Index": 6, "Name": "LeftJoinKeyNames", "Type": "TCoAtomList"},
194194
{"Index": 7, "Name": "RightJoinKeyNames", "Type": "TCoAtomList"},
195195
{"Index": 8, "Name": "TTL", "Type": "TCoAtom"},
196-
{"Index": 9, "Name": "MaxDelay", "Type": "TCoAtom"},
196+
{"Index": 9, "Name": "MaxDelayedRows", "Type": "TCoAtom"},
197197
{"Index": 10, "Name": "MaxCachedRows", "Type": "TCoAtom"}
198198
]
199199
},

ydb/library/yql/dq/proto/dq_tasks.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ message TDqInputTransformLookupSettings {
186186
bytes NarrowOutputRowType = 8; //Serialized struct type
187187
uint64 CacheLimit = 9;
188188
uint64 CacheTtlSeconds = 10;
189+
uint64 MaxDelayedRows = 11;
189190
}
190191

191192
message TDqTask {

ydb/library/yql/providers/dq/opt/physical_optimize.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ class TDqsPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
263263
.RightJoinKeyNames(join.RightJoinKeyNames())
264264
.TTL(ctx.NewAtom(pos, 300)) //TODO configure me
265265
.MaxCachedRows(ctx.NewAtom(pos, 1'000'000)) //TODO configure me
266-
.MaxDelay(ctx.NewAtom(pos, 1'000'000)) //Configure me
266+
.MaxDelayedRows(ctx.NewAtom(pos, 1'000'000)) //TODO configure me
267267
.Done();
268268

269269
auto lambda = Build<TCoLambda>(ctx, pos)

0 commit comments

Comments
 (0)