Skip to content

Commit a8ce0c2

Browse files
authored
Merge 2794beb into 16e0aab
2 parents 16e0aab + 2794beb commit a8ce0c2

19 files changed

+365
-65
lines changed

ydb/core/protos/counters_schemeshard.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -574,4 +574,6 @@ enum ETxTypes {
574574
TXTYPE_UPDATE_DOMAIN_REPLY = 85 [(TxTypeOpts) = {Name: "TxUpdateDomainReply"}];
575575

576576
TXTYPE_SEQUENCESHARD_GET_SEQUENCE_RESULT = 86 [(TxTypeOpts) = {Name: "TxSequenceShardGetSequenceResult"}];
577+
578+
TXTYPE_UNMARK_RESTORE_TABLES = 87 [(TxTypeOpts) = {Name: "TxUnmarkRestoreTables"}];
577579
}

ydb/core/protos/flat_scheme_op.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,7 @@ message TTableDescription {
390390

391391
optional bool OmitFollowers = 36 [default = false]; // used with CopyFromTable
392392
optional bool IsBackup = 37 [default = false];
393+
optional bool IsRestore = 45 [default = false];
393394

394395
repeated TCdcStreamDescription CdcStreams = 38;
395396
repeated TSequenceDescription Sequences = 39;

ydb/core/tx/schemeshard/schemeshard__init.cpp

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
2121
TVector<ui64> ExportsToResume;
2222
TVector<ui64> ImportsToResume;
2323
THashMap<TPathId, TVector<TPathId>> CdcStreamScansToResume;
24+
TVector<TPathId> RestoreTablesToUnmark;
2425
bool Broken = false;
2526

2627
explicit TTxInit(TSelf *self)
@@ -307,7 +308,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
307308
return true;
308309
}
309310

310-
typedef std::tuple<TPathId, ui32, ui64, TString, TString, TString, ui64, TString, bool, TString, bool, TString> TTableRec;
311+
typedef std::tuple<TPathId, ui32, ui64, TString, TString, TString, ui64, TString, bool, TString, bool, TString, bool> TTableRec;
311312
typedef TDeque<TTableRec> TTableRows;
312313

313314
template <typename SchemaTable, typename TRowSet>
@@ -323,7 +324,8 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
323324
rowSet.template GetValueOrDefault<typename SchemaTable::IsBackup>(false),
324325
rowSet.template GetValueOrDefault<typename SchemaTable::ReplicationConfig>(),
325326
rowSet.template GetValueOrDefault<typename SchemaTable::IsTemporary>(false),
326-
rowSet.template GetValueOrDefault<typename SchemaTable::OwnerActorId>("")
327+
rowSet.template GetValueOrDefault<typename SchemaTable::OwnerActorId>(""),
328+
rowSet.template GetValueOrDefault<typename SchemaTable::IsRestore>(false)
327329
);
328330
}
329331

@@ -1831,6 +1833,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
18311833
}
18321834

18331835
tableInfo->IsBackup = std::get<8>(rec);
1836+
tableInfo->IsRestore = std::get<12>(rec);
18341837

18351838
Self->Tables[pathId] = tableInfo;
18361839
Self->IncrementPathDbRefCount(pathId);
@@ -3883,6 +3886,8 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
38833886
<< ", read records: " << history.size()
38843887
<< ", at schemeshard: " << Self->TabletID());
38853888

3889+
RestoreTablesToUnmark.clear();
3890+
38863891
for (auto& rec: history) {
38873892
auto pathId = std::get<0>(rec);
38883893
auto txId = std::get<1>(rec);
@@ -3929,6 +3934,9 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
39293934
break;
39303935
case TTableInfo::TBackupRestoreResult::EKind::Restore:
39313936
tableInfo->RestoreHistory[txId] = std::move(info);
3937+
if (tableInfo->IsRestore) {
3938+
RestoreTablesToUnmark.push_back(pathId);
3939+
}
39323940
break;
39333941
}
39343942

@@ -4928,6 +4936,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
49284936
.CdcStreamScans = std::move(cdcStreamScansToResume),
49294937
.TablesToClean = std::move(TablesToClean),
49304938
.BlockStoreVolumesToClean = std::move(BlockStoreVolumesToClean),
4939+
.RestoreTablesToUnmark = std::move(RestoreTablesToUnmark),
49314940
});
49324941
}
49334942
};

