Skip to content

Commit eec9aa8

Browse files
authored
Merge f18d598 into f39d389
2 parents f39d389 + f18d598 commit eec9aa8

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+2017
-457
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2162,6 +2162,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
21622162
TTopicTabletTxs topicTxs;
21632163
TDatashardTxs datashardTxs;
21642164
TEvWriteTxs evWriteTxs;
2165+
21652166
if (!TxManager) {
21662167
BuildDatashardTxs(datashardTasks, datashardTxs, evWriteTxs, topicTxs);
21672168
}

ydb/core/kqp/session_actor/kqp_query_state.cpp

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,7 @@ bool TKqpQueryState::PrepareNextStatementPart() {
429429
void TKqpQueryState::AddOffsetsToTransaction() {
430430
YQL_ENSURE(HasTopicOperations());
431431

432-
const auto& operations = GetTopicOperations();
432+
const auto& operations = GetTopicOperationsFromRequest();
433433

434434
TMaybe<TString> consumer;
435435
if (operations.HasConsumer()) {
@@ -442,7 +442,6 @@ void TKqpQueryState::AddOffsetsToTransaction() {
442442
}
443443

444444
TopicOperations = NTopic::TTopicOperations();
445-
446445
for (auto& topic : operations.GetTopics()) {
447446
auto path = CanonizePath(NPersQueue::GetFullTopicPath(GetDatabase(), topic.path()));
448447

@@ -452,8 +451,7 @@ void TKqpQueryState::AddOffsetsToTransaction() {
452451
} else {
453452
for (auto& range : partition.partition_offsets()) {
454453
YQL_ENSURE(consumer.Defined());
455-
456-
TopicOperations.AddOperation(path, partition.partition_id(), *consumer, range);
454+
TopicOperations.AddOperation(path, partition.partition_id(), *consumer, range, partition.force_commit(), partition.kill_read_session(), partition.only_check_commited_to_finish(), partition.read_session_id());
457455
}
458456
}
459457
}
@@ -474,7 +472,7 @@ std::unique_ptr<NSchemeCache::TSchemeCacheNavigate> TKqpQueryState::BuildSchemeC
474472
auto navigate = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
475473
navigate->DatabaseName = CanonizePath(GetDatabase());
476474

477-
const auto& operations = GetTopicOperations();
475+
const auto& operations = GetTopicOperationsFromRequest();
478476
TMaybe<TString> consumer;
479477
if (operations.HasConsumer())
480478
consumer = operations.GetConsumer();

ydb/core/kqp/session_actor/kqp_query_state.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ class TKqpQueryState : public TNonCopyable {
332332
return RequestEv->GetQuery();
333333
}
334334

335-
const ::NKikimrKqp::TTopicOperationsRequest& GetTopicOperations() const {
335+
const ::NKikimrKqp::TTopicOperationsRequest& GetTopicOperationsFromRequest() const {
336336
return RequestEv->GetTopicOperations();
337337
}
338338

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1358,7 +1358,6 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
13581358
}
13591359
}
13601360
}
1361-
13621361
request.TopicOperations = std::move(txCtx.TopicOperations);
13631362
} else if (QueryState->ShouldAcquireLocks(tx) && (!txCtx.HasOlapTable || Settings.TableService.GetEnableOlapSink())) {
13641363
request.AcquireLocksTxId = txCtx.Locks.GetLockTxId();

ydb/core/kqp/topics/kqp_topics.cpp

Lines changed: 85 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "kqp_topics.h"
22

33
#include <ydb/core/base/path.h>
4+
#include <ydb/core/protos/kqp.pb.h>
45
#include <ydb/core/persqueue/utils.h>
56
#include <ydb/library/actors/core/log.h>
67

@@ -26,35 +27,73 @@ static void UpdateSupportivePartition(TMaybe<ui32>& lhs, const TMaybe<ui32>& rhs
2627
//
2728
bool TConsumerOperations::IsValid() const
2829
{
29-
return Offsets_.GetNumIntervals() == 1;
30+
return Offsets_.GetNumIntervals() <= 1;
3031
}
3132

32-
std::pair<ui64, ui64> TConsumerOperations::GetRange() const
33+
std::pair<ui64, ui64> TConsumerOperations::GetOffsetsCommitRange() const
3334
{
3435
Y_ABORT_UNLESS(IsValid());
3536

36-
return {Offsets_.Min(), Offsets_.Max()};
37+
if (Offsets_.Empty()) {
38+
return {0,0};
39+
} else {
40+
return {Offsets_.Min(), Offsets_.Max()};
41+
}
42+
}
43+
44+
bool TConsumerOperations::GetForceCommit() const
45+
{
46+
return ForceCommit_;
47+
}
48+
49+
bool TConsumerOperations::GetKillReadSession() const
50+
{
51+
return KillReadSession_;
52+
}
53+
54+
bool TConsumerOperations::GetOnlyCheckCommitedToFinish() const
55+
{
56+
return OnlyCheckCommitedToFinish_;
3757
}
3858

39-
void TConsumerOperations::AddOperation(const TString& consumer, const Ydb::Topic::OffsetsRange& range)
59+
TString TConsumerOperations::GetReadSessionId() const
60+
{
61+
return ReadSessionId_;
62+
}
63+
64+
void TConsumerOperations::AddOperation(const TString& consumer,
65+
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
66+
bool forceCommit,
67+
bool killReadSession,
68+
bool onlyCheckCommitedToFinish,
69+
const TString& readSessionId)
4070
{
4171
Y_ABORT_UNLESS(Consumer_.Empty() || Consumer_ == consumer);
4272

43-
AddOperationImpl(consumer, range.start(), range.end());
73+
AddOperationImpl(consumer, range.start(), range.end(), forceCommit, killReadSession, onlyCheckCommitedToFinish, readSessionId);
4474
}
4575

4676
void TConsumerOperations::Merge(const TConsumerOperations& rhs)
4777
{
4878
Y_ABORT_UNLESS(rhs.Consumer_.Defined());
4979
Y_ABORT_UNLESS(Consumer_.Empty() || Consumer_ == rhs.Consumer_);
5080

51-
for (auto& range : rhs.Offsets_) {
52-
AddOperationImpl(*rhs.Consumer_, range.first, range.second);
81+
if (!rhs.Offsets_.Empty()) {
82+
for (auto& range : rhs.Offsets_) {
83+
AddOperationImpl(*rhs.Consumer_, range.first, range.second, rhs.GetForceCommit(), rhs.GetKillReadSession(), rhs.GetOnlyCheckCommitedToFinish(), rhs.GetReadSessionId());
84+
}
85+
} else {
86+
AddOperationImpl(*rhs.Consumer_, 0, 0, rhs.GetForceCommit(), rhs.GetKillReadSession(), rhs.GetOnlyCheckCommitedToFinish(), rhs.GetReadSessionId());
5387
}
5488
}
5589

5690
void TConsumerOperations::AddOperationImpl(const TString& consumer,
57-
ui64 begin, ui64 end)
91+
ui64 begin,
92+
ui64 end,
93+
bool forceCommit,
94+
bool killReadSession,
95+
bool onlyCheckCommitedToFinish,
96+
const TString& readSessionId)
5897
{
5998
if (Offsets_.Intersects(begin, end)) {
6099
ythrow TOffsetsRangeIntersectExpection() << "offset ranges intersect";
@@ -64,7 +103,14 @@ void TConsumerOperations::AddOperationImpl(const TString& consumer,
64103
Consumer_ = consumer;
65104
}
66105

67-
Offsets_.InsertInterval(begin, end);
106+
if (end != 0) {
107+
Offsets_.InsertInterval(begin, end);
108+
}
109+
110+
ForceCommit_ = forceCommit;
111+
KillReadSession_ = killReadSession;
112+
OnlyCheckCommitedToFinish_ = onlyCheckCommitedToFinish;
113+
ReadSessionId_ = readSessionId;
68114
}
69115

70116
//
@@ -76,9 +122,14 @@ bool TTopicPartitionOperations::IsValid() const
76122
[](auto& x) { return x.second.IsValid(); });
77123
}
78124

79-
void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partition,
125+
void TTopicPartitionOperations::AddOperation(const TString& topic,
126+
ui32 partition,
80127
const TString& consumer,
81-
const Ydb::Topic::OffsetsRange& range)
128+
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
129+
bool forceCommit,
130+
bool killReadSession,
131+
bool onlyCheckCommitedToFinish,
132+
const TString& readSessionId)
82133
{
83134
Y_ABORT_UNLESS(Topic_.Empty() || Topic_ == topic);
84135
Y_ABORT_UNLESS(Partition_.Empty() || Partition_ == partition);
@@ -88,7 +139,7 @@ void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partitio
88139
Partition_ = partition;
89140
}
90141

