Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions ydb/core/formats/arrow/program.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,18 +139,20 @@ class TKernelFunction : public IStepFunction<TAssignObject> {
TKernelFunction(const TFunctionPtr kernelsFunction, arrow::compute::ExecContext* ctx)
: TBase(ctx)
, Function(kernelsFunction)
{}
{
AFL_VERIFY(Function);
}

arrow::Result<arrow::Datum> Call(const TAssignObject& assign, const TDatumBatch& batch) const override {
auto arguments = TBase::BuildArgs(batch, assign.GetArguments());
if (!arguments) {
return arrow::Status::Invalid("Error parsing args.");
}
try {
// try {
return Function->Execute(*arguments, assign.GetOptions(), TBase::Ctx);
} catch (const std::exception& ex) {
return arrow::Status::ExecutionError(ex.what());
}
// } catch (const std::exception& ex) {
// return arrow::Status::ExecutionError(ex.what());
// }
}
};

Expand Down
9 changes: 8 additions & 1 deletion ydb/core/tx/columnshard/blobs_action/tier/write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ void TWriteAction::DoOnExecuteTxAfterWrite(NColumnShard::TColumnShard& /*self*/,
for (auto&& i : GetBlobsForWrite()) {
dbBlobs.RemoveTierDraftBlobId(GetStorageId(), i.first);
dbBlobs.AddTierBlobToDelete(GetStorageId(), i.first);
GCInfo->MutableBlobsToDelete().emplace_back(i.first);
}
}
}
Expand All @@ -41,4 +40,12 @@ NKikimr::NOlap::TUnifiedBlobId TWriteAction::AllocateNextBlobId(const TString& d
return TUnifiedBlobId(Max<ui32>(), TLogoBlobID(TabletId, now.GetValue() >> 32, now.GetValue() & Max<ui32>(), TLogoBlobID::MaxChannel, data.size(), AtomicIncrement(Counter) % TLogoBlobID::MaxCookie, 1));
}

// Completion hook for the write action.
// When blobsWroteSuccessfully is false, the id (`first`) of every entry returned by
// GetBlobsForWrite() is appended to GCInfo's blobs-to-delete list.
// When the write succeeded, nothing is done here.
void TWriteAction::DoOnCompleteTxAfterWrite(NColumnShard::TColumnShard& /*self*/, const bool blobsWroteSuccessfully) {
    if (blobsWroteSuccessfully) {
        return;
    }
    for (auto&& blobEntry : GetBlobsForWrite()) {
        GCInfo->MutableBlobsToDelete().emplace_back(blobEntry.first);
    }
}

}
4 changes: 1 addition & 3 deletions ydb/core/tx/columnshard/blobs_action/tier/write.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ class TWriteAction: public IBlobsWritingAction {
}

