Skip to content

Commit 229f3db

Browse files
Merge 08a45a1 into ee5ceb4
2 parents ee5ceb4 + 08a45a1 commit 229f3db

File tree

8 files changed

+155
-17
lines changed

8 files changed

+155
-17
lines changed

ydb/core/protos/flat_scheme_op.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2102,3 +2102,7 @@ message TImportTableChangefeeds {
21022102
}
21032103
repeated TImportChangefeedTopic Changefeeds = 1;
21042104
}
2105+
2106+
message TChangefeedUnderlyingTopics {
2107+
repeated TPathDescription ChangefeedUnderlyingTopics = 1;
2108+
}

ydb/core/tx/datashard/export_s3_uploader.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
241241
}
242242

243243
void UploadChangefeed() {
244+
Y_ABORT_UNLESS(!ChangefeedsUploaded);
244245
if (IndexExportedChangefeed == Changefeeds.size()) {
245246
ChangefeedsUploaded = true;
246247
if (Scanner) {

ydb/core/tx/schemeshard/schemeshard__init.cpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -805,7 +805,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
805805
return true;
806806
}
807807

808-
typedef std::tuple<TPathId, TString, TString, TString, TString, bool, TString, ui32, bool, bool> TBackupSettingsRec;
808+
typedef std::tuple<TPathId, TString, TString, TString, TString, bool, TString, ui32, bool, bool, TString> TBackupSettingsRec;
809809
typedef TDeque<TBackupSettingsRec> TBackupSettingsRows;
810810

811811
template <typename SchemaTable, typename TRowSet>
@@ -819,7 +819,8 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
819819
rowSet.template GetValueOrDefault<typename SchemaTable::TableDescription>(""),
820820
rowSet.template GetValueOrDefault<typename SchemaTable::NumberOfRetries>(0),
821821
rowSet.template GetValueOrDefault<typename SchemaTable::EnableChecksums>(false),
822-
rowSet.template GetValueOrDefault<typename SchemaTable::EnablePermissions>(false)
822+
rowSet.template GetValueOrDefault<typename SchemaTable::EnablePermissions>(false),
823+
rowSet.template GetValueOrDefault<typename SchemaTable::ChangefeedUnderlyingTopics>("")
823824
);
824825
}
825826

@@ -3804,6 +3805,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
38043805
ui32 nRetries = std::get<7>(rec);
38053806
bool enableChecksums = std::get<8>(rec);
38063807
bool enablePermissions = std::get<9>(rec);
3808+
TString changefeedUnderlyingTopics = std::get<10>(rec);
38073809

38083810
Y_ABORT_UNLESS(tableName.size() > 0);
38093811

@@ -3835,6 +3837,14 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
38353837
auto desc = tableInfo->BackupSettings.MutableTable();
38363838
Y_ABORT_UNLESS(ParseFromStringNoSizeLimit(*desc, tableDesc));
38373839
}
3840+
3841+
if (changefeedUnderlyingTopics) {
3842+
NKikimrSchemeOp::TChangefeedUnderlyingTopics wrapperOverTopics;
3843+
Y_ABORT_UNLESS(ParseFromStringNoSizeLimit(wrapperOverTopics, changefeedUnderlyingTopics));
3844+
for (const auto& topic : wrapperOverTopics.GetChangefeedUnderlyingTopics()) {
3845+
*tableInfo->BackupSettings.AddChangefeedUnderlyingTopics() = topic;
3846+
}
3847+
}
38383848

38393849
LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Loaded backup settings"
38403850
<< ", pathId: " << pathId

ydb/core/tx/schemeshard/schemeshard_impl.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3362,6 +3362,16 @@ void TSchemeShard::PersistTxShardStatus(NIceDb::TNiceDb& db, TOperationId opId,
33623362
);
33633363
}
33643364

