Skip to content

Copy table with sequences and backup without NextVal #2285

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/tx/datashard/export_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ TMaybe<Ydb::Table::CreateTableRequest> GenYdbScheme(
FillPartitioningSettings(scheme, tableDesc);
FillKeyBloomFilter(scheme, tableDesc);
FillReadReplicasSettings(scheme, tableDesc);
FillSequenceDescription(scheme, tableDesc);

return scheme;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ class TBackupRestoreOperationBase: public TSubOperation {
.NotAsyncReplicaTable()
.NotUnderOperation()
.IsCommonSensePath() //forbid alter impl index tables
.NotChildren(); //forbid backup table with indexes
.CanBackupTable(); //forbid backup table with indexes

if (!checks) {
result->SetError(checks.GetStatus(), checks.GetError());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,33 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con
TPath dstPath = TPath::Resolve(dstStr, context.SS);
TPath dstParentPath = dstPath.Parent();

THashSet<TString> sequences;
for (const auto& child: srcPath.Base()->GetChildren()) {
auto name = child.first;
auto pathId = child.second;

TPath childPath = srcPath.Child(name);
if (!childPath.IsSequence() || childPath.IsDeleted()) {
continue;
}

Y_ABORT_UNLESS(childPath.Base()->PathId == pathId);

TSequenceInfo::TPtr sequenceInfo = context.SS->Sequences.at(pathId);
const auto& sequenceDesc = sequenceInfo->Description;
const auto& sequenceName = sequenceDesc.GetName();

sequences.emplace(sequenceName);
}

result.push_back(CreateCopyTable(NextPartId(nextId, result),
CopyTableTask(srcPath, dstPath, descr.GetOmitFollowers(), descr.GetIsBackup())));
CopyTableTask(srcPath, dstPath, descr.GetOmitFollowers(), descr.GetIsBackup()), sequences));

if (descr.GetOmitIndexes()) {
continue;
}

TVector<NKikimrSchemeOp::TSequenceDescription> sequenceDescriptions;
for (const auto& child: srcPath.Base()->GetChildren()) {
const auto& name = child.first;
const auto& pathId = child.second;
Expand All @@ -133,6 +153,13 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con
continue;
}

if (srcIndexPath.IsSequence()) {
TSequenceInfo::TPtr sequenceInfo = context.SS->Sequences.at(pathId);
const auto& sequenceDesc = sequenceInfo->Description;
sequenceDescriptions.push_back(sequenceDesc);
continue;
}

if (!srcIndexPath.IsTableIndex()) {
continue;
}
Expand All @@ -151,6 +178,17 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con
result.push_back(CreateCopyTable(NextPartId(nextId, result),
CopyTableTask(srcImplTable, dstImplTable, descr.GetOmitFollowers(), descr.GetIsBackup())));
}

for (auto&& sequenceDescription : sequenceDescriptions) {
auto scheme = TransactionTemplate(
dstPath.PathString(),
NKikimrSchemeOp::EOperationType::ESchemeOpCreateSequence);
scheme.SetFailOnExist(true);

*scheme.MutableSequence() = std::move(sequenceDescription);

result.push_back(CreateNewSequence(NextPartId(nextId, result), scheme));
}
}

return result;
Expand Down
62 changes: 57 additions & 5 deletions ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,9 @@ class TPropose: public TSubOperationState {
};

