Skip to content

signals for tablet initialization #7281

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,13 @@ void TColumnShard::SwitchToWork(const TActorContext& ctx) {
BackgroundSessionsManager->Start();
ctx.Send(SelfId(), new TEvPrivate::TEvPeriodicWakeup());
NYDBTest::TControllers::GetColumnShardController()->OnSwitchToWork(TabletID());
AFL_VERIFY(!!StartInstant);
CSCounters.Initialization.OnSwitchToWork(TMonotonic::Now() - *StartInstant, TMonotonic::Now() - CreateInstant);
}

void TColumnShard::OnActivateExecutor(const TActorContext& ctx) {
StartInstant = TMonotonic::Now();
CSCounters.Initialization.OnActivateExecutor(TMonotonic::Now() - CreateInstant);
const TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletID())("self_id", SelfId());
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "initialize_shard")("step", "OnActivateExecutor");
Executor()->RegisterExternalTabletCounters(TabletCountersPtr.release());
Expand Down
11 changes: 11 additions & 0 deletions ydb/core/tx/columnshard/columnshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ namespace NKikimr::NColumnShard {
using namespace NTabletFlatExecutor;

class TTxInit : public TTransactionBase<TColumnShard> {
private:
const TMonotonic StartInstant = TMonotonic::Now();

public:
TTxInit(TColumnShard* self)
: TBase(self)
Expand Down Expand Up @@ -250,13 +253,16 @@ bool TTxInit::Execute(TTransactionContext& txc, const TActorContext& ctx) {
}

/// Finishes the TTxInit transaction: reports how long it took (measured from the
/// construction of this transaction object), notifies the progress-tx controller,
/// moves the shard into the working state and finally fires the test-controller hook.
void TTxInit::Complete(const TActorContext& ctx) {
    auto& shard = *Self;

    // Duration is taken first so that the later steps are not included in the measurement.
    const auto elapsed = TMonotonic::Now() - StartInstant;
    shard.CSCounters.Initialization.OnTxInitFinished(elapsed);

    shard.ProgressTxController->OnTabletInit();
    shard.SwitchToWork(ctx);

    NYDBTest::TControllers::GetColumnShardController()->OnTabletInitCompleted(shard);
}

class TTxUpdateSchema : public TTransactionBase<TColumnShard> {
std::vector<NOlap::INormalizerTask::TPtr> NormalizerTasks;
const TMonotonic StartInstant = TMonotonic::Now();

public:
TTxUpdateSchema(TColumnShard* self)
: TBase(self)
Expand Down Expand Up @@ -295,6 +301,7 @@ bool TTxUpdateSchema::Execute(TTransactionContext& txc, const TActorContext&) {

void TTxUpdateSchema::Complete(const TActorContext& ctx) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("step", "TTxUpdateSchema.Complete");
Self->CSCounters.Initialization.OnTxUpdateSchemaFinished(TMonotonic::Now() - StartInstant);
if (NormalizerTasks.empty()) {
AFL_VERIFY(Self->NormalizerController.IsNormalizationFinished())("details", Self->NormalizerController.DebugString());
Self->Execute(new TTxInit(Self), ctx);
Expand Down Expand Up @@ -360,6 +367,9 @@ void TTxApplyNormalizer::Complete(const TActorContext& ctx) {

/// Create local database on tablet start if none
class TTxInitSchema : public TTransactionBase<TColumnShard> {
private:
const TMonotonic StartInstant = TMonotonic::Now();

public:
TTxInitSchema(TColumnShard* self)
: TBase(self)
Expand Down Expand Up @@ -422,6 +432,7 @@ bool TTxInitSchema::Execute(TTransactionContext& txc, const TActorContext&) {
}

void TTxInitSchema::Complete(const TActorContext& ctx) {
Self->CSCounters.Initialization.OnTxInitSchemaFinished(TMonotonic::Now() - StartInstant);
LOG_S_DEBUG("TxInitSchema.Complete at tablet " << Self->TabletID(););
Self->Execute(new TTxUpdateSchema(Self), ctx);
}
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,9 @@ class TColumnShard
using TSchemaPreset = TSchemaPreset;
using TTableInfo = TTableInfo;

const TMonotonic CreateInstant = TMonotonic::Now();
std::optional<TMonotonic> StartInstant;

struct TLongTxWriteInfo {
ui64 WriteId;
ui32 WritePartId;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/counters/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace NKikimr::NColumnShard {

TCSCounters::TCSCounters()
: TBase("CS")
{
, Initialization(*this) {
StartBackgroundCount = TBase::GetDeriviative("StartBackground/Count");
TooEarlyBackgroundCount = TBase::GetDeriviative("TooEarlyBackground/Count");
SetupCompactionCount = TBase::GetDeriviative("SetupCompaction/Count");
Expand Down
53 changes: 53 additions & 0 deletions ydb/core/tx/columnshard/counters/columnshard.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,55 @@ enum class EWriteFailReason {
Overload /* "overload" */
};

/// Histograms describing how long the individual stages of column shard tablet
/// initialization take. Every duration passed in is recorded in milliseconds.
/// All signals are registered under the owner's counters with the extra label
/// "stage" = "initialization".
class TCSInitialization: public TCommonCountersOwner {
private:
    using TBase = TCommonCountersOwner;

    // NOTE(review): registered in the constructor, but no method of this class collects into it.
    const NMonitoring::THistogramPtr HistogramTabletInitializationMs;
    const NMonitoring::THistogramPtr HistogramTxInitDurationMs;
    const NMonitoring::THistogramPtr HistogramTxUpdateSchemaDurationMs;
    const NMonitoring::THistogramPtr HistogramTxInitSchemaDurationMs;
    const NMonitoring::THistogramPtr HistogramActivateExecutorFromActivationDurationMs;
    const NMonitoring::THistogramPtr HistogramSwitchToWorkFromActivationDurationMs;
    const NMonitoring::THistogramPtr HistogramSwitchToWorkFromCreateDurationMs;

public:
    /// Records the duration of the TTxInit transaction.
    void OnTxInitFinished(const TDuration d) const {
        HistogramTxInitDurationMs->Collect(d.MilliSeconds());
    }

    /// Records the duration of the TTxUpdateSchema transaction.
    void OnTxUpdateSchemaFinished(const TDuration d) const {
        HistogramTxUpdateSchemaDurationMs->Collect(d.MilliSeconds());
    }

    /// Records the duration of the TTxInitSchema transaction.
    void OnTxInitSchemaFinished(const TDuration d) const {
        HistogramTxInitSchemaDurationMs->Collect(d.MilliSeconds());
    }

    /// Records the time elapsed when the executor gets activated.
    /// NOTE(review): the argument is named `fromCreate`, while the histogram name says
    /// "FromActivation" - confirm which reference point is intended.
    void OnActivateExecutor(const TDuration fromCreate) const {
        HistogramActivateExecutorFromActivationDurationMs->Collect(fromCreate.MilliSeconds());
    }

    /// Records the moment the shard switches to work, relative to two reference points.
    /// @param fromStart   collected into the "SwitchToWorkFromActivationDurationMs" histogram
    /// @param fromCreate  collected into the "SwitchToWorkFromCreateDurationMs" histogram
    void OnSwitchToWork(const TDuration fromStart, const TDuration fromCreate) const {
        HistogramSwitchToWorkFromActivationDurationMs->Collect(fromStart.MilliSeconds());
        HistogramSwitchToWorkFromCreateDurationMs->Collect(fromCreate.MilliSeconds());
    }

    /// Registers all initialization histograms as a sub-group of `owner`.
    /// Every histogram uses the same exponential bucket configuration.
    TCSInitialization(TCommonCountersOwner& owner)
        : TBase(owner, "stage", "initialization")
        , HistogramTabletInitializationMs(TBase::GetHistogram("TabletInitializationMs", NMonitoring::ExponentialHistogram(15, 2, 32)))
        , HistogramTxInitDurationMs(TBase::GetHistogram("TxInitDurationMs", NMonitoring::ExponentialHistogram(15, 2, 32)))
        // Each histogram needs its own signal name: this one used to be registered as
        // "TxInitDurationMs", duplicating the name of the TxInit histogram above.
        , HistogramTxUpdateSchemaDurationMs(TBase::GetHistogram("TxUpdateSchemaDurationMs", NMonitoring::ExponentialHistogram(15, 2, 32)))
        , HistogramTxInitSchemaDurationMs(TBase::GetHistogram("TxInitSchemaDurationMs", NMonitoring::ExponentialHistogram(15, 2, 32)))
        , HistogramActivateExecutorFromActivationDurationMs(
              TBase::GetHistogram("ActivateExecutorFromActivationDurationMs", NMonitoring::ExponentialHistogram(15, 2, 32)))
        , HistogramSwitchToWorkFromActivationDurationMs(
              TBase::GetHistogram("SwitchToWorkFromActivationDurationMs", NMonitoring::ExponentialHistogram(15, 2, 32)))
        , HistogramSwitchToWorkFromCreateDurationMs(
              TBase::GetHistogram("SwitchToWorkFromCreateDurationMs", NMonitoring::ExponentialHistogram(15, 2, 32))) {
    }
};

class TCSCounters: public TCommonCountersOwner {
private:
using TBase = TCommonCountersOwner;
Expand Down Expand Up @@ -62,11 +111,15 @@ class TCSCounters: public TCommonCountersOwner {
NMonitoring::THistogramPtr HistogramSuccessWriteMiddle6PutBlobsDurationMs;
NMonitoring::THistogramPtr HistogramFailedWritePutBlobsDurationMs;
NMonitoring::THistogramPtr HistogramWriteTxCompleteDurationMs;

NMonitoring::TDynamicCounters::TCounterPtr WritePutBlobsCount;
NMonitoring::TDynamicCounters::TCounterPtr WriteRequests;
THashMap<EWriteFailReason, NMonitoring::TDynamicCounters::TCounterPtr> FailedWriteRequests;
NMonitoring::TDynamicCounters::TCounterPtr SuccessWriteRequests;

public:
const TCSInitialization Initialization;

void OnStartWriteRequest() const {
WriteRequests->Add(1);
}
Expand Down
1 change: 0 additions & 1 deletion ydb/core/tx/columnshard/engines/predicate/range.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ bool TPKRangeFilter::IsPortionInUsage(const TPortionInfo& info) const {
return false;
}
}

// AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("start", info.IndexKeyStart().DebugString())("end", info.IndexKeyEnd().DebugString())(
// "from", PredicateFrom.DebugString())("to", PredicateTo.DebugString());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ TConclusion<bool> TDeletionFilter::DoExecuteInplace(const std::shared_ptr<IDataS

/// Builds the sharding filter for the source's table and attaches it to the source's
/// stage data. Always returns true.
TConclusion<bool> TShardingFilter::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& /*step*/) const {
    // Test-controller hook, invoked before the filter is built.
    NYDBTest::TControllers::GetColumnShardController()->OnSelectShardingFilter();
    // `filter` must be declared exactly once: only the form that goes through the named
    // `shardingInfo` is kept, the duplicated single-expression declaration is dropped.
    const auto& shardingInfo = source->GetContext()->GetReadMetadata()->GetRequestShardingInfo()->GetShardingInfo();
    auto filter = shardingInfo->GetFilter(source->GetStageData().GetTable()->BuildTableVerified());
    source->MutableStageData().AddFilter(filter);
    return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,23 +217,17 @@ TConclusionStatus TScanHead::DetectSourcesFeatureInContextIntervalScan(const THa
}
const ui64 startMemory = optimizer.GetMemorySum();
if (!optimizer.Optimize(Context->ReduceMemoryIntervalLimit) && Context->RejectMemoryIntervalLimit < optimizer.GetMemorySum()) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "next_internal_broken")
("reason", "a lot of memory need")("start", startMemory)
("reduce_limit", Context->ReduceMemoryIntervalLimit)
("reject_limit", Context->RejectMemoryIntervalLimit)
("need", optimizer.GetMemorySum())
("path_ids", JoinSeq(",", optimizer.GetPathIds()))
("details", IS_LOG_PRIORITY_ENABLED(NActors::NLog::PRI_DEBUG, NKikimrServices::TX_COLUMNSHARD_SCAN) ? optimizer.DebugString() : "NEED_DEBUG_LEVEL");
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "next_internal_broken")("reason", "a lot of memory need")("start", startMemory)(
"reduce_limit", Context->ReduceMemoryIntervalLimit)("reject_limit", Context->RejectMemoryIntervalLimit)(
"need", optimizer.GetMemorySum())("path_ids", JoinSeq(",", optimizer.GetPathIds()))(
"details", IS_LOG_PRIORITY_ENABLED(NActors::NLog::PRI_DEBUG, NKikimrServices::TX_COLUMNSHARD_SCAN) ? optimizer.DebugString() : "NEED_DEBUG_LEVEL");
Context->GetCommonContext()->GetCounters().OnOptimizedIntervalMemoryFailed(optimizer.GetMemorySum());
return TConclusionStatus::Fail("We need a lot of memory in time for interval scanner: " +
::ToString(optimizer.GetMemorySum()) + " path_ids: " + JoinSeq(",", optimizer.GetPathIds()) + ". We need wait compaction processing. Sorry.");
} else if (optimizer.GetMemorySum() < startMemory) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "memory_reduce_active")
("reason", "need reduce memory")("start", startMemory)
("reduce_limit", Context->ReduceMemoryIntervalLimit)
("reject_limit", Context->RejectMemoryIntervalLimit)
("need", optimizer.GetMemorySum())
("path_ids", JoinSeq(",", optimizer.GetPathIds()));
AFL_INFO(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "memory_reduce_active")("reason", "need reduce memory")("start", startMemory)(
"reduce_limit", Context->ReduceMemoryIntervalLimit)("reject_limit", Context->RejectMemoryIntervalLimit)(
"need", optimizer.GetMemorySum())("path_ids", JoinSeq(",", optimizer.GetPathIds()));
Context->GetCommonContext()->GetCounters().OnOptimizedIntervalMemoryReduced(startMemory - optimizer.GetMemorySum());
}
Context->GetCommonContext()->GetCounters().OnOptimizedIntervalMemoryRequired(optimizer.GetMemorySum());
Expand Down
Loading