3365+
NKikimrSchemeOp::TChangefeedUnderlyingTopics PutRepeatedFieldInOneMessage(
3366+
const google::protobuf::RepeatedPtrField<NKikimrSchemeOp::TPathDescription>& changefeedUnderlyingTopics
3367+
) {
3368+
NKikimrSchemeOp::TChangefeedUnderlyingTopics result;
3369+
for (const auto& x : changefeedUnderlyingTopics) {
3370+
*result.AddChangefeedUnderlyingTopics() = x;
3371+
}
3372+
return result;
3373+
}
3374+
33653375
void TSchemeShard::PersistBackupSettings(
33663376
NIceDb::TNiceDb& db,
33673377
TPathId pathId,
@@ -3376,6 +3386,7 @@ void TSchemeShard::PersistBackupSettings(
33763386
NIceDb::TUpdate<Schema::BackupSettings::ScanSettings>(settings.GetScanSettings().SerializeAsString()), \
33773387
NIceDb::TUpdate<Schema::BackupSettings::NeedToBill>(settings.GetNeedToBill()), \
33783388
NIceDb::TUpdate<Schema::BackupSettings::TableDescription>(settings.GetTable().SerializeAsString()), \
3389+
NIceDb::TUpdate<Schema::BackupSettings::ChangefeedUnderlyingTopics>(PutRepeatedFieldInOneMessage(settings.GetChangefeedUnderlyingTopics()).SerializeAsString()), \
33793390
NIceDb::TUpdate<Schema::BackupSettings::NumberOfRetries>(settings.GetNumberOfRetries()), \
33803391
NIceDb::TUpdate<Schema::BackupSettings::EnableChecksums>(settings.GetEnableChecksums()), \
33813392
NIceDb::TUpdate<Schema::BackupSettings::EnablePermissions>(settings.GetEnablePermissions())); \
@@ -3386,6 +3397,7 @@ void TSchemeShard::PersistBackupSettings(
33863397
NIceDb::TUpdate<Schema::MigratedBackupSettings::ScanSettings>(settings.GetScanSettings().SerializeAsString()), \
33873398
NIceDb::TUpdate<Schema::MigratedBackupSettings::NeedToBill>(settings.GetNeedToBill()), \
33883399
NIceDb::TUpdate<Schema::MigratedBackupSettings::TableDescription>(settings.GetTable().SerializeAsString()), \
3400+
NIceDb::TUpdate<Schema::MigratedBackupSettings::ChangefeedUnderlyingTopics>(PutRepeatedFieldInOneMessage(settings.GetChangefeedUnderlyingTopics()).SerializeAsString()), \
33893401
NIceDb::TUpdate<Schema::MigratedBackupSettings::NumberOfRetries>(settings.GetNumberOfRetries()), \
33903402
NIceDb::TUpdate<Schema::MigratedBackupSettings::EnableChecksums>(settings.GetEnableChecksums()), \
33913403
NIceDb::TUpdate<Schema::MigratedBackupSettings::EnablePermissions>(settings.GetEnablePermissions())); \

ydb/core/tx/schemeshard/schemeshard_schema.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -583,6 +583,7 @@ struct Schema : NIceDb::Schema {
583583
struct YTSettings : Column<3, NScheme::NTypeIds::String> {};
584584
struct S3Settings : Column<6, NScheme::NTypeIds::String> {};
585585
struct TableDescription : Column<7, NScheme::NTypeIds::String> {};
586+
struct ChangefeedUnderlyingTopics : Column<13, NScheme::NTypeIds::String> {};
586587
struct NumberOfRetries : Column<8, NScheme::NTypeIds::Uint32> {};
587588
struct ScanSettings : Column<9, NScheme::NTypeIds::String> {};
588589
struct NeedToBill : Column<10, NScheme::NTypeIds::Bool> {};
@@ -605,7 +606,8 @@ struct Schema : NIceDb::Schema {
605606
ScanSettings,
606607
NeedToBill,
607608
EnableChecksums,
608-
EnablePermissions
609+
EnablePermissions,
610+
ChangefeedUnderlyingTopics
609611
>;
610612
};
611613

@@ -617,6 +619,7 @@ struct Schema : NIceDb::Schema {
617619
struct YTSettings : Column<4, NScheme::NTypeIds::String> {};
618620
struct S3Settings : Column<7, NScheme::NTypeIds::String> {};
619621
struct TableDescription : Column<8, NScheme::NTypeIds::String> {};
622+
struct ChangefeedUnderlyingTopics : Column<14, NScheme::NTypeIds::String> {};
620623
struct NumberOfRetries : Column<9, NScheme::NTypeIds::Uint32> {};
621624
struct ScanSettings : Column<10, NScheme::NTypeIds::String> {};
622625
struct NeedToBill : Column<11, NScheme::NTypeIds::Bool> {};
@@ -640,7 +643,8 @@ struct Schema : NIceDb::Schema {
640643
ScanSettings,
641644
NeedToBill,
642645
EnableChecksums,
643-
EnablePermissions
646+
EnablePermissions,
647+
ChangefeedUnderlyingTopics
644648
>;
645649
};
646650

ydb/core/tx/schemeshard/ut_export_reboots_s3/ut_export_reboots_s3.cpp

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -509,4 +509,83 @@ Y_UNIT_TEST_SUITE(TExportToS3WithRebootsTests) {
509509
}
510510
)");
511511
}
512+
513+
class TestData {
514+
public:
515+
static const TTypedScheme& Table() {
516+
return TableScheme;
517+
}
518+
519+
static const TTypedScheme& Changefeed() {
520+
return ChangefeedScheme;
521+
}
522+
523+
static const TString& Request() {
524+
return RequestString;
525+
}
526+
527+
private:
528+
static const char* TableName;
529+
static const TTypedScheme TableScheme;
530+
static const TTypedScheme ChangefeedScheme;
531+
static const TString RequestString;
532+
533+
};
534+
535+
const char* TestData::TableName = "Table";
536+
537+
const TTypedScheme TestData::TableScheme = TTypedScheme {
538+
EPathTypeTable,
539+
Sprintf(R"(
540+
Name: "%s"
541+
Columns { Name: "key" Type: "Utf8" }
542+
Columns { Name: "value" Type: "Utf8" }
543+
KeyColumnNames: ["key"]
544+
)", TableName)
545+
};
546+
547+
const TTypedScheme TestData::ChangefeedScheme = TTypedScheme {
548+
EPathTypeCdcStream,
549+
Sprintf(R"(
550+
TableName: "%s"
551+
StreamDescription {
552+
Name: "update_feed"
553+
Mode: ECdcStreamModeUpdate
554+
Format: ECdcStreamFormatJson
555+
State: ECdcStreamStateReady
556+
}
557+
)", TableName)
558+
};
559+
560+
const TString TestData::RequestString = R"(
561+
ExportToS3Settings {
562+
endpoint: "localhost:%d"
563+
scheme: HTTP
564+
items {
565+
source_path: "/MyRoot/Table"
566+
destination_prefix: ""
567+
}
568+
}
569+
)";
570+
571+
Y_UNIT_TEST(ShouldSucceedOnSingleShardTableWithChangefeed) {
572+
RunS3({
573+
TestData::Table(),
574+
TestData::Changefeed()
575+
}, TestData::Request());
576+
}
577+
578+
Y_UNIT_TEST(CancelOnSingleShardTableWithChangefeed) {
579+
CancelS3({
580+
TestData::Table(),
581+
TestData::Changefeed()
582+
}, TestData::Request());
583+
}
584+
585+
Y_UNIT_TEST(ForgetShouldSucceedOnSingleShardTableWithChangefeed) {
586+
ForgetS3({
587+
TestData::Table(),
588+
TestData::Changefeed()
589+
}, TestData::Request());
590+
}
512591
}

