Skip to content

Commit 8f51162

Browse files
authored
Fix publishing base statistics that was not properly persisted (#24485)
1 parent ed61ee9 commit 8f51162

File tree

8 files changed

+190
-84
lines changed

8 files changed

+190
-84
lines changed

ydb/core/statistics/aggregator/aggregator_impl.cpp

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -416,9 +416,10 @@ size_t TStatisticsAggregator::PropagatePart(const std::vector<TNodeId>& nodeIds,
416416
auto* entry = record->AddEntries();
417417
entry->SetSchemeShardId(ssId);
418418
auto itStats = BaseStatistics.find(ssId);
419-
if (itStats != BaseStatistics.end()) {
420-
entry->SetStats(itStats->second);
421-
size += itStats->second.size();
419+
if (itStats != BaseStatistics.end() && itStats->second.Committed) {
420+
const auto& stats = *itStats->second.Committed;
421+
entry->SetStats(stats);
422+
size += stats.size();
422423
} else {
423424
entry->SetStats(TString()); // stats are not sent from SS yet
424425
}
@@ -1081,8 +1082,11 @@ bool TStatisticsAggregator::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev
10811082
ui64 totalRowCount = 0;
10821083
ui64 totalBytesSize = 0;
10831084
for (const auto& [_, serializedStats] : BaseStatistics) {
1085+
if (!serializedStats.Committed) {
1086+
continue;
1087+
}
10841088
NKikimrStat::TSchemeShardStats stats;
1085-
Y_PROTOBUF_SUPPRESS_NODISCARD stats.ParseFromString(serializedStats);
1089+
Y_PROTOBUF_SUPPRESS_NODISCARD stats.ParseFromString(*serializedStats.Committed);
10861090
for (const auto& entry: stats.GetEntries()) {
10871091
totalRowCount += entry.GetRowCount();
10881092
totalBytesSize += entry.GetBytesSize();

ydb/core/statistics/aggregator/aggregator_impl.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,12 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
248248

249249
static constexpr TDuration FastCheckInterval = TDuration::MilliSeconds(50);
250250

251-
std::unordered_map<TSSId, TString> BaseStatistics; // schemeshard id -> serialized stats for all paths
251+
// Serialized stats for all paths from a single SchemeShard.
252+
struct TSerializedBaseStats {
253+
std::shared_ptr<TString> Committed; // Value that is safely persisted in local DB. Can be nullptr.
254+
std::shared_ptr<TString> Latest; // Value from the latest update.
255+
};
256+
std::unordered_map<TSSId, TSerializedBaseStats> BaseStatistics;
252257

253258
std::unordered_map<TSSId, size_t> SchemeShards; // all connected schemeshards
254259
std::unordered_map<TActorId, TSSId> SchemeShardPipes; // schemeshard pipe servers

ydb/core/statistics/aggregator/tx_init.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,9 @@ struct TStatisticsAggregator::TTxInit : public TTxBase {
106106
while (!rowset.EndOfSet()) {
107107
ui64 schemeShardId = rowset.GetValue<Schema::BaseStatistics::SchemeShardId>();
108108
TString stats = rowset.GetValue<Schema::BaseStatistics::Stats>();
109-
110-
Self->BaseStatistics[schemeShardId] = stats;
109+
auto& schemeShardStats = Self->BaseStatistics[schemeShardId];
110+
schemeShardStats.Committed = std::make_shared<TString>(std::move(stats));
111+
schemeShardStats.Latest = schemeShardStats.Committed;
111112

112113
if (!rowset.Next()) {
113114
return false;

ydb/core/statistics/aggregator/tx_schemeshard_stats.cpp

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ namespace NKikimr::NStat {
44

55
struct TStatisticsAggregator::TTxSchemeShardStats : public TTxBase {
66
NKikimrStat::TEvSchemeShardStats Record;
7+
std::shared_ptr<TString> UpdatedStats;
78

89
TTxSchemeShardStats(TSelf* self, NKikimrStat::TEvSchemeShardStats&& record)
910
: TTxBase(self)
@@ -22,24 +23,22 @@ struct TStatisticsAggregator::TTxSchemeShardStats : public TTxBase {
2223

2324
NIceDb::TNiceDb db(txc.DB);
2425

26+
TSerializedBaseStats& existingStats = Self->BaseStatistics[schemeShardId];
27+
2528
NKikimrStat::TSchemeShardStats statRecord;
2629
Y_PROTOBUF_SUPPRESS_NODISCARD statRecord.ParseFromString(stats);
2730

2831
// if statistics is sent from schemeshard for the first time or
2932
// AreAllStatsFull field is not set (schemeshard is working on previous code version) or
3033
// statistics is full for all tables
3134
// then persist incoming statistics without changes
32-
if (!Self->BaseStatistics.contains(schemeShardId) ||
35+
if (!existingStats.Latest ||
3336
!statRecord.HasAreAllStatsFull() || statRecord.GetAreAllStatsFull())
3437
{
35-
db.Table<Schema::BaseStatistics>().Key(schemeShardId).Update(
36-
NIceDb::TUpdate<Schema::BaseStatistics::Stats>(stats));
37-
Self->BaseStatistics[schemeShardId] = stats;
38-
38+
UpdatedStats = std::make_shared<TString>(stats);
3939
} else {
4040
NKikimrStat::TSchemeShardStats oldStatRecord;
41-
const auto& oldStats = Self->BaseStatistics[schemeShardId];
42-
Y_PROTOBUF_SUPPRESS_NODISCARD oldStatRecord.ParseFromString(oldStats);
41+
Y_PROTOBUF_SUPPRESS_NODISCARD oldStatRecord.ParseFromString(*existingStats.Latest);
4342

4443
struct TOldStats {
4544
ui64 RowCount = 0;
@@ -75,14 +74,14 @@ struct TStatisticsAggregator::TTxSchemeShardStats : public TTxBase {
7574
}
7675
}
7776

78-
TString newStats;
79-
Y_PROTOBUF_SUPPRESS_NODISCARD newStatRecord.SerializeToString(&newStats);
80-
81-
db.Table<Schema::BaseStatistics>().Key(schemeShardId).Update(
82-
NIceDb::TUpdate<Schema::BaseStatistics::Stats>(newStats));
83-
Self->BaseStatistics[schemeShardId] = newStats;
77+
UpdatedStats = std::make_shared<TString>();
78+
Y_PROTOBUF_SUPPRESS_NODISCARD newStatRecord.SerializeToString(UpdatedStats.get());
8479
}
8580

81+
db.Table<Schema::BaseStatistics>().Key(schemeShardId).Update(
82+
NIceDb::TUpdate<Schema::BaseStatistics::Stats>(*UpdatedStats));
83+
existingStats.Latest = UpdatedStats;
84+
8685
if (!Self->EnableColumnStatistics) {
8786
return true;
8887
}
@@ -130,6 +129,7 @@ struct TStatisticsAggregator::TTxSchemeShardStats : public TTxBase {
130129

131130
void Complete(const TActorContext&) override {
132131
SA_LOG_D("[" << Self->TabletID() << "] TTxSchemeShardStats::Complete");
132+
Self->BaseStatistics[Record.GetSchemeShardId()].Committed = UpdatedStats;
133133
Self->ReportBaseStatisticsCounters();
134134
}
135135
};

ydb/core/statistics/service/ut/ut_basic_statistics.cpp

Lines changed: 95 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -52,62 +52,6 @@ void CreateTableWithGlobalIndex(TTestEnv& env, const TString& databaseName, cons
5252
FillTable(env, databaseName, tableName, rowCount);
5353
}
5454

55-
void ValidateRowCount(TTestActorRuntime& runtime, ui32 nodeIndex, TPathId pathId, size_t expectedRowCount) {
56-
auto statServiceId = NStat::MakeStatServiceID(runtime.GetNodeId(nodeIndex));
57-
ui64 rowCount = 0;
58-
while (rowCount == 0) {
59-
NStat::TRequest req;
60-
req.PathId = pathId;
61-
62-
auto evGet = std::make_unique<TEvStatistics::TEvGetStatistics>();
63-
evGet->StatType = NStat::EStatType::SIMPLE;
64-
evGet->StatRequests.push_back(req);
65-
66-
auto sender = runtime.AllocateEdgeActor(nodeIndex);
67-
runtime.Send(statServiceId, sender, evGet.release(), nodeIndex, true);
68-
auto evResult = runtime.GrabEdgeEventRethrow<TEvStatistics::TEvGetStatisticsResult>(sender);
69-
70-
UNIT_ASSERT(evResult);
71-
UNIT_ASSERT(evResult->Get());
72-
UNIT_ASSERT(evResult->Get()->StatResponses.size() == 1);
73-
74-
auto rsp = evResult->Get()->StatResponses[0];
75-
auto stat = rsp.Simple;
76-
77-
rowCount = stat.RowCount;
78-
79-
if (rowCount != 0) {
80-
UNIT_ASSERT(stat.RowCount == expectedRowCount);
81-
break;
82-
}
83-
84-
runtime.SimulateSleep(TDuration::Seconds(1));
85-
}
86-
}
87-
88-
ui64 GetRowCount(TTestActorRuntime& runtime, ui32 nodeIndex, TPathId pathId) {
89-
auto statServiceId = NStat::MakeStatServiceID(runtime.GetNodeId(nodeIndex));
90-
NStat::TRequest req;
91-
req.PathId = pathId;
92-
93-
auto evGet = std::make_unique<TEvStatistics::TEvGetStatistics>();
94-
evGet->StatType = NStat::EStatType::SIMPLE;
95-
evGet->StatRequests.push_back(req);
96-
97-
auto sender = runtime.AllocateEdgeActor(nodeIndex);
98-
runtime.Send(statServiceId, sender, evGet.release(), nodeIndex, true);
99-
auto evResult = runtime.GrabEdgeEventRethrow<TEvStatistics::TEvGetStatisticsResult>(sender);
100-
101-
UNIT_ASSERT(evResult);
102-
UNIT_ASSERT(evResult->Get());
103-
UNIT_ASSERT(evResult->Get()->StatResponses.size() == 1);
104-
105-
auto rsp = evResult->Get()->StatResponses[0];
106-
auto stat = rsp.Simple;
107-
108-
return stat.RowCount;
109-
}
110-
11155
} // namespace
11256

11357
Y_UNIT_TEST_SUITE(BasicStatistics) {
@@ -375,6 +319,101 @@ Y_UNIT_TEST_SUITE(BasicStatistics) {
375319
UNIT_ASSERT_VALUES_EQUAL(sendCount, 2); // events from 2 serverless schemeshards
376320
UNIT_ASSERT_VALUES_EQUAL(propagateCount, 2); // SA -> node1 and node1 -> node2
377321
}
322+
323+
Y_UNIT_TEST(PersistenceWithStorageFailuresAndReboots) {
324+
TTestEnv env(1, 2);
325+
auto& runtime = *env.GetServer().GetRuntime();
326+
327+
const size_t rowCount1 = 5;
328+
329+
CreateDatabase(env, "Database", 2);
330+
CreateTable(env, "Database", "Table", rowCount1);
331+
332+
ui64 saTabletId = 0;
333+
auto pathId = ResolvePathId(runtime, "/Root/Database/Table", nullptr, &saTabletId);
334+
ui64 ssTabletId = pathId.OwnerId;
335+
336+
const ui32 nodeIdx = 1;
337+
const ui32 otherNodeIdx = 2;
338+
339+
// Block propagate events that go to node with otherNodeIdx. We will use this
340+
// node later as a clean slate.
341+
TBlockEvents<TEvStatistics::TEvPropagateStatistics> blockPropagate(runtime,
342+
[&](const TEvStatistics::TEvPropagateStatistics::TPtr& ev) {
343+
return ev->Recipient.NodeId() == runtime.GetNodeId(otherNodeIdx);
344+
});
345+
346+
// Wait until correct statistics gets reported
347+
ValidateRowCount(runtime, nodeIdx, pathId, rowCount1);
348+
349+
// Block persisting new updates from schemeshards on the aggregator.
350+
// This should result in old statistics being reported, even after new
351+
// updates arrive.
352+
TBlockEvents<TEvBlobStorage::TEvPut> blockPersistStats(runtime,
353+
[&](const TEvBlobStorage::TEvPut::TPtr& ev) {
354+
return ev->Get()->Id.TabletID() == saTabletId;
355+
});
356+
357+
// Upsert some more data
358+
const size_t rowCount2 = 7;
359+
FillTable(env, "Database", "Table", rowCount2);
360+
361+
{
362+
// Wait for an update from SchemeShard with new row count.
363+
364+
bool statsUpdateSent = false;
365+
auto sendObserver = runtime.AddObserver<TEvStatistics::TEvSchemeShardStats>([&](auto& ev){
366+
NKikimrStat::TSchemeShardStats statRecord;
367+
UNIT_ASSERT(statRecord.ParseFromString(ev->Get()->Record.GetStats()));
368+
for (const auto& entry : statRecord.GetEntries()) {
369+
if (TPathId::FromProto(entry.GetPathId()) == pathId
370+
&& entry.GetAreStatsFull()
371+
&& entry.GetRowCount() == rowCount2) {
372+
statsUpdateSent = true;
373+
}
374+
}
375+
});
376+
runtime.WaitFor("TEvSchemeShardStats", [&]{ return statsUpdateSent; });
377+
378+
bool propagateSent = false;
379+
auto propagateObserver = runtime.AddObserver<TEvStatistics::TEvPropagateStatistics>([&](auto& ev){
380+
if (ev->Recipient.NodeId() == runtime.GetNodeId(nodeIdx)) {
381+
propagateSent = true;
382+
}
383+
});
384+
runtime.WaitFor("TEvPropagateStatistics", [&]{ return propagateSent; });
385+
}
386+
UNIT_ASSERT_VALUES_EQUAL(GetRowCount(runtime, nodeIdx, pathId), rowCount1);
387+
388+
TActorId sender = runtime.AllocateEdgeActor();
389+
RebootTablet(runtime, ssTabletId, sender);
390+
391+
// Simulate storage failure, StatisticsAggregator will reboot.
392+
393+
TBlockEvents<TEvStatistics::TEvSchemeShardStats> blockSSUpdates(runtime);
394+
UNIT_ASSERT_GT(blockPersistStats.size(), 0);
395+
blockPersistStats.Stop();
396+
for (auto& ev : blockPersistStats) {
397+
auto proxy = ev->Recipient;
398+
ui32 groupId = GroupIDFromBlobStorageProxyID(proxy);
399+
auto res = ev->Get()->MakeErrorResponse(
400+
NKikimrProto::ERROR, "Something went wrong", TGroupId::FromValue(groupId));
401+
ui32 nodeIdx = ev->Sender.NodeId() - runtime.GetFirstNodeId();
402+
runtime.Send(new IEventHandle(ev->Sender, proxy, res.release()), nodeIdx, true);
403+
}
404+
TDispatchOptions rebootOptions;
405+
rebootOptions.FinalEvents.emplace_back(TEvTablet::EvBoot);
406+
runtime.DispatchEvents(rebootOptions);
407+
408+
// Check that after reboot the old value is still persisted by the Aggregator
409+
// and returned to the Service.
410+
blockPropagate.Stop();
411+
UNIT_ASSERT_VALUES_EQUAL(GetRowCount(runtime, otherNodeIdx, pathId), rowCount1);
412+
413+
// After everything is healed, stats should get updated.
414+
blockSSUpdates.Stop();
415+
WaitForRowCount(runtime, otherNodeIdx, pathId, rowCount2);
416+
}
378417
}
379418

380419
} // NSysView

ydb/core/statistics/service/ut/ut_http_request.cpp

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -81,21 +81,18 @@ void ProbeBaseStatsTest(bool isServerless) {
8181

8282
// Create a database and a table
8383
if (isServerless) {
84-
CreateDatabase(env, "Shared");
84+
CreateDatabase(env, "Shared", 1, true);
8585
CreateServerlessDatabase(env, "Database", "/Root/Shared");
8686
} else {
8787
CreateDatabase(env, "Database");
8888
}
8989
CreateColumnStoreTable(env, "Database", "Table", 5);
9090
const TString path = "/Root/Database/Table";
91+
const TPathId pathId = ResolvePathId(runtime, path);
9192
const ui32 nodeIdx = 1;
9293

93-
// Wait until the SchemeShard sends out the stats to the StatisticsAggregator.
94-
bool statsToSA = false;
95-
auto statsObserver = runtime.AddObserver<TEvStatistics::TEvSchemeShardStats>([&](auto& /* ev */) {
96-
statsToSA = true;
97-
});
98-
runtime.WaitFor("TEvSchemeShardStats", [&]{ return statsToSA; });
94+
// Wait until correct base statistics gets reported.
95+
ValidateRowCount(runtime, nodeIdx, pathId, ColumnTableRowsNumber);
9996

10097
// Issue the probe_base_stats request and verify that the result makes sense.
10198
const auto sender = runtime.AllocateEdgeActor(nodeIdx);
@@ -163,7 +160,7 @@ Y_UNIT_TEST_SUITE(HttpRequest) {
163160
}
164161

165162
Y_UNIT_TEST(ProbeBaseStatsServerless) {
166-
ProbeBaseStatsTest(false);
163+
ProbeBaseStatsTest(true);
167164
}
168165
}
169166

ydb/core/statistics/ut_common/ut_common.cpp

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -525,5 +525,59 @@ void WaitForSavedStatistics(TTestActorRuntime& runtime, const TPathId& pathId) {
525525
waiter.Wait();
526526
}
527527

528+
ui64 GetRowCount(TTestActorRuntime& runtime, ui32 nodeIndex, TPathId pathId) {
529+
auto statServiceId = NStat::MakeStatServiceID(runtime.GetNodeId(nodeIndex));
530+
NStat::TRequest req;
531+
req.PathId = pathId;
532+
533+
auto evGet = std::make_unique<TEvStatistics::TEvGetStatistics>();
534+
evGet->StatType = NStat::EStatType::SIMPLE;
535+
evGet->StatRequests.push_back(req);
536+
537+
auto sender = runtime.AllocateEdgeActor(nodeIndex);
538+
runtime.Send(statServiceId, sender, evGet.release(), nodeIndex, true);
539+
auto evResult = runtime.GrabEdgeEventRethrow<TEvStatistics::TEvGetStatisticsResult>(sender);
540+
541+
UNIT_ASSERT(evResult);
542+
UNIT_ASSERT(evResult->Get());
543+
UNIT_ASSERT(evResult->Get()->StatResponses.size() == 1);
544+
545+
auto rsp = evResult->Get()->StatResponses[0];
546+
auto stat = rsp.Simple;
547+
548+
return stat.RowCount;
549+
}
550+
551+
void ValidateRowCount(TTestActorRuntime& runtime, ui32 nodeIndex, TPathId pathId, size_t expectedRowCount) {
552+
ui64 rowCount = 0;
553+
while (rowCount == 0) {
554+
rowCount = GetRowCount(runtime, nodeIndex, pathId);
555+
556+
if (rowCount != 0) {
557+
UNIT_ASSERT_VALUES_EQUAL(rowCount, expectedRowCount);
558+
break;
559+
}
560+
561+
runtime.SimulateSleep(TDuration::Seconds(1));
562+
}
563+
}
564+
565+
void WaitForRowCount(
566+
TTestActorRuntime& runtime, ui32 nodeIndex,
567+
TPathId pathId, size_t expectedRowCount, size_t timeoutSec) {
568+
ui64 lastRowCount = 0;
569+
for (size_t i = 0; i <= timeoutSec; ++i) {
570+
lastRowCount = GetRowCount(runtime, nodeIndex, pathId);
571+
if (i % 5 == 0) {
572+
Cerr << "row count: " << lastRowCount << " (expected: " << expectedRowCount << ")\n";
573+
}
574+
if (lastRowCount == expectedRowCount) {
575+
return;
576+
}
577+
runtime.SimulateSleep(TDuration::Seconds(1));
578+
}
579+
UNIT_ASSERT_C(false, "timed out, last row count: " << lastRowCount);
580+
}
581+
528582
} // NStat
529583
} // NKikimr

ydb/core/statistics/ut_common/ut_common.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,5 +120,11 @@ void AnalyzeStatus(TTestActorRuntime& runtime, TActorId sender, ui64 saTabletId,
120120

121121
void WaitForSavedStatistics(TTestActorRuntime& runtime, const TPathId& pathId);
122122

123+
ui64 GetRowCount(TTestActorRuntime& runtime, ui32 nodeIndex, TPathId pathId);
124+
void ValidateRowCount(TTestActorRuntime& runtime, ui32 nodeIndex, TPathId pathId, size_t expectedRowCount);
125+
void WaitForRowCount(
126+
TTestActorRuntime& runtime, ui32 nodeIndex,
127+
TPathId pathId, size_t expectedRowCount, size_t timeoutSec = 130);
128+
123129
} // namespace NStat
124130
} // namespace NKikimr

0 commit comments

Comments
 (0)