Skip to content

Commit 54b1989

Browse files
authored
Merge 2aaf5f4 into f95d5a6
2 parents f95d5a6 + 2aaf5f4 commit 54b1989

File tree

6 files changed

+107
-8
lines changed

6 files changed

+107
-8
lines changed

ydb/core/tx/replication/controller/dst_creator.cpp

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,8 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
168168

169169
Ydb::StatusIds::StatusCode status;
170170
TString error;
171-
if (!FillTableDescription(TxBody, scheme, TableProfiles, status, error)) {
171+
172+
if (!FillTableDescription(TxBody, scheme, TableProfiles, status, error, scheme.indexes_size())) {
172173
return Error(NKikimrScheme::StatusSchemeError, error);
173174
}
174175

@@ -177,15 +178,28 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
177178
return Error(NKikimrScheme::StatusSchemeError, error);
178179
}
179180

180-
// TODO: support indexed tables
181-
TxBody.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateTable);
182181
TxBody.SetWorkingDir(pathPair.first);
183182

184-
auto& desc = *TxBody.MutableCreateTable();
185-
desc.SetName(pathPair.second);
183+
NKikimrSchemeOp::TTableDescription* tableDesc = nullptr;
184+
if (scheme.indexes_size()) {
185+
TxBody.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateIndexedTable);
186+
tableDesc = TxBody.MutableCreateIndexedTable()->MutableTableDescription();
187+
TxBody.SetInternal(true);
188+
} else {
189+
TxBody.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateTable);
190+
tableDesc = TxBody.MutableCreateTable();
191+
}
192+
193+
Ydb::StatusIds::StatusCode dummyCode;
194+
195+
if (!FillIndexDescription(*TxBody.MutableCreateIndexedTable(), scheme, dummyCode, error)) {
196+
return Error(NKikimrScheme::StatusSchemeError, error);
197+
}
198+
199+
tableDesc->SetName(pathPair.second);
186200

187201
// TODO: support other modes
188-
auto& replicationConfig = *desc.MutableReplicationConfig();
202+
auto& replicationConfig = *tableDesc->MutableReplicationConfig();
189203
replicationConfig.SetMode(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY);
190204
replicationConfig.SetConsistency(NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK);
191205

ydb/core/tx/replication/controller/dst_creator_ut.cpp

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,71 @@ Y_UNIT_TEST_SUITE(DstCreator) {
5858
UNIT_ASSERT_VALUES_EQUAL(replCfg.GetConsistency(), NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK);
5959
}
6060

61+
void WithSyncIndex(const TString& replicatedPath) {
62+
TEnv env;
63+
env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_CONTROLLER, NLog::PRI_TRACE);
64+
65+
const auto tableDesc = TTestTableDescription{
66+
.Name = "Table",
67+
.KeyColumns = {"key"},
68+
.Columns = {
69+
{.Name = "key", .Type = "Uint32"},
70+
{.Name = "value", .Type = "Uint32"},
71+
},
72+
.ReplicationConfig = Nothing(),
73+
};
74+
75+
const TString indexName = "index_by_value";
76+
77+
env.CreateTableWithIndex("/Root", *MakeTableDescription(tableDesc),
78+
indexName, TVector<TString>{"value"}, NKikimrSchemeOp::EIndexTypeGlobal,
79+
TVector<TString>{}, TDuration::Seconds(5000));
80+
env.GetRuntime().Register(CreateDstCreator(
81+
env.GetSender(), env.GetSchemeshardId("/Root/Table"), env.GetYdbProxy(), env.GetPathId("/Root"),
82+
1 /* rid */, 1 /* tid */, TReplication::ETargetKind::Table, "/Root/Table", replicatedPath
83+
));
84+
85+
auto ev = env.GetRuntime().GrabEdgeEvent<TEvPrivate::TEvCreateDstResult>(env.GetSender());
86+
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Status, NKikimrScheme::StatusSuccess);
87+
88+
{
89+
auto desc = env.GetDescription(replicatedPath);
90+
const auto& replicatedDesc = desc.GetPathDescription().GetTable();
91+
92+
UNIT_ASSERT_VALUES_EQUAL(replicatedDesc.KeyColumnNamesSize(), tableDesc.KeyColumns.size());
93+
for (ui32 i = 0; i < replicatedDesc.KeyColumnNamesSize(); ++i) {
94+
UNIT_ASSERT_VALUES_EQUAL(replicatedDesc.GetKeyColumnNames(i), tableDesc.KeyColumns[i]);
95+
}
96+
97+
UNIT_ASSERT_VALUES_EQUAL(replicatedDesc.ColumnsSize(), tableDesc.Columns.size());
98+
for (ui32 i = 0; i < replicatedDesc.ColumnsSize(); ++i) {
99+
auto pred = [name = replicatedDesc.GetColumns(i).GetName()](const auto& column) {
100+
return name == column.Name;
101+
};
102+
103+
UNIT_ASSERT(FindIfPtr(tableDesc.Columns, pred));
104+
}
105+
106+
const auto& replCfg = replicatedDesc.GetReplicationConfig();
107+
UNIT_ASSERT_VALUES_EQUAL(replCfg.GetMode(), NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY);
108+
UNIT_ASSERT_VALUES_EQUAL(replCfg.GetConsistency(), NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK);
109+
}
110+
111+
{
112+
auto desc = env.GetDescription(replicatedPath + "/" + indexName);
113+
UNIT_ASSERT_VALUES_EQUAL(desc.GetPathDescription().GetTableIndex().GetName(), indexName);
114+
UNIT_ASSERT_VALUES_EQUAL(desc.GetPathDescription().GetTableIndex().GetType(), NKikimrSchemeOp::EIndexType::EIndexTypeGlobal);
115+
}
116+
117+
{
118+
auto desc = env.GetDescription(replicatedPath + "/" + indexName + "/indexImplTable");
119+
Cerr << desc.DebugString() << Endl;
120+
const auto& indexTableDesc = desc.GetPathDescription().GetTable();
121+
UNIT_ASSERT_VALUES_EQUAL(indexTableDesc.KeyColumnNamesSize(), 2);
122+
}
123+
}
124+
125+
61126
Y_UNIT_TEST(Basic) {
62127
Basic("/Root/Replicated");
63128
}
@@ -66,6 +131,15 @@ Y_UNIT_TEST_SUITE(DstCreator) {
66131
Basic("/Root/Dir/Replicated");
67132
}
68133

