3
3
#include " export_yt.h"
4
4
#include " yt_wrapper.h"
5
5
6
- #include < contrib/ydb/core/protos/flat_scheme_op.pb.h>
7
- #include < contrib/ydb/library/services/services.pb.h>
8
- #include < contrib/ydb/core/tablet_flat/flat_row_state.h>
9
- #include < contrib/ydb/core/tx/datashard/export_common.h>
10
- #include < contrib/ydb/library/binary_json/read.h>
11
- #include < contrib/ydb/library/actors/core/actor_bootstrapped.h>
12
- #include < contrib/ydb/library/actors/core/hfunc.h>
6
+ #include < ydb/core/protos/flat_scheme_op.pb.h>
7
+ #if __has_include("ydb/core/protos/flat_scheme_op.deps.pb.h")
8
+ #include < ydb/core/protos/flat_scheme_op.deps.pb.h> // Y_IGNORE
9
+ #endif
10
+ #include < ydb/library/services/services.pb.h>
11
+ #include < ydb/core/tablet_flat/flat_row_state.h>
12
+ #include < ydb/core/tx/datashard/export_common.h>
13
+ #include < ydb/core/tx/datashard/type_serialization.h>
14
+ #include < ydb/library/actors/core/actor_bootstrapped.h>
15
+ #include < ydb/library/actors/core/hfunc.h>
16
+ #include < ydb/library/actors/core/log.h>
17
+
18
+ #include < yql/essentials/types/binary_json/read.h>
19
+ #include < yql/essentials/parser/pg_wrapper/interface/type_desc.h>
13
20
14
21
#include < yt/yt/client/table_client/config.h>
15
22
#include < yt/yt/client/table_client/name_table.h>
@@ -99,8 +106,6 @@ class TYtUploader: public TActorBootstrapped<TYtUploader> {
99
106
TVector<TColumnSchema> schema;
100
107
101
108
for (const auto & [_, column] : columns) {
102
- // TODO: support pg types
103
- Y_ABORT_UNLESS (column.Type .GetTypeId () != NScheme::NTypeIds::Pg, " pg types are not supported" );
104
109
schema.emplace_back (column.Name , ConvertType (column.Type .GetTypeId ()));
105
110
}
106
111
@@ -129,6 +134,7 @@ class TYtUploader: public TActorBootstrapped<TYtUploader> {
129
134
|| error.FindMatching (NYT::NTableClient::EErrorCode::FormatCannotRepresentRow)
130
135
|| error.FindMatching (NYT::NTableClient::EErrorCode::IncompatibleSchemas)
131
136
// Cypress errors
137
+ || error.FindMatching (NYT::NYTree::EErrorCode::ResolveError)
132
138
|| error.FindMatching (NYT::NYTree::EErrorCode::MaxChildCountViolation)
133
139
|| error.FindMatching (NYT::NYTree::EErrorCode::MaxStringLengthViolation)
134
140
|| error.FindMatching (NYT::NYTree::EErrorCode::MaxAttributeSizeViolation)
@@ -158,12 +164,16 @@ class TYtUploader: public TActorBootstrapped<TYtUploader> {
158
164
return {};
159
165
}
160
166
161
- return NYT::TError (NCustomErrorCodes::InvalidNodeType, TStringBuilder () << " Invalid type of " << path
162
- << " : expected \" " << expected << " \" "
163
- << " , actual \" " << actual << " \" " );
167
+ return NYT::TError (
168
+ NCustomErrorCodes::InvalidNodeType,
169
+ " Invalid type of %v"
170
+ " : expected %Qv"
171
+ " , actual %Qv" ,
172
+ path,
173
+ expected,
174
+ actual);
164
175
} catch (const yexception& ex) {
165
- return NYT::TError (TStringBuilder () << " Error while checking type of " << path
166
- << " : " << ex.what ());
176
+ return NYT::TError (" Error while checking type of %v: %v" , path, ex.what ());
167
177
}
168
178
}
169
179
@@ -189,12 +199,12 @@ class TYtUploader: public TActorBootstrapped<TYtUploader> {
189
199
opts.Attributes = std::move (attrs);
190
200
}
191
201
192
- Send (Client, new TEvYtWrapper::TEvCreateNodeRequest (DstPath. GetPath (), EObjectType::Table, opts));
202
+ Send (Client, new TEvYtWrapper::TEvCreateNodeRequest (DstPath-> GetPath (), EObjectType::Table, opts));
193
203
} else {
194
204
TGetNodeOptions opts;
195
205
opts.Attributes = TVector<TString>{" type" };
196
206
197
- Send (Client, new TEvYtWrapper::TEvGetNodeRequest (DstPath. GetPath (), opts));
207
+ Send (Client, new TEvYtWrapper::TEvGetNodeRequest (DstPath-> GetPath (), opts));
198
208
}
199
209
}
200
210
@@ -217,7 +227,7 @@ class TYtUploader: public TActorBootstrapped<TYtUploader> {
217
227
return ;
218
228
}
219
229
220
- Send (Client, new TEvYtWrapper::TEvCreateTableWriterRequest (DstPath, TableWriterOptions ()));
230
+ Send (Client, new TEvYtWrapper::TEvCreateTableWriterRequest (* DstPath, TableWriterOptions ()));
221
231
}
222
232
223
233
void Handle (TEvYtWrapper::TEvGetNodeResponse::TPtr& ev) {
@@ -231,11 +241,11 @@ class TYtUploader: public TActorBootstrapped<TYtUploader> {
231
241
return ;
232
242
}
233
243
234
- if (!CheckResult (CheckNodeType (DstPath. GetPath (), result.Value (), " table" ), TStringBuf (" CheckNodeType" ))) {
244
+ if (!CheckResult (CheckNodeType (DstPath-> GetPath (), result.Value (), " table" ), TStringBuf (" CheckNodeType" ))) {
235
245
return ;
236
246
}
237
247
238
- Send (Client, new TEvYtWrapper::TEvCreateTableWriterRequest (DstPath, TableWriterOptions ()));
248
+ Send (Client, new TEvYtWrapper::TEvCreateTableWriterRequest (* DstPath, TableWriterOptions ()));
239
249
}
240
250
241
251
void Handle (TEvYtWrapper::TEvCreateTableWriterResponse::TPtr& ev) {
@@ -394,7 +404,7 @@ class TYtUploader: public TActorBootstrapped<TYtUploader> {
394
404
ui32 retries)
395
405
: ServerName(serverName)
396
406
, Token(token)
397
- , DstPath(TRichYPath::Parse( dstPath) )
407
+ , DstPathStr( dstPath)
398
408
, UseTypeV3(useTypeV3)
399
409
, Schema(GenTableSchema(columns))
400
410
, Retries(retries)
@@ -407,6 +417,16 @@ class TYtUploader: public TActorBootstrapped<TYtUploader> {
407
417
<< " : self# " << SelfId ()
408
418
<< " , attempt# " << Attempt);
409
419
420
+ Become (&TThis::StateWork);
421
+
422
+ if (!DstPath) {
423
+ try {
424
+ DstPath.ConstructInPlace (DstPathStr);
425
+ } catch (const NYT::TErrorException& ex) {
426
+ return Finish (false , ex.what ());
427
+ }
428
+ }
429
+
410
430
NameTable.Reset ();
411
431
Last = false ;
412
432
@@ -417,9 +437,7 @@ class TYtUploader: public TActorBootstrapped<TYtUploader> {
417
437
}
418
438
419
439
Client = RegisterWithSameMailbox (CreateYtWrapper (ServerName, Token));
420
- Send (Client, new TEvYtWrapper::TEvNodeExistsRequest (DstPath.GetPath (), TNodeExistsOptions ()));
421
-
422
- Become (&TThis::StateWork);
440
+ Send (Client, new TEvYtWrapper::TEvNodeExistsRequest (DstPath->GetPath (), TNodeExistsOptions ()));
423
441
}
424
442
425
443
STATEFN (StateWork) {
@@ -442,7 +460,7 @@ class TYtUploader: public TActorBootstrapped<TYtUploader> {
442
460
private:
443
461
const TString ServerName;
444
462
const TString Token;
445
- const TRichYPath DstPath ;
463
+ const TString DstPathStr ;
446
464
const bool UseTypeV3;
447
465
const TTableSchema Schema;
448
466
const ui32 Retries;
@@ -452,6 +470,7 @@ class TYtUploader: public TActorBootstrapped<TYtUploader> {
452
470
TActorId Writer;
453
471
TActorId Scanner;
454
472
473
+ TMaybe<TRichYPath> DstPath;
455
474
TNameTablePtr NameTable;
456
475
bool Last;
457
476
TMaybe<TString> Error;
@@ -460,7 +479,7 @@ class TYtUploader: public TActorBootstrapped<TYtUploader> {
460
479
461
480
class TYtBuffer : public IBuffer {
462
481
struct TColumn {
463
- NScheme::TTypeId Type;
482
+ NScheme::TTypeInfo Type;
464
483
int Id; // in name table
465
484
};
466
485
@@ -472,20 +491,25 @@ class TYtBuffer: public IBuffer {
472
491
473
492
int i = 0 ;
474
493
for (const auto & [tag, column] : columns) {
475
- // TODO: support pg types
476
- Y_ABORT_UNLESS (column.Type .GetTypeId () != NScheme::NTypeIds::Pg, " pg types are not supported" );
477
- result[tag] = {column.Type .GetTypeId (), i++};
494
+ result[tag] = {column.Type , i++};
478
495
}
479
496
480
497
return result;
481
498
}
482
499
483
- static TUnversionedValue ConvertValue (NScheme::TTypeId type, const TCell& cell, int id, bool useTypeV3, TString& buffer) {
500
+ static TUnversionedValue ConvertValue (
501
+ NScheme::TTypeInfo type,
502
+ const TCell& cell,
503
+ int id,
504
+ bool useTypeV3,
505
+ TString& buffer,
506
+ TString& error)
507
+ {
484
508
if (cell.IsNull ()) {
485
509
return MakeUnversionedNullValue (id);
486
510
}
487
511
488
- switch (type) {
512
+ switch (type. GetTypeId () ) {
489
513
case NScheme::NTypeIds::Int32:
490
514
return MakeUnversionedInt64Value (cell.AsValue <i32 >(), id);
491
515
case NScheme::NTypeIds::Uint32:
@@ -509,9 +533,19 @@ class TYtBuffer: public IBuffer {
509
533
return MakeUnversionedDoubleValue (cell.AsValue <double >(), id);
510
534
case NScheme::NTypeIds::Float:
511
535
return MakeUnversionedDoubleValue (cell.AsValue <float >(), id);
536
+ case NScheme::NTypeIds::Pg:
537
+ if (auto pgResult = NPg::PgNativeTextFromNativeBinary (cell.AsBuf (), type.GetPgTypeDesc ());
538
+ pgResult.Error )
539
+ {
540
+ error.swap (*pgResult.Error );
541
+ return {};
542
+ } else {
543
+ buffer.swap (pgResult.Str );
544
+ return MakeUnversionedStringValue (buffer, id);
545
+ }
512
546
default :
513
547
if (useTypeV3) {
514
- switch (type) {
548
+ switch (type. GetTypeId () ) {
515
549
case NScheme::NTypeIds::Date:
516
550
return MakeUnversionedUint64Value (cell.AsValue <ui16>(), id);
517
551
case NScheme::NTypeIds::Datetime:
@@ -527,7 +561,7 @@ class TYtBuffer: public IBuffer {
527
561
case NScheme::NTypeIds::Interval64:
528
562
return MakeUnversionedInt64Value (cell.AsValue <i64 >(), id);
529
563
case NScheme::NTypeIds::Decimal:
530
- buffer = NDataShard::DecimalToString (cell.AsValue <std::pair<ui64, i64 >>());
564
+ buffer = NDataShard::DecimalToString (cell.AsValue <std::pair<ui64, i64 >>(), type );
531
565
return MakeUnversionedStringValue (buffer, id);
532
566
case NScheme::NTypeIds::DyNumber:
533
567
buffer = NDataShard::DyNumberToString (cell.AsBuf ());
@@ -579,7 +613,12 @@ class TYtBuffer: public IBuffer {
579
613
const auto & cell = (*row)[i];
580
614
581
615
TString buffer;
582
- const auto value = ConvertValue (column.Type , cell, column.Id , UseTypeV3, buffer);
616
+ TString error;
617
+ const auto value = ConvertValue (column.Type , cell, column.Id , UseTypeV3, buffer, error);
618
+ if (!error.empty ()) {
619
+ ErrorString.swap (error);
620
+ return false ;
621
+ }
583
622
584
623
rowBuilder.AddValue (value);
585
624
BytesRead += cell.Size ();
@@ -613,7 +652,7 @@ class TYtBuffer: public IBuffer {
613
652
}
614
653
615
654
TString GetError () const override {
616
- Y_ABORT ( " unreachable " ) ;
655
+ return ErrorString ;
617
656
}
618
657
619
658
private:
@@ -628,6 +667,8 @@ class TYtBuffer: public IBuffer {
628
667
ui64 BytesRead;
629
668
ui64 BytesSent;
630
669
670
+ TString ErrorString;
671
+
631
672
}; // TYtBuffer
632
673
633
674
IActor* TYtExport::CreateUploader (const TActorId&, ui64) const {
@@ -658,4 +699,4 @@ void TYtExport::Shutdown() const {
658
699
} // NYndx
659
700
} // NKikimr
660
701
661
- #endif // KIKIMR_DISABLE_YT
702
+ #endif // KIKIMR_DISABLE_YT
0 commit comments