Skip to content

Commit 435ec09

Browse files
Merge 1ba36a7 into 818a8a0
2 parents 818a8a0 + 1ba36a7 commit 435ec09

File tree

3 files changed

+141
-0
lines changed

3 files changed

+141
-0
lines changed

ydb/core/tx/datashard/export_s3_uploader.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
241241
}
242242

243243
void UploadChangefeed() {
244+
Y_ABORT_UNLESS(!ChangefeedsUploaded);
244245
if (IndexExportedChangefeed == Changefeeds.size()) {
245246
ChangefeedsUploaded = true;
246247
if (Scanner) {

ydb/core/tx/schemeshard/ut_export_reboots_s3/ut_export_reboots_s3.cpp

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,4 +439,112 @@ Y_UNIT_TEST_SUITE(TExportToS3WithRebootsTests) {
439439
}
440440
)");
441441
}
442+
443+
Y_UNIT_TEST(ShouldSucceedOnSingleShardTableWithChangefeed) {
444+
const char* tableName = "Table";
445+
RunS3({
446+
TTypedScheme {
447+
Sprintf(R"(
448+
Name: "%s"
449+
Columns { Name: "key" Type: "Utf8" }
450+
Columns { Name: "value" Type: "Utf8" }
451+
KeyColumnNames: ["key"]
452+
)", tableName),
453+
EPathTypeTable
454+
},
455+
TTypedScheme {
456+
Sprintf(R"(
457+
TableName: "%s"
458+
StreamDescription {
459+
Name: "update_feed"
460+
Mode: ECdcStreamModeUpdate
461+
Format: ECdcStreamFormatJson
462+
State: ECdcStreamStateReady
463+
}
464+
)", tableName),
465+
EPathTypeCdcStream
466+
}
467+
}, R"(
468+
ExportToS3Settings {
469+
endpoint: "localhost:%d"
470+
scheme: HTTP
471+
items {
472+
source_path: "/MyRoot/Table"
473+
destination_prefix: ""
474+
}
475+
}
476+
)");
477+
}
478+
479+
Y_UNIT_TEST(CancelOnSingleShardTableWithChangefeed) {
480+
const char* tableName = "Table";
481+
CancelS3({
482+
TTypedScheme {
483+
Sprintf(R"(
484+
Name: "%s"
485+
Columns { Name: "key" Type: "Utf8" }
486+
Columns { Name: "value" Type: "Utf8" }
487+
KeyColumnNames: ["key"]
488+
)", tableName),
489+
EPathTypeTable
490+
},
491+
TTypedScheme {
492+
Sprintf(R"(
493+
TableName: "%s"
494+
StreamDescription {
495+
Name: "update_feed"
496+
Mode: ECdcStreamModeUpdate
497+
Format: ECdcStreamFormatJson
498+
State: ECdcStreamStateReady
499+
}
500+
)", tableName),
501+
EPathTypeCdcStream
502+
}
503+
}, R"(
504+
ExportToS3Settings {
505+
endpoint: "localhost:%d"
506+
scheme: HTTP
507+
items {
508+
source_path: "/MyRoot/Table"
509+
destination_prefix: ""
510+
}
511+
}
512+
)");
513+
}
514+
515+
Y_UNIT_TEST(ForgetShouldSucceedOnSingleShardTableWithChangefeed) {
516+
const char* tableName = "Table";
517+
ForgetS3({
518+
TTypedScheme {
519+
Sprintf(R"(
520+
Name: "%s"
521+
Columns { Name: "key" Type: "Utf8" }
522+
Columns { Name: "value" Type: "Utf8" }
523+
KeyColumnNames: ["key"]
524+
)", tableName),
525+
EPathTypeTable
526+
},
527+
TTypedScheme {
528+
Sprintf(R"(
529+
TableName: "%s"
530+
StreamDescription {
531+
Name: "update_feed"
532+
Mode: ECdcStreamModeUpdate
533+
Format: ECdcStreamFormatJson
534+
State: ECdcStreamStateReady
535+
}
536+
)", tableName),
537+
EPathTypeCdcStream
538+
}
539+
}, R"(
540+
ExportToS3Settings {
541+
endpoint: "localhost:%d"
542+
scheme: HTTP
543+
items {
544+
source_path: "/MyRoot/Table"
545+
destination_prefix: ""
546+
}
547+
}
548+
)");
549+
}
442550
}

ydb/core/tx/schemeshard/ut_helpers/export_reboots_common.cpp

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,48 @@ using namespace NKikimrSchemeOp;
1111
namespace NSchemeShardUT_Private {
1212
namespace NExportReboots {
1313

14+
// using TCreateHandler = std::function<ui64(TTestActorRuntime&, ui64&, const TString&, const TString&)>;
15+
16+
// template <typename T>
17+
// TCreateHandler GenCreateHandler(T func) {
18+
// return [func](TTestActorRuntime& runtime, ui64& txId, const TString& parentPath, const TString& scheme) {
19+
// return func(runtime, txId, parentPath, scheme);
20+
// };
21+
// }
22+
23+
// THashMap<NKikimrSchemeOp::EPathType, TCreateHandler> CreateHandlers = {
24+
// {EPathTypeTable, GenCreateHandler(TestCreateTable)},
25+
// {EPathTypeView, GenCreateHandler(TestCreateView)},
26+
// {EPathTypeCdcStream, GenCreateHandler(TestCreateCdcStream)},
27+
// };
28+
29+
// void TestCreate(
30+
// TTestActorRuntime& runtime,
31+
// ui64& txId,
32+
// const TString& scheme,
33+
// NKikimrSchemeOp::EPathType pathType
34+
// ) {
35+
// if (CreateHandlers.contains(pathType)) {
36+
// CreateHandlers[pathType](runtime, txId, "/MyRoot", scheme);
37+
// } else {
38+
// UNIT_FAIL("export is not implemented for the scheme object type: " << pathType);
39+
// }
40+
// }
41+
1442
void CreateSchemeObjects(TTestWithReboots& t, TTestActorRuntime& runtime, const TVector<TTypedScheme>& schemeObjects) {
1543
TSet<ui64> toWait;
1644
for (const auto& [scheme, type] : schemeObjects) {
45+
// TestCreate(runtime, ++t.TxId, scheme, type);
1746
switch (type) {
1847
case EPathTypeTable:
1948
TestCreateTable(runtime, ++t.TxId, "/MyRoot", scheme);
2049
break;
2150
case EPathTypeView:
2251
TestCreateView(runtime, ++t.TxId, "/MyRoot", scheme);
2352
break;
53+
case EPathTypeCdcStream:
54+
TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", scheme);
55+
break;
2456
default:
2557
UNIT_FAIL("export is not implemented for the scheme object type: " << type);
2658
return;

0 commit comments

Comments
 (0)