Skip to content

Commit a642ab4

Browse files
Merge cd96ff4 into 93cb95f
2 parents 93cb95f + cd96ff4 commit a642ab4

9 files changed

+263
-12
lines changed

ydb/core/tablet_flat/flat_cxx_database.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,7 @@ template <> struct NSchemeTypeMapper<NScheme::NTypeIds::Date32> { typedef i32 Ty
238238
template <> struct NSchemeTypeMapper<NScheme::NTypeIds::Datetime64> { typedef i64 Type; };
239239
template <> struct NSchemeTypeMapper<NScheme::NTypeIds::Timestamp64> { typedef i64 Type; };
240240
template <> struct NSchemeTypeMapper<NScheme::NTypeIds::Interval64> { typedef i64 Type; };
241+
template <> struct NSchemeTypeMapper<NScheme::NTypeIds::Json> { typedef TVector<TString> Type; };
241242

242243
/// only for compatibility with old code
243244
template <NScheme::TTypeId ValType>

ydb/core/tx/schemeshard/schemeshard__init.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4464,6 +4464,20 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
44644464
item.Metadata = NBackup::TMetadata::Deserialize(rowset.GetValue<Schema::ImportItems::Metadata>());
44654465
}
44664466

4467+
if (rowset.HaveValue<Schema::ImportItems::Changefeeds>() && rowset.HaveValue<Schema::ImportItems::Topics>()) {
4468+
const ui64 count = rowset.GetValue<Schema::ImportItems::Changefeeds>().size();
4469+
TVector<TImportInfo::TChangefeedImportDescriptions> changefeeds;
4470+
changefeeds.reserve(count);
4471+
for (ui64 i = 0; i < count; ++i) {
4472+
Ydb::Table::ChangefeedDescription changefeed;
4473+
Ydb::Topic::DescribeTopicResult topic;
4474+
Y_ABORT_UNLESS(ParseFromStringNoSizeLimit(changefeed, rowset.GetValue<Schema::ImportItems::Changefeeds>()[i] ));
4475+
Y_ABORT_UNLESS(ParseFromStringNoSizeLimit(topic, rowset.GetValue<Schema::ImportItems::Topics>()[i] ));
4476+
changefeeds.emplace_back(changefeed, topic);
4477+
}
4478+
item.Changefeeds = std::move(changefeeds);
4479+
}
4480+
44674481
item.State = static_cast<TImportInfo::EState>(rowset.GetValue<Schema::ImportItems::State>());
44684482
item.WaitTxId = rowset.GetValueOrDefault<Schema::ImportItems::WaitTxId>(InvalidTxId);
44694483
item.NextIndexIdx = rowset.GetValueOrDefault<Schema::ImportItems::NextIndexIdx>(0);

ydb/core/tx/schemeshard/schemeshard_import.cpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,23 @@ void TSchemeShard::PersistImportItemScheme(NIceDb::TNiceDb& db, const TImportInf
189189
db.Table<Schema::ImportItems>().Key(importInfo->Id, itemIdx).Update(
190190
NIceDb::TUpdate<Schema::ImportItems::Metadata>(item.Metadata.Serialize())
191191
);
192+
const ui64 count = item.Changefeeds.size();
193+
TVector<TString> jsonChangefeeds;
194+
TVector<TString> jsonTopics;
195+
jsonChangefeeds.reserve(count);
196+
jsonTopics.reserve(count);
197+
198+
for (const auto& [changefeed, topic] : item.Changefeeds) {
199+
jsonChangefeeds.push_back(changefeed.SerializeAsString());
200+
jsonTopics.push_back(topic.SerializeAsString());
201+
}
202+
203+
db.Table<Schema::ImportItems>().Key(importInfo->Id, itemIdx).Update(
204+
NIceDb::TUpdate<Schema::ImportItems::Changefeeds>(jsonChangefeeds)
205+
);
206+
db.Table<Schema::ImportItems>().Key(importInfo->Id, itemIdx).Update(
207+
NIceDb::TUpdate<Schema::ImportItems::Topics>(jsonTopics)
208+
);
192209
}
193210

