Skip to content

Commit 0cbe7ee

Browse files
authored
Merge 6b41472 into 9e60c00
2 parents 9e60c00 + 6b41472 commit 0cbe7ee

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+1455
-376
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2098,6 +2098,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
20982098
TTopicTabletTxs topicTxs;
20992099
TDatashardTxs datashardTxs;
21002100
TEvWriteTxs evWriteTxs;
2101+
21012102
if (!TxManager) {
21022103
BuildDatashardTxs(datashardTasks, datashardTxs, evWriteTxs, topicTxs);
21032104
}
@@ -2382,7 +2383,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
23822383
YQL_ENSURE(!ReadOnlyTx);
23832384
}
23842385
}
2385-
23862386
Request.TopicOperations.BuildTopicTxs(topicTxs);
23872387

23882388
const bool needRollback = Request.LocksOp == ELocksOp::Rollback;

ydb/core/kqp/session_actor/kqp_query_state.cpp

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,7 @@ bool TKqpQueryState::PrepareNextStatementPart() {
428428
void TKqpQueryState::AddOffsetsToTransaction() {
429429
YQL_ENSURE(HasTopicOperations());
430430

431-
const auto& operations = GetTopicOperations();
431+
const auto& operations = GetTopicOperationsFromRequest();
432432

433433
TMaybe<TString> consumer;
434434
if (operations.HasConsumer()) {
@@ -441,7 +441,6 @@ void TKqpQueryState::AddOffsetsToTransaction() {
441441
}
442442

443443
TopicOperations = NTopic::TTopicOperations();
444-
445444
for (auto& topic : operations.GetTopics()) {
446445
auto path = CanonizePath(NPersQueue::GetFullTopicPath(TlsActivationContext->AsActorContext(),
447446
GetDatabase(), topic.path()));
@@ -452,8 +451,7 @@ void TKqpQueryState::AddOffsetsToTransaction() {
452451
} else {
453452
for (auto& range : partition.partition_offsets()) {
454453
YQL_ENSURE(consumer.Defined());
455-
456-
TopicOperations.AddOperation(path, partition.partition_id(), *consumer, range);
454+
TopicOperations.AddOperation(path, partition.partition_id(), *consumer, range, partition.force_commit(), partition.kill_read_session(), partition.only_check_commited_to_finish(), partition.read_session_id());
457455
}
458456
}
459457
}
@@ -474,7 +472,7 @@ std::unique_ptr<NSchemeCache::TSchemeCacheNavigate> TKqpQueryState::BuildSchemeC
474472
auto navigate = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
475473
navigate->DatabaseName = CanonizePath(GetDatabase());
476474

477-
const auto& operations = GetTopicOperations();
475+
const auto& operations = GetTopicOperationsFromRequest();
478476
TMaybe<TString> consumer;
479477
if (operations.HasConsumer())
480478
consumer = operations.GetConsumer();

ydb/core/kqp/session_actor/kqp_query_state.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ class TKqpQueryState : public TNonCopyable {
317317
return RequestEv->GetQuery();
318318
}
319319

320-
const ::NKikimrKqp::TTopicOperationsRequest& GetTopicOperations() const {
320+
const ::NKikimrKqp::TTopicOperationsRequest& GetTopicOperationsFromRequest() const {
321321
return RequestEv->GetTopicOperations();
322322
}
323323

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -817,7 +817,6 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
817817
QueryState->QueryData = std::make_shared<TQueryData>(QueryState->TxCtx->TxAlloc);
818818
QueryState->TxCtx->SetIsolationLevel(settings);
819819
QueryState->TxCtx->OnBeginQuery();
820-
821820
if (!Transactions.CreateNew(QueryState->TxId.GetValue(), QueryState->TxCtx)) {
822821
std::vector<TIssue> issues{
823822
YqlIssue({}, TIssuesIds::KIKIMR_TOO_MANY_TRANSACTIONS)};
@@ -1293,7 +1292,6 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
12931292
}
12941293
}
12951294
}
1296-
12971295
request.TopicOperations = std::move(txCtx.TopicOperations);
12981296
} else if (QueryState->ShouldAcquireLocks(tx) && (!txCtx.HasOlapTable || Settings.TableService.GetEnableOlapSink())) {
12991297
request.AcquireLocksTxId = txCtx.Locks.GetLockTxId();

ydb/core/kqp/topics/kqp_topics.cpp

Lines changed: 85 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "kqp_topics.h"
22

33
#include <ydb/core/base/path.h>
4+
#include <ydb/core/protos/kqp.pb.h>
45
#include <ydb/core/persqueue/utils.h>
56
#include <ydb/library/actors/core/log.h>
67

@@ -26,35 +27,73 @@ static void UpdateSupportivePartition(TMaybe<ui32>& lhs, const TMaybe<ui32>& rhs
2627
//
2728
bool TConsumerOperations::IsValid() const
2829
{
29-
return Offsets_.GetNumIntervals() == 1;
30+
return Offsets_.GetNumIntervals() <= 1;
3031
}
3132

32-
std::pair<ui64, ui64> TConsumerOperations::GetRange() const
33+
std::pair<ui64, ui64> TConsumerOperations::GetOffsetsCommitRange() const
3334
{
3435
Y_ABORT_UNLESS(IsValid());
3536

36-
return {Offsets_.Min(), Offsets_.Max()};
37+
if (Offsets_.Empty()) {
38+
return {0,0};
39+
} else {
40+
return {Offsets_.Min(), Offsets_.Max()};
41+
}
42+
}
43+
44+
bool TConsumerOperations::GetForceCommit() const
45+
{
46+
return ForceCommit_;
47+
}
48+
49+
bool TConsumerOperations::GetKillReadSession() const
50+
{
51+
return KillReadSession_;
52+
}
53+
54+
bool TConsumerOperations::GetOnlyCheckCommitedToFinish() const
55+
{
56+
return OnlyCheckCommitedToFinish_;
3757
}
3858

39-
void TConsumerOperations::AddOperation(const TString& consumer, const Ydb::Topic::OffsetsRange& range)
59+
TString TConsumerOperations::GetReadSessionId() const
60+
{
61+
return ReadSessionId_;
62+
}
63+
64+
void TConsumerOperations::AddOperation(const TString& consumer,
65+
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
66+
bool forceCommit,
67+
bool killReadSession,
68+
bool onlyCheckCommitedToFinish,
69+
const TString& readSessionId)
4070
{
4171
Y_ABORT_UNLESS(Consumer_.Empty() || Consumer_ == consumer);
4272

43-
AddOperationImpl(consumer, range.start(), range.end());
73+
AddOperationImpl(consumer, range.start(), range.end(), forceCommit, killReadSession, onlyCheckCommitedToFinish, readSessionId);
4474
}
4575

4676
void TConsumerOperations::Merge(const TConsumerOperations& rhs)
4777
{
4878
Y_ABORT_UNLESS(rhs.Consumer_.Defined());
4979
Y_ABORT_UNLESS(Consumer_.Empty() || Consumer_ == rhs.Consumer_);
5080

51-
for (auto& range : rhs.Offsets_) {
52-
AddOperationImpl(*rhs.Consumer_, range.first, range.second);
81+
if (!rhs.Offsets_.Empty()) {
82+
for (auto& range : rhs.Offsets_) {
83+
AddOperationImpl(*rhs.Consumer_, range.first, range.second, rhs.GetForceCommit(), rhs.GetKillReadSession(), rhs.GetOnlyCheckCommitedToFinish(), rhs.GetReadSessionId());
84+
}
85+
} else {
86+
AddOperationImpl(*rhs.Consumer_, 0, 0, rhs.GetForceCommit(), rhs.GetKillReadSession(), rhs.GetOnlyCheckCommitedToFinish(), rhs.GetReadSessionId());
5387
}
5488
}
5589

5690
void TConsumerOperations::AddOperationImpl(const TString& consumer,
57-
ui64 begin, ui64 end)
91+
ui64 begin,
92+
ui64 end,
93+
bool forceCommit,
94+
bool killReadSession,
95+
bool onlyCheckCommitedToFinish,
96+
const TString& readSessionId)
5897
{
5998
if (Offsets_.Intersects(begin, end)) {
6099
ythrow TOffsetsRangeIntersectExpection() << "offset ranges intersect";
@@ -64,7 +103,14 @@ void TConsumerOperations::AddOperationImpl(const TString& consumer,
64103
Consumer_ = consumer;
65104
}
66105

67-
Offsets_.InsertInterval(begin, end);
106+
if (end != 0) {
107+
Offsets_.InsertInterval(begin, end);
108+
}
109+
110+
ForceCommit_ = forceCommit;
111+
KillReadSession_ = killReadSession;
112+
OnlyCheckCommitedToFinish_ = onlyCheckCommitedToFinish;
113+
ReadSessionId_ = readSessionId;
68114
}
69115

70116
//
@@ -76,9 +122,14 @@ bool TTopicPartitionOperations::IsValid() const
76122
[](auto& x) { return x.second.IsValid(); });
77123
}
78124

79-
void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partition,
125+
void TTopicPartitionOperations::AddOperation(const TString& topic,
126+
ui32 partition,
80127
const TString& consumer,
81-
const Ydb::Topic::OffsetsRange& range)
128+
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
129+
bool forceCommit,
130+
bool killReadSession,
131+
bool onlyCheckCommitedToFinish,
132+
const TString& readSessionId)
82133
{
83134
Y_ABORT_UNLESS(Topic_.Empty() || Topic_ == topic);
84135
Y_ABORT_UNLESS(Partition_.Empty() || Partition_ == partition);
@@ -88,7 +139,7 @@ void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partitio
88139
Partition_ = partition;
89140
}
90141

91-
Operations_[consumer].AddOperation(consumer, range);
142+
Operations_[consumer].AddOperation(consumer, range, forceCommit, killReadSession, onlyCheckCommitedToFinish, readSessionId);
92143
}
93144

