Skip to content

Commit for autopartitioned topics #11629

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2162,6 +2162,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
TTopicTabletTxs topicTxs;
TDatashardTxs datashardTxs;
TEvWriteTxs evWriteTxs;

if (!TxManager) {
BuildDatashardTxs(datashardTasks, datashardTxs, evWriteTxs, topicTxs);
}
Expand Down
8 changes: 3 additions & 5 deletions ydb/core/kqp/session_actor/kqp_query_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ bool TKqpQueryState::PrepareNextStatementPart() {
void TKqpQueryState::AddOffsetsToTransaction() {
YQL_ENSURE(HasTopicOperations());

const auto& operations = GetTopicOperations();
const auto& operations = GetTopicOperationsFromRequest();

TMaybe<TString> consumer;
if (operations.HasConsumer()) {
Expand All @@ -442,7 +442,6 @@ void TKqpQueryState::AddOffsetsToTransaction() {
}

TopicOperations = NTopic::TTopicOperations();

for (auto& topic : operations.GetTopics()) {
auto path = CanonizePath(NPersQueue::GetFullTopicPath(GetDatabase(), topic.path()));

Expand All @@ -452,8 +451,7 @@ void TKqpQueryState::AddOffsetsToTransaction() {
} else {
for (auto& range : partition.partition_offsets()) {
YQL_ENSURE(consumer.Defined());

TopicOperations.AddOperation(path, partition.partition_id(), *consumer, range);
TopicOperations.AddOperation(path, partition.partition_id(), *consumer, range, partition.force_commit(), partition.kill_read_session(), partition.only_check_commited_to_finish(), partition.read_session_id());
}
}
}
Expand All @@ -474,7 +472,7 @@ std::unique_ptr<NSchemeCache::TSchemeCacheNavigate> TKqpQueryState::BuildSchemeC
auto navigate = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
navigate->DatabaseName = CanonizePath(GetDatabase());

const auto& operations = GetTopicOperations();
const auto& operations = GetTopicOperationsFromRequest();
TMaybe<TString> consumer;
if (operations.HasConsumer())
consumer = operations.GetConsumer();
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/session_actor/kqp_query_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ class TKqpQueryState : public TNonCopyable {
return RequestEv->GetQuery();
}

const ::NKikimrKqp::TTopicOperationsRequest& GetTopicOperations() const {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

а почему здесь важно, что FromReqest? Есть не из request?

const ::NKikimrKqp::TTopicOperationsRequest& GetTopicOperationsFromRequest() const {
return RequestEv->GetTopicOperations();
}

Expand Down
1 change: 0 additions & 1 deletion ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1358,7 +1358,6 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
}
}
}

request.TopicOperations = std::move(txCtx.TopicOperations);
} else if (QueryState->ShouldAcquireLocks(tx) && (!txCtx.HasOlapTable || Settings.TableService.GetEnableOlapSink())) {
request.AcquireLocksTxId = txCtx.Locks.GetLockTxId();
Expand Down
104 changes: 85 additions & 19 deletions ydb/core/kqp/topics/kqp_topics.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "kqp_topics.h"

#include <ydb/core/base/path.h>
#include <ydb/core/protos/kqp.pb.h>
#include <ydb/core/persqueue/utils.h>
#include <ydb/library/actors/core/log.h>

Expand All @@ -26,35 +27,73 @@ static void UpdateSupportivePartition(TMaybe<ui32>& lhs, const TMaybe<ui32>& rhs
//
bool TConsumerOperations::IsValid() const
{
return Offsets_.GetNumIntervals() == 1;
return Offsets_.GetNumIntervals() <= 1;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

В случае GetNumIntervals() == 0 функция GetOffsetsCommitRange возвращает {0, 0}? А в каких случаях это нужно?

}

std::pair<ui64, ui64> TConsumerOperations::GetRange() const
std::pair<ui64, ui64> TConsumerOperations::GetOffsetsCommitRange() const
{
Y_ABORT_UNLESS(IsValid());

return {Offsets_.Min(), Offsets_.Max()};
if (Offsets_.Empty()) {
return {0,0};
} else {
return {Offsets_.Min(), Offsets_.Max()};
}
}

bool TConsumerOperations::GetForceCommit() const
{
return ForceCommit_;
}

bool TConsumerOperations::GetKillReadSession() const
{
return KillReadSession_;
}

bool TConsumerOperations::GetOnlyCheckCommitedToFinish() const
{
return OnlyCheckCommitedToFinish_;
}

void TConsumerOperations::AddOperation(const TString& consumer, const Ydb::Topic::OffsetsRange& range)
TString TConsumerOperations::GetReadSessionId() const
{
return ReadSessionId_;
}

void TConsumerOperations::AddOperation(const TString& consumer,
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
bool forceCommit,
bool killReadSession,
bool onlyCheckCommitedToFinish,
const TString& readSessionId)
{
Y_ABORT_UNLESS(Consumer_.Empty() || Consumer_ == consumer);

AddOperationImpl(consumer, range.start(), range.end());
AddOperationImpl(consumer, range.start(), range.end(), forceCommit, killReadSession, onlyCheckCommitedToFinish, readSessionId);
}

void TConsumerOperations::Merge(const TConsumerOperations& rhs)
{
Y_ABORT_UNLESS(rhs.Consumer_.Defined());
Y_ABORT_UNLESS(Consumer_.Empty() || Consumer_ == rhs.Consumer_);

for (auto& range : rhs.Offsets_) {
AddOperationImpl(*rhs.Consumer_, range.first, range.second);
if (!rhs.Offsets_.Empty()) {
for (auto& range : rhs.Offsets_) {
AddOperationImpl(*rhs.Consumer_, range.first, range.second, rhs.GetForceCommit(), rhs.GetKillReadSession(), rhs.GetOnlyCheckCommitedToFinish(), rhs.GetReadSessionId());
}
} else {
AddOperationImpl(*rhs.Consumer_, 0, 0, rhs.GetForceCommit(), rhs.GetKillReadSession(), rhs.GetOnlyCheckCommitedToFinish(), rhs.GetReadSessionId());
}
}

void TConsumerOperations::AddOperationImpl(const TString& consumer,
ui64 begin, ui64 end)
ui64 begin,
ui64 end,
bool forceCommit,
bool killReadSession,
bool onlyCheckCommitedToFinish,
const TString& readSessionId)
{
if (Offsets_.Intersects(begin, end)) {
ythrow TOffsetsRangeIntersectExpection() << "offset ranges intersect";
Expand All @@ -64,7 +103,14 @@ void TConsumerOperations::AddOperationImpl(const TString& consumer,
Consumer_ = consumer;
}

Offsets_.InsertInterval(begin, end);
if (end != 0) {
Offsets_.InsertInterval(begin, end);
}

ForceCommit_ = forceCommit;
KillReadSession_ = killReadSession;
OnlyCheckCommitedToFinish_ = onlyCheckCommitedToFinish;
ReadSessionId_ = readSessionId;
}

//
Expand All @@ -76,9 +122,14 @@ bool TTopicPartitionOperations::IsValid() const
[](auto& x) { return x.second.IsValid(); });
}

void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partition,
void TTopicPartitionOperations::AddOperation(const TString& topic,
ui32 partition,
const TString& consumer,
const Ydb::Topic::OffsetsRange& range)
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
bool forceCommit,
bool killReadSession,
bool onlyCheckCommitedToFinish,
const TString& readSessionId)
{
Y_ABORT_UNLESS(Topic_.Empty() || Topic_ == topic);
Y_ABORT_UNLESS(Partition_.Empty() || Partition_ == partition);
Expand All @@ -88,7 +139,7 @@ void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partitio
Partition_ = partition;
}

