Skip to content

Replicate index tables #6937

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions ydb/core/base/path.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,12 @@ inline TVector<TString> ChildPath(const TVector<TString>& parentPath, const TStr
return path;
}

inline TVector<TString> ChildPath(const TVector<TString>& parentPath, const TVector<TString>& childPath) {
auto path = parentPath;
for (const auto& childName : childPath) {
path.push_back(childName);
}
return path;
}

}
1 change: 1 addition & 0 deletions ydb/core/tx/replication/controller/dst_alterer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class TDstAlterer: public TActorBootstrapped<TDstAlterer> {

switch (Kind) {
case TReplication::ETargetKind::Table:
case TReplication::ETargetKind::IndexTable:
tx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterTable);
PathIdFromPathId(DstPathId, tx.MutableAlterTable()->MutablePathId());
tx.MutableAlterTable()->MutableReplicationConfig()->SetMode(
Expand Down
144 changes: 117 additions & 27 deletions ydb/core/tx/replication/controller/dst_creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include <ydb/core/cms/console/configs_dispatcher.h>
#include <ydb/core/protos/console_config.pb.h>
#include <ydb/core/tx/replication/ydb_proxy/ydb_proxy.h>
#include <ydb/core/tx/scheme_board/events.h>
#include <ydb/core/tx/scheme_board/subscriber.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/core/tx/schemeshard/schemeshard.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
Expand Down Expand Up @@ -116,6 +118,8 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
.WithKeyShardBoundary(true)));
}
break;
case TReplication::ETargetKind::IndexTable:
Y_ABORT("unreachable");
}
}

Expand All @@ -128,7 +132,7 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
}
}