ydb/core/tx/schemeshard/ut_helpers/export_reboots_common.cpp

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,36 @@ using namespace NKikimrSchemeOp;
1111
namespace NSchemeShardUT_Private {
1212
namespace NExportReboots {
1313

14+
using TCreateHandler = std::function<void(TTestActorRuntime&, ui64&, const TString&, const TString&)>;
15+
16+
#define GEN_CREATE_HANDLER(func) \
17+
TCreateHandler([](TTestActorRuntime& runtime, ui64& txId, const TString& parentPath, const TString& scheme) { \
18+
func(runtime, txId, parentPath, scheme); \
19+
})
20+
21+
THashMap<NKikimrSchemeOp::EPathType, TCreateHandler> CreateHandlers = {
22+
{EPathTypeTable, GEN_CREATE_HANDLER(TestCreateTable)},
23+
{EPathTypeView, GEN_CREATE_HANDLER(TestCreateView)},
24+
{EPathTypeCdcStream, GEN_CREATE_HANDLER(TestCreateCdcStream)},
25+
};
26+
27+
void TestCreate(
28+
TTestActorRuntime& runtime,
29+
ui64& txId,
30+
const TString& scheme,
31+
NKikimrSchemeOp::EPathType pathType
32+
) {
33+
if (CreateHandlers.contains(pathType)) {
34+
CreateHandlers[pathType](runtime, txId, "/MyRoot", scheme);
35+
} else {
36+
UNIT_FAIL("export is not implemented for the scheme object type: " << pathType);
37+
}
38+
}
39+
1440
void CreateSchemeObjects(TTestWithReboots& t, TTestActorRuntime& runtime, const TVector<TTypedScheme>& schemeObjects) {
1541
TSet<ui64> toWait;
1642
for (const auto& [type, scheme, _] : schemeObjects) {
17-
switch (type) {
18-
case EPathTypeTable:
19-
TestCreateTable(runtime, ++t.TxId, "/MyRoot", scheme);
20-
break;
21-
case EPathTypeView:
22-
TestCreateView(runtime, ++t.TxId, "/MyRoot", scheme);
23-
break;
24-
default:
25-
UNIT_FAIL("export is not implemented for the scheme object type: " << type);
26-
return;
27-
}
43+
TestCreate(runtime, ++t.TxId, scheme, type);
2844
toWait.insert(t.TxId);
2945
}
3046
t.TestEnv->TestWaitNotification(runtime, toWait);

ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1179,6 +1179,11 @@
11791179
"ColumnId": 12,
11801180
"ColumnName": "EnablePermissions",
11811181
"ColumnType": "Bool"
1182+
},
1183+
{
1184+
"ColumnId": 13,
1185+
"ColumnName": "ChangefeedUnderlyingTopics",
1186+
"ColumnType": "String"
11821187
}
11831188
],
11841189
"ColumnsDropped": [],
@@ -1196,7 +1201,8 @@
11961201
9,
11971202
10,
11981203
11,
1199-
12
1204+
12,
1205+
13
12001206
],
12011207
"RoomID": 0,
12021208
"Codec": 0,
@@ -4577,6 +4583,11 @@
45774583
"ColumnId": 13,
45784584
"ColumnName": "EnablePermissions",
45794585
"ColumnType": "Bool"
4586+
},
4587+
{
4588+
"ColumnId": 14,
4589+
"ColumnName": "ChangefeedUnderlyingTopics",
4590+
"ColumnType": "String"
45804591
}
45814592
],
45824593
"ColumnsDropped": [],
@@ -4595,7 +4606,8 @@
45954606
10,
45964607
11,
45974608
12,
4598-
13
4609+
13,
4610+
14
45994611
],
46004612
"RoomID": 0,
46014613
"Codec": 0,

0 commit comments

Comments
 (0)