@@ -105,22 +105,38 @@ class TNewCdcStream: public TSubOperation {
105105 }
106106 }
107107
108+ TString BuildWorkingDir () const {
109+ if (Transaction.GetCreateCdcStream ().HasIndexName ()) {
110+ return Transaction.GetWorkingDir () + " /"
111+ + Transaction.GetCreateCdcStream ().GetIndexName () + " /indexImplTable" ;
112+ } else {
113+ return Transaction.GetWorkingDir ();
114+ }
115+ }
116+
108117public:
109118 using TSubOperation::TSubOperation;
110119
111120 THolder<TProposeResponse> Propose (const TString& owner, TOperationContext& context) override {
112- const auto & workingDir = Transaction.GetWorkingDir ();
113121 const auto & op = Transaction.GetCreateCdcStream ();
114122 const auto & streamDesc = op.GetStreamDescription ();
115123 const auto & streamName = streamDesc.GetName ();
116124 const auto acceptExisted = !Transaction.GetFailOnExist ();
117125
126+ auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64 (OperationId.GetTxId ()), context.SS ->TabletID ());
127+
128+ if (op.HasAllIndexes ()) {
129+ result->SetError (NKikimrScheme::StatusInvalidParameter,
130+ " Illigal part operation with all indexes flag" );
131+ return result;
132+ }
133+
134+ const auto & workingDir = BuildWorkingDir ();
135+
118136 LOG_N (" TNewCdcStream Propose"
119137 << " : opId# " << OperationId
120138 << " , stream# " << workingDir << " /" << streamName);
121139
122- auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64 (OperationId.GetTxId ()), context.SS ->TabletID ());
123-
124140 const auto tablePath = TPath::Resolve (workingDir, context.SS );
125141 {
126142 const auto checks = tablePath.Check ();
@@ -130,11 +146,17 @@ class TNewCdcStream: public TSubOperation {
130146 .IsAtLocalSchemeShard ()
131147 .IsResolved ()
132148 .NotDeleted ()
133- .IsTable ()
134149 .NotAsyncReplicaTable ()
135- .IsCommonSensePath ()
136150 .NotUnderDeleting ();
137151
152+ if (op.HasIndexName () && op.GetIndexName ()) {
153+ checks.IsInsideTableIndexPath ();
154+ } else {
155+ checks
156+ .IsTable ()
157+ .IsCommonSensePath ();
158+ }
159+
138160 if (!checks) {
139161 result->SetError (checks.GetStatus (), checks.GetError ());
140162 return result;
@@ -507,17 +529,35 @@ class TNewCdcStreamAtTable: public TSubOperation {
507529 }
508530
509531 THolder<TProposeResponse> Propose (const TString&, TOperationContext& context) override {
510- const auto & workingDir = Transaction.GetWorkingDir ();
532+ auto workingDir = Transaction.GetWorkingDir ();
511533 const auto & op = Transaction.GetCreateCdcStream ();
512- const auto & tableName = op.GetTableName ();
534+ auto tableName = op.GetTableName ();
513535 const auto & streamName = op.GetStreamDescription ().GetName ();
514536
537+ auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64 (OperationId.GetTxId ()), context.SS ->TabletID ());
538+ bool isIndexTable = false ;
539+
540+ if (op.HasAllIndexes ()) {
541+ result->SetError (NKikimrScheme::StatusInvalidParameter,
542+ " Illigal part operation with all indexes flag" );
543+ return result;
544+ }
545+
546+ if (op.HasIndexName ()) {
547+ if (!op.GetIndexName ()) {
548+ result->SetError (NKikimrScheme::StatusInvalidParameter,
549+ " Unexpected empty index name" );
550+ return result;
551+ }
552+ isIndexTable = true ;
553+ workingDir += (" /" + tableName + " /" + op.GetIndexName ());
554+ tableName = " indexImplTable" ;
555+ }
556+
515557 LOG_N (" TNewCdcStreamAtTable Propose"
516558 << " : opId# " << OperationId
517559 << " , stream# " << workingDir << " /" << tableName << " /" << streamName);
518560
519- auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64 (OperationId.GetTxId ()), context.SS ->TabletID ());
520-
521561 const auto workingDirPath = TPath::Resolve (workingDir, context.SS );
522562 {
523563 const auto checks = workingDirPath.Check ();
@@ -526,10 +566,15 @@ class TNewCdcStreamAtTable: public TSubOperation {
526566 .IsAtLocalSchemeShard ()
527567 .IsResolved ()
528568 .NotDeleted ()
529- .IsCommonSensePath ()
530569 .IsLikeDirectory ()
531570 .NotUnderDeleting ();
532571
572+ if (isIndexTable) {
573+ checks.IsInsideTableIndexPath ();
574+ } else {
575+ checks.IsCommonSensePath ();
576+ }
577+
533578 if (!checks) {
534579 result->SetError (checks.GetStatus (), checks.GetError ());
535580 return result;
@@ -547,10 +592,12 @@ class TNewCdcStreamAtTable: public TSubOperation {
547592 .NotDeleted ()
548593 .IsTable ()
549594 .NotAsyncReplicaTable ()
550- .IsCommonSensePath ()
551595 .NotUnderDeleting ();
552596
553597 if (checks) {
598+ if (!isIndexTable) {
599+ checks.IsCommonSensePath ();
600+ }
554601 if (InitialScan) {
555602 checks.IsUnderTheSameOperation (OperationId.GetTxId ()); // lock op
556603 } else {
@@ -632,17 +679,18 @@ class TNewCdcStreamAtTable: public TSubOperation {
632679
633680private:
634681 const bool InitialScan;
635-
636682}; // TNewCdcStreamAtTable
637683
638- void DoCreateLock (const TOperationId& opId, const TPath& workingDirPath, const TPath& tablePath,
684+ void DoCreateLock (const TOperationId opId, const TPath& workingDirPath, const TPath& tablePath, bool allowIndexImplLock ,
639685 TVector<ISubOperation::TPtr>& result)
640686{
641687 auto outTx = TransactionTemplate (workingDirPath.PathString (),
642688 NKikimrSchemeOp::EOperationType::ESchemeOpCreateLock);
643689 outTx.SetFailOnExist (false );
644690 outTx.SetInternal (true );
645- outTx.MutableLockConfig ()->SetName (tablePath.LeafName ());
691+ auto cfg = outTx.MutableLockConfig ();
692+ cfg->SetName (tablePath.LeafName ());
693+ cfg->SetAllowIndexImplLock (allowIndexImplLock);
646694
647695 result.push_back (CreateLock (NextPartId (opId, result), outTx));
648696}
@@ -704,30 +752,34 @@ void DoCreatePqPart(const TOperationId& opId, const TPath& streamPath, const TSt
704752 result.push_back (CreateNewPQ (NextPartId (opId, result), outTx));
705753}
706754
755+ void FillModifySchemaForCdc (NKikimrSchemeOp::TModifyScheme& outTx, const NKikimrSchemeOp::TCreateCdcStream& op,
756+ const TOperationId& opId, const TString& indexName, bool acceptExisted, bool initialScan)
757+ {
758+ outTx.SetFailOnExist (!acceptExisted);
759+ outTx.MutableCreateCdcStream ()->CopyFrom (op);
760+ if (indexName) {
761+ outTx.MutableCreateCdcStream ()->SetIndexName (indexName);
762+ } else {
763+ outTx.MutableCreateCdcStream ()->ClearIndexMode ();
764+ }
765+
766+ if (initialScan) {
767+ outTx.MutableLockGuard ()->SetOwnerTxId (ui64 (opId.GetTxId ()));
768+ }
769+ }
770+
707771void DoCreateStream (const NKikimrSchemeOp::TCreateCdcStream& op, const TOperationId& opId, const TPath& workingDirPath, const TPath& tablePath,
708- const bool acceptExisted, const bool initialScan, TVector<ISubOperation::TPtr>& result)
772+ const bool acceptExisted, const bool initialScan, const TString& indexName, TVector<ISubOperation::TPtr>& result)
709773{
710774 {
711775 auto outTx = TransactionTemplate (tablePath.PathString (), NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamImpl);
712- outTx.SetFailOnExist (!acceptExisted);
713- outTx.MutableCreateCdcStream ()->CopyFrom (op);
714-
715- if (initialScan) {
716- outTx.MutableLockGuard ()->SetOwnerTxId (ui64 (opId.GetTxId ()));
717- }
718-
776+ FillModifySchemaForCdc (outTx, op, opId, indexName, acceptExisted, initialScan);
719777 result.push_back (CreateNewCdcStreamImpl (NextPartId (opId, result), outTx));
720778 }
721779
722780 {
723781 auto outTx = TransactionTemplate (workingDirPath.PathString (), NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamAtTable);
724- outTx.SetFailOnExist (!acceptExisted);
725- outTx.MutableCreateCdcStream ()->CopyFrom (op);
726-
727- if (initialScan) {
728- outTx.MutableLockGuard ()->SetOwnerTxId (ui64 (opId.GetTxId ()));
729- }
730-
782+ FillModifySchemaForCdc (outTx, op, opId, indexName, acceptExisted, initialScan);
731783 result.push_back (CreateNewCdcStreamAtTable (NextPartId (opId, result), outTx, initialScan));
732784 }
733785}
@@ -785,6 +837,36 @@ ISubOperation::TPtr RejectOnTablePathChecks(const TOperationId& opId, const TPat
785837 return nullptr ;
786838}
787839
840+ void CalcBoundaries (const TTableInfo& table, TVector<TString>& boundaries) {
841+ const auto & partitions = table.GetPartitions ();
842+ boundaries.reserve (partitions.size () - 1 );
843+
844+ for (ui32 i = 0 ; i < partitions.size (); ++i) {
845+ const auto & partition = partitions.at (i);
846+ if (i != partitions.size () - 1 ) {
847+ boundaries.push_back (partition.EndOfRange );
848+ }
849+ }
850+ }
851+
852+ bool FillBoundaries (const TTableInfo& table, const ::NKikimrSchemeOp::TCreateCdcStream& op, TVector<TString>& boundaries, TString& errStr) {
853+ if (op.HasTopicPartitions ()) {
854+ const auto & keyColumns = table.KeyColumnIds ;
855+ const auto & columns = table.Columns ;
856+
857+ Y_ABORT_UNLESS (!keyColumns.empty ());
858+ Y_ABORT_UNLESS (columns.contains (keyColumns.at (0 )));
859+ const auto firstKeyColumnType = columns.at (keyColumns.at (0 )).PType ;
860+
861+ if (!TSchemeShard::FillUniformPartitioning (boundaries, keyColumns.size (), firstKeyColumnType, op.GetTopicPartitions (), AppData ()->TypeRegistry , errStr)) {
862+ return false ;
863+ }
864+ } else {
865+ CalcBoundaries (table, boundaries);
866+ }
867+ return true ;
868+ }
869+
788870} // anonymous
789871
790872std::variant<TStreamPaths, ISubOperation::TPtr> DoNewStreamPathChecks (
@@ -889,46 +971,76 @@ TVector<ISubOperation::TPtr> CreateNewCdcStream(TOperationId opId, const TTxTran
889971 << " Initial scan is not supported yet" )};
890972 }
891973
892- Y_ABORT_UNLESS (context.SS ->Tables .contains (tablePath.Base ()->PathId ));
893- auto table = context.SS ->Tables .at (tablePath.Base ()->PathId );
894-
895- TVector<TString> boundaries;
896974 if (op.HasTopicPartitions ()) {
897975 if (op.GetTopicPartitions () <= 0 ) {
898976 return {CreateReject (opId, NKikimrScheme::StatusInvalidParameter, " Topic partitions count must be greater than 0" )};
899977 }
978+ }
900979
901- const auto & keyColumns = table->KeyColumnIds ;
902- const auto & columns = table->Columns ;
903-
904- Y_ABORT_UNLESS (!keyColumns.empty ());
905- Y_ABORT_UNLESS (columns.contains (keyColumns.at (0 )));
906- const auto firstKeyColumnType = columns.at (keyColumns.at (0 )).PType ;
980+ std::vector<TString> candidates;
907981
908- if (!TSchemeShard::FillUniformPartitioning (boundaries, keyColumns.size (), firstKeyColumnType, op.GetTopicPartitions (), AppData ()->TypeRegistry , errStr)) {
909- return {CreateReject (opId, NKikimrScheme::StatusInvalidParameter, errStr)};
982+ if (op.GetIndexModeCase () == NKikimrSchemeOp::TCreateCdcStream::kAllIndexes && op.GetAllIndexes ()) {
983+ candidates.reserve (tablePath->GetChildren ().size ());
984+ for (const auto & child : tablePath->GetChildren ()) {
985+ candidates.emplace_back (child.first );
910986 }
911- } else {
912- const auto & partitions = table->GetPartitions ();
913- boundaries.reserve (partitions.size () - 1 );
914-
915- for (ui32 i = 0 ; i < partitions.size (); ++i) {
916- const auto & partition = partitions.at (i);
917- if (i != partitions.size () - 1 ) {
918- boundaries.push_back (partition.EndOfRange );
919- }
987+ } else if (op.GetIndexModeCase () == NKikimrSchemeOp::TCreateCdcStream::kIndexName ) {
988+ auto it = tablePath->GetChildren ().find (op.GetIndexName ());
989+ if (it == tablePath->GetChildren ().end ()) {
990+ return {CreateReject (opId, NKikimrScheme::StatusSchemeError,
991+ " requested particular path hasn't been found" )};
920992 }
993+ candidates.emplace_back (it->first );
921994 }
922995
923996 TVector<ISubOperation::TPtr> result;
924997
998+ for (const auto & name : candidates) {
999+ const TPath indexPath = tablePath.Child (name);
1000+ if (!indexPath.IsTableIndex () || indexPath.IsDeleted ()) {
1001+ continue ;
1002+ }
1003+
1004+ const TPath indexImplPath = indexPath.Child (" indexImplTable" );
1005+ if (!indexImplPath) {
1006+ return {CreateReject (opId, NKikimrScheme::StatusSchemeError,
1007+ " indexImplTable hasn't been found" )};
1008+ }
1009+
1010+ Y_ABORT_UNLESS (context.SS ->Tables .contains (tablePath.Base ()->PathId ));
1011+ auto indexImplTable = context.SS ->Tables .at (indexImplPath.Base ()->PathId );
1012+
1013+ const TPath indexStreamPath = indexImplPath.Child (streamName);
1014+ if (auto reject = RejectOnCdcChecks (opId, indexStreamPath, acceptExisted)) {
1015+ return {reject};
1016+ }
1017+
1018+ if (initialScan) {
1019+ DoCreateLock (opId, indexPath, indexImplPath, true , result);
1020+ }
1021+
1022+ TVector<TString> boundaries;
1023+ if (!FillBoundaries (*indexImplTable, op, boundaries, errStr)) {
1024+ return {CreateReject (opId, NKikimrScheme::StatusInvalidParameter, errStr)};
1025+ }
1026+
1027+ DoCreateStream (op, opId, workingDirPath, tablePath, acceptExisted, initialScan, name, result);
1028+ DoCreatePqPart (opId, indexStreamPath, streamName, indexImplTable, op, boundaries, acceptExisted, result);
1029+ }
1030+
9251031 if (initialScan) {
926- DoCreateLock (opId, workingDirPath, tablePath, result);
1032+ DoCreateLock (opId, workingDirPath, tablePath, false , result);
9271033 }
9281034
929- DoCreateStream (op, opId, workingDirPath, tablePath, acceptExisted, initialScan, result);
930- DoCreatePqPart (opId, streamPath, streamName, table, op, boundaries, acceptExisted, result);
1035+ Y_ABORT_UNLESS (context.SS ->Tables .contains (tablePath.Base ()->PathId ));
1036+ auto table = context.SS ->Tables .at (tablePath.Base ()->PathId );
1037+ TVector<TString> boundaries;
1038+ if (!FillBoundaries (*table, op, boundaries, errStr)) {
1039+ return {CreateReject (opId, NKikimrScheme::StatusInvalidParameter, errStr)};
1040+ }
9311041
1042+ DoCreateStream (op, opId, workingDirPath, tablePath, acceptExisted, initialScan, {}, result);
1043+ DoCreatePqPart (opId, streamPath, streamName, table, op, boundaries, acceptExisted, result);
9321044 return result;
9331045}
9341046
0 commit comments