Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/kqp/common/buffer/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ struct TKqpBufferWriterSettings {
IKqpTransactionManagerPtr TxManager;
NWilson::TTraceId TraceId;
TIntrusivePtr<TKqpCounters> Counters;
TIntrusivePtr<NTxProxy::TTxProxyMon> TxProxyMon;
};

NActors::IActor* CreateKqpBufferWriterActor(TKqpBufferWriterSettings&& settings);
Expand Down
30 changes: 4 additions & 26 deletions ydb/core/kqp/common/kqp_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,23 +251,12 @@ bool HasOlapTableReadInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) {
return false;
}

bool HasOlapTableWriteInStage(const NKqpProto::TKqpPhyStage& stage, const google::protobuf::RepeatedPtrField< ::NKqpProto::TKqpPhyTable>& tables) {
bool HasOlapTableWriteInStage(const NKqpProto::TKqpPhyStage& stage) {
for (const auto& sink : stage.GetSinks()) {
if (sink.GetTypeCase() == NKqpProto::TKqpSink::kInternalSink && sink.GetInternalSink().GetSettings().Is<NKikimrKqp::TKqpTableSinkSettings>()) {
NKikimrKqp::TKqpTableSinkSettings settings;
YQL_ENSURE(sink.GetInternalSink().GetSettings().UnpackTo(&settings), "Failed to unpack settings");

const bool isOlapSink = std::any_of(
std::begin(tables),
std::end(tables),
[&](const NKqpProto::TKqpPhyTable& table) {
return table.GetKind() == NKqpProto::EKqpPhyTableKind::TABLE_KIND_OLAP
&& google::protobuf::util::MessageDifferencer::Equals(table.GetId(), settings.GetTable());
});

if (isOlapSink) {
return true;
}
return settings.GetIsOlap();
}
}
return false;
Expand All @@ -276,7 +265,7 @@ bool HasOlapTableWriteInStage(const NKqpProto::TKqpPhyStage& stage, const google
bool HasOlapTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) {
for (const auto &tx : physicalQuery.GetTransactions()) {
for (const auto &stage : tx.GetStages()) {
if (HasOlapTableWriteInStage(stage, tx.GetTables())) {
if (HasOlapTableWriteInStage(stage)) {
return true;
}
}
Expand Down Expand Up @@ -325,18 +314,7 @@ bool HasOltpTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) {
if (sink.GetTypeCase() == NKqpProto::TKqpSink::kInternalSink && sink.GetInternalSink().GetSettings().Is<NKikimrKqp::TKqpTableSinkSettings>()) {
NKikimrKqp::TKqpTableSinkSettings settings;
YQL_ENSURE(sink.GetInternalSink().GetSettings().UnpackTo(&settings), "Failed to unpack settings");

const bool isOltpSink = std::any_of(
std::begin(tx.GetTables()),
std::end(tx.GetTables()),
[&](const NKqpProto::TKqpPhyTable& table) {
return table.GetKind() == NKqpProto::EKqpPhyTableKind::TABLE_KIND_DS
&& google::protobuf::util::MessageDifferencer::Equals(table.GetId(), settings.GetTable());
});

if (isOltpSink) {
return true;
}
return !settings.GetIsOlap();
}
}
}
Expand Down
4 changes: 1 addition & 3 deletions ydb/core/kqp/common/kqp_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -507,9 +507,7 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
bool commitTx, const NKqpProto::TKqpPhyQuery& physicalQuery);

bool HasOlapTableReadInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
bool HasOlapTableWriteInStage(
const NKqpProto::TKqpPhyStage& stage,
const google::protobuf::RepeatedPtrField< ::NKqpProto::TKqpPhyTable>& tables);
bool HasOlapTableWriteInStage(const NKqpProto::TKqpPhyStage& stage);
bool HasOlapTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
bool HasOltpTableReadInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
bool HasOltpTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
Expand Down
18 changes: 8 additions & 10 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {});
}

ResponseEv->Record.MutableResponse()->SetStatus(Ydb::StatusIds::SUCCESS);
Counters->TxProxyMon->ReportStatusOK->Inc();

