Skip to content

Commit c2adf95

Browse files
committed
Support PG types in YT export
1 parent be00795 commit c2adf95

File tree

6 files changed: +84 -42 lines changed

6 files changed: +84 -42 lines changed

ydb/core/ya.make

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -64,6 +64,7 @@ RECURSE(
6464
ydb_convert
6565
ymq
6666
yql_testlib
67+
yt
6768
)
6869

6970
RECURSE_FOR_TESTS(

ydb/core/yt/export_yt.cpp

Lines changed: 76 additions & 35 deletions
Original file line number | Diff line number | Diff line change
@@ -3,13 +3,20 @@
33
#include "export_yt.h"
44
#include "yt_wrapper.h"
55

6-
#include <contrib/ydb/core/protos/flat_scheme_op.pb.h>
7-
#include <contrib/ydb/library/services/services.pb.h>
8-
#include <contrib/ydb/core/tablet_flat/flat_row_state.h>
9-
#include <contrib/ydb/core/tx/datashard/export_common.h>
10-
#include <contrib/ydb/library/binary_json/read.h>
11-
#include <contrib/ydb/library/actors/core/actor_bootstrapped.h>
12-
#include <contrib/ydb/library/actors/core/hfunc.h>
6+
#include <ydb/core/protos/flat_scheme_op.pb.h>
7+
#if __has_include("ydb/core/protos/flat_scheme_op.deps.pb.h")
8+
#include <ydb/core/protos/flat_scheme_op.deps.pb.h> // Y_IGNORE
9+
#endif
10+
#include <ydb/library/services/services.pb.h>
11+
#include <ydb/core/tablet_flat/flat_row_state.h>
12+
#include <ydb/core/tx/datashard/export_common.h>
13+
#include <ydb/core/tx/datashard/type_serialization.h>
14+
#include <ydb/library/actors/core/actor_bootstrapped.h>
15+
#include <ydb/library/actors/core/hfunc.h>
16+
#include <ydb/library/actors/core/log.h>
17+
18+
#include <yql/essentials/types/binary_json/read.h>
19+
#include <yql/essentials/parser/pg_wrapper/interface/type_desc.h>
1320

1421
#include <yt/yt/client/table_client/config.h>
1522
#include <yt/yt/client/table_client/name_table.h>
@@ -99,8 +106,6 @@ class TYtUploader: public TActorBootstrapped<TYtUploader> {
99106
TVector<TColumnSchema> schema;
100107

101108
for (const auto& [_, column] : columns) {
102-
// TODO: support pg types
103-
Y_ABORT_UNLESS(column.Type.GetTypeId() != NScheme::NTypeIds::Pg, "pg types are not supported");
104109
schema.emplace_back(column.Name, ConvertType(column.Type.GetTypeId()));
105110
}
106111

@@ -129,6 +134,7 @@ class TYtUploader: public TActorBootstrapped<TYtUploader> {
129134
|| error.FindMatching(NYT::NTableClient::EErrorCode::FormatCannotRepresentRow)
130135
|| error.FindMatching(NYT::NTableClient::EErrorCode::IncompatibleSchemas)
131136
// Cypress errors
137+
|| error.FindMatching(NYT::NYTree::EErrorCode::ResolveError)
132138
|| error.FindMatching(NYT::NYTree::EErrorCode::MaxChildCountViolation)
133139
|| error.FindMatching(NYT::NYTree::EErrorCode::MaxStringLengthViolation)
134140
|| error.FindMatching(NYT::NYTree::EErrorCode::MaxAttributeSizeViolation)
@@ -158,12 +164,16 @@ class TYtUploader: public TActorBootstrapped<TYtUploader> {
158164
return {};
159165
}
160166

161-
return NYT::TError(NCustomErrorCodes::InvalidNodeType, TStringBuilder() << "Invalid type of " << path
162-
<< ": expected \"" << expected << "\""
163-
<< ", actual \"" << actual << "\"");
167+
return NYT::TError(
168+
NCustomErrorCodes::InvalidNodeType,
169+
"Invalid type of %v"
170+
": expected %Qv"
171+
", actual %Qv",
172+
path,
173+
expected,
174+
actual);
164175
} catch (const yexception& ex) {
165-
return NYT::TError(TStringBuilder() << "Error while checking type of " << path
166-
<< ": " << ex.what());
176+
return NYT::TError("Error while checking type of %v: %v", path, ex.what());
167177
}
168178
}
169179

@@ -189,12 +199,12 @@ class TYtUploader: public TActorBootstrapped<TYtUploader> {
189199
opts.Attributes = std::move(attrs);
190200
}
191201

192-
Send(Client, new TEvYtWrapper::TEvCreateNodeRequest(DstPath.GetPath(), EObjectType::Table, opts));
202+
Send(Client, new TEvYtWrapper::TEvCreateNodeRequest(DstPath->GetPath(), EObjectType::Table, opts));
193203
} else {
194204
TGetNodeOptions opts;
195205
opts.Attributes = TVector<TString>{"type"};
196206

197-
Send(Client, new TEvYtWrapper::TEvGetNodeRequest(DstPath.GetPath(), opts));
207+
Send(Client, new TEvYtWrapper::TEvGetNodeRequest(DstPath->GetPath(), opts));
198208
}
199209
}
200210

