Skip to content

Commit 2aaf5f4

Browse files
committed
Allow dst cretor to create table with index
1 parent eed9f4d commit 2aaf5f4

File tree

6 files changed

+107
-8
lines changed

6 files changed

+107
-8
lines changed

ydb/core/tx/replication/controller/dst_creator.cpp

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,8 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
167167

168168
Ydb::StatusIds::StatusCode status;
169169
TString error;
170-
if (!FillTableDescription(TxBody, scheme, TableProfiles, status, error)) {
170+
171+
if (!FillTableDescription(TxBody, scheme, TableProfiles, status, error, scheme.indexes_size())) {
171172
return Error(NKikimrScheme::StatusSchemeError, error);
172173
}
173174

@@ -176,15 +177,28 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
176177
return Error(NKikimrScheme::StatusSchemeError, error);
177178
}
178179

179-
// TODO: support indexed tables
180-
TxBody.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateTable);
181180
TxBody.SetWorkingDir(pathPair.first);
182181

183-
auto& desc = *TxBody.MutableCreateTable();
184-
desc.SetName(pathPair.second);
182+
NKikimrSchemeOp::TTableDescription* tableDesc = nullptr;
183+
if (scheme.indexes_size()) {
184+
TxBody.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateIndexedTable);
185+
tableDesc = TxBody.MutableCreateIndexedTable()->MutableTableDescription();
186+
TxBody.SetInternal(true);
187+
} else {
188+
TxBody.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateTable);
189+
tableDesc = TxBody.MutableCreateTable();
190+
}
191+
192+
Ydb::StatusIds::StatusCode dummyCode;
193+
194+
if (!FillIndexDescription(*TxBody.MutableCreateIndexedTable(), scheme, dummyCode, error)) {
195+
return Error(NKikimrScheme::StatusSchemeError, error);
196+
}
197+
198+
tableDesc->SetName(pathPair.second);
185199

186200
// TODO: support other modes
187-
auto& replicationConfig = *desc.MutableReplicationConfig();
201+
auto& replicationConfig = *tableDesc->MutableReplicationConfig();
188202
replicationConfig.SetMode(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY);
189203
replicationConfig.SetConsistency(NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK);
190204

ydb/core/tx/replication/controller/dst_creator_ut.cpp

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,71 @@ Y_UNIT_TEST_SUITE(DstCreator) {
5858
UNIT_ASSERT_VALUES_EQUAL(replCfg.GetConsistency(), NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK);
5959
}
6060

61+
void WithSyncIndex(const TString& replicatedPath) {
62+
TEnv env;
63+
env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_CONTROLLER, NLog::PRI_TRACE);
64+
65+
const auto tableDesc = TTestTableDescription{
66+
.Name = "Table",
67+
.KeyColumns = {"key"},
68+
.Columns = {
69+
{.Name = "key", .Type = "Uint32"},
70+
{.Name = "value", .Type = "Uint32"},
71+
},
72+
.ReplicationConfig = Nothing(),
73+
};
74+
75+
const TString indexName = "index_by_value";
76+
77+
env.CreateTableWithIndex("/Root", *MakeTableDescription(tableDesc),
78+
indexName, TVector<TString>{"value"}, NKikimrSchemeOp::EIndexTypeGlobal,
79+
TVector<TString>{}, TDuration::Seconds(5000));
80+
env.GetRuntime().Register(CreateDstCreator(
81+
env.GetSender(), env.GetSchemeshardId("/Root/Table"), env.GetYdbProxy(), env.GetPathId("/Root"),
82+
1 /* rid */, 1 /* tid */, TReplication::ETargetKind::Table, "/Root/Table", replicatedPath
83+
));
84+
85+
auto ev = env.GetRuntime().GrabEdgeEvent<TEvPrivate::TEvCreateDstResult>(env.GetSender());
86+
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Status, NKikimrScheme::StatusSuccess);
87+
88+
{
89+
auto desc = env.GetDescription(replicatedPath);
90+
const auto& replicatedDesc = desc.GetPathDescription().GetTable();
91+
92+
UNIT_ASSERT_VALUES_EQUAL(replicatedDesc.KeyColumnNamesSize(), tableDesc.KeyColumns.size());
93+
for (ui32 i = 0; i < replicatedDesc.KeyColumnNamesSize(); ++i) {
94+
UNIT_ASSERT_VALUES_EQUAL(replicatedDesc.GetKeyColumnNames(i), tableDesc.KeyColumns[i]);
95+
}
96+
97+
UNIT_ASSERT_VALUES_EQUAL(replicatedDesc.ColumnsSize(), tableDesc.Columns.size());
98+
for (ui32 i = 0; i < replicatedDesc.ColumnsSize(); ++i) {
99+
auto pred = [name = replicatedDesc.GetColumns(i).GetName()](const auto& column) {
100+
return name == column.Name;
101+
};
102+
103+
UNIT_ASSERT(FindIfPtr(tableDesc.Columns, pred));
104+
}
105+
106+
const auto& replCfg = replicatedDesc.GetReplicationConfig();
107+
UNIT_ASSERT_VALUES_EQUAL(replCfg.GetMode(), NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY);
108+
UNIT_ASSERT_VALUES_EQUAL(replCfg.GetConsistency(), NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK);
109+
}
110+
111+
{
112+
auto desc = env.GetDescription(replicatedPath + "/" + indexName);
113+
UNIT_ASSERT_VALUES_EQUAL(desc.GetPathDescription().GetTableIndex().GetName(), indexName);
114+
UNIT_ASSERT_VALUES_EQUAL(desc.GetPathDescription().GetTableIndex().GetType(), NKikimrSchemeOp::EIndexType::EIndexTypeGlobal);
115+
}
116+
117+
{
118+
auto desc = env.GetDescription(replicatedPath + "/" + indexName + "/indexImplTable");
119+
Cerr << desc.DebugString() << Endl;
120+
const auto& indexTableDesc = desc.GetPathDescription().GetTable();
121+
UNIT_ASSERT_VALUES_EQUAL(indexTableDesc.KeyColumnNamesSize(), 2);
122+
}
123+
}
124+
125+
61126
Y_UNIT_TEST(Basic) {
62127
Basic("/Root/Replicated");
63128
}
@@ -66,6 +131,15 @@ Y_UNIT_TEST_SUITE(DstCreator) {
66131
Basic("/Root/Dir/Replicated");
67132
}
68133

