Skip to content

Commit fcff2bc

Browse files
committed
Allow to add changefeed to index table via AlterTable (ydb-platform#6883)
1 parent c3cd8a4 commit fcff2bc

File tree

5 files changed

+117
-13
lines changed

5 files changed

+117
-13
lines changed

ydb/core/grpc_services/rpc_alter_table.cpp

Lines changed: 69 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -109,12 +109,12 @@ class TAlterTableRPC : public TRpcSchemeRequestActor<TAlterTableRPC, TEvAlterTab
109109
break;
110110

111111
case EOp::Attribute:
112-
PrepareAlterUserAttrubutes();
112+
case EOp::AddChangefeed:
113+
case EOp::DropChangefeed:
114+
GetProxyServices();
113115
break;
114116

115-
case EOp::AddChangefeed:
116117
case EOp::DropIndex:
117-
case EOp::DropChangefeed:
118118
case EOp::RenameIndex:
119119
AlterTable(ctx);
120120
break;
@@ -197,7 +197,7 @@ class TAlterTableRPC : public TRpcSchemeRequestActor<TAlterTableRPC, TEvAlterTab
197197
Navigate(msg->Services.SchemeCache, ctx);
198198
}
199199

200-
void PrepareAlterUserAttrubutes() {
200+
void GetProxyServices() {
201201
using namespace NTxProxy;
202202
Send(MakeTxProxyID(), new TEvTxUserProxy::TEvGetProxyServicesRequest);
203203
}
@@ -222,13 +222,38 @@ class TAlterTableRPC : public TRpcSchemeRequestActor<TAlterTableRPC, TEvAlterTab
222222
auto ev = CreateNavigateForPath(DatabaseName);
223223
{
224224
auto& entry = static_cast<TEvTxProxySchemeCache::TEvNavigateKeySet*>(ev)->Request->ResultSet.emplace_back();
225-
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTable;
226225
entry.Path = paths;
227226
}
228227

229228
Send(schemeCache, ev);
230229
}
231230

231+
void Navigate(const TTableId& pathId, const TActorContext& ctx) {
232+
DatabaseName = Request_->GetDatabaseName()
233+
.GetOrElse(DatabaseFromDomain(AppData()));
234+
235+
auto ev = CreateNavigateForPath(DatabaseName);
236+
{
237+
auto& entry = static_cast<TEvTxProxySchemeCache::TEvNavigateKeySet*>(ev)->Request->ResultSet.emplace_back();
238+
entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId;
239+
entry.TableId = pathId;
240+
entry.ShowPrivatePath = true;
241+
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList;
242+
}
243+
244+
Send(MakeSchemeCacheID(), ev);
245+
}
246+
247+
static bool IsChangefeedOperation(EOp type) {
248+
switch (type) {
249+
case EOp::AddChangefeed:
250+
case EOp::DropChangefeed:
251+
return true;
252+
default:
253+
return false;
254+
}
255+
}
256+
232257
void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
233258
TXLOG_D("Handle TEvTxProxySchemeCache::TEvNavigateKeySetResult"
234259
<< ", errors# " << ev->Get()->Request.Get()->ErrorCount);
@@ -251,13 +276,48 @@ class TAlterTableRPC : public TRpcSchemeRequestActor<TAlterTableRPC, TEvAlterTab
251276
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
252277
}
253278

