Skip to content

Commit b0023e7

Browse files
Merge 40bea22 into 152834e
2 parents 152834e + 40bea22 commit b0023e7

File tree

10 files changed

+148
-17
lines changed

10 files changed

+148
-17
lines changed

ydb/core/protos/flat_scheme_op.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2136,3 +2136,7 @@ message TImportTableChangefeeds {
21362136
}
21372137
repeated TImportChangefeedTopic Changefeeds = 1;
21382138
}
2139+
2140+
message TChangefeedUnderlyingTopics {
2141+
repeated TPathDescription ChangefeedUnderlyingTopics = 1;
2142+
}

ydb/core/tx/datashard/export_s3_uploader.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
241241
}
242242

243243
void UploadChangefeed() {
244+
Y_ABORT_UNLESS(!ChangefeedsUploaded);
244245
if (IndexExportedChangefeed == Changefeeds.size()) {
245246
ChangefeedsUploaded = true;
246247
if (Scanner) {

ydb/core/tx/schemeshard/schemeshard__init.cpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -804,7 +804,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
804804
return true;
805805
}
806806

807-
typedef std::tuple<TPathId, TString, TString, TString, TString, bool, TString, ui32, bool, bool> TBackupSettingsRec;
807+
typedef std::tuple<TPathId, TString, TString, TString, TString, bool, TString, ui32, bool, bool, TString> TBackupSettingsRec;
808808
typedef TDeque<TBackupSettingsRec> TBackupSettingsRows;
809809

810810
template <typename SchemaTable, typename TRowSet>
@@ -818,7 +818,8 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
818818
rowSet.template GetValueOrDefault<typename SchemaTable::TableDescription>(""),
819819
rowSet.template GetValueOrDefault<typename SchemaTable::NumberOfRetries>(0),
820820
rowSet.template GetValueOrDefault<typename SchemaTable::EnableChecksums>(false),
821-
rowSet.template GetValueOrDefault<typename SchemaTable::EnablePermissions>(false)
821+
rowSet.template GetValueOrDefault<typename SchemaTable::EnablePermissions>(false),
822+
rowSet.template GetValueOrDefault<typename SchemaTable::ChangefeedUnderlyingTopics>("")
822823
);
823824
}
824825

@@ -3803,6 +3804,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
38033804
ui32 nRetries = std::get<7>(rec);
38043805
bool enableChecksums = std::get<8>(rec);
38053806
bool enablePermissions = std::get<9>(rec);
3807+
TString changefeedUnderlyingTopics = std::get<10>(rec);
38063808

38073809
Y_ABORT_UNLESS(tableName.size() > 0);
38083810

@@ -3834,6 +3836,14 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
38343836
auto desc = tableInfo->BackupSettings.MutableTable();
38353837
Y_ABORT_UNLESS(ParseFromStringNoSizeLimit(*desc, tableDesc));
38363838
}
3839+
3840+
if (changefeedUnderlyingTopics) {
3841+
NKikimrSchemeOp::TChangefeedUnderlyingTopics wrapperOverTopics;
3842+
Y_ABORT_UNLESS(ParseFromStringNoSizeLimit(wrapperOverTopics, changefeedUnderlyingTopics));
3843+
for (const auto& topic : wrapperOverTopics.GetChangefeedUnderlyingTopics()) {
3844+
*tableInfo->BackupSettings.AddChangefeedUnderlyingTopics() = topic;
3845+
}
3846+
}
38373847

38383848
LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Loaded backup settings"
38393849
<< ", pathId: " << pathId

ydb/core/tx/schemeshard/schemeshard_impl.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3361,6 +3361,16 @@ void TSchemeShard::PersistTxShardStatus(NIceDb::TNiceDb& db, TOperationId opId,
33613361
);
33623362
}
33633363