NKikimrScheme::EStatus ConvertStatus(NYdb::EStatus status) {
static NKikimrScheme::EStatus ConvertStatus(NYdb::EStatus status) {
switch (status) {
case NYdb::EStatus::SUCCESS:
return NKikimrScheme::StatusSuccess;
Expand Down Expand Up @@ -165,8 +169,20 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {

Ydb::Table::CreateTableRequest scheme;
result.GetTableDescription().SerializeTo(scheme);
// Disable index support until other replicator code be ready to process index replication
scheme.mutable_indexes()->Clear();

// filter out unsupported index types
auto& indexes = *scheme.mutable_indexes();
for (auto it = indexes.begin(); it != indexes.end();) {
switch (it->type_case()) {
case Ydb::Table::TableIndex::kGlobalIndex:
case Ydb::Table::TableIndex::kGlobalUniqueIndex:
++it;
continue;
default:
it = indexes.erase(it);
break;
}
}

Ydb::StatusIds::StatusCode status;
TString error;
Expand All @@ -182,30 +198,37 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {

TxBody.SetWorkingDir(pathPair.first);

NKikimrSchemeOp::TTableDescription* tableDesc = nullptr;
NKikimrSchemeOp::TTableDescription* desc = nullptr;
if (scheme.indexes_size()) {
NeedToCheck = true;
TxBody.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateIndexedTable);
tableDesc = TxBody.MutableCreateIndexedTable()->MutableTableDescription();
TxBody.SetInternal(true);
desc = TxBody.MutableCreateIndexedTable()->MutableTableDescription();
if (!FillIndexDescription(*TxBody.MutableCreateIndexedTable(), scheme, status, error)) {
return Error(NKikimrScheme::StatusSchemeError, error);
}
} else {
TxBody.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateTable);
tableDesc = TxBody.MutableCreateTable();
desc = TxBody.MutableCreateTable();
}

Ydb::StatusIds::StatusCode dummyCode;
Y_ABORT_UNLESS(desc);
desc->SetName(pathPair.second);

if (!FillIndexDescription(*TxBody.MutableCreateIndexedTable(), scheme, dummyCode, error)) {
return Error(NKikimrScheme::StatusSchemeError, error);
FillReplicationConfig(*desc->MutableReplicationConfig());
if (scheme.indexes_size()) {
for (auto& index : *TxBody.MutableCreateIndexedTable()->MutableIndexDescription()) {
FillReplicationConfig(*index.MutableIndexImplTableDescriptions(0)->MutableReplicationConfig());
}
}

tableDesc->SetName(pathPair.second);
AllocateTxId();
}

static void FillReplicationConfig(NKikimrSchemeOp::TTableReplicationConfig& replicationConfig) {
// TODO: support other modes
auto& replicationConfig = *tableDesc->MutableReplicationConfig();
replicationConfig.SetMode(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY);
replicationConfig.SetConsistency(NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK);

AllocateTxId();
}

void AllocateTxId() {
Expand Down Expand Up @@ -257,7 +280,9 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {

switch (record.GetStatus()) {
case NKikimrScheme::StatusAccepted:
DstPathId = TPathId(SchemeShardId, record.GetPathId());
if (!NeedToCheck) {
DstPathId = TPathId(SchemeShardId, record.GetPathId());
}
Y_DEBUG_ABORT_UNLESS(TxId == record.GetTxId());
return SubscribeTx(record.GetTxId());
case NKikimrScheme::StatusMultipleModifications:
Expand Down Expand Up @@ -338,6 +363,8 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
switch (Kind) {
case TReplication::ETargetKind::Table:
return CheckTableScheme(desc.GetTable(), error);
case TReplication::ETargetKind::IndexTable:
Y_ABORT("unreachable");
}
}

Expand Down Expand Up @@ -366,21 +393,30 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
return false;
}

const auto& expected = TxBody.GetCreateTable();
const NKikimrSchemeOp::TIndexedTableCreationConfig* indexedDesc = nullptr;
const NKikimrSchemeOp::TTableDescription* tableDesc = nullptr;
if (TxBody.GetOperationType() == NKikimrSchemeOp::ESchemeOpCreateIndexedTable) {
indexedDesc = &TxBody.GetCreateIndexedTable();
tableDesc = &indexedDesc->GetTableDescription();
} else {
tableDesc = &TxBody.GetCreateTable();
}

Y_ABORT_UNLESS(tableDesc);

// check key
if (expected.KeyColumnNamesSize() != got.KeyColumnNamesSize()) {
if (tableDesc->KeyColumnNamesSize() != got.KeyColumnNamesSize()) {
error = TStringBuilder() << "Key columns size mismatch"
<< ": expected: " << expected.KeyColumnNamesSize()
<< ": expected: " << tableDesc->KeyColumnNamesSize()
<< ", got: " << got.KeyColumnNamesSize();
return false;
}

for (ui32 i = 0; i < expected.KeyColumnNamesSize(); ++i) {
if (expected.GetKeyColumnNames(i) != got.GetKeyColumnNames(i)) {
for (ui32 i = 0; i < tableDesc->KeyColumnNamesSize(); ++i) {
if (tableDesc->GetKeyColumnNames(i) != got.GetKeyColumnNames(i)) {
error = TStringBuilder() << "Key column name mismatch"
<< ": position: " << i
<< ", expected: " << expected.GetKeyColumnNames(i)
<< ", expected: " << tableDesc->GetKeyColumnNames(i)
<< ", got: " << got.GetKeyColumnNames(i);
return false;
}
Expand All @@ -392,14 +428,14 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
columns.emplace(column.GetName(), column.GetType());
}

if (expected.ColumnsSize() != columns.size()) {
if (tableDesc->ColumnsSize() != columns.size()) {
error = TStringBuilder() << "Columns size mismatch"
<< ": expected: " << expected.ColumnsSize()
<< ": expected: " << tableDesc->ColumnsSize()
<< ", got: " << columns.size();
return false;
}

for (const auto& column : expected.GetColumns()) {
for (const auto& column : tableDesc->GetColumns()) {
auto it = columns.find(column.GetName());
if (it == columns.end()) {
error = TStringBuilder() << "Cannot find column"
Expand All @@ -422,14 +458,25 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
indexes.emplace(index.GetName(), &index);
}

if (expected.TableIndexesSize() != indexes.size()) {
if (!indexedDesc) {
if (!indexes.empty()) {
error = TStringBuilder() << "Indexes size mismatch"
<< ": expected: " << 0
<< ", got: " << indexes.size();
return false;
}

return true;
}

if (indexedDesc->IndexDescriptionSize() != indexes.size()) {
error = TStringBuilder() << "Indexes size mismatch"
<< ": expected: " << expected.TableIndexesSize()
<< ": expected: " << indexedDesc->IndexDescriptionSize()
<< ", got: " << indexes.size();
return false;
}

for (const auto& index : expected.GetTableIndexes()) {
for (const auto& index : indexedDesc->GetIndexDescription()) {
auto it = indexes.find(index.GetName());
if (it == indexes.end()) {
error = TStringBuilder() << "Cannot find index"
Expand Down Expand Up @@ -487,6 +534,36 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
return true;
}

void SubscribeDstPath() {
Subscriber = Register(CreateSchemeBoardSubscriber(SelfId(), DstPath));
Become(&TThis::StateSubscribeDstPath);
}

STATEFN(StateSubscribeDstPath) {
switch (ev->GetTypeRewrite()) {
hFunc(TSchemeBoardEvents::TEvNotifyUpdate, Handle);
default:
return StateBase(ev);
}
}

void Handle(TSchemeBoardEvents::TEvNotifyUpdate::TPtr& ev) {
LOG_T("Handle " << ev->Get()->ToString());

const auto& desc = ev->Get()->DescribeSchemeResult;
if (desc.GetStatus() != NKikimrScheme::StatusSuccess) {
return;
}

const auto& entryDesc = desc.GetPathDescription().GetSelf();
if (!entryDesc.HasCreateFinished() || !entryDesc.GetCreateFinished()) {
return;
}

DstPathId = ev->Get()->PathId;
return Success();
}

void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
LOG_T("Handle " << ev->Get()->ToString());

Expand Down Expand Up @@ -525,6 +602,12 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
Schedule(TDuration::Seconds(10), new TEvents::TEvWakeup);
}

void PassAway() override {
if (const auto& actorId = std::exchange(Subscriber, {})) {
Send(actorId, new TEvents::TEvPoison());
}
}

public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::REPLICATION_CONTROLLER_DST_CREATOR;
Expand Down Expand Up @@ -554,7 +637,13 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
}

void Bootstrap() {
Resolve(PathId);
switch (Kind) {
case TReplication::ETargetKind::Table:
return Resolve(PathId);
case TReplication::ETargetKind::IndexTable:
// indexed table will be created along with its indexes
return SubscribeDstPath();
}
}

STATEFN(StateBase) {
Expand Down Expand Up @@ -586,6 +675,7 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
TActorId PipeCache;
bool NeedToCheck = false;
TPathId DstPathId;
TActorId Subscriber;

}; // TDstCreator

Expand Down
62 changes: 42 additions & 20 deletions ydb/core/tx/replication/controller/dst_creator_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,15 @@ Y_UNIT_TEST_SUITE(DstCreator) {
CheckTableReplica(tableDesc, replicatedDesc);
}

void WithSyncIndex(const TString& replicatedPath) {
Y_UNIT_TEST(Basic) {
Basic("/Root/Replicated");
}

Y_UNIT_TEST(WithIntermediateDir) {
Basic("/Root/Dir/Replicated");
}

void WithIndex(const TString& replicatedPath, NKikimrSchemeOp::EIndexType indexType) {
TEnv env;
env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_CONTROLLER, NLog::PRI_TRACE);

Expand All @@ -79,25 +87,45 @@ Y_UNIT_TEST_SUITE(DstCreator) {
const TString indexName = "index_by_value";

env.CreateTableWithIndex("/Root", *MakeTableDescription(tableDesc),
indexName, TVector<TString>{"value"}, NKikimrSchemeOp::EIndexTypeGlobal,
TVector<TString>{}, TDuration::Seconds(5000));
indexName, TVector<TString>{"value"}, indexType);
env.GetRuntime().Register(CreateDstCreator(
env.GetSender(), env.GetSchemeshardId("/Root/Table"), env.GetYdbProxy(), env.GetPathId("/Root"),
1 /* rid */, 1 /* tid */, TReplication::ETargetKind::Table, "/Root/Table", replicatedPath
));

auto ev = env.GetRuntime().GrabEdgeEvent<TEvPrivate::TEvCreateDstResult>(env.GetSender());
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Status, NKikimrScheme::StatusSuccess);
{
auto ev = env.GetRuntime().GrabEdgeEvent<TEvPrivate::TEvCreateDstResult>(env.GetSender());
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Status, NKikimrScheme::StatusSuccess);
}

auto desc = env.GetDescription(replicatedPath);
const auto& replicatedDesc = desc.GetPathDescription().GetTable();

CheckTableReplica(tableDesc, replicatedDesc);

switch (indexType) {
case NKikimrSchemeOp::EIndexTypeGlobal:
case NKikimrSchemeOp::EIndexTypeGlobalUnique:
UNIT_ASSERT_VALUES_EQUAL(replicatedDesc.TableIndexesSize(), 1);
break;
default:
UNIT_ASSERT_VALUES_EQUAL(replicatedDesc.TableIndexesSize(), 0);
return;
}

env.GetRuntime().Register(CreateDstCreator(
env.GetSender(), env.GetSchemeshardId("/Root/Table"), env.GetYdbProxy(), env.GetPathId("/Root"),
1 /* rid */, 2 /* tid */, TReplication::ETargetKind::IndexTable,
"/Root/Table/" + indexName + "/indexImplTable", replicatedPath + "/" + indexName + "/indexImplTable"
));
{
auto ev = env.GetRuntime().GrabEdgeEvent<TEvPrivate::TEvCreateDstResult>(env.GetSender());
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Status, NKikimrScheme::StatusSuccess);
}

{
auto desc = env.GetDescription(replicatedPath + "/" + indexName);
UNIT_ASSERT_VALUES_EQUAL(desc.GetPathDescription().GetTableIndex().GetName(), indexName);
UNIT_ASSERT_VALUES_EQUAL(desc.GetPathDescription().GetTableIndex().GetType(), NKikimrSchemeOp::EIndexType::EIndexTypeGlobal);
UNIT_ASSERT_VALUES_EQUAL(desc.GetPathDescription().GetTableIndex().GetType(), indexType);
}

{
Expand All @@ -106,25 +134,19 @@ Y_UNIT_TEST_SUITE(DstCreator) {
const auto& indexTableDesc = desc.GetPathDescription().GetTable();
UNIT_ASSERT_VALUES_EQUAL(indexTableDesc.KeyColumnNamesSize(), 2);
}
}


Y_UNIT_TEST(Basic) {
Basic("/Root/Replicated");
}

Y_UNIT_TEST(WithIntermediateDir) {
Basic("/Root/Dir/Replicated");
}
/*
Y_UNIT_TEST(WithSyncIndex) {
WithSyncIndex("/Root/Replicated");
WithIndex("/Root/Replicated", NKikimrSchemeOp::EIndexTypeGlobal);
}

Y_UNIT_TEST(WithSyncIndexAndIntermediateDir) {
WithIndex("/Root/Dir/Replicated", NKikimrSchemeOp::EIndexTypeGlobal);
}

Y_UNIT_TEST(WithSyncIndexWithIntermediateDir) {
WithSyncIndex("/Root/Dir/Replicated");
Y_UNIT_TEST(WithAsyncIndex) {
WithIndex("/Root/Replicated", NKikimrSchemeOp::EIndexTypeGlobalAsync);
}
*/

Y_UNIT_TEST(SameOwner) {
TEnv env;
Expand Down
Loading
Loading