Skip to content

Commit 2fc7f5a

Browse files
authored
Merge 6ec08bc into 898be03
2 parents 898be03 + 6ec08bc commit 2fc7f5a

File tree

7 files changed

+187
-76
lines changed

7 files changed

+187
-76
lines changed

ydb/core/statistics/aggregator/aggregator_impl.cpp

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -395,9 +395,10 @@ size_t TStatisticsAggregator::PropagatePart(const std::vector<TNodeId>& nodeIds,
395395
auto* entry = record->AddEntries();
396396
entry->SetSchemeShardId(ssId);
397397
auto itStats = BaseStatistics.find(ssId);
398-
if (itStats != BaseStatistics.end()) {
399-
entry->SetStats(itStats->second);
400-
size += itStats->second.size();
398+
if (itStats != BaseStatistics.end() && itStats->second.Committed) {
399+
const auto& stats = *itStats->second.Committed;
400+
entry->SetStats(stats);
401+
size += stats.size();
401402
} else {
402403
entry->SetStats(TString()); // stats are not sent from SS yet
403404
}
@@ -1059,8 +1060,11 @@ bool TStatisticsAggregator::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev
10591060
ui64 totalRowCount = 0;
10601061
ui64 totalBytesSize = 0;
10611062
for (const auto& [_, serializedStats] : BaseStatistics) {
1063+
if (!serializedStats.Committed) {
1064+
continue;
1065+
}
10621066
NKikimrStat::TSchemeShardStats stats;
1063-
Y_PROTOBUF_SUPPRESS_NODISCARD stats.ParseFromString(serializedStats);
1067+
Y_PROTOBUF_SUPPRESS_NODISCARD stats.ParseFromString(*serializedStats.Committed);
10641068
for (const auto& entry: stats.GetEntries()) {
10651069
totalRowCount += entry.GetRowCount();
10661070
totalBytesSize += entry.GetBytesSize();

ydb/core/statistics/aggregator/aggregator_impl.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,12 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
246246
TDuration PropagateTimeout;
247247
static constexpr TDuration FastCheckInterval = TDuration::MilliSeconds(50);
248248

249-
std::unordered_map<TSSId, TString> BaseStatistics; // schemeshard id -> serialized stats for all paths
249+
// Serialized stats for all paths from a single SchemeShard.
250+
struct TSerializedBaseStats {
251+
std::shared_ptr<TString> Committed; // Value that is safely persisted in local DB. Can be nullptr.
252+
std::shared_ptr<TString> Latest; // Value from the latest update.
253+
};
254+
std::unordered_map<TSSId, TSerializedBaseStats> BaseStatistics;
250255

251256
std::unordered_map<TSSId, size_t> SchemeShards; // all connected schemeshards
252257
std::unordered_map<TActorId, TSSId> SchemeShardPipes; // schemeshard pipe servers

ydb/core/statistics/aggregator/tx_init.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,9 @@ struct TStatisticsAggregator::TTxInit : public TTxBase {
106106
while (!rowset.EndOfSet()) {
107107
ui64 schemeShardId = rowset.GetValue<Schema::BaseStatistics::SchemeShardId>();
108108
TString stats = rowset.GetValue<Schema::BaseStatistics::Stats>();
109-
110-
Self->BaseStatistics[schemeShardId] = stats;
109+
auto& schemeShardStats = Self->BaseStatistics[schemeShardId];
110+
schemeShardStats.Committed = std::make_shared<TString>(std::move(stats));
111+
schemeShardStats.Latest = schemeShardStats.Committed;
111112

112113
if (!rowset.Next()) {
113114
return false;

ydb/core/statistics/aggregator/tx_schemeshard_stats.cpp

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ namespace NKikimr::NStat {
44

55
struct TStatisticsAggregator::TTxSchemeShardStats : public TTxBase {
66
NKikimrStat::TEvSchemeShardStats Record;
7+
std::shared_ptr<TString> UpdatedStats;
78

89
TTxSchemeShardStats(TSelf* self, NKikimrStat::TEvSchemeShardStats&& record)
910
: TTxBase(self)
@@ -22,24 +23,22 @@ struct TStatisticsAggregator::TTxSchemeShardStats : public TTxBase {
2223

2324
NIceDb::TNiceDb db(txc.DB);
2425

26+
TSerializedBaseStats& existingStats = Self->BaseStatistics[schemeShardId];
27+
2528
NKikimrStat::TSchemeShardStats statRecord;
2629
Y_PROTOBUF_SUPPRESS_NODISCARD statRecord.ParseFromString(stats);
2730

2831
// if statistics is sent from schemeshard for the first time or
2932
// the AreAllStatsFull field is not set (schemeshard is running a previous code version) or
3033
// statistics is full for all tables
3134
// then persist incoming statistics without changes
32-
if (!Self->BaseStatistics.contains(schemeShardId) ||
35+
if (!existingStats.Latest ||
3336
!statRecord.HasAreAllStatsFull() || statRecord.GetAreAllStatsFull())
3437
{
35-
db.Table<Schema::BaseStatistics>().Key(schemeShardId).Update(
36-
NIceDb::TUpdate<Schema::BaseStatistics::Stats>(stats));
37-
Self->BaseStatistics[schemeShardId] = stats;
38-
38+
UpdatedStats = std::make_shared<TString>(stats);
3939
} else {
4040
NKikimrStat::TSchemeShardStats oldStatRecord;
41-
const auto& oldStats = Self->BaseStatistics[schemeShardId];
42-
Y_PROTOBUF_SUPPRESS_NODISCARD oldStatRecord.ParseFromString(oldStats);
41+
Y_PROTOBUF_SUPPRESS_NODISCARD oldStatRecord.ParseFromString(*existingStats.Latest);
4342

4443
struct TOldStats {
4544
ui64 RowCount = 0;
@@ -75,14 +74,14 @@ struct TStatisticsAggregator::TTxSchemeShardStats : public TTxBase {
7574
}
7675
}
7776

78-
TString newStats;
79-
Y_PROTOBUF_SUPPRESS_NODISCARD newStatRecord.SerializeToString(&newStats);
80-
81-
db.Table<Schema::BaseStatistics>().Key(schemeShardId).Update(
82-
NIceDb::TUpdate<Schema::BaseStatistics::Stats>(newStats));
83-
Self->BaseStatistics[schemeShardId] = newStats;
77+
UpdatedStats = std::make_shared<TString>();
78+
Y_PROTOBUF_SUPPRESS_NODISCARD newStatRecord.SerializeToString(UpdatedStats.get());
8479
}
8580

81+
db.Table<Schema::BaseStatistics>().Key(schemeShardId).Update(
82+
NIceDb::TUpdate<Schema::BaseStatistics::Stats>(*UpdatedStats));
83+
existingStats.Latest = UpdatedStats;
84+
8685
if (!Self->EnableColumnStatistics) {
8786
return true;
8887
}
@@ -130,6 +129,7 @@ struct TStatisticsAggregator::TTxSchemeShardStats : public TTxBase {
130129

131130
void Complete(const TActorContext&) override {
132131
SA_LOG_D("[" << Self->TabletID() << "] TTxSchemeShardStats::Complete");
132+
Self->BaseStatistics[Record.GetSchemeShardId()].Committed = UpdatedStats;
133133
Self->ReportBaseStatisticsCounters();
134134
}
135135
};

ydb/core/statistics/service/ut/ut_basic_statistics.cpp

Lines changed: 97 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -53,62 +53,6 @@ void CreateTableWithGlobalIndex(TTestEnv& env, const TString& databaseName, cons
5353
FillTable(env, databaseName, tableName, rowCount);
5454
}
5555

56-
void ValidateRowCount(TTestActorRuntime& runtime, ui32 nodeIndex, TPathId pathId, size_t expectedRowCount) {
57-
auto statServiceId = NStat::MakeStatServiceID(runtime.GetNodeId(nodeIndex));
58-
ui64 rowCount = 0;
59-
while (rowCount == 0) {
60-
NStat::TRequest req;
61-
req.PathId = pathId;
62-
63-
auto evGet = std::make_unique<TEvStatistics::TEvGetStatistics>();
64-
evGet->StatType = NStat::EStatType::SIMPLE;
65-
evGet->StatRequests.push_back(req);
66-
67-
auto sender = runtime.AllocateEdgeActor(nodeIndex);
68-
runtime.Send(statServiceId, sender, evGet.release(), nodeIndex, true);
69-
auto evResult = runtime.GrabEdgeEventRethrow<TEvStatistics::TEvGetStatisticsResult>(sender);
70-
71-
UNIT_ASSERT(evResult);
72-
UNIT_ASSERT(evResult->Get());
73-
UNIT_ASSERT(evResult->Get()->StatResponses.size() == 1);
74-
75-
auto rsp = evResult->Get()->StatResponses[0];
76-
auto stat = rsp.Simple;
77-
78-
rowCount = stat.RowCount;
79-
80-
if (rowCount != 0) {
81-
UNIT_ASSERT(stat.RowCount == expectedRowCount);
82-
break;
83-
}
84-
85-
runtime.SimulateSleep(TDuration::Seconds(1));
86-
}
87-
}
88-
89-
ui64 GetRowCount(TTestActorRuntime& runtime, ui32 nodeIndex, TPathId pathId) {
90-
auto statServiceId = NStat::MakeStatServiceID(runtime.GetNodeId(nodeIndex));
91-
NStat::TRequest req;
92-
req.PathId = pathId;
93-
94-
auto evGet = std::make_unique<TEvStatistics::TEvGetStatistics>();
95-
evGet->StatType = NStat::EStatType::SIMPLE;
96-
evGet->StatRequests.push_back(req);
97-
98-
auto sender = runtime.AllocateEdgeActor(nodeIndex);
99-
runtime.Send(statServiceId, sender, evGet.release(), nodeIndex, true);
100-
auto evResult = runtime.GrabEdgeEventRethrow<TEvStatistics::TEvGetStatisticsResult>(sender);
101-
102-
UNIT_ASSERT(evResult);
103-
UNIT_ASSERT(evResult->Get());
104-
UNIT_ASSERT(evResult->Get()->StatResponses.size() == 1);
105-
106-
auto rsp = evResult->Get()->StatResponses[0];
107-
auto stat = rsp.Simple;
108-
109-
return stat.RowCount;
110-
}
111-
11256
} // namespace
11357

11458
Y_UNIT_TEST_SUITE(BasicStatistics) {
@@ -291,6 +235,103 @@ Y_UNIT_TEST_SUITE(BasicStatistics) {
291235
auto pathId = ResolvePathId(runtime, "/Root/Serverless/Table/ValueIndex/indexImplTable");
292236
ValidateRowCount(runtime, 1, pathId, 5);
293237
}
238+
239+
Y_UNIT_TEST(PersistenceWithStorageFailuresAndReboots) {
240+
TTestEnv env(1, 2);
241+
auto& runtime = *env.GetServer().GetRuntime();
242+
243+
const size_t rowCount1 = 5;
244+
245+
CreateDatabase(env, "Database", 2);
246+
CreateTable(env, "Database", "Table", rowCount1);
247+
248+
ui64 saTabletId = 0;
249+
auto pathId = ResolvePathId(runtime, "/Root/Database/Table", nullptr, &saTabletId);
250+
ui64 ssTabletId = pathId.OwnerId;
251+
252+
const ui32 nodeIdx = 1;
253+
const ui32 otherNodeIdx = 2;
254+
255+
// Block propagate events that go to node with otherNodeIdx. We will use this
256+
// node later as a clean slate.
257+
TBlockEvents<TEvStatistics::TEvPropagateStatistics> blockPropagate(runtime,
258+
[&](const TEvStatistics::TEvPropagateStatistics::TPtr& ev) {
259+
return ev->Recipient.NodeId() == runtime.GetNodeId(otherNodeIdx);
260+
});
261+
262+
// Wait until the correct statistics are reported
263+
ValidateRowCount(runtime, nodeIdx, pathId, rowCount1);
264+
265+
// Block persisting new updates from schemeshards on the aggregator.
266+
// This should result in old statistics being reported, even after new
267+
// updates arrive.
268+
TBlockEvents<TEvBlobStorage::TEvPut> blockPersistStats(runtime,
269+
[&](const TEvBlobStorage::TEvPut::TPtr& ev) {
270+
return ev->Get()->Id.TabletID() == saTabletId;
271+
});
272+
273+
// Upsert some more data
274+
const size_t rowCount2 = 7;
275+
FillTable(env, "Database", "Table", rowCount2);
276+
277+
{
278+
// Wait for an update from SchemeShard with new row count.
279+
280+
bool statsUpdateSent = false;
281+
auto sendObserver = runtime.AddObserver<TEvStatistics::TEvSchemeShardStats>([&](auto& ev){
282+
NKikimrStat::TSchemeShardStats statRecord;
283+
UNIT_ASSERT(statRecord.ParseFromString(ev->Get()->Record.GetStats()));
284+
for (const auto& entry : statRecord.GetEntries()) {
285+
if (TPathId::FromProto(entry.GetPathId()) == pathId
286+
&& entry.GetAreStatsFull()
287+
&& entry.GetRowCount() == rowCount2) {
288+
statsUpdateSent = true;
289+
}
290+
}
291+
});
292+
runtime.WaitFor("TEvSchemeShardStats", [&]{ return statsUpdateSent; });
293+
294+
bool propagateSent = false;
295+
auto propagateObserver = runtime.AddObserver<TEvStatistics::TEvPropagateStatistics>([&](auto& ev){
296+
if (ev->Recipient.NodeId() == runtime.GetNodeId(nodeIdx)) {
297+
propagateSent = true;
298+
}
299+
});
300+
runtime.WaitFor("TEvPropagateStatistics", [&]{ return propagateSent; });
301+
}
302+
UNIT_ASSERT_VALUES_EQUAL(GetRowCount(runtime, nodeIdx, pathId), rowCount1);
303+
304+
TActorId sender = runtime.AllocateEdgeActor();
305+
RebootTablet(runtime, ssTabletId, sender);
306+
307+
// Simulate storage failure, StatisticsAggregator will reboot.
308+
309+
TBlockEvents<TEvStatistics::TEvSchemeShardStats> blockSSUpdates(runtime);
310+
UNIT_ASSERT_GT(blockPersistStats.size(), 0);
311+
blockPersistStats.Stop();
312+
for (auto& ev : blockPersistStats) {
313+
auto proxy = ev->Recipient;
314+
ui32 groupId = GroupIDFromBlobStorageProxyID(proxy);
315+
auto res = ev->Get()->MakeErrorResponse(
316+
NKikimrProto::ERROR, "Something went wrong", TGroupId::FromValue(groupId));
317+
ui32 nodeIdx = ev->Sender.NodeId() - runtime.GetFirstNodeId();
318+
runtime.Send(new IEventHandle(ev->Sender, proxy, res.release()), nodeIdx, true);
319+
}
320+
TDispatchOptions rebootOptions;
321+
rebootOptions.FinalEvents.emplace_back(TEvTablet::EvBoot);
322+
runtime.DispatchEvents(rebootOptions);
323+
324+
// Check that after reboot the old value is still persisted by the Aggregator
325+
// and returned to the Service.
326+
blockPropagate.Stop();
327+
UNIT_ASSERT_VALUES_EQUAL(GetRowCount(runtime, otherNodeIdx, pathId), rowCount1);
328+
329+
// After everything is healed, stats should get updated.
330+
blockSSUpdates.Stop();
331+
// The wait takes a long time because of long send intervals in schemeshard, so raise the scheduled limit.
332+
runtime.SetScheduledLimit(200000);
333+
WaitForRowCount(runtime, otherNodeIdx, pathId, rowCount2);
334+
}
294335
}
295336

296337
} // NSysView

ydb/core/statistics/ut_common/ut_common.cpp

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -520,5 +520,59 @@ void WaitForSavedStatistics(TTestActorRuntime& runtime, const TPathId& pathId) {
520520
waiter.Wait();
521521
}
522522

523+
ui64 GetRowCount(TTestActorRuntime& runtime, ui32 nodeIndex, TPathId pathId) {
524+
auto statServiceId = NStat::MakeStatServiceID(runtime.GetNodeId(nodeIndex));
525+
NStat::TRequest req;
526+
req.PathId = pathId;
527+
528+
auto evGet = std::make_unique<TEvStatistics::TEvGetStatistics>();
529+
evGet->StatType = NStat::EStatType::SIMPLE;
530+
evGet->StatRequests.push_back(req);
531+
532+
auto sender = runtime.AllocateEdgeActor(nodeIndex);
533+
runtime.Send(statServiceId, sender, evGet.release(), nodeIndex, true);
534+
auto evResult = runtime.GrabEdgeEventRethrow<TEvStatistics::TEvGetStatisticsResult>(sender);
535+
536+
UNIT_ASSERT(evResult);
537+
UNIT_ASSERT(evResult->Get());
538+
UNIT_ASSERT(evResult->Get()->StatResponses.size() == 1);
539+
540+
auto rsp = evResult->Get()->StatResponses[0];
541+
auto stat = rsp.Simple;
542+
543+
return stat.RowCount;
544+
}
545+
546+
void ValidateRowCount(TTestActorRuntime& runtime, ui32 nodeIndex, TPathId pathId, size_t expectedRowCount) {
547+
ui64 rowCount = 0;
548+
while (rowCount == 0) {
549+
rowCount = GetRowCount(runtime, nodeIndex, pathId);
550+
551+
if (rowCount != 0) {
552+
UNIT_ASSERT_VALUES_EQUAL(rowCount, expectedRowCount);
553+
break;
554+
}
555+
556+
runtime.SimulateSleep(TDuration::Seconds(1));
557+
}
558+
}
559+
560+
void WaitForRowCount(
561+
TTestActorRuntime& runtime, ui32 nodeIndex,
562+
TPathId pathId, size_t expectedRowCount, size_t timeoutSec) {
563+
ui64 lastRowCount = 0;
564+
for (size_t i = 0; i <= timeoutSec; ++i) {
565+
lastRowCount = GetRowCount(runtime, nodeIndex, pathId);
566+
if (i % 5 == 0) {
567+
Cerr << "row count: " << lastRowCount << " (expected: " << expectedRowCount << ")\n";
568+
}
569+
if (lastRowCount == expectedRowCount) {
570+
return;
571+
}
572+
runtime.SimulateSleep(TDuration::Seconds(1));
573+
}
574+
UNIT_ASSERT_C(false, "timed out, last row count: " << lastRowCount);
575+
}
576+
523577
} // NStat
524578
} // NKikimr

ydb/core/statistics/ut_common/ut_common.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,5 +119,11 @@ void AnalyzeStatus(TTestActorRuntime& runtime, TActorId sender, ui64 saTabletId,
119119

120120
void WaitForSavedStatistics(TTestActorRuntime& runtime, const TPathId& pathId);
121121

122+
ui64 GetRowCount(TTestActorRuntime& runtime, ui32 nodeIndex, TPathId pathId);
123+
void ValidateRowCount(TTestActorRuntime& runtime, ui32 nodeIndex, TPathId pathId, size_t expectedRowCount);
124+
void WaitForRowCount(
125+
TTestActorRuntime& runtime, ui32 nodeIndex,
126+
TPathId pathId, size_t expectedRowCount, size_t timeoutSec = 130);
127+
122128
} // namespace NStat
123129
} // namespace NKikimr

0 commit comments

Comments
 (0)