Skip to content

Import changefeed's configuration from s3 #13943

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
73 commits
Select commit Hold shift + click to select a range
c4827ff
start
stanislav-shchetinin Jan 28, 2025
e9ee178
CreateChangefeedsPropose
stanislav-shchetinin Jan 28, 2025
ad58c18
CreateChangefeedsPropose
stanislav-shchetinin Jan 29, 2025
02791a1
list obj
stanislav-shchetinin Jan 29, 2025
953651e
import_scheme_getter is complere
stanislav-shchetinin Jan 30, 2025
5cfaac1
fix log
stanislav-shchetinin Jan 30, 2025
3844535
added topics
stanislav-shchetinin Jan 30, 2025
c371c98
fix name
stanislav-shchetinin Jan 30, 2025
078c578
try fix build
stanislav-shchetinin Jan 30, 2025
d7a0c4b
fix logs
stanislav-shchetinin Jan 30, 2025
4a7297a
fix review 1
stanislav-shchetinin Jan 31, 2025
7b3c4dc
schemeshard schema
stanislav-shchetinin Jan 31, 2025
58659d2
more reqs
stanislav-shchetinin Feb 3, 2025
82c4acb
requests one after another
stanislav-shchetinin Feb 3, 2025
1cb0dca
fix after review
stanislav-shchetinin Feb 3, 2025
9a1595d
fix build
stanislav-shchetinin Feb 3, 2025
f742d92
new state in python test
stanislav-shchetinin Feb 4, 2025
6920f43
update schema
stanislav-shchetinin Feb 4, 2025
957ee2b
feature flag added
stanislav-shchetinin Feb 4, 2025
dc96c31
cli & feature flag
stanislav-shchetinin Feb 4, 2025
33f672e
Merge branch 'main' into import-s3-changefeeds
stanislav-shchetinin Feb 4, 2025
2a447f5
removed cancel
stanislav-shchetinin Feb 4, 2025
c3d6c22
Update ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp
stanislav-shchetinin Feb 4, 2025
8435331
ui32 & replace
stanislav-shchetinin Feb 4, 2025
c6e4156
fix review
stanislav-shchetinin Feb 4, 2025
1c0e4b8
fix build
stanislav-shchetinin Feb 4, 2025
a54b586
SetTableName
stanislav-shchetinin Feb 5, 2025
b671aa7
removed extra spaces
stanislav-shchetinin Feb 5, 2025
cd27fff
removed convert to TVector
stanislav-shchetinin Feb 5, 2025
ad292e9
check tests after reply in ListObjects
stanislav-shchetinin Feb 5, 2025
69fe837
removed extra
stanislav-shchetinin Feb 6, 2025
872ae51
fix
stanislav-shchetinin Feb 7, 2025
af768cd
working version
stanislav-shchetinin Feb 7, 2025
c1bf58e
feature flag
stanislav-shchetinin Feb 7, 2025
7feb38d
set table name
stanislav-shchetinin Feb 7, 2025
cab1c25
clean up
stanislav-shchetinin Feb 7, 2025
07b7834
clean up
stanislav-shchetinin Feb 7, 2025
3eddacf
Merge branch 'main' into import-s3-changefeeds
stanislav-shchetinin Feb 7, 2025
9f61dde
fix review
stanislav-shchetinin Feb 10, 2025
9ce2215
removed set flag
stanislav-shchetinin Feb 10, 2025
86c19bc
add
stanislav-shchetinin Feb 10, 2025
90e0c8c
fix
stanislav-shchetinin Feb 10, 2025
1e205ac
removed
stanislav-shchetinin Feb 10, 2025
8749ea1
Update ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp
stanislav-shchetinin Feb 10, 2025
39e7aac
removed test
stanislav-shchetinin Feb 10, 2025
a943093
list objects in mock
stanislav-shchetinin Feb 11, 2025
e4ac3c1
py tests
stanislav-shchetinin Feb 11, 2025
b5b42b7
py3 test
stanislav-shchetinin Feb 12, 2025
1aaf7bc
clean up s3_mock
stanislav-shchetinin Feb 12, 2025
e3b6729
prepare tests
stanislav-shchetinin Feb 12, 2025
f524e30
try/except in py
stanislav-shchetinin Feb 13, 2025
8c66604
test removed
stanislav-shchetinin Feb 13, 2025
6c58e40
fix build ut_restore
stanislav-shchetinin Feb 13, 2025
f777f6f
try test
stanislav-shchetinin Feb 13, 2025
2684d4a
tests
stanislav-shchetinin Feb 14, 2025
2bc46f5
removed extra
stanislav-shchetinin Feb 14, 2025
7dbb241
removed cerr
stanislav-shchetinin Feb 14, 2025
bbf1f4f
tests with permissions
stanislav-shchetinin Feb 14, 2025
163c209
new place for tests
stanislav-shchetinin Feb 14, 2025
5a4f833
Merge branch 'main' into import-s3-changefeeds
stanislav-shchetinin Feb 14, 2025
e411748
fix logs
stanislav-shchetinin Feb 17, 2025
0dfa84a
Merge branch 'import-s3-changefeeds' of https://github.com/stanislav-…
stanislav-shchetinin Feb 17, 2025
296df70
Merge branch 'import-s3-changefeeds' of https://github.com/stanislav-…
stanislav-shchetinin Feb 17, 2025
3c79503
Merge branch 'import-s3-changefeeds' of https://github.com/stanislav-…
stanislav-shchetinin Feb 17, 2025
c3cb505
test with reboots
stanislav-shchetinin Feb 17, 2025
d87b5f6
fix reboot
stanislav-shchetinin Feb 18, 2025
52f83b7
removed cancel create changefeed
stanislav-shchetinin Feb 18, 2025
e999a7e
reboot cancel test
stanislav-shchetinin Feb 18, 2025
974aa47
Merge branch 'main' into import-s3-changefeeds
stanislav-shchetinin Feb 18, 2025
c448cc8
Apply suggestions from code review
stanislav-shchetinin Feb 18, 2025
9f7a019
review
stanislav-shchetinin Feb 18, 2025
54d021f
review tests
stanislav-shchetinin Feb 18, 2025
f3ca9e4
ut_restore fix
stanislav-shchetinin Feb 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/protos/feature_flags.proto
Original file line number Diff line number Diff line change
Expand Up @@ -193,4 +193,5 @@ message TFeatureFlags {
// deny non-administrators the privilege of administering local users and groups
optional bool EnableStrictUserManagement = 168 [default = false];
optional bool EnableDatabaseAdmin = 169 [default = false];
optional bool EnableChangefeedsImport = 170 [default = false];
}
9 changes: 9 additions & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import "ydb/library/mkql_proto/protos/minikql.proto";
import "ydb/public/api/protos/ydb_coordination.proto";
import "ydb/public/api/protos/ydb_export.proto";
import "ydb/public/api/protos/ydb_table.proto";
import "ydb/public/api/protos/ydb_topic.proto";
import "ydb/public/api/protos/ydb_value.proto";

