Skip to content

Commit 56c818b

Browse files
authored
Merge be241de into 2c1d472
2 parents 2c1d472 + be241de commit 56c818b

21 files changed

+372
-74
lines changed

ydb/core/protos/counters_schemeshard.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -614,4 +614,5 @@ enum ETxTypes {
614614
TXTYPE_DROP_BACKUP_COLLECTION_RESULT = 89 [(TxTypeOpts) = {Name: "TxDropBackupCollectionResult"}];
615615

616616
TXTYPE_LIST_USERS = 90 [(TxTypeOpts) = {Name: "TxListUsers"}];
617+
TXTYPE_UNMARK_RESTORE_TABLES = 91 [(TxTypeOpts) = {Name: "TxUnmarkRestoreTables"}];
617618
}

ydb/core/protos/flat_scheme_op.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,7 @@ message TTableDescription {
317317

318318
optional bool OmitFollowers = 36 [default = false]; // used with CopyFromTable
319319
optional bool IsBackup = 37 [default = false];
320+
optional bool IsRestore = 45 [default = false];
320321

321322
repeated TCdcStreamDescription CdcStreams = 38;
322323
repeated TSequenceDescription Sequences = 39;

ydb/core/tx/schemeshard/schemeshard__init.cpp

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
2222
TVector<ui64> ExportsToResume;
2323
TVector<ui64> ImportsToResume;
2424
THashMap<TPathId, TVector<TPathId>> CdcStreamScansToResume;
25+
TVector<TPathId> RestoreTablesToUnmark;
2526
bool Broken = false;
2627

2728
explicit TTxInit(TSelf *self)
@@ -308,7 +309,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
308309
return true;
309310
}
310311

311-
typedef std::tuple<TPathId, ui32, ui64, TString, TString, TString, ui64, TString, bool, TString, bool, TString, TString> TTableRec;
312+
typedef std::tuple<TPathId, ui32, ui64, TString, TString, TString, ui64, TString, bool, TString, bool, TString, TString, bool> TTableRec;
312313
typedef TDeque<TTableRec> TTableRows;
313314

314315
template <typename SchemaTable, typename TRowSet>
@@ -325,7 +326,8 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
325326
rowSet.template GetValueOrDefault<typename SchemaTable::ReplicationConfig>(),
326327
rowSet.template GetValueOrDefault<typename SchemaTable::IsTemporary>(false),
327328
rowSet.template GetValueOrDefault<typename SchemaTable::OwnerActorId>(""),
328-
rowSet.template GetValueOrDefault<typename SchemaTable::IncrementalBackupConfig>()
329+
rowSet.template GetValueOrDefault<typename SchemaTable::IncrementalBackupConfig>(),
330+
rowSet.template GetValueOrDefault<typename SchemaTable::IsRestore>(false)
329331
);
330332
}
331333

@@ -1845,12 +1847,13 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
18451847
bool parseOk = ParseFromStringNoSizeLimit(tableInfo->MutableIncrementalBackupConfig(), incrementalBackupConfig);
18461848
Y_ABORT_UNLESS(parseOk);
18471849

1848-
if (tableInfo->IsRestoreTable()) {
1849-
Self->PathsById.at(pathId)->SetRestoreTable();
1850+
if (tableInfo->IsIncrementalRestoreTable()) {
1851+
Self->PathsById.at(pathId)->SetIncrementalRestoreTable();
18501852
}
18511853
}
18521854

18531855
tableInfo->IsBackup = std::get<8>(rec);
1856+
tableInfo->IsRestore = std::get<13>(rec);
18541857

18551858
Self->Tables[pathId] = tableInfo;
18561859
Self->IncrementPathDbRefCount(pathId);
@@ -3942,6 +3945,8 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
39423945
<< ", read records: " << history.size()
39433946
<< ", at schemeshard: " << Self->TabletID());
39443947