94145
void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partition,
@@ -117,11 +168,15 @@ void TTopicPartitionOperations::BuildTopicTxs(TTopicOperationTransactions& txs)
117168
for (auto& [consumer, operations] : Operations_) {
118169
NKikimrPQ::TPartitionOperation* o = t.tx.MutableOperations()->Add();
119170
o->SetPartitionId(*Partition_);
120-
auto [begin, end] = operations.GetRange();
121-
o->SetBegin(begin);
122-
o->SetEnd(end);
171+
auto [begin, end] = operations.GetOffsetsCommitRange();
172+
o->SetCommitOffsetsBegin(begin);
173+
o->SetCommitOffsetsEnd(end);
123174
o->SetConsumer(consumer);
124175
o->SetPath(*Topic_);
176+
o->SetKillReadSession(operations.GetKillReadSession());
177+
o->SetForceCommit(operations.GetForceCommit());
178+
o->SetOnlyCheckCommitedToFinish(operations.GetOnlyCheckCommitedToFinish());
179+
o->SetReadSessionId(operations.GetReadSessionId());
125180
}
126181

127182
if (HasWriteOperations_) {
@@ -251,14 +306,25 @@ bool TTopicOperations::TabletHasReadOperations(ui64 tabletId) const
251306
return false;
252307
}
253308

