33#include " export_yt.h"
44#include " yt_wrapper.h"
55
6- #include < contrib/ydb/core/protos/flat_scheme_op.pb.h>
7- #include < contrib/ydb/library/services/services.pb.h>
8- #include < contrib/ydb/core/tablet_flat/flat_row_state.h>
9- #include < contrib/ydb/core/tx/datashard/export_common.h>
10- #include < contrib/ydb/library/binary_json/read.h>
11- #include < contrib/ydb/library/actors/core/actor_bootstrapped.h>
12- #include < contrib/ydb/library/actors/core/hfunc.h>
6+ #include < ydb/core/protos/flat_scheme_op.pb.h>
7+ #if __has_include("ydb/core/protos/flat_scheme_op.deps.pb.h")
8+ #include < ydb/core/protos/flat_scheme_op.deps.pb.h> // Y_IGNORE
9+ #endif
10+ #include < ydb/library/services/services.pb.h>
11+ #include < ydb/core/tablet_flat/flat_row_state.h>
12+ #include < ydb/core/tx/datashard/export_common.h>
13+ #include < ydb/core/tx/datashard/type_serialization.h>
14+ #include < ydb/library/actors/core/actor_bootstrapped.h>
15+ #include < ydb/library/actors/core/hfunc.h>
16+ #include < ydb/library/actors/core/log.h>
17+
18+ #include < yql/essentials/types/binary_json/read.h>
19+ #include < yql/essentials/parser/pg_wrapper/interface/type_desc.h>
1320
1421#include < yt/yt/client/table_client/config.h>
1522#include < yt/yt/client/table_client/name_table.h>
@@ -99,8 +106,6 @@ class TYtUploader: public TActorBootstrapped<TYtUploader> {
99106 TVector<TColumnSchema> schema;
100107
101108 for (const auto & [_, column] : columns) {
102- // TODO: support pg types
103- Y_ABORT_UNLESS (column.Type .GetTypeId () != NScheme::NTypeIds::Pg, " pg types are not supported" );
104109 schema.emplace_back (column.Name , ConvertType (column.Type .GetTypeId ()));
105110 }
106111
@@ -129,6 +134,7 @@ class TYtUploader: public TActorBootstrapped<TYtUploader> {
129134 || error.FindMatching (NYT::NTableClient::EErrorCode::FormatCannotRepresentRow)
130135 || error.FindMatching (NYT::NTableClient::EErrorCode::IncompatibleSchemas)
131136 // Cypress errors
137+ || error.FindMatching (NYT::NYTree::EErrorCode::ResolveError)
132138 || error.FindMatching (NYT::NYTree::EErrorCode::MaxChildCountViolation)
133139 || error.FindMatching (NYT::NYTree::EErrorCode::MaxStringLengthViolation)
134140 || error.FindMatching (NYT::NYTree::EErrorCode::MaxAttributeSizeViolation)
@@ -158,12 +164,16 @@ class TYtUploader: public TActorBootstrapped<TYtUploader> {
158164 return {};
159165 }
160166
161- return NYT::TError (NCustomErrorCodes::InvalidNodeType, TStringBuilder () << " Invalid type of " << path
162- << " : expected \" " << expected << " \" "
163- << " , actual \" " << actual << " \" " );
167+ return NYT::TError (
168+ NCustomErrorCodes::InvalidNodeType,
169+ " Invalid type of %v"
170+ " : expected %Qv"
171+ " , actual %Qv" ,
172+ path,
173+ expected,
174+ actual);
164175 } catch (const yexception& ex) {
165- return NYT::TError (TStringBuilder () << " Error while checking type of " << path
166- << " : " << ex.what ());
176+ return NYT::TError (" Error while checking type of %v: %v" , path, ex.what ());
167177 }
168178 }
169179
@@ -189,12 +199,12 @@ class TYtUploader: public TActorBootstrapped<TYtUploader> {
189199 opts.Attributes = std::move (attrs);
190200 }
191201
192- Send (Client, new TEvYtWrapper::TEvCreateNodeRequest (DstPath. GetPath (), EObjectType::Table, opts));
202+ Send (Client, new TEvYtWrapper::TEvCreateNodeRequest (DstPath-> GetPath (), EObjectType::Table, opts));
193203 } else {
194204 TGetNodeOptions opts;
195205 opts.Attributes = TVector<TString>{" type" };
196206
197- Send (Client, new TEvYtWrapper::TEvGetNodeRequest (DstPath. GetPath (), opts));
207+ Send (Client, new TEvYtWrapper::TEvGetNodeRequest (DstPath-> GetPath (), opts));
198208 }
199209 }
200210
@@ -217,7 +227,7 @@ class TYtUploader: public TActorBootstrapped<TYtUploader> {
217227 return ;
218228 }
219229
220- Send (Client, new TEvYtWrapper::TEvCreateTableWriterRequest (DstPath, TableWriterOptions ()));
230+ Send (Client, new TEvYtWrapper::TEvCreateTableWriterRequest (* DstPath, TableWriterOptions ()));
221231 }
222232
223233 void Handle (TEvYtWrapper::TEvGetNodeResponse::TPtr& ev) {
@@ -231,11 +241,11 @@ class TYtUploader: public TActorBootstrapped<TYtUploader> {
231241 return ;
232242 }
233243
234- if (!CheckResult (CheckNodeType (DstPath. GetPath (), result.Value (), " table" ), TStringBuf (" CheckNodeType" ))) {
244+ if (!CheckResult (CheckNodeType (DstPath-> GetPath (), result.Value (), " table" ), TStringBuf (" CheckNodeType" ))) {
235245 return ;
236246 }
237247
238- Send (Client, new TEvYtWrapper::TEvCreateTableWriterRequest (DstPath, TableWriterOptions ()));
248+ Send (Client, new TEvYtWrapper::TEvCreateTableWriterRequest (* DstPath, TableWriterOptions ()));
239249 }
240250
241251 void Handle (TEvYtWrapper::TEvCreateTableWriterResponse::TPtr& ev) {
@@ -394,7 +404,7 @@ class TYtUploader: public TActorBootstrapped<TYtUploader> {
394404 ui32 retries)
395405 : ServerName(serverName)
396406 , Token(token)
397- , DstPath(TRichYPath::Parse( dstPath) )
407+ , DstPathStr( dstPath)
398408 , UseTypeV3(useTypeV3)
399409 , Schema(GenTableSchema(columns))
400410 , Retries(retries)
@@ -407,6 +417,16 @@ class TYtUploader: public TActorBootstrapped<TYtUploader> {
407417 << " : self# " << SelfId ()
408418 << " , attempt# " << Attempt);
409419
420+ Become (&TThis::StateWork);
421+
422+ if (!DstPath) {
423+ try {
424+ DstPath.ConstructInPlace (DstPathStr);
425+ } catch (const NYT::TErrorException& ex) {
426+ return Finish (false , ex.what ());
427+ }
428+ }
429+
410430 NameTable.Reset ();
411431 Last = false ;
412432
@@ -417,9 +437,7 @@ class TYtUploader: public TActorBootstrapped<TYtUploader> {
417437 }
418438
419439 Client = RegisterWithSameMailbox (CreateYtWrapper (ServerName, Token));
420- Send (Client, new TEvYtWrapper::TEvNodeExistsRequest (DstPath.GetPath (), TNodeExistsOptions ()));
421-
422- Become (&TThis::StateWork);
440+ Send (Client, new TEvYtWrapper::TEvNodeExistsRequest (DstPath->GetPath (), TNodeExistsOptions ()));
423441 }
424442
425443 STATEFN (StateWork) {
@@ -442,7 +460,7 @@ class TYtUploader: public TActorBootstrapped<TYtUploader> {
442460private:
443461 const TString ServerName;
444462 const TString Token;
445- const TRichYPath DstPath ;
463+ const TString DstPathStr ;
446464 const bool UseTypeV3;
447465 const TTableSchema Schema;
448466 const ui32 Retries;
@@ -452,6 +470,7 @@ class TYtUploader: public TActorBootstrapped<TYtUploader> {
452470 TActorId Writer;
453471 TActorId Scanner;
454472
473+ TMaybe<TRichYPath> DstPath;
455474 TNameTablePtr NameTable;
456475 bool Last;
457476 TMaybe<TString> Error;
@@ -460,7 +479,7 @@ class TYtUploader: public TActorBootstrapped<TYtUploader> {
460479
461480class TYtBuffer : public IBuffer {
462481 struct TColumn {
463- NScheme::TTypeId Type;
482+ NScheme::TTypeInfo Type;
464483 int Id; // in name table
465484 };
466485
@@ -472,20 +491,25 @@ class TYtBuffer: public IBuffer {
472491
473492 int i = 0 ;
474493 for (const auto & [tag, column] : columns) {
475- // TODO: support pg types
476- Y_ABORT_UNLESS (column.Type .GetTypeId () != NScheme::NTypeIds::Pg, " pg types are not supported" );
477- result[tag] = {column.Type .GetTypeId (), i++};
494+ result[tag] = {column.Type , i++};
478495 }
479496
480497 return result;
481498 }
482499
483- static TUnversionedValue ConvertValue (NScheme::TTypeId type, const TCell& cell, int id, bool useTypeV3, TString& buffer) {
500+ static TUnversionedValue ConvertValue (
501+ NScheme::TTypeInfo type,
502+ const TCell& cell,
503+ int id,
504+ bool useTypeV3,
505+ TString& buffer,
506+ TString& error)
507+ {
484508 if (cell.IsNull ()) {
485509 return MakeUnversionedNullValue (id);
486510 }
487511
488- switch (type) {
512+ switch (type. GetTypeId () ) {
489513 case NScheme::NTypeIds::Int32:
490514 return MakeUnversionedInt64Value (cell.AsValue <i32 >(), id);
491515 case NScheme::NTypeIds::Uint32:
@@ -509,9 +533,19 @@ class TYtBuffer: public IBuffer {
509533 return MakeUnversionedDoubleValue (cell.AsValue <double >(), id);
510534 case NScheme::NTypeIds::Float:
511535 return MakeUnversionedDoubleValue (cell.AsValue <float >(), id);
536+ case NScheme::NTypeIds::Pg:
537+ if (auto pgResult = NPg::PgNativeTextFromNativeBinary (cell.AsBuf (), type.GetPgTypeDesc ());
538+ pgResult.Error )
539+ {
540+ error.swap (*pgResult.Error );
541+ return {};
542+ } else {
543+ buffer.swap (pgResult.Str );
544+ return MakeUnversionedStringValue (buffer, id);
545+ }
512546 default :
513547 if (useTypeV3) {
514- switch (type) {
548+ switch (type. GetTypeId () ) {
515549 case NScheme::NTypeIds::Date:
516550 return MakeUnversionedUint64Value (cell.AsValue <ui16>(), id);
517551 case NScheme::NTypeIds::Datetime:
@@ -527,7 +561,7 @@ class TYtBuffer: public IBuffer {
527561 case NScheme::NTypeIds::Interval64:
528562 return MakeUnversionedInt64Value (cell.AsValue <i64 >(), id);
529563 case NScheme::NTypeIds::Decimal:
530- buffer = NDataShard::DecimalToString (cell.AsValue <std::pair<ui64, i64 >>());
564+ buffer = NDataShard::DecimalToString (cell.AsValue <std::pair<ui64, i64 >>(), type );
531565 return MakeUnversionedStringValue (buffer, id);
532566 case NScheme::NTypeIds::DyNumber:
533567 buffer = NDataShard::DyNumberToString (cell.AsBuf ());
@@ -579,7 +613,12 @@ class TYtBuffer: public IBuffer {
579613 const auto & cell = (*row)[i];
580614
581615 TString buffer;
582- const auto value = ConvertValue (column.Type , cell, column.Id , UseTypeV3, buffer);
616+ TString error;
617+ const auto value = ConvertValue (column.Type , cell, column.Id , UseTypeV3, buffer, error);
618+ if (!error.empty ()) {
619+ ErrorString.swap (error);
620+ return false ;
621+ }
583622
584623 rowBuilder.AddValue (value);
585624 BytesRead += cell.Size ();
@@ -613,7 +652,7 @@ class TYtBuffer: public IBuffer {
613652 }
614653
615654 TString GetError () const override {
616- Y_ABORT ( " unreachable " ) ;
655+ return ErrorString ;
617656 }
618657
619658private:
@@ -628,6 +667,8 @@ class TYtBuffer: public IBuffer {
628667 ui64 BytesRead;
629668 ui64 BytesSent;
630669
670+ TString ErrorString;
671+
631672}; // TYtBuffer
632673
633674IActor* TYtExport::CreateUploader (const TActorId&, ui64) const {
@@ -658,4 +699,4 @@ void TYtExport::Shutdown() const {
658699} // NYndx
659700} // NKikimr
660701
661- #endif // KIKIMR_DISABLE_YT
702+ #endif // KIKIMR_DISABLE_YT
0 commit comments