Skip to content

Commit 46a50cb

Browse files
authored
Merge c3d5f34 into b3c26e4
2 parents b3c26e4 + c3d5f34 commit 46a50cb

File tree

6 files changed

+98
-1
lines changed

6 files changed

+98
-1
lines changed

ydb/core/protos/flat_scheme_op.proto

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -926,9 +926,12 @@ message TAlterContinuousBackup {
926926
message TStop {
927927
}
928928

929+
message TTakeIncrementalBackup {
930+
}
931+
929932
oneof Action {
930933
TStop Stop = 2;
931-
// TODO(innokentii): something like TakeIncremental
934+
TTakeIncrementalBackup TakeIncrementalBackup = 3;
932935
}
933936
}
934937

ydb/core/protos/pqconfig.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,10 @@ enum EConsumerScalingSupport {
305305
FULL_SUPPORT = 3;
306306
}
307307

308+
message TOffloadConfig {
309+
310+
}
311+
308312
message TPQTabletConfig {
309313
optional uint64 CacheSize = 1 [default = 104857600]; //100Mb, per tablet
310314
optional TPartitionConfig PartitionConfig = 2; //mandatory
@@ -410,6 +414,8 @@ message TPQTabletConfig {
410414
optional TPartitionStrategy PartitionStrategy = 35;
411415

412416
repeated TPartition AllPartitions = 36; // filled by schemeshard
417+
418+
optional TOffloadConfig OffloadConfig = 38;
413419
}
414420

415421
message THeartbeat {

ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,30 @@
1111

1212
namespace NKikimr::NSchemeShard {
1313

14+
void DoAlterPqPart(const TOperationId& opId, const TPath& topicPath, TTopicInfo::TPtr topic, TVector<ISubOperation::TPtr>& result)
15+
{
16+
auto outTx = TransactionTemplate(topicPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterPersQueueGroup);
17+
// outTx.SetFailOnExist(!acceptExisted);
18+
19+
outTx.SetAllowAccessToPrivatePaths(true);
20+
21+
auto& desc = *outTx.MutableAlterPersQueueGroup();
22+
desc.SetPathId(topicPath.Base()->PathId.LocalPathId);
23+
24+
NKikimrPQ::TPQTabletConfig tabletConfig;
25+
if (!topic->TabletConfig.empty()) {
26+
bool parseOk = ParseFromStringNoSizeLimit(tabletConfig, topic->TabletConfig);
27+
Y_ABORT_UNLESS(parseOk, "Previously serialized pq tablet config cannot be parsed");
28+
}
29+
30+
auto& pqConfig = *desc.MutablePQTabletConfig();
31+
pqConfig.CopyFrom(tabletConfig);
32+
pqConfig.ClearPartitionKeySchema();
33+
pqConfig.MutableOffloadConfig();
34+
35+
result.push_back(CreateAlterPQ(NextPartId(opId, result), outTx));
36+
}
37+
1438
TVector<ISubOperation::TPtr> CreateAlterContinuousBackup(TOperationId opId, const TTxTransaction& tx, TOperationContext& context) {
1539
Y_ABORT_UNLESS(tx.GetOperationType() == NKikimrSchemeOp::EOperationType::ESchemeOpAlterContinuousBackup);
1640

@@ -24,6 +48,8 @@ TVector<ISubOperation::TPtr> CreateAlterContinuousBackup(TOperationId opId, cons
2448
}
2549

2650
const auto [tablePath, streamPath] = std::get<NCdc::TStreamPaths>(checksResult);
51+
const auto topicPath = streamPath.Child("streamImpl");
52+
TTopicInfo::TPtr topic = context.SS->Topics.at(topicPath.Base()->PathId);
2753

2854
TString errStr;
2955
if (!context.SS->CheckApplyIf(tx, errStr)) {
@@ -40,6 +66,7 @@ TVector<ISubOperation::TPtr> CreateAlterContinuousBackup(TOperationId opId, cons
4066

4167
switch (cbOp.GetActionCase()) {
4268
case NKikimrSchemeOp::TAlterContinuousBackup::kStop:
69+
case NKikimrSchemeOp::TAlterContinuousBackup::kTakeIncrementalBackup:
4370
alterCdcStreamOp.MutableDisable();
4471
break;
4572
default:
@@ -51,6 +78,10 @@ TVector<ISubOperation::TPtr> CreateAlterContinuousBackup(TOperationId opId, cons
5178

5279
NCdc::DoAlterStream(alterCdcStreamOp, opId, workingDirPath, tablePath, result);
5380

81+
if (cbOp.GetActionCase() == NKikimrSchemeOp::TAlterContinuousBackup::kTakeIncrementalBackup) {
82+
DoAlterPqPart(opId, topicPath, topic, result);
83+
}
84+
5485
return result;
5586
}
5687

ydb/core/tx/schemeshard/ut_continuous_backup/ut_continuous_backup.cpp

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,4 +64,48 @@ Y_UNIT_TEST_SUITE(TContinuousBackupTests) {
6464
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/continuousBackupImpl"), {NLs::PathNotExist});
6565
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/continuousBackupImpl/streamImpl"), {NLs::PathNotExist});
6666
}
67+
68+
Y_UNIT_TEST(TakeIncrementalBackup) {
69+
TTestBasicRuntime runtime;
70+
TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true));
71+
ui64 txId = 100;
72+
73+
TestCreateTable(runtime, ++txId, "/MyRoot", R"(
74+
Name: "Table"
75+
Columns { Name: "key" Type: "Uint64" }
76+
Columns { Name: "value" Type: "Uint64" }
77+
KeyColumnNames: ["key"]
78+
)");
79+
env.TestWaitNotification(runtime, txId);
80+
81+
TestCreateContinuousBackup(runtime, ++txId, "/MyRoot", R"(
82+
TableName: "Table"
83+
ContinuousBackupDescription {
84+
}
85+
)");
86+
env.TestWaitNotification(runtime, txId);
87+
88+
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/continuousBackupImpl"), {
89+
NLs::PathExist,
90+
NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeUpdate),
91+
NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto),
92+
NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateReady),
93+
NLs::StreamVirtualTimestamps(false),
94+
});
95+
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/continuousBackupImpl/streamImpl"), {
96+
NLs::PathExist,
97+
NLs::HasNotOffloadConfig,
98+
});
99+
100+
TestAlterContinuousBackup(runtime, ++txId, "/MyRoot", R"(
101+
TableName: "Table"
102+
TakeIncrementalBackup {}
103+
)");
104+
env.TestWaitNotification(runtime, txId);
105+
106+
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/continuousBackupImpl/streamImpl"), {
107+
NLs::PathExist,
108+
NLs::HasOffloadConfig,
109+
});
110+
}
67111
} // TCdcStreamWithInitialScanTests

ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1251,6 +1251,11 @@ TCheckFunc ServerlessComputeResourcesMode(NKikimrSubDomains::EServerlessComputeR
12511251
};
12521252
}
12531253