3948+
RestoreTablesToUnmark.clear();
3949+
39453950
for (auto& rec: history) {
39463951
auto pathId = std::get<0>(rec);
39473952
auto txId = std::get<1>(rec);
@@ -3988,6 +3993,9 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
39883993
break;
39893994
case TTableInfo::TBackupRestoreResult::EKind::Restore:
39903995
tableInfo->RestoreHistory[txId] = std::move(info);
3996+
if (tableInfo->IsRestore) {
3997+
RestoreTablesToUnmark.push_back(pathId);
3998+
}
39913999
break;
39924000
}
39934001

@@ -5050,6 +5058,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
50505058
.CdcStreamScans = std::move(cdcStreamScansToResume),
50515059
.TablesToClean = std::move(TablesToClean),
50525060
.BlockStoreVolumesToClean = std::move(BlockStoreVolumesToClean),
5061+
.RestoreTablesToUnmark = std::move(RestoreTablesToUnmark),
50535062
});
50545063
}
50555064
};

ydb/core/tx/schemeshard/schemeshard__operation_alter_table.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,18 @@ TTableInfo::TAlterDataPtr ParseParams(const TPath& path, TTableInfo::TPtr table,
8080
copyAlter.ColumnsSize() != 0 ||
8181
copyAlter.DropColumnsSize() != 0);
8282

