Skip to content

Commit 5a7c650

Browse files
authored
Merge 9502713 into 266a618
2 parents 266a618 + 9502713 commit 5a7c650

File tree

9 files changed

+350
-44
lines changed

9 files changed

+350
-44
lines changed

ydb/library/yql/dq/runtime/dq_async_output.cpp

Lines changed: 49 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include "dq_transport.h"
33

44
#include <yql/essentials/utils/yql_panic.h>
5+
#include "arrow/array.h"
56

67
#include <deque>
78
#include <variant>
@@ -11,7 +12,7 @@ namespace {
1112

1213
class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer {
1314
struct TValueDesc {
14-
std::variant<NUdf::TUnboxedValue, NDqProto::TWatermark, NDqProto::TCheckpoint> Value;
15+
std::variant<NUdf::TUnboxedValue, NDqProto::TWatermark, NDqProto::TCheckpoint, NKikimr::NMiniKQL::TUnboxedValueVector> Value;
1516
ui64 EstimatedSize;
1617

1718
TValueDesc(NUdf::TUnboxedValue&& value, ui64 size)
@@ -20,6 +21,12 @@ class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer {
2021
{
2122
}
2223

24+
TValueDesc(NKikimr::NMiniKQL::TUnboxedValueVector&& values, ui64 size)
25+
: Value(std::move(values))
26+
, EstimatedSize(size)
27+
{
28+
}
29+
2330
TValueDesc(NDqProto::TWatermark&& watermark, ui64 size)
2431
: Value(std::move(watermark))
2532
, EstimatedSize(size)
@@ -78,9 +85,18 @@ class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer {
7885
}
7986

8087
void WidePush(NUdf::TUnboxedValue* values, ui32 count) override {
81-
Y_UNUSED(values);
82-
Y_UNUSED(count);
83-
YQL_ENSURE(false, "Wide stream is not supported");
88+
if (ValuesPushed++ % 1000 == 0) {
89+
ReestimateRowBytes(values, count);
90+
}
91+
Y_ABORT_UNLESS(EstimatedRowBytes > 0);
92+
NKikimr::NMiniKQL::TUnboxedValueVector valuesVector;
93+
for (ui32 i = 0; i < count; ++i) {
94+
valuesVector.emplace_back(values[i]);
95+
}
96+
Values.emplace_back(std::move(valuesVector), EstimatedRowBytes);
97+
EstimatedStoredBytes += EstimatedRowBytes;
98+
99+
ReportChunkIn(1, EstimatedRowBytes);
84100
}
85101

86102
void Push(NDqProto::TWatermark&& watermark) override {
@@ -115,7 +131,7 @@ class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer {
115131

116132
// Calc values count.
117133
for (auto iter = Values.cbegin(), end = Values.cend();
118-
usedBytes < bytes && iter != end && std::holds_alternative<NUdf::TUnboxedValue>(iter->Value);
134+
usedBytes < bytes && iter != end && (std::holds_alternative<NUdf::TUnboxedValue>(iter->Value) || std::holds_alternative<NKikimr::NMiniKQL::TUnboxedValueVector>(iter->Value));
119135
++iter)
120136
{
121137
++valuesCount;
@@ -124,7 +140,16 @@ class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer {
124140

125141
// Reserve size and return data.
126142
while (valuesCount--) {
127-
batch.emplace_back(std::move(std::get<NUdf::TUnboxedValue>(Values.front().Value)));
143+
const auto& value = Values.front().Value;
144+
if (std::holds_alternative<NUdf::TUnboxedValue>(value)) {
145+
batch.emplace_back(std::move(std::get<NUdf::TUnboxedValue>(value)));
146+
} else if (std::holds_alternative<NKikimr::NMiniKQL::TUnboxedValueVector>(value)) {
147+
// Cerr << TString(TStringBuilder() << "-------------------------------- TDqAsyncOutputBuffer::Pop, pop multi value\n");
148+
auto multiValue = std::get<NKikimr::NMiniKQL::TUnboxedValueVector>(value);
149+
batch.PushRow(multiValue.data(), multiValue.size());
150+
} else {
151+
YQL_ENSURE(false, "Unsupported output value");
152+
}
128153
Values.pop_front();
129154
}
130155
Y_ABORT_UNLESS(EstimatedStoredBytes >= usedBytes);
@@ -184,6 +209,9 @@ class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer {
184209
if (std::holds_alternative<NUdf::TUnboxedValue>(v.Value)) {
185210
return false;
186211
}
212+
if (std::holds_alternative<NKikimr::NMiniKQL::TUnboxedValueVector>(v.Value)) {
213+
return false;
214+
}
187215
}
188216
// Finished and no data values.
189217
return true;
@@ -195,7 +223,21 @@ class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer {
195223

196224
private:
197225
void ReestimateRowBytes(const NUdf::TUnboxedValue& value) {
198-
const ui64 valueSize = TDqDataSerializer::EstimateSize(value, OutputType);
226+
ReestimateRowBytes(TDqDataSerializer::EstimateSize(value, OutputType));
227+
}
228+
229+
void ReestimateRowBytes(const NUdf::TUnboxedValue* values, ui32 count) {
230+
const auto* multiType = static_cast<NKikimr::NMiniKQL::TMultiType* const>(OutputType);
231+
YQL_ENSURE(multiType, "Expected multi type for wide output");
232+
233+
ui64 valueSize = 0;
234+
for (ui32 i = 0; i < count; ++i) {
235+
valueSize += TDqDataSerializer::EstimateSize(values[i], multiType->GetElementType(i));
236+
}
237+
ReestimateRowBytes(valueSize);
238+
}
239+
240+
void ReestimateRowBytes(ui64 valueSize) {
199241
if (EstimatedRowBytes) {
200242
EstimatedRowBytes = static_cast<ui64>(0.6 * valueSize + 0.4 * EstimatedRowBytes);
201243
} else {

0 commit comments

Comments
 (0)