3364+
NKikimrSchemeOp::TChangefeedUnderlyingTopics ConvertChangefeedUnderlyingTopics(
3365+
const google::protobuf::RepeatedPtrField<NKikimrSchemeOp::TPathDescription>& changefeedUnderlyingTopics
3366+
) {
3367+
NKikimrSchemeOp::TChangefeedUnderlyingTopics result;
3368+
for (const auto& x : changefeedUnderlyingTopics) {
3369+
*result.AddChangefeedUnderlyingTopics() = x;
3370+
}
3371+
return result;
3372+
}
3373+
33643374
void TSchemeShard::PersistBackupSettings(
33653375
NIceDb::TNiceDb& db,
33663376
TPathId pathId,
@@ -3375,6 +3385,7 @@ void TSchemeShard::PersistBackupSettings(
33753385
NIceDb::TUpdate<Schema::BackupSettings::ScanSettings>(settings.GetScanSettings().SerializeAsString()), \
33763386
NIceDb::TUpdate<Schema::BackupSettings::NeedToBill>(settings.GetNeedToBill()), \
33773387
NIceDb::TUpdate<Schema::BackupSettings::TableDescription>(settings.GetTable().SerializeAsString()), \
3388+
NIceDb::TUpdate<Schema::BackupSettings::ChangefeedUnderlyingTopics>(ConvertChangefeedUnderlyingTopics(settings.GetChangefeedUnderlyingTopics()).SerializeAsString()), \
33783389
NIceDb::TUpdate<Schema::BackupSettings::NumberOfRetries>(settings.GetNumberOfRetries()), \
33793390
NIceDb::TUpdate<Schema::BackupSettings::EnableChecksums>(settings.GetEnableChecksums()), \
33803391
NIceDb::TUpdate<Schema::BackupSettings::EnablePermissions>(settings.GetEnablePermissions())); \
@@ -3385,6 +3396,7 @@ void TSchemeShard::PersistBackupSettings(
33853396
NIceDb::TUpdate<Schema::MigratedBackupSettings::ScanSettings>(settings.GetScanSettings().SerializeAsString()), \
33863397
NIceDb::TUpdate<Schema::MigratedBackupSettings::NeedToBill>(settings.GetNeedToBill()), \
33873398
NIceDb::TUpdate<Schema::MigratedBackupSettings::TableDescription>(settings.GetTable().SerializeAsString()), \
3399+
NIceDb::TUpdate<Schema::MigratedBackupSettings::ChangefeedUnderlyingTopics>(ConvertChangefeedUnderlyingTopics(settings.GetChangefeedUnderlyingTopics()).SerializeAsString()), \
33883400
NIceDb::TUpdate<Schema::MigratedBackupSettings::NumberOfRetries>(settings.GetNumberOfRetries()), \
33893401
NIceDb::TUpdate<Schema::MigratedBackupSettings::EnableChecksums>(settings.GetEnableChecksums()), \
33903402
NIceDb::TUpdate<Schema::MigratedBackupSettings::EnablePermissions>(settings.GetEnablePermissions())); \

ydb/core/tx/schemeshard/schemeshard_schema.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -583,6 +583,7 @@ struct Schema : NIceDb::Schema {
583583
struct YTSettings : Column<3, NScheme::NTypeIds::String> {};
584584
struct S3Settings : Column<6, NScheme::NTypeIds::String> {};
585585
struct TableDescription : Column<7, NScheme::NTypeIds::String> {};
586+
struct ChangefeedUnderlyingTopics : Column<13, NScheme::NTypeIds::String> {};
586587
struct NumberOfRetries : Column<8, NScheme::NTypeIds::Uint32> {};
587588
struct ScanSettings : Column<9, NScheme::NTypeIds::String> {};
588589
struct NeedToBill : Column<10, NScheme::NTypeIds::Bool> {};
@@ -605,7 +606,8 @@ struct Schema : NIceDb::Schema {
605606
ScanSettings,
606607
NeedToBill,
607608
EnableChecksums,
608-
EnablePermissions
609+
EnablePermissions,
610+
ChangefeedUnderlyingTopics
609611
>;
610612
};
611613

@@ -617,6 +619,7 @@ struct Schema : NIceDb::Schema {
617619
struct YTSettings : Column<4, NScheme::NTypeIds::String> {};
618620
struct S3Settings : Column<7, NScheme::NTypeIds::String> {};
619621
struct TableDescription : Column<8, NScheme::NTypeIds::String> {};
622+
struct ChangefeedUnderlyingTopics : Column<14, NScheme::NTypeIds::String> {};
620623
struct NumberOfRetries : Column<9, NScheme::NTypeIds::Uint32> {};
621624
struct ScanSettings : Column<10, NScheme::NTypeIds::String> {};
622625
struct NeedToBill : Column<11, NScheme::NTypeIds::Bool> {};
@@ -640,7 +643,8 @@ struct Schema : NIceDb::Schema {
640643
ScanSettings,
641644
NeedToBill,
642645
EnableChecksums,
643-
EnablePermissions
646+
EnablePermissions,
647+
ChangefeedUnderlyingTopics
644648
>;
645649
};
646650

ydb/core/tx/schemeshard/ut_export_reboots_s3/ut_export_reboots_s3.cpp

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -509,4 +509,82 @@ Y_UNIT_TEST_SUITE(TExportToS3WithRebootsTests) {
509509
}
510510
)");
511511
}
512+
513+
class TestData {
514+
public:
515+
static const TTypedScheme& Table() {
516+
return TableScheme;
517+
}
518+
519+
static const TTypedScheme& Changefeed() {
520+
return ChangefeedScheme;
521+
}
522+
523+
static const TString& Request() {
524+
return RequestString;
525+
}
526+
527+
private:
528+
static const char* TableName;
529+
static const TTypedScheme TableScheme;
530+
static const TTypedScheme ChangefeedScheme;
531+
static const TString RequestString;
532+
};
533+
534+
const char* TestData::TableName = "Table";
535+
536+
const TTypedScheme TestData::TableScheme = TTypedScheme {
537+
EPathTypeTable,
538+
Sprintf(R"(
539+
Name: "%s"
540+
Columns { Name: "key" Type: "Utf8" }
541+
Columns { Name: "value" Type: "Utf8" }
542+
KeyColumnNames: ["key"]
543+
)", TableName)
544+
};
545+
546+
const TTypedScheme TestData::ChangefeedScheme = TTypedScheme {
547+
EPathTypeCdcStream,
548+
Sprintf(R"(
549+
TableName: "%s"
550+
StreamDescription {
551+
Name: "update_feed"
552+
Mode: ECdcStreamModeUpdate
553+
Format: ECdcStreamFormatJson
554+
State: ECdcStreamStateReady
555+
}
556+
)", TableName)
557+
};
558+
559+
const TString TestData::RequestString = R"(
560+
ExportToS3Settings {
561+
endpoint: "localhost:%d"
562+
scheme: HTTP
563+
items {
564+
source_path: "/MyRoot/Table"
565+
destination_prefix: ""
566+
}
567+
}
568+
)";
569+
570+
Y_UNIT_TEST(ShouldSucceedOnSingleShardTableWithChangefeed) {
571+
RunS3({
572+
TestData::Table(),
573+
TestData::Changefeed()
574+
}, TestData::Request());
575+
}
576+
577+
Y_UNIT_TEST(CancelOnSingleShardTableWithChangefeed) {
578+
CancelS3({
579+
TestData::Table(),
580+
TestData::Changefeed()
581+
}, TestData::Request());
582+
}
583+
584+
Y_UNIT_TEST(ForgetShouldSucceedOnSingleShardTableWithChangefeed) {
585+
ForgetS3({
586+
TestData::Table(),
587+
TestData::Changefeed()
588+
}, TestData::Request());
589+
}
512590
}