279+
Y_ABORT_UNLESS(!resp->ResultSet.empty());
280+
const auto& entry = resp->ResultSet.back();
281+
282+
switch (entry.Kind) {
283+
case NSchemeCache::TSchemeCacheNavigate::KindTable:
284+
case NSchemeCache::TSchemeCacheNavigate::KindColumnTable:
285+
case NSchemeCache::TSchemeCacheNavigate::KindExternalTable:
286+
case NSchemeCache::TSchemeCacheNavigate::KindExternalDataSource:
287+
case NSchemeCache::TSchemeCacheNavigate::KindView:
288+
break; // table
289+
case NSchemeCache::TSchemeCacheNavigate::KindIndex:
290+
if (IsChangefeedOperation(OpType)) {
291+
break;
292+
}
293+
[[fallthrough]];
294+
default:
295+
Request_->RaiseIssue(MakeIssue(NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR, TStringBuilder()
296+
<< "Unable to nagivate: " << JoinPath(entry.Path) << " status: PathNotTable"));
297+
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
298+
}
299+
254300
switch (OpType) {
255301
case EOp::AddIndex:
256302
return AlterTableAddIndexOp(resp, ctx);
257303
case EOp::Attribute:
258-
Y_ABORT_UNLESS(!resp->ResultSet.empty());
259304
ResolvedPathId = resp->ResultSet.back().TableId.PathId;
260305
return AlterTable(ctx);
306+
case EOp::AddChangefeed:
307+
case EOp::DropChangefeed:
308+
if (entry.Kind != NSchemeCache::TSchemeCacheNavigate::KindIndex) {
309+
AlterTable(ctx);
310+
} else if (auto list = entry.ListNodeEntry) {
311+
if (list->Children.size() != 1) {
312+
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
313+
}
314+
315+
const auto& child = list->Children.at(0);
316+
AlterTable(ctx, CanonizePath(ChildPath(NKikimr::SplitPath(GetProtoRequest()->path()), child.Name)));
317+
} else {
318+
Navigate(entry.TableId, ctx);
319+
}
320+
break;
261321
default:
262322
TXLOG_E("Got unexpected cache response");
263323
return Reply(Ydb::StatusIds::INTERNAL_ERROR, ctx);
@@ -351,13 +411,14 @@ class TAlterTableRPC : public TRpcSchemeRequestActor<TAlterTableRPC, TEvAlterTab
351411
Die(ctx);
352412
}
353413

354-
void AlterTable(const TActorContext &ctx) {
414+
void AlterTable(const TActorContext &ctx, const TMaybe<TString>& overridePath = {}) {
355415
const auto req = GetProtoRequest();
356416
std::unique_ptr<TEvTxUserProxy::TEvProposeTransaction> proposeRequest = CreateProposeTransaction();
357417
auto modifyScheme = proposeRequest->Record.MutableTransaction()->MutableModifyScheme();
418+
modifyScheme->SetAllowAccessToPrivatePaths(overridePath.Defined());
358419
Ydb::StatusIds::StatusCode code;
359420
TString error;
360-
if (!BuildAlterTableModifyScheme(req, modifyScheme, Profiles, ResolvedPathId, code, error)) {
421+
if (!BuildAlterTableModifyScheme(overridePath.GetOrElse(req->path()), req, modifyScheme, Profiles, ResolvedPathId, code, error)) {
361422
NYql::TIssues issues;
362423
issues.AddIssue(NYql::TIssue(error));
363424
return Reply(code, issues, ctx);

ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3929,6 +3929,42 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
39293929
}
39303930
}
39313931

3932+
Y_UNIT_TEST(ChangefeedOnIndexTable) {
3933+
TKikimrRunner kikimr(TKikimrSettings().SetPQConfig(DefaultPQConfig()));
3934+
auto db = kikimr.GetTableClient();
3935+
auto session = db.CreateSession().GetValueSync().GetSession();
3936+
3937+
{
3938+
auto query = R"(
3939+
--!syntax_v1
3940+
CREATE TABLE `/Root/table` (
3941+
Key Uint64,
3942+
Value String,
3943+
PRIMARY KEY (Key),
3944+
INDEX SyncIndex GLOBAL SYNC ON (`Value`),
3945+
INDEX AsyncIndex GLOBAL ASYNC ON (`Value`)
3946+
);
3947+
)";
3948+
3949+
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
3950+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
3951+
}
3952+
3953+
const auto changefeed = TChangefeedDescription("feed", EChangefeedMode::KeysOnly, EChangefeedFormat::Json);
3954+
{
3955+
auto result = session.AlterTable("/Root/table/AsyncIndex", TAlterTableSettings()
3956+
.AppendAddChangefeeds(changefeed)
3957+
).ExtractValueSync();
3958+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString());
3959+
}
3960+
{
3961+
auto result = session.AlterTable("/Root/table/SyncIndex", TAlterTableSettings()
3962+
.AppendAddChangefeeds(changefeed)
3963+
).ExtractValueSync();
3964+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
3965+
}
3966+
}
3967+
39323968
Y_UNIT_TEST(CreatedAt) {
39333969
TKikimrRunner kikimr(TKikimrSettings().SetPQConfig(DefaultPQConfig()));
39343970
auto scheme = NYdb::NScheme::TSchemeClient(kikimr.GetDriver(), TCommonClientSettings().Database("/Root"));

ydb/core/tx/scheme_board/cache.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -897,8 +897,6 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
897897
default:
898898
return false;
899899
}
900-
case NKikimrSchemeOp::EPathTypeTableIndex:
901-
return true;
902900
default:
903901
return false;
904902
}

