Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1725,6 +1725,7 @@ message TColumnShardConfig {
optional uint64 MemoryLimitScanPortion = 27 [default = 100000000];
optional string ReaderClassName = 28;
optional bool AllowNullableColumnsInPK = 29 [default = false];
optional uint32 RestoreDataOnWriteTimeoutSeconds = 30;
}

message TSchemeShardConfig {
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,13 @@ void TColumnShard::CleanupActors(const TActorContext& ctx) {
if (BackgroundSessionsManager) {
BackgroundSessionsManager->Stop();
}
InFlightReadsTracker.Stop(this);
ctx.Send(ResourceSubscribeActor, new TEvents::TEvPoisonPill);
ctx.Send(BufferizationWriteActorId, new TEvents::TEvPoisonPill);
ctx.Send(DataAccessorsControlActorId, new TEvents::TEvPoisonPill);
if (!!OperationsManager) {
OperationsManager->StopWriting();
}
if (PrioritizationClientId) {
NPrioritiesQueue::TCompServiceOperator::UnregisterClient(PrioritizationClientId);
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/columnshard.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ namespace TEvColumnShard {
public:
std::optional<NOlap::TSnapshot> ReadFromSnapshot;
std::optional<NOlap::TSnapshot> ReadToSnapshot;
TString TaskIdentifier;
std::shared_ptr<NOlap::TPKRangesFilter> RangesFilter;
public:
void AddColumn(const ui32 id, const TString& columnName) {
Expand Down
36 changes: 27 additions & 9 deletions ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,15 @@ void TColumnShard::Handle(NPrivateEvents::NWrite::TEvWritePortionResult::TPtr& e
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_WRITE)("tablet_id", TabletID())("event", "TEvWritePortionResult");
std::vector<TNoDataWrite> noDataWrites = ev->Get()->DetachNoDataWrites();
for (auto&& i : noDataWrites) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "no_data_write_finished")("writing_size", i.GetDataSize())("writing_id", i.GetWriteMeta().GetId());
Counters.GetWritesMonitor()->OnFinishWrite(i.GetDataSize(), 1);
}
if (ev->Get()->GetWriteStatus() == NKikimrProto::OK) {
std::vector<TInsertedPortions> writtenPacks = ev->Get()->DetachInsertedPacks();
const TMonotonic now = TMonotonic::Now();
for (auto&& i : writtenPacks) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("writing_size", i.GetDataSize())("event", "data_write_finished")(
"writing_id", i.GetWriteMeta().GetId());
Counters.OnWritePutBlobsSuccess(now - i.GetWriteMeta().GetWriteStartInstant(), i.GetRecordsCount());
Counters.GetWritesMonitor()->OnFinishWrite(i.GetDataSize(), 1);
}
Expand All @@ -115,6 +118,8 @@ void TColumnShard::Handle(NPrivateEvents::NWrite::TEvWritePortionResult::TPtr& e
for (auto&& i : writtenPacks) {
Counters.OnWritePutBlobsFailed(now - i.GetWriteMeta().GetWriteStartInstant(), i.GetRecordsCount());
Counters.GetCSCounters().OnWritePutBlobsFail(now - i.GetWriteMeta().GetWriteStartInstant());
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("writing_size", i.GetDataSize())("event", "data_write_error")(
"writing_id", i.GetWriteMeta().GetId());
Counters.GetWritesMonitor()->OnFinishWrite(i.GetDataSize(), 1);
}
Execute(new TTxBlobsWritingFailed(this, ev->Get()->GetWriteStatus(), std::move(writtenPacks)), ctx);
Expand All @@ -131,10 +136,11 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo
auto baseAggregations = wBuffer.GetAggregations();
wBuffer.InitReplyReceived(TMonotonic::Now());

Counters.GetWritesMonitor()->OnFinishWrite(wBuffer.GetSumSize(), wBuffer.GetAggregations().size());

for (auto&& aggr : baseAggregations) {
const auto& writeMeta = aggr->GetWriteMeta();
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "blobs_write_finished")("writing_size", aggr->GetSize())(
"writing_id", writeMeta.GetId())("status", putResult.GetPutStatus());
Counters.GetWritesMonitor()->OnFinishWrite(aggr->GetSize(), 1);

if (!TablesManager.IsReadyForWrite(writeMeta.GetTableId())) {
ACFL_ERROR("event", "absent_pathId")("path_id", writeMeta.GetTableId())("has_index", TablesManager.HasPrimaryIndex());
Expand Down Expand Up @@ -210,7 +216,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
granuleShardingVersion = record.GetGranuleShardingVersion();
}

NEvWrite::TWriteMeta writeMeta(writeId, pathId, source, granuleShardingVersion);
NEvWrite::TWriteMeta writeMeta(writeId, pathId, source, granuleShardingVersion, TGUID::CreateTimebased().AsGuidString());
if (record.HasModificationType()) {
writeMeta.SetModificationType(TEnumOperator<NEvWrite::EModificationType>::DeserializeFromProto(record.GetModificationType()));
}
Expand Down Expand Up @@ -288,7 +294,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
writeData.MutableWriteMeta().SetWriteMiddle1StartInstant(TMonotonic::Now());

NOlap::TWritingContext context(TabletID(), SelfId(), snapshotSchema, StoragesManager, Counters.GetIndexationCounters().SplitterCounters,
Counters.GetCSCounters().WritingCounters, GetLastTxSnapshot());
Counters.GetCSCounters().WritingCounters, GetLastTxSnapshot(), std::make_shared<TAtomicCounter>(1));
std::shared_ptr<NConveyor::ITask> task =
std::make_shared<NOlap::TBuildBatchesTask>(BufferizationWriteActorId, std::move(writeData), context);
NConveyor::TInsertServiceOperator::AsyncTaskToExecute(task);
Expand Down Expand Up @@ -460,6 +466,15 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
const auto& record = ev->Get()->Record;
const auto source = ev->Sender;
const auto cookie = ev->Cookie;

if (!TablesManager.GetPrimaryIndex()) {
Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(
TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "schema not ready for writing");
ctx.Send(source, result.release(), 0, cookie);
return;
}

const auto behaviourConclusion = TOperationsManager::GetBehaviour(*ev->Get());
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_WRITE)("ev_write", record.DebugString());
if (behaviourConclusion.IsFail()) {
Expand Down Expand Up @@ -560,12 +575,10 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
if (overloadStatus != EOverloadStatus::None) {
std::unique_ptr<NActors::IEventBase> result = NEvents::TDataEvents::TEvWriteResult::BuildError(
TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, "overload data error");
OverloadWriteFail(overloadStatus, NEvWrite::TWriteMeta(0, pathId, source, {}), arrowData->GetSize(), cookie, std::move(result), ctx);
OverloadWriteFail(overloadStatus, NEvWrite::TWriteMeta(0, pathId, source, {}, TGUID::CreateTimebased().AsGuidString()), arrowData->GetSize(), cookie, std::move(result), ctx);
return;
}

Counters.GetWritesMonitor()->OnStartWrite(arrowData->GetSize());

std::optional<ui32> granuleShardingVersionId;
if (record.HasGranuleShardingVersionId()) {
granuleShardingVersionId = record.GetGranuleShardingVersionId();
Expand All @@ -586,10 +599,15 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
OperationsManager->RegisterLock(lockId, Generation());
auto writeOperation = OperationsManager->RegisterOperation(
pathId, lockId, cookie, granuleShardingVersionId, *mType, AppDataVerified().FeatureFlags.GetEnableWritePortionsOnInsert());

AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("writing_size", arrowData->GetSize())("operation_id", writeOperation->GetIdentifier())(
"in_flight", Counters.GetWritesMonitor()->GetWritesInFlight())("size_in_flight", Counters.GetWritesMonitor()->GetWritesSizeInFlight());
Counters.GetWritesMonitor()->OnStartWrite(arrowData->GetSize());

Y_ABORT_UNLESS(writeOperation);
writeOperation->SetBehaviour(behaviour);
NOlap::TWritingContext wContext(pathId, SelfId(), schema, StoragesManager, Counters.GetIndexationCounters().SplitterCounters,
Counters.GetCSCounters().WritingCounters, NOlap::TSnapshot::Max());
NOlap::TWritingContext wContext(TabletID(), SelfId(), schema, StoragesManager, Counters.GetIndexationCounters().SplitterCounters,
Counters.GetCSCounters().WritingCounters, NOlap::TSnapshot::Max(), writeOperation->GetActivityChecker());
arrowData->SetSeparationPoints(GetIndexAs<NOlap::TColumnEngineForLogs>().GetGranulePtrVerified(pathId)->GetBucketPositions());
writeOperation->Start(*this, arrowData, source, wContext);
}
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
switch (ev->GetTypeRewrite()) {
HFunc(TEvTablet::TEvTabletDead, HandleTabletDead);
default:
LOG_S_WARN("TColumnShard.StateBroken at " << TabletID() << " unhandled event type: " << ev->GetTypeRewrite()
LOG_S_WARN("TColumnShard.StateBroken at " << TabletID() << " unhandled event type: " << ev->GetTypeName()
<< " event: " << ev->ToString());
Send(IEventHandle::ForwardOnNondelivery(std::move(ev), NActors::TEvents::TEvUndelivered::ReasonActorUnknown));
break;
Expand Down Expand Up @@ -453,7 +453,7 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa

default:
if (!HandleDefaultEvents(ev, SelfId())) {
LOG_S_WARN("TColumnShard.StateWork at " << TabletID() << " unhandled event type: " << ev->GetTypeRewrite()
LOG_S_WARN("TColumnShard.StateWork at " << TabletID() << " unhandled event type: " << ev->GetTypeName()
<< " event: " << ev->ToString());
}
break;
Expand Down
33 changes: 33 additions & 0 deletions ydb/core/tx/columnshard/counters/writes_monitor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#include "writes_monitor.h"

#include <ydb/library/actors/core/log.h>

namespace NKikimr::NColumnShard {

TAtomicCounter TWritesMonitor::WritesInFlight = 0;
TAtomicCounter TWritesMonitor::WritesSizeInFlight = 0;

void TWritesMonitor::OnStartWrite(const ui64 dataSize) {
++WritesInFlightLocal;
WritesSizeInFlightLocal += dataSize;
WritesInFlight.Inc();
WritesSizeInFlight.Add(dataSize);
UpdateTabletCounters();
}

void TWritesMonitor::OnFinishWrite(const ui64 dataSize, const ui32 writesCount /*= 1*/) {
AFL_VERIFY(writesCount <= WritesInFlightLocal);
AFL_VERIFY(dataSize <= WritesSizeInFlightLocal);
WritesSizeInFlightLocal -= dataSize;
WritesInFlightLocal -= writesCount;
AFL_VERIFY(0 <= WritesInFlight.Sub(writesCount));
AFL_VERIFY(0 <= WritesSizeInFlight.Sub(dataSize));
UpdateTabletCounters();
}

TString TWritesMonitor::DebugString() const {
return TStringBuilder() << "{object=write_monitor;count_local=" << WritesInFlightLocal << ";size_local=" << WritesSizeInFlightLocal << ";"
<< "count_node=" << WritesInFlight.Val() << ";size_node=" << WritesSizeInFlight.Val() << "}";
}

} // namespace NKikimr::NColumnShard
36 changes: 18 additions & 18 deletions ydb/core/tx/columnshard/counters/writes_monitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,40 +6,40 @@

namespace NKikimr::NColumnShard {

class TWritesMonitor {
class TWritesMonitor: TNonCopyable {
private:
TTabletCountersBase& Stats;

YDB_READONLY(ui64, WritesInFlight, 0);
YDB_READONLY(ui64, WritesSizeInFlight, 0);
static TAtomicCounter WritesInFlight;
static TAtomicCounter WritesSizeInFlight;
ui64 WritesInFlightLocal = 0;
ui64 WritesSizeInFlightLocal = 0;

public:
TWritesMonitor(TTabletCountersBase& stats)
: Stats(stats) {
}

void OnStartWrite(const ui64 dataSize) {
++WritesInFlight;
WritesSizeInFlight += dataSize;
UpdateTabletCounters();
~TWritesMonitor() {
OnFinishWrite(WritesSizeInFlightLocal, WritesInFlightLocal);
}

void OnFinishWrite(const ui64 dataSize, const ui32 writesCount = 1) {
Y_ABORT_UNLESS(WritesInFlight > 0);
Y_ABORT_UNLESS(WritesSizeInFlight >= dataSize);
WritesInFlight -= writesCount;
WritesSizeInFlight -= dataSize;
UpdateTabletCounters();
}
void OnStartWrite(const ui64 dataSize);

void OnFinishWrite(const ui64 dataSize, const ui32 writesCount = 1);

TString DebugString() const {
return TStringBuilder() << "{object=write_monitor;count=" << WritesInFlight << ";size=" << WritesSizeInFlight
<< "}";
TString DebugString() const;

ui64 GetWritesInFlight() const {
return WritesInFlight.Val();
}
ui64 GetWritesSizeInFlight() const {
return WritesSizeInFlight.Val();
}

private:
void UpdateTabletCounters() {
Stats.Simple()[COUNTER_WRITES_IN_FLY].Set(WritesInFlight);
Stats.Simple()[COUNTER_WRITES_IN_FLY].Set(WritesInFlightLocal);
}
};

Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/counters/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ SRCS(
scan.cpp
splitter.cpp
portions.cpp
writes_monitor.cpp
)

PEERDIR(
Expand Down
69 changes: 62 additions & 7 deletions ydb/core/tx/columnshard/data_reader/actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,50 +3,105 @@
namespace NKikimr::NOlap::NDataReader {

void TActor::HandleExecute(NKqp::TEvKqpCompute::TEvScanData::TPtr& ev) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_RESTORE)("event", "scan_data");
LastAck = std::nullopt;
if (!CheckActivity()) {
TBase::Send(*ScanActorId, new NKqp::TEvKqp::TEvAbortExecution(NYql::NDqProto::StatusIds::ABORTED, "external task aborted"));
return;
}
SwitchStage(EStage::WaitData, EStage::WaitData);
auto data = ev->Get()->ArrowBatch;
AFL_VERIFY(!!data || ev->Get()->Finished);
if (data) {
AFL_VERIFY(ScanActorId);
const auto status = RestoreTask->OnDataChunk(data);
if (status.IsSuccess()) {
TBase::Send(*ScanActorId, new NKqp::TEvKqpCompute::TEvScanDataAck(FreeSpace, 1, 1));
TBase::Send(*ScanActorId, new NKqp::TEvKqpCompute::TEvScanDataAck(FreeSpace, 1, 1), NActors::IEventHandle::FlagTrackDelivery);
LastAck = TMonotonic::Now();
} else {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_RESTORE)("event", "scan_data_restore_fail")("message", status.GetErrorMessage());
SwitchStage(EStage::WaitData, EStage::Finished);
TBase::Send(*ScanActorId, NKqp::TEvKqp::TEvAbortExecution::Aborted("task finished: " + status.GetErrorMessage()).Release());
PassAway();
}
} else {
SwitchStage(EStage::WaitData, EStage::Finished);
auto status = RestoreTask->OnFinished();
if (status.IsFail()) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "restore_task_finished_error")("reason", status.GetErrorMessage());
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_RESTORE)("event", "restore_task_finished_error")("reason", status.GetErrorMessage());
} else {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "restore_task_finished")("reason", status.GetErrorMessage());
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_RESTORE)("event", "restore_task_finished");
}
PassAway();
}
}

void TActor::HandleExecute(NKqp::TEvKqpCompute::TEvScanInitActor::TPtr& ev) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_RESTORE)("event", "init_actor");
LastAck = std::nullopt;
if (!CheckActivity()) {
TBase::Send(*ScanActorId, new NKqp::TEvKqp::TEvAbortExecution(NYql::NDqProto::StatusIds::ABORTED, "external task aborted"));
return;
}
SwitchStage(EStage::Initialization, EStage::WaitData);
AFL_VERIFY(!ScanActorId);
auto& msg = ev->Get()->Record;
ScanActorId = ActorIdFromProto(msg.GetScanActorId());
TBase::Send(*ScanActorId, new NKqp::TEvKqpCompute::TEvScanDataAck(FreeSpace, 1, 1));
TBase::Send(*ScanActorId, new NKqp::TEvKqpCompute::TEvScanDataAck(FreeSpace, 1, 1), NActors::IEventHandle::FlagTrackDelivery);
LastAck = TMonotonic::Now();
}

void TActor::HandleExecute(NKqp::TEvKqpCompute::TEvScanError::TPtr& ev) {
SwitchStage(std::nullopt, EStage::Finished);
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "problem_on_restore_data")(
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_RESTORE)("event", "problem_on_restore_data")(
"reason", NYql::IssuesFromMessageAsString(ev->Get()->Record.GetIssues()));
RestoreTask->OnError(NYql::IssuesFromMessageAsString(ev->Get()->Record.GetIssues()));
PassAway();
}

void TActor::HandleExecute(NActors::TEvents::TEvWakeup::TPtr& /*ev*/) {
if (!CheckActivity()) {
TBase::Send(*ScanActorId, new NKqp::TEvKqp::TEvAbortExecution(NYql::NDqProto::StatusIds::ABORTED, "external task aborted"));
return;
}

if (LastAck && TMonotonic::Now() - *LastAck > RestoreTask->GetTimeout()) {
SwitchStage(std::nullopt, EStage::Finished);
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_RESTORE)("event", "problem_timeout");
RestoreTask->OnError("timeout on restore data");
TBase::Send(*ScanActorId, new NKqp::TEvKqp::TEvAbortExecution(NYql::NDqProto::StatusIds::ABORTED, "external task aborted"));
PassAway();
return;
}
Schedule(TDuration::Seconds(1), new NActors::TEvents::TEvWakeup());
}

void TActor::Bootstrap(const TActorContext& /*ctx*/) {
if (!CheckActivity()) {
return;
}
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_RESTORE)("event", "start_restore")("tablet_actor_id", RestoreTask->GetTabletActorId())(
"this", (ui64)this);
auto evStart = RestoreTask->BuildRequestInitiator();
Send(RestoreTask->GetTabletActorId(), evStart.release());
Send(RestoreTask->GetTabletActorId(), evStart.release(), NActors::IEventHandle::FlagTrackDelivery);
LastAck = TMonotonic::Now();
Become(&TActor::StateFunc);
Schedule(TDuration::Seconds(1), new NActors::TEvents::TEvWakeup());
}

bool TActor::CheckActivity() {
if (AbortedFlag) {
return false;
}
if (RestoreTask->IsActive()) {
return true;
}
AbortedFlag = true;
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "restoring_cancelled_from_operation");
SwitchStage(std::nullopt, EStage::Finished);
RestoreTask->OnError("restore task aborted through operation cancelled");
PassAway();
return false;
}

}
} // namespace NKikimr::NOlap::NDataReader
Loading
Loading