ydb/core/tx/schemeshard/ut_helpers/export_reboots_common.cpp

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,28 @@ using namespace NKikimrSchemeOp;
1111
namespace NSchemeShardUT_Private {
1212
namespace NExportReboots {
1313

14+
void TestCreate(TTestActorRuntime& runtime, ui64 txId, const TString& scheme, NKikimrSchemeOp::EPathType pathType) {
15+
using TTestCreateFunc = ui64(*)(TTestActorRuntime&, ui64, const TString&, const TString&,
16+
const TVector<TExpectedResult>&, const TApplyIf&);
17+
18+
static const THashMap<NKikimrSchemeOp::EPathType, TTestCreateFunc> functions = {
19+
{EPathTypeTable, &TestSimpleCreateTable},
20+
{EPathTypeView, &TestCreateView},
21+
{EPathTypeCdcStream, &TestCreateCdcStream},
22+
};
23+
24+
auto it = functions.find(pathType);
25+
if (it != functions.end()) {
26+
it->second(runtime, txId, "/MyRoot", scheme, {NKikimrScheme::StatusAccepted}, {});
27+
} else {
28+
UNIT_FAIL("export is not implemented for the scheme object type: " << pathType);
29+
}
30+
}
31+
1432
void CreateSchemeObjects(TTestWithReboots& t, TTestActorRuntime& runtime, const TVector<TTypedScheme>& schemeObjects) {
1533
TSet<ui64> toWait;
1634
for (const auto& [type, scheme, _] : schemeObjects) {
17-
switch (type) {
18-
case EPathTypeTable:
19-
TestCreateTable(runtime, ++t.TxId, "/MyRoot", scheme);
20-
break;
21-
case EPathTypeView:
22-
TestCreateView(runtime, ++t.TxId, "/MyRoot", scheme);
23-
break;
24-
default:
25-
UNIT_FAIL("export is not implemented for the scheme object type: " << type);
26-
return;
27-
}
35+
TestCreate(runtime, ++t.TxId, scheme, type);
2836
toWait.insert(t.TxId);
2937
}
3038
t.TestEnv->TestWaitNotification(runtime, toWait);

ydb/core/tx/schemeshard/ut_helpers/helpers.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -821,6 +821,7 @@ namespace NSchemeShardUT_Private {
821821

822822
// table
823823
GENERIC_WITH_ATTRS_HELPERS(CreateTable, NKikimrSchemeOp::EOperationType::ESchemeOpCreateTable, &NKikimrSchemeOp::TModifyScheme::MutableCreateTable)
824+
GENERIC_HELPERS(SimpleCreateTable, NKikimrSchemeOp::EOperationType::ESchemeOpCreateTable, &NKikimrSchemeOp::TModifyScheme::MutableCreateTable)
824825
GENERIC_HELPERS(CreateIndexedTable, NKikimrSchemeOp::EOperationType::ESchemeOpCreateIndexedTable, &NKikimrSchemeOp::TModifyScheme::MutableCreateIndexedTable)
825826
GENERIC_HELPERS(ConsistentCopyTables, NKikimrSchemeOp::EOperationType::ESchemeOpCreateConsistentCopyTables, &NKikimrSchemeOp::TModifyScheme::MutableCreateConsistentCopyTables)
826827
GENERIC_HELPERS(AlterTable, NKikimrSchemeOp::EOperationType::ESchemeOpAlterTable, &NKikimrSchemeOp::TModifyScheme::MutableAlterTable)

ydb/core/tx/schemeshard/ut_helpers/helpers.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ namespace NSchemeShardUT_Private {
188188

189189
// table
190190
GENERIC_WITH_ATTRS_HELPERS(CreateTable);
191+
GENERIC_HELPERS(SimpleCreateTable);
191192
GENERIC_HELPERS(CreateIndexedTable);
192193
GENERIC_HELPERS(ConsistentCopyTables);
193194
GENERIC_HELPERS(AlterTable);

ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1179,6 +1179,11 @@
11791179
"ColumnId": 12,
11801180
"ColumnName": "EnablePermissions",
11811181
"ColumnType": "Bool"
1182+
},
1183+
{
1184+
"ColumnId": 13,
1185+
"ColumnName": "ChangefeedUnderlyingTopics",
1186+
"ColumnType": "String"
11821187
}
11831188
],
11841189
"ColumnsDropped": [],
@@ -1196,7 +1201,8 @@
11961201
9,
11971202
10,
11981203
11,
1199-
12
1204+
12,
1205+
13
12001206
],
12011207
"RoomID": 0,
12021208
"Codec": 0,
@@ -4577,6 +4583,11 @@
45774583
"ColumnId": 13,
45784584
"ColumnName": "EnablePermissions",
45794585
"ColumnType": "Bool"
4586+
},
4587+
{
4588+
"ColumnId": 14,
4589+
"ColumnName": "ChangefeedUnderlyingTopics",
4590+
"ColumnType": "String"
45804591
}
45814592
],
45824593
"ColumnsDropped": [],
@@ -4595,7 +4606,8 @@
45954606
10,
45964607
11,
45974608
12,
4598-
13
4609+
13,
4610+
14
45994611
],
46004612
"RoomID": 0,
46014613
"Codec": 0,

0 commit comments

Comments
 (0)