Skip to content

Commit f2d1054

Browse files
authored
Fix tsan error (#5441)
1 parent 7a17f96 commit f2d1054

File tree

3 files changed

+131
-101
lines changed

3 files changed

+131
-101
lines changed
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
#include "mon_stats.h"
2+
3+
4+
namespace NActors {
5+
6+
TLogHistogram::TLogHistogram() {
7+
memset(Buckets, 0, sizeof(Buckets));
8+
}
9+
10+
void TLogHistogram::Aggregate(const TLogHistogram& other) {
11+
const ui64 inc = RelaxedLoad(&other.TotalSamples);
12+
RelaxedStore(&TotalSamples, RelaxedLoad(&TotalSamples) + inc);
13+
for (size_t i = 0; i < Y_ARRAY_SIZE(Buckets); ++i) {
14+
Buckets[i] += RelaxedLoad(&other.Buckets[i]);
15+
}
16+
}
17+
18+
ui32 TLogHistogram::Count() const {
19+
return Y_ARRAY_SIZE(Buckets);
20+
}
21+
22+
NMonitoring::TBucketBound TLogHistogram::UpperBound(ui32 index) const {
23+
Y_ASSERT(index < Y_ARRAY_SIZE(Buckets));
24+
if (index == 0) {
25+
return 1;
26+
}
27+
return NMonitoring::TBucketBound(1ull << (index - 1)) * 2.0;
28+
}
29+
30+
NMonitoring::TBucketValue TLogHistogram::Value(ui32 index) const {
31+
Y_ASSERT(index < Y_ARRAY_SIZE(Buckets));
32+
return Buckets[index];
33+
}
34+
35+
TExecutorThreadStats::TExecutorThreadStats() // must be not empty as 0 used as default
36+
: ElapsedTicksByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount())
37+
, ReceivedEventsByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount())
38+
, ActorsAliveByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount())
39+
, ScheduledEventsByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount())
40+
, StuckActorsByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount())
41+
, UsageByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount())
42+
{}
43+
44+
namespace {
45+
template <typename T>
46+
void AggregateOne(TVector<T>& self, const TVector<T>& other) {
47+
const size_t selfSize = self.size();
48+
const size_t otherSize = other.size();
49+
if (selfSize < otherSize)
50+
self.resize(otherSize);
51+
for (size_t at = 0; at < otherSize; ++at)
52+
self[at] += RelaxedLoad(&other[at]);
53+
}
54+
}
55+
56+
void TExecutorThreadStats::Aggregate(const TExecutorThreadStats& other) {
57+
SentEvents += RelaxedLoad(&other.SentEvents);
58+
ReceivedEvents += RelaxedLoad(&other.ReceivedEvents);
59+
PreemptedEvents += RelaxedLoad(&other.PreemptedEvents);
60+
NonDeliveredEvents += RelaxedLoad(&other.NonDeliveredEvents);
61+
EmptyMailboxActivation += RelaxedLoad(&other.EmptyMailboxActivation);
62+
CpuUs += RelaxedLoad(&other.CpuUs);
63+
SafeElapsedTicks += RelaxedLoad(&other.SafeElapsedTicks);
64+
RelaxedStore(
65+
&WorstActivationTimeUs,
66+
std::max(RelaxedLoad(&WorstActivationTimeUs), RelaxedLoad(&other.WorstActivationTimeUs)));
67+
ElapsedTicks += RelaxedLoad(&other.ElapsedTicks);
68+
ParkedTicks += RelaxedLoad(&other.ParkedTicks);
69+
BlockedTicks += RelaxedLoad(&other.BlockedTicks);
70+
MailboxPushedOutByTailSending += RelaxedLoad(&other.MailboxPushedOutByTailSending);
71+
MailboxPushedOutBySoftPreemption += RelaxedLoad(&other.MailboxPushedOutBySoftPreemption);
72+
MailboxPushedOutByTime += RelaxedLoad(&other.MailboxPushedOutByTime);
73+
MailboxPushedOutByEventCount += RelaxedLoad(&other.MailboxPushedOutByEventCount);
74+
NotEnoughCpuExecutions += RelaxedLoad(&other.NotEnoughCpuExecutions);
75+
76+
ActivationTimeHistogram.Aggregate(other.ActivationTimeHistogram);
77+
EventDeliveryTimeHistogram.Aggregate(other.EventDeliveryTimeHistogram);
78+
EventProcessingCountHistogram.Aggregate(other.EventProcessingCountHistogram);
79+
EventProcessingTimeHistogram.Aggregate(other.EventProcessingTimeHistogram);
80+
81+
AggregateOne(ElapsedTicksByActivity, other.ElapsedTicksByActivity);
82+
AggregateOne(ReceivedEventsByActivity, other.ReceivedEventsByActivity);
83+
AggregateOne(ActorsAliveByActivity, other.ActorsAliveByActivity);
84+
AggregateOne(ScheduledEventsByActivity, other.ScheduledEventsByActivity);
85+
AggregateOne(StuckActorsByActivity, other.StuckActorsByActivity);
86+
87+
// AggregatedCurrentActivationTime is readed and modified only from one thread
88+
auto timeUs = RelaxedLoad(&other.CurrentActivationTime.TimeUs);
89+
if (timeUs) {
90+
AggregatedCurrentActivationTime.push_back(TActivationTime{
91+
.TimeUs = timeUs,
92+
.LastActivity = RelaxedLoad(&other.CurrentActivationTime.LastActivity)});
93+
}
94+
if (other.AggregatedCurrentActivationTime.size()) {
95+
AggregatedCurrentActivationTime.insert(AggregatedCurrentActivationTime.end(), other.AggregatedCurrentActivationTime.begin(), other.AggregatedCurrentActivationTime.end());
96+
}
97+
98+
if (UsageByActivity.size() < other.UsageByActivity.size()) {
99+
UsageByActivity.resize(other.UsageByActivity.size());
100+
}
101+
for (size_t i = 0; i < UsageByActivity.size(); ++i) {
102+
for (size_t j = 0; j < 10; ++j) {
103+
UsageByActivity[i][j] += RelaxedLoad(&other.UsageByActivity[i][j]);
104+
}
105+
}
106+
107+
RelaxedStore(
108+
&PoolActorRegistrations,
109+
std::max(RelaxedLoad(&PoolActorRegistrations), RelaxedLoad(&other.PoolActorRegistrations)));
110+
RelaxedStore(
111+
&PoolDestroyedActors,
112+
std::max(RelaxedLoad(&PoolDestroyedActors), RelaxedLoad(&other.PoolDestroyedActors)));
113+
RelaxedStore(
114+
&PoolAllocatedMailboxes,
115+
std::max(RelaxedLoad(&PoolAllocatedMailboxes), RelaxedLoad(&other.PoolAllocatedMailboxes)));
116+
}
117+
118+
size_t TExecutorThreadStats::MaxActivityType() const {
119+
return ActorsAliveByActivity.size();
120+
}
121+
122+
}