virtual void DoOnExecuteTxAfterWrite(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs, const bool blobsWroteSuccessfully) override;
virtual void DoOnCompleteTxAfterWrite(NColumnShard::TColumnShard& /*self*/, const bool /*blobsWroteSuccessfully*/) override {

}
virtual void DoOnCompleteTxAfterWrite(NColumnShard::TColumnShard& /*self*/, const bool blobsWroteSuccessfully) override;
public:
virtual bool NeedDraftTransaction() const override {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ void TTxWrite::Complete(const TActorContext& ctx) {
for (ui32 i = 0; i < buffer.GetAggregations().size(); ++i) {
const auto& writeMeta = buffer.GetAggregations()[i]->GetWriteData()->GetWriteMeta();
ctx.Send(writeMeta.GetSource(), Results[i].release());
Self->CSCounters.OnWriteTxComplete((now - writeMeta.GetWriteStartInstant()).MilliSeconds());
Self->CSCounters.OnWriteTxComplete(now - writeMeta.GetWriteStartInstant());
Self->CSCounters.OnSuccessWriteResponse();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,11 @@ TTxWriteIndex::TTxWriteIndex(TColumnShard* self, TEvPrivate::TEvWriteIndex::TPtr
Y_ABORT_UNLESS(Ev && Ev->Get()->IndexChanges);
}

// Writes a short description of this transaction to `out`: the dynamic type name,
// followed by ": " and the index changes' DebugString() when IndexChanges is set.
void TTxWriteIndex::Describe(IOutputStream& out) const noexcept {
    out << TypeName(*this);
    const auto& indexChanges = Ev->Get()->IndexChanges;
    if (!indexChanges) {
        return;
    }
    out << ": " << indexChanges->DebugString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class TTxWriteIndex: public TTransactionBase<TColumnShard> {
bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
void Complete(const TActorContext& ctx) override;
TTxType GetTxType() const override { return TXTYPE_WRITE_INDEX; }
virtual void Describe(IOutputStream& out) const noexcept override;

private:

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/blobs_reader/actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ void TActor::Handle(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev)
bool aborted = false;
if (event.Status != NKikimrProto::EReplyStatus::OK) {
WaitingBlobsCount.Sub(Task->GetWaitingCount());
if (!Task->AddError(event.BlobRange, IBlobsReadingAction::TErrorStatus::Fail(event.Status, "cannot get blob"))) {
// Attach at most the first 1024 bytes of the reply payload to the error text.
// substr(1024) would instead skip the first 1024 bytes and keep the tail.
if (!Task->AddError(event.BlobRange, IBlobsReadingAction::TErrorStatus::Fail(event.Status, "cannot get blob: " + event.Data.substr(0, 1024)))) {
aborted = true;
}
} else {
Expand Down
60 changes: 43 additions & 17 deletions ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "columnshard_impl.h"
#include "blobs_action/transaction/tx_write.h"
#include "blobs_action/transaction/tx_draft.h"
#include "counters/columnshard.h"
#include "operations/slice_builder.h"
#include "operations/write_data.h"

Expand All @@ -21,9 +22,17 @@ void TColumnShard::OverloadWriteFail(const EOverloadStatus overloadReason, const
IncCounter(COUNTER_WRITE_OVERLOAD);
CSCounters.OnOverloadInsertTable(writeData.GetSize());
break;
case EOverloadStatus::Shard:
case EOverloadStatus::ShardTxInFly:
IncCounter(COUNTER_WRITE_OVERLOAD);
CSCounters.OnOverloadShard(writeData.GetSize());
CSCounters.OnOverloadShardTx(writeData.GetSize());
break;
case EOverloadStatus::ShardWritesInFly:
IncCounter(COUNTER_WRITE_OVERLOAD);
CSCounters.OnOverloadShardWrites(writeData.GetSize());
break;
case EOverloadStatus::ShardWritesSizeInFly:
IncCounter(COUNTER_WRITE_OVERLOAD);
CSCounters.OnOverloadShardWritesSize(writeData.GetSize());
break;
case EOverloadStatus::None:
Y_ABORT("invalid function usage");
Expand All @@ -45,8 +54,20 @@ TColumnShard::EOverloadStatus TColumnShard::CheckOverloaded(const ui64 tableId)
return EOverloadStatus::InsertTable;
}

if (WritesMonitor.ShardOverloaded()) {
return EOverloadStatus::Shard;
ui64 txLimit = Settings.OverloadTxInFlight;
ui64 writesLimit = Settings.OverloadWritesInFlight;
ui64 writesSizeLimit = Settings.OverloadWritesSizeInFlight;
if (txLimit && Executor()->GetStats().TxInFly > txLimit) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "shard_overload")("reason", "tx_in_fly")("sum", Executor()->GetStats().TxInFly)("limit", txLimit);
return EOverloadStatus::ShardTxInFly;
}
if (writesLimit && WritesMonitor.GetWritesInFlight() > writesLimit) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "shard_overload")("reason", "writes_in_fly")("sum", WritesMonitor.GetWritesInFlight())("limit", writesLimit);
return EOverloadStatus::ShardWritesInFly;
}
if (writesSizeLimit && WritesMonitor.GetWritesSizeInFlight() > writesSizeLimit) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "shard_overload")("reason", "writes_size_in_fly")("sum", WritesMonitor.GetWritesSizeInFlight())("limit", writesSizeLimit);
return EOverloadStatus::ShardWritesSizeInFly;
}
return EOverloadStatus::None;
}
Expand All @@ -57,7 +78,8 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo
auto& putResult = ev->Get()->GetPutResult();
OnYellowChannels(putResult);
NOlap::TWritingBuffer& wBuffer = ev->Get()->MutableWritesBuffer();
auto& baseAggregations = wBuffer.GetAggregations();
auto baseAggregations = wBuffer.GetAggregations();
wBuffer.InitReplyReceived(TMonotonic::Now());

auto wg = WritesMonitor.FinishWrite(wBuffer.GetSumSize(), wBuffer.GetAggregations().size());

Expand All @@ -70,13 +92,13 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo

auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR);
ctx.Send(writeMeta.GetSource(), result.release());
CSCounters.OnFailedWriteResponse();
CSCounters.OnFailedWriteResponse(EWriteFailReason::NoTable);
wBuffer.RemoveData(aggr, StoragesManager->GetInsertOperator());
continue;
}

