Skip to content

Prepare & execute topics in BufferWriteActor #12464

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 47 additions & 2 deletions ydb/core/kqp/common/kqp_tx_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,32 @@ class TKqpTransactionManager : public IKqpTransactionManager {
}
}

// Registers a topic tablet as a participant of the transaction being collected.
//
// topicId - id of the topic tablet; it is tracked in ShardsIds / ShardsInfo.
// path    - path recorded for the tablet. The string itself is stored in TablePathes,
//           and the per-tablet Pathes set receives a TStringBuf view of that stored copy.
//
// Aborts unless the manager is in the COLLECTING state.
void AddTopic(ui64 topicId, const TString& path) override {
    Y_ABORT_UNLESS(State == ETransactionState::COLLECTING);

    ShardsIds.insert(topicId);
    auto& topicInfo = ShardsInfo[topicId];

    // Take the view from the element held by TablePathes, not from the caller's argument.
    const auto storedPath = TablePathes.insert(path).first;
    topicInfo.Pathes.insert(TStringBuf(*storedPath));
}

void AddTopicsToShards() override {
if (!HasTopics()) {
return;
}

for (auto& topicId : GetTopicOperations().GetSendingTabletIds()) {
AddTopic(topicId, *GetTopicOperations().GetTabletName(topicId));
AddAction(topicId, EAction::READ);
}

for (auto& topicId : GetTopicOperations().GetReceivingTabletIds()) {
AddTopic(topicId, *GetTopicOperations().GetTabletName(topicId));
AddAction(topicId, EAction::WRITE);
}
}