ydb/library/actors/core/mon_stats.h

Lines changed: 8 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,7 @@
88

99
namespace NActors {
1010
struct TLogHistogram : public NMonitoring::IHistogramSnapshot {
11-
TLogHistogram() {
12-
memset(Buckets, 0, sizeof(Buckets));
13-
}
11+
TLogHistogram();
1412

1513
inline void Add(ui64 val, ui64 inc = 1) {
1614
size_t ind = 0;
@@ -29,31 +27,14 @@ namespace NActors {
2927
RelaxedStore(&Buckets[ind], RelaxedLoad(&Buckets[ind]) + inc);
3028
}
3129

32-
void Aggregate(const TLogHistogram& other) {
33-
const ui64 inc = RelaxedLoad(&other.TotalSamples);
34-
RelaxedStore(&TotalSamples, RelaxedLoad(&TotalSamples) + inc);
35-
for (size_t i = 0; i < Y_ARRAY_SIZE(Buckets); ++i) {
36-
Buckets[i] += RelaxedLoad(&other.Buckets[i]);
37-
}
38-
}
30+
void Aggregate(const TLogHistogram& other);
3931

4032
// IHistogramSnapshot
41-
ui32 Count() const override {
42-
return Y_ARRAY_SIZE(Buckets);
43-
}
33+
ui32 Count() const override;
4434

45-
NMonitoring::TBucketBound UpperBound(ui32 index) const override {
46-
Y_ASSERT(index < Y_ARRAY_SIZE(Buckets));
47-
if (index == 0) {
48-
return 1;
49-
}
50-
return NMonitoring::TBucketBound(1ull << (index - 1)) * 2.0;
51-
}
35+
NMonitoring::TBucketBound UpperBound(ui32 index) const override;
5236

53-
NMonitoring::TBucketValue Value(ui32 index) const override {
54-
Y_ASSERT(index < Y_ARRAY_SIZE(Buckets));
55-
return Buckets[index];
56-
}
37+
NMonitoring::TBucketValue Value(ui32 index) const override;
5738

5839
ui64 TotalSamples = 0;
5940
ui64 Buckets[65];
@@ -126,85 +107,11 @@ namespace NActors {
126107
ui64 MailboxPushedOutByEventCount = 0;
127108
ui64 NotEnoughCpuExecutions = 0;
128109

129-
TExecutorThreadStats() // must be not empty as 0 used as default
130-
: ElapsedTicksByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount())
131-
, ReceivedEventsByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount())
132-
, ActorsAliveByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount())
133-
, ScheduledEventsByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount())
134-
, StuckActorsByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount())
135-
, UsageByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount())
136-
{}
137-
138-
template <typename T>
139-
static void AggregateOne(TVector<T>& self, const TVector<T>& other) {
140-
const size_t selfSize = self.size();
141-
const size_t otherSize = other.size();
142-
if (selfSize < otherSize)
143-
self.resize(otherSize);
144-
for (size_t at = 0; at < otherSize; ++at)
145-
self[at] += RelaxedLoad(&other[at]);
146-
}
110+
TExecutorThreadStats();
147111