ydb/core/tx/schemeshard/schemeshard__operation_alter_table.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,12 +77,18 @@ TTableInfo::TAlterDataPtr ParseParams(const TPath& path, TTableInfo::TPtr table,
7777
copyAlter.ColumnsSize() != 0 ||
7878
copyAlter.DropColumnsSize() != 0);
7979

80-
if (copyAlter.HasIsBackup() && copyAlter.GetIsBackup() != table->IsBackup) {
80+
if (copyAlter.HasIsBackup() && copyAlter.GetIsBackup() != table->IsBackup) {
8181
errStr = Sprintf("Cannot add/remove 'IsBackup' property");
8282
status = NKikimrScheme::StatusInvalidParameter;
8383
return nullptr;
8484
}
8585

86+
if (copyAlter.HasIsRestore() && copyAlter.GetIsRestore() != table->IsRestore) {
87+
errStr = Sprintf("Cannot add/remove 'IsRestore' property");
88+
status = NKikimrScheme::StatusInvalidParameter;
89+
return nullptr;
90+
}
91+
8692
if (!hasSchemaChanges
8793
&& !copyAlter.HasPartitionConfig()
8894
&& !copyAlter.HasTTLSettings()

ydb/core/tx/schemeshard/schemeshard__operation_backup_restore_common.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ class TProposedWaitParts: public TSubOperationState {
245245
return false;
246246
}
247247

248-
TKind::FinishStats(OperationId, txState, context);
248+
TKind::Finish(OperationId, txState, context);
249249
return true;
250250
}
251251

@@ -285,7 +285,7 @@ class TProposedWaitParts: public TSubOperationState {
285285
if (txState->ShardsInProgress.empty()) {
286286
NTableState::AckAllSchemaChanges(OperationId, *txState, context);
287287
context.SS->ChangeTxState(db, OperationId, TTxState::Done);
288-
TKind::FinishStats(OperationId, *txState, context);
288+
TKind::Finish(OperationId, *txState, context);
289289
return true;
290290
}
291291

@@ -361,7 +361,7 @@ class TAborting: public TProposedWaitParts<TKind> {
361361
if (txState->ShardsInProgress.empty()) {
362362
NTableState::AckAllSchemaChanges(OperationId, *txState, context);
363363
context.SS->ChangeTxState(db, OperationId, TTxState::Done);
364-
TKind::FinishStats(OperationId, *txState, context);
364+
TKind::Finish(OperationId, *txState, context);
365365
return true;
366366
}
367367

ydb/core/tx/schemeshard/schemeshard__operation_create_backup.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ struct TBackup {
5353
return TRUCalculator::ReadTable(bytes);
5454
}
5555

56-
static void FinishStats(const TOperationId& opId, TTxState& txState, TOperationContext& context) {
56+
static void Finish(const TOperationId& opId, TTxState& txState, TOperationContext& context) {
5757
if (txState.TxType != TTxState::TxBackup) {
5858
return;
5959
}

ydb/core/tx/schemeshard/schemeshard__operation_create_restore.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ struct TRestore {
5353
return TRUCalculator::BulkUpsert(bytes, rows);
5454
}
5555

56-
static void FinishStats(const TOperationId& opId, TTxState& txState, TOperationContext& context) {
56+
static void Finish(const TOperationId& opId, TTxState& txState, TOperationContext& context) {
5757
if (txState.TxType != TTxState::TxRestore) {
5858
return;
5959
}
@@ -77,6 +77,9 @@ struct TRestore {
7777

7878
NIceDb::TNiceDb db(context.GetDB());
7979
context.SS->PersistCompletedRestore(db, opId.GetTxId(), txState, restoreInfo);
80+
81+
table->IsRestore = false;
82+
context.SS->PersistTableIsRestore(db, txState.TargetPathId, table);
8083
}
8184

8285
static void PersistTask(const TPathId& pathId, const TTxTransaction& tx, TOperationContext& context) {

ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -784,6 +784,13 @@ class TSplitMerge: public TSubOperation {
784784
return result;
785785
}
786786

787+
if (tableInfo->IsRestore) {
788+
TString errMsg = TStringBuilder()
789+
<< "cannot split/merge restore table " << info.GetTablePath();
790+
result->SetError(NKikimrScheme::StatusInvalidParameter, errMsg);
791+
return result;
792+
}
793+
787794
const THashMap<TShardIdx, ui64>& shardIdx2partition = tableInfo->GetShard2PartitionIdx();
788795

789796
TVector<ui64> srcPartitionIdxs;
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
#include "schemeshard_impl.h"
2+
3+
namespace NKikimr::NSchemeShard {
4+
5+
using namespace NTabletFlatExecutor;
6+
7+
struct TSchemeShard::TTxUnmarkRestoreTables : public TTransactionBase<TSchemeShard> {
8+
static const ui32 BucketSize = 100;
9+
TVector<TPathId> RestoreTablesToUnmark;
10+
ui32 UnmarkedCount;
11+
12+
TTxUnmarkRestoreTables(TSelf* self, TVector<TPathId> tablesToClean)
13+
: TTransactionBase<TSchemeShard>(self)
14+
, RestoreTablesToUnmark(std::move(tablesToClean))
15+
, UnmarkedCount(0)
16+
{}
17+
18+
TTxType GetTxType() const override {
19+
return TXTYPE_UNMARK_RESTORE_TABLES;
20+
}
21+
22+
bool Execute(TTransactionContext &txc, const TActorContext&) override {
23+
NIceDb::TNiceDb db(txc.DB);
24+
25+
UnmarkedCount = 0;
26+
while (UnmarkedCount < BucketSize && RestoreTablesToUnmark) {
27+
TPathId tableId = RestoreTablesToUnmark.back();
28+
if (Self->Tables.contains(tableId)) {
29+
auto table = Self->Tables[tableId];
30+
table->IsRestore = false;
31+
Self->PersistTableIsRestore(db, tableId, table);
32+
}
33+
34+
++UnmarkedCount;
35+
RestoreTablesToUnmark.pop_back();
36+
}
37+
38+
return true;
39+
}
40+
41+
void Complete(const TActorContext &ctx) override {
42+
if (UnmarkedCount) {
43+
LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
44+
"TTxUnmarkRestoreTables Complete"
45+
<< ", done for " << UnmarkedCount << " tables"
46+
<< ", left " << RestoreTablesToUnmark.size()
47+
<< ", at schemeshard: "<< Self->TabletID()
48+
);
49+
}
50+
51+
if (RestoreTablesToUnmark) {
52+
Self->Execute(Self->CreateTxUnmarkRestoreTables(std::move(RestoreTablesToUnmark)), ctx);
53+
}
54+
}
55+
};
56+
57+
NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxUnmarkRestoreTables(TVector<TPathId>&& tablesToUnmark) {
58+
return new TTxUnmarkRestoreTables(this, std::move(tablesToUnmark));
59+
}
60+
61+
} // NKikimr::NSchemeShard
62+

ydb/core/tx/schemeshard/schemeshard_impl.cpp

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,10 @@ void TSchemeShard::ActivateAfterInitialization(const TActorContext& ctx, TActiva
9797
Execute(CreateTxCleanBlockStoreVolumes(std::move(opts.BlockStoreVolumesToClean)), ctx);
9898
}
9999

100+
if (opts.RestoreTablesToUnmark) {
101+
Execute(CreateTxUnmarkRestoreTables(std::move(opts.RestoreTablesToUnmark)), ctx);
102+
}
103+
100104
if (IsDomainSchemeShard) {
101105
InitializeTabletMigrations();
102106
}
@@ -2621,6 +2625,16 @@ void TSchemeShard::PersistTableFinishColumnBuilding(NIceDb::TNiceDb& db, const T
26212625
}
26222626
}
26232627

2628+
void TSchemeShard::PersistTableIsRestore(NIceDb::TNiceDb& db, const TPathId pathId, const TTableInfo::TPtr tableInfo) {
2629+
if (pathId.OwnerId == TabletID()) {
2630+
db.Table<Schema::Tables>().Key(pathId.LocalPathId).Update(
2631+
NIceDb::TUpdate<Schema::Tables::IsRestore>(tableInfo->IsRestore));
2632+
} else {
2633+
db.Table<Schema::MigratedTables>().Key(pathId.OwnerId, pathId.LocalPathId).Update(
2634+
NIceDb::TUpdate<Schema::MigratedTables::IsRestore>(tableInfo->IsRestore));
2635+
}
2636+
}
2637+
26242638
void TSchemeShard::PersistTableAltered(NIceDb::TNiceDb& db, const TPathId pathId, const TTableInfo::TPtr tableInfo) {
26252639
TString partitionConfig;
26262640
Y_PROTOBUF_SUPPRESS_NODISCARD tableInfo->PartitionConfig().SerializeToString(&partitionConfig);
@@ -2644,6 +2658,7 @@ void TSchemeShard::PersistTableAltered(NIceDb::TNiceDb& db, const TPathId pathId
26442658
NIceDb::TUpdate<Schema::Tables::AlterTableFull>(TString()),
26452659
NIceDb::TUpdate<Schema::Tables::TTLSettings>(ttlSettings),
26462660
NIceDb::TUpdate<Schema::Tables::IsBackup>(tableInfo->IsBackup),
2661+
NIceDb::TUpdate<Schema::Tables::IsRestore>(tableInfo->IsRestore),
26472662
NIceDb::TUpdate<Schema::Tables::ReplicationConfig>(replicationConfig),
26482663
NIceDb::TUpdate<Schema::Tables::IsTemporary>(tableInfo->IsTemporary),
26492664
NIceDb::TUpdate<Schema::Tables::OwnerActorId>(tableInfo->OwnerActorId.ToString()));
@@ -2656,6 +2671,7 @@ void TSchemeShard::PersistTableAltered(NIceDb::TNiceDb& db, const TPathId pathId
26562671
NIceDb::TUpdate<Schema::MigratedTables::AlterTableFull>(TString()),
26572672
NIceDb::TUpdate<Schema::MigratedTables::TTLSettings>(ttlSettings),
26582673
NIceDb::TUpdate<Schema::MigratedTables::IsBackup>(tableInfo->IsBackup),
2674+
NIceDb::TUpdate<Schema::MigratedTables::IsRestore>(tableInfo->IsRestore),
26592675
NIceDb::TUpdate<Schema::MigratedTables::ReplicationConfig>(replicationConfig),
26602676
NIceDb::TUpdate<Schema::MigratedTables::IsTemporary>(tableInfo->IsTemporary),
26612677
NIceDb::TUpdate<Schema::MigratedTables::OwnerActorId>(tableInfo->OwnerActorId.ToString()));
@@ -6666,6 +6682,10 @@ void TSchemeShard::FillTableDescriptionForShardIdx(
66666682
tableDescr->SetIsBackup(true);
66676683
}
66686684

6685+
if (tinfo->IsRestore) {
6686+
tableDescr->SetIsRestore(true);
6687+
}
6688+
66696689
if (tinfo->HasReplicationConfig()) {
66706690
tableDescr->MutableReplicationConfig()->CopyFrom(tinfo->ReplicationConfig());
66716691
}

ydb/core/tx/schemeshard/schemeshard_impl.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -679,6 +679,7 @@ class TSchemeShard
679679
void PersistTableCreated(NIceDb::TNiceDb& db, const TPathId tableId);
680680
void PersistTableAlterVersion(NIceDb::TNiceDb &db, const TPathId pathId, const TTableInfo::TPtr tableInfo);
681681
void PersistTableFinishColumnBuilding(NIceDb::TNiceDb& db, const TPathId pathId, const TTableInfo::TPtr tableInfo, ui64 colId);
682+
void PersistTableIsRestore(NIceDb::TNiceDb &db, const TPathId pathId, const TTableInfo::TPtr tableInfo);
682683
void PersistTableAltered(NIceDb::TNiceDb &db, const TPathId pathId, const TTableInfo::TPtr tableInfo);
683684
void PersistAddAlterTable(NIceDb::TNiceDb& db, TPathId pathId, const TTableInfo::TAlterDataPtr alter);
684685
void PersistPersQueueGroup(NIceDb::TNiceDb &db, TPathId pathId, const TTopicInfo::TPtr);
@@ -857,6 +858,7 @@ class TSchemeShard
857858
TVector<TPathId> CdcStreamScans;
858859
TVector<TPathId> TablesToClean;
859860
TDeque<TPathId> BlockStoreVolumesToClean;
861+
TVector<TPathId> RestoreTablesToUnmark;
860862
};
861863

862864
void SubscribeToTempTableOwners();
@@ -883,6 +885,9 @@ class TSchemeShard
883885

884886
void ScheduleCleanDroppedPaths();
885887
void Handle(TEvPrivate::TEvCleanDroppedPaths::TPtr& ev, const TActorContext& ctx);
888+
889+
struct TTxUnmarkRestoreTables;
890+
NTabletFlatExecutor::ITransaction* CreateTxUnmarkRestoreTables(TVector<TPathId>&& tablesToUnmark);
886891

887892
void EnqueueBackgroundCompaction(const TShardIdx& shardIdx, const TPartitionStats& stats);
888893
void UpdateBackgroundCompaction(const TShardIdx& shardIdx, const TPartitionStats& stats);

ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateTablePropose(
4040
auto* indexedTable = modifyScheme.MutableCreateIndexedTable();
4141
auto& tableDesc = *(indexedTable->MutableTableDescription());
4242
tableDesc.SetName(wdAndPath.second);
43+
tableDesc.SetIsRestore(true);
4344

4445
Y_ABORT_UNLESS(ss->TableProfilesLoaded);
4546
Ydb::StatusIds::StatusCode status;

ydb/core/tx/schemeshard/schemeshard_info_types.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -541,6 +541,7 @@ TTableInfo::TAlterDataPtr TTableInfo::CreateAlterData(
541541
}
542542

543543
alterData->IsBackup = op.GetIsBackup();
544+
alterData->IsRestore = op.GetIsRestore();
544545

545546
if (source && op.KeyColumnNamesSize() == 0)
546547
return alterData;
@@ -1890,6 +1891,11 @@ bool TTableInfo::CheckCanMergePartitions(const TSplitSettings& splitSettings,
18901891
return false;
18911892
}
18921893

1894+
// Don't split/merge restore tables
1895+
if (IsRestore) {
1896+
return false;
1897+
}
1898+
18931899
// Ignore stats from unknown datashard (it could have been split)
18941900
if (!Stats.PartitionStats.contains(shardIdx)) {
18951901
return false;
@@ -1941,6 +1947,10 @@ bool TTableInfo::CheckSplitByLoad(
19411947
if (IsBackup)
19421948
return false;
19431949

1950+
// Don't split/merge restore tables
1951+
if (IsRestore)
1952+
return false;
1953+
19441954
if (!splitSettings.SplitByLoadEnabled)
19451955
return false;
19461956

ydb/core/tx/schemeshard/schemeshard_info_types.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,7 @@ struct TTableInfo : public TSimpleRefCount<TTableInfo> {
394394
THashMap<ui32, TColumn> Columns;
395395
TVector<ui32> KeyColumnIds;
396396
bool IsBackup = false;
397+
bool IsRestore = false;
397398

398399
NKikimrSchemeOp::TTableDescription TableDescriptionDiff;
399400
TMaybeFail<NKikimrSchemeOp::TTableDescription> TableDescriptionFull;
@@ -437,6 +438,7 @@ struct TTableInfo : public TSimpleRefCount<TTableInfo> {
437438
THashMap<ui32, TColumn> Columns;
438439
TVector<ui32> KeyColumnIds;
439440
bool IsBackup = false;
441+
bool IsRestore = false;
440442
bool IsTemporary = false;
441443
TActorId OwnerActorId;
442444

@@ -549,6 +551,7 @@ struct TTableInfo : public TSimpleRefCount<TTableInfo> {
549551
, Columns(std::move(alterData.Columns))
550552
, KeyColumnIds(std::move(alterData.KeyColumnIds))
551553
, IsBackup(alterData.IsBackup)
554+
, IsRestore(alterData.IsRestore)
552555
{
553556
TableDescription.Swap(alterData.TableDescriptionFull.Get());
554557
}

ydb/core/tx/schemeshard/schemeshard_path_describer.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1269,6 +1269,7 @@ void TSchemeShard::DescribeTable(
12691269
}
12701270

12711271
entry->SetIsBackup(tableInfo->IsBackup);
1272+
entry->SetIsRestore(tableInfo->IsRestore);
12721273
}
12731274

12741275
void TSchemeShard::DescribeTableIndex(const TPathId& pathId, const TString& name,

0 commit comments

Comments
 (0)