@@ -53,7 +53,7 @@ class TFilterInputSpec : public NYql::NPureCalc::TInputSpecBase {
53
53
TVector<NYT::TNode> Schemas;
54
54
};
55
55
56
- class TFilterInputConsumer : public NYql ::NPureCalc::IConsumer<std::pair<ui64, TList<TString> >> {
56
+ class TFilterInputConsumer : public NYql ::NPureCalc::IConsumer<std::pair<ui64, const TVector<TVector<std::string_view>>& >> {
57
57
public:
58
58
TFilterInputConsumer (
59
59
const TFilterInputSpec& spec,
@@ -91,28 +91,32 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<ui64, T
91
91
}
92
92
}
93
93
94
- void OnObject (std::pair<ui64, TList<TString>> value) override {
94
+ void OnObject (std::pair<ui64, const TVector<TVector<std::string_view>>&> values) override {
95
+ Y_ENSURE (FieldsPositions.size () == values.second .size ());
96
+
95
97
NKikimr::NMiniKQL::TThrowingBindTerminator bind;
96
-
97
98
with_lock (Worker->GetScopedAlloc ()) {
98
99
auto & holderFactory = Worker->GetGraph ().GetHolderFactory ();
99
- NYql::NUdf::TUnboxedValue* items = nullptr ;
100
100
101
- NYql::NUdf::TUnboxedValue result = Cache.NewArray (
102
- holderFactory,
103
- static_cast <ui32>(value.second .size () + 1 ),
104
- items);
105
-
106
- items[OffsetPosition] = NYql::NUdf::TUnboxedValuePod (value.first );
101
+ // TODO: use blocks here
102
+ for (size_t rowId = 0 ; rowId < values.second .front ().size (); ++rowId) {
103
+ NYql::NUdf::TUnboxedValue* items = nullptr ;
104
+
105
+ NYql::NUdf::TUnboxedValue result = Cache.NewArray (
106
+ holderFactory,
107
+ static_cast <ui32>(values.second .size () + 1 ),
108
+ items);
107
109
108
- Y_ENSURE (FieldsPositions. size () == value. second . size () );
110
+ items[OffsetPosition] = NYql::NUdf::TUnboxedValuePod (values. first ++ );
109
111
110
- size_t i = 0 ;
111
- for (const auto & v : value.second ) {
112
- NYql::NUdf::TStringValue str (v);
113
- items[FieldsPositions[i++]] = NYql::NUdf::TUnboxedValuePod (std::move (str));
112
+ size_t fieldId = 0 ;
113
+ for (const auto & column : values.second ) {
114
+ NYql::NUdf::TStringValue str (column[rowId]);
115
+ items[FieldsPositions[fieldId++]] = NYql::NUdf::TUnboxedValuePod (std::move (str));
116
+ }
117
+
118
+ Worker->Push (std::move (result));
114
119
}
115
- Worker->Push (std::move (result));
116
120
}
117
121
}
118
122
@@ -196,7 +200,7 @@ struct NYql::NPureCalc::TInputSpecTraits<TFilterInputSpec> {
196
200
static constexpr bool IsPartial = false ;
197
201
static constexpr bool SupportPushStreamMode = true ;
198
202
199
- using TConsumerType = THolder<NYql::NPureCalc::IConsumer<std::pair<ui64, TList<TString> >>>;
203
+ using TConsumerType = THolder<NYql::NPureCalc::IConsumer<std::pair<ui64, const TVector<TVector<std::string_view>>& >>>;
200
204
201
205
static TConsumerType MakeConsumer (
202
206
const TFilterInputSpec& spec,
@@ -238,8 +242,9 @@ class TJsonFilter::TImpl {
238
242
LOG_ROW_DISPATCHER_DEBUG (" Program created" );
239
243
}
240
244
241
- void Push (ui64 offset, const TList<TString>& value) {
242
- InputConsumer->OnObject (std::make_pair (offset, value));
245
+ void Push (ui64 offset, const TVector<TVector<std::string_view>>& values) {
246
+ Y_ENSURE (values, " Expected non empty schema" );
247
+ InputConsumer->OnObject (std::make_pair (offset, values));
243
248
}
244
249
245
250
TString GetSql () const {
@@ -266,7 +271,7 @@ class TJsonFilter::TImpl {
266
271
267
272
private:
268
273
THolder<NYql::NPureCalc::TPushStreamProgram<TFilterInputSpec, TFilterOutputSpec>> Program;
269
- THolder<NYql::NPureCalc::IConsumer<std::pair<ui64, TList<TString> >>> InputConsumer;
274
+ THolder<NYql::NPureCalc::IConsumer<std::pair<ui64, const TVector<TVector<std::string_view>>& >>> InputConsumer;
270
275
const TString Sql;
271
276
};
272
277
@@ -280,9 +285,9 @@ TJsonFilter::TJsonFilter(
280
285
281
286
TJsonFilter::~TJsonFilter () {
282
287
}
283
-
284
- void TJsonFilter::Push (ui64 offset, const TList<TString>& value ) {
285
- Impl->Push (offset, value );
288
+
289
+ void TJsonFilter::Push (ui64 offset, const TVector<TVector<std::string_view>>& values ) {
290
+ Impl->Push (offset, values );
286
291
}
287
292
288
293
TString TJsonFilter::GetSql () {
0 commit comments