1+ #include < ydb/library/yql/providers/common/schema/parser/yql_type_parser.h>
12#include < ydb/library/yql/public/udf/udf_version.h>
23#include < ydb/library/yql/public/purecalc/purecalc.h>
34#include < ydb/library/yql/public/purecalc/io_specs/mkql/spec.h>
1011#include < ydb/core/fq/libs/common/util.h>
1112#include < ydb/core/fq/libs/row_dispatcher/json_filter.h>
1213
13-
1414namespace {
1515
1616using TCallback = NFq::TJsonFilter::TCallback;
@@ -23,6 +23,12 @@ NYT::TNode CreateTypeNode(const TString& fieldType) {
2323 .Add (fieldType);
2424}
2525
26+ NYT::TNode CreateOptionalTypeNode (const TString& fieldType) {
27+ return NYT::TNode::CreateList ()
28+ .Add (" OptionalType" )
29+ .Add (CreateTypeNode (fieldType));
30+ }
31+
2632void AddField (NYT::TNode& node, const TString& fieldName, const TString& fieldType) {
2733 node.Add (
2834 NYT::TNode::CreateList ()
@@ -31,18 +37,29 @@ void AddField(NYT::TNode& node, const TString& fieldName, const TString& fieldTy
3137 );
3238}
3339
34- void AddOptionalField (NYT::TNode& node, const TString& fieldName, const TString& fieldType) {
35- node.Add (NYT::TNode::CreateList ()
36- .Add (fieldName)
37- .Add (NYT::TNode::CreateList ().Add (" OptionalType" ).Add (CreateTypeNode (fieldType)))
40+ void AddTypedField (NYT::TNode& node, const TString& fieldName, const TString& fieldTypeYson) {
41+ NYT::TNode parsedType;
42+ Y_ENSURE (NYql::NCommon::ParseYson (parsedType, fieldTypeYson, Cerr), " Invalid field type" );
43+
44+ // TODO: remove this when the re-parsing is removed from pq read actor
45+ if (parsedType == CreateTypeNode (" Json" )) {
46+ parsedType = CreateTypeNode (" String" );
47+ } else if (parsedType == CreateOptionalTypeNode (" Json" )) {
48+ parsedType = CreateOptionalTypeNode (" String" );
49+ }
50+
51+ node.Add (
52+ NYT::TNode::CreateList ()
53+ .Add (fieldName)
54+ .Add (parsedType)
3855 );
3956}
4057
41- NYT::TNode MakeInputSchema (const TVector<TString>& columns) {
58+ NYT::TNode MakeInputSchema (const TVector<TString>& columns, const TVector<TString>& types ) {
4259 auto structMembers = NYT::TNode::CreateList ();
4360 AddField (structMembers, OffsetFieldName, " Uint64" );
44- for (const auto & col : columns) {
45- AddOptionalField (structMembers, col, " String " );
61+ for (size_t i = 0 ; i < columns. size (); ++i ) {
62+ AddTypedField (structMembers, columns[i], types[i] );
4663 }
4764 return NYT::TNode::CreateList ().Add (" StructType" ).Add (std::move (structMembers));
4865}
@@ -68,7 +85,7 @@ class TFilterInputSpec : public NYql::NPureCalc::TInputSpecBase {
6885 TVector<NYT::TNode> Schemas;
6986};
7087
71- class TFilterInputConsumer : public NYql ::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<TVector<std::string_view> >&>> {
88+ class TFilterInputConsumer : public NYql ::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector* >&>> {
7289public:
7390 TFilterInputConsumer (
7491 const TFilterInputSpec& spec,
@@ -106,15 +123,15 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const T
106123 }
107124 }
108125
109- void OnObject (std::pair<const TVector<ui64>&, const TVector<TVector<std::string_view> >&> values) override {
126+ void OnObject (std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector* >&> values) override {
110127 Y_ENSURE (FieldsPositions.size () == values.second .size ());
111128
112129 NKikimr::NMiniKQL::TThrowingBindTerminator bind;
113130 with_lock (Worker->GetScopedAlloc ()) {
114131 auto & holderFactory = Worker->GetGraph ().GetHolderFactory ();
115132
116133 // TODO: use blocks here
117- for (size_t rowId = 0 ; rowId < values.second .front (). size (); ++rowId) {
134+ for (size_t rowId = 0 ; rowId < values.second .front ()-> size (); ++rowId) {
118135 NYql::NUdf::TUnboxedValue* items = nullptr ;
119136
120137 NYql::NUdf::TUnboxedValue result = Cache.NewArray (
@@ -126,13 +143,16 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const T
126143
127144 size_t fieldId = 0 ;
128145 for (const auto & column : values.second ) {
129- items[FieldsPositions[fieldId++]] = column[rowId].data () // Check that std::string_view was initialized in json_parser
130- ? NKikimr::NMiniKQL::MakeString (column[rowId]).MakeOptional ()
131- : NKikimr::NUdf::TUnboxedValuePod ();
146+ items[FieldsPositions[fieldId++]] = column->at (rowId);
132147 }
133148
134149 Worker->Push (std::move (result));
135150 }
151+
152+ // Clear cache after each object because
153+ // values allocated on another allocator and should be released
154+ Cache.Clear ();
155+ Worker->GetGraph ().Invalidate ();
136156 }
137157 }
138158
@@ -216,7 +236,7 @@ struct NYql::NPureCalc::TInputSpecTraits<TFilterInputSpec> {
216236 static constexpr bool IsPartial = false ;
217237 static constexpr bool SupportPushStreamMode = true ;
218238
219- using TConsumerType = THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<TVector<std::string_view> >&>>>;
239+ using TConsumerType = THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector* >&>>>;
220240
221241 static TConsumerType MakeConsumer (
222242 const TFilterInputSpec& spec,
@@ -244,12 +264,15 @@ class TJsonFilter::TImpl {
244264 const TVector<TString>& types,
245265 const TString& whereFilter,
246266 TCallback callback)
247- : Sql(GenerateSql(columns, types, whereFilter)) {
267+ : Sql(GenerateSql(whereFilter)) {
268+ Y_ENSURE (columns.size () == types.size (), " Number of columns and types should by equal" );
248269 auto factory = NYql::NPureCalc::MakeProgramFactory (NYql::NPureCalc::TProgramFactoryOptions ());
249270
271+ // Program should be stateless because input values
272+ // allocated on another allocator and should be released
250273 LOG_ROW_DISPATCHER_DEBUG (" Creating program..." );
251274 Program = factory->MakePushStreamProgram (
252- TFilterInputSpec (MakeInputSchema (columns)),
275+ TFilterInputSpec (MakeInputSchema (columns, types )),
253276 TFilterOutputSpec (MakeOutputSchema ()),
254277 Sql,
255278 NYql::NPureCalc::ETranslationMode::SQL
@@ -258,7 +281,7 @@ class TJsonFilter::TImpl {
258281 LOG_ROW_DISPATCHER_DEBUG (" Program created" );
259282 }
260283
261- void Push (const TVector<ui64>& offsets, const TVector<TVector<std::string_view> >& values) {
284+ void Push (const TVector<ui64>& offsets, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector* >& values) {
262285 Y_ENSURE (values, " Expected non empty schema" );
263286 InputConsumer->OnObject (std::make_pair (offsets, values));
264287 }
@@ -268,29 +291,9 @@ class TJsonFilter::TImpl {
268291 }
269292
270293private:
271- TString GenerateSql (const TVector<TString>& columnNames, const TVector<TString>& columnTypes, const TString& whereFilter) {
294+ TString GenerateSql (const TString& whereFilter) {
272295 TStringStream str;
273- str << " $fields = SELECT " ;
274- Y_ABORT_UNLESS (columnNames.size () == columnTypes.size ());
275- str << OffsetFieldName << " , " ;
276- for (size_t i = 0 ; i < columnNames.size (); ++i) {
277- TString columnType = columnTypes[i];
278- TString columnName = NFq::EncloseAndEscapeString (columnNames[i], ' `' );
279- if (columnType == " Json" ) {
280- columnType = " String" ;
281- } else if (columnType == " Optional<Json>" ) {
282- columnType = " Optional<String>" ;
283- }
284-
285- if (columnType.StartsWith (" Optional" )) {
286- str << " IF(" << columnName << " IS NOT NULL, Unwrap(CAST(" << columnName << " as " << columnType << " )), NULL)" ;
287- } else {
288- str << " Unwrap(CAST(" << columnName << " as " << columnType << " ))" ;
289- }
290- str << " as " << columnName << ((i != columnNames.size () - 1 ) ? " ," : " " );
291- }
292- str << " FROM Input;\n " ;
293- str << " $filtered = SELECT * FROM $fields " << whereFilter << " ;\n " ;
296+ str << " $filtered = SELECT * FROM Input " << whereFilter << " ;\n " ;
294297
295298 str << " SELECT " << OffsetFieldName << " , Unwrap(Json::SerializeJson(Yson::From(RemoveMembers(TableRow(), [\" " << OffsetFieldName;
296299 str << " \" ])))) as data FROM $filtered" ;
@@ -300,7 +303,7 @@ class TJsonFilter::TImpl {
300303
301304private:
302305 THolder<NYql::NPureCalc::TPushStreamProgram<TFilterInputSpec, TFilterOutputSpec>> Program;
303- THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<TVector<std::string_view> >&>>> InputConsumer;
306+ THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector* >&>>> InputConsumer;
304307 const TString Sql;
305308};
306309
@@ -315,7 +318,7 @@ TJsonFilter::TJsonFilter(
315318TJsonFilter::~TJsonFilter () {
316319}
317320
318- void TJsonFilter::Push (const TVector<ui64>& offsets, const TVector<TVector<std::string_view> >& values) {
321+ void TJsonFilter::Push (const TVector<ui64>& offsets, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector* >& values) {
319322 Impl->Push (offsets, values);
320323}
321324
0 commit comments