91-
Operations_[consumer].AddOperation(consumer, range);
142+
Operations_[consumer].AddOperation(consumer, range, forceCommit, killReadSession, onlyCheckCommitedToFinish, readSessionId);
92143
}
93144

94145
void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partition,
@@ -117,11 +168,15 @@ void TTopicPartitionOperations::BuildTopicTxs(TTopicOperationTransactions& txs)
117168
for (auto& [consumer, operations] : Operations_) {
118169
NKikimrPQ::TPartitionOperation* o = t.tx.MutableOperations()->Add();
119170
o->SetPartitionId(*Partition_);
120-
auto [begin, end] = operations.GetRange();
121-
o->SetBegin(begin);
122-
o->SetEnd(end);
171+
auto [begin, end] = operations.GetOffsetsCommitRange();
172+
o->SetCommitOffsetsBegin(begin);
173+
o->SetCommitOffsetsEnd(end);
123174
o->SetConsumer(consumer);
124175
o->SetPath(*Topic_);
176+
o->SetKillReadSession(operations.GetKillReadSession());
177+
o->SetForceCommit(operations.GetForceCommit());
178+
o->SetOnlyCheckCommitedToFinish(operations.GetOnlyCheckCommitedToFinish());
179+
o->SetReadSessionId(operations.GetReadSessionId());
125180
}
126181