Operations_[consumer].AddOperation(consumer, range);
Operations_[consumer].AddOperation(consumer, range, forceCommit, killReadSession, onlyCheckCommitedToFinish, readSessionId);
}

void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partition,
Expand Down Expand Up @@ -117,11 +168,15 @@ void TTopicPartitionOperations::BuildTopicTxs(TTopicOperationTransactions& txs)
for (auto& [consumer, operations] : Operations_) {
NKikimrPQ::TPartitionOperation* o = t.tx.MutableOperations()->Add();
o->SetPartitionId(*Partition_);
auto [begin, end] = operations.GetRange();
o->SetBegin(begin);
o->SetEnd(end);
auto [begin, end] = operations.GetOffsetsCommitRange();
o->SetCommitOffsetsBegin(begin);
o->SetCommitOffsetsEnd(end);
o->SetConsumer(consumer);
o->SetPath(*Topic_);
o->SetKillReadSession(operations.GetKillReadSession());
o->SetForceCommit(operations.GetForceCommit());
o->SetOnlyCheckCommitedToFinish(operations.GetOnlyCheckCommitedToFinish());
o->SetReadSessionId(operations.GetReadSessionId());
}

if (HasWriteOperations_) {
Expand Down Expand Up @@ -256,14 +311,25 @@ bool TTopicOperations::TabletHasReadOperations(ui64 tabletId) const
return false;
}