bool AddLock(ui64 shardId, const NKikimrDataEvents::TLock& lockProto) override {
Y_ABORT_UNLESS(State == ETransactionState::COLLECTING);
TKqpLock lock(lockProto);
Expand Down Expand Up @@ -124,6 +150,22 @@ class TKqpTransactionManager : public IKqpTransactionManager {
ShardsInfo.at(shardId).State = state;
}

// Replaces the stored topic operations by move-assigning from the given object.
void SetTopicOperations(NTopic::TTopicOperations&& topicOperations) override {
TopicOperations = std::move(topicOperations);
}

// Returns a read-only reference to the topic operations held by this manager.
const NTopic::TTopicOperations& GetTopicOperations() const override {
return TopicOperations;
}

// Delegates to TTopicOperations::BuildTopicTxs on the stored operations; the result is written into txs.
void BuildTopicTxs(NTopic::TTopicOperationTransactions& txs) override {
TopicOperations.BuildTopicTxs(txs);
}

// True when the stored topic operations report a non-zero size.
bool HasTopics() const override {
return GetTopicOperations().GetSize() != 0;
}

TVector<NKikimrDataEvents::TLock> GetLocks() const override {
TVector<NKikimrDataEvents::TLock> locks;
for (const auto& [_, shardInfo] : ShardsInfo) {
Expand Down Expand Up @@ -189,7 +231,8 @@ class TKqpTransactionManager : public IKqpTransactionManager {
bool IsVolatile() const override {
return !HasOlapTable()
&& !IsReadOnly()
&& !IsSingleShard();
&& !IsSingleShard()
&& !HasTopics();

// TODO: && !HasPersistentChannels;
// Note: currently persistent channels are never used
Expand Down Expand Up @@ -342,7 +385,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
shardInfo.State = EShardState::EXECUTING;
}

AFL_ENSURE(ReceivingShards.empty() || !IsSingleShard() || HasOlapTable());
AFL_ENSURE(ReceivingShards.empty() || HasTopics() || !IsSingleShard() || HasOlapTable());
}

TCommitInfo GetCommitInfo() override {
Expand Down Expand Up @@ -440,6 +483,8 @@ class TKqpTransactionManager : public IKqpTransactionManager {

THashSet<ui64> ShardsToWaitPrepare;

NTopic::TTopicOperations TopicOperations;

ui64 MinStep = 0;
ui64 MaxStep = 0;
ui64 Coordinator = 0;
Expand Down
9 changes: 9 additions & 0 deletions ydb/core/kqp/common/kqp_tx_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class IKqpTransactionManager {

virtual void AddShard(ui64 shardId, bool isOlap, const TString& path) = 0;
virtual void AddAction(ui64 shardId, ui8 action) = 0;
// Registers a topic tablet (by id) together with its path as a transaction participant.
virtual void AddTopic(ui64 topicId, const TString& path) = 0;
// Registers all tablets referenced by the stored topic operations; TKqpTransactionManager marks
// sending tablets with a READ action and receiving tablets with a WRITE action.
virtual void AddTopicsToShards() = 0;
virtual bool AddLock(ui64 shardId, const NKikimrDataEvents::TLock& lock) = 0;

virtual void BreakLock(ui64 shardId) = 0;
Expand All @@ -49,6 +51,13 @@ class IKqpTransactionManager {
virtual EShardState GetState(ui64 shardId) const = 0;
virtual void SetState(ui64 shardId, EShardState state) = 0;

// Takes ownership of the topic operations for this transaction (passed as an rvalue).
virtual void SetTopicOperations(NTopic::TTopicOperations&& topicOperations) = 0;
// Read-only access to the topic operations previously handed to the manager.
virtual const NTopic::TTopicOperations& GetTopicOperations() const = 0;

// Fills txs from the stored topic operations.
virtual void BuildTopicTxs(NTopic::TTopicOperationTransactions& txs) = 0;

// Whether the transaction has any topic operations (TKqpTransactionManager checks for a non-zero size).
virtual bool HasTopics() const = 0;

virtual bool IsTxPrepared() const = 0;
virtual bool IsTxFinished() const = 0;

Expand Down
16 changes: 12 additions & 4 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1533,7 +1533,12 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

private:
bool IsReadOnlyTx() const {
if (Request.TopicOperations.HasOperations()) {
if (BufferActorId && TxManager->GetTopicOperations().HasOperations()) {
YQL_ENSURE(!Request.UseImmediateEffects);
return false;
}

if (!BufferActorId && Request.TopicOperations.HasOperations()) {
YQL_ENSURE(!Request.UseImmediateEffects);
return false;
}
Expand Down Expand Up @@ -2103,7 +2108,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}

// Single-shard datashard transactions are always immediate
ImmediateTx = (datashardTxs.size() + evWriteTxs.size() + Request.TopicOperations.GetSize() + sourceScanPartitionsCount) <= 1
auto topicSize = (BufferActorId) ? TxManager->GetTopicOperations().GetSize() : Request.TopicOperations.GetSize();
ImmediateTx = (datashardTxs.size() + evWriteTxs.size() + topicSize + sourceScanPartitionsCount) <= 1
&& !UnknownAffectedShardCount
&& evWriteTxs.empty()
&& !HasOlapTable;
Expand Down Expand Up @@ -2383,6 +2389,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}
}

YQL_ENSURE(!TxManager);
Request.TopicOperations.BuildTopicTxs(topicTxs);

const bool needRollback = Request.LocksOp == ELocksOp::Rollback;
Expand Down Expand Up @@ -2419,8 +2426,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
// HTAP transactions always use generic readsets
!evWriteTxs.empty());

if (!locksMap.empty() || VolatileTx ||
Request.TopicOperations.HasReadOperations() || Request.TopicOperations.HasWriteOperations())
if (!locksMap.empty() || VolatileTx || Request.TopicOperations.HasReadOperations()
|| Request.TopicOperations.HasWriteOperations())
{
YQL_ENSURE(Request.LocksOp == ELocksOp::Commit || Request.LocksOp == ELocksOp::Rollback || VolatileTx);

Expand Down Expand Up @@ -2766,6 +2773,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
void ExecuteTopicTabletTransactions(TTopicTabletTxs& topicTxs) {
YQL_ENSURE(!TxManager);
TMaybe<ui64> writeId;

if (Request.TopicOperations.HasWriteId()) {
writeId = Request.TopicOperations.GetWriteId();
}
Expand Down
168 changes: 167 additions & 1 deletion ydb/core/kqp/runtime/kqp_write_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <ydb/core/tx/data_events/shards_splitter.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/core/tx/tx.h>
#include <ydb/core/persqueue/events/global.h>
#include <ydb/library/actors/core/actorsystem.h>
#include <ydb/library/actors/core/interconnect.h>
#include <ydb/library/wilson_ids/wilson.h>
Expand Down Expand Up @@ -1287,6 +1288,7 @@ struct TEvBufferWriteResult : public TEventLocal<TEvBufferWriteResult, TKqpEvent

class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, public IKqpTableWriterCallbacks {
using TBase = TActorBootstrapped<TKqpBufferWriteActor>;
using TTopicTabletTxs = NTopic::TTopicOperationTransactions;

public:
enum class EState {
Expand Down Expand Up @@ -1315,6 +1317,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
State = EState::WRITING;
Alloc->Release();
Counters->BufferActorsCount->Inc();
TxManager->AddTopicsToShards();
}

void Bootstrap() {
Expand All @@ -1335,6 +1338,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
hFunc(TEvBufferWrite, Handle);

hFunc(TEvTxProxy::TEvProposeTransactionStatus, Handle);
hFunc(TEvPersQueue::TEvProposeTransactionResult, Handle);
hFunc(NKikimr::NEvents::TDataEvents::TEvWriteResult, Handle);
hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
default:
Expand Down Expand Up @@ -1521,6 +1525,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
Close();
Process();
SendToExternalShards(false);
SendToTopics();
}

void ImmediateCommit() {
Expand Down Expand Up @@ -1618,6 +1623,63 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
}
}

void SendToTopics() {
if (!TxManager->HasTopics()) {
return;
}

TTopicTabletTxs topicTxs;
TxManager->BuildTopicTxs(topicTxs);

TMaybe<ui64> writeId;
if (TxManager->GetTopicOperations().HasWriteId()) {
writeId = TxManager->GetTopicOperations().GetWriteId();
}

for (auto& [tabletId, t] : topicTxs) {
auto& transaction = t.tx;
transaction.SetOp(NKikimrPQ::TDataTransaction::Commit);

const auto prepareSettings = TxManager->GetPrepareTransactionInfo();
if (!prepareSettings.ArbiterColumnShard) {
for (const ui64 sendingShardId : prepareSettings.SendingShards) {
transaction.AddSendingShards(sendingShardId);
}
for (const ui64 receivingShardId : prepareSettings.ReceivingShards) {
transaction.AddReceivingShards(receivingShardId);
}
} else {
transaction.AddSendingShards(*prepareSettings.ArbiterColumnShard);
transaction.AddReceivingShards(*prepareSettings.ArbiterColumnShard);
}

auto ev = std::make_unique<TEvPersQueue::TEvProposeTransactionBuilder>();

if (t.hasWrite && writeId.Defined()) {
auto* w = transaction.MutableWriteId();
w->SetNodeId(SelfId().NodeId());
w->SetKeyId(*writeId);
}
transaction.SetImmediate(false);

ActorIdToProto(SelfId(), ev->Record.MutableSourceActor());
ev->Record.MutableData()->Swap(&transaction);
ev->Record.SetTxId(*TxId);

SendTime[tabletId] = TInstant::Now();
auto traceId = BufferWriteActor.GetTraceId();

CA_LOG_D("Preparing KQP transaction on topic tablet: " << tabletId << ", writeId: " << writeId);

Send(
MakePipePerNodeCacheID(false),
new TEvPipeCache::TEvForward(ev.release(), tabletId, /* subscribe */ true),
IEventHandle::FlagTrackDelivery,
0,
std::move(traceId));
}
}

void SendCommitToCoordinator() {
const auto commitInfo = TxManager->GetCommitInfo();

Expand Down Expand Up @@ -1741,6 +1803,69 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
}
}

// Handles a propose-transaction result coming from a topic (PQ) tablet.
//
// PREPARED and COMPLETE are passed on to ProcessPreparedTopic / ProcessCompletedTopic.
// Every other status is logged as an error and terminates the actor through ReplyErrorAndDie:
//   ABORTED     -> StatusIds::ABORTED
//   BAD_REQUEST -> StatusIds::BAD_REQUEST
//   OVERLOADED  -> StatusIds::OVERLOADED
//   CANCELLED   -> StatusIds::CANCELLED
//   otherwise   -> StatusIds::INTERNAL_ERROR
void Handle(TEvPersQueue::TEvProposeTransactionResult::TPtr& ev) {
    auto& record = ev->Get()->Record;
    const ui64 tabletId = record.GetOrigin();

    CA_LOG_D("Got ProposeTransactionResult" <<
        ", PQ tablet: " << tabletId <<
        ", status: " << NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(record.GetStatus()));

    // Defaults describe the "unknown status" outcome; known failure statuses override them below.
    const char* logHeader = "Got undefined ProposeTransactionResult for PQ.";
    const char* replyText = "Undefined proposal status for PQ. ";
    auto replyStatus = NYql::NDqProto::StatusIds::INTERNAL_ERROR;

    switch (record.GetStatus()) {
        case NKikimrPQ::TEvProposeTransactionResult::PREPARED:
            ProcessPreparedTopic(ev);
            return;
        case NKikimrPQ::TEvProposeTransactionResult::COMPLETE:
            ProcessCompletedTopic(ev);
            return;
        case NKikimrPQ::TEvProposeTransactionResult::ABORTED:
            logHeader = "Got ABORTED ProposeTransactionResult for PQ.";
            replyText = "Aborted proposal status for PQ. ";
            replyStatus = NYql::NDqProto::StatusIds::ABORTED;
            break;
        case NKikimrPQ::TEvProposeTransactionResult::BAD_REQUEST:
            logHeader = "Got BAD REQUEST ProposeTransactionResult for PQ.";
            replyText = "Bad request proposal status for PQ. ";
            replyStatus = NYql::NDqProto::StatusIds::BAD_REQUEST;
            break;
        case NKikimrPQ::TEvProposeTransactionResult::OVERLOADED:
            logHeader = "Got OVERLOADED ProposeTransactionResult for PQ.";
            replyText = "Overloaded proposal status for PQ. ";
            replyStatus = NYql::NDqProto::StatusIds::OVERLOADED;
            break;
        case NKikimrPQ::TEvProposeTransactionResult::CANCELLED:
            logHeader = "Got CANCELLED ProposeTransactionResult for PQ.";
            replyText = "Cancelled proposal status for PQ. ";
            replyStatus = NYql::NDqProto::StatusIds::CANCELLED;
            break;
        default:
            break;
    }

    // Single failure tail shared by all non-success statuses.
    CA_LOG_E(logHeader
        << " ShardID=" << tabletId << ","
        << " Sink=" << this->SelfId() << ".");
    ReplyErrorAndDie(
        TStringBuilder() << replyText,
        replyStatus,
        {});
}

void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
CA_LOG_W("TEvDeliveryProblem was received from tablet: " << ev->Get()->TabletId);
ReplyErrorAndDie(TStringBuilder() << "Failed to deviler message.", NYql::NDqProto::StatusIds::UNAVAILABLE, {});
Expand Down Expand Up @@ -1768,7 +1893,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
Rollback();
State = EState::FINISHED;
Send(ExecuterActorId, new TEvKqpBuffer::TEvResult{});
} else if (TxManager->IsSingleShard() && !TxManager->HasOlapTable() && !WriteInfos.empty()) {
} else if (TxManager->IsSingleShard() && !TxManager->HasOlapTable() && !WriteInfos.empty() && !TxManager->HasTopics()) {
TxManager->StartExecute();
ImmediateCommit();
} else {
Expand Down Expand Up @@ -1947,6 +2072,47 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
}
}

// Handles a PREPARED result from a topic tablet.
//
// Ignored (with a debug log) unless the actor is in the PREPARING state. Otherwise the tablet
// is reported via OnMessageReceived, a TPrepareResult is filled from the record (origin,
// min/max step) and handed to OnPrepared. The coordinator stays 0 when the record lists no
// domain coordinators; otherwise it is selected from that list by the transaction id.
void ProcessPreparedTopic(TEvPersQueue::TEvProposeTransactionResult::TPtr& ev) {
    if (State != EState::PREPARING) {
        CA_LOG_D("Ignored topic prepared event.");
        return;
    }

    const auto& record = ev->Get()->Record;
    const ui64 tabletId = record.GetOrigin();

    OnMessageReceived(tabletId);
    CA_LOG_D("Got propose prepared result TxId=" << record.GetTxId()
        << ", TabletId=" << tabletId
        << ", Cookie=" << ev->Cookie);

    IKqpTransactionManager::TPrepareResult prepareResult;
    prepareResult.ShardId = tabletId;
    prepareResult.MinStep = record.GetMinStep();
    prepareResult.MaxStep = record.GetMaxStep();
    prepareResult.Coordinator = 0;

    if (record.DomainCoordinatorsSize() != 0) {
        const auto& coordinatorIds = record.GetDomainCoordinators();
        auto coordinators = TCoordinators(TVector<ui64>(coordinatorIds.begin(), coordinatorIds.end()));
        prepareResult.Coordinator = coordinators.Select(*TxId);
    }

    OnPrepared(std::move(prepareResult), 0);
}

// Handles a COMPLETE result from a topic tablet.
//
// Ignored (with a debug log) unless the actor is in the COMMITTING state. Otherwise the tablet
// is reported via OnMessageReceived and then marked as committed through OnCommitted.
void ProcessCompletedTopic(TEvPersQueue::TEvProposeTransactionResult::TPtr& ev) {
    if (State != EState::COMMITTING) {
        CA_LOG_D("Ignored completed event.");
        return;
    }

    const NKikimrPQ::TEvProposeTransactionResult& result = ev->Get()->Record;
    const ui64 tabletId = result.GetOrigin();

    OnMessageReceived(tabletId);
    CA_LOG_D("Got propose completed result" <<
        ", topic tablet: " << tabletId <<
        ", status: " << NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(result.GetStatus()));

    OnCommitted(tabletId, 0);
}

void ProcessWritePreparedShard(NKikimr::NEvents::TDataEvents::TEvWriteResult::TPtr& ev) {
if (State != EState::PREPARING) {
CA_LOG_D("Ignored write prepared event.");
Expand Down
Loading
Loading