Skip to content

Commit bf5884e

Browse files
Merge 37bd04b into d0c941e
2 parents d0c941e + 37bd04b commit bf5884e

File tree

5 files changed

+124
-26
lines changed

5 files changed

+124
-26
lines changed

ydb/core/tx/columnshard/columnshard.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,8 @@ void TColumnShard::Handle(NActors::TEvents::TEvWakeup::TPtr& ev, const TActorCon
250250
const TMonotonic now = TMonotonic::Now();
251251
GetProgressTxController().PingTimeouts(now);
252252
ctx.Schedule(TDuration::Seconds(1), new NActors::TEvents::TEvWakeup(0));
253+
} else if (ev->Get()->Tag == 1) {
254+
WriteTasksQueue.Drain(true, ctx);
253255
}
254256
}
255257

ydb/core/tx/columnshard/columnshard__write.cpp

Lines changed: 38 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,40 @@ class TAbortWriteTransaction: public NTabletFlatExecutor::TTransactionBase<TColu
447447
ui64 Cookie;
448448
};
449449

450+
bool TColumnShard::TWriteTask::Execute(TColumnShard* owner, const TActorContext& ctx) {
451+
auto overloadStatus = owner->CheckOverloaded(PathId);
452+
if (overloadStatus == EOverloadStatus::OverloadMetadata) {
453+
AFL_INFO(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "wait_overload")("status", overloadStatus);
454+
return false;
455+
}
456+
owner->Counters.GetCSCounters().WritingCounters->OnWritingTaskDequeue(TMonotonic::Now() - Created);
457+
if (overloadStatus != EOverloadStatus::None) {
458+
std::unique_ptr<NActors::IEventBase> result = NEvents::TDataEvents::TEvWriteResult::BuildError(
459+
owner->TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, "overload data error");
460+
owner->OverloadWriteFail(overloadStatus, NEvWrite::TWriteMeta(0, PathId, SourceId, {}, TGUID::CreateTimebased().AsGuidString()),
461+
ArrowData->GetSize(), Cookie, std::move(result), ctx);
462+
return true;
463+
}
464+
465+
owner->OperationsManager->RegisterLock(LockId, owner->Generation());
466+
auto writeOperation = owner->OperationsManager->RegisterOperation(
467+
PathId, LockId, Cookie, GranuleShardingVersionId, ModificationType, AppDataVerified().FeatureFlags.GetEnableWritePortionsOnInsert());
468+
469+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("writing_size", ArrowData->GetSize())("operation_id", writeOperation->GetIdentifier())(
470+
"in_flight", owner->Counters.GetWritesMonitor()->GetWritesInFlight())(
471+
"size_in_flight", owner->Counters.GetWritesMonitor()->GetWritesSizeInFlight());
472+
owner->Counters.GetWritesMonitor()->OnStartWrite(ArrowData->GetSize());
473+
474+
AFL_VERIFY(writeOperation);
475+
writeOperation->SetBehaviour(Behaviour);
476+
NOlap::TWritingContext wContext(owner->TabletID(), owner->SelfId(), Schema, owner->StoragesManager,
477+
owner->Counters.GetIndexationCounters().SplitterCounters, owner->Counters.GetCSCounters().WritingCounters, NOlap::TSnapshot::Max(),
478+
writeOperation->GetActivityChecker());
479+
ArrowData->SetSeparationPoints(owner->GetIndexAs<NOlap::TColumnEngineForLogs>().GetGranulePtrVerified(PathId)->GetBucketPositions());
480+
writeOperation->Start(*owner, ArrowData, SourceId, wContext);
481+
return true;
482+
}
483+
450484
void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActorContext& ctx) {
451485
TMemoryProfileGuard mpg("NEvents::TDataEvents::TEvWrite");
452486
NActors::TLogContextGuard gLogging =
@@ -560,11 +594,8 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
560594
return;
561595
}
562596

563-
auto overloadStatus = CheckOverloaded(pathId);
564-
if (overloadStatus != EOverloadStatus::None) {
565-
std::unique_ptr<NActors::IEventBase> result = NEvents::TDataEvents::TEvWriteResult::BuildError(
566-
TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, "overload data error");
567-
OverloadWriteFail(overloadStatus, NEvWrite::TWriteMeta(0, pathId, source, {}, TGUID::CreateTimebased().AsGuidString()), arrowData->GetSize(), cookie, std::move(result), ctx);
597+
if (!AppDataVerified().ColumnShardConfig.GetWritingEnabled()) {
598+
sendError("writing disabled", NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED);
568599
return;
569600
}
570601

@@ -580,25 +611,8 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
580611
lockId = record.GetLockTxId();
581612
}
582613

