Skip to content

Commit 7f0ccd9

Browse files
authored
[backups] add incremetnal backup to offload config (#5236)
1 parent 5f0b6cc commit 7f0ccd9

File tree

6 files changed

+67
-12
lines changed

6 files changed

+67
-12
lines changed

ydb/core/protos/pqconfig.proto

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@ import "ydb/public/api/protos/annotations/sensitive.proto";
55

66
import "ydb/core/protos/base.proto";
77
import "ydb/core/protos/msgbus_kv.proto";
8-
import "ydb/core/protos/node_limits.proto";
98
import "ydb/core/protos/netclassifier.proto";
10-
import "ydb/library/services/services.proto";
9+
import "ydb/core/protos/node_limits.proto";
10+
import "ydb/core/scheme/protos/pathid.proto";
1111
import "ydb/core/scheme/protos/type_info.proto";
1212

1313
import "ydb/library/actors/protos/actors.proto";
14-
14+
import "ydb/library/services/services.proto";
1515

1616
package NKikimrPQ;
1717
option java_package = "ru.yandex.kikimr.proto";
@@ -306,7 +306,14 @@ enum EConsumerScalingSupport {
306306
}
307307

308308
message TOffloadConfig {
309+
message TIncrementalBackup {
310+
optional string DstPath = 1;
311+
optional NKikimrProto.TPathID DstPathId = 2;
312+
}
309313

314+
oneof Strategy {
315+
TIncrementalBackup IncrementalBackup = 1;
316+
}
310317
}
311318

312319
message TPQTabletConfig {

ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
namespace NKikimr::NSchemeShard {
1313

14-
void DoAlterPqPart(const TOperationId& opId, const TPath& topicPath, TTopicInfo::TPtr topic, TVector<ISubOperation::TPtr>& result)
14+
void DoAlterPqPart(const TOperationId& opId, const TPath& tablePath, const TPath& topicPath, TTopicInfo::TPtr topic, TVector<ISubOperation::TPtr>& result)
1515
{
1616
auto outTx = TransactionTemplate(topicPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterPersQueueGroup);
1717
// outTx.SetFailOnExist(!acceptExisted);
@@ -30,7 +30,8 @@ void DoAlterPqPart(const TOperationId& opId, const TPath& topicPath, TTopicInfo:
3030
auto& pqConfig = *desc.MutablePQTabletConfig();
3131
pqConfig.CopyFrom(tabletConfig);
3232
pqConfig.ClearPartitionKeySchema();
33-
pqConfig.MutableOffloadConfig();
33+
auto& ib = *pqConfig.MutableOffloadConfig()->MutableIncrementalBackup();
34+
ib.SetDstPath(tablePath.PathString());
3435

3536
result.push_back(CreateAlterPQ(NextPartId(opId, result), outTx));
3637
}
@@ -45,7 +46,14 @@ void DoCreateIncBackupTable(const TOperationId& opId, const TPath& dst, NKikimrS
4546
desc.CopyFrom(tableDesc);
4647
desc.SetName(dst.LeafName());
4748

48-
auto col = desc.AddColumns();
49+
auto& replicationConfig = *desc.MutableReplicationConfig();
50+
replicationConfig.SetMode(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY);
51+
replicationConfig.SetConsistency(NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK);
52+
53+
// TODO: remove NotNull from all columns for correct deletion writing
54+
// TODO: cleanup all sequences
55+
56+
auto* col = desc.AddColumns();
4957
col->SetName("__incrBackupImpl_deleted");
5058
col->SetType("Bool");
5159

@@ -107,7 +115,7 @@ TVector<ISubOperation::TPtr> CreateAlterContinuousBackup(TOperationId opId, cons
107115

108116
if (cbOp.GetActionCase() == NKikimrSchemeOp::TAlterContinuousBackup::kTakeIncrementalBackup) {
109117
DoCreateIncBackupTable(opId, backupTablePath, schema, result);
110-
DoAlterPqPart(opId, topicPath, topic, result);
118+
DoAlterPqPart(opId, backupTablePath, topicPath, topic, result);
111119
}
112120

113121
return result;

ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,13 @@ class TAlterPQ: public TSubOperation {
184184
const TString databasePath = TPath::Init(context.SS->RootPathId(), context.SS).PathString();
185185
alterConfig.SetYdbDatabasePath(databasePath);
186186

187+
188+
if (alterConfig.HasOffloadConfig()) {
189+
// TODO: check validity
190+
auto* pathId = alterConfig.MutableOffloadConfig()->MutableIncrementalBackup()->MutableDstPathId();
191+
PathIdFromPathId(TPath::Resolve(alterConfig.GetOffloadConfig().GetIncrementalBackup().GetDstPath(), context.SS).Base()->PathId, pathId);
192+
}
193+
187194
alterConfig.MutablePartitionKeySchema()->Swap(tabletConfig->MutablePartitionKeySchema());
188195
Y_PROTOBUF_SUPPRESS_NODISCARD alterConfig.SerializeToString(&params->TabletConfig);
189196
alterConfig.Swap(tabletConfig);

ydb/core/tx/schemeshard/ut_continuous_backup/ut_continuous_backup.cpp

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,21 @@ Y_UNIT_TEST_SUITE(TContinuousBackupTests) {
103103
)");
104104
env.TestWaitNotification(runtime, txId);
105105

106+
auto pathInfo = DescribePrivatePath(runtime, "/MyRoot/Table/incBackupImpl");
107+
auto ownerId = pathInfo.GetPathOwnerId();
108+
auto localId = pathInfo.GetPathId();
109+
106110
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/continuousBackupImpl/streamImpl"), {
107111
NLs::PathExist,
108-
NLs::HasOffloadConfig,
112+
NLs::HasOffloadConfig(Sprintf(R"(
113+
IncrementalBackup: {
114+
DstPath: "/MyRoot/Table/incBackupImpl"
115+
DstPathId: {
116+
OwnerId: %)" PRIu64 R"(
117+
LocalId: %)" PRIu64 R"(
118+
}
119+
}
120+
)", ownerId, localId)),
109121
});
110122

111123
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/incBackupImpl"), {

ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@
1313

1414
#include <library/cpp/testing/unittest/registar.h>
1515

16+
#include <contrib/libs/protobuf/src/google/protobuf/text_format.h>
17+
#include <contrib/libs/protobuf/src/google/protobuf/util/field_comparator.h>
18+
#include <contrib/libs/protobuf/src/google/protobuf/util/message_differencer.h>
19+
1620
namespace NSchemeShardUT_Private {
1721
namespace NLs {
1822

@@ -1251,9 +1255,22 @@ TCheckFunc ServerlessComputeResourcesMode(NKikimrSubDomains::EServerlessComputeR
12511255
};
12521256
}
12531257

1254-
void HasOffloadConfigBase(const NKikimrScheme::TEvDescribeSchemeResult& record, TInverseTag inverse) {
1258+
void HasOffloadConfigBase(const NKikimrScheme::TEvDescribeSchemeResult& record, const TString* const config, TInverseTag inverse) {
12551259
UNIT_ASSERT(inverse.Value xor record.GetPathDescription().GetPersQueueGroup()
12561260
.GetPQTabletConfig().HasOffloadConfig());
1261+
1262+
if (config) {
1263+
NKikimrPQ::TOffloadConfig expectedConfig;
1264+
1265+
bool parseResult = google::protobuf::TextFormat::ParseFromString(*config, &expectedConfig);
1266+
UNIT_ASSERT(parseResult);
1267+
1268+
google::protobuf::util::MessageDifferencer md;
1269+
auto fieldComparator = google::protobuf::util::DefaultFieldComparator();
1270+
md.set_field_comparator(&fieldComparator);
1271+
auto& givenConfig = record.GetPathDescription().GetPersQueueGroup().GetPQTabletConfig().GetOffloadConfig();
1272+
UNIT_ASSERT(md.Compare(expectedConfig, givenConfig));
1273+
}
12571274
}
12581275

12591276
#undef DESCRIBE_ASSERT_EQUAL

ydb/core/tx/schemeshard/ut_helpers/ls_checks.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -171,9 +171,13 @@ namespace NLs {
171171
bool Value = false;
172172
};
173173

174-
void HasOffloadConfigBase(const NKikimrScheme::TEvDescribeSchemeResult& record, TInverseTag inverse);
175-
inline void HasOffloadConfig(const NKikimrScheme::TEvDescribeSchemeResult& record) { return HasOffloadConfigBase(record, {}); };
176-
inline void HasNotOffloadConfig(const NKikimrScheme::TEvDescribeSchemeResult& record) { return HasOffloadConfigBase(record, {.Value = true}); };
174+
void HasOffloadConfigBase(const NKikimrScheme::TEvDescribeSchemeResult& record, const TString* const config, TInverseTag inverse);
175+
inline TCheckFunc HasOffloadConfig(TString config) {
176+
return [config] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
177+
return HasOffloadConfigBase(record, config ? &config : nullptr, {});
178+
};
179+
}
180+
inline void HasNotOffloadConfig(const NKikimrScheme::TEvDescribeSchemeResult& record) { return HasOffloadConfigBase(record, nullptr, {.Value = true}); }
177181

178182
template<class TCheck>
179183
void PerformAllChecks(const NKikimrScheme::TEvDescribeSchemeResult& result, TCheck&& check) {

0 commit comments

Comments
 (0)