Skip to content

Commit 5e4ed6c

Browse files
authored
Allow dst creator to create table with index (#5779)
1 parent 6564267 commit 5e4ed6c

File tree

5 files changed

+106
-22
lines changed

5 files changed

+106
-22
lines changed

ydb/core/tx/replication/controller/dst_creator.cpp

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,8 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
168168

169169
Ydb::StatusIds::StatusCode status;
170170
TString error;
171-
if (!FillTableDescription(TxBody, scheme, TableProfiles, status, error)) {
171+
172+
if (!FillTableDescription(TxBody, scheme, TableProfiles, status, error, scheme.indexes_size())) {
172173
return Error(NKikimrScheme::StatusSchemeError, error);
173174
}
174175

@@ -177,15 +178,28 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
177178
return Error(NKikimrScheme::StatusSchemeError, error);
178179
}
179180

180-
// TODO: support indexed tables
181-
TxBody.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateTable);
182181
TxBody.SetWorkingDir(pathPair.first);
183182

184-
auto& desc = *TxBody.MutableCreateTable();
185-
desc.SetName(pathPair.second);
183+
NKikimrSchemeOp::TTableDescription* tableDesc = nullptr;
184+
if (scheme.indexes_size()) {
185+
TxBody.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateIndexedTable);
186+
tableDesc = TxBody.MutableCreateIndexedTable()->MutableTableDescription();
187+
TxBody.SetInternal(true);
188+
} else {
189+
TxBody.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateTable);
190+
tableDesc = TxBody.MutableCreateTable();
191+
}
192+
193+
Ydb::StatusIds::StatusCode dummyCode;
194+
195+
if (!FillIndexDescription(*TxBody.MutableCreateIndexedTable(), scheme, dummyCode, error)) {
196+
return Error(NKikimrScheme::StatusSchemeError, error);
197+
}
198+
199+
tableDesc->SetName(pathPair.second);
186200

187201
// TODO: support other modes
188-
auto& replicationConfig = *desc.MutableReplicationConfig();
202+
auto& replicationConfig = *tableDesc->MutableReplicationConfig();
189203
replicationConfig.SetMode(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY);
190204
replicationConfig.SetConsistency(NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK);
191205

ydb/core/tx/replication/controller/dst_creator_ut.cpp

Lines changed: 74 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,26 @@ namespace NKikimr::NReplication::NController {
1313
Y_UNIT_TEST_SUITE(DstCreator) {
1414
using namespace NTestHelpers;
1515

16+
void CheckTableReplica(const TTestTableDescription& tableDesc, const NKikimrSchemeOp::TTableDescription& replicatedDesc) {
17+
UNIT_ASSERT_VALUES_EQUAL(replicatedDesc.KeyColumnNamesSize(), tableDesc.KeyColumns.size());
18+
for (ui32 i = 0; i < replicatedDesc.KeyColumnNamesSize(); ++i) {
19+
UNIT_ASSERT_VALUES_EQUAL(replicatedDesc.GetKeyColumnNames(i), tableDesc.KeyColumns[i]);
20+
}
21+
22+
UNIT_ASSERT_VALUES_EQUAL(replicatedDesc.ColumnsSize(), tableDesc.Columns.size());
23+
for (ui32 i = 0; i < replicatedDesc.ColumnsSize(); ++i) {
24+
auto pred = [name = replicatedDesc.GetColumns(i).GetName()](const auto& column) {
25+
return name == column.Name;
26+
};
27+
28+
UNIT_ASSERT(FindIfPtr(tableDesc.Columns, pred));
29+
}
30+
31+
const auto& replCfg = replicatedDesc.GetReplicationConfig();
32+
UNIT_ASSERT_VALUES_EQUAL(replCfg.GetMode(), NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY);
33+
UNIT_ASSERT_VALUES_EQUAL(replCfg.GetConsistency(), NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK);
34+
}
35+
1636
void Basic(const TString& replicatedPath) {
1737
TEnv env;
1838
env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_CONTROLLER, NLog::PRI_TRACE);
@@ -39,24 +59,55 @@ Y_UNIT_TEST_SUITE(DstCreator) {
3959
auto desc = env.GetDescription(replicatedPath);
4060
const auto& replicatedDesc = desc.GetPathDescription().GetTable();
4161

42-
UNIT_ASSERT_VALUES_EQUAL(replicatedDesc.KeyColumnNamesSize(), tableDesc.KeyColumns.size());
43-
for (ui32 i = 0; i < replicatedDesc.KeyColumnNamesSize(); ++i) {
44-
UNIT_ASSERT_VALUES_EQUAL(replicatedDesc.GetKeyColumnNames(i), tableDesc.KeyColumns[i]);
45-
}
62+
CheckTableReplica(tableDesc, replicatedDesc);
63+
}
4664

47-
UNIT_ASSERT_VALUES_EQUAL(replicatedDesc.ColumnsSize(), tableDesc.Columns.size());
48-
for (ui32 i = 0; i < replicatedDesc.ColumnsSize(); ++i) {
49-
auto pred = [name = replicatedDesc.GetColumns(i).GetName()](const auto& column) {
50-
return name == column.Name;
51-
};
65+
void WithSyncIndex(const TString& replicatedPath) {
66+
TEnv env;
67+
env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_CONTROLLER, NLog::PRI_TRACE);
5268

53-
UNIT_ASSERT(FindIfPtr(tableDesc.Columns, pred));
69+
const auto tableDesc = TTestTableDescription{
70+
.Name = "Table",
71+
.KeyColumns = {"key"},
72+
.Columns = {
73+
{.Name = "key", .Type = "Uint32"},
74+
{.Name = "value", .Type = "Uint32"},
75+
},
76+
.ReplicationConfig = Nothing(),
77+
};
78+
79+
const TString indexName = "index_by_value";
80+
81+
env.CreateTableWithIndex("/Root", *MakeTableDescription(tableDesc),
82+
indexName, TVector<TString>{"value"}, NKikimrSchemeOp::EIndexTypeGlobal,
83+
TVector<TString>{}, TDuration::Seconds(5000));
84+
env.GetRuntime().Register(CreateDstCreator(
85+
env.GetSender(), env.GetSchemeshardId("/Root/Table"), env.GetYdbProxy(), env.GetPathId("/Root"),
86+
1 /* rid */, 1 /* tid */, TReplication::ETargetKind::Table, "/Root/Table", replicatedPath
87+
));
88+
89+
auto ev = env.GetRuntime().GrabEdgeEvent<TEvPrivate::TEvCreateDstResult>(env.GetSender());
90+
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Status, NKikimrScheme::StatusSuccess);
91+
92+
auto desc = env.GetDescription(replicatedPath);
93+
const auto& replicatedDesc = desc.GetPathDescription().GetTable();
94+
95+
CheckTableReplica(tableDesc, replicatedDesc);
96+
97+
{
98+
auto desc = env.GetDescription(replicatedPath + "/" + indexName);
99+
UNIT_ASSERT_VALUES_EQUAL(desc.GetPathDescription().GetTableIndex().GetName(), indexName);
100+
UNIT_ASSERT_VALUES_EQUAL(desc.GetPathDescription().GetTableIndex().GetType(), NKikimrSchemeOp::EIndexType::EIndexTypeGlobal);
54101
}
55102

56-
const auto& replCfg = replicatedDesc.GetReplicationConfig();
57-
UNIT_ASSERT_VALUES_EQUAL(replCfg.GetMode(), NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY);
58-
UNIT_ASSERT_VALUES_EQUAL(replCfg.GetConsistency(), NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK);
59-
}
103+
{
104+
auto desc = env.GetDescription(replicatedPath + "/" + indexName + "/indexImplTable");
105+
Cerr << desc.DebugString() << Endl;
106+
const auto& indexTableDesc = desc.GetPathDescription().GetTable();
107+
UNIT_ASSERT_VALUES_EQUAL(indexTableDesc.KeyColumnNamesSize(), 2);
108+
}
109+
}
110+
60111

