11#include " kqp_topics.h"
22
33#include < ydb/core/base/path.h>
4+ #include < ydb/core/protos/kqp.pb.h>
45#include < ydb/core/persqueue/utils.h>
56#include < ydb/library/actors/core/log.h>
67
@@ -26,35 +27,73 @@ static void UpdateSupportivePartition(TMaybe<ui32>& lhs, const TMaybe<ui32>& rhs
2627//
2728bool TConsumerOperations::IsValid () const
2829{
29- return Offsets_.GetNumIntervals () = = 1 ;
30+ return Offsets_.GetNumIntervals () < = 1 ;
3031}
3132
32- std::pair<ui64, ui64> TConsumerOperations::GetRange () const
33+ std::pair<ui64, ui64> TConsumerOperations::GetOffsetsCommitRange () const
3334{
3435 Y_ABORT_UNLESS (IsValid ());
3536
36- return {Offsets_.Min (), Offsets_.Max ()};
37+ if (Offsets_.Empty ()) {
38+ return {0 ,0 };
39+ } else {
40+ return {Offsets_.Min (), Offsets_.Max ()};
41+ }
42+ }
43+
44+ bool TConsumerOperations::GetForceCommit () const
45+ {
46+ return ForceCommit_;
47+ }
48+
49+ bool TConsumerOperations::GetKillReadSession () const
50+ {
51+ return KillReadSession_;
52+ }
53+
54+ bool TConsumerOperations::GetOnlyCheckCommitedToFinish () const
55+ {
56+ return OnlyCheckCommitedToFinish_;
3757}
3858
39- void TConsumerOperations::AddOperation (const TString& consumer, const Ydb::Topic::OffsetsRange& range)
59+ TString TConsumerOperations::GetReadSessionId () const
60+ {
61+ return ReadSessionId_;
62+ }
63+
64+ void TConsumerOperations::AddOperation (const TString& consumer,
65+ const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
66+ bool forceCommit,
67+ bool killReadSession,
68+ bool onlyCheckCommitedToFinish,
69+ const TString& readSessionId)
4070{
4171 Y_ABORT_UNLESS (Consumer_.Empty () || Consumer_ == consumer);
4272
43- AddOperationImpl (consumer, range.start (), range.end ());
73+ AddOperationImpl (consumer, range.start (), range.end (), forceCommit, killReadSession, onlyCheckCommitedToFinish, readSessionId );
4474}
4575
4676void TConsumerOperations::Merge (const TConsumerOperations& rhs)
4777{
4878 Y_ABORT_UNLESS (rhs.Consumer_ .Defined ());
4979 Y_ABORT_UNLESS (Consumer_.Empty () || Consumer_ == rhs.Consumer_ );
5080
51- for (auto & range : rhs.Offsets_ ) {
52- AddOperationImpl (*rhs.Consumer_ , range.first , range.second );
81+ if (!rhs.Offsets_ .Empty ()) {
82+ for (auto & range : rhs.Offsets_ ) {
83+ AddOperationImpl (*rhs.Consumer_ , range.first , range.second , rhs.GetForceCommit (), rhs.GetKillReadSession (), rhs.GetOnlyCheckCommitedToFinish (), rhs.GetReadSessionId ());
84+ }
85+ } else {
86+ AddOperationImpl (*rhs.Consumer_ , 0 , 0 , rhs.GetForceCommit (), rhs.GetKillReadSession (), rhs.GetOnlyCheckCommitedToFinish (), rhs.GetReadSessionId ());
5387 }
5488}
5589
5690void TConsumerOperations::AddOperationImpl (const TString& consumer,
57- ui64 begin, ui64 end)
91+ ui64 begin,
92+ ui64 end,
93+ bool forceCommit,
94+ bool killReadSession,
95+ bool onlyCheckCommitedToFinish,
96+ const TString& readSessionId)
5897{
5998 if (Offsets_.Intersects (begin, end)) {
6099 ythrow TOffsetsRangeIntersectExpection () << " offset ranges intersect" ;
@@ -64,7 +103,14 @@ void TConsumerOperations::AddOperationImpl(const TString& consumer,
64103 Consumer_ = consumer;
65104 }
66105
67- Offsets_.InsertInterval (begin, end);
106+ if (end != 0 ) {
107+ Offsets_.InsertInterval (begin, end);
108+ }
109+
110+ ForceCommit_ = forceCommit;
111+ KillReadSession_ = killReadSession;
112+ OnlyCheckCommitedToFinish_ = onlyCheckCommitedToFinish;
113+ ReadSessionId_ = readSessionId;
68114}
69115
70116//
@@ -76,9 +122,14 @@ bool TTopicPartitionOperations::IsValid() const
76122 [](auto & x) { return x.second .IsValid (); });
77123}
78124
79- void TTopicPartitionOperations::AddOperation (const TString& topic, ui32 partition,
125+ void TTopicPartitionOperations::AddOperation (const TString& topic,
126+ ui32 partition,
80127 const TString& consumer,
81- const Ydb::Topic::OffsetsRange& range)
128+ const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
129+ bool forceCommit,
130+ bool killReadSession,
131+ bool onlyCheckCommitedToFinish,
132+ const TString& readSessionId)
82133{
83134 Y_ABORT_UNLESS (Topic_.Empty () || Topic_ == topic);
84135 Y_ABORT_UNLESS (Partition_.Empty () || Partition_ == partition);
@@ -88,7 +139,7 @@ void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partitio
88139 Partition_ = partition;
89140 }
90141
91- Operations_[consumer].AddOperation (consumer, range);
142+ Operations_[consumer].AddOperation (consumer, range, forceCommit, killReadSession, onlyCheckCommitedToFinish, readSessionId );
92143}
93144
94145void TTopicPartitionOperations::AddOperation (const TString& topic, ui32 partition,
@@ -117,11 +168,15 @@ void TTopicPartitionOperations::BuildTopicTxs(TTopicOperationTransactions& txs)
117168 for (auto & [consumer, operations] : Operations_) {
118169 NKikimrPQ::TPartitionOperation* o = t.tx .MutableOperations ()->Add ();
119170 o->SetPartitionId (*Partition_);
120- auto [begin, end] = operations.GetRange ();
121- o->SetBegin (begin);
122- o->SetEnd (end);
171+ auto [begin, end] = operations.GetOffsetsCommitRange ();
172+ o->SetCommitOffsetsBegin (begin);
173+ o->SetCommitOffsetsEnd (end);
123174 o->SetConsumer (consumer);
124175 o->SetPath (*Topic_);
176+ o->SetKillReadSession (operations.GetKillReadSession ());
177+ o->SetForceCommit (operations.GetForceCommit ());
178+ o->SetOnlyCheckCommitedToFinish (operations.GetOnlyCheckCommitedToFinish ());
179+ o->SetReadSessionId (operations.GetReadSessionId ());
125180 }
126181
127182 if (HasWriteOperations_) {
@@ -251,14 +306,25 @@ bool TTopicOperations::TabletHasReadOperations(ui64 tabletId) const
251306 return false ;
252307}
253308
254- void TTopicOperations::AddOperation (const TString& topic, ui32 partition,
309+ void TTopicOperations::AddOperation (const TString& topic,
310+ ui32 partition,
255311 const TString& consumer,
256- const Ydb::Topic::OffsetsRange& range)
312+ const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
313+ bool forceCommit,
314+ bool killReadSession,
315+ bool onlyCheckCommitedToFinish,
316+ const TString& readSessionId
317+ )
257318{
258319 TTopicPartition key{topic, partition};
259- Operations_[key].AddOperation (topic, partition,
320+ Operations_[key].AddOperation (topic,
321+ partition,
260322 consumer,
261- range);
323+ range,
324+ forceCommit,
325+ killReadSession,
326+ onlyCheckCommitedToFinish,
327+ readSessionId);
262328 HasReadOperations_ = true ;
263329}
264330
0 commit comments