Skip to content

switch on normalizer state #5138

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/kqp/ut/olap/indexes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
CompareYson(result, R"([[20000u;]])");
}

AFL_VERIFY(updatesCount + 3 /*tablets count*/ * 3 /*2 * normalizers + main_load*/ ==
AFL_VERIFY(updatesCount + 3 /*tablets count*/ * 1 /*normalizers*/ ==
(ui64)csController->GetActualizationRefreshSchemeCount().Val())("updates", updatesCount)("count", csController->GetActualizationRefreshSchemeCount().Val());
}

Expand Down
6 changes: 4 additions & 2 deletions ydb/core/tx/columnshard/columnshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -387,8 +387,10 @@ bool TTxInitSchema::Execute(TTransactionContext& txc, const TActorContext&) {
}
}

// NIceDb::TNiceDb db(txc.DB);
// Self->NormalizerController.InitControllerState(db);
if (!NYDBTest::TControllers::GetColumnShardController()->BuildLocalBaseModifier()) {
NIceDb::TNiceDb db(txc.DB);
Self->NormalizerController.InitControllerState(db);
}

// Enable compression for the SmallBlobs table
const auto* smallBlobsDefaultColumnFamily = txc.DB.GetScheme().DefaultFamilyFor(Schema::SmallBlobs::TableId);
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/tx/columnshard/data_locks/manager/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,11 @@ void TManager::TGuard::Release(TManager& manager) {
Released = true;
}

void TManager::TGuard::AbortLock() {
if (!Released) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("message", "aborted data locks manager");
}
Released = true;
}

}
3 changes: 3 additions & 0 deletions ydb/core/tx/columnshard/data_locks/manager/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ class TManager {
{

}

void AbortLock();

~TGuard();

void Release(TManager& manager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ void TColumnEngineChanges::WriteIndexOnComplete(NColumnShard::TColumnShard* self

void TColumnEngineChanges::Compile(TFinalizationContext& context) noexcept {
AFL_VERIFY(Stage != EStage::Aborted);
AFL_VERIFY(Stage == EStage::Constructed);
AFL_VERIFY(Stage == EStage::Constructed)("real", Stage);

DoCompile(context);
DoOnAfterCompile();
Expand Down Expand Up @@ -100,6 +100,9 @@ void TColumnEngineChanges::AbortEmergency(const TString& reason) {
} else {
Stage = EStage::Aborted;
AbortedReason = reason;
if (!!LockGuard) {
LockGuard->AbortLock();
}
OnAbortEmergency();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,6 @@ class TColumnEngineChanges {
virtual void DoStart(NColumnShard::TColumnShard& self) = 0;
virtual TConclusionStatus DoConstructBlobs(TConstructionContext& context) noexcept = 0;
virtual void OnAbortEmergency() {

}

TBlobsAction BlobsAction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ bool TTieringProcessContext::AddPortion(const TPortionInfo& info, TPortionEvicti
std::vector<TTaskConstructor> tasks = {buildNewTask()};
it = Tasks.emplace(features.GetRWAddress(), std::move(tasks)).first;
}
if (it->second.back().GetTxWriteVolume() + info.GetTxVolume() > TGlobalLimits::TxWriteLimitBytes && it->second.back().GetTxWriteVolume()) {
if (it->second.back().GetTxWriteVolume() + info.GetTxVolume() > TGlobalLimits::TxWriteLimitBytes / 2 && it->second.back().GetTxWriteVolume()) {
if (Controller->IsNewTaskAvailable(it->first, it->second.size())) {
it->second.emplace_back(buildNewTask());
} else {
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,13 @@ namespace NKikimr::NOlap {
if (NColumnShard::Schema::GetSpecialValue(db, NColumnShard::Schema::EValueIds::LastNormalizerSequentialId, lastNormalizerId)) {
// We want to rerun all normalizers in case of binary rollback
if (lastNormalizerId <= GetLastNormalizerSequentialId()) {
AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("last_normalizer_id", lastNormalizerId)("event", "restored");
LastAppliedNormalizerId = lastNormalizerId;
} else {
AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("last_normalizer_id", LastAppliedNormalizerId)("event", "not_restored");
}
} else {
AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("last_normalizer_id", LastAppliedNormalizerId)("event", "have not info");
}
}

Expand Down
5 changes: 4 additions & 1 deletion ydb/core/tx/columnshard/normalizer/abstract/abstract.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,15 @@ namespace NKikimr::NOlap {
}

ui32 GetSequentialId() const {
return (ui32) GetType();
return (ui32)GetType();
}

TConclusion<std::vector<INormalizerTask::TPtr>> Init(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) {
if (controller.HasLastAppliedNormalizerId() && controller.GetLastAppliedNormalizerIdUnsafe() >= GetSequentialId()) {
AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("event", "normalization_skipped")("last", controller.GetLastAppliedNormalizerIdUnsafe())("seq_id", GetSequentialId())("type", GetType());
return std::vector<INormalizerTask::TPtr>();
} else {
AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("event", "normalization_init")("last", controller.GetLastAppliedNormalizerIdOptional().value_or(9999999))("seq_id", GetSequentialId())("type", GetType());
}
auto result = DoInit(controller, txc);
if (!result.IsSuccess()) {
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/tx/columnshard/transactions/tx_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,9 @@ class TTxController {
void SetProposeStartInfo(const TTxController::TProposeResult& info) {
AFL_VERIFY(!ProposeStartInfo);
ProposeStartInfo = info;
if (IsFail()) {
Status = EStatus::Failed;
}
}

const TTxInfo& GetTxInfo() const {
Expand Down Expand Up @@ -290,6 +293,7 @@ class TTxController {

bool StartProposeOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) {
AFL_VERIFY(!ProposeStartInfo);
AFL_VERIFY(!IsFail());
ProposeStartInfo = DoStartProposeOnExecute(owner, txc);
if (ProposeStartInfo->IsFail()) {
SwitchStateVerified(EStatus::Parsed, EStatus::Failed);
Expand All @@ -299,11 +303,13 @@ class TTxController {
return !GetProposeStartInfoVerified().IsFail();
}
void StartProposeOnComplete(TColumnShard& owner, const TActorContext& ctx) {
AFL_VERIFY(!IsFail());
SwitchStateVerified(EStatus::ProposeStartedOnExecute, EStatus::ProposeStartedOnComplete);
AFL_VERIFY(IsAsync());
return DoStartProposeOnComplete(owner, ctx);
}
void FinishProposeOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) {
AFL_VERIFY(!IsFail());
SwitchStateVerified(EStatus::ProposeStartedOnComplete, EStatus::ProposeFinishedOnExecute);
AFL_VERIFY(IsAsync());
return DoFinishProposeOnExecute(owner, txc);
Expand Down
Loading