Skip to content

Commit a8ab3a2

Browse files
correct synchronization after tablet reboot (#12296)
1 parent e1bc51a commit a8ab3a2

File tree

17 files changed

+115
-21
lines changed

17 files changed

+115
-21
lines changed

ydb/core/kqp/ut/olap/indexes_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
301301
CompareYson(result, R"([[20000u;]])");
302302
}
303303

304-
AFL_VERIFY(updatesCount + 3 /*tablets count*/ * 1 /*normalizers*/ ==
304+
AFL_VERIFY(updatesCount + 6 ==
305305
(ui64)csController->GetActualizationRefreshSchemeCount().Val())(
306306
"updates", updatesCount)("count",
307307
csController->GetActualizationRefreshSchemeCount().Val());

ydb/core/tx/columnshard/engines/changes/actualization/construction/context.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,16 @@ class TTieringProcessContext {
6969
return Tasks;
7070
}
7171

72+
TString DebugString() const {
73+
TStringBuilder result;
74+
result << "{";
75+
for (auto&& i : Tasks) {
76+
result << i.first.DebugString() << ":" << i.second.size() << ";";
77+
}
78+
result << "}";
79+
return result;
80+
}
81+
7282
bool AddPortion(const std::shared_ptr<const TPortionInfo>& info, TPortionEvictionFeatures&& features, const std::optional<TDuration> dWait);
7383

7484
bool IsRWAddressAvailable(const TRWAddress& address) const {

ydb/core/tx/columnshard/engines/changes/actualization/controller/controller.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,14 @@ class TController {
1111

1212
public:
1313
void StartActualization(const NActualizer::TRWAddress& address) {
14+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "actualization_start")("count", ActualizationsInProgress[address])(
15+
"limit", GetLimitForAddress(address))("rw", address.DebugString());
1416
AFL_VERIFY(++ActualizationsInProgress[address] <= (i32)GetLimitForAddress(address));
1517
}
1618

1719
void FinishActualization(const NActualizer::TRWAddress& address) {
20+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "actualization_finished")("count", ActualizationsInProgress[address])(
21+
"limit", GetLimitForAddress(address))("rw", address.DebugString());
1822
AFL_VERIFY(--ActualizationsInProgress[address] >= 0);
1923
}
2024

ydb/core/tx/columnshard/engines/changes/cleanup_portions.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33

44
namespace NKikimr::NOlap {
55

6-
class TCleanupPortionsColumnEngineChanges: public TColumnEngineChanges {
6+
class TCleanupPortionsColumnEngineChanges: public TColumnEngineChanges,
7+
public NColumnShard::TMonitoringObjectsCounter<TCleanupPortionsColumnEngineChanges> {
78
private:
89
using TBase = TColumnEngineChanges;
910
THashMap<TString, std::vector<std::shared_ptr<TPortionInfo>>> StoragePortions;

ydb/core/tx/columnshard/engines/changes/cleanup_tables.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33

44
namespace NKikimr::NOlap {
55

6-
class TCleanupTablesColumnEngineChanges: public TColumnEngineChanges {
6+
class TCleanupTablesColumnEngineChanges: public TColumnEngineChanges,
7+
public NColumnShard::TMonitoringObjectsCounter<TCleanupTablesColumnEngineChanges> {
78
private:
89
using TBase = TColumnEngineChanges;
910
protected:

ydb/core/tx/columnshard/engines/changes/general_compaction.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77

88
namespace NKikimr::NOlap::NCompaction {
99

10-
class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges {
10+
class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges,
11+
public NColumnShard::TMonitoringObjectsCounter<TGeneralCompactColumnEngineChanges> {
1112
private:
1213
YDB_ACCESSOR(ui64, PortionExpectedSize, 1.5 * (1 << 20));
1314
using TBase = TCompactColumnEngineChanges;

ydb/core/tx/columnshard/engines/changes/indexation.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
namespace NKikimr::NOlap {
1313

14-
class TInsertColumnEngineChanges: public TChangesWithAppend {
14+
class TInsertColumnEngineChanges: public TChangesWithAppend, public NColumnShard::TMonitoringObjectsCounter<TInsertColumnEngineChanges> {
1515
private:
1616
using TBase = TChangesWithAppend;
1717
std::vector<TCommittedData> DataToIndex;

ydb/core/tx/columnshard/engines/changes/ttl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
namespace NKikimr::NOlap {
99

10-
class TTTLColumnEngineChanges: public TChangesWithAppend {
10+
class TTTLColumnEngineChanges: public TChangesWithAppend, public NColumnShard::TMonitoringObjectsCounter<TTTLColumnEngineChanges> {
1111
private:
1212
using TBase = TChangesWithAppend;
1313

ydb/core/tx/columnshard/engines/column_engine_logs.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -408,7 +408,7 @@ std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::Start
408408
std::vector<std::shared_ptr<TTTLColumnEngineChanges>> TColumnEngineForLogs::StartTtl(const THashMap<ui64, TTiering>& pathEviction,
409409
const std::shared_ptr<NDataLocks::TManager>& dataLocksManager, const ui64 memoryUsageLimit) noexcept {
410410
AFL_VERIFY(dataLocksManager);
411-
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartTtl")("external", pathEviction.size());
411+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "StartTtl")("external", pathEviction.size());
412412

413413
TSaverContext saverContext(StoragesManager);
414414
NActualizer::TTieringProcessContext context(memoryUsageLimit, saverContext, dataLocksManager, VersionedIndex, SignalCounters, ActualizationController);
@@ -434,10 +434,12 @@ std::vector<std::shared_ptr<TTTLColumnEngineChanges>> TColumnEngineForLogs::Star
434434
i.second->BuildActualizationTasks(context, actualizationLag);
435435
}
436436
} else {
437-
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "StartTtl")("skip", "not_ready_tiers");
437+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "StartTtl")("skip", "not_ready_tiers");
438438
}
439439
std::vector<std::shared_ptr<TTTLColumnEngineChanges>> result;
440+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "StartTtl")("rw_tasks_count", context.GetTasks().size());
440441
for (auto&& i : context.GetTasks()) {
442+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "StartTtl")("rw", i.first.DebugString())("count", i.second.size());
441443
for (auto&& t : i.second) {
442444
SignalCounters.OnActualizationTask(t.GetTask()->GetPortionsToEvictCount(), t.GetTask()->GetPortionsToRemoveSize());
443445
result.emplace_back(t.GetTask());

ydb/core/tx/columnshard/engines/scheme/column/info.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,21 +64,22 @@ std::vector<std::shared_ptr<NKikimr::NOlap::IPortionDataChunk>> TSimpleColumnInf
6464
AFL_VERIFY(Loader);
6565
const auto checkNeedActualize = [&]() {
6666
if (!Serializer.IsEqualTo(sourceColumnFeatures.Serializer)) {
67-
AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("event", "actualization")("reason", "serializer")
67+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "actualization")("reason", "serializer")
6868
("from", sourceColumnFeatures.Serializer.SerializeToProto().DebugString())
6969
("to", Serializer.SerializeToProto().DebugString());
7070
return true;
7171
}
7272
if (!Loader->IsEqualTo(*sourceColumnFeatures.Loader)) {
73-
AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("event", "actualization")("reason", "loader");
73+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "actualization")("reason", "loader");
7474
return true;
7575
}
7676
if (!!DictionaryEncoding != !!sourceColumnFeatures.DictionaryEncoding) {
77-
AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("event", "actualization")("reason", "dictionary")("from", !!sourceColumnFeatures.DictionaryEncoding)("to", !!DictionaryEncoding);
77+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "actualization")("reason", "dictionary")(
78+
"from", !!sourceColumnFeatures.DictionaryEncoding)("to", !!DictionaryEncoding);
7879
return true;
7980
}
8081
if (!!DictionaryEncoding && !DictionaryEncoding->IsEqualTo(*sourceColumnFeatures.DictionaryEncoding)) {
81-
AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("event", "actualization")("reason", "dictionary_encoding")
82+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "actualization")("reason", "dictionary_encoding")
8283
("from", sourceColumnFeatures.DictionaryEncoding->SerializeToProto().DebugString())
8384
("to", DictionaryEncoding->SerializeToProto().DebugString())
8485
;

0 commit comments

Comments
 (0)