44#include < ydb/library/yql/core/yql_opt_utils.h>
55#include < ydb/library/yql/core/yql_expr_type_annotation.h>
66#include < ydb/library/yql/core/yql_expr_optimize.h>
7+ #include < ydb/library/yql/core/yql_type_helpers.h>
78
89#include < ydb/library/yql/utils/log/log.h>
910
@@ -130,8 +131,146 @@ TExprNode::TListType OriginalJoinOutputMembers(const TDqPhyMapJoin& mapJoin, TEx
130131 }
131132 return structMembers;
132133}
134+
135+ TExprNode::TPtr ExpandJoinInput (const TStructExprType& type, TExprNode::TPtr&& arg, TExprContext& ctx) {
136+ return ctx.Builder (arg->Pos ())
137+ .Callable (" ExpandMap" )
138+ .Add (0 , std::move (arg))
139+ .Lambda (1 )
140+ .Param (" item" )
141+ .Do ([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
142+ auto i = 0U ;
143+ for (const auto & item : type.GetItems ()) {
144+ parent.Callable (i++, " Member" )
145+ .Arg (0 , " item" )
146+ .Atom (1 , item->GetName ())
147+ .Seal ();
148+ }
149+ return parent;
150+ })
151+ .Seal ()
152+ .Seal ().Build ();
153+ }
154+
133155} // anonymous namespace end
134156
157+ TExprBase DqPeepholeRewriteMapJoinWithGraceCore (const TExprBase& node, TExprContext& ctx) {
158+ if (!node.Maybe <TDqPhyGraceJoin>()) {
159+ return node;
160+ }
161+ const auto graceJoin = node.Cast <TDqPhyGraceJoin>();
162+ const auto pos = graceJoin.Pos ();
163+
164+ const TString leftTableLabel (GetTableLabel (graceJoin.LeftLabel ()));
165+ const TString rightTableLabel (GetTableLabel (graceJoin.RightLabel ()));
166+
167+ auto [leftKeyColumnNodes, rightKeyColumnNodes] = JoinKeysToAtoms (ctx, graceJoin, leftTableLabel, rightTableLabel);
168+ const auto keyWidth = leftKeyColumnNodes.size ();
169+
170+ const auto itemTypeLeft = GetSequenceItemType (graceJoin.LeftInput (), false , ctx)->Cast <TStructExprType>();
171+ const auto itemTypeRight = GetSequenceItemType (graceJoin.RightInput (), false , ctx)->Cast <TStructExprType>();
172+
173+ TExprNode::TListType leftRenames, rightRenames;
174+ std::vector<TString> fullColNames;
175+ ui32 outputIndex = 0 ;
176+
177+ for (auto i = 0u ; i < itemTypeLeft->GetSize (); i++) {
178+ TString name (itemTypeLeft->GetItems ()[i]->GetName ());
179+ if (leftTableLabel) {
180+ name = leftTableLabel + " ." + name;
181+ }
182+ fullColNames.push_back (name);
183+ leftRenames.emplace_back (ctx.NewAtom (pos, ctx.GetIndexAsString (i)));
184+ leftRenames.emplace_back (ctx.NewAtom (pos, ctx.GetIndexAsString (outputIndex++)));
185+ }
186+ if (graceJoin.JoinType ().Value () != " LeftOnly" && graceJoin.JoinType ().Value () != " LeftSemi" ) {
187+ for (auto i = 0u ; i < itemTypeRight->GetSize (); i++) {
188+ TString name (itemTypeRight->GetItems ()[i]->GetName ());
189+ if (rightTableLabel) {
190+ name = rightTableLabel + " ." + name;
191+ }
192+ fullColNames.push_back (name);
193+ rightRenames.emplace_back (ctx.NewAtom (pos, ctx.GetIndexAsString (i)));
194+ rightRenames.emplace_back (ctx.NewAtom (pos, ctx.GetIndexAsString (outputIndex++)));
195+ }
196+ }
197+
198+ TTypeAnnotationNode::TListType keyTypesLeft (keyWidth);
199+ TTypeAnnotationNode::TListType keyTypesRight (keyWidth);
200+ TTypeAnnotationNode::TListType keyTypes (keyWidth);
201+ for (auto i = 0U ; i < keyTypes.size (); ++i) {
202+ const auto keyTypeLeft = itemTypeLeft->FindItemType (leftKeyColumnNodes[i]->Content ());
203+ const auto keyTypeRight = itemTypeRight->FindItemType (rightKeyColumnNodes[i]->Content ());
204+ bool optKey = false ;
205+ keyTypes[i] = JoinDryKeyType (keyTypeLeft, keyTypeRight, optKey, ctx);
206+ if (!keyTypes[i]) {
207+ keyTypes.clear ();
208+ keyTypesLeft.clear ();
209+ keyTypesRight.clear ();
210+ break ;
211+ }
212+ keyTypesLeft[i] = optKey ? ctx.MakeType <TOptionalExprType>(keyTypes[i]) : keyTypes[i];
213+ keyTypesRight[i] = optKey ? ctx.MakeType <TOptionalExprType>(keyTypes[i]) : keyTypes[i];
214+ }
215+
216+ auto leftInput = ExpandJoinInput (*itemTypeLeft, ctx.NewCallable (graceJoin.LeftInput ().Pos (), " ToFlow" , {graceJoin.LeftInput ().Ptr ()}), ctx);
217+ auto rightInput = ExpandJoinInput (*itemTypeRight, ctx.NewCallable (graceJoin.RightInput ().Pos (), " ToFlow" , {graceJoin.RightInput ().Ptr ()}), ctx);
218+ YQL_ENSURE (!keyTypes.empty ());
219+
220+ for (auto i = 0U ; i < leftKeyColumnNodes.size (); i++) {
221+ const auto origName = TString (leftKeyColumnNodes[i]->Content ());
222+ auto index = itemTypeLeft->FindItem (origName);
223+ YQL_ENSURE (index);
224+ leftKeyColumnNodes[i] = ctx.NewAtom (leftKeyColumnNodes[i]->Pos (), ctx.GetIndexAsString (*index));
225+ }
226+ for (auto i = 0U ; i < rightKeyColumnNodes.size (); i++) {
227+ const auto origName = TString (rightKeyColumnNodes[i]->Content ());
228+ auto index = itemTypeRight->FindItem (origName);
229+ YQL_ENSURE (index);
230+ rightKeyColumnNodes[i] = ctx.NewAtom (rightKeyColumnNodes[i]->Pos (), ctx.GetIndexAsString (*index));
231+ }
232+
233+ auto [leftKeyColumnNodesCopy, rightKeyColumnNodesCopy] = JoinKeysToAtoms (ctx, graceJoin, leftTableLabel, rightTableLabel);
234+
235+ auto graceJoinCore = Build<TCoGraceJoinCore>(ctx, pos)
236+ .LeftInput (std::move (leftInput))
237+ .RightInput (std::move (rightInput))
238+ .JoinKind (graceJoin.JoinType ())
239+ .LeftKeysColumns (ctx.NewList (pos, std::move (leftKeyColumnNodes)))
240+ .RightKeysColumns (ctx.NewList (pos, std::move (rightKeyColumnNodes)))
241+ .LeftRenames (ctx.NewList (pos, std::move (leftRenames)))
242+ .RightRenames (ctx.NewList (pos, std::move (rightRenames)))
243+ .LeftKeysColumnNames (ctx.NewList (pos, std::move (leftKeyColumnNodesCopy)))
244+ .RightKeysColumnNames (ctx.NewList (pos, std::move (rightKeyColumnNodesCopy)))
245+ .Flags ()
246+ .Build ()
247+ .Done ();
248+
249+ auto graceNode = ctx.Builder (pos)
250+ .Callable (" NarrowMap" )
251+ .Add (0 , graceJoinCore.Ptr ())
252+ .Lambda (1 )
253+ .Params (" output" , fullColNames.size ())
254+ .Callable (" AsStruct" )
255+ .Do ([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
256+ ui32 i = 0U ;
257+ for (const auto & colName : fullColNames) {
258+ parent.List (i)
259+ .Atom (0 , colName)
260+ .Arg (1 , " output" , i)
261+ .Seal ();
262+ i++;
263+ }
264+ return parent;
265+ })
266+ .Seal ()
267+ .Seal ()
268+ .Seal ()
269+ .Build ();
270+
271+ return TExprBase (graceNode);
272+ }
273+
/**
 * Rewrites a `KqpMapJoin` to the `MapJoinCore`.
 *
@@ -142,10 +281,11 @@ TExprNode::TListType OriginalJoinOutputMembers(const TDqPhyMapJoin& mapJoin, TEx
 *    (rely on the fact that there will be only one element in the `FlatMap`-stream)
 *  - Align key types using `StrictCast`, use internal columns to store converted left keys
 */
145- TExprBase DqPeepholeRewriteMapJoin (const TExprBase& node, TExprContext& ctx) {
284+ TExprBase DqPeepholeRewriteMapJoinWithMapCore (const TExprBase& node, TExprContext& ctx) {
146285 if (!node.Maybe <TDqPhyMapJoin>()) {
147286 return node;
148287 }
288+
149289 const auto mapJoin = node.Cast <TDqPhyMapJoin>();
150290 const auto pos = mapJoin.Pos ();
151291
0 commit comments