import "google/protobuf/empty.proto";
Expand Down Expand Up @@ -2124,3 +2125,11 @@ message TBackupBackupCollection {
optional NKikimrProto.TPathID PathId = 2;
optional string TargetDir = 3; // must be set on Rewrite
}

message TImportTableChangefeeds {
message TImportChangefeedTopic {
optional Ydb.Table.ChangefeedDescription Changefeed = 1;
optional Ydb.Topic.DescribeTopicResult Topic = 2;
}
repeated TImportChangefeedTopic Changefeeds = 1;
}
5 changes: 5 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4464,9 +4464,14 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
item.Metadata = NBackup::TMetadata::Deserialize(rowset.GetValue<Schema::ImportItems::Metadata>());
}

if (rowset.HaveValue<Schema::ImportItems::Changefeeds>()) {
item.Changefeeds = rowset.GetValue<Schema::ImportItems::Changefeeds>();
}

item.State = static_cast<TImportInfo::EState>(rowset.GetValue<Schema::ImportItems::State>());
item.WaitTxId = rowset.GetValueOrDefault<Schema::ImportItems::WaitTxId>(InvalidTxId);
item.NextIndexIdx = rowset.GetValueOrDefault<Schema::ImportItems::NextIndexIdx>(0);
item.NextChangefeedIdx = rowset.GetValueOrDefault<Schema::ImportItems::NextChangefeedIdx>(0);
item.Issue = rowset.GetValueOrDefault<Schema::ImportItems::Issue>(TString());

