Skip to content

YDB-2757 Fix UUID column export #2789

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions ydb/core/io_formats/cell_maker/cell_maker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <ydb/library/binary_json/write.h>
#include <ydb/library/dynumber/dynumber.h>
#include <ydb/library/uuid/uuid.h>

#include <ydb/library/yql/minikql/dom/yson.h>
#include <ydb/library/yql/minikql/dom/json.h>
Expand Down Expand Up @@ -132,6 +133,21 @@ namespace {
return true;
}

struct TUuidHolder {
union {
ui16 Array[8];
ui64 Halves[2];
} Buf;
};

template <>
bool TryParse(TStringBuf value, TUuidHolder& result) {
if (!NUuid::ParseUuidToArray(value, result.Buf.Array, false)) {
return false;
}
return true;
}

template <typename T, typename U>
using TConverter = std::function<U(const T&)>;

Expand Down Expand Up @@ -171,6 +187,14 @@ namespace {
return v.Str;
}

TStringBuf UuidToStringBuf(const TUuidHolder& uuid) {
char uuidBuf[16];

NUuid::UuidHalfsToBytes(uuidBuf, 16, uuid.Buf.Halves[1], uuid.Buf.Halves[0]);

return TStringBuf(uuidBuf, 16);
}

template <typename T, typename U = T>
struct TCellMaker {
static bool Make(TCell& c, TStringBuf v, TMemoryPool& pool, TString& err, TConverter<T, U> conv = &Implicit<T, U>) {
Expand Down Expand Up @@ -297,6 +321,8 @@ bool MakeCell(TCell& cell, TStringBuf value, NScheme::TTypeInfo type, TMemoryPoo
return TCellMaker<NYql::NDecimal::TInt128, std::pair<ui64, ui64>>::Make(cell, value, pool, err, &Int128ToPair);
case NScheme::NTypeIds::Pg:
return TCellMaker<NPg::TConvertResult, TStringBuf>::Make(cell, value, pool, err, &PgToStringBuf, type.GetTypeDesc());
case NScheme::NTypeIds::Uuid:
return TCellMaker<TUuidHolder, TStringBuf>::Make(cell, value, pool, err, &UuidToStringBuf);
default:
return false;
}
Expand Down Expand Up @@ -390,6 +416,7 @@ bool CheckCellValue(const TCell& cell, NScheme::TTypeInfo type) {
case NScheme::NTypeIds::JsonDocument: // checked at parsing time
case NScheme::NTypeIds::DyNumber: // checked at parsing time
case NScheme::NTypeIds::Pg: // checked at parsing time
case NScheme::NTypeIds::Uuid: // checked at parsing time
return true;
case NScheme::NTypeIds::Date:
return cell.AsValue<ui16>() < NUdf::MAX_DATE;
Expand Down
10 changes: 10 additions & 0 deletions ydb/core/tx/datashard/export_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,5 +118,15 @@ bool PgToStream(TStringBuf data, void* typeDesc, IOutputStream& out, TString& er
return true;
}

bool UuidToStream(const std::pair<ui64, ui64>& loHi, IOutputStream& out, TString& err) {
Y_UNUSED(err);

NYdb::TUuidValue uuid(loHi.first, loHi.second);

out << uuid.ToString();

return true;
}

} // NDataShard
} // NKikimr
1 change: 1 addition & 0 deletions ydb/core/tx/datashard/export_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ TString DyNumberToString(TStringBuf data);
bool DecimalToStream(const std::pair<ui64, i64>& loHi, IOutputStream& out, TString& err);
bool DyNumberToStream(TStringBuf data, IOutputStream& out, TString& err);
bool PgToStream(TStringBuf data, void* typeDesc, IOutputStream& out, TString& err);
bool UuidToStream(const std::pair<ui64, ui64>& loHi, IOutputStream& out, TString& err);

} // NDataShard
} // NKikimr
3 changes: 3 additions & 0 deletions ydb/core/tx/datashard/export_s3_buffer_raw.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ bool TS3BufferRaw::Collect(const NTable::IScan::TRow& row, IOutputStream& out) {
case NScheme::NTypeIds::Pg:
serialized = PgToStream(cell.AsBuf(), column.Type.GetTypeDesc(), out, ErrorString);
break;
case NScheme::NTypeIds::Uuid:
serialized = UuidToStream(cell.AsValue<std::pair<ui64, ui64>>(), out, ErrorString);
break;
default:
Y_ABORT("Unsupported type");
}
Expand Down
24 changes: 24 additions & 0 deletions ydb/core/tx/schemeshard/ut_backup/ut_backup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,30 @@ Y_UNIT_TEST_SUITE(TBackupTests) {
});
}