auto addLocks = [this](const ui64 taskId, const auto& data) {
if (data.GetData().template Is<NKikimrTxDataShard::TEvKqpInputActorResultInfo>()) {
NKikimrTxDataShard::TEvKqpInputActorResultInfo info;
Expand Down Expand Up @@ -256,11 +253,13 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
ShardIdToTableInfo->Add(lock.GetDataShard(), stageInfo.Meta.TableKind == ETableKind::Olap, stageInfo.Meta.TablePath);
if (TxManager) {
YQL_ENSURE(stageInfo.Meta.TableKind == ETableKind::Olap);
TxManager->AddShard(lock.GetDataShard(), stageInfo.Meta.TableKind == ETableKind::Olap, stageInfo.Meta.TablePath);
TxManager->AddAction(lock.GetDataShard(), IKqpTransactionManager::EAction::WRITE);
IKqpTransactionManager::TActionFlags flags = IKqpTransactionManager::EAction::WRITE;
if (info.GetHasRead()) {
TxManager->AddAction(lock.GetDataShard(), IKqpTransactionManager::EAction::READ);
flags |= IKqpTransactionManager::EAction::READ;
}

TxManager->AddShard(lock.GetDataShard(), stageInfo.Meta.TableKind == ETableKind::Olap, stageInfo.Meta.TablePath);
TxManager->AddAction(lock.GetDataShard(), flags);
TxManager->AddLock(lock.GetDataShard(), lock);
}
}
Expand Down Expand Up @@ -337,6 +336,9 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}

void MakeResponseAndPassAway() {
ResponseEv->Record.MutableResponse()->SetStatus(Ydb::StatusIds::SUCCESS);
Counters->TxProxyMon->ReportStatusOK->Inc();

ResponseEv->Snapshot = GetSnapshot();

if (!Locks.empty() || (TxManager && TxManager->HasLocks())) {
Expand Down Expand Up @@ -1942,10 +1944,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
return false;
}

// Forwards the stage and the table list unchanged to
// NKqp::HasOlapTableWriteInStage and returns that function's result.
bool HasOlapSink(const NKqpProto::TKqpPhyStage& stage, const google::protobuf::RepeatedPtrField< ::NKqpProto::TKqpPhyTable>& tables) {
    const bool stageHasOlapWrite = NKqp::HasOlapTableWriteInStage(stage, tables);
    return stageHasOlapWrite;
}

void Execute() {
LWTRACK(KqpDataExecuterStartExecute, ResponseEv->Orbit, TxId);

Expand Down
63 changes: 41 additions & 22 deletions ydb/core/kqp/runtime/kqp_write_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -303,10 +303,11 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
// TODO: Maybe there are better ways to initialize new shards...
for (const auto& shardInfo : ShardedWriteController->GetPendingShards()) {
TxManager->AddShard(shardInfo.ShardId, IsOlap(), TablePath);
TxManager->AddAction(shardInfo.ShardId, IKqpTransactionManager::EAction::WRITE);
IKqpTransactionManager::TActionFlags flags = IKqpTransactionManager::EAction::WRITE;
if (shardInfo.HasRead) {
TxManager->AddAction(shardInfo.ShardId, IKqpTransactionManager::EAction::READ);
flags |= IKqpTransactionManager::EAction::READ;
}
TxManager->AddAction(shardInfo.ShardId, flags);
}
}

Expand Down Expand Up @@ -540,7 +541,6 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
<< " Sink=" << this->SelfId() << "."
<< getIssues().ToOneLineString());

// TODO: Add new status for splits in datashard. This is tmp solution.
if (getIssues().ToOneLineString().Contains("in a pre/offline state assuming this is due to a finished split (wrong shard state)")) {
ResetShardRetries(ev->Get()->Record.GetOrigin(), ev->Cookie);
Expand All @@ -561,13 +561,12 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
<< " Sink=" << this->SelfId() << "."
<< getIssues().ToOneLineString());

