Skip to content

Commit 0ea658a

Browse files
authored
YQ-3784 support computation graph invalidation in purecalc (#10799)
1 parent 081bba7 commit 0ea658a

File tree

4 files changed

+155
-2
lines changed

4 files changed

+155
-2
lines changed

ydb/library/yql/public/purecalc/common/worker.cpp

+14-2
Original file line numberDiff line numberDiff line change
@@ -526,6 +526,18 @@ void TPushStreamWorker::FeedToConsumer() {
526526
}
527527
}
528528

529+
// Returns the boxed value currently stored in the self node.
//
// If the stored value is invalid (presumably because the computation graph
// has been invalidated since the last push -- confirm against
// IComputationGraph::Invalidate), a new TPushStream is created through the
// graph's holder factory, stored in the self node, and then returned.
//
// Callers in this file check SelfNode_ before calling; this method
// dereferences it unconditionally.
NYql::NUdf::IBoxedValue* TPushStreamWorker::GetPushStream() const {
    auto& computeCtx = Graph_.ComputationGraph_->GetContext();

    NUdf::TUnboxedValue stored = SelfNode_->GetValue(computeCtx);
    if (Y_LIKELY(!stored.IsInvalid())) {
        // Common path: the stream is already in place.
        return stored.AsBoxed().Get();
    }

    // Rare path: re-create the stream and read it back from the node.
    SelfNode_->SetValue(computeCtx, Graph_.ComputationGraph_->GetHolderFactory().Create<TPushStream>());
    stored = SelfNode_->GetValue(computeCtx);
    return stored.AsBoxed().Get();
}
540+
529541
void TPushStreamWorker::SetConsumer(THolder<IConsumer<const NKikimr::NUdf::TUnboxedValue*>> consumer) {
530542
auto guard = Guard(GetScopedAlloc());
531543
const auto inputsCount = Graph_.SelfNodes_.size();
@@ -553,7 +565,7 @@ void TPushStreamWorker::Push(NKikimr::NUdf::TUnboxedValue&& value) {
553565
YQL_ENSURE(!Finished_, "OnFinish has already been sent to the consumer; no new values can be pushed");
554566

555567
if (Y_LIKELY(SelfNode_)) {
556-
static_cast<TPushStream*>(SelfNode_->GetValue(Graph_.ComputationGraph_->GetContext()).AsBoxed().Get())->SetValue(std::move(value));
568+
static_cast<TPushStream*>(GetPushStream())->SetValue(std::move(value));
557569
}
558570

559571
FeedToConsumer();
@@ -564,7 +576,7 @@ void TPushStreamWorker::OnFinish() {
564576
YQL_ENSURE(!Finished_, "already finished");
565577

566578
if (Y_LIKELY(SelfNode_)) {
567-
static_cast<TPushStream*>(SelfNode_->GetValue(Graph_.ComputationGraph_->GetContext()).AsBoxed().Get())->SetFinished();
579+
static_cast<TPushStream*>(GetPushStream())->SetFinished();
568580
}
569581

570582
FeedToConsumer();

ydb/library/yql/public/purecalc/common/worker.h

+1
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ namespace NYql {
164164

165165
private:
166166
void FeedToConsumer();
167+
// Returns the boxed push stream held by the self node; when the stored
// value is invalid, a new push stream is created and stored first.
NYql::NUdf::IBoxedValue* GetPushStream() const;
167168

168169
public:
169170
void SetConsumer(THolder<IConsumer<const NKikimr::NUdf::TUnboxedValue*>>) override;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
#include <library/cpp/testing/unittest/registar.h>
2+
3+
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
4+
#include <ydb/library/yql/minikql/mkql_string_util.h>
5+
6+
#include <ydb/library/yql/public/purecalc/io_specs/protobuf/spec.h>
7+
#include <ydb/library/yql/public/purecalc/ut/protos/test_structs.pb.h>
8+
9+
using namespace NYql::NPureCalc;
10+
11+
namespace {
12+
class TStatelessInputSpec : public TInputSpecBase {
13+
public:
14+
TStatelessInputSpec()
15+
: Schemas_({NYT::TNode::CreateList()
16+
.Add("StructType")
17+
.Add(NYT::TNode::CreateList()
18+
.Add(NYT::TNode::CreateList()
19+
.Add("InputValue")
20+
.Add(NYT::TNode::CreateList()
21+
.Add("DataType")
22+
.Add("Utf8")
23+
)
24+
)
25+
)
26+
})
27+
{};
28+
29+
const TVector<NYT::TNode>& GetSchemas() const override {
30+
return Schemas_;
31+
}
32+
33+
private:
34+
const TVector<NYT::TNode> Schemas_;
35+
};
36+
37+
// Input consumer for the push-stream worker. Each incoming value is placed
// into a one-element direct array holder, pushed to the worker, and then the
// worker's computation graph is invalidated.
class TStatelessInputConsumer : public IConsumer<const NYql::NUdf::TUnboxedValue&> {
public:
    TStatelessInputConsumer(TWorkerHolder<IPushStreamWorker> worker)
        : Worker_(std::move(worker))
    {}

    // Wraps `value` into a single-item array and feeds it to the worker.
    void OnObject(const NYql::NUdf::TUnboxedValue& value) override {
        // Hold the worker's allocator for the whole call.
        auto allocGuard = Guard(Worker_->GetScopedAlloc());

        NYql::NUdf::TUnboxedValue* fields = nullptr;
        NYql::NUdf::TUnboxedValue row = Worker_->GetGraph().GetHolderFactory().CreateDirectArrayHolder(1, fields);
        fields[0] = value;

        Worker_->Push(std::move(row));

        // The pushed value was allocated on a different allocator, so the
        // graph is cleared after every object to release it.
        Worker_->GetGraph().Invalidate();
    }

    // Forwards end-of-stream to the worker under the worker's allocator.
    void OnFinish() override {
        auto allocGuard = Guard(Worker_->GetScopedAlloc());
        Worker_->OnFinish();
    }

private:
    TWorkerHolder<IPushStreamWorker> Worker_;
};
67+
68+
// Output consumer that verifies the program's results: every received
// message must carry the expected string, and the total number of messages
// seen by the time OnFinish is called must equal the expected row count.
class TStatelessConsumer : public IConsumer<NPureCalcProto::TStringMessage*> {
public:
    TStatelessConsumer(const TString& expectedData, ui64 expectedRows)
        : ExpectedData_(expectedData)
        , ExpectedRows_(expectedRows)
    {}

    // Checks the payload of one output row; the current row index is
    // attached to the assertion as its comment.
    void OnObject(NPureCalcProto::TStringMessage* message) override {
        UNIT_ASSERT_VALUES_EQUAL_C(ExpectedData_, message->GetX(), RowId_);
        ++RowId_;
    }

    // Checks that exactly the expected number of rows was delivered.
    void OnFinish() override {
        UNIT_ASSERT_VALUES_EQUAL(ExpectedRows_, RowId_);
    }

private:
    const TString ExpectedData_;
    const ui64 ExpectedRows_;
    ui64 RowId_ = 0; // number of rows received so far
};
88+
}
89+
90+
template <>
91+
struct TInputSpecTraits<TStatelessInputSpec> {
92+
static constexpr bool IsPartial = false;
93+
static constexpr bool SupportPushStreamMode = true;
94+
95+
using TConsumerType = THolder<IConsumer<const NYql::NUdf::TUnboxedValue&>>;
96+
97+
static TConsumerType MakeConsumer(const TStatelessInputSpec&, TWorkerHolder<IPushStreamWorker> worker) {
98+
return MakeHolder<TStatelessInputConsumer>(std::move(worker));
99+
}
100+
};
101+
102+
Y_UNIT_TEST_SUITE(TestMixedAllocators) {
    // Feeds a push-stream program with strings created on a separate
    // TScopedAlloc and checks that only rows equal to the target string
    // reach the output consumer.
    Y_UNIT_TEST(TestPushStream) {
        // NOTE(review): the ">= 14 bytes" wording suggests the strings are
        // meant to be too long for inline storage -- confirm.
        const auto targetString = "large string >= 14 bytes";
        const ui64 numberRows = 5;

        const auto factory = MakeProgramFactory();
        const auto sql = TStringBuilder() << "SELECT InputValue AS X FROM Input WHERE InputValue = \"" << targetString << "\";";

        const auto program = factory->MakePushStreamProgram(
            TStatelessInputSpec(),
            TProtobufOutputSpec<NPureCalcProto::TStringMessage>(),
            sql
        );

        const auto inputConsumer = program->Apply(MakeHolder<TStatelessConsumer>(targetString, numberRows));

        // Allocator that owns the input strings; it is distinct from the
        // worker's allocator used inside the input consumer.
        NKikimr::NMiniKQL::TScopedAlloc alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false);

        // Creates one string value on `alloc`, pushes it through the program
        // and releases it again under `alloc`.
        const auto pushString = [&](TString inputValue) {
            NYql::NUdf::TUnboxedValue stringValue;

            {
                auto allocGuard = Guard(alloc);
                stringValue = NKikimr::NMiniKQL::MakeString(inputValue);
                alloc.Ref().LockObject(stringValue);
            }

            inputConsumer->OnObject(stringValue);

            {
                auto allocGuard = Guard(alloc);
                alloc.Ref().UnlockObject(stringValue);
                stringValue.Clear();
            }
        };

        // Each iteration pushes one matching and one non-matching row.
        for (ui64 iteration = 0; iteration != numberRows; ++iteration) {
            pushString(targetString);
            pushString("another large string >= 14 bytes");
        }

        inputConsumer->OnFinish();
    }
}

ydb/library/yql/public/purecalc/ut/ya.make

+1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ SRCS(
1212
test_user_data.cpp
1313
test_eval.cpp
1414
test_pool.cpp
15+
test_mixed_allocators.cpp
1516
)
1617

1718
PEERDIR(

0 commit comments

Comments
 (0)