Skip to content

25-1: Import changefeed's consumers from s3 #15243

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 65 additions & 7 deletions ydb/core/tx/schemeshard/schemeshard_import__create.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,21 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
Send(Self->SelfId(), CreateChangefeedPropose(Self, txId, item));
}

void CreateConsumers(TImportInfo::TPtr importInfo, ui32 itemIdx, TTxId txId) {
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
auto& item = importInfo->Items.at(itemIdx);
item.SubState = ESubState::Proposed;

LOG_I("TImport::TTxProgress: CreateConsumers propose"
<< ": info# " << importInfo->ToString()
<< ", item# " << item.ToString(itemIdx)
<< ", txId# " << txId);

Y_ABORT_UNLESS(item.WaitTxId == InvalidTxId);

Send(Self->SelfId(), CreateConsumersPropose(Self, txId, item));
}

void AllocateTxId(TImportInfo::TPtr importInfo, ui32 itemIdx) {
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
auto& item = importInfo->Items.at(itemIdx);
Expand Down Expand Up @@ -622,6 +637,26 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
return path->LastTxId;
}

TTxId GetActiveCreateConsumerTxId(TImportInfo::TPtr importInfo, ui32 itemIdx) {
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
const auto& item = importInfo->Items.at(itemIdx);

Y_ABORT_UNLESS(item.State == EState::CreateChangefeed);
Y_ABORT_UNLESS(item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateConsumers);
Y_ABORT_UNLESS(item.StreamImplPathId);

if (!Self->PathsById.contains(item.StreamImplPathId)) {
return InvalidTxId;
}

auto path = Self->PathsById.at(item.StreamImplPathId);
if (path->PathState != NKikimrSchemeOp::EPathStateAlter) {
return InvalidTxId;
}

return path->LastTxId;
}

static TString MakeIndexBuildUid(TImportInfo::TPtr importInfo, ui32 itemIdx) {
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
const auto& item = importInfo->Items.at(itemIdx);
Expand Down Expand Up @@ -816,10 +851,6 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
TTxId txId = InvalidTxId;

switch (item.State) {
case EState::CreateChangefeed:
txId = GetActiveCreateChangefeedTxId(importInfo, itemIdx);
break;

case EState::Transferring:
if (!CancelTransferring(importInfo, itemIdx)) {
txId = GetActiveRestoreTxId(importInfo, itemIdx);
Expand Down Expand Up @@ -1045,7 +1076,11 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
break;

case EState::CreateChangefeed:
CreateChangefeed(importInfo, i, txId);
if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) {
CreateChangefeed(importInfo, i, txId);
} else {
CreateConsumers(importInfo, i, txId);
}
itemIdx = i;
break;

Expand Down Expand Up @@ -1109,11 +1144,30 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
} else if (item.State == EState::Transferring) {
txId = GetActiveRestoreTxId(importInfo, itemIdx);
} else if (item.State == EState::CreateChangefeed) {
txId = GetActiveCreateChangefeedTxId(importInfo, itemIdx);
if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) {
txId = GetActiveCreateChangefeedTxId(importInfo, itemIdx);
} else {
txId = GetActiveCreateConsumerTxId(importInfo, itemIdx);
}

}
}

if (txId == InvalidTxId) {

if (record.GetStatus() == NKikimrScheme::StatusAlreadyExists && item.State == EState::CreateChangefeed) {
if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) {
item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateConsumers;
AllocateTxId(importInfo, itemIdx);
} else if (++item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size()) {
item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateChangefeed;
AllocateTxId(importInfo, itemIdx);
} else {
item.State = EState::Done;
}
return;
}

return CancelAndPersist(db, importInfo, itemIdx, record.GetReason(), "unhappy propose");
}

Expand Down Expand Up @@ -1290,7 +1344,11 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
break;

case EState::CreateChangefeed:
if (++item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size()) {
if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) {
item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateConsumers;
AllocateTxId(importInfo, itemIdx);
} else if (++item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size()) {
item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateChangefeed;
AllocateTxId(importInfo, itemIdx);
} else {
item.State = EState::Done;
Expand Down
53 changes: 53 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,5 +291,58 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateChangefeedPropose(
return propose;
}

THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateConsumersPropose(
TSchemeShard* ss,
TTxId txId,
TImportInfo::TItem& item
) {
Y_ABORT_UNLESS(item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size());

const auto& importChangefeedTopic = item.Changefeeds.GetChangefeeds()[item.NextChangefeedIdx];
const auto& topic = importChangefeedTopic.GetTopic();

auto propose = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(ui64(txId), ss->TabletID());
auto& record = propose->Record;
auto& modifyScheme = *record.AddTransaction();
modifyScheme.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterPersQueueGroup);
auto& pqGroup = *modifyScheme.MutableAlterPersQueueGroup();

const TPath dstPath = TPath::Init(item.DstPathId, ss);
const TString changefeedPath = dstPath.PathString() + "/" + importChangefeedTopic.GetChangefeed().name();
modifyScheme.SetWorkingDir(changefeedPath);
modifyScheme.SetInternal(true);

pqGroup.SetName("streamImpl");

NKikimrSchemeOp::TDescribeOptions opts;
opts.SetReturnPartitioningInfo(false);
opts.SetReturnPartitionConfig(true);
opts.SetReturnBoundaries(true);
opts.SetReturnIndexTableBoundaries(true);
opts.SetShowPrivateTable(true);
auto describeSchemeResult = DescribePath(ss, TlsActivationContext->AsActorContext(),changefeedPath + "/streamImpl", opts);

const auto& response = describeSchemeResult->GetRecord().GetPathDescription();
item.StreamImplPathId = {response.GetSelf().GetSchemeshardId(), response.GetSelf().GetPathId()};
pqGroup.CopyFrom(response.GetPersQueueGroup());

pqGroup.ClearTotalGroupCount();
pqGroup.MutablePQTabletConfig()->ClearPartitionKeySchema();

auto* tabletConfig = pqGroup.MutablePQTabletConfig();
const auto& pqConfig = AppData()->PQConfig;

for (const auto& consumer : topic.consumers()) {
auto& addedConsumer = *tabletConfig->AddConsumers();
auto consumerName = NPersQueue::ConvertNewConsumerName(consumer.name(), pqConfig);
addedConsumer.SetName(consumerName);
if (consumer.important()) {
addedConsumer.SetImportant(true);
}
}

return propose;
}

} // NSchemeShard
} // NKikimr
6 changes: 6 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,11 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateChangefeedPropose(
const TImportInfo::TItem& item
);

THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateConsumersPropose(
TSchemeShard* ss,
TTxId txId,
TImportInfo::TItem& item
);

} // NSchemeShard
} // NKikimr
7 changes: 7 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_info_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -2846,6 +2846,11 @@ struct TImportInfo: public TSimpleRefCount<TImportInfo> {
Subscribed,
};

enum class EChangefeedState: ui8 {
CreateChangefeed = 0,
CreateConsumers,
};

TString DstPathName;
TPathId DstPathId;
Ydb::Table::CreateTableRequest Scheme;
Expand All @@ -2857,13 +2862,15 @@ struct TImportInfo: public TSimpleRefCount<TImportInfo> {

EState State = EState::GetScheme;
ESubState SubState = ESubState::AllocateTxId;
EChangefeedState ChangefeedState = EChangefeedState::CreateChangefeed;
TTxId WaitTxId = InvalidTxId;
TActorId SchemeGetter;
TActorId SchemeQueryExecutor;
int NextIndexIdx = 0;
int NextChangefeedIdx = 0;
TString Issue;
int ViewCreationRetries = 0;
TPathId StreamImplPathId;

TItem() = default;

Expand Down
23 changes: 23 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1308,6 +1308,29 @@ THolder<TEvSchemeShard::TEvDescribeSchemeResultBuilder> DescribePath(
return DescribePath(self, ctx, pathId, options);
}

THolder<TEvSchemeShard::TEvDescribeSchemeResultBuilder> DescribePath(
TSchemeShard* self,
const TActorContext& ctx,
const TString& path,
const NKikimrSchemeOp::TDescribeOptions& opts
) {
NKikimrSchemeOp::TDescribePath params;
params.SetPath(path);
params.MutableOptions()->CopyFrom(opts);

return TPathDescriber(self, std::move(params)).Describe(ctx);
}

THolder<TEvSchemeShard::TEvDescribeSchemeResultBuilder> DescribePath(
TSchemeShard* self,
const TActorContext& ctx,
const TString& path
) {
NKikimrSchemeOp::TDescribeOptions options;
options.SetShowPrivateTable(true);
return DescribePath(self, ctx, path, options);
}

void TSchemeShard::DescribeTable(
const TTableInfo& tableInfo,
const NScheme::TTypeRegistry* typeRegistry,
Expand Down
13 changes: 13 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_path_describer.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,18 @@ THolder<TEvSchemeShard::TEvDescribeSchemeResultBuilder> DescribePath(
TPathId pathId
);

THolder<TEvSchemeShard::TEvDescribeSchemeResultBuilder> DescribePath(
TSchemeShard* self,
const TActorContext& ctx,
const TString& path,
const NKikimrSchemeOp::TDescribeOptions& opts
);

THolder<TEvSchemeShard::TEvDescribeSchemeResultBuilder> DescribePath(
TSchemeShard* self,
const TActorContext& ctx,
const TString& path
);

} // NSchemeShard
} // NKikimr
13 changes: 13 additions & 0 deletions ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -968,6 +968,19 @@ TCheckFunc RetentionPeriod(const TDuration& value) {
};
}

TCheckFunc ConsumerExist(const TString& name) {
return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
bool isExist = false;
for (const auto& consumer : record.GetPathDescription().GetPersQueueGroup().GetPQTabletConfig().GetConsumers()) {
if (consumer.GetName() == name) {
isExist = true;
break;
}
}
UNIT_ASSERT(isExist);
};
}

void NoChildren(const NKikimrScheme::TEvDescribeSchemeResult& record) {
ChildrenCount(0)(record);
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/ut_helpers/ls_checks.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ namespace NLs {
TCheckFunc StreamAwsRegion(const TString& value);
TCheckFunc StreamInitialScanProgress(ui32 total, ui32 completed);
TCheckFunc RetentionPeriod(const TDuration& value);
TCheckFunc ConsumerExist(const TString& name);

TCheckFunc HasBackupInFly(ui64 txId);
void NoBackupInFly(const NKikimrScheme::TEvDescribeSchemeResult& record);
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5067,7 +5067,10 @@ Y_UNIT_TEST_SUITE(TImportTests) {
{changefeedPath, GenerateTestData({EPathTypeCdcStream, changefeedDesc, std::move(attr)})},
[changefeedPath = TString(changefeedPath)](TTestBasicRuntime& runtime) {
TestDescribeResult(DescribePath(runtime, "/MyRoot/Table" + changefeedPath, false, false, true), {
NLs::PathExist
NLs::PathExist,
});
TestDescribeResult(DescribePath(runtime, "/MyRoot/Table" + changefeedPath + "/streamImpl", false, false, true), {
NLs::ConsumerExist("my_consumer")
});
}
};
Expand Down
Loading