254-
void TTopicOperations::AddOperation(const TString& topic, ui32 partition,
309+
void TTopicOperations::AddOperation(const TString& topic,
310+
ui32 partition,
255311
const TString& consumer,
256-
const Ydb::Topic::OffsetsRange& range)
312+
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
313+
bool forceCommit,
314+
bool killReadSession,
315+
bool onlyCheckCommitedToFinish,
316+
const TString& readSessionId
317+
)
257318
{
258319
TTopicPartition key{topic, partition};
259-
Operations_[key].AddOperation(topic, partition,
320+
Operations_[key].AddOperation(topic,
321+
partition,
260322
consumer,
261-
range);
323+
range,
324+
forceCommit,
325+
killReadSession,
326+
onlyCheckCommitedToFinish,
327+
readSessionId);
262328
HasReadOperations_ = true;
263329
}
264330

ydb/core/kqp/topics/kqp_topics.h

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <ydb/public/api/protos/ydb_topic.pb.h>
55
#include <ydb/core/protos/pqconfig.pb.h>
66

7+
#include <ydb/core/protos/kqp.pb.h>
78
#include <ydb/core/tx/long_tx_service/public/lock_handle.h>
89
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
910

@@ -26,20 +27,40 @@ class TConsumerOperations {
2627
public:
2728
bool IsValid() const;
2829

29-
std::pair<ui64, ui64> GetRange() const;
30+
std::pair<ui64, ui64> GetOffsetsCommitRange() const;
3031

31-
ui64 GetBegin() const;
32-
ui64 GetEnd() const;
32+
ui64 GetOffsetCommitBegin() const;
33+
ui64 GetOffsetCommitEnd() const;
34+
35+
bool GetForceCommit() const;
36+
bool GetKillReadSession() const;
37+
bool GetOnlyCheckCommitedToFinish() const;
38+
TString GetReadSessionId() const;
39+
40+
void AddOperation(const TString& consumer,
41+
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
42+
bool forceCommit = false,
43+
bool killReadSession = false,
44+
bool onlyCheckCommitedToFinish = false,
45+
const TString& readSessionId = {});
3346

34-
void AddOperation(const TString& consumer, const Ydb::Topic::OffsetsRange& range);
3547
void Merge(const TConsumerOperations& rhs);
3648

3749
private:
3850
void AddOperationImpl(const TString& consumer,
39-
ui64 begin, ui64 end);
51+
ui64 begin,
52+
ui64 end,
53+
bool forceCommit = false,
54+
bool killReadSession = false,
55+
bool onlyCheckCommitedToFinish = false,
56+
const TString& readSessionId = {});
4057

