Skip to content

Support ydb dump for tables with serial types #9272

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions ydb/core/grpc_services/rpc_create_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,28 @@ class TCreateTableRPC : public TRpcSchemeRequestActor<TCreateTableRPC, TEvCreate
return;
}

StatusIds::StatusCode code = StatusIds::SUCCESS;
TString error;

bool hasSerial = false;
for (const auto& column : req->columns()) {
switch (column.default_value_case()) {
case Ydb::Table::ColumnMeta::kFromSequence: {
auto* seqDesc = modifyScheme->MutableCreateIndexedTable()->MutableSequenceDescription()->Add();
if (!FillSequenceDescription(*seqDesc, column.from_sequence(), code, error)) {
NYql::TIssues issues;
issues.AddIssue(NYql::TIssue(error));
return Reply(code, issues, ctx);
}
hasSerial = true;
break;
}
default: break;
}
}

NKikimrSchemeOp::TTableDescription* tableDesc = nullptr;
if (req->indexesSize()) {
if (req->indexesSize() || hasSerial) {
modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpCreateIndexedTable);
tableDesc = modifyScheme->MutableCreateIndexedTable()->MutableTableDescription();
} else {
Expand All @@ -192,9 +212,6 @@ class TCreateTableRPC : public TRpcSchemeRequestActor<TCreateTableRPC, TEvCreate

tableDesc->SetName(name);

StatusIds::StatusCode code = StatusIds::SUCCESS;
TString error;

if (!FillColumnDescription(*tableDesc, req->columns(), code, error)) {
NYql::TIssues issues;
issues.AddIssue(NYql::TIssue(error));
Expand Down
12 changes: 12 additions & 0 deletions ydb/core/grpc_services/rpc_describe_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,14 @@ class TDescribeTableRPC : public TRpcSchemeRequestActor<TDescribeTableRPC, TEvDe
return Reply(Ydb::StatusIds::INTERNAL_ERROR, ctx);
}

StatusIds::StatusCode code = StatusIds::SUCCESS;
TString error;
if (!FillSequenceDescription(describeTableResult, tableDescription, code, error)) {
LOG_ERROR(ctx, NKikimrServices::GRPC_SERVER, "Unable to fill sequence description: %s", error.c_str());
Request_->RaiseIssue(NYql::TIssue(error));
return Reply(Ydb::StatusIds::INTERNAL_ERROR, ctx);
}

describeTableResult.mutable_primary_key()->CopyFrom(tableDescription.GetKeyColumnNames());

try {
Expand Down Expand Up @@ -213,6 +221,10 @@ class TDescribeTableRPC : public TRpcSchemeRequestActor<TDescribeTableRPC, TEvDe
record->MutableOptions()->SetReturnPartitionStats(true);
}

if (req->include_set_val()) {
record->MutableOptions()->SetReturnSetVal(true);
}

record->MutableOptions()->SetShowPrivateTable(ShowPrivatePath(path));

ctx.Send(MakeTxProxyID(), navigateRequest.release());
Expand Down
12 changes: 10 additions & 2 deletions ydb/core/ydb_convert/table_description.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1423,7 +1423,6 @@ bool FillTableDescription(NKikimrSchemeOp::TModifyScheme& out,
const Ydb::Table::CreateTableRequest& in, const TTableProfiles& profiles,
Ydb::StatusIds::StatusCode& status, TString& error, bool indexedTable)
{

NKikimrSchemeOp::TTableDescription* tableDesc = nullptr;
if (indexedTable) {
tableDesc = out.MutableCreateIndexedTable()->MutableTableDescription();
Expand Down Expand Up @@ -1465,7 +1464,8 @@ bool FillTableDescription(NKikimrSchemeOp::TModifyScheme& out,
return true;
}

bool FillSequenceDescription(Ydb::Table::CreateTableRequest& out, const NKikimrSchemeOp::TTableDescription& in, Ydb::StatusIds::StatusCode& status, TString& error) {
template <typename TYdbProto>
bool FillSequenceDescriptionImpl(TYdbProto& out, const NKikimrSchemeOp::TTableDescription& in, Ydb::StatusIds::StatusCode& status, TString& error) {
THashMap<TString, NKikimrSchemeOp::TSequenceDescription> sequences;

for (const auto& sequenceDescription : in.GetSequences()) {
Expand Down Expand Up @@ -1566,6 +1566,14 @@ bool FillSequenceDescription(Ydb::Table::CreateTableRequest& out, const NKikimrS
return true;
}

bool FillSequenceDescription(Ydb::Table::DescribeTableResult& out, const NKikimrSchemeOp::TTableDescription& in, Ydb::StatusIds::StatusCode& status, TString& error) {
return FillSequenceDescriptionImpl(out, in, status, error);
}

bool FillSequenceDescription(Ydb::Table::CreateTableRequest& out, const NKikimrSchemeOp::TTableDescription& in, Ydb::StatusIds::StatusCode& status, TString& error) {
return FillSequenceDescriptionImpl(out, in, status, error);
}

bool FillSequenceDescription(NKikimrSchemeOp::TSequenceDescription& out, const Ydb::Table::SequenceDescription& in, Ydb::StatusIds::StatusCode& status, TString& error) {
out.SetName(in.name());
if (in.has_min_value()) {
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/ydb_convert/table_description.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,9 @@ bool FillTableDescription(NKikimrSchemeOp::TModifyScheme& out,


// out
bool FillSequenceDescription(Ydb::Table::CreateTableRequest& out, const NKikimrSchemeOp::TTableDescription& in,
Ydb::StatusIds::StatusCode& status, TString& error);
bool FillSequenceDescription(Ydb::Table::DescribeTableResult& out, const NKikimrSchemeOp::TTableDescription& in, Ydb::StatusIds::StatusCode& status, TString& error);

bool FillSequenceDescription(Ydb::Table::CreateTableRequest& out, const NKikimrSchemeOp::TTableDescription& in, Ydb::StatusIds::StatusCode& status, TString& error);

// in
bool FillSequenceDescription(
Expand Down
29 changes: 24 additions & 5 deletions ydb/library/backup/backup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ NTable::TTableDescription DescribeTable(TDriver driver, const TString& fullTable
NTable::TTableClient client(driver);

TStatus status = client.RetryOperationSync([fullTablePath, &desc](NTable::TSession session) {
auto settings = NTable::TDescribeTableSettings().WithKeyShardBoundary(true);
auto settings = NTable::TDescribeTableSettings().WithKeyShardBoundary(true).WithSetVal(true);
auto result = session.DescribeTable(fullTablePath, settings).GetValueSync();

VerifyStatus(result);
Expand Down Expand Up @@ -514,6 +514,7 @@ void BackupTable(TDriver driver, const TString& dbPrefix, const TString& backupP
<< " backupPrefix: " << backupPrefix << " path: " << path);

auto desc = DescribeTable(driver, JoinDatabasePath(schemaOnly ? dbPrefix : backupPrefix, path));

auto proto = ProtoFromTableDescription(desc, preservePoolKinds);

TString schemaStr;
Expand Down Expand Up @@ -742,19 +743,26 @@ void BackupFolder(TDriver driver, const TString& database, const TString& relDbP
// Restore
////////////////////////////////////////////////////////////////////////////////

TString ProcessColumnType(const TString& name, TTypeParser parser, NTable::TTableBuilder *builder) {
TString ProcessColumnType(const TString& name, TTypeParser parser, NTable::TTableBuilder *builder, std::optional<NTable::TSequenceDescription> sequenceDescription) {
TStringStream ss;
ss << "name: " << name << "; ";
if (parser.GetKind() == TTypeParser::ETypeKind::Optional) {
ss << " optional; ";
parser.OpenOptional();
}
if (sequenceDescription.has_value()) {
ss << "serial; ";
}
ss << "kind: " << parser.GetKind() << "; ";
switch (parser.GetKind()) {
case TTypeParser::ETypeKind::Primitive:
ss << " type_id: " << parser.GetPrimitive() << "; ";
if (builder) {
builder->AddNullableColumn(name, parser.GetPrimitive());
if (sequenceDescription.has_value()) {
builder->AddSerialColumn(name, parser.GetPrimitive(), std::move(*sequenceDescription));
} else {
builder->AddNullableColumn(name, parser.GetPrimitive());
}
}
break;
case TTypeParser::ETypeKind::Decimal:
Expand All @@ -775,8 +783,19 @@ TString ProcessColumnType(const TString& name, TTypeParser parser, NTable::TTabl
NTable::TTableDescription TableDescriptionFromProto(const Ydb::Table::CreateTableRequest& proto) {
NTable::TTableBuilder builder;

std::optional<NTable::TSequenceDescription> sequenceDescription;
for (const auto &col : proto.Getcolumns()) {
LOG_DEBUG("AddNullableColumn: " << ProcessColumnType(col.Getname(), TType(col.Gettype()), &builder));
if (col.from_sequence().name() == "_serial_column_" + col.name()) {
NTable::TSequenceDescription currentSequenceDescription;
if (col.from_sequence().has_set_val()) {
NTable::TSequenceDescription::TSetVal setVal;
setVal.NextUsed = col.from_sequence().set_val().next_used();
setVal.NextValue = col.from_sequence().set_val().next_value();
currentSequenceDescription.SetVal = std::move(setVal);
}
sequenceDescription = std::move(currentSequenceDescription);
}
LOG_DEBUG("AddColumn: " << ProcessColumnType(col.Getname(), TType(col.Gettype()), &builder, std::move(sequenceDescription)));
}

for (const auto &primary : proto.Getprimary_key()) {
Expand Down Expand Up @@ -805,7 +824,7 @@ TString SerializeColumnsToString(const TVector<TColumn>& columns, TVector<TStrin
if (BinarySearch(primary.cbegin(), primary.cend(), col.Name)) {
ss << "primary; ";
}
ss << ProcessColumnType(col.Name, col.Type, nullptr) << Endl;
ss << ProcessColumnType(col.Name, col.Type, nullptr, std::nullopt) << Endl;
}
// Cerr << "Parse column to : " << ss.Str() << Endl;
return ss.Str();
Expand Down
2 changes: 2 additions & 0 deletions ydb/public/api/protos/ydb_table.proto
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,8 @@ message DescribeTableRequest {
bool include_table_stats = 6;
// Includes partition statistics (required include_table_statistics)
bool include_partition_stats = 7;
// Includes set_val settings for sequences
bool include_set_val = 8;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Может include_sequence_positions?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SetVal уже используется в SequenceDescription

}

message DescribeTableResponse {
Expand Down
4 changes: 4 additions & 0 deletions ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,10 @@ TAsyncDescribeTableResult TTableClient::TImpl::DescribeTable(const TString& sess
request.set_include_partition_stats(true);
}

if (settings.WithSetVal_) {
request.set_include_set_val(true);
}

auto promise = NewPromise<TDescribeTableResult>();

auto extractor = [promise, settings]
Expand Down
57 changes: 46 additions & 11 deletions ydb/public/sdk/cpp/client/ydb_table/table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,24 @@ class TTableDescription::TImpl {
if (col.has_not_null()) {
not_null = col.not_null();
}
Columns_.emplace_back(col.name(), col.type(), col.family(), not_null);
std::optional<TSequenceDescription> sequenceDescription;
switch (col.default_value_case()) {
case Ydb::Table::ColumnMeta::kFromSequence: {
if (col.from_sequence().name() == "_serial_column_" + col.name()) {
TSequenceDescription currentSequenceDescription;
if (col.from_sequence().has_set_val()) {
TSequenceDescription::TSetVal setVal;
setVal.NextUsed = col.from_sequence().set_val().next_used();
setVal.NextValue = col.from_sequence().set_val().next_value();
currentSequenceDescription.SetVal = std::move(setVal);
}
sequenceDescription = std::move(currentSequenceDescription);
}
break;
}
default: break;
}
Columns_.emplace_back(col.name(), col.type(), col.family(), not_null, std::move(sequenceDescription));
}

// indexes
Expand Down Expand Up @@ -453,8 +470,8 @@ class TTableDescription::TImpl {
return Proto_;
}

void AddColumn(const TString& name, const Ydb::Type& type, const TString& family, std::optional<bool> notNull) {
Columns_.emplace_back(name, type, family, notNull);
void AddColumn(const TString& name, const Ydb::Type& type, const TString& family, std::optional<bool> notNull, std::optional<TSequenceDescription> sequenceDescription) {
Columns_.emplace_back(name, type, family, notNull, std::move(sequenceDescription));
}

void SetPrimaryKeyColumns(const TVector<TString>& primaryKeyColumns) {
Expand Down Expand Up @@ -737,8 +754,8 @@ const TVector<TKeyRange>& TTableDescription::GetKeyRanges() const {
return Impl_->GetKeyRanges();
}

void TTableDescription::AddColumn(const TString& name, const Ydb::Type& type, const TString& family, std::optional<bool> notNull) {
Impl_->AddColumn(name, type, family, notNull);
void TTableDescription::AddColumn(const TString& name, const Ydb::Type& type, const TString& family, std::optional<bool> notNull, std::optional<TSequenceDescription> sequenceDescription) {
Impl_->AddColumn(name, type, family, notNull, std::move(sequenceDescription));
}

void TTableDescription::SetPrimaryKeyColumns(const TVector<TString>& primaryKeyColumns) {
Expand Down Expand Up @@ -914,6 +931,15 @@ void TTableDescription::SerializeTo(Ydb::Table::CreateTableRequest& request) con
if (column.NotNull.has_value()) {
protoColumn.set_not_null(column.NotNull.value());
}
if (column.SequenceDescription.has_value()) {
auto* fromSequence = protoColumn.mutable_from_sequence();
if (column.SequenceDescription->SetVal.has_value()) {
auto* setVal = fromSequence->mutable_set_val();
setVal->set_next_value(column.SequenceDescription->SetVal->NextValue);
setVal->set_next_used(column.SequenceDescription->SetVal->NextUsed);
}
fromSequence->set_name("_serial_column_" + column.Name);
}
}

for (const auto& pk : Impl_->GetPrimaryKeyColumns()) {
Expand Down Expand Up @@ -1121,7 +1147,7 @@ TTableBuilder& TTableBuilder::AddNullableColumn(const TString& name, const EPrim
.EndOptional()
.Build();

TableDescription_.AddColumn(name, TProtoAccessor::GetProto(columnType), family, false);
TableDescription_.AddColumn(name, TProtoAccessor::GetProto(columnType), family, false, std::nullopt);
return *this;
}

Expand All @@ -1131,7 +1157,7 @@ TTableBuilder& TTableBuilder::AddNullableColumn(const TString& name, const TDeci
.Decimal(type)
.EndOptional()
.Build();
TableDescription_.AddColumn(name, TProtoAccessor::GetProto(columnType), family, false);
TableDescription_.AddColumn(name, TProtoAccessor::GetProto(columnType), family, false, std::nullopt);
return *this;
}

Expand All @@ -1140,7 +1166,7 @@ TTableBuilder& TTableBuilder::AddNullableColumn(const TString& name, const TPgTy
.Pg(type)
.Build();

TableDescription_.AddColumn(name, TProtoAccessor::GetProto(columnType), family, false);
TableDescription_.AddColumn(name, TProtoAccessor::GetProto(columnType), family, false, std::nullopt);
return *this;
}

Expand All @@ -1149,7 +1175,7 @@ TTableBuilder& TTableBuilder::AddNonNullableColumn(const TString& name, const EP
.Primitive(type)
.Build();

TableDescription_.AddColumn(name, TProtoAccessor::GetProto(columnType), family, true);
TableDescription_.AddColumn(name, TProtoAccessor::GetProto(columnType), family, true, std::nullopt);
return *this;
}

Expand All @@ -1158,7 +1184,7 @@ TTableBuilder& TTableBuilder::AddNonNullableColumn(const TString& name, const TD
.Decimal(type)
.Build();

TableDescription_.AddColumn(name, TProtoAccessor::GetProto(columnType), family, true);
TableDescription_.AddColumn(name, TProtoAccessor::GetProto(columnType), family, true, std::nullopt);
return *this;
}

Expand All @@ -1167,7 +1193,16 @@ TTableBuilder& TTableBuilder::AddNonNullableColumn(const TString& name, const TP
.Pg(type)
.Build();

TableDescription_.AddColumn(name, TProtoAccessor::GetProto(columnType), family, true);
TableDescription_.AddColumn(name, TProtoAccessor::GetProto(columnType), family, true, std::nullopt);
return *this;
}

TTableBuilder& TTableBuilder::AddSerialColumn(const TString& name, const EPrimitiveType& type, TSequenceDescription sequenceDescription, const TString& family) {
auto columnType = TTypeBuilder()
.Primitive(type)
.Build();

TableDescription_.AddColumn(name, TProtoAccessor::GetProto(columnType), family, true, std::move(sequenceDescription));
return *this;
}

Expand Down
16 changes: 14 additions & 2 deletions ydb/public/sdk/cpp/client/ydb_table/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,19 +100,29 @@ class TKeyRange {
TMaybe<TKeyBound> To_;
};

struct TSequenceDescription {
struct TSetVal {
i64 NextValue;
bool NextUsed;
};
std::optional<TSetVal> SetVal;
};

struct TTableColumn {
TString Name;
TType Type;
TString Family;
std::optional<bool> NotNull;
std::optional<TSequenceDescription> SequenceDescription;

TTableColumn() = default;

TTableColumn(TString name, TType type, TString family = TString(), std::optional<bool> notNull = std::nullopt)
TTableColumn(TString name, TType type, TString family = TString(), std::optional<bool> notNull = std::nullopt, std::optional<TSequenceDescription> sequenceDescription = std::nullopt)
: Name(std::move(name))
, Type(std::move(type))
, Family(std::move(family))
, NotNull(std::move(notNull))
, SequenceDescription(std::move(sequenceDescription))
{ }

// Conversion from TColumn for API compatibility
Expand Down Expand Up @@ -636,7 +646,7 @@ class TTableDescription {
TTableDescription();
explicit TTableDescription(const Ydb::Table::CreateTableRequest& request);

void AddColumn(const TString& name, const Ydb::Type& type, const TString& family, std::optional<bool> notNull);
void AddColumn(const TString& name, const Ydb::Type& type, const TString& family, std::optional<bool> notNull, std::optional<TSequenceDescription> sequenceDescription);
void SetPrimaryKeyColumns(const TVector<TString>& primaryKeyColumns);

// common
Expand Down Expand Up @@ -854,6 +864,7 @@ class TTableBuilder {
TTableBuilder& AddNonNullableColumn(const TString& name, const TPgType& type, const TString& family = TString());
TTableBuilder& SetPrimaryKeyColumns(const TVector<TString>& primaryKeyColumns);
TTableBuilder& SetPrimaryKeyColumn(const TString& primaryKeyColumn);
TTableBuilder& AddSerialColumn(const TString& name, const EPrimitiveType& type, TSequenceDescription sequenceDescription, const TString& family = TString());

// common
TTableBuilder& AddSecondaryIndex(const TIndexDescription& indexDescription);
Expand Down Expand Up @@ -1629,6 +1640,7 @@ struct TDescribeTableSettings : public TOperationRequestSettings<TDescribeTableS
FLUENT_SETTING_DEFAULT(bool, WithKeyShardBoundary, false);
FLUENT_SETTING_DEFAULT(bool, WithTableStatistics, false);
FLUENT_SETTING_DEFAULT(bool, WithPartitionStatistics, false);
FLUENT_SETTING_DEFAULT(bool, WithSetVal, false);
};

struct TExplainDataQuerySettings : public TOperationRequestSettings<TExplainDataQuerySettings> {
Expand Down
Loading
Loading