Skip to content

Commit cf3d81a

Browse files
authored
Merge ffae242 into af75d41
2 parents af75d41 + ffae242 commit cf3d81a

File tree

4 files changed

+222
-345
lines changed

4 files changed

+222
-345
lines changed

ydb/library/yql/minikql/comp_nodes/mkql_skip.cpp

Lines changed: 14 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "mkql_skip.h"
22
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
33
#include <ydb/library/yql/minikql/computation/mkql_computation_node_codegen.h> // Y_IGNORE
4+
#include <ydb/library/yql/minikql/computation/mkql_computation_node_codegen_impl.h>
45
#include <ydb/library/yql/minikql/mkql_node_cast.h>
56

67
namespace NKikimr {
@@ -117,8 +118,8 @@ using TBaseComputation = TStatefulFlowCodegeneratorNode<TSkipFlowWrapper>;
117118
IComputationNode* const Count;
118119
};
119120

120-
class TWideSkipWrapper : public TStatefulWideFlowCodegeneratorNode<TWideSkipWrapper> {
121-
using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideSkipWrapper>;
121+
class TWideSkipWrapper : public TSimpleStatefulWideFlowCodegeneratorNode<TWideSkipWrapper, ui64> {
122+
using TBaseComputation = TSimpleStatefulWideFlowCodegeneratorNode<TWideSkipWrapper, ui64>;
122123
public:
123124
TWideSkipWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, IComputationNode* count, ui32 size)
124125
: TBaseComputation(mutables, flow, EValueRepresentation::Embedded)
@@ -127,95 +128,22 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideSkipWrapper>;
127128
, StubsIndex(mutables.IncrementWideFieldsIndex(size))
128129
{}
129130

130-
EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
131-
if (state.IsInvalid()) {
132-
state = Count->GetValue(ctx);
133-
}
131+
void InitState(ui64& count, TComputationContext& ctx) const {
132+
count = Count->GetValue(ctx).Get<ui64>();
133+
}
134134

135-
if (auto count = state.Get<ui64>()) {
136-
do if (const auto result = Flow->FetchValues(ctx, ctx.WideFields.data() + StubsIndex); EFetchResult::One != result) {
137-
state = NUdf::TUnboxedValuePod(count);
138-
return result;
139-
} while (--count);
135+
NUdf::TUnboxedValue*const* PrepareInput(ui64& skipCount, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
136+
return skipCount == 0 ? output : ctx.WideFields.data() + StubsIndex;
137+
}
140138

141-
state = NUdf::TUnboxedValuePod::Zero();
139+
EProcessResult DoProcess(ui64& skipCount, TComputationContext&, EFetchResult fetchRes, NUdf::TUnboxedValue*const*) const {
140+
if (fetchRes == EFetchResult::One && skipCount) {
141+
skipCount--;
142+
return EProcessResult::Fetch;
142143
}
143-
144-
return Flow->FetchValues(ctx, output);
144+
return static_cast<EProcessResult>(fetchRes);
145145
}
146146

147-
#ifndef MKQL_DISABLE_CODEGEN
148-
TGenerateResult DoGenGetValues(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
149-
auto& context = ctx.Codegen.GetContext();
150-
151-
const auto valueType = Type::getInt128Ty(context);
152-
153-
const auto init = BasicBlock::Create(context, "init", ctx.Func);
154-
const auto main = BasicBlock::Create(context, "main", ctx.Func);
155-
156-
const auto load = new LoadInst(valueType, statePtr, "load", block);
157-
const auto state = PHINode::Create(valueType, 2U, "state", main);
158-
state->addIncoming(load, block);
159-
BranchInst::Create(init, main, IsInvalid(load, block), block);
160-
161-
block = init;
162-
163-
GetNodeValue(statePtr, Count, ctx, block);
164-
const auto save = new LoadInst(valueType, statePtr, "save", block);
165-
state->addIncoming(save, block);
166-
BranchInst::Create(main, block);
167-
168-
block = main;
169-
170-
const auto work = BasicBlock::Create(context, "work", ctx.Func);
171-
const auto good = BasicBlock::Create(context, "good", ctx.Func);
172-
const auto pass = BasicBlock::Create(context, "pass", ctx.Func);
173-
const auto exit = BasicBlock::Create(context, "exit", ctx.Func);
174-
const auto skip = BasicBlock::Create(context, "skip", ctx.Func);
175-
const auto done = BasicBlock::Create(context, "done", ctx.Func);
176-
177-
const auto resultType = Type::getInt32Ty(context);
178-
const auto result = PHINode::Create(resultType, 2U, "result", done);
179-
180-
const auto trunc = GetterFor<ui64>(state, context, block);
181-
182-
const auto count = PHINode::Create(trunc->getType(), 2U, "count", work);
183-
count->addIncoming(trunc, block);
184-
185-
const auto plus = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_UGT, trunc, ConstantInt::get(trunc->getType(), 0ULL), "plus", block);
186-
187-
BranchInst::Create(work, skip, plus, block);
188-
189-
block = work;
190-
const auto status = GetNodeValues(Flow, ctx, block).first;
191-
const auto special = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_SLE, status, ConstantInt::get(status->getType(), 0), "special", block);
192-
BranchInst::Create(pass, good, special, block);
193-
194-
block = pass;
195-
new StoreInst(SetterFor<ui64>(count, context, block), statePtr, block);
196-
result->addIncoming(status, block);
197-
BranchInst::Create(done, block);
198-
199-
block = good;
200-
201-
const auto decr = BinaryOperator::CreateSub(count, ConstantInt::get(count->getType(), 1ULL), "decr", block);
202-
const auto next = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_UGT, decr, ConstantInt::get(decr->getType(), 0ULL), "next", block);
203-
count->addIncoming(decr, block);
204-
BranchInst::Create(work, exit, next, block);
205-
206-
block = exit;
207-
new StoreInst(SetterFor<ui64>(decr, context, block), statePtr, block);
208-
BranchInst::Create(skip, block);
209-
210-
block = skip;
211-
auto getres = GetNodeValues(Flow, ctx, block);
212-
result->addIncoming(getres.first, block);
213-
BranchInst::Create(done, block);
214-
215-
block = done;
216-
return {result, std::move(getres.second)};
217-
}
218-
#endif
219147
private:
220148
void RegisterDependencies() const final {
221149
if (const auto flow = FlowDependsOn(Flow))

ydb/library/yql/minikql/comp_nodes/mkql_take.cpp

Lines changed: 16 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "mkql_take.h"
22
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
33
#include <ydb/library/yql/minikql/computation/mkql_computation_node_codegen.h> // Y_IGNORE
4+
#include <ydb/library/yql/minikql/computation/mkql_computation_node_codegen_impl.h>
45
#include <ydb/library/yql/minikql/mkql_node_cast.h>
56

67
namespace NKikimr {
@@ -95,84 +96,30 @@ using TBaseComputation = TStatefulFlowCodegeneratorNode<TTakeFlowWrapper>;
9596
IComputationNode* const Count;
9697
};
9798

98-
class TWideTakeWrapper : public TStatefulWideFlowCodegeneratorNode<TWideTakeWrapper> {
99-
using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideTakeWrapper>;
99+
class TWideTakeWrapper : public TSimpleStatefulWideFlowCodegeneratorNode<TWideTakeWrapper, ui64> {
100+
using TBaseComputation = TSimpleStatefulWideFlowCodegeneratorNode<TWideTakeWrapper, ui64>;
100101
public:
101102
TWideTakeWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, IComputationNode* count)
102103
: TBaseComputation(mutables, flow, EValueRepresentation::Embedded), Flow(flow), Count(count)
103104
{}
104105

105-
EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
106-
if (state.IsInvalid()) {
107-
state = Count->GetValue(ctx);
108-
}
109-
110-
if (auto count = state.Get<ui64>()) {
111-
if (const auto result = Flow->FetchValues(ctx, output); EFetchResult::One == result) {
112-
state = NUdf::TUnboxedValuePod(--count);
113-
return EFetchResult::One;
114-
} else {
115-
return result;
116-
}
117-
}
118-
119-
return EFetchResult::Finish;
106+
void InitState(ui64 &count, TComputationContext& ctx) const {
107+
count = Count->GetValue(ctx).Get<ui64>();
120108
}
121-
#ifndef MKQL_DISABLE_CODEGEN
122-
TGenerateResult DoGenGetValues(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
123-
auto& context = ctx.Codegen.GetContext();
124-
125-
const auto valueType = Type::getInt128Ty(context);
126-
127-
const auto init = BasicBlock::Create(context, "init", ctx.Func);
128-
const auto main = BasicBlock::Create(context, "main", ctx.Func);
129-
130-
const auto load = new LoadInst(valueType, statePtr, "load", block);
131-
const auto state = PHINode::Create(load->getType(), 2U, "state", main);
132-
state->addIncoming(load, block);
133-
134-
BranchInst::Create(init, main, IsInvalid(load, block), block);
135-
136-
block = init;
137-
138-
GetNodeValue(statePtr, Count, ctx, block);
139-
const auto save = new LoadInst(valueType, statePtr, "save", block);
140-
state->addIncoming(save, block);
141-
BranchInst::Create(main, block);
142-
143-
block = main;
144-
145-
const auto work = BasicBlock::Create(context, "work", ctx.Func);
146-
const auto good = BasicBlock::Create(context, "good", ctx.Func);
147-
const auto done = BasicBlock::Create(context, "done", ctx.Func);
148-
149-
const auto resultType = Type::getInt32Ty(context);
150-
const auto result = PHINode::Create(resultType, 3U, "result", done);
151-
result->addIncoming(ConstantInt::get(resultType, static_cast<i32>(EFetchResult::Finish)), block);
152-
153-
const auto trunc = GetterFor<ui64>(state, context, block);
154109

155-
const auto plus = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_UGT, trunc, ConstantInt::get(trunc->getType(), 0ULL), "plus", block);
156-
157-
BranchInst::Create(work, done, plus, block);
158-
159-
block = work;
160-
const auto getres = GetNodeValues(Flow, ctx, block);
161-
const auto special = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_SLE, getres.first, ConstantInt::get(getres.first->getType(), 0), "special", block);
162-
result->addIncoming(getres.first, block);
163-
BranchInst::Create(done, good, special, block);
164-
165-
block = good;
166-
167-
const auto decr = BinaryOperator::CreateSub(trunc, ConstantInt::get(trunc->getType(), 1ULL), "decr", block);
168-
new StoreInst(SetterFor<ui64>(decr, context, block), statePtr, block);
169-
result->addIncoming(getres.first, block);
170-
BranchInst::Create(done, block);
110+
NUdf::TUnboxedValue*const* PrepareInput(ui64& takeCount, TComputationContext&, NUdf::TUnboxedValue*const* output) const {
111+
return takeCount != 0 ? output : nullptr;
112+
}
171113

172-
block = done;
173-
return {result, std::move(getres.second)};
114+
EProcessResult DoProcess(ui64& takeCount, TComputationContext& , EFetchResult fetchRes, NUdf::TUnboxedValue*const*) const {
115+
if (takeCount == 0) {
116+
return EProcessResult::Finish;
117+
} else if (fetchRes == EFetchResult::One) {
118+
takeCount--;
119+
}
120+
return static_cast<EProcessResult>(fetchRes);
174121
}
175-
#endif
122+
176123
private:
177124
void RegisterDependencies() const final {
178125
if (const auto flow = FlowDependsOn(Flow))

0 commit comments

Comments
 (0)