1254+
void HasOffloadConfigBase(const NKikimrScheme::TEvDescribeSchemeResult& record, TInverseTag inverse) {
1255+
UNIT_ASSERT(inverse.Value xor record.GetPathDescription().GetPersQueueGroup()
1256+
.GetPQTabletConfig().HasOffloadConfig());
1257+
}
1258+
12541259
#undef DESCRIBE_ASSERT_EQUAL
12551260
#undef DESCRIBE_ASSERT_GE
12561261
#undef DESCRIBE_ASSERT

ydb/core/tx/schemeshard/ut_helpers/ls_checks.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,14 @@ namespace NLs {
167167
TCheckFunc SharedHive(ui64 sharedHiveId);
168168
TCheckFunc ServerlessComputeResourcesMode(NKikimrSubDomains::EServerlessComputeResourcesMode serverlessComputeResourcesMode);
169169

170+
struct TInverseTag {
171+
bool Value = false;
172+
};
173+
174+
void HasOffloadConfigBase(const NKikimrScheme::TEvDescribeSchemeResult& record, TInverseTag inverse);
175+
inline void HasOffloadConfig(const NKikimrScheme::TEvDescribeSchemeResult& record) { return HasOffloadConfigBase(record, {}); };
176+
inline void HasNotOffloadConfig(const NKikimrScheme::TEvDescribeSchemeResult& record) { return HasOffloadConfigBase(record, {.Value = true}); };
177+
170178
template<class TCheck>
171179
void PerformAllChecks(const NKikimrScheme::TEvDescribeSchemeResult& result, TCheck&& check) {
172180
check(result);

0 commit comments

Comments
 (0)