ydb/core/ydb_convert/table_description.cpp

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,8 @@ bool BuildAlterTableAddIndexRequest(const Ydb::Table::AlterTableRequest* req, NK
163163
return true;
164164
}
165165

166-
bool BuildAlterTableModifyScheme(const Ydb::Table::AlterTableRequest* req, NKikimrSchemeOp::TModifyScheme* modifyScheme, const TTableProfiles& profiles,
166+
bool BuildAlterTableModifyScheme(const TString& path, const Ydb::Table::AlterTableRequest* req,
167+
NKikimrSchemeOp::TModifyScheme* modifyScheme, const TTableProfiles& profiles,
167168
const TPathId& resolvedPathId,
168169
Ydb::StatusIds::StatusCode& code, TString& error)
169170
{
@@ -184,7 +185,7 @@ bool BuildAlterTableModifyScheme(const Ydb::Table::AlterTableRequest* req, NKiki
184185
const auto OpType = *ops.begin();
185186

186187
try {
187-
pathPair = SplitPathIntoWorkingDirAndName(req->path());
188+
pathPair = SplitPathIntoWorkingDirAndName(path);
188189
} catch (const std::exception&) {
189190
code = Ydb::StatusIds::BAD_REQUEST;
190191
return false;
@@ -227,7 +228,7 @@ bool BuildAlterTableModifyScheme(const Ydb::Table::AlterTableRequest* req, NKiki
227228
for(const auto& rename: req->rename_indexes()) {
228229
modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpMoveIndex);
229230
auto& alter = *modifyScheme->MutableMoveIndex();
230-
alter.SetTablePath(req->path());
231+
alter.SetTablePath(path);
231232
alter.SetSrcPath(rename.source_name());
232233
alter.SetDstPath(rename.destination_name());
233234
alter.SetAllowOverwrite(rename.replace_destination());
@@ -352,6 +353,11 @@ bool BuildAlterTableModifyScheme(const Ydb::Table::AlterTableRequest* req, NKiki
352353
return true;
353354
}
354355

356+
bool BuildAlterTableModifyScheme(const Ydb::Table::AlterTableRequest* req, NKikimrSchemeOp::TModifyScheme* modifyScheme,
357+
const TTableProfiles& profiles, const TPathId& resolvedPathId, Ydb::StatusIds::StatusCode& code, TString& error)
358+
{
359+
return BuildAlterTableModifyScheme(req->path(), req, modifyScheme, profiles, resolvedPathId, code, error);
360+
}
355361

356362
template <typename TColumn>
357363
static Ydb::Type* AddColumn(Ydb::Table::ColumnMeta* newColumn, const TColumn& column) {

ydb/core/ydb_convert/table_description.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ struct TPathId;
3030

3131

3232
THashSet<EAlterOperationKind> GetAlterOperationKinds(const Ydb::Table::AlterTableRequest* req);
33+
bool BuildAlterTableModifyScheme(const TString& path, const Ydb::Table::AlterTableRequest* req, NKikimrSchemeOp::TModifyScheme* modifyScheme,
34+
const TTableProfiles& profiles, const TPathId& resolvedPathId,
35+
Ydb::StatusIds::StatusCode& status, TString& error);
3336
bool BuildAlterTableModifyScheme(const Ydb::Table::AlterTableRequest* req, NKikimrSchemeOp::TModifyScheme* modifyScheme,
3437
const TTableProfiles& profiles, const TPathId& resolvedPathId,
3538
Ydb::StatusIds::StatusCode& status, TString& error);

0 commit comments

Comments
 (0)