@@ -101,22 +101,38 @@ class TNewCdcStream: public TSubOperation {
101101 }
102102 }
103103
104+ TString BuildWorkingDir () const {
105+ if (Transaction.GetCreateCdcStream ().HasIndexName ()) {
106+ return Transaction.GetWorkingDir () + " /"
107+ + Transaction.GetCreateCdcStream ().GetIndexName () + " /indexImplTable" ;
108+ } else {
109+ return Transaction.GetWorkingDir ();
110+ }
111+ }
112+
104113public:
105114 using TSubOperation::TSubOperation;
106115
107116 THolder<TProposeResponse> Propose (const TString& owner, TOperationContext& context) override {
108- const auto & workingDir = Transaction.GetWorkingDir ();
109117 const auto & op = Transaction.GetCreateCdcStream ();
110118 const auto & streamDesc = op.GetStreamDescription ();
111119 const auto & streamName = streamDesc.GetName ();
112120 const auto acceptExisted = !Transaction.GetFailOnExist ();
113121
122+ auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64 (OperationId.GetTxId ()), context.SS ->TabletID ());
123+
124+ if (op.HasAllIndexes ()) {
125+ result->SetError (NKikimrScheme::StatusInvalidParameter,
126+ " Illigal part operation with all indexes flag" );
127+ return result;
128+ }
129+
130+ const auto & workingDir = BuildWorkingDir ();
131+
114132 LOG_N (" TNewCdcStream Propose"
115133 << " : opId# " << OperationId
116134 << " , stream# " << workingDir << " /" << streamName);
117135
118- auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64 (OperationId.GetTxId ()), context.SS ->TabletID ());
119-
120136 const auto tablePath = TPath::Resolve (workingDir, context.SS );
121137 {
122138 const auto checks = tablePath.Check ();
@@ -126,11 +142,17 @@ class TNewCdcStream: public TSubOperation {
126142 .IsAtLocalSchemeShard ()
127143 .IsResolved ()
128144 .NotDeleted ()
129- .IsTable ()
130145 .NotAsyncReplicaTable ()
131- .IsCommonSensePath ()
132146 .NotUnderDeleting ();
133147
148+ if (op.HasIndexName () && op.GetIndexName ()) {
149+ checks.IsInsideTableIndexPath ();
150+ } else {
151+ checks
152+ .IsTable ()
153+ .IsCommonSensePath ();
154+ }
155+
134156 if (!checks) {
135157 result->SetError (checks.GetStatus (), checks.GetError ());
136158 return result;
@@ -503,17 +525,35 @@ class TNewCdcStreamAtTable: public TSubOperation {
503525 }
504526
505527 THolder<TProposeResponse> Propose (const TString&, TOperationContext& context) override {
506- const auto & workingDir = Transaction.GetWorkingDir ();
528+ auto workingDir = Transaction.GetWorkingDir ();
507529 const auto & op = Transaction.GetCreateCdcStream ();
508- const auto & tableName = op.GetTableName ();
530+ auto tableName = op.GetTableName ();
509531 const auto & streamName = op.GetStreamDescription ().GetName ();
510532
533+ auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64 (OperationId.GetTxId ()), context.SS ->TabletID ());
534+ bool isIndexTable = false ;
535+
536+ if (op.HasAllIndexes ()) {
537+ result->SetError (NKikimrScheme::StatusInvalidParameter,
538+ " Illigal part operation with all indexes flag" );
539+ return result;
540+ }
541+
542+ if (op.HasIndexName ()) {
543+ if (!op.GetIndexName ()) {
544+ result->SetError (NKikimrScheme::StatusInvalidParameter,
545+ " Unexpected empty index name" );
546+ return result;
547+ }
548+ isIndexTable = true ;
549+ workingDir += (" /" + tableName + " /" + op.GetIndexName ());
550+ tableName = " indexImplTable" ;
551+ }
552+
511553 LOG_N (" TNewCdcStreamAtTable Propose"
512554 << " : opId# " << OperationId
513555 << " , stream# " << workingDir << " /" << tableName << " /" << streamName);
514556
515- auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64 (OperationId.GetTxId ()), context.SS ->TabletID ());
516-
517557 const auto workingDirPath = TPath::Resolve (workingDir, context.SS );
518558 {
519559 const auto checks = workingDirPath.Check ();
@@ -522,10 +562,15 @@ class TNewCdcStreamAtTable: public TSubOperation {
522562 .IsAtLocalSchemeShard ()
523563 .IsResolved ()
524564 .NotDeleted ()
525- .IsCommonSensePath ()
526565 .IsLikeDirectory ()
527566 .NotUnderDeleting ();
528567
568+ if (isIndexTable) {
569+ checks.IsInsideTableIndexPath ();
570+ } else {
571+ checks.IsCommonSensePath ();
572+ }
573+
529574 if (!checks) {
530575 result->SetError (checks.GetStatus (), checks.GetError ());
531576 return result;
@@ -543,10 +588,12 @@ class TNewCdcStreamAtTable: public TSubOperation {
543588 .NotDeleted ()
544589 .IsTable ()
545590 .NotAsyncReplicaTable ()
546- .IsCommonSensePath ()
547591 .NotUnderDeleting ();
548592
549593 if (checks) {
594+ if (!isIndexTable) {
595+ checks.IsCommonSensePath ();
596+ }
550597 if (InitialScan) {
551598 checks.IsUnderTheSameOperation (OperationId.GetTxId ()); // lock op
552599 } else {
@@ -628,17 +675,18 @@ class TNewCdcStreamAtTable: public TSubOperation {
628675
629676private:
630677 const bool InitialScan;
631-
632678}; // TNewCdcStreamAtTable
633679
634- void DoCreateLock (const TOperationId& opId, const TPath& workingDirPath, const TPath& tablePath,
680+ void DoCreateLock (const TOperationId opId, const TPath& workingDirPath, const TPath& tablePath, bool allowIndexImplLock ,
635681 TVector<ISubOperation::TPtr>& result)
636682{
637683 auto outTx = TransactionTemplate (workingDirPath.PathString (),
638684 NKikimrSchemeOp::EOperationType::ESchemeOpCreateLock);
639685 outTx.SetFailOnExist (false );
640686 outTx.SetInternal (true );
641- outTx.MutableLockConfig ()->SetName (tablePath.LeafName ());
687+ auto cfg = outTx.MutableLockConfig ();
688+ cfg->SetName (tablePath.LeafName ());
689+ cfg->SetAllowIndexImplLock (allowIndexImplLock);
642690
643691 result.push_back (CreateLock (NextPartId (opId, result), outTx));
644692}
@@ -698,30 +746,34 @@ void DoCreatePqPart(const TOperationId& opId, const TPath& streamPath, const TSt
698746 result.push_back (CreateNewPQ (NextPartId (opId, result), outTx));
699747}
700748
749+ void FillModifySchemaForCdc (NKikimrSchemeOp::TModifyScheme& outTx, const NKikimrSchemeOp::TCreateCdcStream& op,
750+ const TOperationId& opId, const TString& indexName, bool acceptExisted, bool initialScan)
751+ {
752+ outTx.SetFailOnExist (!acceptExisted);
753+ outTx.MutableCreateCdcStream ()->CopyFrom (op);
754+ if (indexName) {
755+ outTx.MutableCreateCdcStream ()->SetIndexName (indexName);
756+ } else {
757+ outTx.MutableCreateCdcStream ()->ClearIndexMode ();
758+ }
759+
760+ if (initialScan) {
761+ outTx.MutableLockGuard ()->SetOwnerTxId (ui64 (opId.GetTxId ()));
762+ }
763+ }
764+
701765void DoCreateStream (const NKikimrSchemeOp::TCreateCdcStream& op, const TOperationId& opId, const TPath& workingDirPath, const TPath& tablePath,
702- const bool acceptExisted, const bool initialScan, TVector<ISubOperation::TPtr>& result)
766+ const bool acceptExisted, const bool initialScan, const TString& indexName, TVector<ISubOperation::TPtr>& result)
703767{
704768 {
705769 auto outTx = TransactionTemplate (tablePath.PathString (), NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamImpl);
706- outTx.SetFailOnExist (!acceptExisted);
707- outTx.MutableCreateCdcStream ()->CopyFrom (op);
708-
709- if (initialScan) {
710- outTx.MutableLockGuard ()->SetOwnerTxId (ui64 (opId.GetTxId ()));
711- }
712-
770+ FillModifySchemaForCdc (outTx, op, opId, indexName, acceptExisted, initialScan);
713771 result.push_back (CreateNewCdcStreamImpl (NextPartId (opId, result), outTx));
714772 }
715773
716774 {
717775 auto outTx = TransactionTemplate (workingDirPath.PathString (), NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamAtTable);
718- outTx.SetFailOnExist (!acceptExisted);
719- outTx.MutableCreateCdcStream ()->CopyFrom (op);
720-
721- if (initialScan) {
722- outTx.MutableLockGuard ()->SetOwnerTxId (ui64 (opId.GetTxId ()));
723- }
724-
776+ FillModifySchemaForCdc (outTx, op, opId, indexName, acceptExisted, initialScan);
725777 result.push_back (CreateNewCdcStreamAtTable (NextPartId (opId, result), outTx, initialScan));
726778 }
727779}
@@ -777,6 +829,18 @@ ISubOperation::TPtr RejectOnTablePathChecks(const TOperationId& opId, const TPat
777829 return nullptr ;
778830}
779831
832+ void CalcBoundaries (const TTableInfo::TPtr table, TVector<TString>& boundaries) {
833+ const auto & partitions = table->GetPartitions ();
834+ boundaries.reserve (partitions.size () - 1 );
835+
836+ for (ui32 i = 0 ; i < partitions.size (); ++i) {
837+ const auto & partition = partitions.at (i);
838+ if (i != partitions.size () - 1 ) {
839+ boundaries.push_back (partition.EndOfRange );
840+ }
841+ }
842+ }
843+
780844} // anonymous
781845
782846ISubOperation::TPtr CreateNewCdcStreamImpl (TOperationId id, const TTxTransaction& tx) {
@@ -811,11 +875,13 @@ TVector<ISubOperation::TPtr> CreateNewCdcStream(TOperationId opId, const TTxTran
811875 const auto workingDirPath = TPath::Resolve (tx.GetWorkingDir (), context.SS );
812876
813877 const auto tablePath = workingDirPath.Child (tableName);
878+
814879 if (auto reject = RejectOnTablePathChecks (opId, tablePath)) {
815880 return {reject};
816881 }
817882
818883 const auto streamPath = tablePath.Child (streamName);
884+
819885 if (auto reject = RejectOnCdcChecks (opId, streamPath, acceptExisted)) {
820886 return {reject};
821887 }
@@ -860,15 +926,84 @@ TVector<ISubOperation::TPtr> CreateNewCdcStream(TOperationId opId, const TTxTran
860926 << " Initial scan is not supported yet" )};
861927 }
862928
863- Y_ABORT_UNLESS (context.SS ->Tables .contains (tablePath.Base ()->PathId ));
864- auto table = context.SS ->Tables .at (tablePath.Base ()->PathId );
865-
866- TVector<TString> boundaries;
867929 if (op.HasTopicPartitions ()) {
868930 if (op.GetTopicPartitions () <= 0 ) {
869931 return {CreateReject (opId, NKikimrScheme::StatusInvalidParameter, " Topic partitions count must be greater than 0" )};
870932 }
933+ }
934+
935+ std::vector<std::pair<TString, TPathId>> indexes;
936+
937+ if (op.GetIndexModeCase () == NKikimrSchemeOp::TCreateCdcStream::kAllIndexes && op.GetAllIndexes ()) {
938+ indexes.reserve (tablePath->GetChildren ().size ());
939+ for (const auto & child : tablePath->GetChildren ()) {
940+ indexes.emplace_back (child);
941+ }
942+ } else if (op.GetIndexModeCase () == NKikimrSchemeOp::TCreateCdcStream::kIndexName ) {
943+ auto it = tablePath->GetChildren ().find (op.GetIndexName ());
944+ if (it == tablePath->GetChildren ().end ()) {
945+ return {CreateReject (opId, NKikimrScheme::StatusSchemeError,
946+ " requested particular index hasn't been found" )};
947+ }
948+ indexes.emplace_back (*it);
949+ }
950+
951+ TVector<ISubOperation::TPtr> result;
952+
953+ for (const auto & child : indexes) {
954+ const auto & name = child.first ;
955+
956+ const TPath indexPath = tablePath.Child (name);
957+ if (!indexPath.IsTableIndex () || indexPath.IsDeleted ()) {
958+ continue ;
959+ }
960+
961+ const TPath indexImplPath = indexPath.Child (" indexImplTable" );
962+ if (!indexImplPath) {
963+ return {CreateReject (opId, NKikimrScheme::StatusSchemeError,
964+ " indexImplTable hasn't been found" )};
965+ }
966+
967+ Y_ABORT_UNLESS (context.SS ->Tables .contains (tablePath.Base ()->PathId ));
968+ auto indexImpltable = context.SS ->Tables .at (indexImplPath.Base ()->PathId );
969+
970+ const TPath indexStreamPath = indexImplPath.Child (streamName);
971+ if (auto reject = RejectOnCdcChecks (opId, indexStreamPath, acceptExisted)) {
972+ return {reject};
973+ }
871974
975+ if (initialScan) {
976+ DoCreateLock (opId, indexPath, indexImplPath, true , result);
977+ }
978+
979+ TVector<TString> boundaries;
980+ if (op.HasTopicPartitions ()) {
981+ const auto & keyColumns = indexImpltable->KeyColumnIds ;
982+ const auto & columns = indexImpltable->Columns ;
983+
984+ Y_ABORT_UNLESS (!keyColumns.empty ());
985+ Y_ABORT_UNLESS (columns.contains (keyColumns.at (0 )));
986+ const auto firstKeyColumnType = columns.at (keyColumns.at (0 )).PType ;
987+
988+ if (!TSchemeShard::FillUniformPartitioning (boundaries, keyColumns.size (), firstKeyColumnType, op.GetTopicPartitions (), AppData ()->TypeRegistry , errStr)) {
989+ return {CreateReject (opId, NKikimrScheme::StatusInvalidParameter, errStr)};
990+ }
991+ } else {
992+ CalcBoundaries (indexImpltable, boundaries);
993+ }
994+
995+ DoCreateStream (op, opId, workingDirPath, tablePath, acceptExisted, initialScan, name, result);
996+ DoCreatePqPart (opId, indexStreamPath, streamName, indexImpltable, op, boundaries, acceptExisted, result);
997+ }
998+
999+ if (initialScan) {
1000+ DoCreateLock (opId, workingDirPath, tablePath, false , result);
1001+ }
1002+
1003+ Y_ABORT_UNLESS (context.SS ->Tables .contains (tablePath.Base ()->PathId ));
1004+ auto table = context.SS ->Tables .at (tablePath.Base ()->PathId );
1005+ TVector<TString> boundaries;
1006+ if (op.HasTopicPartitions ()) {
8721007 const auto & keyColumns = table->KeyColumnIds ;
8731008 const auto & columns = table->Columns ;
8741009
@@ -880,26 +1015,11 @@ TVector<ISubOperation::TPtr> CreateNewCdcStream(TOperationId opId, const TTxTran
8801015 return {CreateReject (opId, NKikimrScheme::StatusInvalidParameter, errStr)};
8811016 }
8821017 } else {
883- const auto & partitions = table->GetPartitions ();
884- boundaries.reserve (partitions.size () - 1 );
885-
886- for (ui32 i = 0 ; i < partitions.size (); ++i) {
887- const auto & partition = partitions.at (i);
888- if (i != partitions.size () - 1 ) {
889- boundaries.push_back (partition.EndOfRange );
890- }
891- }
1018+ CalcBoundaries (table, boundaries);
8921019 }
8931020
894- TVector<ISubOperation::TPtr> result;
895-
896- if (initialScan) {
897- DoCreateLock (opId, workingDirPath, tablePath, result);
898- }
899-
900- DoCreateStream (op, opId, workingDirPath, tablePath, acceptExisted, initialScan, result);
1021+ DoCreateStream (op, opId, workingDirPath, tablePath, acceptExisted, initialScan, {}, result);
9011022 DoCreatePqPart (opId, streamPath, streamName, table, op, boundaries, acceptExisted, result);
902-
9031023 return result;
9041024}
9051025
0 commit comments