Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2167,7 +2167,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

using TDatashardTxs = THashMap<ui64, NKikimrTxDataShard::TKqpTransaction*>;
using TEvWriteTxs = THashMap<ui64, NKikimrDataEvents::TEvWrite*>;
using TTopicTabletTxs = THashMap<ui64, NKikimrPQ::TDataTransaction>;
using TTopicTabletTxs = NTopic::TTopicOperationTransactions;

void ContinueExecute() {
if (Stats) {
Expand Down Expand Up @@ -2424,10 +2424,10 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}
}

for (auto& [_, tx] : topicTxs) {
tx.SetOp(NKikimrPQ::TDataTransaction::Commit);
*tx.MutableSendingShards() = sendingShards;
*tx.MutableReceivingShards() = receivingShards;
for (auto& [_, t] : topicTxs) {
t.tx.SetOp(NKikimrPQ::TDataTransaction::Commit);
*t.tx.MutableSendingShards() = sendingShards;
*t.tx.MutableReceivingShards() = receivingShards;
YQL_ENSURE(!arbiter);
}
}
Expand Down Expand Up @@ -2589,13 +2589,12 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
writeId = Request.TopicOperations.GetWriteId();
}

for (auto& tx : topicTxs) {
auto tabletId = tx.first;
auto& transaction = tx.second;
for (auto& [tabletId, t] : topicTxs) {
auto& transaction = t.tx;

auto ev = std::make_unique<TEvPersQueue::TEvProposeTransaction>();

if (writeId.Defined()) {
if (t.hasWrite && writeId.Defined()) {
auto* w = transaction.MutableWriteId();
w->SetNodeId(SelfId().NodeId());
w->SetKeyId(*writeId);
Expand Down
11 changes: 6 additions & 5 deletions ydb/core/kqp/topics/kqp_topics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,15 @@ void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partitio
HasWriteOperations_ = true;
}

void TTopicPartitionOperations::BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTransaction> &txs)
void TTopicPartitionOperations::BuildTopicTxs(TTopicOperationTransactions& txs)
{
Y_ABORT_UNLESS(TabletId_.Defined());
Y_ABORT_UNLESS(Partition_.Defined());

auto& tx = txs[*TabletId_];
auto& t = txs[*TabletId_];

for (auto& [consumer, operations] : Operations_) {
NKikimrPQ::TPartitionOperation* o = tx.MutableOperations()->Add();
NKikimrPQ::TPartitionOperation* o = t.tx.MutableOperations()->Add();
o->SetPartitionId(*Partition_);
auto [begin, end] = operations.GetRange();
o->SetBegin(begin);
Expand All @@ -123,12 +123,13 @@ void TTopicPartitionOperations::BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTra
}

if (HasWriteOperations_) {
NKikimrPQ::TPartitionOperation* o = tx.MutableOperations()->Add();
NKikimrPQ::TPartitionOperation* o = t.tx.MutableOperations()->Add();
o->SetPartitionId(*Partition_);
o->SetPath(*Topic_);
if (SupportivePartition_.Defined()) {
o->SetSupportivePartition(*SupportivePartition_);
}
t.hasWrite = true;
}
}

Expand Down Expand Up @@ -355,7 +356,7 @@ bool TTopicOperations::ProcessSchemeCacheNavigate(const NSchemeCache::TSchemeCac
return true;
}

void TTopicOperations::BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTransaction> &txs)
void TTopicOperations::BuildTopicTxs(TTopicOperationTransactions& txs)
{
for (auto& [_, operations] : Operations_) {
operations.BuildTopicTxs(txs);
Expand Down
11 changes: 9 additions & 2 deletions ydb/core/kqp/topics/kqp_topics.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ class TConsumerOperations {
TDisjointIntervalTree<ui64> Offsets_;
};

struct TTopicOperationTransaction {
NKikimrPQ::TDataTransaction tx;
bool hasWrite = false;
};

using TTopicOperationTransactions = THashMap<ui64, TTopicOperationTransaction>;

class TTopicPartitionOperations {
public:
bool IsValid() const;
Expand All @@ -52,7 +59,7 @@ class TTopicPartitionOperations {
void AddOperation(const TString& topic, ui32 partition,
TMaybe<ui32> supportivePartition);

void BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTransaction> &txs);
void BuildTopicTxs(TTopicOperationTransactions &txs);

void Merge(const TTopicPartitionOperations& rhs);

Expand Down Expand Up @@ -109,7 +116,7 @@ class TTopicOperations {
Ydb::StatusIds_StatusCode& status,
TString& message);

void BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTransaction> &txs);
void BuildTopicTxs(TTopicOperationTransactions &txs);

void Merge(const TTopicOperations& rhs);

Expand Down
126 changes: 82 additions & 44 deletions ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1041,16 +1041,21 @@ void TPersQueue::InitTxWrites(const NKikimrPQ::TTabletTxInfo& info,
for (size_t i = 0; i != info.TxWritesSize(); ++i) {
auto& txWrite = info.GetTxWrites(i);
const TWriteId writeId = GetWriteId(txWrite);
ui32 partitionId = txWrite.GetOriginalPartitionId();
TPartitionId shadowPartitionId(partitionId, writeId, txWrite.GetInternalPartitionId());

TxWrites[writeId].Partitions.emplace(partitionId, shadowPartitionId);
TTxWriteInfo& writeInfo = TxWrites[writeId];
if (txWrite.HasOriginalPartitionId()) {
ui32 partitionId = txWrite.GetOriginalPartitionId();
TPartitionId shadowPartitionId(partitionId, writeId, txWrite.GetInternalPartitionId());

AddSupportivePartition(shadowPartitionId);
CreateSupportivePartitionActor(shadowPartitionId, ctx);
SubscribeWriteId(writeId, ctx);
writeInfo.Partitions.emplace(partitionId, shadowPartitionId);

AddSupportivePartition(shadowPartitionId);
CreateSupportivePartitionActor(shadowPartitionId, ctx);

NextSupportivePartitionId = Max(NextSupportivePartitionId, shadowPartitionId.InternalPartitionId + 1);
}

NextSupportivePartitionId = Max(NextSupportivePartitionId, shadowPartitionId.InternalPartitionId + 1);
SubscribeWriteId(writeId, ctx);
}

NewSupportivePartitions.clear();
Expand Down Expand Up @@ -3283,7 +3288,7 @@ bool TPersQueue::CheckTxWriteOperation(const NKikimrPQ::TPartitionOperation& ope
TPartitionId partitionId(operation.GetPartitionId(),
writeId,
operation.GetSupportivePartition());
PQ_LOG_D("partitionId=" << partitionId);
PQ_LOG_D("PartitionId " << partitionId << " for WriteId " << writeId);
return Partitions.contains(partitionId);
}

Expand All @@ -3294,7 +3299,6 @@ bool TPersQueue::CheckTxWriteOperations(const NKikimrPQ::TDataTransaction& txBod
}

const TWriteId writeId = GetWriteId(txBody);
PQ_LOG_D("writeId=" << writeId);

for (auto& operation : txBody.GetOperations()) {
auto isWrite = [](const NKikimrPQ::TPartitionOperation& o) {
Expand All @@ -3320,7 +3324,7 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
const NKikimrPQ::TDataTransaction& txBody = event.GetData();

if (TabletState != NKikimrPQ::ENormal) {
PQ_LOG_D("invalid PQ tablet state (" << NKikimrPQ::ETabletState_Name(TabletState) << ")");
PQ_LOG_D("TxId " << event.GetTxId() << " invalid PQ tablet state (" << NKikimrPQ::ETabletState_Name(TabletState) << ")");
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
event.GetTxId(),
NKikimrPQ::TError::ERROR,
Expand All @@ -3334,7 +3338,7 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
//

if (txBody.OperationsSize() <= 0) {
PQ_LOG_D("empty list of operations");
PQ_LOG_D("TxId " << event.GetTxId() << " empty list of operations");
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
event.GetTxId(),
NKikimrPQ::TError::BAD_REQUEST,
Expand All @@ -3344,7 +3348,7 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
}

if (!CheckTxWriteOperations(txBody)) {
PQ_LOG_D("invalid WriteId " << txBody.GetWriteId());
PQ_LOG_D("TxId " << event.GetTxId() << " invalid WriteId " << txBody.GetWriteId());
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
event.GetTxId(),
NKikimrPQ::TError::BAD_REQUEST,
Expand All @@ -3353,9 +3357,36 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
return;
}

if (txBody.HasWriteId()) {
const TWriteId writeId = GetWriteId(txBody);
if (!TxWrites.contains(writeId)) {
PQ_LOG_D("TxId " << event.GetTxId() << " unknown WriteId " << writeId);
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
event.GetTxId(),
NKikimrPQ::TError::BAD_REQUEST,
"unknown WriteId",
ctx);
return;
}

TTxWriteInfo& writeInfo = TxWrites.at(writeId);
if (writeInfo.Deleting) {
PQ_LOG_W("TxId " << event.GetTxId() << " WriteId " << writeId << " will be deleted");
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
event.GetTxId(),
NKikimrPQ::TError::BAD_REQUEST,
"WriteId will be deleted",
ctx);
return;
}

writeInfo.TxId = event.GetTxId();
PQ_LOG_D("TxId " << event.GetTxId() << " has WriteId " << writeId);
}

TMaybe<TPartitionId> partitionId = FindPartitionId(txBody);
if (!partitionId.Defined()) {
PQ_LOG_D("unknown partition for WriteId " << txBody.GetWriteId());
PQ_LOG_W("TxId " << event.GetTxId() << " unknown partition for WriteId " << txBody.GetWriteId());
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
event.GetTxId(),
NKikimrPQ::TError::INTERNAL,
Expand Down Expand Up @@ -3568,13 +3599,15 @@ bool TPersQueue::CanProcessTxWrites() const
void TPersQueue::SubscribeWriteId(const TWriteId& writeId,
const TActorContext& ctx)
{
PQ_LOG_D("send TEvSubscribeLock for WriteId " << writeId);
ctx.Send(NLongTxService::MakeLongTxServiceID(writeId.NodeId),
new NLongTxService::TEvLongTxService::TEvSubscribeLock(writeId.KeyId, writeId.NodeId));
}

void TPersQueue::UnsubscribeWriteId(const TWriteId& writeId,
const TActorContext& ctx)
{
PQ_LOG_D("send TEvUnsubscribeLock for WriteId " << writeId);
ctx.Send(NLongTxService::MakeLongTxServiceID(writeId.NodeId),
new NLongTxService::TEvLongTxService::TEvUnsubscribeLock(writeId.KeyId, writeId.NodeId));
}
Expand Down Expand Up @@ -3876,11 +3909,16 @@ void TPersQueue::SavePlanStep(NKikimrPQ::TTabletTxInfo& info)
void TPersQueue::SaveTxWrites(NKikimrPQ::TTabletTxInfo& info)
{
for (auto& [writeId, write] : TxWrites) {
for (auto [partitionId, shadowPartitionId] : write.Partitions) {
if (write.Partitions.empty()) {
auto* txWrite = info.MutableTxWrites()->Add();
SetWriteId(*txWrite, writeId);
txWrite->SetOriginalPartitionId(partitionId);
txWrite->SetInternalPartitionId(shadowPartitionId.InternalPartitionId);
} else {
for (auto [partitionId, shadowPartitionId] : write.Partitions) {
auto* txWrite = info.MutableTxWrites()->Add();
SetWriteId(*txWrite, writeId);
txWrite->SetOriginalPartitionId(partitionId);
txWrite->SetInternalPartitionId(shadowPartitionId.InternalPartitionId);
}
}
}

Expand Down Expand Up @@ -4325,6 +4363,9 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,

WriteTx(tx, NKikimrPQ::TTransaction::EXECUTED);

PQ_LOG_D("delete partitions for TxId " << tx.TxId);
BeginDeletePartitions(tx);

tx.State = NKikimrPQ::TTransaction::EXECUTED;
PQ_LOG_D("TxId " << tx.TxId <<
", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State));
Expand All @@ -4343,8 +4384,8 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,

case NKikimrPQ::TTransaction::WAIT_RS_ACKS:
PQ_LOG_D("HaveAllRecipientsReceive " << tx.HaveAllRecipientsReceive() <<
", WriteIdIsDisabled " << WriteIdIsDisabled(tx.WriteId));
if (tx.HaveAllRecipientsReceive() && WriteIdIsDisabled(tx.WriteId)) {
", AllSupportivePartitionsHaveBeenDeleted " << AllSupportivePartitionsHaveBeenDeleted(tx.WriteId));
if (tx.HaveAllRecipientsReceive() && AllSupportivePartitionsHaveBeenDeleted(tx.WriteId)) {
DeleteTx(tx);
// implicitly switch to the state DELETING
}
Expand All @@ -4369,7 +4410,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
}
}

bool TPersQueue::WriteIdIsDisabled(const TMaybe<TWriteId>& writeId) const
bool TPersQueue::AllSupportivePartitionsHaveBeenDeleted(const TMaybe<TWriteId>& writeId) const
{
if (!writeId.Defined()) {
return true;
Expand All @@ -4380,26 +4421,21 @@ bool TPersQueue::WriteIdIsDisabled(const TMaybe<TWriteId>& writeId) const
TabletID(), writeId->NodeId, writeId->KeyId);
const TTxWriteInfo& writeInfo = TxWrites.at(*writeId);

bool disabled =
(writeInfo.LongTxSubscriptionStatus != NKikimrLongTxService::TEvLockStatus::STATUS_SUBSCRIBED) &&
PQ_LOG_D("WriteId " << *writeId <<
" Partitions.size=" << writeInfo.Partitions.size());
bool deleted =
writeInfo.Partitions.empty()
;

PQ_LOG_D("WriteId " << *writeId << " is " << (disabled ? "disabled" : "enabled"));

return disabled;
return deleted;
}

void TPersQueue::DeleteWriteId(const TMaybe<TWriteId>& writeId)
{
if (!writeId.Defined()) {
if (!writeId.Defined() || !TxWrites.contains(*writeId)) {
return;
}

Y_ABORT_UNLESS(TxWrites.contains(*writeId),
"PQ %" PRIu64 ", WriteId {%" PRIu64 ", %" PRIu64 "}",
TabletID(), writeId->NodeId, writeId->KeyId);

PQ_LOG_D("delete WriteId " << *writeId);
TxWrites.erase(*writeId);
}
Expand Down Expand Up @@ -4729,7 +4765,7 @@ void TPersQueue::ProcessCheckPartitionStatusRequests(const TPartitionId& partiti
}
}

void TPersQueue::Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& ev, const TActorContext& ctx)
void TPersQueue::Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& ev)
{
PQ_LOG_D("Handle TEvLongTxService::TEvLockStatus " << ev->Get()->Record.ShortDebugString());

Expand All @@ -4750,22 +4786,14 @@ void TPersQueue::Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& e
return;
}

if (!writeInfo.TxId.Defined()) {
PQ_LOG_D("delete write info for WriteId " << writeId);
// the message TEvProposeTransaction will not come anymore
BeginDeletePartitions(writeInfo);
if (writeInfo.TxId.Defined()) {
// the message `TEvProposeTransaction` has already arrived
PQ_LOG_D("there is already a transaction TxId " << writeInfo.TxId << " for WriteId " << writeId);
return;
}

ui64 txId = *writeInfo.TxId;
PQ_LOG_D("delete write info for WriteId " << writeId << " and TxId " << txId);

auto* tx = GetTransaction(ctx, txId);
if (!tx ||
(tx->State == NKikimrPQ::TTransaction::EXECUTED) ||
(tx->State == NKikimrPQ::TTransaction::WAIT_RS_ACKS)) {
BeginDeletePartitions(writeInfo);
}
PQ_LOG_D("delete partitions for WriteId " << writeId);
BeginDeletePartitions(writeInfo);
}

void TPersQueue::Handle(TEvPQ::TEvReadingPartitionStatusRequest::TPtr& ev, const TActorContext& ctx)
Expand Down Expand Up @@ -4865,6 +4893,16 @@ void TPersQueue::BeginDeletePartitions(TTxWriteInfo& writeInfo)
writeInfo.Deleting = true;
}

void TPersQueue::BeginDeletePartitions(const TDistributedTransaction& tx)
{
if (!tx.WriteId.Defined() || !TxWrites.contains(*tx.WriteId)) {
return;
}

TTxWriteInfo& writeInfo = TxWrites.at(*tx.WriteId);
BeginDeletePartitions(writeInfo);
}

TString TPersQueue::LogPrefix() const {
return TStringBuilder() << "[PQ: " << TabletID() << "] ";
}
Expand Down Expand Up @@ -4919,7 +4957,7 @@ bool TPersQueue::HandleHook(STFUNC_SIG)
HFuncTraced(TEvMediatorTimecast::TEvRegisterTabletResult, Handle);
HFuncTraced(TEvPQ::TEvCheckPartitionStatusRequest, Handle);
HFuncTraced(TEvPQ::TEvPartitionScaleStatusChanged, Handle);
HFuncTraced(NLongTxService::TEvLongTxService::TEvLockStatus, Handle);
hFuncTraced(NLongTxService::TEvLongTxService::TEvLockStatus, Handle);
HFuncTraced(TEvPQ::TEvReadingPartitionStatusRequest, Handle);
HFuncTraced(TEvPQ::TEvDeletePartitionDone, Handle);
HFuncTraced(TEvPQ::TEvTransactionCompleted, Handle);
Expand Down
Loading