148-
void Aggregate(const TExecutorThreadStats& other) {
149-
SentEvents += RelaxedLoad(&other.SentEvents);
150-
ReceivedEvents += RelaxedLoad(&other.ReceivedEvents);
151-
PreemptedEvents += RelaxedLoad(&other.PreemptedEvents);
152-
NonDeliveredEvents += RelaxedLoad(&other.NonDeliveredEvents);
153-
EmptyMailboxActivation += RelaxedLoad(&other.EmptyMailboxActivation);
154-
CpuUs += RelaxedLoad(&other.CpuUs);
155-
SafeElapsedTicks += RelaxedLoad(&other.SafeElapsedTicks);
156-
RelaxedStore(
157-
&WorstActivationTimeUs,
158-
std::max(RelaxedLoad(&WorstActivationTimeUs), RelaxedLoad(&other.WorstActivationTimeUs)));
159-
ElapsedTicks += RelaxedLoad(&other.ElapsedTicks);
160-
ParkedTicks += RelaxedLoad(&other.ParkedTicks);
161-
BlockedTicks += RelaxedLoad(&other.BlockedTicks);
162-
MailboxPushedOutByTailSending += RelaxedLoad(&other.MailboxPushedOutByTailSending);
163-
MailboxPushedOutBySoftPreemption += RelaxedLoad(&other.MailboxPushedOutBySoftPreemption);
164-
MailboxPushedOutByTime += RelaxedLoad(&other.MailboxPushedOutByTime);
165-
MailboxPushedOutByEventCount += RelaxedLoad(&other.MailboxPushedOutByEventCount);
166-
NotEnoughCpuExecutions += RelaxedLoad(&other.NotEnoughCpuExecutions);
167-
168-
ActivationTimeHistogram.Aggregate(other.ActivationTimeHistogram);
169-
EventDeliveryTimeHistogram.Aggregate(other.EventDeliveryTimeHistogram);
170-
EventProcessingCountHistogram.Aggregate(other.EventProcessingCountHistogram);
171-
EventProcessingTimeHistogram.Aggregate(other.EventProcessingTimeHistogram);
172-
173-
AggregateOne(ElapsedTicksByActivity, other.ElapsedTicksByActivity);
174-
AggregateOne(ReceivedEventsByActivity, other.ReceivedEventsByActivity);
175-
AggregateOne(ActorsAliveByActivity, other.ActorsAliveByActivity);
176-
AggregateOne(ScheduledEventsByActivity, other.ScheduledEventsByActivity);
177-
AggregateOne(StuckActorsByActivity, other.StuckActorsByActivity);
178-
if (other.CurrentActivationTime.TimeUs) {
179-
AggregatedCurrentActivationTime.push_back(other.CurrentActivationTime);
180-
}
181-
if (other.AggregatedCurrentActivationTime.size()) {
182-
AggregatedCurrentActivationTime.insert(AggregatedCurrentActivationTime.end(), other.AggregatedCurrentActivationTime.begin(), other.AggregatedCurrentActivationTime.end());
183-
}
184-
185-
if (UsageByActivity.size() < other.UsageByActivity.size()) {
186-
UsageByActivity.resize(other.UsageByActivity.size());
187-
}
188-
for (size_t i = 0; i < UsageByActivity.size(); ++i) {
189-
for (size_t j = 0; j < 10; ++j) {
190-
UsageByActivity[i][j] += RelaxedLoad(&other.UsageByActivity[i][j]);
191-
}
192-
}
193-
194-
RelaxedStore(
195-
&PoolActorRegistrations,
196-
std::max(RelaxedLoad(&PoolActorRegistrations), RelaxedLoad(&other.PoolActorRegistrations)));
197-
RelaxedStore(
198-
&PoolDestroyedActors,
199-
std::max(RelaxedLoad(&PoolDestroyedActors), RelaxedLoad(&other.PoolDestroyedActors)));
200-
RelaxedStore(
201-
&PoolAllocatedMailboxes,
202-
std::max(RelaxedLoad(&PoolAllocatedMailboxes), RelaxedLoad(&other.PoolAllocatedMailboxes)));
203-
}
112+
void Aggregate(const TExecutorThreadStats& other);
204113

205-
size_t MaxActivityType() const {
206-
return ActorsAliveByActivity.size();
207-
}
114+
size_t MaxActivityType() const;
208115
};
209116

210117
}

ydb/library/actors/core/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ SRCS(
7878
mailbox_queue_simple.h
7979
mon.cpp
8080
mon.h
81+
mon_stats.cpp
8182
mon_stats.h
8283
monotonic.cpp
8384
monotonic.h

0 commit comments

Comments
 (0)