583-
if (!AppDataVerified().ColumnShardConfig.GetWritingEnabled()) {
584-
sendError("writing disabled", NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED);
585-
return;
586-
}
587-
588-
OperationsManager->RegisterLock(lockId, Generation());
589-
auto writeOperation = OperationsManager->RegisterOperation(
590-
pathId, lockId, cookie, granuleShardingVersionId, *mType, AppDataVerified().FeatureFlags.GetEnableWritePortionsOnInsert());
591-
592-
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("writing_size", arrowData->GetSize())("operation_id", writeOperation->GetIdentifier())(
593-
"in_flight", Counters.GetWritesMonitor()->GetWritesInFlight())("size_in_flight", Counters.GetWritesMonitor()->GetWritesSizeInFlight());
594-
Counters.GetWritesMonitor()->OnStartWrite(arrowData->GetSize());
595-
596-
Y_ABORT_UNLESS(writeOperation);
597-
writeOperation->SetBehaviour(behaviour);
598-
NOlap::TWritingContext wContext(TabletID(), SelfId(), schema, StoragesManager, Counters.GetIndexationCounters().SplitterCounters,
599-
Counters.GetCSCounters().WritingCounters, NOlap::TSnapshot::Max(), writeOperation->GetActivityChecker());
600-
arrowData->SetSeparationPoints(GetIndexAs<NOlap::TColumnEngineForLogs>().GetGranulePtrVerified(pathId)->GetBucketPositions());
601-
writeOperation->Start(*this, arrowData, source, wContext);
614+
WriteTasksQueue.Enqueue(TWriteTask(arrowData, schema, source, granuleShardingVersionId, pathId, cookie, lockId, *mType, behaviour));
615+
WriteTasksQueue.Drain(false, ctx);
602616
}
603617

604618
} // namespace NKikimr::NColumnShard

ydb/core/tx/columnshard/columnshard_impl.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
7777
, TabletCountersHolder(new TProtobufTabletCounters<ESimpleCounters_descriptor, ECumulativeCounters_descriptor,
7878
EPercentileCounters_descriptor, ETxTypes_descriptor>())
7979
, Counters(*TabletCountersHolder)
80+
, WriteTasksQueue(this)
8081
, ProgressTxController(std::make_unique<TTxController>(*this))
8182
, StoragesManager(std::make_shared<NOlap::TStoragesManager>(*this))
8283
, DataLocksManager(std::make_shared<NOlap::NDataLocks::TManager>())

ydb/core/tx/columnshard/columnshard_impl.h

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ class TGeneralCompactColumnEngineChanges;
9090

9191
namespace NKikimr::NColumnShard {
9292

93+
class TArrowData;
9394
class TEvWriteCommitPrimaryTransactionOperator;
9495
class TEvWriteCommitSecondaryTransactionOperator;
9596
class TTxFinishAsyncTransaction;
@@ -461,9 +462,79 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
461462
}
462463
}
463464