Y_UNIT_TEST_WITH_COMPRESSION(BackupUuidColumn) {
TTestBasicRuntime runtime;

Backup(runtime, ToString(Codec), R"(
Name: "Table"
Columns { Name: "key" Type: "Uint32" }
Columns { Name: "value" Type: "Uuid" }
KeyColumnNames: ["key"]
)", [](TTestBasicRuntime& runtime) {
NKikimrMiniKQL::TResult result;
TString error;
NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, TTestTxConfig::FakeHiveTablets, Sprintf(R"(
(
(let key '( '('key (Uint32 '%d) ) ) )
(let row '( '('value (Uuid '"%s") ) ) )
(return (AsList (UpdateRow '__user__%s key row) ))
)
)", 1, "0000111122223333", "Table"), result, error);

UNIT_ASSERT_VALUES_EQUAL_C(status, NKikimrProto::EReplyStatus::OK, error);
UNIT_ASSERT_VALUES_EQUAL(error, "");
});
}

template<ECompressionCodec Codec>
void ShouldSucceedOnLargeData(ui32 minWriteBatchSize, const std::pair<ui32, ui32>& expectedResult) {
TTestBasicRuntime runtime;
Expand Down
71 changes: 70 additions & 1 deletion ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -989,7 +989,76 @@ value {
env.TestWaitNotification(runtime, txId);
TestGetExport(runtime, txId, "/MyRoot");

TestImport(runtime, txId, "/MyRoot", Sprintf(R"(
TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"(
ImportFromS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
items {
source_prefix: "Backup1"
destination_path: "/MyRoot/Restored"
}
}
)", port));
env.TestWaitNotification(runtime, txId);
TestGetImport(runtime, txId, "/MyRoot");
}

Y_UNIT_TEST(ExportImportUuid) {
TTestBasicRuntime runtime;
TTestEnv env(runtime, TTestEnvOptions().EnableTablePgTypes(true));
ui64 txId = 100;

TestCreateTable(runtime, ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Uint32" }
Columns { Name: "value" Type: "Uuid" }
KeyColumnNames: ["key"]
)");
env.TestWaitNotification(runtime, txId);

{
TString tablePath = "/MyRoot/Table";
int partitionIdx = 0;

auto tableDesc = DescribePath(runtime, tablePath, true, true);
const auto& tablePartitions = tableDesc.GetPathDescription().GetTablePartitions();
UNIT_ASSERT(partitionIdx < tablePartitions.size());
const ui64 datashardTabletId = tablePartitions[partitionIdx].GetDatashardId();

NKikimrMiniKQL::TResult result;
TString error;
NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, datashardTabletId, Sprintf(R"(
(
(let key '( '('key (Uint32 '%d) ) ) )
(let row '( '('value (Uuid '"%s") ) ) )
(return (AsList (UpdateRow '__user__%s key row) ))
)
)", 1, "0123456789012345", "Table"), result, error);

UNIT_ASSERT_VALUES_EQUAL_C(status, NKikimrProto::EReplyStatus::OK, error);
UNIT_ASSERT_VALUES_EQUAL(error, "");
}

TPortManager portManager;
const ui16 port = portManager.GetPort();

TS3Mock s3Mock({}, TS3Mock::TSettings(port));
UNIT_ASSERT(s3Mock.Start());

TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
ExportToS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
items {
source_path: "/MyRoot/Table"
destination_prefix: "Backup1"
}
}
)", port));
env.TestWaitNotification(runtime, txId);
TestGetExport(runtime, txId, "/MyRoot");

TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"(
ImportFromS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
Expand Down