Skip to content

Commit 5cc4e57

Browse files
author
Vadim Averin
committed
Generalize DoGenGetValues of wide Filter() with limit
1 parent 9bbc1e0 commit 5cc4e57

File tree

1 file changed

+21
-77
lines changed

1 file changed

+21
-77
lines changed

ydb/library/yql/minikql/comp_nodes/mkql_wide_filter.cpp

Lines changed: 21 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "mkql_wide_filter.h"
22
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
33
#include <ydb/library/yql/minikql/computation/mkql_computation_node_codegen.h> // Y_IGNORE
4+
#include <ydb/library/yql/minikql/computation/mkql_computation_node_codegen_impl.h>
45
#include <ydb/library/yql/minikql/mkql_node_cast.h>
56
#include <ydb/library/yql/utils/cast.h>
67

@@ -134,8 +135,8 @@ using TBaseComputation = TStatelessWideFlowCodegeneratorNode<TWideFilterWrapper>
134135
}
135136
};
136137

137-
class TWideFilterWithLimitWrapper : public TStatefulWideFlowCodegeneratorNode<TWideFilterWithLimitWrapper>, public TBaseWideFilterWrapper {
138-
using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideFilterWithLimitWrapper>;
138+
class TWideFilterWithLimitWrapper : public TSimpleStatefulWideFlowCodegeneratorNode<TWideFilterWithLimitWrapper, ui64>, public TBaseWideFilterWrapper {
139+
using TBaseComputation = TSimpleStatefulWideFlowCodegeneratorNode<TWideFilterWithLimitWrapper, ui64>;
139140
public:
140141
TWideFilterWithLimitWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, IComputationNode* limit,
141142
TComputationExternalNodePtrVector&& items, IComputationNode* predicate)
@@ -144,87 +145,30 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideFilterWithLimit
144145
, Limit(limit)
145146
{}
146147

147-
EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
148-
if (state.IsInvalid()) {
149-
state = Limit->GetValue(ctx);
150-
} else if (!state.Get<ui64>()) {
151-
return EFetchResult::Finish;
152-
}
153-
154-
auto **fields = GetFields(ctx);
155-
while (true) {
156-
PrepareArguments(ctx, output);
157-
158-
if (const auto result = Flow->FetchValues(ctx, fields); EFetchResult::One != result)
159-
return result;
148+
void InitState(ui64& limit, TComputationContext& ctx) const {
149+
limit = Limit->GetValue(ctx).Get<ui64>();
150+
}
160151

152+
EProcessResult DoProcess(ui64& limit, TComputationContext& ctx, EFetchResult fetchRes, NUdf::TUnboxedValue*const* values) const {
153+
if (limit == 0) {
154+
return EProcessResult::Finish;
155+
}
156+
if (fetchRes == EFetchResult::One) {
157+
auto **fields = GetFields(ctx);
158+
PrepareArguments(ctx, values);
159+
for (size_t idx = 0; idx < Items.size(); idx++) {
160+
*fields[idx] = *values[idx];
161+
}
161162
if (Predicate->GetValue(ctx).Get<bool>()) {
162-
FillOutputs(ctx, output);
163-
164-
auto todo = state.Get<ui64>();
165-
state = NUdf::TUnboxedValuePod(--todo);
166-
return EFetchResult::One;
163+
FillOutputs(ctx, values);
164+
limit--;
165+
return EProcessResult::One;
167166
}
167+
return EProcessResult::Fetch;
168168
}
169+
return static_cast<EProcessResult>(fetchRes);
169170
}
170-
#ifndef MKQL_DISABLE_CODEGEN
171-
TGenerateResult DoGenGetValues(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
172-
auto& context = ctx.Codegen.GetContext();
173-
174-
const auto init = BasicBlock::Create(context, "init", ctx.Func);
175-
const auto test = BasicBlock::Create(context, "test", ctx.Func);
176-
const auto loop = BasicBlock::Create(context, "loop", ctx.Func);
177-
const auto work = BasicBlock::Create(context, "work", ctx.Func);
178-
const auto pass = BasicBlock::Create(context, "pass", ctx.Func);
179-
const auto exit = BasicBlock::Create(context, "exit", ctx.Func);
180-
181-
const auto valueType = Type::getInt128Ty(context);
182-
const auto resultType = Type::getInt32Ty(context);
183-
const auto result = PHINode::Create(resultType, 3U, "result", exit);
184-
185-
BranchInst::Create(test, init, IsValid(statePtr, block), block);
186-
187-
block = init;
188-
189-
GetNodeValue(statePtr, Limit, ctx, block);
190-
BranchInst::Create(test, block);
191171

192-
block = test;
193-
194-
const auto state = new LoadInst(valueType, statePtr, "state", block);
195-
const auto done = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_EQ, state, GetFalse(context), "done", block);
196-
result->addIncoming(ConstantInt::get(resultType, -1), block);
197-
198-
BranchInst::Create(exit, loop, done, block);
199-
200-
block = loop;
201-
202-
auto status = GetNodeValues(Flow, ctx, block);
203-
const auto good = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_SGT, status.first, ConstantInt::get(status.first->getType(), 0), "good", block);
204-
205-
result->addIncoming(status.first, block);
206-
207-
BranchInst::Create(work, exit, good, block);
208-
209-
block = work;
210-
211-
const auto predicate = GenGetPredicate(ctx, status.second, block);
212-
213-
BranchInst::Create(pass, loop, predicate, block);
214-
215-
block = pass;
216-
217-
const auto decr = BinaryOperator::CreateSub(state, ConstantInt::get(state->getType(), 1ULL), "decr", block);
218-
new StoreInst(decr, statePtr, block);
219-
220-
result->addIncoming(status.first, block);
221-
222-
BranchInst::Create(exit, block);
223-
224-
block = exit;
225-
return {result, std::move(status.second)};
226-
}
227-
#endif
228172
private:
229173
void RegisterDependencies() const final {
230174
if (const auto flow = FlowDependsOn(Flow)) {

0 commit comments

Comments
 (0)