Skip to content

Commit 2d82177

Browse files
authored
Merge 121c415 into ec22f24
2 parents ec22f24 + 121c415 commit 2d82177

22 files changed

+612
-151
lines changed

ydb/core/protos/counters_schemeshard.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,10 @@ enum ESimpleCounters {
201201
COUNTER_BACKUP_CONTROLLER_TABLET_COUNT = 160 [(CounterOpts) = {Name: "BackupControllers"}];
202202

203203
COUNTER_IN_FLIGHT_OPS_TxDropReplication = 161 [(CounterOpts) = {Name: "InFlightOps/DropReplication"}];
204+
205+
COUNTER_IN_FLIGHT_OPS_TxCreateContinuousBackup = 162 [(CounterOpts) = {Name: "InFlightOps/CreateContinuousBackup"}];
206+
COUNTER_IN_FLIGHT_OPS_TxAlterContinuousBackup = 163 [(CounterOpts) = {Name: "InFlightOps/AlterContinuousBackup"}];
207+
COUNTER_IN_FLIGHT_OPS_TxDropContinuousBackup = 164 [(CounterOpts) = {Name: "InFlightOps/DropContinuousBackup"}];
204208
}
205209

206210
enum ECumulativeCounters {
@@ -324,6 +328,10 @@ enum ECumulativeCounters {
324328
COUNTER_FINISHED_OPS_TxCopySequence = 98 [(CounterOpts) = {Name: "FinishedOps/TxCopySequence"}];
325329

326330
COUNTER_FINISHED_OPS_TxDropReplication = 99 [(CounterOpts) = {Name: "FinishedOps/DropReplication"}];
331+
332+
COUNTER_FINISHED_OPS_TxCreateContinuousBackup = 100 [(CounterOpts) = {Name: "FinishedOps/CreateContinuousBackup"}];
333+
COUNTER_FINISHED_OPS_TxAlterContinuousBackup = 101 [(CounterOpts) = {Name: "FinishedOps/AlterContinuousBackup"}];
334+
COUNTER_FINISHED_OPS_TxDropContinuousBackup = 102 [(CounterOpts) = {Name: "FinishedOps/DropContinuousBackup"}];
327335
}
328336

329337
enum EPercentileCounters {

ydb/core/protos/flat_scheme_op.proto

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -851,6 +851,30 @@ message TDropCdcStream {
851851
optional string StreamName = 2;
852852
}
853853

854+
message TContinuousBackupDescription {
855+
}
856+
857+
message TCreateContinuousBackup {
858+
optional string TableName = 1;
859+
optional TContinuousBackupDescription ContinuousBackupDescription = 2;
860+
}
861+
862+
message TAlterContinuousBackup {
863+
optional string TableName = 1;
864+
865+
message TStop {
866+
}
867+
868+
oneof Action {
869+
TStop Stop = 2;
870+
// TODO(innokentii): something like TakeIncremental
871+
}
872+
}
873+
874+
message TDropContinuousBackup {
875+
optional string TableName = 1;
876+
}
877+
854878
enum EIndexType {
855879
EIndexTypeInvalid = 0;
856880
EIndexTypeGlobal = 1;
@@ -1458,6 +1482,14 @@ enum EOperationType {
14581482
ESchemeOpDropView = 95;
14591483

14601484
ESchemeOpDropReplication = 96;
1485+
1486+
/// ContinuousBackup
1487+
// Create
1488+
ESchemeOpCreateContinuousBackup = 97;
1489+
// Alter
1490+
ESchemeOpAlterContinuousBackup = 98;
1491+
// Drop
1492+
ESchemeOpDropContinuousBackup = 99;
14611493
}
14621494

14631495
message TApplyIf {
@@ -1585,6 +1617,10 @@ message TModifyScheme {
15851617

15861618
optional TCopySequence CopySequence = 66;
15871619
optional TReplicationDescription AlterReplication = 67;
1620+
1621+
optional TCreateContinuousBackup CreateContinuousBackup = 68;
1622+
optional TAlterContinuousBackup AlterContinuousBackup = 69;
1623+
optional TDropContinuousBackup DropContinuousBackup = 70;
15881624
}
15891625

15901626
message TCopySequence {

ydb/core/tx/schemeshard/schemeshard__operation.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1083,6 +1083,14 @@ ISubOperation::TPtr TOperation::RestorePart(TTxState::ETxType txType, TTxState::
10831083
return CreateDropView(NextPartId(), txState);
10841084
case TTxState::ETxType::TxAlterView:
10851085
Y_ABORT("TODO: implement");
1086+
// Continuous Backup
1087+
// Now these functions won't be called because we presist only cdc function internally
1088+
case TTxState::ETxType::TxCreateContinuousBackup:
1089+
Y_ABORT("TODO: implement");
1090+
case TTxState::ETxType::TxAlterContinuousBackup:
1091+
Y_ABORT("TODO: implement");
1092+
case TTxState::ETxType::TxDropContinuousBackup:
1093+
Y_ABORT("TODO: implement");
10861094

10871095
case TTxState::ETxType::TxInvalid:
10881096
Y_UNREACHABLE();
@@ -1311,6 +1319,15 @@ ISubOperation::TPtr TOperation::ConstructPart(NKikimrSchemeOp::EOperationType op
13111319
return CreateDropView(NextPartId(), tx);
13121320
case NKikimrSchemeOp::EOperationType::ESchemeOpAlterView:
13131321
Y_ABORT("TODO: implement");
1322+
1323+
// CDC
1324+
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateContinuousBackup:
1325+
Y_ABORT("multipart operations are handled before, also they require transaction details");
1326+
case NKikimrSchemeOp::EOperationType::ESchemeOpAlterContinuousBackup:
1327+
Y_ABORT("multipart operations are handled before, also they require transaction details");
1328+
case NKikimrSchemeOp::EOperationType::ESchemeOpDropContinuousBackup:
1329+
Y_ABORT("multipart operations are handled before, also they require transaction details");
1330+
13141331
}
13151332

13161333
Y_UNREACHABLE();
@@ -1363,6 +1380,12 @@ TVector<ISubOperation::TPtr> TOperation::ConstructParts(const TTxTransaction& tx
13631380
return CreateNewExternalDataSource(NextPartId(), tx, context);
13641381
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalTable:
13651382
return CreateNewExternalTable(NextPartId(), tx, context);
1383+
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateContinuousBackup:
1384+
return CreateNewContinuousBackup(NextPartId(), tx, context);
1385+
case NKikimrSchemeOp::EOperationType::ESchemeOpAlterContinuousBackup:
1386+
return CreateAlterContinuousBackup(NextPartId(), tx, context);
1387+
case NKikimrSchemeOp::EOperationType::ESchemeOpDropContinuousBackup:
1388+
return CreateDropContinuousBackup(NextPartId(), tx, context);
13661389
default:
13671390
return {ConstructPart(opType, tx)};
13681391
}

ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp

Lines changed: 33 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#include "schemeshard__operation_alter_cdc_stream.h"
2+
13
#include "schemeshard__operation_part.h"
24
#include "schemeshard__operation_common.h"
35
#include "schemeshard_impl.h"
@@ -471,6 +473,36 @@ class TAlterCdcStreamAtTable: public TSubOperation {
471473

472474
} // anonymous
473475

476+
void DoAlterStream(
477+
const NKikimrSchemeOp::TAlterCdcStream& op,
478+
const TOperationId& opId,
479+
const TPath& workingDirPath,
480+
const TPath& tablePath,
481+
TVector<ISubOperation::TPtr>& result)
482+
{
483+
{
484+
auto outTx = TransactionTemplate(tablePath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterCdcStreamImpl);
485+
outTx.MutableAlterCdcStream()->CopyFrom(op);
486+
487+
if (op.HasGetReady()) {
488+
outTx.MutableLockGuard()->SetOwnerTxId(op.GetGetReady().GetLockTxId());
489+
}
490+
491+
result.push_back(CreateAlterCdcStreamImpl(NextPartId(opId, result), outTx));
492+
}
493+
494+
{
495+
auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterCdcStreamAtTable);
496+
outTx.MutableAlterCdcStream()->CopyFrom(op);
497+
498+
if (op.HasGetReady()) {
499+
outTx.MutableLockGuard()->SetOwnerTxId(op.GetGetReady().GetLockTxId());
500+
}
501+
502+
result.push_back(CreateAlterCdcStreamAtTable(NextPartId(opId, result), outTx, op.HasGetReady()));
503+
}
504+
}
505+
474506
ISubOperation::TPtr CreateAlterCdcStreamImpl(TOperationId id, const TTxTransaction& tx) {
475507
return MakeSubOperation<TAlterCdcStream>(id, tx);
476508
}
@@ -547,27 +579,7 @@ TVector<ISubOperation::TPtr> CreateAlterCdcStream(TOperationId opId, const TTxTr
547579

548580
TVector<ISubOperation::TPtr> result;
549581

550-
{
551-
auto outTx = TransactionTemplate(tablePath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterCdcStreamImpl);
552-
outTx.MutableAlterCdcStream()->CopyFrom(op);
553-
554-
if (op.HasGetReady()) {
555-
outTx.MutableLockGuard()->SetOwnerTxId(op.GetGetReady().GetLockTxId());
556-
}
557-
558-
result.push_back(CreateAlterCdcStreamImpl(NextPartId(opId, result), outTx));
559-
}
560-
561-
{
562-
auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterCdcStreamAtTable);
563-
outTx.MutableAlterCdcStream()->CopyFrom(op);
564-
565-
if (op.HasGetReady()) {
566-
outTx.MutableLockGuard()->SetOwnerTxId(op.GetGetReady().GetLockTxId());
567-
}
568-
569-
result.push_back(CreateAlterCdcStreamAtTable(NextPartId(opId, result), outTx, op.HasGetReady()));
570-
}
582+
DoAlterStream(op, opId, workingDirPath, tablePath, result);
571583

572584
if (op.HasGetReady()) {
573585
auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropLock);
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
#pragma once
2+
3+
#include "schemeshard__operation_part.h"
4+
#include "schemeshard__operation_common.h"
5+
#include "schemeshard_impl.h"
6+
7+
#include <ydb/core/engine/mkql_proto.h>
8+
#include <ydb/core/scheme/scheme_types_proto.h>
9+
10+
namespace NKikimr::NSchemeShard {
11+
12+
void DoAlterStream(
13+
const NKikimrSchemeOp::TAlterCdcStream& op,
14+
const TOperationId& opId,
15+
const TPath& workingDirPath,
16+
const TPath& tablePath,
17+
TVector<ISubOperation::TPtr>& result);
18+
19+
} // namespace NKikimr::NSchemesShard
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
#include "schemeshard__operation_part.h"
2+
#include "schemeshard__operation_common.h"
3+
#include "schemeshard_impl.h"
4+
5+
#include "schemeshard__operation_alter_cdc_stream.h"
6+
7+
#include <ydb/core/engine/mkql_proto.h>
8+
#include <ydb/core/scheme/scheme_types_proto.h>
9+
10+
namespace {
11+
12+
constexpr static char const* cbCdcStreamName = "continuousBackupImpl";
13+
14+
}
15+
16+
namespace NKikimr::NSchemeShard {
17+
18+
TVector<ISubOperation::TPtr> CreateAlterContinuousBackup(TOperationId opId, const TTxTransaction& tx, TOperationContext& context) {
19+
Y_ABORT_UNLESS(tx.GetOperationType() == NKikimrSchemeOp::EOperationType::ESchemeOpAlterContinuousBackup);
20+
21+
const auto workingDirPath = TPath::Resolve(tx.GetWorkingDir(), context.SS);
22+
const auto& cbOp = tx.GetAlterContinuousBackup();
23+
const auto& tableName = cbOp.GetTableName();
24+
const auto tablePath = workingDirPath.Child(tableName);
25+
26+
NKikimrSchemeOp::TAlterCdcStream alterCdcStreamOp;
27+
alterCdcStreamOp.SetTableName(tableName);
28+
alterCdcStreamOp.SetStreamName(cbCdcStreamName);
29+
alterCdcStreamOp.MutableDisable();
30+
31+
TVector<ISubOperation::TPtr> result;
32+
33+
DoAlterStream(alterCdcStreamOp, opId, workingDirPath, tablePath, result);
34+
35+
return result;
36+
}
37+
38+
} // namespace NKikimr::NSchemeShard

0 commit comments

Comments
 (0)