RuntimeError(
TStringBuilder() << "Disk space exhausted for table `"
<< TablePath << "`. "
<< getIssues().ToOneLineString(),
NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
getIssues());
RuntimeError(
TStringBuilder() << "Disk space exhausted for table `"
<< TablePath << "`. "
<< getIssues().ToOneLineString(),
NYql::NDqProto::StatusIds::UNAVAILABLE,
getIssues());
return;
}
case NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED: {
Expand Down Expand Up @@ -670,7 +669,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
preparedInfo.Coordinator = domainCoordinators.Select(*TxId);
}

OnMessageAcknowledged(ev->Get()->Record.GetOrigin());
OnMessageReceived(ev->Get()->Record.GetOrigin());
const auto result = ShardedWriteController->OnMessageAcknowledged(
ev->Get()->Record.GetOrigin(), ev->Cookie);
if (result) {
Expand Down Expand Up @@ -713,7 +712,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
return;
}

OnMessageAcknowledged(ev->Get()->Record.GetOrigin());
OnMessageReceived(ev->Get()->Record.GetOrigin());
const auto result = ShardedWriteController->OnMessageAcknowledged(
ev->Get()->Record.GetOrigin(), ev->Cookie);
if (result && result->IsShardEmpty && Mode == EMode::IMMEDIATE_COMMIT) {
Expand All @@ -723,7 +722,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
}
}

void OnMessageAcknowledged(const ui64 shardId) {
void OnMessageReceived(const ui64 shardId) {
if (auto it = SendTime.find(shardId); it != std::end(SendTime)) {
Counters->WriteActorWritesLatencyHistogram->Collect((TInstant::Now() - it->second).MilliSeconds());
SendTime.erase(it);
Expand Down Expand Up @@ -847,7 +846,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
<< ", Attempts=" << metadata->SendAttempts << ", Mode=" << static_cast<int>(Mode));
Send(
PipeCacheId,
new TEvPipeCache::TEvForward(evWrite.release(), shardId, true),
new TEvPipeCache::TEvForward(evWrite.release(), shardId, /* subscribe */ true),
IEventHandle::FlagTrackDelivery,
metadata->Cookie,
TableWriteActorSpan.GetTraceId());
Expand Down Expand Up @@ -1270,6 +1269,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
, Alloc(std::make_shared<NKikimr::NMiniKQL::TScopedAlloc>(__LOCATION__))
, TypeEnv(*Alloc)
, Counters(settings.Counters)
, TxProxyMon(settings.TxProxyMon)
, BufferWriteActor(TWilsonKqp::BufferWriteActor, NWilson::TTraceId(settings.TraceId), "TKqpBufferWriteActor", NWilson::EFlags::AUTO_END)
, BufferWriteActorState(TWilsonKqp::BufferWriteActorState, BufferWriteActor.GetTraceId(),
"BufferWriteActorState::Writing", NWilson::EFlags::AUTO_END)
Expand Down Expand Up @@ -1552,6 +1552,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
FillEvWritePrepare(evWrite.get(), shardId, *TxId, TxManager);
}

SendTime[shardId] = TInstant::Now();
CA_LOG_D("Send EvWrite (external) to ShardID=" << shardId << ", isPrepare=" << !isRollback << ", isImmediateCommit=" << isRollback << ", TxId=" << evWrite->Record.GetTxId()
<< ", LockTxId=" << evWrite->Record.GetLockTxId() << ", LockNodeId=" << evWrite->Record.GetLockNodeId()
<< ", Locks= " << [&]() {
Expand All @@ -1565,10 +1566,11 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
<< ", OperationsCount=" << 0 << ", IsFinal=" << 1
<< ", Attempts=" << 0);

// TODO: Track latency
Send(
NKikimr::MakePipePerNodeCacheID(false),
new TEvPipeCache::TEvForward(evWrite.release(), shardId, true),
0,
new TEvPipeCache::TEvForward(evWrite.release(), shardId, /* subscribe */ true),
IEventHandle::FlagTrackDelivery,
0);
}
}
Expand Down Expand Up @@ -1672,26 +1674,30 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub

switch (res->GetStatus()) {
case TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusAccepted:
// TODO: metrics
TxProxyMon->ClientTxStatusAccepted->Inc();
break;
case TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusProcessed:
TxProxyMon->ClientTxStatusProcessed->Inc();
break;
case TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusConfirmed:
TxProxyMon->ClientTxStatusConfirmed->Inc();
break;

case TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusPlanned:
TxProxyMon->ClientTxStatusPlanned->Inc();
break;

case TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusOutdated:
case TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusDeclined:
case TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusDeclinedNoSpace:
case TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusRestarting:
// TODO: CancelProposal???
TxProxyMon->ClientTxStatusCoordinatorDeclined->Inc();
ReplyErrorAndDie(TStringBuilder() << "Failed to plan transaction, status: " << res->GetStatus(), NYql::NDqProto::StatusIds::UNAVAILABLE, {});
break;

case TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusUnknown:
case TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusAborted:
TxProxyMon->ClientTxStatusCoordinatorDeclined->Inc();
ReplyErrorAndDie(TStringBuilder() << "Unexpected TEvProposeTransactionStatus status: " << res->GetStatus(), NYql::NDqProto::StatusIds::INTERNAL_ERROR, {});
break;
}
Expand Down Expand Up @@ -1797,7 +1803,6 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
<< " Sink=" << this->SelfId() << "."
<< getIssues().ToOneLineString());

ReplyErrorAndDie(
TStringBuilder() << "Internal error for table. "
<< getIssues().ToOneLineString(),
Expand All @@ -1810,11 +1815,10 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
<< " Sink=" << this->SelfId() << "."
<< getIssues().ToOneLineString());

ReplyErrorAndDie(
TStringBuilder() << "Disk space exhausted for table. "
<< getIssues().ToOneLineString(),
NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
NYql::NDqProto::StatusIds::UNAVAILABLE,
getIssues());
return;
}
Expand All @@ -1824,7 +1828,6 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
<< " Sink=" << this->SelfId() << "."
<< " Ignored this error."
<< getIssues().ToOneLineString());
// TODO: support waiting
ReplyErrorAndDie(
TStringBuilder() << "Tablet " << ev->Get()->Record.GetOrigin() << " is overloaded."
<< getIssues().ToOneLineString(),
Expand Down Expand Up @@ -1886,11 +1889,23 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
}
}

void OnMessageReceived(const ui64 shardId) {
if (auto it = SendTime.find(shardId); it != std::end(SendTime)) {
Counters->WriteActorWritesLatencyHistogram->Collect((TInstant::Now() - it->second).MilliSeconds());
SendTime.erase(it);
}
}

void ProcessWritePreparedShard(NKikimr::NEvents::TDataEvents::TEvWriteResult::TPtr& ev) {
if (State != EState::PREPARING) {
CA_LOG_D("Ignored write prepared event.");
return;
}
OnMessageReceived(ev->Get()->Record.GetOrigin());
CA_LOG_D("Got prepared result TxId=" << ev->Get()->Record.GetTxId()
<< ", TabletId=" << ev->Get()->Record.GetOrigin()
<< ", Cookie=" << ev->Cookie);

const auto& record = ev->Get()->Record;
IKqpTransactionManager::TPrepareResult preparedInfo;
preparedInfo.ShardId = record.GetOrigin();
Expand All @@ -1912,6 +1927,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
CA_LOG_D("Ignored write completed event.");
return;
}
OnMessageReceived(ev->Get()->Record.GetOrigin());
CA_LOG_D("Got completed result TxId=" << ev->Get()->Record.GetTxId()
<< ", TabletId=" << ev->Get()->Record.GetOrigin()
<< ", Cookie=" << ev->Cookie
Expand Down Expand Up @@ -2033,6 +2049,9 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
IShardedWriteControllerPtr ShardedWriteController = nullptr;

TIntrusivePtr<TKqpCounters> Counters;
TIntrusivePtr<NTxProxy::TTxProxyMon> TxProxyMon;
THashMap<ui64, TInstant> SendTime;

NWilson::TSpan BufferWriteActor;
NWilson::TSpan BufferWriteActorState;
};
Expand Down
Loading
Loading