Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2162,6 +2162,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
TTopicTabletTxs topicTxs;
TDatashardTxs datashardTxs;
TEvWriteTxs evWriteTxs;

if (!TxManager) {
BuildDatashardTxs(datashardTasks, datashardTxs, evWriteTxs, topicTxs);
}
Expand Down
8 changes: 3 additions & 5 deletions ydb/core/kqp/session_actor/kqp_query_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ bool TKqpQueryState::PrepareNextStatementPart() {
void TKqpQueryState::AddOffsetsToTransaction() {
YQL_ENSURE(HasTopicOperations());

const auto& operations = GetTopicOperations();
const auto& operations = GetTopicOperationsFromRequest();

TMaybe<TString> consumer;
if (operations.HasConsumer()) {
Expand All @@ -442,7 +442,6 @@ void TKqpQueryState::AddOffsetsToTransaction() {
}

TopicOperations = NTopic::TTopicOperations();

for (auto& topic : operations.GetTopics()) {
auto path = CanonizePath(NPersQueue::GetFullTopicPath(GetDatabase(), topic.path()));

Expand All @@ -452,8 +451,7 @@ void TKqpQueryState::AddOffsetsToTransaction() {
} else {
for (auto& range : partition.partition_offsets()) {
YQL_ENSURE(consumer.Defined());

TopicOperations.AddOperation(path, partition.partition_id(), *consumer, range);
TopicOperations.AddOperation(path, partition.partition_id(), *consumer, range, partition.force_commit(), partition.kill_read_session(), partition.only_check_commited_to_finish(), partition.read_session_id());
}
}
}
Expand All @@ -474,7 +472,7 @@ std::unique_ptr<NSchemeCache::TSchemeCacheNavigate> TKqpQueryState::BuildSchemeC
auto navigate = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
navigate->DatabaseName = CanonizePath(GetDatabase());

const auto& operations = GetTopicOperations();
const auto& operations = GetTopicOperationsFromRequest();
TMaybe<TString> consumer;
if (operations.HasConsumer())
consumer = operations.GetConsumer();
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/session_actor/kqp_query_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ class TKqpQueryState : public TNonCopyable {
return RequestEv->GetQuery();
}

const ::NKikimrKqp::TTopicOperationsRequest& GetTopicOperations() const {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

а почему здесь важно, что FromReqest? Есть не из request?

const ::NKikimrKqp::TTopicOperationsRequest& GetTopicOperationsFromRequest() const {
return RequestEv->GetTopicOperations();
}

Expand Down
1 change: 0 additions & 1 deletion ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1358,7 +1358,6 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
}
}
}

request.TopicOperations = std::move(txCtx.TopicOperations);
} else if (QueryState->ShouldAcquireLocks(tx) && (!txCtx.HasOlapTable || Settings.TableService.GetEnableOlapSink())) {
request.AcquireLocksTxId = txCtx.Locks.GetLockTxId();
Expand Down
104 changes: 85 additions & 19 deletions ydb/core/kqp/topics/kqp_topics.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "kqp_topics.h"

#include <ydb/core/base/path.h>
#include <ydb/core/protos/kqp.pb.h>
#include <ydb/core/persqueue/utils.h>
#include <ydb/library/actors/core/log.h>

Expand All @@ -26,35 +27,73 @@ static void UpdateSupportivePartition(TMaybe<ui32>& lhs, const TMaybe<ui32>& rhs
//
bool TConsumerOperations::IsValid() const
{
return Offsets_.GetNumIntervals() == 1;
return Offsets_.GetNumIntervals() <= 1;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

В случае GetNumIntervals() == 0 функция GetOffsetsCommitRange возвращает {0, 0}? А в каких случаях это нужно?

}

std::pair<ui64, ui64> TConsumerOperations::GetRange() const
std::pair<ui64, ui64> TConsumerOperations::GetOffsetsCommitRange() const
{
Y_ABORT_UNLESS(IsValid());

return {Offsets_.Min(), Offsets_.Max()};
if (Offsets_.Empty()) {
return {0,0};
} else {
return {Offsets_.Min(), Offsets_.Max()};
}
}

bool TConsumerOperations::GetForceCommit() const
{
return ForceCommit_;
}

bool TConsumerOperations::GetKillReadSession() const
{
return KillReadSession_;
}

bool TConsumerOperations::GetOnlyCheckCommitedToFinish() const
{
return OnlyCheckCommitedToFinish_;
}

void TConsumerOperations::AddOperation(const TString& consumer, const Ydb::Topic::OffsetsRange& range)
TString TConsumerOperations::GetReadSessionId() const
{
return ReadSessionId_;
}

void TConsumerOperations::AddOperation(const TString& consumer,
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
bool forceCommit,
bool killReadSession,
bool onlyCheckCommitedToFinish,
const TString& readSessionId)
{
Y_ABORT_UNLESS(Consumer_.Empty() || Consumer_ == consumer);

AddOperationImpl(consumer, range.start(), range.end());
AddOperationImpl(consumer, range.start(), range.end(), forceCommit, killReadSession, onlyCheckCommitedToFinish, readSessionId);
}

void TConsumerOperations::Merge(const TConsumerOperations& rhs)
{
Y_ABORT_UNLESS(rhs.Consumer_.Defined());
Y_ABORT_UNLESS(Consumer_.Empty() || Consumer_ == rhs.Consumer_);

for (auto& range : rhs.Offsets_) {
AddOperationImpl(*rhs.Consumer_, range.first, range.second);
if (!rhs.Offsets_.Empty()) {
for (auto& range : rhs.Offsets_) {
AddOperationImpl(*rhs.Consumer_, range.first, range.second, rhs.GetForceCommit(), rhs.GetKillReadSession(), rhs.GetOnlyCheckCommitedToFinish(), rhs.GetReadSessionId());
}
} else {
AddOperationImpl(*rhs.Consumer_, 0, 0, rhs.GetForceCommit(), rhs.GetKillReadSession(), rhs.GetOnlyCheckCommitedToFinish(), rhs.GetReadSessionId());
}
}

void TConsumerOperations::AddOperationImpl(const TString& consumer,
ui64 begin, ui64 end)
ui64 begin,
ui64 end,
bool forceCommit,
bool killReadSession,
bool onlyCheckCommitedToFinish,
const TString& readSessionId)
{
if (Offsets_.Intersects(begin, end)) {
ythrow TOffsetsRangeIntersectExpection() << "offset ranges intersect";
Expand All @@ -64,7 +103,14 @@ void TConsumerOperations::AddOperationImpl(const TString& consumer,
Consumer_ = consumer;
}

Offsets_.InsertInterval(begin, end);
if (end != 0) {
Offsets_.InsertInterval(begin, end);
}

ForceCommit_ = forceCommit;
KillReadSession_ = killReadSession;
OnlyCheckCommitedToFinish_ = onlyCheckCommitedToFinish;
ReadSessionId_ = readSessionId;
}

//
Expand All @@ -76,9 +122,14 @@ bool TTopicPartitionOperations::IsValid() const
[](auto& x) { return x.second.IsValid(); });
}

void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partition,
void TTopicPartitionOperations::AddOperation(const TString& topic,
ui32 partition,
const TString& consumer,
const Ydb::Topic::OffsetsRange& range)
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
bool forceCommit,
bool killReadSession,
bool onlyCheckCommitedToFinish,
const TString& readSessionId)
{
Y_ABORT_UNLESS(Topic_.Empty() || Topic_ == topic);
Y_ABORT_UNLESS(Partition_.Empty() || Partition_ == partition);
Expand All @@ -88,7 +139,7 @@ void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partitio
Partition_ = partition;
}