class TCopyTable: public TSubOperation {

THashSet<TString> LocalSequences;

static TTxState::ETxState NextState() {
return TTxState::CreateParts;
}
Expand Down Expand Up @@ -267,6 +270,12 @@ class TCopyTable: public TSubOperation {
public:
using TSubOperation::TSubOperation;

explicit TCopyTable(const TOperationId& id, const TTxTransaction& tx, const THashSet<TString>& localSequences)
: TSubOperation(id, tx)
, LocalSequences(localSequences)
{
}

bool IsShadowDataAllowed() const {
return AppData()->AllowShadowDataInSchemeShardForTests;
}
Expand Down Expand Up @@ -459,7 +468,8 @@ class TCopyTable: public TSubOperation {

const NScheme::TTypeRegistry* typeRegistry = AppData()->TypeRegistry;
const TSchemeLimits& limits = domainInfo->GetSchemeLimits();
TTableInfo::TAlterDataPtr alterData = TTableInfo::CreateAlterData(nullptr, schema, *typeRegistry, limits, *domainInfo, context.SS->EnableTablePgTypes, errStr);
TTableInfo::TAlterDataPtr alterData = TTableInfo::CreateAlterData(nullptr, schema, *typeRegistry,
limits, *domainInfo, context.SS->EnableTablePgTypes, errStr, LocalSequences);
if (!alterData.Get()) {
result->SetError(NKikimrScheme::StatusSchemeError, errStr);
return result;
Expand Down Expand Up @@ -626,8 +636,9 @@ class TCopyTable: public TSubOperation {

namespace NKikimr::NSchemeShard {

ISubOperation::TPtr CreateCopyTable(TOperationId id, const TTxTransaction& tx) {
return MakeSubOperation<TCopyTable>(id, tx);
ISubOperation::TPtr CreateCopyTable(TOperationId id, const TTxTransaction& tx, const THashSet<TString>& localSequences)
{
return MakeSubOperation<TCopyTable>(id, tx, localSequences);
}

ISubOperation::TPtr CreateCopyTable(TOperationId id, TTxState::ETxState state) {
Expand Down Expand Up @@ -659,6 +670,25 @@ TVector<ISubOperation::TPtr> CreateCopyTable(TOperationId nextId, const TTxTrans
}
}

THashSet<TString> sequences;
for (auto& child: srcPath.Base()->GetChildren()) {
auto name = child.first;
auto pathId = child.second;

TPath childPath = srcPath.Child(name);
if (!childPath.IsSequence() || childPath.IsDeleted()) {
continue;
}

Y_ABORT_UNLESS(childPath.Base()->PathId == pathId);

TSequenceInfo::TPtr sequenceInfo = context.SS->Sequences.at(pathId);
const auto& sequenceDesc = sequenceInfo->Description;
const auto& sequenceName = sequenceDesc.GetName();

sequences.emplace(sequenceName);
}

TPath workDir = TPath::Resolve(tx.GetWorkingDir(), context.SS);
TPath dstPath = workDir.Child(copying.GetName());

Expand All @@ -674,15 +704,27 @@ TVector<ISubOperation::TPtr> CreateCopyTable(TOperationId nextId, const TTxTrans
operation->SetIsBackup(copying.GetIsBackup());
operation->MutablePartitionConfig()->CopyFrom(copying.GetPartitionConfig());

result.push_back(CreateCopyTable(NextPartId(nextId, result), schema));
result.push_back(CreateCopyTable(NextPartId(nextId, result), schema, sequences));
}

TVector<NKikimrSchemeOp::TSequenceDescription> sequenceDescriptions;
for (auto& child: srcPath.Base()->GetChildren()) {
auto name = child.first;
auto pathId = child.second;

TPath childPath = srcPath.Child(name);
if (!childPath.IsTableIndex() || childPath.IsDeleted()) {
if (childPath.IsDeleted()) {
continue;
}

if (childPath.IsSequence()) {
TSequenceInfo::TPtr sequenceInfo = context.SS->Sequences.at(pathId);
const auto& sequenceDesc = sequenceInfo->Description;
sequenceDescriptions.push_back(sequenceDesc);
continue;
}

if (!childPath.IsTableIndex()) {
continue;
}

Expand Down Expand Up @@ -727,6 +769,16 @@ TVector<ISubOperation::TPtr> CreateCopyTable(TOperationId nextId, const TTxTrans
}
}

for (auto&& sequenceDescription : sequenceDescriptions) {
auto scheme = TransactionTemplate(
tx.GetWorkingDir() + "/" + copying.GetName(),
NKikimrSchemeOp::EOperationType::ESchemeOpCreateSequence);
scheme.SetFailOnExist(tx.GetFailOnExist());

*scheme.MutableSequence() = std::move(sequenceDescription);

result.push_back(CreateNewSequence(NextPartId(nextId, result), scheme));
}
return result;
}

Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/schemeshard/schemeshard__operation_part.h
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,8 @@ ISubOperation::TPtr CreateForceDropUnsafe(TOperationId id, TTxState::ETxState st
ISubOperation::TPtr CreateNewTable(TOperationId id, const TTxTransaction& tx, const THashSet<TString>& localSequences = { });
ISubOperation::TPtr CreateNewTable(TOperationId id, TTxState::ETxState state);

ISubOperation::TPtr CreateCopyTable(TOperationId id, const TTxTransaction& tx);
ISubOperation::TPtr CreateCopyTable(TOperationId id, const TTxTransaction& tx,
const THashSet<TString>& localSequences = { });
ISubOperation::TPtr CreateCopyTable(TOperationId id, TTxState::ETxState state);
TVector<ISubOperation::TPtr> CreateCopyTable(TOperationId nextId, const TTxTransaction& tx, TOperationContext& context);

Expand Down
5 changes: 4 additions & 1 deletion ydb/core/tx/schemeshard/schemeshard_import__create.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -850,7 +850,10 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
}

if (item.State == EState::CreateTable) {
item.DstPathId = Self->MakeLocalId(TLocalPathId(record.GetPathId()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Не очень понял, почему понадобилось это изменение?

auto createPath = TPath::Resolve(item.DstPathName, Self);
Y_ABORT_UNLESS(createPath);

item.DstPathId = createPath.Base()->PathId;
Self->PersistImportItemDstPathId(db, importInfo, itemIdx);
}

Expand Down
42 changes: 39 additions & 3 deletions ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateTablePropose(
}

auto& modifyScheme = *record.AddTransaction();
modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateTable);
modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateIndexedTable);
modifyScheme.SetInternal(true);

const TPath domainPath = TPath::Init(importInfo->DomainPathId, ss);
Expand All @@ -37,15 +37,51 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateTablePropose(

modifyScheme.SetWorkingDir(wdAndPath.first);

auto& tableDesc = *modifyScheme.MutableCreateTable();
auto* indexedTable = modifyScheme.MutableCreateIndexedTable();
auto& tableDesc = *(indexedTable->MutableTableDescription());
tableDesc.SetName(wdAndPath.second);

Y_ABORT_UNLESS(ss->TableProfilesLoaded);
Ydb::StatusIds::StatusCode status;
if (!FillTableDescription(modifyScheme, item.Scheme, ss->TableProfiles, status, error)) {
if (!FillTableDescription(modifyScheme, item.Scheme, ss->TableProfiles, status, error, true)) {
return nullptr;
}

for(const auto& column: item.Scheme.columns()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Это нужно унести в core/ydb_convert.

switch (column.default_value_case()) {
case Ydb::Table::ColumnMeta::kFromSequence: {
const auto& fromSequence = column.from_sequence();

auto seqDesc = indexedTable->MutableSequenceDescription()->Add();
seqDesc->SetName(fromSequence.name());
if (fromSequence.has_min_value()) {
seqDesc->SetMinValue(fromSequence.min_value());
}
if (fromSequence.has_max_value()) {
seqDesc->SetMaxValue(fromSequence.max_value());
}
if (fromSequence.has_start_value()) {
seqDesc->SetStartValue(fromSequence.start_value());
}
if (fromSequence.has_cache()) {
seqDesc->SetCache(fromSequence.cache());
}
if (fromSequence.has_increment()) {
seqDesc->SetIncrement(fromSequence.increment());
}
if (fromSequence.has_cycle()) {
seqDesc->SetCycle(fromSequence.cycle());
}

break;
}
case Ydb::Table::ColumnMeta::kFromLiteral: {
break;
}
default: break;
}
}

return propose;
}

Expand Down
17 changes: 17 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_path.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -923,6 +923,23 @@ const TPath::TChecker& TPath::TChecker::NotChildren(EStatus status) const {
<< ", children: " << childrenCount);
}

const TPath::TChecker& TPath::TChecker::CanBackupTable(EStatus status) const {
if (Failed) {
return *this;
}

for (const auto& child: Path.Base()->GetChildren()) {
auto name = child.first;

TPath childPath = Path.Child(name);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const.

if (childPath->IsTableIndex()) {
return Fail(status, TStringBuilder() << "path has indexes, request doesn't accept it");
}
}

return *this;
}

const TPath::TChecker& TPath::TChecker::NotDeleted(EStatus status) const {
if (Failed) {
return *this;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/schemeshard_path.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ class TPath {
const TChecker& ShardsLimit(ui64 delta = 1, EStatus status = EStatus::StatusResourceExhausted) const;
const TChecker& PathShardsLimit(ui64 delta = 1, EStatus status = EStatus::StatusResourceExhausted) const;
const TChecker& NotChildren(EStatus status = EStatus::StatusInvalidParameter) const;
const TChecker& CanBackupTable(EStatus status = EStatus::StatusInvalidParameter) const;
const TChecker& IsValidACL(const TString& acl, EStatus status = EStatus::StatusInvalidParameter) const;
const TChecker& PQPartitionsLimit(ui64 delta = 1, EStatus status = EStatus::StatusResourceExhausted) const;
const TChecker& PQReservedStorageLimit(ui64 delta = 1, EStatus status = EStatus::StatusResourceExhausted) const;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/ut_export/ut_export.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <ydb/core/wrappers/ut_helpers/s3_mock.h>
#include <ydb/core/wrappers/s3_wrapper.h>
#include <ydb/core/metering/metering.h>
#include <ydb/public/api/protos/ydb_export.pb.h>

#include <util/string/builder.h>
#include <util/string/cast.h>
Expand Down
Loading