134+
Y_UNIT_TEST(WithSyncIndex) {
135+
WithSyncIndex("/Root/Replicated");
136+
}
137+
138+
Y_UNIT_TEST(WithSyncIndexWithIntermediateDir) {
139+
WithSyncIndex("/Root/Dir/Replicated");
140+
}
141+
142+
69143
Y_UNIT_TEST(SameOwner) {
70144
TEnv env;
71145
env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_CONTROLLER, NLog::PRI_TRACE);

ydb/core/tx/replication/ut_helpers/test_env.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,11 @@ class TEnv {
156156
return Client.CreateTable(std::forward<Args>(args)...);
157157
}
158158

159+
template <typename... Args>
160+
auto CreateTableWithIndex(Args&&... args) {
161+
return Client.CreateTableWithUniformShardedIndex(std::forward<Args>(args)...);
162+
}
163+
159164
void SendAsync(const TActorId& recipient, IEventBase* ev) {
160165
Server.GetRuntime()->Send(new IEventHandle(recipient, Sender, ev));
161166
}

ydb/core/tx/replication/ut_helpers/test_table.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ struct TTestTableDescription {
4343
TString Name;
4444
TVector<TString> KeyColumns;
4545
TVector<TColumn> Columns;
46+
TVector<std::pair<TString, TVector<TString>>> SyncSecondaryIndexes;
4647
TMaybe<TReplicationConfig> ReplicationConfig = TReplicationConfig::Default();
4748
TMaybe<ui32> UniformPartitions = Nothing();
4849

ydb/core/tx/schemeshard/schemeshard__operation_create_index.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ class TCreateTableIndex: public TSubOperation {
105105

106106
const auto acceptExisted = !Transaction.GetFailOnExist();
107107
const auto& tableIndexCreation = Transaction.GetCreateTableIndex();
108+
const bool internal = Transaction.HasInternal() && Transaction.GetInternal();
108109

109110
const TString& parentPathStr = Transaction.GetWorkingDir();
110111
const TString& name = tableIndexCreation.GetName();
@@ -138,8 +139,11 @@ class TCreateTableIndex: public TSubOperation {
138139
.NotDeleted()
139140
.NotUnderDeleting()
140141
.IsCommonSensePath()
141-
.IsTable()
142-
.NotAsyncReplicaTable();
142+
.IsTable();
143+
144+
if (!internal) {
145+
checks.NotAsyncReplicaTable();
146+
}
143147

144148
if (tableIndexCreation.GetState() == NKikimrSchemeOp::EIndexState::EIndexStateReady) {
145149
checks

ydb/core/tx/schemeshard/schemeshard__operation_create_indexed_table.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ TVector<ISubOperation::TPtr> CreateIndexedTable(TOperationId nextId, const TTxTr
217217
tx.GetWorkingDir() + "/" + baseTableDescription.GetName(),
218218
NKikimrSchemeOp::EOperationType::ESchemeOpCreateTableIndex);
219219
scheme.SetFailOnExist(tx.GetFailOnExist());
220+
scheme.SetInternal(tx.GetInternal());
220221

221222
scheme.MutableCreateTableIndex()->CopyFrom(indexDescription);
222223
if (!indexDescription.HasType()) {

0 commit comments

Comments
 (0)