Skip to content

Commit b353499

Browse files
committed
EnableChangefeedsOnIndexTables feature flag (#7085)
1 parent 9079b56 commit b353499

File tree

11 files changed

+61
-15
lines changed

11 files changed

+61
-15
lines changed

ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4076,7 +4076,9 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
40764076
}
40774077

40784078
Y_UNIT_TEST(ChangefeedOnIndexTable) {
4079-
TKikimrRunner kikimr(TKikimrSettings().SetPQConfig(DefaultPQConfig()));
4079+
TKikimrRunner kikimr(TKikimrSettings()
4080+
.SetPQConfig(DefaultPQConfig())
4081+
.SetEnableChangefeedsOnIndexTables(true));
40804082
auto db = kikimr.GetTableClient();
40814083
auto session = db.CreateSession().GetValueSync().GetSession();
40824084

ydb/core/protos/feature_flags.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,4 +145,5 @@ message TFeatureFlags {
145145
optional bool EnableColumnStatistics = 130 [default = false];
146146
optional bool EnableSingleCompositeActionGroup = 131 [default = false];
147147
optional bool EnableResourcePoolsOnServerless = 132 [default = false];
148+
optional bool EnableChangefeedsOnIndexTables = 134 [default = false];
148149
}

ydb/core/testlib/basics/feature_flags.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ class TTestFeatureFlagsHolder {
6161
FEATURE_FLAG_SETTER(EnableCMSRequestPriorities)
6262
FEATURE_FLAG_SETTER(EnableTableDatetime64)
6363
FEATURE_FLAG_SETTER(EnableResourcePools)
64+
FEATURE_FLAG_SETTER(EnableChangefeedsOnIndexTables)
6465

6566
#undef FEATURE_FLAG_SETTER
6667
};

ydb/core/tx/replication/controller/dst_creator_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ Y_UNIT_TEST_SUITE(DstCreator) {
7171
}
7272

7373
void WithIndex(const TString& replicatedPath, NKikimrSchemeOp::EIndexType indexType) {
74-
TEnv env;
74+
TEnv env(TFeatureFlags().SetEnableChangefeedsOnIndexTables(true));
7575
env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_CONTROLLER, NLog::PRI_TRACE);
7676

7777
const auto tableDesc = TTestTableDescription{

ydb/core/tx/replication/service/table_writer_ut.cpp

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
3939
}
4040

4141
Y_UNIT_TEST(SupportedTypes) {
42-
auto featureFlags = TFeatureFlags();
43-
featureFlags.SetEnableTableDatetime64(true);
44-
45-
TEnv env(featureFlags);
42+
TEnv env(TFeatureFlags().SetEnableTableDatetime64(true));
4643
env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_SERVICE, NLog::PRI_DEBUG);
4744

4845
env.CreateTable("/Root", *MakeTableDescription(TTestTableDescription{

ydb/core/tx/replication/ut_helpers/test_env.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@
1010

1111
namespace NKikimr::NReplication::NTestHelpers {
1212

13+
class TFeatureFlags: public TTestFeatureFlagsHolder<TFeatureFlags> {
14+
};
15+
1316
template <bool UseDatabase = true>
1417
class TEnv {
1518
static constexpr char DomainName[] = "Root";
@@ -63,7 +66,7 @@ class TEnv {
6366
TEnv(const TFeatureFlags& featureFlags, bool init = true)
6467
: Settings(Tests::TServerSettings(PortManager.GetPort(), {}, MakePqConfig())
6568
.SetDomainName(DomainName)
66-
.SetFeatureFlags(featureFlags)
69+
.SetFeatureFlags(featureFlags.FeatureFlags)
6770
)
6871
, Server(Settings)
6972
, Client(Settings)

ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -803,8 +803,15 @@ ISubOperation::TPtr RejectOnTablePathChecks(const TOperationId& opId, const TPat
803803
if (checks) {
804804
if (!tablePath.IsInsideTableIndexPath()) {
805805
checks.IsCommonSensePath();
806-
} else if (!tablePath.Parent().IsTableIndex(NKikimrSchemeOp::EIndexTypeGlobal)) {
807-
return CreateReject(opId, NKikimrScheme::StatusPreconditionFailed, "Cannot add changefeed to index table");
806+
} else {
807+
if (!tablePath.Parent().IsTableIndex(NKikimrSchemeOp::EIndexTypeGlobal)) {
808+
return CreateReject(opId, NKikimrScheme::StatusPreconditionFailed,
809+
"Cannot add changefeed to index table");
810+
}
811+
if (!AppData()->FeatureFlags.GetEnableChangefeedsOnIndexTables()) {
812+
return CreateReject(opId, NKikimrScheme::StatusPreconditionFailed,
813+
"Changefeed on index table is not supported yet");
814+
}
808815
}
809816
}
810817

ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1200,9 +1200,38 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
12001200
}
12011201
}
12021202

1203+
Y_UNIT_TEST(StreamOnIndexTableNegative) {
1204+
TTestBasicRuntime runtime;
1205+
TTestEnv env(runtime, TTestEnvOptions().EnableChangefeedsOnIndexTables(false));
1206+
ui64 txId = 100;
1207+
1208+
TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"(
1209+
TableDescription {
1210+
Name: "Table"
1211+
Columns { Name: "key" Type: "Uint64" }
1212+
Columns { Name: "indexed" Type: "Uint64" }
1213+
KeyColumnNames: ["key"]
1214+
}
1215+
IndexDescription {
1216+
Name: "Index"
1217+
KeyColumnNames: ["indexed"]
1218+
}
1219+
)");
1220+
env.TestWaitNotification(runtime, txId);
1221+
1222+
TestCreateCdcStream(runtime, ++txId, "/MyRoot/Table/Index", R"(
1223+
TableName: "indexImplTable"
1224+
StreamDescription {
1225+
Name: "Stream"
1226+
Mode: ECdcStreamModeKeysOnly
1227+
Format: ECdcStreamFormatProto
1228+
}
1229+
)", {NKikimrScheme::StatusPreconditionFailed});
1230+
}
1231+
12031232
Y_UNIT_TEST(StreamOnIndexTable) {
12041233
TTestBasicRuntime runtime;
1205-
TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true));
1234+
TTestEnv env(runtime, TTestEnvOptions().EnableChangefeedsOnIndexTables(true));
12061235
ui64 txId = 100;
12071236

12081237
TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"(
@@ -1298,7 +1327,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
12981327

12991328
Y_UNIT_TEST(StreamOnBuildingIndexTable) {
13001329
TTestBasicRuntime runtime;
1301-
TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true));
1330+
TTestEnv env(runtime, TTestEnvOptions().EnableChangefeedsOnIndexTables(true));
13021331
ui64 txId = 100;
13031332

13041333
TestCreateTable(runtime, ++txId, "/MyRoot", R"(
@@ -1350,7 +1379,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
13501379

13511380
Y_UNIT_TEST(DropIndexWithStream) {
13521381
TTestBasicRuntime runtime;
1353-
TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true));
1382+
TTestEnv env(runtime, TTestEnvOptions().EnableChangefeedsOnIndexTables(true));
13541383
ui64 txId = 100;
13551384

13561385
TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"(
@@ -1393,7 +1422,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
13931422

13941423
Y_UNIT_TEST(DropTableWithIndexWithStream) {
13951424
TTestBasicRuntime runtime;
1396-
TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true));
1425+
TTestEnv env(runtime, TTestEnvOptions().EnableChangefeedsOnIndexTables(true));
13971426
ui64 txId = 100;
13981427

13991428
TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"(

ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
1010
template <typename T>
1111
void CreateStream(const TMaybe<NKikimrSchemeOp::ECdcStreamState>& state = Nothing(), bool vt = false, bool onIndex = false) {
1212
T t;
13-
t.GetTestEnvOptions().EnableChangefeedInitialScan(true);
13+
t.GetTestEnvOptions()
14+
.EnableChangefeedInitialScan(true)
15+
.EnableChangefeedsOnIndexTables(true);
1416

1517
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
1618
{
@@ -290,7 +292,9 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
290292
template <typename T>
291293
void DropStream(const TMaybe<NKikimrSchemeOp::ECdcStreamState>& state = Nothing(), bool onIndex = false) {
292294
T t;
293-
t.GetTestEnvOptions().EnableChangefeedInitialScan(true);
295+
t.GetTestEnvOptions()
296+
.EnableChangefeedInitialScan(true)
297+
.EnableChangefeedsOnIndexTables(true);
294298

295299
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
296300
const TString path = !onIndex ? "/MyRoot" : "/MyRoot/Table/Index";

ydb/core/tx/schemeshard/ut_helpers/test_env.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -541,6 +541,7 @@ NSchemeShardUT_Private::TTestEnv::TTestEnv(TTestActorRuntime& runtime, const TTe
541541
app.SetEnableServerlessExclusiveDynamicNodes(opts.EnableServerlessExclusiveDynamicNodes_);
542542
app.SetEnableAddColumsWithDefaults(opts.EnableAddColumsWithDefaults_);
543543
app.SetEnableReplaceIfExistsForExternalEntities(opts.EnableReplaceIfExistsForExternalEntities_);
544+
app.SetEnableChangefeedsOnIndexTables(opts.EnableChangefeedsOnIndexTables_);
544545

545546
app.ColumnShardConfig.SetDisabledOnSchemeShard(false);
546547

ydb/core/tx/schemeshard/ut_helpers/test_env.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ namespace NSchemeShardUT_Private {
6565
OPTION(std::optional<bool>, EnableAddColumsWithDefaults, std::nullopt);
6666
OPTION(std::optional<bool>, EnableReplaceIfExistsForExternalEntities, std::nullopt);
6767
OPTION(std::optional<TString>, GraphBackendType, std::nullopt);
68+
OPTION(std::optional<bool>, EnableChangefeedsOnIndexTables, std::nullopt);
6869

6970
#undef OPTION
7071
};

0 commit comments

Comments
 (0)