194211
void TSchemeShard::PersistImportItemPreparedCreationQuery(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx) {

ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,23 @@
88
namespace NKikimr {
99
namespace NSchemeShard {
1010

11+
bool CreateChangefeedsPropose(THolder<TEvSchemeShard::TEvModifySchemeTransaction>& propose, const TImportInfo::TItem& item, TString& error) {
12+
auto& record = propose->Record;
13+
const auto& changefeeds = item.Changefeeds;
14+
15+
for (const auto& [changefeed, topic]: changefeeds) {
16+
auto& modifyScheme = *record.AddTransaction();
17+
auto& cdcStream = *modifyScheme.MutableCreateCdcStream();
18+
Ydb::StatusIds::StatusCode status;
19+
auto& cdcStreamDescription = *cdcStream.MutableStreamDescription();
20+
if (!FillChangefeedDescription(cdcStreamDescription, changefeed, status, error)) {
21+
return false;
22+
}
23+
cdcStream.SetRetentionPeriodSeconds(topic.Getretention_period().seconds());
24+
}
25+
return true;
26+
}
27+
1128
THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateTablePropose(
1229
TSchemeShard* ss,
1330
TTxId txId,
@@ -73,6 +90,8 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateTablePropose(
7390
return nullptr;
7491
}
7592

93+
CreateChangefeedsPropose(propose, item, error);
94+
7695
return propose;
7796
}
7897

ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp

Lines changed: 183 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,14 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
4343
return TStringBuilder() << settings.items(itemIdx).source_prefix() << "/permissions.pb";
4444
}
4545

46+
static TString ChangefeedDescriptionKey(const TString& changefeedPrefix) {
47+
return TStringBuilder() << changefeedPrefix << "/changefeed_description.pb";
48+
}
49+
50+
static TString TopicDescriptionKey(const TString& changefeedPrefix) {
51+
return TStringBuilder() << changefeedPrefix << "/topic_description.pb";
52+
}
53+
4654
static bool IsView(TStringBuf schemeKey) {
4755
return schemeKey.EndsWith(NYdb::NDump::NFiles::CreateView().FileName);
4856
}
@@ -51,6 +59,41 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
5159
return errorType == S3Errors::RESOURCE_NOT_FOUND || errorType == S3Errors::NO_SUCH_KEY;
5260
}
5361

62+
void ListObjects(const TString& prefix) {
63+
auto request = Model::ListObjectsRequest()
64+
.WithPrefix(prefix);
65+
66+
Send(Client, new TEvExternalStorage::TEvListObjectsRequest(request));
67+
}
68+
69+
void HandleChangefeeds(TEvExternalStorage::TEvListObjectsResponse::TPtr& ev) {
70+
const auto& result = ev.Get()->Get()->Result;
71+
72+
LOG_D("HandleChangefeeds TEvExternalStorage::TEvListObjectResponse"
73+
<< ": self# " << SelfId()
74+
<< ", result# " << result);
75+
76+
if (!CheckResult(result, "ListObject")) {
77+
return;
78+
}
79+
80+
const auto& objects = result.GetResult().GetContents();
81+
ChangefeedsKeys.reserve(objects.size());
82+
83+
for (const auto& obj : objects) {
84+
const TFsPath& path = obj.GetKey();
85+
if (path.GetName() == "changefeed_description.pb") {
86+
ChangefeedsKeys.push_back(path.Dirname());
87+
}
88+
}
89+
90+
if (!ChangefeedsKeys.empty()) {
91+
HeadObject(ChangefeedDescriptionKey(ChangefeedsKeys[0]));
92+
} else {
93+
Reply();
94+
}
95+
}
96+
5497
void HeadObject(const TString& key) {
5598
auto request = Model::HeadObjectRequest()
5699
.WithKey(key);
@@ -128,6 +171,36 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
128171
GetObject(ChecksumKey, std::make_pair(0, contentLength - 1));
129172
}
130173

174+
void HandleChangefeed(TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) {
175+
const auto& result = ev->Get()->Result;
176+
177+
LOG_D("HandleChangefeeds TEvExternalStorage::TEvHeadObjectResponse"
178+
<< ": self# " << SelfId()
179+
<< ", result# " << result);
180+
181+
if (!CheckResult(result, "HeadObject")) {
182+
return;
183+
}
184+
185+
const auto contentLength = result.GetResult().GetContentLength();
186+
GetObject(ChangefeedDescriptionKey(ChangefeedsKeys[IndexDownloadedChangefeed]), std::make_pair(0, contentLength - 1));
187+
}
188+
189+
void HandleTopic(TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) {
190+
const auto& result = ev->Get()->Result;
191+
192+
LOG_D("HandleTopic TEvExternalStorage::TEvHeadObjectResponse"
193+
<< ": self# " << SelfId()
194+
<< ", result# " << result);
195+
196+
if (!CheckResult(result, "HeadObject")) {
197+
return;
198+
}
199+
200+
const auto contentLength = result.GetResult().GetContentLength();
201+
GetObject(TopicDescriptionKey(ChangefeedsKeys[IndexDownloadedChangefeed]), std::make_pair(0, contentLength - 1));
202+
}
203+
131204
void GetObject(const TString& key, const std::pair<ui64, ui64>& range) {
132205
auto request = Model::GetObjectRequest()
133206
.WithKey(key)
@@ -205,7 +278,7 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
205278
if (NeedDownloadPermissions) {
206279
StartDownloadingPermissions();
207280
} else {
208-
Reply();
281+
StartDownloadingChangefeeds();
209282
}
210283
};
211284

@@ -242,7 +315,7 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
242315
item.Permissions = std::move(permissions);
243316

244317
auto nextStep = [this]() {
245-
Reply();
318+
StartDownloadingChangefeeds();
246319
};
247320

248321
if (NeedValidateChecksums) {
@@ -274,6 +347,82 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
274347
ChecksumValidatedCallback();
275348
}
276349

350+
void HandleChangefeed(TEvExternalStorage::TEvGetObjectResponse::TPtr& ev) {
351+
const auto& msg = *ev->Get();
352+
const auto& result = msg.Result;
353+
354+
LOG_D("HandleChangefeeds TEvExternalStorage::TEvGetObjectResponse"
355+
<< ": self# " << SelfId()
356+
<< ", result# " << result);
357+
358+
if (!CheckResult(result, "GetObject")) {
359+
return;
360+
}
361+
362+
Y_ABORT_UNLESS(ItemIdx < ImportInfo->Items.size());
363+
auto& item = ImportInfo->Items.at(ItemIdx);
364+
365+
LOG_T("Trying to parse changefeed"
366+
<< ": self# " << SelfId()
367+
<< ", body# " << SubstGlobalCopy(msg.Body, "\n", "\\n"));
368+
369+
Ydb::Table::ChangefeedDescription changefeed;
370+
if (!google::protobuf::TextFormat::ParseFromString(msg.Body, &changefeed)) {
371+
return Reply(false, "Cannot parse permissions");
372+
}
373+
item.Changefeeds[IndexDownloadedChangefeed].ChangefeedDescription = std::move(changefeed);
374+
375+
auto nextStep = [this]() {
376+
HeadObject(TopicDescriptionKey(ChangefeedsKeys[IndexDownloadedChangefeed]));
377+
};
378+
379+
if (NeedValidateChecksums) {
380+
StartValidatingChecksum(ChangefeedDescriptionKey(ChangefeedsKeys[IndexDownloadedChangefeed]), msg.Body, nextStep);
381+
} else {
382+
nextStep();
383+
}
384+
}
385+
386+
void HandleTopic(TEvExternalStorage::TEvGetObjectResponse::TPtr& ev) {
387+
const auto& msg = *ev->Get();
388+
const auto& result = msg.Result;
389+
390+
LOG_D("HandleChangefeeds TEvExternalStorage::TEvGetObjectResponse"
391+
<< ": self# " << SelfId()
392+
<< ", result# " << result);
393+
394+
if (!CheckResult(result, "GetObject")) {
395+
return;
396+
}
397+
398+
Y_ABORT_UNLESS(ItemIdx < ImportInfo->Items.size());
399+
auto& item = ImportInfo->Items.at(ItemIdx);
400+
401+
LOG_T("Trying to parse changefeed"
402+
<< ": self# " << SelfId()
403+
<< ", body# " << SubstGlobalCopy(msg.Body, "\n", "\\n"));
404+
405+
Ydb::Topic::DescribeTopicResult topic;
406+
if (!google::protobuf::TextFormat::ParseFromString(msg.Body, &topic)) {
407+
return Reply(false, "Cannot parse permissions");
408+
}
409+
item.Changefeeds[IndexDownloadedChangefeed].Topic = std::move(topic);
410+
411+
auto nextStep = [this]() {
412+
if (++IndexDownloadedChangefeed == ChangefeedsKeys.size()) {
413+
Reply();
414+
} else {
415+
HeadObject(ChangefeedDescriptionKey(ChangefeedsKeys[IndexDownloadedChangefeed]));
416+
}
417+
};
418+
419+
if (NeedValidateChecksums) {
420+
StartValidatingChecksum(TopicDescriptionKey(ChangefeedsKeys[IndexDownloadedChangefeed]), msg.Body, nextStep);
421+
} else {
422+
nextStep();
423+
}
424+
}
425+
277426
template <typename TResult>
278427
bool CheckResult(const TResult& result, const TStringBuf marker) {
279428
if (result.IsSuccess()) {
@@ -312,12 +461,20 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
312461
TActor::PassAway();
313462
}
314463

315-
void Download(const TString& key) {
464+
void DownloadCommon() {
316465
if (Client) {
317466
Send(Client, new TEvents::TEvPoisonPill());
318467
}
319468
Client = RegisterWithSameMailbox(CreateS3Wrapper(ExternalStorageConfig->ConstructStorageOperator()));
469+
}
470+
471+
void DownloadWithoutKey() {
472+
DownloadCommon();
473+
ListObjects(ImportInfo->Settings.items(ItemIdx).source_prefix());
474+
}
320475

476+
void Download(const TString& key) {
477+
DownloadCommon();
321478
HeadObject(key);
322479
}
323480

@@ -337,6 +494,10 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
337494
Download(ChecksumKey);
338495
}
339496

497+
void DownloadChangefeeds() {
498+
DownloadWithoutKey();
499+
}
500+
340501
void ResetRetries() {
341502
Attempt = 0;
342503
}
@@ -353,6 +514,12 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
353514
Become(&TThis::StateDownloadPermissions);
354515
}
355516

517+
void StartDownloadingChangefeeds() {
518+
ResetRetries();
519+
DownloadChangefeeds();
520+
Become(&TThis::StateDownloadChangefeeds);
521+
}
522+
356523
void StartValidatingChecksum(const TString& key, const TString& object, std::function<void()> checksumValidatedCallback) {
357524
ChecksumKey = NBackup::ChecksumKey(key);
358525
Checksum = NBackup::ComputeChecksum(object);
@@ -413,6 +580,17 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
413580
}
414581
}
415582