if (putResult.GetPutStatus() != NKikimrProto::OK) {
CSCounters.OnWritePutBlobsFail((TMonotonic::Now() - writeMeta.GetWriteStartInstant()).MilliSeconds());
CSCounters.OnWritePutBlobsFail(TMonotonic::Now() - writeMeta.GetWriteStartInstant());
IncCounter(COUNTER_WRITE_FAIL);

auto errCode = NKikimrTxColumnShard::EResultStatus::STORAGE_ERROR;
Expand All @@ -97,16 +119,17 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), operation->GetTxId(), NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, "put data fails");
ctx.Send(writeMeta.GetSource(), result.release());
}
CSCounters.OnFailedWriteResponse();
CSCounters.OnFailedWriteResponse(EWriteFailReason::PutBlob);
wBuffer.RemoveData(aggr, StoragesManager->GetInsertOperator());
} else {
const TMonotonic now = TMonotonic::Now();
CSCounters.OnWritePutBlobsSuccess((now - writeMeta.GetWriteStartInstant()).MilliSeconds());
CSCounters.OnWriteMiddle1PutBlobsSuccess((now - writeMeta.GetWriteMiddle1StartInstant()).MilliSeconds());
CSCounters.OnWriteMiddle2PutBlobsSuccess((now - writeMeta.GetWriteMiddle2StartInstant()).MilliSeconds());
CSCounters.OnWriteMiddle3PutBlobsSuccess((now - writeMeta.GetWriteMiddle3StartInstant()).MilliSeconds());
CSCounters.OnWriteMiddle4PutBlobsSuccess((now - writeMeta.GetWriteMiddle4StartInstant()).MilliSeconds());
CSCounters.OnWriteMiddle5PutBlobsSuccess((now - writeMeta.GetWriteMiddle5StartInstant()).MilliSeconds());
CSCounters.OnWritePutBlobsSuccess(now - writeMeta.GetWriteStartInstant());
CSCounters.OnWriteMiddle1PutBlobsSuccess(now - writeMeta.GetWriteMiddle1StartInstant());
CSCounters.OnWriteMiddle2PutBlobsSuccess(now - writeMeta.GetWriteMiddle2StartInstant());
CSCounters.OnWriteMiddle3PutBlobsSuccess(now - writeMeta.GetWriteMiddle3StartInstant());
CSCounters.OnWriteMiddle4PutBlobsSuccess(now - writeMeta.GetWriteMiddle4StartInstant());
CSCounters.OnWriteMiddle5PutBlobsSuccess(now - writeMeta.GetWriteMiddle5StartInstant());
CSCounters.OnWriteMiddle6PutBlobsSuccess(now - writeMeta.GetWriteMiddle6StartInstant());
LOG_S_DEBUG("Write (record) into pathId " << writeMeta.GetTableId()
<< (writeMeta.GetWriteId() ? (" writeId " + ToString(writeMeta.GetWriteId())).c_str() : "") << " at tablet " << TabletID());

Expand Down Expand Up @@ -139,18 +162,20 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
IncCounter(signalIndex);

ctx.Send(source, std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR));
CSCounters.OnFailedWriteResponse();
return;
};

if (!AppDataVerified().ColumnShardConfig.GetWritingEnabled()) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_writing")("reason", "disabled");
CSCounters.OnFailedWriteResponse(EWriteFailReason::Disabled);
return returnFail(COUNTER_WRITE_FAIL);
}

if (!TablesManager.IsReadyForWrite(tableId)) {
LOG_S_NOTICE("Write (fail) into pathId:" << writeMeta.GetTableId() << (TablesManager.HasPrimaryIndex()? "": " no index")
<< " at tablet " << TabletID());

CSCounters.OnFailedWriteResponse(EWriteFailReason::NoTable);
return returnFail(COUNTER_WRITE_FAIL);
}

Expand All @@ -159,6 +184,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
if (!arrowData->ParseFromProto(record)) {
LOG_S_ERROR("Write (fail) " << record.GetData().size() << " bytes into pathId " << writeMeta.GetTableId()
<< " at tablet " << TabletID());
CSCounters.OnFailedWriteResponse(EWriteFailReason::IncorrectSchema);
return returnFail(COUNTER_WRITE_FAIL);
}

