88#include < ydb/library/yql/providers/yt/gateway/file/yql_yt_file_services.h>
99#include < ydb/library/yql/providers/common/provider/yql_provider_names.h>
1010#include < ydb/library/yql/providers/common/proto/gateways_config.pb.h>
11+ #include " ydb/library/yql/providers/yt/common/yql_names.h"
12+ #include < ydb/library/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.h>
13+ #include < ydb/library/yql/providers/yt/provider/yql_yt_key.h>
1114#include < ydb/library/yql/providers/yt/provider/yql_yt_provider.h>
1215#include < ydb/library/yql/providers/pg/provider/yql_pg_provider.h>
1316#include < ydb/library/yql/public/issue/yql_issue.h>
1417#include < ydb/library/yql/parser/pg_wrapper/interface/utils.h>
18+ #include < ydb/library/yql/providers/yt/lib/schema/schema.h>
19+ #include < ydb/library/yql/providers/common/schema/expr/yql_expr_schema.h>
20+ #include < ydb/library/yql/ast/yql_expr.h>
1521
1622#include < library/cpp/getopt/last_getopt.h>
1723#include < library/cpp/yson/public.h>
24+ #include " library/cpp/yson/node/node_io.h"
1825#include < library/cpp/yt/yson_string/string.h>
1926#include < fmt/format.h>
2027
3542
3643using namespace NYql ;
3744using namespace NKikimr ::NMiniKQL;
45+ using namespace NNodes ;
46+ using NUdf::EDataSlot;
3847
3948namespace NMiniKQL = NKikimr::NMiniKQL;
4049
@@ -939,36 +948,53 @@ std::pair<TString, TString> GetYtTableDataPaths(const TFsPath& dataDir, const TS
939948 return {dataFileName, attrFileName};
940949}
941950
942- void CreateYtFileTable (const TFsPath& dataDir, const TString tableName, const TExprNode::TPtr columnsNode, THashMap<TString, TString>& tablesMapping) {
943- const auto [dataFilePath, attrFilePath] =
944- GetYtTableDataPaths (dataDir, tableName);
951+ void CreateYtFileTable (const TFsPath& dataDir, TYtTableInfo& tableInfo, const TExprNode::TPtr columnsNode,
952+ THashMap<TString, TString>& tablesMapping, TExprContext& ctx, const TPosition& pos) {
953+ const auto [dataFilePath, attrFilePath] =
954+ GetYtTableDataPaths (dataDir, tableInfo.Name );
945955
946- TFile dataFile{dataFilePath, CreateNew};
947- TFile attrFile{attrFilePath, CreateNew};
956+ TFile dataFile{dataFilePath, CreateNew};
957+ TFile attrFile{attrFilePath, CreateNew};
948958
949- THolder<TFixedBufferFileOutput> fo;
950- fo.Reset (new TFixedBufferFileOutput{attrFile.GetName ()});
951- IOutputStream *attrS{fo.Get ()};
959+ auto rowSpec = MakeIntrusive<TYqlRowSpecInfo>();
952960
953- *attrS << R"__( {
954- "_yql_row_spec"={
955- "Type"=["StructType";[
956- )__" ;
961+ TColumnOrder columnOrder;
962+ columnOrder.reserve (columnsNode->ChildrenSize ());
957963
958- for (const auto &columnNode : columnsNode->Children ()) {
959- const auto &colName = columnNode->Child (0 )->Content ();
960- const auto &colTypeNode = columnNode->Child (1 );
964+ TStringBuilder ysonType;
965+ ysonType << " [\" StructType\" ;[" ;
961966
962- *attrS << fmt::format (R"__( ["{0}";["{1}";"{2}";];];
963- )__" ,
967+ for (const auto &columnNode : columnsNode->Children ()) {
968+ const auto &colName = columnNode->Child (0 )->Content ();
969+ const auto &colTypeNode = columnNode->Child (1 );
970+
971+ columnOrder.emplace_back (colName);
972+
973+ ysonType << fmt::format (" [\" {0}\" ;[\" {1}\" ;\" {2}\" ;];];" ,
964974 colName, colTypeNode->Content (),
965975 colTypeNode->Child (0 )->Content ());
966976 }
967- *attrS << R"__( ];];
968- };
969- })__" ;
977+ ysonType << " ];]" ;
978+ const auto *typeNode = NCommon::ParseTypeFromYson (TStringBuf (ysonType), ctx, pos);
979+
980+ rowSpec->SetType (typeNode->Cast <TStructExprType>());
981+ rowSpec->SetColumnOrder (std::move (columnOrder));
982+ tableInfo.RowSpec = rowSpec;
983+
984+ NYT::TNode attrs = NYT::TNode::CreateMap ();
985+ tableInfo.RowSpec ->FillAttrNode (attrs[YqlRowSpecAttribute], 0 , false );
986+
987+ NYT::TNode spec;
988+ tableInfo.RowSpec ->FillCodecNode (spec[YqlRowSpecAttribute]);
989+
990+ attrs[" schema" ] = RowSpecToYTSchema (spec[YqlRowSpecAttribute], 0 ).ToNode ();
970991
971- tablesMapping[TString (" yt.plato." ) + tableName] = dataFile.GetName ();
992+
993+ TOFStream of (attrFile.GetName ());
994+ of.Write (NYT::NodeToYsonString (attrs, NYson::EYsonFormat::Pretty));
995+
996+ const TString fullTableName (TStringBuilder () << " yt." << tableInfo.Cluster << ' .' << tableInfo.Name );
997+ tablesMapping[fullTableName] = dataFile.GetName ();
972998}
973999
9741000bool RemoveFile (const TString& fileName) {
@@ -1009,15 +1035,12 @@ int SplitStatements(int argc, char* argv[]) {
10091035
10101036void WriteToYtTableScheme (
10111037 const NYql::TExprNode& writeNode,
1038+ const TYtWrite& write,
1039+ const TYtOutputKey& key,
10121040 const TTempDir& tempDir,
1013- const TIntrusivePtr<class NYql ::NFile::TYtFileServices> yqlNativeServices) {
1014- const auto * keyNode = writeNode.Child (2 );
1015-
1016- const auto * tableNameNode = keyNode->Child (0 )->Child (1 );
1017- Y_ENSURE (tableNameNode->IsCallable (" String" ));
1018-
1019- const auto & tableName = tableNameNode->Child (0 )->Content ();
1020- Y_ENSURE (!tableName.empty ());
1041+ const TIntrusivePtr<class NYql ::NFile::TYtFileServices> yqlNativeServices,
1042+ TExprContext& ctx) {
1043+ const auto & tableName = key.GetPath ();
10211044
10221045 const auto * optionsNode = writeNode.Child (4 );
10231046 Y_ENSURE (optionsNode);
@@ -1030,8 +1053,10 @@ void WriteToYtTableScheme(
10301053 const auto columnsNode = GetSetting (*optionsNode, " columns" );
10311054 Y_ENSURE (columnsNode);
10321055
1033- CreateYtFileTable (tempDir.Path (), TString (tableName), columnsNode->ChildPtr (1 ),
1034- yqlNativeServices->GetTablesMapping ());
1056+ TYtTableInfo tableInfo (key, write.DataSink ().Cluster ().Value ());
1057+
1058+ CreateYtFileTable (tempDir.Path (), tableInfo, columnsNode->ChildPtr (1 ),
1059+ yqlNativeServices->GetTablesMapping (), ctx, writeNode.Pos (ctx));
10351060 }
10361061 else if (mode == " drop" ) {
10371062 DeleteYtFileTable (tempDir.Path (), TString (tableName), yqlNativeServices->GetTablesMapping ());
@@ -1061,6 +1086,11 @@ void ProcessMetaCmd(const TStringBuf& cmd) {
10611086 Cerr << " Metacommand " << cmd << " is not supported\n " ;
10621087}
10631088
1089+ void ShowFinalAst (TProgramPtr& program, IOutputStream& stream) {
1090+ Cerr << " Final AST:\n " ;
1091+ PrintExprTo (program, stream);
1092+ }
1093+
10641094int Main (int argc, char * argv[])
10651095{
10661096 using namespace NLastGetopt ;
@@ -1077,11 +1107,16 @@ int Main(int argc, char* argv[])
10771107 clusterMapping[" information_schema" ] = PgProviderName;
10781108
10791109 opts.AddHelpOption ();
1110+ opts.AddLongOption (" print-ast" , " print initial & final ASTs to stderr" ).NoArgument ();
1111+ opts.AddLongOption (" print-result" , " print program execution result to stderr" ).NoArgument ();
10801112 opts.AddLongOption (" datadir" , " directory for tables" ).StoreResult <TString>(&rawDataDir);
10811113 opts.SetFreeArgsMax (0 );
10821114
10831115 TOptsParseResult res (&opts, argc, argv);
10841116
1117+ const auto needPrintAst = res.Has (" print-ast" );
1118+ const auto needPrintResult = res.Has (" print-result" );
1119+
10851120 const bool tempDirExists = !rawDataDir.empty () && NFs::Exists (rawDataDir);
10861121 TTempDir tempDir{rawDataDir.empty () ? TTempDir{} : TTempDir{rawDataDir}};
10871122 if (tempDirExists) {
@@ -1187,18 +1222,37 @@ int Main(int argc, char* argv[])
11871222 }
11881223#endif
11891224
1225+ if (needPrintAst) {
1226+ Cerr << " Initial AST:\n " ;
1227+ PrintExprTo (program, Cerr);
1228+ }
1229+
11901230 static const THashSet<TString> ignoredNodes{" CommitAll!" , " Commit!" };
11911231 const auto opNode = NYql::FindNode (program->ExprRoot (),
11921232 [] (const TExprNode::TPtr& node) { return !ignoredNodes.contains (node->Content ()); });
1193- if (opNode->IsCallable (" Write!" )) {
1194- Y_ENSURE (opNode->ChildrenSize () == 5 );
1233+ if (const auto maybeWrite = TMaybeNode<TYtWrite>(opNode)) {
1234+ const auto write = maybeWrite.Cast ();
1235+
1236+ TYtOutputKey key;
1237+ if (!key.Parse (write.Arg (2 ).Ref (), ctx)) {
1238+ WriteErrorToStream (program);
1239+
1240+ return {};
1241+ }
11951242
1196- const auto * keyNode = opNode-> Child ( 2 );
1243+ const auto * keyNode = key. GetNode ( );
11971244 const bool isWriteToTableSchemeNode = keyNode->IsCallable (" Key" ) && 0 < keyNode->ChildrenSize () &&
11981245 keyNode->Child (0 )->Child (0 )->IsAtom (" tablescheme" );
11991246
12001247 if (isWriteToTableSchemeNode) {
1201- WriteToYtTableScheme (*opNode, tempDir, yqlNativeServices);
1248+ WriteToYtTableScheme (*opNode, write, key, tempDir, yqlNativeServices, program->ExprCtx ());
1249+
1250+ if (needPrintAst) {
1251+ program->Optimize (username);
1252+
1253+ ShowFinalAst (program, Cerr);
1254+ }
1255+
12021256 continue ;
12031257 }
12041258 }
@@ -1210,10 +1264,14 @@ int Main(int argc, char* argv[])
12101264 WriteErrorToStream (program);
12111265 continue ;
12121266 }
1267+ if (needPrintAst) {
1268+ ShowFinalAst (program, Cerr);
1269+ }
12131270
12141271 if (program->HasResults ()) {
1215- // PrintExprTo(program, Cout);
1216- // Cout << program->ResultsAsString() << Endl;
1272+ if (needPrintResult) {
1273+ Cerr << program->ResultsAsString () << Endl;
1274+ }
12171275
12181276 const auto root = ParseYson (program->ResultsAsString ());
12191277
0 commit comments