Skip to content

Commit df933e6

Browse files
Merge e8d8896 into a4b20bc
2 parents a4b20bc + e8d8896 commit df933e6

File tree

3 files changed

+108
-11
lines changed

3 files changed

+108
-11
lines changed

ydb/core/tx/datashard/export_s3_uploader.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
241241
}
242242

243243
void UploadChangefeed() {
244+
Y_ABORT_UNLESS(!ChangefeedsUploaded);
244245
if (IndexExportedChangefeed == Changefeeds.size()) {
245246
ChangefeedsUploaded = true;
246247
if (Scanner) {

ydb/core/tx/schemeshard/ut_export_reboots_s3/ut_export_reboots_s3.cpp

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -512,4 +512,83 @@ Y_UNIT_TEST_SUITE(TExportToS3WithRebootsTests) {
512512
}
513513
)");
514514
}
515+
516+
class TestData {
517+
public:
518+
static const TTypedScheme& Table() {
519+
return TableScheme;
520+
}
521+
522+
static const TTypedScheme& Changefeed() {
523+
return ChangefeedScheme;
524+
}
525+
526+
static const TString& Request() {
527+
return RequestString;
528+
}
529+
530+
private:
531+
static const char* TableName;
532+
static const TTypedScheme TableScheme;
533+
static const TTypedScheme ChangefeedScheme;
534+
static const TString RequestString;
535+
536+
};
537+
538+
const char* TestData::TableName = "Table";
539+
540+
const TTypedScheme TestData::TableScheme = TTypedScheme {
541+
EPathTypeTable,
542+
Sprintf(R"(
543+
Name: "%s"
544+
Columns { Name: "key" Type: "Utf8" }
545+
Columns { Name: "value" Type: "Utf8" }
546+
KeyColumnNames: ["key"]
547+
)", TableName)
548+
};
549+
550+
const TTypedScheme TestData::ChangefeedScheme = TTypedScheme {
551+
EPathTypeCdcStream,
552+
Sprintf(R"(
553+
TableName: "%s"
554+
StreamDescription {
555+
Name: "update_feed"
556+
Mode: ECdcStreamModeUpdate
557+
Format: ECdcStreamFormatJson
558+
State: ECdcStreamStateReady
559+
}
560+
)", TableName)
561+
};
562+
563+
const TString TestData::RequestString = R"(
564+
ExportToS3Settings {
565+
endpoint: "localhost:%d"
566+
scheme: HTTP
567+
items {
568+
source_path: "/MyRoot/Table"
569+
destination_prefix: ""
570+
}
571+
}
572+
)";
573+
574+
Y_UNIT_TEST(ShouldSucceedOnSingleShardTableWithChangefeed) {
575+
RunS3({
576+
TestData::Table(),
577+
TestData::Changefeed()
578+
}, TestData::Request());
579+
}
580+
581+
Y_UNIT_TEST(CancelOnSingleShardTableWithChangefeed) {
582+
CancelS3({
583+
TestData::Table(),
584+
TestData::Changefeed()
585+
}, TestData::Request());
586+
}
587+
588+
Y_UNIT_TEST(ForgetShouldSucceedOnSingleShardTableWithChangefeed) {
589+
ForgetS3({
590+
TestData::Table(),
591+
TestData::Changefeed()
592+
}, TestData::Request());
593+
}
515594
}

ydb/core/tx/schemeshard/ut_helpers/export_reboots_common.cpp

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,37 @@ using namespace NKikimrSchemeOp;
1111
namespace NSchemeShardUT_Private {
1212
namespace NExportReboots {
1313

14+
using TCreateHandler = std::function<void(TTestActorRuntime&, ui64&, const TString&, const TString&)>;
15+
16+
#define GEN_CREATE_HANDLER(func) \
17+
TCreateHandler([](TTestActorRuntime& runtime, ui64& txId, const TString& parentPath, const TString& scheme) { \
18+
func(runtime, txId, parentPath, scheme); \
19+
})
20+
21+
THashMap<NKikimrSchemeOp::EPathType, TCreateHandler> CreateHandlers = {
22+
{EPathTypeTable, GEN_CREATE_HANDLER(TestCreateTable)},
23+
{EPathTypeView, GEN_CREATE_HANDLER(TestCreateView)},
24+
{EPathTypeCdcStream, GEN_CREATE_HANDLER(TestCreateCdcStream)},
25+
};
26+
27+
void TestCreate(
28+
TTestActorRuntime& runtime,
29+
ui64& txId,
30+
const TString& scheme,
31+
NKikimrSchemeOp::EPathType pathType
32+
) {
33+
if (CreateHandlers.contains(pathType)) {
34+
CreateHandlers[pathType](runtime, txId, "/MyRoot", scheme);
35+
} else {
36+
UNIT_FAIL("export is not implemented for the scheme object type: " << pathType);
37+
}
38+
}
39+
1440
void CreateSchemeObjects(TTestWithReboots& t, TTestActorRuntime& runtime, const TVector<TTypedScheme>& schemeObjects) {
1541
TSet<ui64> toWait;
42+
1643
for (const auto& [type, scheme, _] : schemeObjects) {
17-
switch (type) {
18-
case EPathTypeTable:
19-
TestCreateTable(runtime, ++t.TxId, "/MyRoot", scheme);
20-
break;
21-
case EPathTypeView:
22-
TestCreateView(runtime, ++t.TxId, "/MyRoot", scheme);
23-
break;
24-
default:
25-
UNIT_FAIL("export is not implemented for the scheme object type: " << type);
26-
return;
27-
}
44+
TestCreate(runtime, ++t.TxId, scheme, type);
2845
toWait.insert(t.TxId);
2946
}
3047
t.TestEnv->TestWaitNotification(runtime, toWait);

0 commit comments

Comments
 (0)