Skip to content

Commit 205168b

Browse files
authored
Merge eca6c9f into a12ce1c
2 parents a12ce1c + eca6c9f commit 205168b

4 files changed

+88
-14
lines changed

ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp

+8-3
Original file line numberDiff line numberDiff line change
@@ -539,18 +539,15 @@ void DoAlterStream(
539539
{
540540
auto outTx = TransactionTemplate(tablePath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterCdcStreamImpl);
541541
outTx.MutableAlterCdcStream()->CopyFrom(op);
542-
543542
if (op.HasGetReady()) {
544543
outTx.MutableLockGuard()->SetOwnerTxId(op.GetGetReady().GetLockTxId());
545544
}
546545

547546
result.push_back(CreateAlterCdcStreamImpl(NextPartId(opId, result), outTx));
548547
}
549-
550548
{
551549
auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterCdcStreamAtTable);
552550
outTx.MutableAlterCdcStream()->CopyFrom(op);
553-
554551
if (op.HasGetReady()) {
555552
outTx.MutableLockGuard()->SetOwnerTxId(op.GetGetReady().GetLockTxId());
556553
}
@@ -622,6 +619,14 @@ TVector<ISubOperation::TPtr> CreateAlterCdcStream(TOperationId opId, const TTxTr
622619
result.push_back(DropLock(NextPartId(opId, result), outTx));
623620
}
624621

622+
if (workingDirPath.IsTableIndex()) {
623+
auto outTx = TransactionTemplate(workingDirPath.Parent().PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterTableIndex);
624+
outTx.MutableAlterTableIndex()->SetName(workingDirPath.LeafName());
625+
outTx.MutableAlterTableIndex()->SetState(NKikimrSchemeOp::EIndexState::EIndexStateReady);
626+
627+
result.push_back(CreateAlterTableIndex(NextPartId(opId, result), outTx));
628+
}
629+
625630
return result;
626631
}
627632

ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp

+8
Original file line numberDiff line numberDiff line change
@@ -955,6 +955,14 @@ TVector<ISubOperation::TPtr> CreateNewCdcStream(TOperationId opId, const TTxTran
955955
DoCreateLock(result, opId, workingDirPath, tablePath);
956956
}
957957

958+
if (workingDirPath.IsTableIndex()) {
959+
auto outTx = TransactionTemplate(workingDirPath.Parent().PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterTableIndex);
960+
outTx.MutableAlterTableIndex()->SetName(workingDirPath.LeafName());
961+
outTx.MutableAlterTableIndex()->SetState(NKikimrSchemeOp::EIndexState::EIndexStateReady);
962+
963+
result.push_back(CreateAlterTableIndex(NextPartId(opId, result), outTx));
964+
}
965+
958966
Y_ABORT_UNLESS(context.SS->Tables.contains(tablePath.Base()->PathId));
959967
auto table = context.SS->Tables.at(tablePath.Base()->PathId);
960968

ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp

+8
Original file line numberDiff line numberDiff line change
@@ -539,6 +539,14 @@ void DoDropStream(
539539
result.push_back(DropLock(NextPartId(opId, result), outTx));
540540
}
541541

542+
if (workingDirPath.IsTableIndex()) {
543+
auto outTx = TransactionTemplate(workingDirPath.Parent().PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterTableIndex);
544+
outTx.MutableAlterTableIndex()->SetName(workingDirPath.LeafName());
545+
outTx.MutableAlterTableIndex()->SetState(NKikimrSchemeOp::EIndexState::EIndexStateReady);
546+
547+
result.push_back(CreateAlterTableIndex(NextPartId(opId, result), outTx));
548+
}
549+
542550
{
543551
auto outTx = TransactionTemplate(tablePath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropCdcStreamImpl);
544552
outTx.MutableDrop()->SetName(streamPath.Base()->Name);

ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp

+64-11
Original file line numberDiff line numberDiff line change
@@ -1224,6 +1224,9 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
12241224
)");
12251225
env.TestWaitNotification(runtime, txId);
12261226