Operations_[consumer].AddOperation(consumer, range);
Operations_[consumer].AddOperation(consumer, range, forceCommit, killReadSession, onlyCheckCommitedToFinish, readSessionId);
}

void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partition,
Expand Down Expand Up @@ -117,11 +168,15 @@ void TTopicPartitionOperations::BuildTopicTxs(TTopicOperationTransactions& txs)
for (auto& [consumer, operations] : Operations_) {
NKikimrPQ::TPartitionOperation* o = t.tx.MutableOperations()->Add();
o->SetPartitionId(*Partition_);
auto [begin, end] = operations.GetRange();
o->SetBegin(begin);
o->SetEnd(end);
auto [begin, end] = operations.GetOffsetsCommitRange();
o->SetCommitOffsetsBegin(begin);
o->SetCommitOffsetsEnd(end);
o->SetConsumer(consumer);
o->SetPath(*Topic_);
o->SetKillReadSession(operations.GetKillReadSession());
o->SetForceCommit(operations.GetForceCommit());
o->SetOnlyCheckCommitedToFinish(operations.GetOnlyCheckCommitedToFinish());
o->SetReadSessionId(operations.GetReadSessionId());
}

if (HasWriteOperations_) {
Expand Down Expand Up @@ -256,14 +311,25 @@ bool TTopicOperations::TabletHasReadOperations(ui64 tabletId) const
return false;
}

