Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions ydb/core/protos/counters_schemeshard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ enum ESimpleCounters {
COUNTER_BACKUP_CONTROLLER_TABLET_COUNT = 160 [(CounterOpts) = {Name: "BackupControllers"}];

COUNTER_IN_FLIGHT_OPS_TxDropReplication = 161 [(CounterOpts) = {Name: "InFlightOps/DropReplication"}];

COUNTER_IN_FLIGHT_OPS_TxCreateContinuousBackup = 162 [(CounterOpts) = {Name: "InFlightOps/CreateContinuousBackup"}];
COUNTER_IN_FLIGHT_OPS_TxAlterContinuousBackup = 163 [(CounterOpts) = {Name: "InFlightOps/AlterContinuousBackup"}];
COUNTER_IN_FLIGHT_OPS_TxDropContinuousBackup = 164 [(CounterOpts) = {Name: "InFlightOps/DropContinuousBackup"}];
}

enum ECumulativeCounters {
Expand Down Expand Up @@ -324,6 +328,10 @@ enum ECumulativeCounters {
COUNTER_FINISHED_OPS_TxCopySequence = 98 [(CounterOpts) = {Name: "FinishedOps/TxCopySequence"}];

COUNTER_FINISHED_OPS_TxDropReplication = 99 [(CounterOpts) = {Name: "FinishedOps/DropReplication"}];

COUNTER_FINISHED_OPS_TxCreateContinuousBackup = 100 [(CounterOpts) = {Name: "FinishedOps/CreateContinuousBackup"}];
COUNTER_FINISHED_OPS_TxAlterContinuousBackup = 101 [(CounterOpts) = {Name: "FinishedOps/AlterContinuousBackup"}];
COUNTER_FINISHED_OPS_TxDropContinuousBackup = 102 [(CounterOpts) = {Name: "FinishedOps/DropContinuousBackup"}];
}

enum EPercentileCounters {
Expand Down
36 changes: 36 additions & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,30 @@ message TDropCdcStream {
optional string StreamName = 2;
}

message TContinuousBackupDescription {
}

message TCreateContinuousBackup {
optional string TableName = 1;
optional TContinuousBackupDescription ContinuousBackupDescription = 2;
}

message TAlterContinuousBackup {
optional string TableName = 1;

message TStop {
}

oneof Action {
TStop Stop = 2;
// TODO(innokentii): something like TakeIncremental
}
}

message TDropContinuousBackup {
optional string TableName = 1;
}

enum EIndexType {
EIndexTypeInvalid = 0;
EIndexTypeGlobal = 1;
Expand Down Expand Up @@ -1458,6 +1482,14 @@ enum EOperationType {
ESchemeOpDropView = 95;

ESchemeOpDropReplication = 96;

/// ContinuousBackup
// Create
ESchemeOpCreateContinuousBackup = 97;
// Alter
ESchemeOpAlterContinuousBackup = 98;
// Drop
ESchemeOpDropContinuousBackup = 99;
}

message TApplyIf {
Expand Down Expand Up @@ -1585,6 +1617,10 @@ message TModifyScheme {

optional TCopySequence CopySequence = 66;
optional TReplicationDescription AlterReplication = 67;

optional TCreateContinuousBackup CreateContinuousBackup = 68;
optional TAlterContinuousBackup AlterContinuousBackup = 69;
optional TDropContinuousBackup DropContinuousBackup = 70;
}

message TCopySequence {
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/tx/schemeshard/backup/constants.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#pragma once

namespace NKikimr::NSchemeShard::NBackup {

constexpr static char const* CB_CDC_STREAM_NAME = "continuousBackupImpl";

} // namespace NKikimr::NSchemeShard::NBackup
7 changes: 7 additions & 0 deletions ydb/core/tx/schemeshard/backup/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
LIBRARY()

SRCS(
constants.h
)

END()
23 changes: 23 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard__operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1083,6 +1083,14 @@ ISubOperation::TPtr TOperation::RestorePart(TTxState::ETxType txType, TTxState::
return CreateDropView(NextPartId(), txState);
case TTxState::ETxType::TxAlterView:
Y_ABORT("TODO: implement");
// Continuous Backup
// Now these functions won't be called because we presist only cdc function internally
case TTxState::ETxType::TxCreateContinuousBackup:
Y_ABORT("TODO: implement");
case TTxState::ETxType::TxAlterContinuousBackup:
Y_ABORT("TODO: implement");
case TTxState::ETxType::TxDropContinuousBackup:
Y_ABORT("TODO: implement");

case TTxState::ETxType::TxInvalid:
Y_UNREACHABLE();
Expand Down Expand Up @@ -1311,6 +1319,15 @@ ISubOperation::TPtr TOperation::ConstructPart(NKikimrSchemeOp::EOperationType op
return CreateDropView(NextPartId(), tx);
case NKikimrSchemeOp::EOperationType::ESchemeOpAlterView:
Y_ABORT("TODO: implement");

// CDC
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateContinuousBackup:
Y_ABORT("multipart operations are handled before, also they require transaction details");
case NKikimrSchemeOp::EOperationType::ESchemeOpAlterContinuousBackup:
Y_ABORT("multipart operations are handled before, also they require transaction details");
case NKikimrSchemeOp::EOperationType::ESchemeOpDropContinuousBackup:
Y_ABORT("multipart operations are handled before, also they require transaction details");

}

Y_UNREACHABLE();
Expand Down Expand Up @@ -1363,6 +1380,12 @@ TVector<ISubOperation::TPtr> TOperation::ConstructParts(const TTxTransaction& tx
return CreateNewExternalDataSource(NextPartId(), tx, context);
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalTable:
return CreateNewExternalTable(NextPartId(), tx, context);
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateContinuousBackup:
return CreateNewContinuousBackup(NextPartId(), tx, context);
case NKikimrSchemeOp::EOperationType::ESchemeOpAlterContinuousBackup:
return CreateAlterContinuousBackup(NextPartId(), tx, context);
case NKikimrSchemeOp::EOperationType::ESchemeOpDropContinuousBackup:
return CreateDropContinuousBackup(NextPartId(), tx, context);
default:
return {ConstructPart(opType, tx)};
}
Expand Down
116 changes: 75 additions & 41 deletions ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#include "schemeshard__operation_alter_cdc_stream.h"

#include "schemeshard__operation_part.h"
#include "schemeshard__operation_common.h"
#include "schemeshard_impl.h"
Expand All @@ -8,6 +10,8 @@

namespace NKikimr::NSchemeShard {

namespace NCdc {

namespace {

class TPropose: public TSubOperationState {
Expand Down Expand Up @@ -471,35 +475,12 @@ class TAlterCdcStreamAtTable: public TSubOperation {

} // anonymous

ISubOperation::TPtr CreateAlterCdcStreamImpl(TOperationId id, const TTxTransaction& tx) {
return MakeSubOperation<TAlterCdcStream>(id, tx);
}

ISubOperation::TPtr CreateAlterCdcStreamImpl(TOperationId id, TTxState::ETxState state) {
return MakeSubOperation<TAlterCdcStream>(id, state);
}

ISubOperation::TPtr CreateAlterCdcStreamAtTable(TOperationId id, const TTxTransaction& tx, bool dropSnapshot) {
return MakeSubOperation<TAlterCdcStreamAtTable>(id, tx, dropSnapshot);
}

ISubOperation::TPtr CreateAlterCdcStreamAtTable(TOperationId id, TTxState::ETxState state, bool dropSnapshot) {
return MakeSubOperation<TAlterCdcStreamAtTable>(id, state, dropSnapshot);
}

TVector<ISubOperation::TPtr> CreateAlterCdcStream(TOperationId opId, const TTxTransaction& tx, TOperationContext& context) {
Y_ABORT_UNLESS(tx.GetOperationType() == NKikimrSchemeOp::EOperationType::ESchemeOpAlterCdcStream);

LOG_D("CreateAlterCdcStream"
<< ": opId# " << opId
<< ", tx# " << tx.ShortDebugString());

const auto& op = tx.GetAlterCdcStream();
const auto& tableName = op.GetTableName();
const auto& streamName = op.GetStreamName();

const auto workingDirPath = TPath::Resolve(tx.GetWorkingDir(), context.SS);

std::variant<TStreamPaths, ISubOperation::TPtr> DoAlterStreamPathChecks(
const TOperationId& opId,
const TPath& workingDirPath,
const TString& tableName,
const TString& streamName)
{
const auto tablePath = workingDirPath.Child(tableName);
{
const auto checks = tablePath.Check();
Expand All @@ -515,7 +496,7 @@ TVector<ISubOperation::TPtr> CreateAlterCdcStream(TOperationId opId, const TTxTr
.NotUnderOperation();

if (!checks) {
return {CreateReject(opId, checks.GetStatus(), checks.GetError())};
return CreateReject(opId, checks.GetStatus(), checks.GetError());
}
}

Expand All @@ -532,21 +513,20 @@ TVector<ISubOperation::TPtr> CreateAlterCdcStream(TOperationId opId, const TTxTr
.NotUnderOperation();

if (!checks) {
return {CreateReject(opId, checks.GetStatus(), checks.GetError())};
return CreateReject(opId, checks.GetStatus(), checks.GetError());
}
}

TString errStr;
if (!context.SS->CheckApplyIf(tx, errStr)) {
return {CreateReject(opId, NKikimrScheme::StatusPreconditionFailed, errStr)};
}

if (!context.SS->CheckLocks(tablePath.Base()->PathId, tx, errStr)) {
return {CreateReject(opId, NKikimrScheme::StatusMultipleModifications, errStr)};
}

TVector<ISubOperation::TPtr> result;
return TStreamPaths{tablePath, streamPath};
}

void DoAlterStream(
const NKikimrSchemeOp::TAlterCdcStream& op,
const TOperationId& opId,
const TPath& workingDirPath,
const TPath& tablePath,
TVector<ISubOperation::TPtr>& result)
{
{
auto outTx = TransactionTemplate(tablePath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterCdcStreamImpl);
outTx.MutableAlterCdcStream()->CopyFrom(op);
Expand All @@ -568,6 +548,60 @@ TVector<ISubOperation::TPtr> CreateAlterCdcStream(TOperationId opId, const TTxTr

result.push_back(CreateAlterCdcStreamAtTable(NextPartId(opId, result), outTx, op.HasGetReady()));
}
}

} // namespace NCdc

using namespace NCdc;

ISubOperation::TPtr CreateAlterCdcStreamImpl(TOperationId id, const TTxTransaction& tx) {
return MakeSubOperation<TAlterCdcStream>(id, tx);
}

ISubOperation::TPtr CreateAlterCdcStreamImpl(TOperationId id, TTxState::ETxState state) {
return MakeSubOperation<TAlterCdcStream>(id, state);
}

ISubOperation::TPtr CreateAlterCdcStreamAtTable(TOperationId id, const TTxTransaction& tx, bool dropSnapshot) {
return MakeSubOperation<TAlterCdcStreamAtTable>(id, tx, dropSnapshot);
}

ISubOperation::TPtr CreateAlterCdcStreamAtTable(TOperationId id, TTxState::ETxState state, bool dropSnapshot) {
return MakeSubOperation<TAlterCdcStreamAtTable>(id, state, dropSnapshot);
}

TVector<ISubOperation::TPtr> CreateAlterCdcStream(TOperationId opId, const TTxTransaction& tx, TOperationContext& context) {
Y_ABORT_UNLESS(tx.GetOperationType() == NKikimrSchemeOp::EOperationType::ESchemeOpAlterCdcStream);

LOG_D("CreateAlterCdcStream"
<< ": opId# " << opId
<< ", tx# " << tx.ShortDebugString());

const auto& op = tx.GetAlterCdcStream();
const auto& tableName = op.GetTableName();
const auto& streamName = op.GetStreamName();

const auto workingDirPath = TPath::Resolve(tx.GetWorkingDir(), context.SS);

const auto checksResult = DoAlterStreamPathChecks(opId, workingDirPath, tableName, streamName);
if (std::holds_alternative<ISubOperation::TPtr>(checksResult)) {
return {std::get<ISubOperation::TPtr>(checksResult)};
}

const auto [tablePath, streamPath] = std::get<TStreamPaths>(checksResult);

TString errStr;
if (!context.SS->CheckApplyIf(tx, errStr)) {
return {CreateReject(opId, NKikimrScheme::StatusPreconditionFailed, errStr)};
}

if (!context.SS->CheckLocks(tablePath.Base()->PathId, tx, errStr)) {
return {CreateReject(opId, NKikimrScheme::StatusMultipleModifications, errStr)};
}

TVector<ISubOperation::TPtr> result;

DoAlterStream(op, opId, workingDirPath, tablePath, result);

if (op.HasGetReady()) {
auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropLock);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#pragma once

#include "schemeshard__operation_create_cdc_stream.h" // for TStreamPaths
#include "schemeshard__operation_common.h"
#include "schemeshard__operation_part.h"
#include "schemeshard_impl.h"

#include <ydb/core/engine/mkql_proto.h>
#include <ydb/core/scheme/scheme_types_proto.h>

namespace NKikimr::NSchemeShard::NCdc {

std::variant<TStreamPaths, ISubOperation::TPtr> DoAlterStreamPathChecks(
const TOperationId& opId,
const TPath& workingDirPath,
const TString& tableName,
const TString& streamName);

void DoAlterStream(
const NKikimrSchemeOp::TAlterCdcStream& op,
const TOperationId& opId,
const TPath& workingDirPath,
const TPath& tablePath,
TVector<ISubOperation::TPtr>& result);

} // namespace NKikimr::NSchemesShard::NCdc
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#include "schemeshard__operation_part.h"
#include "schemeshard__operation_common.h"
#include "schemeshard_impl.h"

#include "schemeshard__operation_alter_cdc_stream.h"

#include <ydb/core/tx/schemeshard/backup/constants.h>

#include <ydb/core/engine/mkql_proto.h>
#include <ydb/core/scheme/scheme_types_proto.h>

namespace NKikimr::NSchemeShard {

TVector<ISubOperation::TPtr> CreateAlterContinuousBackup(TOperationId opId, const TTxTransaction& tx, TOperationContext& context) {
Y_ABORT_UNLESS(tx.GetOperationType() == NKikimrSchemeOp::EOperationType::ESchemeOpAlterContinuousBackup);

const auto workingDirPath = TPath::Resolve(tx.GetWorkingDir(), context.SS);
const auto& cbOp = tx.GetAlterContinuousBackup();
const auto& tableName = cbOp.GetTableName();

const auto checksResult = NCdc::DoAlterStreamPathChecks(opId, workingDirPath, tableName, NBackup::CB_CDC_STREAM_NAME);
if (std::holds_alternative<ISubOperation::TPtr>(checksResult)) {
return {std::get<ISubOperation::TPtr>(checksResult)};
}

const auto [tablePath, streamPath] = std::get<NCdc::TStreamPaths>(checksResult);

TString errStr;
if (!context.SS->CheckApplyIf(tx, errStr)) {
return {CreateReject(opId, NKikimrScheme::StatusPreconditionFailed, errStr)};
}

if (!context.SS->CheckLocks(tablePath.Base()->PathId, tx, errStr)) {
return {CreateReject(opId, NKikimrScheme::StatusMultipleModifications, errStr)};
}

NKikimrSchemeOp::TAlterCdcStream alterCdcStreamOp;
alterCdcStreamOp.SetTableName(tableName);
alterCdcStreamOp.SetStreamName(NBackup::CB_CDC_STREAM_NAME);

switch (cbOp.GetActionCase()) {
case NKikimrSchemeOp::TAlterContinuousBackup::kStop:
alterCdcStreamOp.MutableDisable();
break;
default:
return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, TStringBuilder()
<< "Unknown action: " << static_cast<ui32>(cbOp.GetActionCase()))};
}

TVector<ISubOperation::TPtr> result;

NCdc::DoAlterStream(alterCdcStreamOp, opId, workingDirPath, tablePath, result);

return result;
}

} // namespace NKikimr::NSchemeShard
Loading