@@ -217,7 +227,7 @@ class TYtUploader: public TActorBootstrapped<TYtUploader> {
217227
return;
218228
}
219229

220-
Send(Client, new TEvYtWrapper::TEvCreateTableWriterRequest(DstPath, TableWriterOptions()));
230+
Send(Client, new TEvYtWrapper::TEvCreateTableWriterRequest(*DstPath, TableWriterOptions()));
221231
}
222232

223233
void Handle(TEvYtWrapper::TEvGetNodeResponse::TPtr& ev) {
@@ -231,11 +241,11 @@ class TYtUploader: public TActorBootstrapped<TYtUploader> {
231241
return;
232242
}
233243

234-
if (!CheckResult(CheckNodeType(DstPath.GetPath(), result.Value(), "table"), TStringBuf("CheckNodeType"))) {
244+
if (!CheckResult(CheckNodeType(DstPath->GetPath(), result.Value(), "table"), TStringBuf("CheckNodeType"))) {
235245
return;
236246
}
237247

238-
Send(Client, new TEvYtWrapper::TEvCreateTableWriterRequest(DstPath, TableWriterOptions()));
248+
Send(Client, new TEvYtWrapper::TEvCreateTableWriterRequest(*DstPath, TableWriterOptions()));
239249
}
240250

241251
void Handle(TEvYtWrapper::TEvCreateTableWriterResponse::TPtr& ev) {
@@ -394,7 +404,7 @@ class TYtUploader: public TActorBootstrapped<TYtUploader> {
394404
ui32 retries)
395405
: ServerName(serverName)
396406
, Token(token)
397-
, DstPath(TRichYPath::Parse(dstPath))
407+
, DstPathStr(dstPath)
398408
, UseTypeV3(useTypeV3)
399409
, Schema(GenTableSchema(columns))
400410
, Retries(retries)
@@ -407,6 +417,16 @@ class TYtUploader: public TActorBootstrapped<TYtUploader> {
407417
<< ": self# " << SelfId()
408418
<< ", attempt# " << Attempt);
409419

420+
Become(&TThis::StateWork);
421+
422+
if (!DstPath) {
423+
try {
424+
DstPath.ConstructInPlace(DstPathStr);
425+
} catch (const NYT::TErrorException& ex) {
426+
return Finish(false, ex.what());
427+
}
428+
}
429+
410430
NameTable.Reset();
411431
Last = false;
412432

@@ -417,9 +437,7 @@ class TYtUploader: public TActorBootstrapped<TYtUploader> {
417437
}
418438

419439
Client = RegisterWithSameMailbox(CreateYtWrapper(ServerName, Token));
420-
Send(Client, new TEvYtWrapper::TEvNodeExistsRequest(DstPath.GetPath(), TNodeExistsOptions()));
421-
422-
Become(&TThis::StateWork);
440+
Send(Client, new TEvYtWrapper::TEvNodeExistsRequest(DstPath->GetPath(), TNodeExistsOptions()));
423441
}
424442

425443
STATEFN(StateWork) {
@@ -442,7 +460,7 @@ class TYtUploader: public TActorBootstrapped<TYtUploader> {
442460
private:
443461
const TString ServerName;
444462
const TString Token;
445-
const TRichYPath DstPath;
463+
const TString DstPathStr;
446464
const bool UseTypeV3;
447465
const TTableSchema Schema;
448466
const ui32 Retries;
@@ -452,6 +470,7 @@ class TYtUploader: public TActorBootstrapped<TYtUploader> {
452470
TActorId Writer;
453471
TActorId Scanner;
454472

473+
TMaybe<TRichYPath> DstPath;
455474
TNameTablePtr NameTable;
456475
bool Last;
457476
TMaybe<TString> Error;
@@ -460,7 +479,7 @@ class TYtUploader: public TActorBootstrapped<TYtUploader> {
460479

461480
class TYtBuffer: public IBuffer {
462481
struct TColumn {
463-
NScheme::TTypeId Type;
482+
NScheme::TTypeInfo Type;
464483
int Id; // in name table
465484
};
466485

@@ -472,20 +491,25 @@ class TYtBuffer: public IBuffer {
472491

473492
int i = 0;
474493
for (const auto& [tag, column] : columns) {
475-
// TODO: support pg types
476-
Y_ABORT_UNLESS(column.Type.GetTypeId() != NScheme::NTypeIds::Pg, "pg types are not supported");
477-
result[tag] = {column.Type.GetTypeId(), i++};
494+
result[tag] = {column.Type, i++};
478495
}
479496

480497
return result;
481498
}
482499

483-
static TUnversionedValue ConvertValue(NScheme::TTypeId type, const TCell& cell, int id, bool useTypeV3, TString& buffer) {
500+
static TUnversionedValue ConvertValue(
501+
NScheme::TTypeInfo type,
502+
const TCell& cell,
503+
int id,
504+
bool useTypeV3,
505+
TString& buffer,
506+
TString& error)
507+
{
484508
if (cell.IsNull()) {
485509
return MakeUnversionedNullValue(id);
486510
}
487511

488-
switch (type) {
512+
switch (type.GetTypeId()) {
489513
case NScheme::NTypeIds::Int32:
490514
return MakeUnversionedInt64Value(cell.AsValue<i32>(), id);
491515
case NScheme::NTypeIds::Uint32:
@@ -509,9 +533,19 @@ class TYtBuffer: public IBuffer {
509533
return MakeUnversionedDoubleValue(cell.AsValue<double>(), id);
510534
case NScheme::NTypeIds::Float:
511535
return MakeUnversionedDoubleValue(cell.AsValue<float>(), id);
536+
case NScheme::NTypeIds::Pg:
537+
if (auto pgResult = NPg::PgNativeTextFromNativeBinary(cell.AsBuf(), type.GetPgTypeDesc());
538+
pgResult.Error)
539+
{
540+
error.swap(*pgResult.Error);
541+
return {};
542+
} else {
543+
buffer.swap(pgResult.Str);
544+
return MakeUnversionedStringValue(buffer, id);
545+
}
512546
default:
513547
if (useTypeV3) {
514-
switch (type) {
548+
switch (type.GetTypeId()) {
515549
case NScheme::NTypeIds::Date:
516550
return MakeUnversionedUint64Value(cell.AsValue<ui16>(), id);
517551
case NScheme::NTypeIds::Datetime:
@@ -527,7 +561,7 @@ class TYtBuffer: public IBuffer {
527561
case NScheme::NTypeIds::Interval64:
528562
return MakeUnversionedInt64Value(cell.AsValue<i64>(), id);
529563
case NScheme::NTypeIds::Decimal:
530-
buffer = NDataShard::DecimalToString(cell.AsValue<std::pair<ui64, i64>>());
564+
buffer = NDataShard::DecimalToString(cell.AsValue<std::pair<ui64, i64>>(), type);
531565
return MakeUnversionedStringValue(buffer, id);
532566
case NScheme::NTypeIds::DyNumber:
533567
buffer = NDataShard::DyNumberToString(cell.AsBuf());
@@ -579,7 +613,12 @@ class TYtBuffer: public IBuffer {
579613
const auto& cell = (*row)[i];
580614

581615
TString buffer;
582-
const auto value = ConvertValue(column.Type, cell, column.Id, UseTypeV3, buffer);
616+
TString error;
617+
const auto value = ConvertValue(column.Type, cell, column.Id, UseTypeV3, buffer, error);
618+
if (!error.empty()) {
619+
ErrorString.swap(error);
620+
return false;
621+
}
583622

584623
rowBuilder.AddValue(value);
585624
BytesRead += cell.Size();
@@ -613,7 +652,7 @@ class TYtBuffer: public IBuffer {
613652
}
614653

615654
TString GetError() const override {
616-
Y_ABORT("unreachable");
655+
return ErrorString;
617656
}
618657

619658
private:
@@ -628,6 +667,8 @@ class TYtBuffer: public IBuffer {
628667
ui64 BytesRead;
629668
ui64 BytesSent;
630669

670+
TString ErrorString;
671+
631672
}; // TYtBuffer
632673

633674
IActor* TYtExport::CreateUploader(const TActorId&, ui64) const {
@@ -658,4 +699,4 @@ void TYtExport::Shutdown() const {
658699
} // NYndx
659700
} // NKikimr
660701

661-
#endif // KIKIMR_DISABLE_YT
702+
#endif // KIKIMR_DISABLE_YT

ydb/core/yt/export_yt.h

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,7 @@
22

33
#ifndef KIKIMR_DISABLE_YT
44

5-
#include <contrib/ydb/core/tx/datashard/export_iface.h>
5+
#include <ydb/core/tx/datashard/export_iface.h>
66

77
namespace NKikimr {
88
namespace NYndx {

ydb/core/yt/ya.make

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -14,7 +14,7 @@ ELSE()
1414
yt_wrapper.h
1515
)
1616
PEERDIR(
17-
contrib/ydb/library/actors/core
17+
ydb/library/actors/core
1818
ydb/core/base
1919
ydb/core/protos
2020
ydb/library/aclib

ydb/core/yt/yt_wrapper.cpp

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -4,10 +4,10 @@
44

55
#include "yt_wrapper.h"
66

7-
#include <contrib/ydb/library/services/services.pb.h>
8-
9-
#include <contrib/ydb/library/actors/core/actor.h>
10-
#include <contrib/ydb/library/actors/core/hfunc.h>
7+
#include <ydb/library/actors/core/actor.h>
8+
#include <ydb/library/actors/core/actorsystem.h>
9+
#include <ydb/library/actors/core/hfunc.h>
10+
#include <ydb/library/services/services.pb.h>
1111

1212
#include <yt/yt/client/api/rpc_proxy/connection.h>
1313
#include <yt/yt/client/api/table_writer.h>

ydb/core/yt/yt_wrapper.h

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,6 @@
11
#pragma once
22

3-
#include <contrib/ydb/core/base/events.h>
3+
#include <ydb/core/base/events.h>
44

55
#include <yt/yt/client/api/client.h>
66
#include <yt/yt/client/api/rpc_proxy/config.h>

0 commit comments

Comments
 (0)