465+
class TWriteTask: TMoveOnly {
466+
private:
467+
std::shared_ptr<TArrowData> ArrowData;
468+
NOlap::ISnapshotSchema::TPtr Schema;
469+
const NActors::TActorId SourceId;
470+
const std::optional<ui32> GranuleShardingVersionId;
471+
const ui64 PathId;
472+
const ui64 Cookie;
473+
const ui64 LockId;
474+
const NEvWrite::EModificationType ModificationType;
475+
const EOperationBehaviour Behaviour;
476+
const TMonotonic Created = TMonotonic::Now();
477+
public:
478+
TWriteTask(const std::shared_ptr<TArrowData>& arrowData, const NOlap::ISnapshotSchema::TPtr& schema, const NActors::TActorId sourceId,
479+
const std::optional<ui32>& granuleShardingVersionId, const ui64 pathId, const ui64 cookie, const ui64 lockId,
480+
const NEvWrite::EModificationType modificationType, const EOperationBehaviour behaviour)
481+
: ArrowData(arrowData)
482+
, Schema(schema)
483+
, SourceId(sourceId)
484+
, GranuleShardingVersionId(granuleShardingVersionId)
485+
, PathId(pathId)
486+
, Cookie(cookie)
487+
, LockId(lockId)
488+
, ModificationType(modificationType)
489+
, Behaviour(behaviour) {
490+
}
491+
492+
const TMonotonic& GetCreatedMonotonic() const {
493+
return Created;
494+
}
495+
496+
bool Execute(TColumnShard* owner, const TActorContext& ctx);
497+
};
498+
499+
class TWriteTasksQueue {
500+
private:
501+
bool WriteTasksOverloadCheckerScheduled = false;
502+
std::deque<TWriteTask> WriteTasks;
503+
i64 PredWriteTasksSize = 0;
504+
TColumnShard* Owner;
505+
public:
506+
TWriteTasksQueue(TColumnShard* owner)
507+
: Owner(owner) {
508+
}
509+
510+
~TWriteTasksQueue() {
511+
Owner->Counters.GetCSCounters().WritingCounters->QueueWaitSize->Sub(PredWriteTasksSize);
512+
}
513+
514+
void Enqueue(TWriteTask&& task) {
515+
WriteTasks.emplace_back(std::move(task));
516+
}
517+
518+
void Drain(const bool onWakeup, const TActorContext& ctx) {
519+
if (onWakeup) {
520+
WriteTasksOverloadCheckerScheduled = false;
521+
}
522+
while (WriteTasks.size() && WriteTasks.front().Execute(Owner, ctx)) {
523+
WriteTasks.pop_front();
524+
}
525+
if (!WriteTasks.size() && !WriteTasksOverloadCheckerScheduled) {
526+
Owner->Schedule(TDuration::MilliSeconds(100), new NActors::TEvents::TEvWakeup(1));
527+
WriteTasksOverloadCheckerScheduled = true;
528+
}
529+
Owner->Counters.GetCSCounters().WritingCounters->QueueWaitSize->Add((i64)WriteTasks.size() - PredWriteTasksSize);
530+
PredWriteTasksSize = (i64)WriteTasks.size();
531+
}
532+
};
533+
464534
private:
465535
std::unique_ptr<TTabletCountersBase> TabletCountersHolder;
466536
TCountersManager Counters;
537+
TWriteTasksQueue WriteTasksQueue;
467538

468539
std::unique_ptr<TTxController> ProgressTxController;
469540
std::unique_ptr<TOperationsManager> OperationsManager;

ydb/core/tx/columnshard/counters/columnshard.h

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
#pragma once
2-
#include "common/owner.h"
32
#include "initialization.h"
43
#include "tx_progress.h"
54

5+
#include "common/owner.h"
6+
67
#include <ydb/core/tx/columnshard/counters/tablet_counters.h>
78

89
#include <library/cpp/monlib/dynamic_counters/counters.h>
@@ -26,14 +27,23 @@ class TWriteCounters: public TCommonCountersOwner {
2627
NMonitoring::TDynamicCounters::TCounterPtr VolumeWriteData;
2728
NMonitoring::THistogramPtr HistogramBytesWriteDataCount;
2829
NMonitoring::THistogramPtr HistogramBytesWriteDataBytes;
30+
NMonitoring::THistogramPtr HistogramDurationQueueWait;
2931

3032
public:
33+
const NMonitoring::TDynamicCounters::TCounterPtr QueueWaitSize;
34+
35+
void OnWritingTaskDequeue(const TDuration d){
36+
HistogramDurationQueueWait->Collect(d.MilliSeconds());
37+
}
38+
3139
TWriteCounters(TCommonCountersOwner& owner)
3240
: TBase(owner, "activity", "writing")
41+
, QueueWaitSize(TBase::GetValue("Write/Queue/Size"))
3342
{
3443
VolumeWriteData = TBase::GetDeriviative("Write/Incoming/Bytes");
3544
HistogramBytesWriteDataCount = TBase::GetHistogram("Write/Incoming/ByBytes/Count", NMonitoring::ExponentialHistogram(18, 2, 100));
3645
HistogramBytesWriteDataBytes = TBase::GetHistogram("Write/Incoming/ByBytes/Bytes", NMonitoring::ExponentialHistogram(18, 2, 100));
46+
HistogramDurationQueueWait = TBase::GetHistogram("Write/Queue/Waiting/DurationMs", NMonitoring::ExponentialHistogram(18, 2, 100));
3747
}
3848

3949
void OnIncomingData(const ui64 dataSize) const {
@@ -231,4 +241,4 @@ class TCSCounters: public TCommonCountersOwner {
231241
TCSCounters();
232242
};
233243

234-
}
244+
} // namespace NKikimr::NColumnShard

0 commit comments

Comments
 (0)