void TTopicOperations::AddOperation(const TString& topic, ui32 partition,
void TTopicOperations::AddOperation(const TString& topic,
ui32 partition,
const TString& consumer,
const Ydb::Topic::OffsetsRange& range)
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
bool forceCommit,
bool killReadSession,
bool onlyCheckCommitedToFinish,
const TString& readSessionId
)
{
TTopicPartition key{topic, partition};
Operations_[key].AddOperation(topic, partition,
Operations_[key].AddOperation(topic,
partition,
consumer,
range);
range,
forceCommit,
killReadSession,
onlyCheckCommitedToFinish,
readSessionId);
HasReadOperations_ = true;
}

Expand Down
46 changes: 38 additions & 8 deletions ydb/core/kqp/topics/kqp_topics.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <ydb/public/api/protos/ydb_topic.pb.h>
#include <ydb/core/protos/pqconfig.pb.h>

#include <ydb/core/protos/kqp.pb.h>
#include <ydb/core/tx/long_tx_service/public/lock_handle.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>

Expand All @@ -26,20 +27,40 @@ class TConsumerOperations {
public:
bool IsValid() const;

std::pair<ui64, ui64> GetRange() const;
std::pair<ui64, ui64> GetOffsetsCommitRange() const;

ui64 GetBegin() const;
ui64 GetEnd() const;
ui64 GetOffsetCommitBegin() const;
ui64 GetOffsetCommitEnd() const;

bool GetForceCommit() const;
bool GetKillReadSession() const;
bool GetOnlyCheckCommitedToFinish() const;
TString GetReadSessionId() const;

void AddOperation(const TString& consumer,
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
bool forceCommit = false,
bool killReadSession = false,
bool onlyCheckCommitedToFinish = false,
const TString& readSessionId = {});

void AddOperation(const TString& consumer, const Ydb::Topic::OffsetsRange& range);
void Merge(const TConsumerOperations& rhs);

private:
void AddOperationImpl(const TString& consumer,
ui64 begin, ui64 end);
ui64 begin,
ui64 end,
bool forceCommit = false,
bool killReadSession = false,
bool onlyCheckCommitedToFinish = false,
const TString& readSessionId = {});

TMaybe<TString> Consumer_;
TDisjointIntervalTree<ui64> Offsets_;
bool ForceCommit_ = false;
bool KillReadSession_ = false;
bool OnlyCheckCommitedToFinish_ = false;
TString ReadSessionId_;
};

struct TTopicOperationTransaction {
Expand All @@ -53,9 +74,14 @@ class TTopicPartitionOperations {
public:
bool IsValid() const;

void AddOperation(const TString& topic, ui32 partition,
void AddOperation(const TString& topic,
ui32 partition,
const TString& consumer,
const Ydb::Topic::OffsetsRange& range);
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
bool forceCommit = false,
bool killReadSession = false,
bool onlyCheckCommitedToFinish = false,
const TString& readSessionId = {});
void AddOperation(const TString& topic, ui32 partition,
TMaybe<ui32> supportivePartition);

Expand Down Expand Up @@ -108,7 +134,11 @@ class TTopicOperations {

void AddOperation(const TString& topic, ui32 partition,
const TString& consumer,
const Ydb::Topic::OffsetsRange& range);
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
bool forceCommit,
bool killReadSession,
bool onlyCheckCommitedToFinish,
const TString& readSessionId);
void AddOperation(const TString& topic, ui32 partition,
TMaybe<ui32> supportivePartition);

Expand Down
10 changes: 7 additions & 3 deletions ydb/core/persqueue/events/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -811,11 +811,15 @@ struct TEvPQ {
{
}

void AddOperation(TString consumer, ui64 begin, ui64 end) {
void AddOperation(TString consumer, ui64 begin, ui64 end, bool forceCommit = false, bool killReadSession = false, bool onlyCheckCommitedToFinish = false, TString readSessionId = {}) {
NKikimrPQ::TPartitionOperation operation;
operation.SetBegin(begin);
operation.SetEnd(end);
operation.SetCommitOffsetsBegin(begin);
operation.SetCommitOffsetsEnd(end);
operation.SetConsumer(std::move(consumer));
operation.SetForceCommit(forceCommit);
operation.SetKillReadSession(killReadSession);
operation.SetOnlyCheckCommitedToFinish(onlyCheckCommitedToFinish);
operation.SetReadSessionId(readSessionId);

Operations.push_back(std::move(operation));
}
Expand Down
Loading
Loading