Skip to content

Commit 7d8e806

Browse files
authored
Merge 19335ff into ec22f24
2 parents ec22f24 + 19335ff commit 7d8e806

24 files changed

+628
-152
lines changed

ydb/core/protos/counters_schemeshard.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,10 @@ enum ESimpleCounters {
201201
COUNTER_BACKUP_CONTROLLER_TABLET_COUNT = 160 [(CounterOpts) = {Name: "BackupControllers"}];
202202

203203
COUNTER_IN_FLIGHT_OPS_TxDropReplication = 161 [(CounterOpts) = {Name: "InFlightOps/DropReplication"}];
204+
205+
COUNTER_IN_FLIGHT_OPS_TxCreateContinuousBackup = 162 [(CounterOpts) = {Name: "InFlightOps/CreateContinuousBackup"}];
206+
COUNTER_IN_FLIGHT_OPS_TxAlterContinuousBackup = 163 [(CounterOpts) = {Name: "InFlightOps/AlterContinuousBackup"}];
207+
COUNTER_IN_FLIGHT_OPS_TxDropContinuousBackup = 164 [(CounterOpts) = {Name: "InFlightOps/DropContinuousBackup"}];
204208
}
205209

206210
enum ECumulativeCounters {
@@ -324,6 +328,10 @@ enum ECumulativeCounters {
324328
COUNTER_FINISHED_OPS_TxCopySequence = 98 [(CounterOpts) = {Name: "FinishedOps/TxCopySequence"}];
325329

326330
COUNTER_FINISHED_OPS_TxDropReplication = 99 [(CounterOpts) = {Name: "FinishedOps/DropReplication"}];
331+
332+
COUNTER_FINISHED_OPS_TxCreateContinuousBackup = 100 [(CounterOpts) = {Name: "FinishedOps/CreateContinuousBackup"}];
333+
COUNTER_FINISHED_OPS_TxAlterContinuousBackup = 101 [(CounterOpts) = {Name: "FinishedOps/AlterContinuousBackup"}];
334+
COUNTER_FINISHED_OPS_TxDropContinuousBackup = 102 [(CounterOpts) = {Name: "FinishedOps/DropContinuousBackup"}];
327335
}
328336

329337
enum EPercentileCounters {

ydb/core/protos/flat_scheme_op.proto

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -851,6 +851,30 @@ message TDropCdcStream {
851851
optional string StreamName = 2;
852852
}
853853

854+
message TContinuousBackupDescription {
855+
}
856+
857+
message TCreateContinuousBackup {
858+
optional string TableName = 1;
859+
optional TContinuousBackupDescription ContinuousBackupDescription = 2;
860+
}
861+
862+
message TAlterContinuousBackup {
863+
optional string TableName = 1;
864+
865+
message TStop {
866+
}
867+
868+
oneof Action {
869+
TStop Stop = 2;
870+
// TODO(innokentii): something like TakeIncremental
871+
}
872+
}
873+
874+
message TDropContinuousBackup {
875+
optional string TableName = 1;
876+
}
877+
854878
enum EIndexType {
855879
EIndexTypeInvalid = 0;
856880
EIndexTypeGlobal = 1;
@@ -1458,6 +1482,14 @@ enum EOperationType {
14581482
ESchemeOpDropView = 95;
14591483

14601484
ESchemeOpDropReplication = 96;
1485+
1486+
/// ContinuousBackup
1487+
// Create
1488+
ESchemeOpCreateContinuousBackup = 97;
1489+
// Alter
1490+
ESchemeOpAlterContinuousBackup = 98;
1491+
// Drop
1492+
ESchemeOpDropContinuousBackup = 99;
14611493
}
14621494

14631495
message TApplyIf {
@@ -1585,6 +1617,10 @@ message TModifyScheme {
15851617

15861618
optional TCopySequence CopySequence = 66;
15871619
optional TReplicationDescription AlterReplication = 67;
1620+
1621+
optional TCreateContinuousBackup CreateContinuousBackup = 68;
1622+
optional TAlterContinuousBackup AlterContinuousBackup = 69;
1623+
optional TDropContinuousBackup DropContinuousBackup = 70;
15881624
}
15891625

15901626
message TCopySequence {
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
#pragma once
2+
3+
namespace NKikimr::NSchemeShard::NBackup {
4+
5+
constexpr static char const* CB_CDC_STREAM_NAME = "continuousBackupImpl";
6+
7+
} // namespace NKikimr::NSchemeShard::NBackup
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
LIBRARY()
2+
3+
SRCS(
4+
constants.h
5+
)
6+
7+
END()

ydb/core/tx/schemeshard/schemeshard__operation.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1083,6 +1083,14 @@ ISubOperation::TPtr TOperation::RestorePart(TTxState::ETxType txType, TTxState::
10831083
return CreateDropView(NextPartId(), txState);
10841084
case TTxState::ETxType::TxAlterView:
10851085
Y_ABORT("TODO: implement");
1086+
// Continuous Backup
1087+
// Now these functions won't be called because we presist only cdc function internally
1088+
case TTxState::ETxType::TxCreateContinuousBackup:
1089+
Y_ABORT("TODO: implement");
1090+
case TTxState::ETxType::TxAlterContinuousBackup:
1091+
Y_ABORT("TODO: implement");
1092+
case TTxState::ETxType::TxDropContinuousBackup:
1093+
Y_ABORT("TODO: implement");
10861094

10871095
case TTxState::ETxType::TxInvalid:
10881096
Y_UNREACHABLE();
@@ -1311,6 +1319,15 @@ ISubOperation::TPtr TOperation::ConstructPart(NKikimrSchemeOp::EOperationType op
13111319
return CreateDropView(NextPartId(), tx);
13121320
case NKikimrSchemeOp::EOperationType::ESchemeOpAlterView:
13131321
Y_ABORT("TODO: implement");
1322+
1323+
// CDC
1324+
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateContinuousBackup:
1325+
Y_ABORT("multipart operations are handled before, also they require transaction details");
1326+
case NKikimrSchemeOp::EOperationType::ESchemeOpAlterContinuousBackup:
1327+
Y_ABORT("multipart operations are handled before, also they require transaction details");
1328+
case NKikimrSchemeOp::EOperationType::ESchemeOpDropContinuousBackup:
1329+
Y_ABORT("multipart operations are handled before, also they require transaction details");
1330+
13141331
}
13151332

13161333
Y_UNREACHABLE();
@@ -1363,6 +1380,12 @@ TVector<ISubOperation::TPtr> TOperation::ConstructParts(const TTxTransaction& tx
13631380
return CreateNewExternalDataSource(NextPartId(), tx, context);
13641381
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalTable:
13651382
return CreateNewExternalTable(NextPartId(), tx, context);
1383+
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateContinuousBackup:
1384+
return CreateNewContinuousBackup(NextPartId(), tx, context);
1385+
case NKikimrSchemeOp::EOperationType::ESchemeOpAlterContinuousBackup:
1386+
return CreateAlterContinuousBackup(NextPartId(), tx, context);
1387+
case NKikimrSchemeOp::EOperationType::ESchemeOpDropContinuousBackup:
1388+
return CreateDropContinuousBackup(NextPartId(), tx, context);
13661389
default:
13671390
return {ConstructPart(opType, tx)};
13681391
}

ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp

Lines changed: 37 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#include "schemeshard__operation_alter_cdc_stream.h"
2+
13
#include "schemeshard__operation_part.h"
24
#include "schemeshard__operation_common.h"
35
#include "schemeshard_impl.h"
@@ -471,6 +473,40 @@ class TAlterCdcStreamAtTable: public TSubOperation {
471473

472474
} // anonymous
473475

476+
namespace NCdc {
477+
478+
void DoAlterStream(
479+
const NKikimrSchemeOp::TAlterCdcStream& op,
480+
const TOperationId& opId,
481+
const TPath& workingDirPath,
482+
const TPath& tablePath,
483+
TVector<ISubOperation::TPtr>& result)
484+
{
485+
{
486+
auto outTx = TransactionTemplate(tablePath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterCdcStreamImpl);
487+
outTx.MutableAlterCdcStream()->CopyFrom(op);
488+
489+
if (op.HasGetReady()) {
490+
outTx.MutableLockGuard()->SetOwnerTxId(op.GetGetReady().GetLockTxId());
491+
}
492+
493+
result.push_back(CreateAlterCdcStreamImpl(NextPartId(opId, result), outTx));
494+
}
495+
496+
{
497+
auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterCdcStreamAtTable);
498+
outTx.MutableAlterCdcStream()->CopyFrom(op);
499+
500+
if (op.HasGetReady()) {
501+
outTx.MutableLockGuard()->SetOwnerTxId(op.GetGetReady().GetLockTxId());
502+
}
503+
504+
result.push_back(CreateAlterCdcStreamAtTable(NextPartId(opId, result), outTx, op.HasGetReady()));
505+
}
506+
}
507+
508+
} // namespace NCdc
509+
474510
ISubOperation::TPtr CreateAlterCdcStreamImpl(TOperationId id, const TTxTransaction& tx) {
475511
return MakeSubOperation<TAlterCdcStream>(id, tx);
476512
}
@@ -547,27 +583,7 @@ TVector<ISubOperation::TPtr> CreateAlterCdcStream(TOperationId opId, const TTxTr
547583

548584
TVector<ISubOperation::TPtr> result;
549585

550-
{
551-
auto outTx = TransactionTemplate(tablePath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterCdcStreamImpl);
552-
outTx.MutableAlterCdcStream()->CopyFrom(op);
553-
554-
if (op.HasGetReady()) {
555-
outTx.MutableLockGuard()->SetOwnerTxId(op.GetGetReady().GetLockTxId());
556-
}
557-
558-
result.push_back(CreateAlterCdcStreamImpl(NextPartId(opId, result), outTx));
559-
}
560-
561-
{
562-
auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterCdcStreamAtTable);
563-
outTx.MutableAlterCdcStream()->CopyFrom(op);
564-
565-
if (op.HasGetReady()) {
566-
outTx.MutableLockGuard()->SetOwnerTxId(op.GetGetReady().GetLockTxId());
567-
}
568-
569-
result.push_back(CreateAlterCdcStreamAtTable(NextPartId(opId, result), outTx, op.HasGetReady()));
570-
}
586+
NCdc::DoAlterStream(op, opId, workingDirPath, tablePath, result);
571587

572588
if (op.HasGetReady()) {
573589
auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropLock);
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
#pragma once
2+
3+
#include "schemeshard__operation_part.h"
4+
#include "schemeshard__operation_common.h"
5+
#include "schemeshard_impl.h"
6+
7+
#include <ydb/core/engine/mkql_proto.h>
8+
#include <ydb/core/scheme/scheme_types_proto.h>
9+
10+
namespace NKikimr::NSchemeShard::NCdc {
11+
12+
void DoAlterStream(
13+
const NKikimrSchemeOp::TAlterCdcStream& op,
14+
const TOperationId& opId,
15+
const TPath& workingDirPath,
16+
const TPath& tablePath,
17+
TVector<ISubOperation::TPtr>& result);
18+
19+
} // namespace NKikimr::NSchemesShard::NCdc
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
#include "schemeshard__operation_part.h"
2+
#include "schemeshard__operation_common.h"
3+
#include "schemeshard_impl.h"
4+
5+
#include "schemeshard__operation_alter_cdc_stream.h"
6+
7+
#include <ydb/core/tx/schemeshard/backup/constants.h>
8+
9+
#include <ydb/core/engine/mkql_proto.h>
10+
#include <ydb/core/scheme/scheme_types_proto.h>
11+
12+
namespace NKikimr::NSchemeShard {
13+
14+
TVector<ISubOperation::TPtr> CreateAlterContinuousBackup(TOperationId opId, const TTxTransaction& tx, TOperationContext& context) {
15+
Y_ABORT_UNLESS(tx.GetOperationType() == NKikimrSchemeOp::EOperationType::ESchemeOpAlterContinuousBackup);
16+
17+
const auto workingDirPath = TPath::Resolve(tx.GetWorkingDir(), context.SS);
18+
const auto& cbOp = tx.GetAlterContinuousBackup();
19+
const auto& tableName = cbOp.GetTableName();
20+
const auto tablePath = workingDirPath.Child(tableName);
21+
22+
NKikimrSchemeOp::TAlterCdcStream alterCdcStreamOp;
23+
alterCdcStreamOp.SetTableName(tableName);
24+
alterCdcStreamOp.SetStreamName(NBackup::CB_CDC_STREAM_NAME);
25+
alterCdcStreamOp.MutableDisable();
26+
27+
TVector<ISubOperation::TPtr> result;
28+
29+
NCdc::DoAlterStream(alterCdcStreamOp, opId, workingDirPath, tablePath, result);
30+
31+
return result;
32+
}
33+
34+
} // namespace NKikimr::NSchemeShard

0 commit comments

Comments
 (0)