61112
Y_UNIT_TEST(Basic) {
62113
Basic("/Root/Replicated");
@@ -66,6 +117,15 @@ Y_UNIT_TEST_SUITE(DstCreator) {
66117
Basic("/Root/Dir/Replicated");
67118
}
68119

120+
Y_UNIT_TEST(WithSyncIndex) {
121+
WithSyncIndex("/Root/Replicated");
122+
}
123+
124+
Y_UNIT_TEST(WithSyncIndexWithIntermediateDir) {
125+
WithSyncIndex("/Root/Dir/Replicated");
126+
}
127+
128+
69129
Y_UNIT_TEST(SameOwner) {
70130
TEnv env;
71131
env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_CONTROLLER, NLog::PRI_TRACE);

ydb/core/tx/replication/ut_helpers/test_env.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,11 @@ class TEnv {
156156
return Client.CreateTable(std::forward<Args>(args)...);
157157
}
158158

159+
template <typename... Args>
160+
auto CreateTableWithIndex(Args&&... args) {
161+
return Client.CreateTableWithUniformShardedIndex(std::forward<Args>(args)...);
162+
}
163+
159164
void SendAsync(const TActorId& recipient, IEventBase* ev) {
160165
Server.GetRuntime()->Send(new IEventHandle(recipient, Sender, ev));
161166
}

ydb/core/tx/schemeshard/schemeshard__operation_create_index.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ class TCreateTableIndex: public TSubOperation {
105105

106106
const auto acceptExisted = !Transaction.GetFailOnExist();
107107
const auto& tableIndexCreation = Transaction.GetCreateTableIndex();
108+
const bool internal = Transaction.HasInternal() && Transaction.GetInternal();
108109

109110
const TString& parentPathStr = Transaction.GetWorkingDir();
110111
const TString& name = tableIndexCreation.GetName();
@@ -138,8 +139,11 @@ class TCreateTableIndex: public TSubOperation {
138139
.NotDeleted()
139140
.NotUnderDeleting()
140141
.IsCommonSensePath()
141-
.IsTable()
142-
.NotAsyncReplicaTable();
142+
.IsTable();
143+
144+
if (!internal) {
145+
checks.NotAsyncReplicaTable();
146+
}
143147

144148
if (tableIndexCreation.GetState() == NKikimrSchemeOp::EIndexState::EIndexStateReady) {
145149
checks

ydb/core/tx/schemeshard/schemeshard__operation_create_indexed_table.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ TVector<ISubOperation::TPtr> CreateIndexedTable(TOperationId nextId, const TTxTr
223223
NKikimrSchemeOp::EOperationType::ESchemeOpCreateTableIndex);
224224
scheme.SetFailOnExist(tx.GetFailOnExist());
225225
scheme.SetAllowCreateInTempDir(tx.GetAllowCreateInTempDir());
226+
scheme.SetInternal(tx.GetInternal());
226227

227228
scheme.MutableCreateTableIndex()->CopyFrom(indexDescription);
228229
if (!indexDescription.HasType()) {

0 commit comments

Comments
 (0)