127182
if (HasWriteOperations_) {
@@ -256,14 +311,25 @@ bool TTopicOperations::TabletHasReadOperations(ui64 tabletId) const
256311
return false;
257312
}
258313

259-
void TTopicOperations::AddOperation(const TString& topic, ui32 partition,
314+
void TTopicOperations::AddOperation(const TString& topic,
315+
ui32 partition,
260316
const TString& consumer,
261-
const Ydb::Topic::OffsetsRange& range)
317+
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
318+
bool forceCommit,
319+
bool killReadSession,
320+
bool onlyCheckCommitedToFinish,
321+
const TString& readSessionId
322+
)
262323
{
263324
TTopicPartition key{topic, partition};
264-
Operations_[key].AddOperation(topic, partition,
325+
Operations_[key].AddOperation(topic,
326+
partition,
265327
consumer,
266-
range);
328+
range,
329+
forceCommit,
330+
killReadSession,
331+
onlyCheckCommitedToFinish,
332+
readSessionId);
267333
HasReadOperations_ = true;
268334
}
269335

ydb/core/kqp/topics/kqp_topics.h

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <ydb/public/api/protos/ydb_topic.pb.h>
55
#include <ydb/core/protos/pqconfig.pb.h>
66

7+
#include <ydb/core/protos/kqp.pb.h>
78
#include <ydb/core/tx/long_tx_service/public/lock_handle.h>
89
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
910