void TTopicOperations::AddOperation(const TString& topic, ui32 partition,
void TTopicOperations::AddOperation(const TString& topic,
ui32 partition,
const TString& consumer,
const Ydb::Topic::OffsetsRange& range)
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
bool forceCommit,
bool killReadSession,
bool onlyCheckCommitedToFinish,
const TString& readSessionId
)
{
TTopicPartition key{topic, partition};
Operations_[key].AddOperation(topic, partition,
Operations_[key].AddOperation(topic,
partition,
consumer,
range);
range,
forceCommit,
killReadSession,
onlyCheckCommitedToFinish,
readSessionId);
HasReadOperations_ = true;
}

Expand Down
46 changes: 38 additions & 8 deletions ydb/core/kqp/topics/kqp_topics.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <ydb/public/api/protos/ydb_topic.pb.h>
#include <ydb/core/protos/pqconfig.pb.h>

#include <ydb/core/protos/kqp.pb.h>
#include <ydb/core/tx/long_tx_service/public/lock_handle.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>

Expand All @@ -26,20 +27,40 @@ class TConsumerOperations {
public:
bool IsValid() const;

std::pair<ui64, ui64> GetRange() const;
std::pair<ui64, ui64> GetOffsetsCommitRange() const;

ui64 GetBegin() const;
ui64 GetEnd() const;
ui64 GetOffsetCommitBegin() const;
ui64 GetOffsetCommitEnd() const;

bool GetForceCommit() const;
bool GetKillReadSession() const;
bool GetOnlyCheckCommitedToFinish() const;
TString GetReadSessionId() const;

void AddOperation(const TString& consumer,
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
bool forceCommit = false,
bool killReadSession = false,
bool onlyCheckCommitedToFinish = false,
const TString& readSessionId = {});

void AddOperation(const TString& consumer, const Ydb::Topic::OffsetsRange& range);
void Merge(const TConsumerOperations& rhs);

private:
void AddOperationImpl(const TString& consumer,
ui64 begin, ui64 end);
ui64 begin,
ui64 end,
bool forceCommit = false,
bool killReadSession = false,
bool onlyCheckCommitedToFinish = false,
const TString& readSessionId = {});

TMaybe<TString> Consumer_;
TDisjointIntervalTree<ui64> Offsets_;
bool ForceCommit_ = false;
bool KillReadSession_ = false;
bool OnlyCheckCommitedToFinish_ = false;
TString ReadSessionId_;
};

struct TTopicOperationTransaction {
Expand All @@ -53,9 +74,14 @@ class TTopicPartitionOperations {
public:
bool IsValid() const;

void AddOperation(const TString& topic, ui32 partition,
void AddOperation(const TString& topic,
ui32 partition,
const TString& consumer,
const Ydb::Topic::OffsetsRange& range);
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
bool forceCommit = false,
bool killReadSession = false,
bool onlyCheckCommitedToFinish = false,
const TString& readSessionId = {});
void AddOperation(const TString& topic, ui32 partition,
TMaybe<ui32> supportivePartition);

Expand Down Expand Up @@ -108,7 +134,11 @@ class TTopicOperations {

void AddOperation(const TString& topic, ui32 partition,
const TString& consumer,
const Ydb::Topic::OffsetsRange& range);
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
bool forceCommit,
bool killReadSession,
bool onlyCheckCommitedToFinish,
const TString& readSessionId);
void AddOperation(const TString& topic, ui32 partition,
TMaybe<ui32> supportivePartition);

Expand Down
10 changes: 7 additions & 3 deletions ydb/core/persqueue/events/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -811,11 +811,15 @@ struct TEvPQ {
{
}

void AddOperation(TString consumer, ui64 begin, ui64 end) {
void AddOperation(TString consumer, ui64 begin, ui64 end, bool forceCommit = false, bool killReadSession = false, bool onlyCheckCommitedToFinish = false, TString readSessionId = {}) {
NKikimrPQ::TPartitionOperation operation;
operation.SetBegin(begin);
operation.SetEnd(end);
operation.SetCommitOffsetsBegin(begin);
operation.SetCommitOffsetsEnd(end);
operation.SetConsumer(std::move(consumer));
operation.SetForceCommit(forceCommit);
operation.SetKillReadSession(killReadSession);
operation.SetOnlyCheckCommitedToFinish(onlyCheckCommitedToFinish);
operation.SetReadSessionId(readSessionId);

Operations.push_back(std::move(operation));
}
Expand Down
Loading
Loading