Expand All @@ -167,7 +193,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
if (overloadStatus != EOverloadStatus::None) {
std::unique_ptr<NActors::IEventBase> result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeData.GetWriteMeta(), NKikimrTxColumnShard::EResultStatus::OVERLOADED);
OverloadWriteFail(overloadStatus, writeData, std::move(result), ctx);
CSCounters.OnFailedWriteResponse();
CSCounters.OnFailedWriteResponse(EWriteFailReason::Overload);
} else {
if (ui64 writeId = (ui64)HasLongTxWrite(writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId())) {
LOG_S_DEBUG("Write (duplicate) into pathId " << writeMeta.GetTableId()
Expand All @@ -179,7 +205,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(
TabletID(), writeMeta, writeId, NKikimrTxColumnShard::EResultStatus::SUCCESS);
ctx.Send(writeMeta.GetSource(), result.release());
CSCounters.OnFailedWriteResponse();
CSCounters.OnFailedWriteResponse(EWriteFailReason::LongTxDuplication);
return;
}

Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,9 @@ void TColumnShard::TryAbortWrites(NIceDb::TNiceDb& db, NOlap::TDbWrapper& dbTabl
failedAborts.push_back(writeId);
}
}
if (failedAborts.size()) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "failed_aborts")("count", failedAborts.size())("writes_count", writesToAbort.size());
}
for (auto& writeId : failedAborts) {
writesToAbort.erase(writeId);
}
Expand Down Expand Up @@ -813,6 +816,7 @@ void TColumnShard::SetupCleanupInsertTable() {
if (!InsertTable->GetAborted().size() && !writeIdsToCleanup.size()) {
return;
}
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "cleanup_started")("aborted", InsertTable->GetAborted().size())("to_cleanup", writeIdsToCleanup.size());

Execute(new TTxInsertTableCleanup(this, std::move(writeIdsToCleanup)), TActorContext::AsActorContext());
}
Expand Down
19 changes: 6 additions & 13 deletions ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,9 @@ class TColumnShard
void OnTieringModified();
public:
enum class EOverloadStatus {
Shard /* "shard" */,
ShardTxInFly /* "shard_tx" */,
ShardWritesInFly /* "shard_writes" */,
ShardWritesSizeInFly /* "shard_writes_size" */,
InsertTable /* "insert_table" */,
Disk /* "disk" */,
None /* "none" */
Expand Down Expand Up @@ -326,8 +328,8 @@ class TColumnShard
class TWritesMonitor {
private:
TColumnShard& Owner;
ui64 WritesInFlight = 0;
ui64 WritesSizeInFlight = 0;
YDB_READONLY(ui64, WritesInFlight, 0);
YDB_READONLY(ui64, WritesSizeInFlight, 0);

public:
class TGuard: public TNonCopyable {
Expand Down Expand Up @@ -363,17 +365,8 @@ class TColumnShard
return TGuard(*this);
}

bool ShardOverloaded() const {
ui64 txLimit = Owner.Settings.OverloadTxInFlight;
ui64 writesLimit = Owner.Settings.OverloadWritesInFlight;
ui64 writesSizeLimit = Owner.Settings.OverloadWritesSizeInFlight;
return (txLimit && Owner.Executor()->GetStats().TxInFly > txLimit) ||
(writesLimit && WritesInFlight > writesLimit) ||
(writesSizeLimit && WritesSizeInFlight > writesSizeLimit);
}

TString DebugString() const {
return TStringBuilder() << "TWritesMonitor: inflight " << WritesInFlight << " (" << WritesSizeInFlight << " bytes)";
return TStringBuilder() << "{object=write_monitor;count=" << WritesInFlight << ";size=" << WritesSizeInFlight << "}";
}

private:
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/common/limits.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#include "limits.h"

namespace NKikimr::NColumnShard {
}
9 changes: 9 additions & 0 deletions ydb/core/tx/columnshard/common/limits.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#pragma once
#include <util/system/types.h>

namespace NKikimr::NOlap {
// Holder for compile-time limit constants.
class TGlobalLimits {
public:
    // 256 MiB (256 * 1024 * 1024). Presumably the byte cap for a single write
    // transaction, judging by the name - confirm against the code that uses it.
    static constexpr ui64 TxWriteLimitBytes = 256ULL << 20;
};
}
8 changes: 7 additions & 1 deletion ydb/core/tx/columnshard/common/snapshot.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#pragma once
#include <ydb/library/conclusion/status.h>

#include <util/stream/output.h>
#include <util/string/cast.h>
#include <ydb/library/conclusion/status.h>
#include <util/datetime/base.h>

namespace NKikimrColumnShardProto {
class TSnapshot;
Expand All @@ -20,6 +22,10 @@ class TSnapshot {
, TxId(txId) {
}

// Returns PlanStep converted to a TInstant, interpreting the step value as milliseconds.
constexpr TInstant GetPlanInstant() const noexcept {
    const auto planStepMs = PlanStep;
    return TInstant::MilliSeconds(planStepMs);
}

constexpr ui64 GetPlanStep() const noexcept {
return PlanStep;
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/common/ya.make
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
LIBRARY()

SRCS(
limits.h
reverse_accessor.cpp
scalars.cpp
snapshot.cpp
Expand Down
Loading