Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions ydb/core/statistics/aggregator/aggregator_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -416,9 +416,10 @@ size_t TStatisticsAggregator::PropagatePart(const std::vector<TNodeId>& nodeIds,
auto* entry = record->AddEntries();
entry->SetSchemeShardId(ssId);
auto itStats = BaseStatistics.find(ssId);
if (itStats != BaseStatistics.end()) {
entry->SetStats(itStats->second);
size += itStats->second.size();
if (itStats != BaseStatistics.end() && itStats->second.Committed) {
const auto& stats = *itStats->second.Committed;
entry->SetStats(stats);
size += stats.size();
} else {
entry->SetStats(TString()); // stats are not sent from SS yet
}
Expand Down Expand Up @@ -1081,8 +1082,11 @@ bool TStatisticsAggregator::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev
ui64 totalRowCount = 0;
ui64 totalBytesSize = 0;
for (const auto& [_, serializedStats] : BaseStatistics) {
if (!serializedStats.Committed) {
continue;
}
NKikimrStat::TSchemeShardStats stats;
Y_PROTOBUF_SUPPRESS_NODISCARD stats.ParseFromString(serializedStats);
Y_PROTOBUF_SUPPRESS_NODISCARD stats.ParseFromString(*serializedStats.Committed);
for (const auto& entry: stats.GetEntries()) {
totalRowCount += entry.GetRowCount();
totalBytesSize += entry.GetBytesSize();
Expand Down
7 changes: 6 additions & 1 deletion ydb/core/statistics/aggregator/aggregator_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,12 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl

static constexpr TDuration FastCheckInterval = TDuration::MilliSeconds(50);

std::unordered_map<TSSId, TString> BaseStatistics; // schemeshard id -> serialized stats for all paths
// Serialized stats for all paths from a single SchemeShard.
struct TSerializedBaseStats {
std::shared_ptr<TString> Committed; // Value that is safely persisted in local DB. Can be nullptr.
std::shared_ptr<TString> Latest; // Value from the latest update.
};
std::unordered_map<TSSId, TSerializedBaseStats> BaseStatistics;

std::unordered_map<TSSId, size_t> SchemeShards; // all connected schemeshards
std::unordered_map<TActorId, TSSId> SchemeShardPipes; // schemeshard pipe servers
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/statistics/aggregator/tx_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,9 @@ struct TStatisticsAggregator::TTxInit : public TTxBase {
while (!rowset.EndOfSet()) {
ui64 schemeShardId = rowset.GetValue<Schema::BaseStatistics::SchemeShardId>();
TString stats = rowset.GetValue<Schema::BaseStatistics::Stats>();

Self->BaseStatistics[schemeShardId] = stats;
auto& schemeShardStats = Self->BaseStatistics[schemeShardId];
schemeShardStats.Committed = std::make_shared<TString>(std::move(stats));
schemeShardStats.Latest = schemeShardStats.Committed;

if (!rowset.Next()) {
return false;
Expand Down
26 changes: 13 additions & 13 deletions ydb/core/statistics/aggregator/tx_schemeshard_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ namespace NKikimr::NStat {

struct TStatisticsAggregator::TTxSchemeShardStats : public TTxBase {
NKikimrStat::TEvSchemeShardStats Record;
std::shared_ptr<TString> UpdatedStats;

TTxSchemeShardStats(TSelf* self, NKikimrStat::TEvSchemeShardStats&& record)
: TTxBase(self)
Expand All @@ -22,24 +23,22 @@ struct TStatisticsAggregator::TTxSchemeShardStats : public TTxBase {

NIceDb::TNiceDb db(txc.DB);

TSerializedBaseStats& existingStats = Self->BaseStatistics[schemeShardId];

NKikimrStat::TSchemeShardStats statRecord;
Y_PROTOBUF_SUPPRESS_NODISCARD statRecord.ParseFromString(stats);

// if statistics is sent from schemeshard for the first time or
// AreAllStatsFull field is not set (schemeshard is working on previous code version) or
// statistics is full for all tables
// then persist incoming statistics without changes
if (!Self->BaseStatistics.contains(schemeShardId) ||
if (!existingStats.Latest ||
!statRecord.HasAreAllStatsFull() || statRecord.GetAreAllStatsFull())
{
db.Table<Schema::BaseStatistics>().Key(schemeShardId).Update(
NIceDb::TUpdate<Schema::BaseStatistics::Stats>(stats));
Self->BaseStatistics[schemeShardId] = stats;

UpdatedStats = std::make_shared<TString>(stats);
} else {
NKikimrStat::TSchemeShardStats oldStatRecord;
const auto& oldStats = Self->BaseStatistics[schemeShardId];
Y_PROTOBUF_SUPPRESS_NODISCARD oldStatRecord.ParseFromString(oldStats);
Y_PROTOBUF_SUPPRESS_NODISCARD oldStatRecord.ParseFromString(*existingStats.Latest);

struct TOldStats {
ui64 RowCount = 0;
Expand Down Expand Up @@ -75,14 +74,14 @@ struct TStatisticsAggregator::TTxSchemeShardStats : public TTxBase {
}
}

TString newStats;
Y_PROTOBUF_SUPPRESS_NODISCARD newStatRecord.SerializeToString(&newStats);

db.Table<Schema::BaseStatistics>().Key(schemeShardId).Update(
NIceDb::TUpdate<Schema::BaseStatistics::Stats>(newStats));
Self->BaseStatistics[schemeShardId] = newStats;
UpdatedStats = std::make_shared<TString>();
Y_PROTOBUF_SUPPRESS_NODISCARD newStatRecord.SerializeToString(UpdatedStats.get());
}

db.Table<Schema::BaseStatistics>().Key(schemeShardId).Update(
NIceDb::TUpdate<Schema::BaseStatistics::Stats>(*UpdatedStats));
existingStats.Latest = UpdatedStats;

if (!Self->EnableColumnStatistics) {
return true;
}
Expand Down Expand Up @@ -130,6 +129,7 @@ struct TStatisticsAggregator::TTxSchemeShardStats : public TTxBase {

void Complete(const TActorContext&) override {
SA_LOG_D("[" << Self->TabletID() << "] TTxSchemeShardStats::Complete");
Self->BaseStatistics[Record.GetSchemeShardId()].Committed = UpdatedStats;
Self->ReportBaseStatisticsCounters();
}
};
Expand Down
151 changes: 95 additions & 56 deletions ydb/core/statistics/service/ut/ut_basic_statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,62 +52,6 @@ void CreateTableWithGlobalIndex(TTestEnv& env, const TString& databaseName, cons
FillTable(env, databaseName, tableName, rowCount);
}

void ValidateRowCount(TTestActorRuntime& runtime, ui32 nodeIndex, TPathId pathId, size_t expectedRowCount) {
auto statServiceId = NStat::MakeStatServiceID(runtime.GetNodeId(nodeIndex));
ui64 rowCount = 0;
while (rowCount == 0) {
NStat::TRequest req;
req.PathId = pathId;

auto evGet = std::make_unique<TEvStatistics::TEvGetStatistics>();
evGet->StatType = NStat::EStatType::SIMPLE;
evGet->StatRequests.push_back(req);

auto sender = runtime.AllocateEdgeActor(nodeIndex);
runtime.Send(statServiceId, sender, evGet.release(), nodeIndex, true);
auto evResult = runtime.GrabEdgeEventRethrow<TEvStatistics::TEvGetStatisticsResult>(sender);

UNIT_ASSERT(evResult);
UNIT_ASSERT(evResult->Get());
UNIT_ASSERT(evResult->Get()->StatResponses.size() == 1);

auto rsp = evResult->Get()->StatResponses[0];
auto stat = rsp.Simple;

rowCount = stat.RowCount;

if (rowCount != 0) {
UNIT_ASSERT(stat.RowCount == expectedRowCount);
break;
}

runtime.SimulateSleep(TDuration::Seconds(1));
}
}

ui64 GetRowCount(TTestActorRuntime& runtime, ui32 nodeIndex, TPathId pathId) {
auto statServiceId = NStat::MakeStatServiceID(runtime.GetNodeId(nodeIndex));
NStat::TRequest req;
req.PathId = pathId;

auto evGet = std::make_unique<TEvStatistics::TEvGetStatistics>();
evGet->StatType = NStat::EStatType::SIMPLE;
evGet->StatRequests.push_back(req);

auto sender = runtime.AllocateEdgeActor(nodeIndex);
runtime.Send(statServiceId, sender, evGet.release(), nodeIndex, true);
auto evResult = runtime.GrabEdgeEventRethrow<TEvStatistics::TEvGetStatisticsResult>(sender);

UNIT_ASSERT(evResult);
UNIT_ASSERT(evResult->Get());
UNIT_ASSERT(evResult->Get()->StatResponses.size() == 1);

auto rsp = evResult->Get()->StatResponses[0];
auto stat = rsp.Simple;

return stat.RowCount;
}

} // namespace

Y_UNIT_TEST_SUITE(BasicStatistics) {
Expand Down Expand Up @@ -375,6 +319,101 @@ Y_UNIT_TEST_SUITE(BasicStatistics) {
UNIT_ASSERT_VALUES_EQUAL(sendCount, 2); // events from 2 serverless schemeshards
UNIT_ASSERT_VALUES_EQUAL(propagateCount, 2); // SA -> node1 and node1 -> node2
}

Y_UNIT_TEST(PersistenceWithStorageFailuresAndReboots) {
TTestEnv env(1, 2);
auto& runtime = *env.GetServer().GetRuntime();

const size_t rowCount1 = 5;

CreateDatabase(env, "Database", 2);
CreateTable(env, "Database", "Table", rowCount1);

ui64 saTabletId = 0;
auto pathId = ResolvePathId(runtime, "/Root/Database/Table", nullptr, &saTabletId);
ui64 ssTabletId = pathId.OwnerId;

const ui32 nodeIdx = 1;
const ui32 otherNodeIdx = 2;

// Block propagate events that go to node with otherNodeIdx. We will use this
// node later as a clean slate.
TBlockEvents<TEvStatistics::TEvPropagateStatistics> blockPropagate(runtime,
[&](const TEvStatistics::TEvPropagateStatistics::TPtr& ev) {
return ev->Recipient.NodeId() == runtime.GetNodeId(otherNodeIdx);
});

// Wait until correct statistics gets reported
ValidateRowCount(runtime, nodeIdx, pathId, rowCount1);

// Block persisting new updates from schemeshards on the aggregator.
// This should result in old statistics being reported, even after new
// updates arrive.
TBlockEvents<TEvBlobStorage::TEvPut> blockPersistStats(runtime,
[&](const TEvBlobStorage::TEvPut::TPtr& ev) {
return ev->Get()->Id.TabletID() == saTabletId;
});

// Upsert some more data
const size_t rowCount2 = 7;
FillTable(env, "Database", "Table", rowCount2);

{
// Wait for an update from SchemeShard with new row count.

bool statsUpdateSent = false;
auto sendObserver = runtime.AddObserver<TEvStatistics::TEvSchemeShardStats>([&](auto& ev){
NKikimrStat::TSchemeShardStats statRecord;
UNIT_ASSERT(statRecord.ParseFromString(ev->Get()->Record.GetStats()));
for (const auto& entry : statRecord.GetEntries()) {
if (TPathId::FromProto(entry.GetPathId()) == pathId
&& entry.GetAreStatsFull()
&& entry.GetRowCount() == rowCount2) {
statsUpdateSent = true;
}
}
});
runtime.WaitFor("TEvSchemeShardStats", [&]{ return statsUpdateSent; });

bool propagateSent = false;
auto propagateObserver = runtime.AddObserver<TEvStatistics::TEvPropagateStatistics>([&](auto& ev){
if (ev->Recipient.NodeId() == runtime.GetNodeId(nodeIdx)) {
propagateSent = true;
}
});
runtime.WaitFor("TEvPropagateStatistics", [&]{ return propagateSent; });
}
UNIT_ASSERT_VALUES_EQUAL(GetRowCount(runtime, nodeIdx, pathId), rowCount1);

TActorId sender = runtime.AllocateEdgeActor();
RebootTablet(runtime, ssTabletId, sender);

// Simulate storage failure, StatisticsAggregator will reboot.

TBlockEvents<TEvStatistics::TEvSchemeShardStats> blockSSUpdates(runtime);
UNIT_ASSERT_GT(blockPersistStats.size(), 0);
blockPersistStats.Stop();
for (auto& ev : blockPersistStats) {
auto proxy = ev->Recipient;
ui32 groupId = GroupIDFromBlobStorageProxyID(proxy);
auto res = ev->Get()->MakeErrorResponse(
NKikimrProto::ERROR, "Something went wrong", TGroupId::FromValue(groupId));
ui32 nodeIdx = ev->Sender.NodeId() - runtime.GetFirstNodeId();
runtime.Send(new IEventHandle(ev->Sender, proxy, res.release()), nodeIdx, true);
}
TDispatchOptions rebootOptions;
rebootOptions.FinalEvents.emplace_back(TEvTablet::EvBoot);
runtime.DispatchEvents(rebootOptions);

// Check that after reboot the old value is still persisted by the Aggregator
// and returned to the Service.
blockPropagate.Stop();
UNIT_ASSERT_VALUES_EQUAL(GetRowCount(runtime, otherNodeIdx, pathId), rowCount1);

// After everything is healed, stats should get updated.
blockSSUpdates.Stop();
WaitForRowCount(runtime, otherNodeIdx, pathId, rowCount2);
}
}

} // NSysView
Expand Down
13 changes: 5 additions & 8 deletions ydb/core/statistics/service/ut/ut_http_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,21 +81,18 @@ void ProbeBaseStatsTest(bool isServerless) {

// Create a database and a table
if (isServerless) {
CreateDatabase(env, "Shared");
CreateDatabase(env, "Shared", 1, true);
CreateServerlessDatabase(env, "Database", "/Root/Shared");
} else {
CreateDatabase(env, "Database");
}
CreateColumnStoreTable(env, "Database", "Table", 5);
const TString path = "/Root/Database/Table";
const TPathId pathId = ResolvePathId(runtime, path);
const ui32 nodeIdx = 1;

// Wait until the SchemeShard sends out the stats to the StatisticsAggregator.
bool statsToSA = false;
auto statsObserver = runtime.AddObserver<TEvStatistics::TEvSchemeShardStats>([&](auto& /* ev */) {
statsToSA = true;
});
runtime.WaitFor("TEvSchemeShardStats", [&]{ return statsToSA; });
// Wait until correct base statistics gets reported.
ValidateRowCount(runtime, nodeIdx, pathId, ColumnTableRowsNumber);

// Issue the probe_base_stats request and verify that the result makes sense.
const auto sender = runtime.AllocateEdgeActor(nodeIdx);
Expand Down Expand Up @@ -163,7 +160,7 @@ Y_UNIT_TEST_SUITE(HttpRequest) {
}

Y_UNIT_TEST(ProbeBaseStatsServerless) {
ProbeBaseStatsTest(false);
ProbeBaseStatsTest(true);
}
}

Expand Down
54 changes: 54 additions & 0 deletions ydb/core/statistics/ut_common/ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -525,5 +525,59 @@ void WaitForSavedStatistics(TTestActorRuntime& runtime, const TPathId& pathId) {
waiter.Wait();
}

ui64 GetRowCount(TTestActorRuntime& runtime, ui32 nodeIndex, TPathId pathId) {
auto statServiceId = NStat::MakeStatServiceID(runtime.GetNodeId(nodeIndex));
NStat::TRequest req;
req.PathId = pathId;

auto evGet = std::make_unique<TEvStatistics::TEvGetStatistics>();
evGet->StatType = NStat::EStatType::SIMPLE;
evGet->StatRequests.push_back(req);

auto sender = runtime.AllocateEdgeActor(nodeIndex);
runtime.Send(statServiceId, sender, evGet.release(), nodeIndex, true);
auto evResult = runtime.GrabEdgeEventRethrow<TEvStatistics::TEvGetStatisticsResult>(sender);

UNIT_ASSERT(evResult);
UNIT_ASSERT(evResult->Get());
UNIT_ASSERT(evResult->Get()->StatResponses.size() == 1);

auto rsp = evResult->Get()->StatResponses[0];
auto stat = rsp.Simple;

return stat.RowCount;
}

void ValidateRowCount(TTestActorRuntime& runtime, ui32 nodeIndex, TPathId pathId, size_t expectedRowCount) {
ui64 rowCount = 0;
while (rowCount == 0) {
rowCount = GetRowCount(runtime, nodeIndex, pathId);

if (rowCount != 0) {
UNIT_ASSERT_VALUES_EQUAL(rowCount, expectedRowCount);
break;
}

runtime.SimulateSleep(TDuration::Seconds(1));
}
}

void WaitForRowCount(
TTestActorRuntime& runtime, ui32 nodeIndex,
TPathId pathId, size_t expectedRowCount, size_t timeoutSec) {
ui64 lastRowCount = 0;
for (size_t i = 0; i <= timeoutSec; ++i) {
lastRowCount = GetRowCount(runtime, nodeIndex, pathId);
if (i % 5 == 0) {
Cerr << "row count: " << lastRowCount << " (expected: " << expectedRowCount << ")\n";
}
if (lastRowCount == expectedRowCount) {
return;
}
runtime.SimulateSleep(TDuration::Seconds(1));
}
UNIT_ASSERT_C(false, "timed out, last row count: " << lastRowCount);
}

} // NStat
} // NKikimr
6 changes: 6 additions & 0 deletions ydb/core/statistics/ut_common/ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,5 +120,11 @@ void AnalyzeStatus(TTestActorRuntime& runtime, TActorId sender, ui64 saTabletId,

void WaitForSavedStatistics(TTestActorRuntime& runtime, const TPathId& pathId);

ui64 GetRowCount(TTestActorRuntime& runtime, ui32 nodeIndex, TPathId pathId);
void ValidateRowCount(TTestActorRuntime& runtime, ui32 nodeIndex, TPathId pathId, size_t expectedRowCount);
void WaitForRowCount(
TTestActorRuntime& runtime, ui32 nodeIndex,
TPathId pathId, size_t expectedRowCount, size_t timeoutSec = 130);

} // namespace NStat
} // namespace NKikimr
Loading