@@ -26,20 +27,40 @@ class TConsumerOperations {
2627
public:
2728
bool IsValid() const;
2829

29-
std::pair<ui64, ui64> GetRange() const;
30+
std::pair<ui64, ui64> GetOffsetsCommitRange() const;
3031

31-
ui64 GetBegin() const;
32-
ui64 GetEnd() const;
32+
ui64 GetOffsetCommitBegin() const;
33+
ui64 GetOffsetCommitEnd() const;
34+
35+
bool GetForceCommit() const;
36+
bool GetKillReadSession() const;
37+
bool GetOnlyCheckCommitedToFinish() const;
38+
TString GetReadSessionId() const;
39+
40+
void AddOperation(const TString& consumer,
41+
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
42+
bool forceCommit = false,
43+
bool killReadSession = false,
44+
bool onlyCheckCommitedToFinish = false,
45+
const TString& readSessionId = {});
3346

34-
void AddOperation(const TString& consumer, const Ydb::Topic::OffsetsRange& range);
3547
void Merge(const TConsumerOperations& rhs);
3648

3749
private:
3850
void AddOperationImpl(const TString& consumer,
39-
ui64 begin, ui64 end);
51+
ui64 begin,
52+
ui64 end,
53+
bool forceCommit = false,
54+
bool killReadSession = false,
55+
bool onlyCheckCommitedToFinish = false,
56+
const TString& readSessionId = {});
4057

4158
TMaybe<TString> Consumer_;
4259
TDisjointIntervalTree<ui64> Offsets_;
60+
bool ForceCommit_ = false;
61+
bool KillReadSession_ = false;
62+
bool OnlyCheckCommitedToFinish_ = false;
63+
TString ReadSessionId_;
4364
};
4465

4566
struct TTopicOperationTransaction {
@@ -53,9 +74,14 @@ class TTopicPartitionOperations {
5374
public:
5475
bool IsValid() const;
5576

56-
void AddOperation(const TString& topic, ui32 partition,
77+
void AddOperation(const TString& topic,
78+
ui32 partition,
5779
const TString& consumer,
58-
const Ydb::Topic::OffsetsRange& range);
80+
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
81+
bool forceCommit = false,
82+
bool killReadSession = false,
83+
bool onlyCheckCommitedToFinish = false,
84+
const TString& readSessionId = {});
5985
void AddOperation(const TString& topic, ui32 partition,
6086
TMaybe<ui32> supportivePartition);
6187

@@ -108,7 +134,11 @@ class TTopicOperations {
108134

109135
void AddOperation(const TString& topic, ui32 partition,
110136
const TString& consumer,
111-
const Ydb::Topic::OffsetsRange& range);
137+
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
138+
bool forceCommit,
139+
bool killReadSession,
140+
bool onlyCheckCommitedToFinish,
141+
const TString& readSessionId);
112142
void AddOperation(const TString& topic, ui32 partition,
113143
TMaybe<ui32> supportivePartition);
114144

ydb/core/persqueue/events/internal.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -811,11 +811,15 @@ struct TEvPQ {
811811
{
812812
}
813813

814-
void AddOperation(TString consumer, ui64 begin, ui64 end) {
814+
void AddOperation(TString consumer, ui64 begin, ui64 end, bool forceCommit = false, bool killReadSession = false, bool onlyCheckCommitedToFinish = false, TString readSessionId = {}) {
815815
NKikimrPQ::TPartitionOperation operation;
816-
operation.SetBegin(begin);
817-
operation.SetEnd(end);
816+
operation.SetCommitOffsetsBegin(begin);
817+
operation.SetCommitOffsetsEnd(end);
818818
operation.SetConsumer(std::move(consumer));
819+
operation.SetForceCommit(forceCommit);
820+
operation.SetKillReadSession(killReadSession);
821+
operation.SetOnlyCheckCommitedToFinish(onlyCheckCommitedToFinish);
822+
operation.SetReadSessionId(readSessionId);
819823

820824
Operations.push_back(std::move(operation));
821825
}

0 commit comments

Comments
 (0)