4158
TMaybe<TString> Consumer_;
4259
TDisjointIntervalTree<ui64> Offsets_;
60+
bool ForceCommit_ = false;
61+
bool KillReadSession_ = false;
62+
bool OnlyCheckCommitedToFinish_ = false;
63+
TString ReadSessionId_;
4364
};
4465

4566
struct TTopicOperationTransaction {
@@ -53,9 +74,14 @@ class TTopicPartitionOperations {
5374
public:
5475
bool IsValid() const;
5576

56-
void AddOperation(const TString& topic, ui32 partition,
77+
void AddOperation(const TString& topic,
78+
ui32 partition,
5779
const TString& consumer,
58-
const Ydb::Topic::OffsetsRange& range);
80+
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
81+
bool forceCommit = false,
82+
bool killReadSession = false,
83+
bool onlyCheckCommitedToFinish = false,
84+
const TString& readSessionId = {});
5985
void AddOperation(const TString& topic, ui32 partition,
6086
TMaybe<ui32> supportivePartition);
6187

@@ -106,7 +132,11 @@ class TTopicOperations {
106132

107133
void AddOperation(const TString& topic, ui32 partition,
108134
const TString& consumer,
109-
const Ydb::Topic::OffsetsRange& range);
135+
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
136+
bool forceCommit,
137+
bool killReadSession,
138+
bool onlyCheckCommitedToFinish,
139+
const TString& readSessionId);
110140
void AddOperation(const TString& topic, ui32 partition,
111141
TMaybe<ui32> supportivePartition);
112142

ydb/core/persqueue/events/internal.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -810,11 +810,15 @@ struct TEvPQ {
810810
{
811811
}
812812

813-
void AddOperation(TString consumer, ui64 begin, ui64 end) {
813+
void AddOperation(TString consumer, ui64 begin, ui64 end, bool forceCommit = false, bool killReadSession = false, bool onlyCheckCommitedToFinish = false, TString readSessionId = {}) {
814814
NKikimrPQ::TPartitionOperation operation;
815-
operation.SetBegin(begin);
816-
operation.SetEnd(end);
815+
operation.SetCommitOffsetsBegin(begin);
816+
operation.SetCommitOffsetsEnd(end);
817817
operation.SetConsumer(std::move(consumer));
818+
operation.SetForceCommit(forceCommit);
819+
operation.SetKillReadSession(killReadSession);
820+
operation.SetOnlyCheckCommitedToFinish(onlyCheckCommitedToFinish);
821+
operation.SetReadSessionId(readSessionId);
818822

819823
Operations.push_back(std::move(operation));
820824
}

0 commit comments

Comments
 (0)