@@ -649,34 +649,6 @@ void DoCreateLock(const TOperationId& opId, const TPath& workingDirPath, const T
649649
650650} // anonymous
651651
652- void DoCreateStream (const NKikimrSchemeOp::TCreateCdcStream& op, const TOperationId& opId, const TPath& workingDirPath, const TPath& tablePath,
653- const bool acceptExisted, const bool initialScan, TVector<ISubOperation::TPtr>& result)
654- {
655- {
656- auto outTx = TransactionTemplate (tablePath.PathString (), NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamImpl);
657- outTx.SetFailOnExist (!acceptExisted);
658- outTx.MutableCreateCdcStream ()->CopyFrom (op);
659-
660- if (initialScan) {
661- outTx.MutableLockGuard ()->SetOwnerTxId (ui64 (opId.GetTxId ()));
662- }
663-
664- result.push_back (CreateNewCdcStreamImpl (NextPartId (opId, result), outTx));
665- }
666-
667- {
668- auto outTx = TransactionTemplate (workingDirPath.PathString (), NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamAtTable);
669- outTx.SetFailOnExist (!acceptExisted);
670- outTx.MutableCreateCdcStream ()->CopyFrom (op);
671-
672- if (initialScan) {
673- outTx.MutableLockGuard ()->SetOwnerTxId (ui64 (opId.GetTxId ()));
674- }
675-
676- result.push_back (CreateNewCdcStreamAtTable (NextPartId (opId, result), outTx, initialScan));
677- }
678- }
679-
680652void DoCreatePqPart (const TOperationId& opId, const TPath& streamPath, const TString& streamName,
681653 const TIntrusivePtr<TTableInfo> table, const NKikimrSchemeOp::TCreateCdcStream& op,
682654 const TVector<TString>& boundaries, const bool acceptExisted, TVector<ISubOperation::TPtr>& result)
@@ -732,6 +704,34 @@ void DoCreatePqPart(const TOperationId& opId, const TPath& streamPath, const TSt
732704 result.push_back (CreateNewPQ (NextPartId (opId, result), outTx));
733705}
734706
707+ void DoCreateStream (const NKikimrSchemeOp::TCreateCdcStream& op, const TOperationId& opId, const TPath& workingDirPath, const TPath& tablePath,
708+ const bool acceptExisted, const bool initialScan, TVector<ISubOperation::TPtr>& result)
709+ {
710+ {
711+ auto outTx = TransactionTemplate (tablePath.PathString (), NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamImpl);
712+ outTx.SetFailOnExist (!acceptExisted);
713+ outTx.MutableCreateCdcStream ()->CopyFrom (op);
714+
715+ if (initialScan) {
716+ outTx.MutableLockGuard ()->SetOwnerTxId (ui64 (opId.GetTxId ()));
717+ }
718+
719+ result.push_back (CreateNewCdcStreamImpl (NextPartId (opId, result), outTx));
720+ }
721+
722+ {
723+ auto outTx = TransactionTemplate (workingDirPath.PathString (), NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamAtTable);
724+ outTx.SetFailOnExist (!acceptExisted);
725+ outTx.MutableCreateCdcStream ()->CopyFrom (op);
726+
727+ if (initialScan) {
728+ outTx.MutableLockGuard ()->SetOwnerTxId (ui64 (opId.GetTxId ()));
729+ }
730+
731+ result.push_back (CreateNewCdcStreamAtTable (NextPartId (opId, result), outTx, initialScan));
732+ }
733+ }
734+
735735namespace {
736736
737737ISubOperation::TPtr RejectOnCdcChecks (const TOperationId& opId, const TPath& streamPath, const bool acceptExisted) {
0 commit comments