1227+
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex"), {NLs::PathVersionEqual(2)});
1228+
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable"), {NLs::PathVersionEqual(3)});
1229+
12271230
TestCreateCdcStream(runtime, ++txId, "/MyRoot/Table/UnknownIndex", R"(
12281231
TableName: "indexImplTable"
12291232
StreamDescription {
@@ -1252,12 +1255,10 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
12521255
)");
12531256
env.TestWaitNotification(runtime, txId);
12541257

1255-
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream"), {
1256-
NLs::PathExist,
1257-
});
1258-
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream/streamImpl"), {
1259-
NLs::PathExist,
1260-
});
1258+
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex"), {NLs::PathVersionEqual(3)});
1259+
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable"), {NLs::PathVersionEqual(4)});
1260+
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream"), {NLs::PathExist});
1261+
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream/streamImpl"), {NLs::PathExist});
12611262

12621263
TestAlterCdcStream(runtime, ++txId, "/MyRoot/Table/UnknownIndex", R"(
12631264
TableName: "indexImplTable"
@@ -1272,6 +1273,8 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
12721273
)");
12731274
env.TestWaitNotification(runtime, txId);
12741275

1276+
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex"), {NLs::PathVersionEqual(4)});
1277+
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable"), {NLs::PathVersionEqual(5)});
12751278
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream"), {
12761279
NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateDisabled),
12771280
});
@@ -1287,12 +1290,62 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
12871290
)");
12881291
env.TestWaitNotification(runtime, txId);
12891292

1290-
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream"), {
1291-
NLs::PathNotExist,
1292-
});
1293-
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream/streamImpl"), {
1294-
NLs::PathNotExist,
1293+
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex"), {NLs::PathVersionEqual(5)});
1294+
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable"), {NLs::PathVersionEqual(6)});
1295+
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream"), {NLs::PathNotExist});
1296+
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream/streamImpl"), {NLs::PathNotExist});
1297+
}
1298+
1299+
Y_UNIT_TEST(StreamOnBuildingIndexTable) {
1300+
TTestBasicRuntime runtime;
1301+
TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true));
1302+
ui64 txId = 100;
1303+
1304+
TestCreateTable(runtime, ++txId, "/MyRoot", R"(
1305+
Name: "Table"
1306+
Columns { Name: "key" Type: "Uint64" }
1307+
Columns { Name: "indexed" Type: "Uint64" }
1308+
KeyColumnNames: ["key"]
1309+
)");
1310+
env.TestWaitNotification(runtime, txId);
1311+
1312+
THolder<IEventHandle> blockedBuildIndexRequest;
1313+
auto blockBuildIndexRequest = runtime.AddObserver<TEvDataShard::TEvBuildIndexCreateRequest>([&](auto& ev) {
1314+
blockedBuildIndexRequest.Reset(ev.Release());
12951315
});
1316+
1317+
AsyncBuildIndex(runtime, ++txId, TTestTxConfig::SchemeShard, "/MyRoot", "/MyRoot/Table", "Index", {"indexed"});
1318+
const auto buildIndexId = txId;
1319+
{
1320+
TDispatchOptions opts;
1321+
opts.FinalEvents.emplace_back([&blockedBuildIndexRequest](IEventHandle&) {
1322+
return bool(blockedBuildIndexRequest);
1323+
});
1324+
runtime.DispatchEvents(opts);
1325+
}
1326+
blockBuildIndexRequest.Remove();
1327+
1328+
TestCreateCdcStream(runtime, ++txId, "/MyRoot/Table/Index", R"(
1329+
TableName: "indexImplTable"
1330+
StreamDescription {
1331+
Name: "Stream"
1332+
Mode: ECdcStreamModeKeysOnly
1333+
Format: ECdcStreamFormatProto
1334+
}
1335+
)", {NKikimrScheme::StatusMultipleModifications});
1336+
1337+
runtime.Send(blockedBuildIndexRequest.Release(), 0, true);
1338+
env.TestWaitNotification(runtime, buildIndexId);
1339+
1340+
TestCreateCdcStream(runtime, ++txId, "/MyRoot/Table/Index", R"(
1341+
TableName: "indexImplTable"
1342+
StreamDescription {
1343+
Name: "Stream"
1344+
Mode: ECdcStreamModeKeysOnly
1345+
Format: ECdcStreamFormatProto
1346+
}
1347+
)");
1348+
env.TestWaitNotification(runtime, txId);
12961349
}
12971350

12981351
Y_UNIT_TEST(DropIndexWithStream) {

0 commit comments

Comments
 (0)