134+
Y_UNIT_TEST(WithSyncIndex) {
135+
WithSyncIndex("/Root/Replicated");
136+
}
137+
138+
Y_UNIT_TEST(WithSyncIndexWithIntermediateDir) {
139+
WithSyncIndex("/Root/Dir/Replicated");
140+
}
141+
142+
69143
Y_UNIT_TEST(SameOwner) {
70144
TEnv env;
71145
env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_CONTROLLER, NLog::PRI_TRACE);

ydb/core/tx/replication/ut_helpers/test_env.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,11 @@ class TEnv {
143143
return Client.CreateTable(std::forward<Args>(args)...);
144144
}
145145

146+
template <typename... Args>
147+
auto CreateTableWithIndex(Args&&... args) {
148+
return Client.CreateTableWithUniformShardedIndex(std::forward<Args>(args)...);
149+
}
150+
146151
void SendAsync(const TActorId& recipient, IEventBase* ev) {
147152
Server.GetRuntime()->Send(new IEventHandle(recipient, Sender, ev));
148153
}

ydb/core/tx/replication/ut_helpers/test_table.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ struct TTestTableDescription {
4343
TString Name;
4444
TVector<TString> KeyColumns;
4545
TVector<TColumn> Columns;
46+
TVector<std::pair<TString, TVector<TString>>> SyncSecondaryIndexes;
4647
TMaybe<TReplicationConfig> ReplicationConfig = TReplicationConfig::Default();
4748

4849
void SerializeTo(NKikimrSchemeOp::TTableDescription& proto) const;

ydb/core/tx/schemeshard/schemeshard__operation_create_index.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ class TCreateTableIndex: public TSubOperation {
105105

106106
const auto acceptExisted = !Transaction.GetFailOnExist();
107107
const auto& tableIndexCreation = Transaction.GetCreateTableIndex();
108+
const bool internal = Transaction.HasInternal() && Transaction.GetInternal();
108109

109110
const TString& parentPathStr = Transaction.GetWorkingDir();
110111
const TString& name = tableIndexCreation.GetName();
@@ -138,8 +139,11 @@ class TCreateTableIndex: public TSubOperation {
138139
.NotDeleted()
139140
.NotUnderDeleting()
140141
.IsCommonSensePath()
141-
.IsTable()
142-
.NotAsyncReplicaTable();
142+
.IsTable();
143+
144+
if (!internal) {
145+
checks.NotAsyncReplicaTable();
146+
}
143147

144148
if (tableIndexCreation.GetState() == NKikimrSchemeOp::EIndexState::EIndexStateReady) {
145149
checks

ydb/core/tx/schemeshard/schemeshard__operation_create_indexed_table.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ TVector<ISubOperation::TPtr> CreateIndexedTable(TOperationId nextId, const TTxTr
217217
tx.GetWorkingDir() + "/" + baseTableDescription.GetName(),
218218
NKikimrSchemeOp::EOperationType::ESchemeOpCreateTableIndex);
219219
scheme.SetFailOnExist(tx.GetFailOnExist());
220+
scheme.SetInternal(tx.GetInternal());
220221

221222
scheme.MutableCreateTableIndex()->CopyFrom(indexDescription);
222223
if (!indexDescription.HasType()) {

0 commit comments

Comments
 (0)