83-
if (copyAlter.HasIsBackup() && copyAlter.GetIsBackup() != table->IsBackup) {
83+
if (copyAlter.HasIsBackup() && copyAlter.GetIsBackup() != table->IsBackup) {
8484
errStr = Sprintf("Cannot add/remove 'IsBackup' property");
8585
status = NKikimrScheme::StatusInvalidParameter;
8686
return nullptr;
8787
}
8888

89+
if (copyAlter.HasIsRestore() && copyAlter.GetIsRestore() != table->IsRestore) {
90+
errStr = Sprintf("Cannot add/remove 'IsRestore' property");
91+
status = NKikimrScheme::StatusInvalidParameter;
92+
return nullptr;
93+
}
94+
8995
if (!hasSchemaChanges
9096
&& !copyAlter.HasPartitionConfig()
9197
&& !copyAlter.HasTTLSettings()

ydb/core/tx/schemeshard/schemeshard__operation_backup_restore_common.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ class TProposedWaitParts: public TSubOperationState {
246246
return false;
247247
}
248248

249-
TKind::FinishStats(OperationId, txState, context);
249+
TKind::Finish(OperationId, txState, context);
250250
return true;
251251
}
252252

@@ -286,7 +286,7 @@ class TProposedWaitParts: public TSubOperationState {
286286
if (txState->ShardsInProgress.empty()) {
287287
NTableState::AckAllSchemaChanges(OperationId, *txState, context);
288288
context.SS->ChangeTxState(db, OperationId, TTxState::Done);
289-
TKind::FinishStats(OperationId, *txState, context);
289+
TKind::Finish(OperationId, *txState, context);
290290
return true;
291291
}
292292

@@ -362,7 +362,7 @@ class TAborting: public TProposedWaitParts<TKind> {
362362
if (txState->ShardsInProgress.empty()) {
363363
NTableState::AckAllSchemaChanges(OperationId, *txState, context);
364364
context.SS->ChangeTxState(db, OperationId, TTxState::Done);
365-
TKind::FinishStats(OperationId, *txState, context);
365+
TKind::Finish(OperationId, *txState, context);
366366
return true;
367367
}
368368

ydb/core/tx/schemeshard/schemeshard__operation_create_backup.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ struct TBackup {
5454
return TRUCalculator::ReadTable(bytes);
5555
}
5656

57-
static void FinishStats(const TOperationId& opId, TTxState& txState, TOperationContext& context) {
57+
static void Finish(const TOperationId& opId, TTxState& txState, TOperationContext& context) {
5858
if (txState.TxType != TTxState::TxBackup) {
5959
return;
6060
}

ydb/core/tx/schemeshard/schemeshard__operation_create_restore.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ struct TRestore {
5353
return TRUCalculator::BulkUpsert(bytes, rows);
5454
}
5555

56-
static void FinishStats(const TOperationId& opId, TTxState& txState, TOperationContext& context) {
56+
static void Finish(const TOperationId& opId, TTxState& txState, TOperationContext& context) {
5757
if (txState.TxType != TTxState::TxRestore) {
5858
return;
5959
}
@@ -77,6 +77,9 @@ struct TRestore {
7777

7878
NIceDb::TNiceDb db(context.GetDB());
7979
context.SS->PersistCompletedRestore(db, opId.GetTxId(), txState, restoreInfo);
80+
81+
table->IsRestore = false;
82+
context.SS->PersistTableIsRestore(db, txState.TargetPathId, table);
8083
}
8184

8285
static void PersistTask(const TPathId& pathId, const TTxTransaction& tx, TOperationContext& context) {

ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -672,8 +672,8 @@ class TCreateTable: public TSubOperation {
672672
newTable->SetAsyncReplica(true);
673673
}
674674

675-
if (tableInfo->IsRestoreTable()) {
676-
newTable->SetRestoreTable();
675+
if (tableInfo->IsIncrementalRestoreTable()) {
676+
newTable->SetIncrementalRestoreTable();
677677
}
678678

679679
context.SS->Tables[newTable->PathId] = tableInfo;

ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -785,6 +785,13 @@ class TSplitMerge: public TSubOperation {
785785
return result;
786786
}
787787

788+
if (tableInfo->IsRestore) {
789+
TString errMsg = TStringBuilder()
790+
<< "cannot split/merge restore table " << info.GetTablePath();
791+
result->SetError(NKikimrScheme::StatusInvalidParameter, errMsg);
792+
return result;
793+
}
794+
788795
const THashMap<TShardIdx, ui64>& shardIdx2partition = tableInfo->GetShard2PartitionIdx();
789796

790797
TVector<ui64> srcPartitionIdxs;
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
#include "schemeshard_impl.h"
2+
3+
namespace NKikimr::NSchemeShard {
4+
5+
using namespace NTabletFlatExecutor;
6+
7+
struct TSchemeShard::TTxUnmarkRestoreTables : public TTransactionBase<TSchemeShard> {
8+
static const ui32 BucketSize = 100;
9+
TVector<TPathId> RestoreTablesToUnmark;
10+
ui32 UnmarkedCount;
11+
12+
TTxUnmarkRestoreTables(TSelf* self, TVector<TPathId> tablesToClean)
13+
: TTransactionBase<TSchemeShard>(self)
14+
, RestoreTablesToUnmark(std::move(tablesToClean))
15+
, UnmarkedCount(0)
16+
{}
17+
18+
TTxType GetTxType() const override {
19+
return TXTYPE_UNMARK_RESTORE_TABLES;
20+
}
21+
22+
bool Execute(TTransactionContext &txc, const TActorContext&) override {
23+
NIceDb::TNiceDb db(txc.DB);
24+
25+
UnmarkedCount = 0;
26+
while (UnmarkedCount < BucketSize && RestoreTablesToUnmark) {
27+
TPathId tableId = RestoreTablesToUnmark.back();
28+
if (Self->Tables.contains(tableId)) {
29+
auto table = Self->Tables[tableId];
30+
table->IsRestore = false;
31+
Self->PersistTableIsRestore(db, tableId, table);
32+
}
33+
34+
++UnmarkedCount;
35+
RestoreTablesToUnmark.pop_back();
36+
}
37+
38+
return true;
39+
}
40+
41+
void Complete(const TActorContext &ctx) override {
42+
if (UnmarkedCount) {
43+
LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
44+
"TTxUnmarkRestoreTables Complete"
45+
<< ", done for " << UnmarkedCount << " tables"
46+
<< ", left " << RestoreTablesToUnmark.size()
47+
<< ", at schemeshard: "<< Self->TabletID()
48+
);
49+
}
50+
51+
if (RestoreTablesToUnmark) {
52+
Self->Execute(Self->CreateTxUnmarkRestoreTables(std::move(RestoreTablesToUnmark)), ctx);
53+
}
54+
}
55+
};
56+
57+
NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxUnmarkRestoreTables(TVector<TPathId>&& tablesToUnmark) {
58+
return new TTxUnmarkRestoreTables(this, std::move(tablesToUnmark));
59+
}
60+
61+
} // NKikimr::NSchemeShard
62+

0 commit comments

Comments
 (0)