if (item.WaitTxId != InvalidTxId) {
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_import.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ void TSchemeShard::FromXxportInfo(NKikimrImport::TImport& import, const TImportI
case TImportInfo::EState::BuildIndexes:
import.SetProgress(Ydb::Import::ImportProgress::PROGRESS_BUILD_INDEXES);
break;
case TImportInfo::EState::CreateChangefeed:
import.SetProgress(Ydb::Import::ImportProgress::PROGRESS_CREATE_CHANGEFEEDS);
break;
case TImportInfo::EState::Done:
import.SetProgress(Ydb::Import::ImportProgress::PROGRESS_DONE);
break;
Expand Down Expand Up @@ -163,6 +166,7 @@ void TSchemeShard::PersistImportItemState(NIceDb::TNiceDb& db, const TImportInfo
NIceDb::TUpdate<Schema::ImportItems::State>(static_cast<ui8>(item.State)),
NIceDb::TUpdate<Schema::ImportItems::WaitTxId>(item.WaitTxId),
NIceDb::TUpdate<Schema::ImportItems::NextIndexIdx>(item.NextIndexIdx),
NIceDb::TUpdate<Schema::ImportItems::NextChangefeedIdx>(item.NextChangefeedIdx),
NIceDb::TUpdate<Schema::ImportItems::Issue>(item.Issue)
);
}
Expand All @@ -189,6 +193,10 @@ void TSchemeShard::PersistImportItemScheme(NIceDb::TNiceDb& db, const TImportInf
db.Table<Schema::ImportItems>().Key(importInfo->Id, itemIdx).Update(
NIceDb::TUpdate<Schema::ImportItems::Metadata>(item.Metadata.Serialize())
);

db.Table<Schema::ImportItems>().Key(importInfo->Id, itemIdx).Update(
NIceDb::TUpdate<Schema::ImportItems::Changefeeds>(item.Changefeeds)
);
}

void TSchemeShard::PersistImportItemPreparedCreationQuery(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx) {
Expand Down
62 changes: 62 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_import__create.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,21 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
return true;
}

void CreateChangefeed(TImportInfo::TPtr importInfo, ui32 itemIdx, TTxId txId) {
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
auto& item = importInfo->Items.at(itemIdx);
item.SubState = ESubState::Proposed;

LOG_I("TImport::TTxProgress: CreateChangefeed propose"
<< ": info# " << importInfo->ToString()
<< ", item# " << item.ToString(itemIdx)
<< ", txId# " << txId);

Y_ABORT_UNLESS(item.WaitTxId == InvalidTxId);

Send(Self->SelfId(), CreateChangefeedPropose(Self, txId, item));
}

void AllocateTxId(TImportInfo::TPtr importInfo, ui32 itemIdx) {
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
auto& item = importInfo->Items.at(itemIdx);
Expand Down Expand Up @@ -588,6 +603,25 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
return TTxId(ui64((*infoPtr)->Id));
}

TTxId GetActiveCreateChangefeedTxId(TImportInfo::TPtr importInfo, ui32 itemIdx) {
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
const auto& item = importInfo->Items.at(itemIdx);

Y_ABORT_UNLESS(item.State == EState::CreateChangefeed);
Y_ABORT_UNLESS(item.DstPathId);

if (!Self->PathsById.contains(item.DstPathId)) {
return InvalidTxId;
}

auto path = Self->PathsById.at(item.DstPathId);
if (path->PathState != NKikimrSchemeOp::EPathStateAlter) {
return InvalidTxId;
}

return path->LastTxId;
Comment on lines +618 to +622
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Выглядит не очень надежно, между схемными транзакциями импорта же могут влезть другие схемные операции.

}

static TString MakeIndexBuildUid(TImportInfo::TPtr importInfo, ui32 itemIdx) {
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
const auto& item = importInfo->Items.at(itemIdx);
Expand Down Expand Up @@ -756,6 +790,7 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
case EState::CreateSchemeObject:
case EState::Transferring:
case EState::BuildIndexes:
case EState::CreateChangefeed:
if (item.WaitTxId == InvalidTxId) {
if (!IsCreatedByQuery(item) || item.PreparedCreationQuery) {
AllocateTxId(importInfo, itemIdx);
Expand All @@ -781,6 +816,10 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
TTxId txId = InvalidTxId;

switch (item.State) {
case EState::CreateChangefeed:
txId = GetActiveCreateChangefeedTxId(importInfo, itemIdx);
break;

case EState::Transferring:
if (!CancelTransferring(importInfo, itemIdx)) {
txId = GetActiveRestoreTxId(importInfo, itemIdx);
Expand Down Expand Up @@ -1004,6 +1043,11 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
BuildIndex(importInfo, i, txId);
itemIdx = i;
break;

case EState::CreateChangefeed:
CreateChangefeed(importInfo, i, txId);
itemIdx = i;
break;

default:
break;
Expand Down Expand Up @@ -1064,6 +1108,8 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
txId = TTxId(record.GetPathCreateTxId());
} else if (item.State == EState::Transferring) {
txId = GetActiveRestoreTxId(importInfo, itemIdx);
} else if (item.State == EState::CreateChangefeed) {
txId = GetActiveCreateChangefeedTxId(importInfo, itemIdx);
}
}

Expand Down Expand Up @@ -1216,6 +1262,10 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
if (item.NextIndexIdx < item.Scheme.indexes_size()) {
item.State = EState::BuildIndexes;
AllocateTxId(importInfo, itemIdx);
} else if (item.NextChangefeedIdx < item.Changefeeds.changefeeds_size() &&
AppData()->FeatureFlags.GetEnableChangefeedsImport()) {
item.State = EState::CreateChangefeed;
AllocateTxId(importInfo, itemIdx);
} else {
item.State = EState::Done;
}
Expand All @@ -1229,11 +1279,23 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
} else {
if (++item.NextIndexIdx < item.Scheme.indexes_size()) {
AllocateTxId(importInfo, itemIdx);
} else if (item.NextChangefeedIdx < item.Changefeeds.changefeeds_size() &&
AppData()->FeatureFlags.GetEnableChangefeedsImport()) {
item.State = EState::CreateChangefeed;
AllocateTxId(importInfo, itemIdx);
} else {
item.State = EState::Done;
}
}
break;

case EState::CreateChangefeed:
if (++item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size()) {
AllocateTxId(importInfo, itemIdx);
} else {
item.State = EState::Done;
}
break;

default:
return SendNotificationsIfFinished(importInfo);
Expand Down
60 changes: 60 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,5 +231,65 @@ THolder<TEvIndexBuilder::TEvCancelRequest> CancelIndexBuildPropose(
return MakeHolder<TEvIndexBuilder::TEvCancelRequest>(ui64(indexBuildId), domainPath.PathString(), ui64(indexBuildId));
}

THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateChangefeedPropose(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

В идеале, тут нужно переиспользовать код вместе с

modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStream);
auto op = modifyScheme->MutableCreateCdcStream();
op->SetTableName(name);
if (add.has_retention_period()) {
op->SetRetentionPeriodSeconds(add.retention_period().seconds());
}
if (add.has_topic_partitioning_settings()) {
i64 minActivePartitions =
add.topic_partitioning_settings().min_active_partitions();
if (minActivePartitions < 0) {
code = Ydb::StatusIds::BAD_REQUEST;
error = "Topic partitions count must be positive";
return false;
} else if (minActivePartitions == 0) {
minActivePartitions = 1;
}
op->SetTopicPartitions(minActivePartitions);
if (add.topic_partitioning_settings().has_auto_partitioning_settings()) {
auto& partitioningSettings = add.topic_partitioning_settings().auto_partitioning_settings();
op->SetTopicAutoPartitioning(partitioningSettings.strategy() != ::Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED);
i64 maxActivePartitions =
add.topic_partitioning_settings().max_active_partitions();
if (maxActivePartitions < 0) {
code = Ydb::StatusIds::BAD_REQUEST;
error = "Topic max active partitions count must be positive";
return false;
} else if (maxActivePartitions == 0) {
maxActivePartitions = 50;
}
op->SetMaxPartitionCount(maxActivePartitions);
}
}
if (!FillChangefeedDescription(*op->MutableStreamDescription(), add, code, error)) {
return false;
}
. Можно отдельным pr.

TSchemeShard* ss,
TTxId txId,
const TImportInfo::TItem& item
) {
Y_ABORT_UNLESS(item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size());

const auto& importChangefeedTopic = item.Changefeeds.GetChangefeeds()[item.NextChangefeedIdx];
const auto& changefeed = importChangefeedTopic.GetChangefeed();
const auto& topic = importChangefeedTopic.GetTopic();

auto propose = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(ui64(txId), ss->TabletID());
auto& record = propose->Record;
auto& modifyScheme = *record.AddTransaction();
modifyScheme.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStream);
auto& cdcStream = *modifyScheme.MutableCreateCdcStream();

const TPath dstPath = TPath::Init(item.DstPathId, ss);
modifyScheme.SetWorkingDir(dstPath.Parent().PathString());
cdcStream.SetTableName(dstPath.LeafName());

TString error;
Ydb::StatusIds::StatusCode status;

auto& cdcStreamDescription = *cdcStream.MutableStreamDescription();
if (!FillChangefeedDescription(cdcStreamDescription, changefeed, status, error)) {
return nullptr;
}

if (topic.has_retention_period()) {
cdcStream.SetRetentionPeriodSeconds(topic.retention_period().seconds());
}

if (topic.has_partitioning_settings()) {
i64 minActivePartitions =
topic.partitioning_settings().min_active_partitions();
if (minActivePartitions < 0) {
return nullptr;
} else if (minActivePartitions == 0) {
minActivePartitions = 1;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А почему так? Это для совместимости с чем-то?

}
cdcStream.SetTopicPartitions(minActivePartitions);

if (topic.partitioning_settings().has_auto_partitioning_settings()) {
auto& partitioningSettings = topic.partitioning_settings().auto_partitioning_settings();
cdcStream.SetTopicAutoPartitioning(partitioningSettings.strategy() != ::Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED);

i64 maxActivePartitions =
topic.partitioning_settings().max_active_partitions();
if (maxActivePartitions < 0) {
return nullptr;
} else if (maxActivePartitions == 0) {
maxActivePartitions = 50;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А почему так? Это для совместимости с чем-то?

}
cdcStream.SetMaxPartitionCount(maxActivePartitions);
}
}
return propose;
}

} // NSchemeShard
} // NKikimr
6 changes: 6 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,11 @@ THolder<TEvIndexBuilder::TEvCancelRequest> CancelIndexBuildPropose(
TTxId indexBuildId
);

THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateChangefeedPropose(
TSchemeShard* ss,
TTxId txId,
const TImportInfo::TItem& item
);

} // NSchemeShard
} // NKikimr
Loading
Loading