Skip to content

Commit 731ba2d

Browse files
Testing the full export/import pipeline of changefeeds (#15588)
1 parent ee06b69 commit 731ba2d

File tree

2 files changed

+74
-1
lines changed

2 files changed

+74
-1
lines changed

ydb/core/tx/schemeshard/schemeshard_import__create.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -537,8 +537,11 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
537537
<< ", txId# " << txId);
538538

539539
Y_ABORT_UNLESS(item.WaitTxId == InvalidTxId);
540+
541+
auto propose = CreateChangefeedPropose(Self, txId, item);
542+
Y_ABORT_UNLESS(propose);
540543

541-
Send(Self->SelfId(), CreateChangefeedPropose(Self, txId, item));
544+
Send(Self->SelfId(), std::move(propose));
542545
}
543546

544547
void CreateConsumers(TImportInfo::TPtr importInfo, ui32 itemIdx, TTxId txId) {

ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5203,6 +5203,76 @@ Y_UNIT_TEST_SUITE(TImportTests) {
52035203
TestImportChangefeeds(3, AddedSchemeWithPermissions);
52045204
}
52055205

5206+
void TestCreateCdcStreams(TTestEnv& env, TTestActorRuntime& runtime, ui64& txId, const TString& dbName, ui64 count) {
5207+
for (ui64 i = 1; i <= count; ++i) {
5208+
TestCreateCdcStream(runtime, ++txId, dbName, Sprintf(R"(
5209+
TableName: "Original"
5210+
StreamDescription {
5211+
Name: "Stream_%d"
5212+
Mode: ECdcStreamModeKeysOnly
5213+
Format: ECdcStreamFormatJson
5214+
}
5215+
)", i));
5216+
env.TestWaitNotification(runtime, txId);
5217+
}
5218+
}
5219+
5220+
Y_UNIT_TEST(ChangefeedsExportRestore) {
5221+
TPortManager portManager;
5222+
const ui16 port = portManager.GetPort();
5223+
5224+
TS3Mock s3Mock({}, TS3Mock::TSettings(port));
5225+
UNIT_ASSERT(s3Mock.Start());
5226+
5227+
TTestBasicRuntime runtime;
5228+
TTestEnv env(runtime);
5229+
ui64 txId = 100;
5230+
5231+
runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE);
5232+
runtime.SetLogPriority(NKikimrServices::DATASHARD_RESTORE, NActors::NLog::PRI_TRACE);
5233+
runtime.SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE);
5234+
runtime.SetLogPriority(NKikimrServices::IMPORT, NActors::NLog::PRI_TRACE);
5235+
runtime.GetAppData().FeatureFlags.SetEnableChangefeedsExport(true);
5236+
runtime.GetAppData().FeatureFlags.SetEnableChangefeedsImport(true);
5237+
5238+
TestCreateTable(runtime, ++txId, "/MyRoot", R"(
5239+
Name: "Original"
5240+
Columns { Name: "key" Type: "Uint32" }
5241+
Columns { Name: "double_value" Type: "Double" }
5242+
Columns { Name: "float_value" Type: "Float" }
5243+
KeyColumnNames: ["key"]
5244+
)");
5245+
env.TestWaitNotification(runtime, txId);
5246+
5247+
TestCreateCdcStreams(env, runtime, txId, "/MyRoot", 3);
5248+
5249+
TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
5250+
ExportToS3Settings {
5251+
endpoint: "localhost:%d"
5252+
scheme: HTTP
5253+
items {
5254+
source_path: "/MyRoot/Original"
5255+
destination_prefix: ""
5256+
}
5257+
}
5258+
)", port));
5259+
env.TestWaitNotification(runtime, txId);
5260+
TestGetExport(runtime, txId, "/MyRoot");
5261+
5262+
TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"(
5263+
ImportFromS3Settings {
5264+
endpoint: "localhost:%d"
5265+
scheme: HTTP
5266+
items {
5267+
source_prefix: ""
5268+
destination_path: "/MyRoot/Restored"
5269+
}
5270+
}
5271+
)", port));
5272+
env.TestWaitNotification(runtime, txId);
5273+
TestGetImport(runtime, txId, "/MyRoot");
5274+
}
5275+
52065276
Y_UNIT_TEST(IgnoreBasicSchemeLimits) {
52075277
TTestBasicRuntime runtime;
52085278
TTestEnv env(runtime);

0 commit comments

Comments
 (0)