Skip to content

Commit 071f73e

Browse files
authored
Merge 27a59e5 into 81253b5
2 parents 81253b5 + 27a59e5 commit 071f73e

24 files changed

+731
-91
lines changed

ydb/core/protos/counters_schemeshard.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,10 @@ enum ESimpleCounters {
201201
COUNTER_BACKUP_CONTROLLER_TABLET_COUNT = 160 [(CounterOpts) = {Name: "BackupControllers"}];
202202

203203
COUNTER_IN_FLIGHT_OPS_TxDropReplication = 161 [(CounterOpts) = {Name: "InFlightOps/DropReplication"}];
204+
205+
COUNTER_IN_FLIGHT_OPS_TxCreateContinuousBackup = 162 [(CounterOpts) = {Name: "InFlightOps/CreateContinuousBackup"}];
206+
COUNTER_IN_FLIGHT_OPS_TxAlterContinuousBackup = 163 [(CounterOpts) = {Name: "InFlightOps/AlterContinuousBackup"}];
207+
COUNTER_IN_FLIGHT_OPS_TxDropContinuousBackup = 164 [(CounterOpts) = {Name: "InFlightOps/DropContinuousBackup"}];
204208
}
205209

206210
enum ECumulativeCounters {
@@ -324,6 +328,10 @@ enum ECumulativeCounters {
324328
COUNTER_FINISHED_OPS_TxCopySequence = 98 [(CounterOpts) = {Name: "FinishedOps/TxCopySequence"}];
325329

326330
COUNTER_FINISHED_OPS_TxDropReplication = 99 [(CounterOpts) = {Name: "FinishedOps/DropReplication"}];
331+
332+
COUNTER_FINISHED_OPS_TxCreateContinuousBackup = 100 [(CounterOpts) = {Name: "FinishedOps/CreateContinuousBackup"}];
333+
COUNTER_FINISHED_OPS_TxAlterContinuousBackup = 101 [(CounterOpts) = {Name: "FinishedOps/AlterContinuousBackup"}];
334+
COUNTER_FINISHED_OPS_TxDropContinuousBackup = 102 [(CounterOpts) = {Name: "FinishedOps/DropContinuousBackup"}];
327335
}
328336

329337
enum EPercentileCounters {

ydb/core/protos/flat_scheme_op.proto

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -851,6 +851,30 @@ message TDropCdcStream {
851851
optional string StreamName = 2;
852852
}
853853

854+
message TContinuousBackupDescription {
855+
}
856+
857+
message TCreateContinuousBackup {
858+
optional string TableName = 1;
859+
optional TContinuousBackupDescription ContinuousBackupDescription = 2;
860+
}
861+
862+
message TAlterContinuousBackup {
863+
optional string TableName = 1;
864+
865+
message TStop {
866+
}
867+
868+
oneof Action {
869+
TStop Stop = 2;
870+
// TODO(innokentii): something like TakeIncremental
871+
}
872+
}
873+
874+
message TDropContinuousBackup {
875+
optional string TableName = 1;
876+
}
877+
854878
enum EIndexType {
855879
EIndexTypeInvalid = 0;
856880
EIndexTypeGlobal = 1;
@@ -1458,6 +1482,14 @@ enum EOperationType {
14581482
ESchemeOpDropView = 95;
14591483

14601484
ESchemeOpDropReplication = 96;
1485+
1486+
/// ContinuousBackup
1487+
// Create
1488+
ESchemeOpCreateContinuousBackup = 97;
1489+
// Alter
1490+
ESchemeOpAlterContinuousBackup = 98;
1491+
// Drop
1492+
ESchemeOpDropContinuousBackup = 99;
14611493
}
14621494

14631495
message TApplyIf {
@@ -1585,6 +1617,10 @@ message TModifyScheme {
15851617

15861618
optional TCopySequence CopySequence = 66;
15871619
optional TReplicationDescription AlterReplication = 67;
1620+
1621+
optional TCreateContinuousBackup CreateContinuousBackup = 68;
1622+
optional TAlterContinuousBackup AlterContinuousBackup = 69;
1623+
optional TDropContinuousBackup DropContinuousBackup = 70;
15881624
}
15891625

15901626
message TCopySequence {
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
#pragma once
2+
3+
namespace NKikimr::NSchemeShard::NBackup {
4+
5+
constexpr static char const* CB_CDC_STREAM_NAME = "continuousBackupImpl";
6+
7+
} // namespace NKikimr::NSchemeShard::NBackup
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
LIBRARY()
2+
3+
SRCS(
4+
constants.h
5+
)
6+
7+
END()

ydb/core/tx/schemeshard/schemeshard__operation.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1083,6 +1083,14 @@ ISubOperation::TPtr TOperation::RestorePart(TTxState::ETxType txType, TTxState::
10831083
return CreateDropView(NextPartId(), txState);
10841084
case TTxState::ETxType::TxAlterView:
10851085
Y_ABORT("TODO: implement");
1086+
// Continuous Backup
1087+
// Now these functions won't be called because we presist only cdc function internally
1088+
case TTxState::ETxType::TxCreateContinuousBackup:
1089+
Y_ABORT("TODO: implement");
1090+
case TTxState::ETxType::TxAlterContinuousBackup:
1091+
Y_ABORT("TODO: implement");
1092+
case TTxState::ETxType::TxDropContinuousBackup:
1093+
Y_ABORT("TODO: implement");
10861094

10871095
case TTxState::ETxType::TxInvalid:
10881096
Y_UNREACHABLE();
@@ -1311,6 +1319,15 @@ ISubOperation::TPtr TOperation::ConstructPart(NKikimrSchemeOp::EOperationType op
13111319
return CreateDropView(NextPartId(), tx);
13121320
case NKikimrSchemeOp::EOperationType::ESchemeOpAlterView:
13131321
Y_ABORT("TODO: implement");
1322+
1323+
// CDC
1324+
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateContinuousBackup:
1325+
Y_ABORT("multipart operations are handled before, also they require transaction details");
1326+
case NKikimrSchemeOp::EOperationType::ESchemeOpAlterContinuousBackup:
1327+
Y_ABORT("multipart operations are handled before, also they require transaction details");
1328+
case NKikimrSchemeOp::EOperationType::ESchemeOpDropContinuousBackup:
1329+
Y_ABORT("multipart operations are handled before, also they require transaction details");
1330+
13141331
}
13151332

13161333
Y_UNREACHABLE();
@@ -1363,6 +1380,12 @@ TVector<ISubOperation::TPtr> TOperation::ConstructParts(const TTxTransaction& tx
13631380
return CreateNewExternalDataSource(NextPartId(), tx, context);
13641381
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalTable:
13651382
return CreateNewExternalTable(NextPartId(), tx, context);
1383+
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateContinuousBackup:
1384+
return CreateNewContinuousBackup(NextPartId(), tx, context);
1385+
case NKikimrSchemeOp::EOperationType::ESchemeOpAlterContinuousBackup:
1386+
return CreateAlterContinuousBackup(NextPartId(), tx, context);
1387+
case NKikimrSchemeOp::EOperationType::ESchemeOpDropContinuousBackup:
1388+
return CreateDropContinuousBackup(NextPartId(), tx, context);
13661389
default:
13671390
return {ConstructPart(opType, tx)};
13681391
}

ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp

Lines changed: 75 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#include "schemeshard__operation_alter_cdc_stream.h"
2+
13
#include "schemeshard__operation_part.h"
24
#include "schemeshard__operation_common.h"
35
#include "schemeshard_impl.h"
@@ -8,6 +10,8 @@
810

911
namespace NKikimr::NSchemeShard {
1012

13+
namespace NCdc {
14+
1115
namespace {
1216

1317
class TPropose: public TSubOperationState {
@@ -471,35 +475,12 @@ class TAlterCdcStreamAtTable: public TSubOperation {
471475

472476
} // anonymous
473477

474-
ISubOperation::TPtr CreateAlterCdcStreamImpl(TOperationId id, const TTxTransaction& tx) {
475-
return MakeSubOperation<TAlterCdcStream>(id, tx);
476-
}
477-
478-
ISubOperation::TPtr CreateAlterCdcStreamImpl(TOperationId id, TTxState::ETxState state) {
479-
return MakeSubOperation<TAlterCdcStream>(id, state);
480-
}
481-
482-
ISubOperation::TPtr CreateAlterCdcStreamAtTable(TOperationId id, const TTxTransaction& tx, bool dropSnapshot) {
483-
return MakeSubOperation<TAlterCdcStreamAtTable>(id, tx, dropSnapshot);
484-
}
485-
486-
ISubOperation::TPtr CreateAlterCdcStreamAtTable(TOperationId id, TTxState::ETxState state, bool dropSnapshot) {
487-
return MakeSubOperation<TAlterCdcStreamAtTable>(id, state, dropSnapshot);
488-
}
489-
490-
TVector<ISubOperation::TPtr> CreateAlterCdcStream(TOperationId opId, const TTxTransaction& tx, TOperationContext& context) {
491-
Y_ABORT_UNLESS(tx.GetOperationType() == NKikimrSchemeOp::EOperationType::ESchemeOpAlterCdcStream);
492-
493-
LOG_D("CreateAlterCdcStream"
494-
<< ": opId# " << opId
495-
<< ", tx# " << tx.ShortDebugString());
496-
497-
const auto& op = tx.GetAlterCdcStream();
498-
const auto& tableName = op.GetTableName();
499-
const auto& streamName = op.GetStreamName();
500-
501-
const auto workingDirPath = TPath::Resolve(tx.GetWorkingDir(), context.SS);
502-
478+
std::variant<TStreamPaths, ISubOperation::TPtr> DoAlterStreamPathChecks(
479+
const TOperationId& opId,
480+
const TPath& workingDirPath,
481+
const TString& tableName,
482+
const TString& streamName)
483+
{
503484
const auto tablePath = workingDirPath.Child(tableName);
504485
{
505486
const auto checks = tablePath.Check();
@@ -515,7 +496,7 @@ TVector<ISubOperation::TPtr> CreateAlterCdcStream(TOperationId opId, const TTxTr
515496
.NotUnderOperation();
516497

517498
if (!checks) {
518-
return {CreateReject(opId, checks.GetStatus(), checks.GetError())};
499+
return CreateReject(opId, checks.GetStatus(), checks.GetError());
519500
}
520501
}
521502

@@ -532,21 +513,20 @@ TVector<ISubOperation::TPtr> CreateAlterCdcStream(TOperationId opId, const TTxTr
532513
.NotUnderOperation();
533514

534515
if (!checks) {
535-
return {CreateReject(opId, checks.GetStatus(), checks.GetError())};
516+
return CreateReject(opId, checks.GetStatus(), checks.GetError());
536517
}
537518
}
538519

539-
TString errStr;
540-
if (!context.SS->CheckApplyIf(tx, errStr)) {
541-
return {CreateReject(opId, NKikimrScheme::StatusPreconditionFailed, errStr)};
542-
}
543-
544-
if (!context.SS->CheckLocks(tablePath.Base()->PathId, tx, errStr)) {
545-
return {CreateReject(opId, NKikimrScheme::StatusMultipleModifications, errStr)};
546-
}
547-
548-
TVector<ISubOperation::TPtr> result;
520+
return TStreamPaths{tablePath, streamPath};
521+
}
549522

523+
void DoAlterStream(
524+
const NKikimrSchemeOp::TAlterCdcStream& op,
525+
const TOperationId& opId,
526+
const TPath& workingDirPath,
527+
const TPath& tablePath,
528+
TVector<ISubOperation::TPtr>& result)
529+
{
550530
{
551531
auto outTx = TransactionTemplate(tablePath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterCdcStreamImpl);
552532
outTx.MutableAlterCdcStream()->CopyFrom(op);
@@ -568,6 +548,60 @@ TVector<ISubOperation::TPtr> CreateAlterCdcStream(TOperationId opId, const TTxTr
568548

569549
result.push_back(CreateAlterCdcStreamAtTable(NextPartId(opId, result), outTx, op.HasGetReady()));
570550
}
551+
}
552+
553+
} // namespace NCdc
554+
555+
using namespace NCdc;
556+
557+
ISubOperation::TPtr CreateAlterCdcStreamImpl(TOperationId id, const TTxTransaction& tx) {
558+
return MakeSubOperation<TAlterCdcStream>(id, tx);
559+
}
560+
561+
ISubOperation::TPtr CreateAlterCdcStreamImpl(TOperationId id, TTxState::ETxState state) {
562+
return MakeSubOperation<TAlterCdcStream>(id, state);
563+
}
564+
565+
ISubOperation::TPtr CreateAlterCdcStreamAtTable(TOperationId id, const TTxTransaction& tx, bool dropSnapshot) {
566+
return MakeSubOperation<TAlterCdcStreamAtTable>(id, tx, dropSnapshot);
567+
}
568+
569+
ISubOperation::TPtr CreateAlterCdcStreamAtTable(TOperationId id, TTxState::ETxState state, bool dropSnapshot) {
570+
return MakeSubOperation<TAlterCdcStreamAtTable>(id, state, dropSnapshot);
571+
}
572+
573+
TVector<ISubOperation::TPtr> CreateAlterCdcStream(TOperationId opId, const TTxTransaction& tx, TOperationContext& context) {
574+
Y_ABORT_UNLESS(tx.GetOperationType() == NKikimrSchemeOp::EOperationType::ESchemeOpAlterCdcStream);
575+
576+
LOG_D("CreateAlterCdcStream"
577+
<< ": opId# " << opId
578+
<< ", tx# " << tx.ShortDebugString());
579+
580+
const auto& op = tx.GetAlterCdcStream();
581+
const auto& tableName = op.GetTableName();
582+
const auto& streamName = op.GetStreamName();
583+
584+
const auto workingDirPath = TPath::Resolve(tx.GetWorkingDir(), context.SS);
585+
586+
const auto checksResult = DoAlterStreamPathChecks(opId, workingDirPath, tableName, streamName);
587+
if (std::holds_alternative<ISubOperation::TPtr>(checksResult)) {
588+
return {std::get<ISubOperation::TPtr>(checksResult)};
589+
}
590+
591+
const auto [tablePath, streamPath] = std::get<TStreamPaths>(checksResult);
592+
593+
TString errStr;
594+
if (!context.SS->CheckApplyIf(tx, errStr)) {
595+
return {CreateReject(opId, NKikimrScheme::StatusPreconditionFailed, errStr)};
596+
}
597+
598+
if (!context.SS->CheckLocks(tablePath.Base()->PathId, tx, errStr)) {
599+
return {CreateReject(opId, NKikimrScheme::StatusMultipleModifications, errStr)};
600+
}
601+
602+
TVector<ISubOperation::TPtr> result;
603+
604+
DoAlterStream(op, opId, workingDirPath, tablePath, result);
571605

572606
if (op.HasGetReady()) {
573607
auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropLock);
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#pragma once
2+
3+
#include "schemeshard__operation_create_cdc_stream.h" // for TStreamPaths
4+
#include "schemeshard__operation_common.h"
5+
#include "schemeshard__operation_part.h"
6+
#include "schemeshard_impl.h"
7+
8+
#include <ydb/core/engine/mkql_proto.h>
9+
#include <ydb/core/scheme/scheme_types_proto.h>
10+
11+
namespace NKikimr::NSchemeShard::NCdc {
12+
13+
std::variant<TStreamPaths, ISubOperation::TPtr> DoAlterStreamPathChecks(
14+
const TOperationId& opId,
15+
const TPath& workingDirPath,
16+
const TString& tableName,
17+
const TString& streamName);
18+
19+
void DoAlterStream(
20+
const NKikimrSchemeOp::TAlterCdcStream& op,
21+
const TOperationId& opId,
22+
const TPath& workingDirPath,
23+
const TPath& tablePath,
24+
TVector<ISubOperation::TPtr>& result);
25+
26+
} // namespace NKikimr::NSchemesShard::NCdc
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
#include "schemeshard__operation_part.h"
2+
#include "schemeshard__operation_common.h"
3+
#include "schemeshard_impl.h"
4+
5+
#include "schemeshard__operation_alter_cdc_stream.h"
6+
7+
#include <ydb/core/tx/schemeshard/backup/constants.h>
8+
9+
#include <ydb/core/engine/mkql_proto.h>
10+
#include <ydb/core/scheme/scheme_types_proto.h>
11+
12+
namespace NKikimr::NSchemeShard {
13+
14+
TVector<ISubOperation::TPtr> CreateAlterContinuousBackup(TOperationId opId, const TTxTransaction& tx, TOperationContext& context) {
15+
Y_ABORT_UNLESS(tx.GetOperationType() == NKikimrSchemeOp::EOperationType::ESchemeOpAlterContinuousBackup);
16+
17+
const auto workingDirPath = TPath::Resolve(tx.GetWorkingDir(), context.SS);
18+
const auto& cbOp = tx.GetAlterContinuousBackup();
19+
const auto& tableName = cbOp.GetTableName();
20+
21+
const auto checksResult = NCdc::DoAlterStreamPathChecks(opId, workingDirPath, tableName, NBackup::CB_CDC_STREAM_NAME);
22+
if (std::holds_alternative<ISubOperation::TPtr>(checksResult)) {
23+
return {std::get<ISubOperation::TPtr>(checksResult)};
24+
}
25+
26+
const auto [tablePath, streamPath] = std::get<NCdc::TStreamPaths>(checksResult);
27+
28+
TString errStr;
29+
if (!context.SS->CheckApplyIf(tx, errStr)) {
30+
return {CreateReject(opId, NKikimrScheme::StatusPreconditionFailed, errStr)};
31+
}
32+
33+
if (!context.SS->CheckLocks(tablePath.Base()->PathId, tx, errStr)) {
34+
return {CreateReject(opId, NKikimrScheme::StatusMultipleModifications, errStr)};
35+
}
36+
37+
NKikimrSchemeOp::TAlterCdcStream alterCdcStreamOp;
38+
alterCdcStreamOp.SetTableName(tableName);
39+
alterCdcStreamOp.SetStreamName(NBackup::CB_CDC_STREAM_NAME);
40+
41+
switch (cbOp.GetActionCase()) {
42+
case NKikimrSchemeOp::TAlterContinuousBackup::kStop:
43+
alterCdcStreamOp.MutableDisable();
44+
default:
45+
return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, TStringBuilder()
46+
<< "Unknown action: " << static_cast<ui32>(cbOp.GetActionCase()))};
47+
}
48+
49+
TVector<ISubOperation::TPtr> result;
50+
51+
NCdc::DoAlterStream(alterCdcStreamOp, opId, workingDirPath, tablePath, result);
52+
53+
return result;
54+
}
55+
56+
} // namespace NKikimr::NSchemeShard

0 commit comments

Comments
 (0)