Skip to content

Commit 3f1e08d

Browse files
authored
Merge 8222d36 into dc9f405
2 parents dc9f405 + 8222d36 commit 3f1e08d

File tree

10 files changed

+406
-419
lines changed

10 files changed

+406
-419
lines changed

ydb/core/fq/libs/row_dispatcher/json_filter.cpp

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ class TFilterInputSpec : public NYql::NPureCalc::TInputSpecBase {
5353
TVector<NYT::TNode> Schemas;
5454
};
5555

56-
class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<ui64, TList<TString>>> {
56+
class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<ui64, TVector<TVector<std::string_view>>>> {
5757
public:
5858
TFilterInputConsumer(
5959
const TFilterInputSpec& spec,
@@ -91,28 +91,32 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<ui64, T
9191
}
9292
}
9393

94-
void OnObject(std::pair<ui64, TList<TString>> value) override {
94+
void OnObject(std::pair<ui64, TVector<TVector<std::string_view>>> values) override {
95+
Y_ENSURE(FieldsPositions.size() == values.second.size());
96+
9597
NKikimr::NMiniKQL::TThrowingBindTerminator bind;
96-
9798
with_lock (Worker->GetScopedAlloc()) {
9899
auto& holderFactory = Worker->GetGraph().GetHolderFactory();
99-
NYql::NUdf::TUnboxedValue* items = nullptr;
100100

101-
NYql::NUdf::TUnboxedValue result = Cache.NewArray(
102-
holderFactory,
103-
static_cast<ui32>(value.second.size() + 1),
104-
items);
105-
106-
items[OffsetPosition] = NYql::NUdf::TUnboxedValuePod(value.first);
101+
// TODO: use blocks here
102+
for (size_t rowId = 0; rowId < values.second.front().size(); ++rowId) {
103+
NYql::NUdf::TUnboxedValue* items = nullptr;
104+
105+
NYql::NUdf::TUnboxedValue result = Cache.NewArray(
106+
holderFactory,
107+
static_cast<ui32>(values.second.size() + 1),
108+
items);
107109

108-
Y_ENSURE(FieldsPositions.size() == value.second.size());
110+
items[OffsetPosition] = NYql::NUdf::TUnboxedValuePod(values.first++);
109111

110-
size_t i = 0;
111-
for (const auto& v : value.second) {
112-
NYql::NUdf::TStringValue str(v);
113-
items[FieldsPositions[i++]] = NYql::NUdf::TUnboxedValuePod(std::move(str));
112+
size_t fieldId = 0;
113+
for (const auto& column : values.second) {
114+
NYql::NUdf::TStringValue str(column[rowId]);
115+
items[FieldsPositions[fieldId++]] = NYql::NUdf::TUnboxedValuePod(std::move(str));
116+
}
117+
118+
Worker->Push(std::move(result));
114119
}
115-
Worker->Push(std::move(result));
116120
}
117121
}
118122

@@ -196,7 +200,7 @@ struct NYql::NPureCalc::TInputSpecTraits<TFilterInputSpec> {
196200
static constexpr bool IsPartial = false;
197201
static constexpr bool SupportPushStreamMode = true;
198202

199-
using TConsumerType = THolder<NYql::NPureCalc::IConsumer<std::pair<ui64, TList<TString>>>>;
203+
using TConsumerType = THolder<NYql::NPureCalc::IConsumer<std::pair<ui64, TVector<TVector<std::string_view>>>>>;
200204

201205
static TConsumerType MakeConsumer(
202206
const TFilterInputSpec& spec,
@@ -238,8 +242,9 @@ class TJsonFilter::TImpl {
238242
LOG_ROW_DISPATCHER_DEBUG("Program created");
239243
}
240244

241-
void Push(ui64 offset, const TList<TString>& value) {
242-
InputConsumer->OnObject(std::make_pair(offset, value));
245+
void Push(ui64 offset, const TVector<TVector<std::string_view>>& values) {
246+
Y_ENSURE(values, "Expected non empty schema");
247+
InputConsumer->OnObject(std::make_pair(offset, values));
243248
}
244249

245250
TString GetSql() const {
@@ -266,7 +271,7 @@ class TJsonFilter::TImpl {
266271

267272
private:
268273
THolder<NYql::NPureCalc::TPushStreamProgram<TFilterInputSpec, TFilterOutputSpec>> Program;
269-
THolder<NYql::NPureCalc::IConsumer<std::pair<ui64, TList<TString>>>> InputConsumer;
274+
THolder<NYql::NPureCalc::IConsumer<std::pair<ui64, TVector<TVector<std::string_view>>>>> InputConsumer;
270275
const TString Sql;
271276
};
272277

@@ -280,9 +285,9 @@ TJsonFilter::TJsonFilter(
280285

281286
TJsonFilter::~TJsonFilter() {
282287
}
283-
284-
void TJsonFilter::Push(ui64 offset, const TList<TString>& value) {
285-
Impl->Push(offset, value);
288+
289+
void TJsonFilter::Push(ui64 offset, const TVector<TVector<std::string_view>>& values) {
290+
Impl->Push(offset, values);
286291
}
287292

288293
TString TJsonFilter::GetSql() {

ydb/core/fq/libs/row_dispatcher/json_filter.h

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,24 @@
1-
21
#pragma once
32

4-
namespace NFq {
5-
63
#include <ydb/library/yql/public/udf/udf_data_type.h>
74
#include <ydb/library/yql/public/udf/udf_value.h>
85

6+
namespace NFq {
7+
98
class TJsonFilter {
109
public:
1110
using TCallback = std::function<void(ui64, const TString&)>;
12-
11+
1312
public:
1413
TJsonFilter(
15-
const TVector<TString>& columns,
14+
const TVector<TString>& columns,
1615
const TVector<TString>& types,
1716
const TString& whereFilter,
1817
TCallback callback);
18+
1919
~TJsonFilter();
20-
void Push(ui64 offset, const TList<TString>& value);
20+
21+
void Push(ui64 offset, const TVector<TVector<std::string_view>>& values);
2122
TString GetSql();
2223

2324
private:

0 commit comments

Comments
 (0)