583+
STATEFN(StateDownloadChangefeeds) {
584+
switch (ev->GetTypeRewrite()) {
585+
hFunc(TEvExternalStorage::TEvListObjectsResponse, HandleChangefeeds);
586+
hFunc(TEvExternalStorage::TEvHeadObjectResponse, HandleChangefeed);
587+
hFunc(TEvExternalStorage::TEvGetObjectResponse, HandleChangefeed);
588+
589+
sFunc(TEvents::TEvWakeup, DownloadChangefeeds);
590+
sFunc(TEvents::TEvPoisonPill, PassAway);
591+
}
592+
}
593+
416594
STATEFN(StateDownloadChecksum) {
417595
switch (ev->GetTypeRewrite()) {
418596
hFunc(TEvExternalStorage::TEvHeadObjectResponse, HandleChecksum);
@@ -432,6 +610,8 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
432610
const TString MetadataKey;
433611
TString SchemeKey;
434612
const TString PermissionsKey;
613+
TVector<TString> ChangefeedsKeys;
614+
ui64 IndexDownloadedChangefeed = 0;
435615

436616
const ui32 Retries;
437617
ui32 Attempt = 0;

ydb/core/tx/schemeshard/schemeshard_info_types.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2837,6 +2837,11 @@ struct TImportInfo: public TSimpleRefCount<TImportInfo> {
28372837
S3 = 0,
28382838
};
28392839

2840+
struct TChangefeedImportDescriptions {
2841+
Ydb::Table::ChangefeedDescription ChangefeedDescription;
2842+
Ydb::Topic::DescribeTopicResult Topic;
2843+
};
2844+
28402845
struct TItem {
28412846
enum class ESubState: ui8 {
28422847
AllocateTxId = 0,
@@ -2851,6 +2856,7 @@ struct TImportInfo: public TSimpleRefCount<TImportInfo> {
28512856
TMaybe<NKikimrSchemeOp::TModifyScheme> PreparedCreationQuery;
28522857
TMaybeFail<Ydb::Scheme::ModifyPermissionsRequest> Permissions;
28532858
NBackup::TMetadata Metadata;
2859+
TVector<TChangefeedImportDescriptions> Changefeeds;
28542860

28552861
EState State = EState::GetScheme;
28562862
ESubState SubState = ESubState::AllocateTxId;

ydb/core/tx/schemeshard/schemeshard_schema.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1562,6 +1562,8 @@ struct Schema : NIceDb::Schema {
15621562
struct PreparedCreationQuery : Column<14, NScheme::NTypeIds::String> {};
15631563
struct Permissions : Column<11, NScheme::NTypeIds::String> {};
15641564
struct Metadata : Column<12, NScheme::NTypeIds::String> {};
1565+
struct Changefeeds : Column<13, NScheme::NTypeIds::Json> {};
1566+
struct Topics : Column<14, NScheme::NTypeIds::Json> {};
15651567

15661568
struct State : Column<7, NScheme::NTypeIds::Byte> {};
15671569
struct WaitTxId : Column<8, NScheme::NTypeIds::Uint64> { using Type = TTxId; };

0 commit comments

Comments
 (0)