11#include " kqp_topics.h"
22
33#include < ydb/core/base/path.h>
4+ #include < ydb/core/protos/kqp.pb.h>
45#include < ydb/core/persqueue/utils.h>
56#include < ydb/library/actors/core/log.h>
67
@@ -26,21 +27,44 @@ static void UpdateSupportivePartition(TMaybe<ui32>& lhs, const TMaybe<ui32>& rhs
2627//
2728bool TConsumerOperations::IsValid () const
2829{
29- return Offsets_.GetNumIntervals () = = 1 ;
30+ return Offsets_.GetNumIntervals () < = 1 ;
3031}
3132
32- std::pair<ui64, ui64> TConsumerOperations::GetRange () const
33+ std::pair<ui64, ui64> TConsumerOperations::GetOffsetsCommitRange () const
3334{
3435 Y_ABORT_UNLESS (IsValid ());
3536
36- return {Offsets_.Min (), Offsets_.Max ()};
37+ if (Offsets_.Empty ()) {
38+ return {0 ,0 };
39+ } else {
40+ return {Offsets_.Min (), Offsets_.Max ()};
41+ }
42+ }
43+
44+ bool TConsumerOperations::GetForceCommit () const
45+ {
46+ return ForceCommit_;
47+ }
48+
49+ bool TConsumerOperations::GetKillReadSession () const
50+ {
51+ return KillReadSession_;
3752}
3853
39- void TConsumerOperations::AddOperation (const TString& consumer, const Ydb::Topic::OffsetsRange& range)
54+ bool TConsumerOperations::GetOnlyCheckCommitedToFinish () const
55+ {
56+ return OnlyCheckCommitedToFinish_;
57+ }
58+
59+ void TConsumerOperations::AddOperation (const TString& consumer,
60+ const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
61+ bool forceCommit,
62+ bool killReadSession,
63+ bool onlyCheckCommitedToFinish)
4064{
4165 Y_ABORT_UNLESS (Consumer_.Empty () || Consumer_ == consumer);
4266
43- AddOperationImpl (consumer, range.start (), range.end ());
67+ AddOperationImpl (consumer, range.start (), range.end (), forceCommit, killReadSession, onlyCheckCommitedToFinish );
4468}
4569
4670void TConsumerOperations::Merge (const TConsumerOperations& rhs)
@@ -49,12 +73,16 @@ void TConsumerOperations::Merge(const TConsumerOperations& rhs)
4973 Y_ABORT_UNLESS (Consumer_.Empty () || Consumer_ == rhs.Consumer_ );
5074
5175 for (auto & range : rhs.Offsets_ ) {
52- AddOperationImpl (*rhs.Consumer_ , range.first , range.second );
76+ AddOperationImpl (*rhs.Consumer_ , range.first , range.second , rhs. GetForceCommit (), rhs. GetKillReadSession (), rhs. GetOnlyCheckCommitedToFinish () );
5377 }
5478}
5579
5680void TConsumerOperations::AddOperationImpl (const TString& consumer,
57- ui64 begin, ui64 end)
81+ ui64 begin,
82+ ui64 end,
83+ bool forceCommit,
84+ bool killReadSession,
85+ bool onlyCheckCommitedToFinish)
5886{
5987 if (Offsets_.Intersects (begin, end)) {
6088 ythrow TOffsetsRangeIntersectExpection () << " offset ranges intersect" ;
@@ -64,7 +92,13 @@ void TConsumerOperations::AddOperationImpl(const TString& consumer,
6492 Consumer_ = consumer;
6593 }
6694
67- Offsets_.InsertInterval (begin, end);
95+ if (end != 0 ) {
96+ Offsets_.InsertInterval (begin, end);
97+ }
98+
99+ ForceCommit_ = forceCommit;
100+ KillReadSession_ = killReadSession;
101+ OnlyCheckCommitedToFinish_ = onlyCheckCommitedToFinish;
68102}
69103
70104//
@@ -76,9 +110,13 @@ bool TTopicPartitionOperations::IsValid() const
76110 [](auto & x) { return x.second .IsValid (); });
77111}
78112
79- void TTopicPartitionOperations::AddOperation (const TString& topic, ui32 partition,
113+ void TTopicPartitionOperations::AddOperation (const TString& topic,
114+ ui32 partition,
80115 const TString& consumer,
81- const Ydb::Topic::OffsetsRange& range)
116+ const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
117+ bool forceCommit,
118+ bool killReadSession,
119+ bool onlyCheckCommitedToFinish)
82120{
83121 Y_ABORT_UNLESS (Topic_.Empty () || Topic_ == topic);
84122 Y_ABORT_UNLESS (Partition_.Empty () || Partition_ == partition);
@@ -88,7 +126,7 @@ void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partitio
88126 Partition_ = partition;
89127 }
90128
91- Operations_[consumer].AddOperation (consumer, range);
129+ Operations_[consumer].AddOperation (consumer, range, forceCommit, killReadSession, onlyCheckCommitedToFinish );
92130}
93131
94132void TTopicPartitionOperations::AddOperation (const TString& topic, ui32 partition,
@@ -117,11 +155,14 @@ void TTopicPartitionOperations::BuildTopicTxs(TTopicOperationTransactions& txs)
117155 for (auto & [consumer, operations] : Operations_) {
118156 NKikimrPQ::TPartitionOperation* o = t.tx .MutableOperations ()->Add ();
119157 o->SetPartitionId (*Partition_);
120- auto [begin, end] = operations.GetRange ();
121- o->SetBegin (begin);
122- o->SetEnd (end);
158+ auto [begin, end] = operations.GetOffsetsCommitRange ();
159+ o->SetCommitOffsetsBegin (begin);
160+ o->SetCommitOffsetsEnd (end);
123161 o->SetConsumer (consumer);
124162 o->SetPath (*Topic_);
163+ o->SetKillReadSession (operations.GetKillReadSession ());
164+ o->SetForceCommit (operations.GetForceCommit ());
165+ o->SetOnlyCheckCommitedToFinish (operations.GetOnlyCheckCommitedToFinish ());
125166 }
126167
127168 if (HasWriteOperations_) {
@@ -251,14 +292,23 @@ bool TTopicOperations::TabletHasReadOperations(ui64 tabletId) const
251292 return false ;
252293}
253294
254- void TTopicOperations::AddOperation (const TString& topic, ui32 partition,
295+ void TTopicOperations::AddOperation (const TString& topic,
296+ ui32 partition,
255297 const TString& consumer,
256- const Ydb::Topic::OffsetsRange& range)
298+ const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
299+ bool forceCommit,
300+ bool killReadSession,
301+ bool onlyCheckCommitedToFinish
302+ )
257303{
258304 TTopicPartition key{topic, partition};
259- Operations_[key].AddOperation (topic, partition,
305+ Operations_[key].AddOperation (topic,
306+ partition,
260307 consumer,
261- range);
308+ range,
309+ forceCommit,
310+ killReadSession,
311+ onlyCheckCommitedToFinish);
262312 HasReadOperations_ = true ;
263313}
264314
0 commit comments