4
4
#include < ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h>
5
5
#include < ydb/core/mon/mon.h>
6
6
#include < ydb/core/protos/node_whiteboard.pb.h>
7
- #include < ydb/core/util/light.h>
8
7
9
8
#include < library/cpp/bucket_quoter/bucket_quoter.h>
10
9
#include < library/cpp/containers/stack_vector/stack_vec.h>
@@ -15,6 +14,270 @@ namespace NKikimr {
15
14
16
15
struct TPDiskConfig ;
17
16
17
+ inline NHPTimer::STime HPNow () {
18
+ NHPTimer::STime ret;
19
+ GetTimeFast (&ret);
20
+ return ret;
21
+ }
22
+
23
+ inline double HPSecondsFloat (i64 cycles) {
24
+ if (cycles > 0 ) {
25
+ return double (cycles) / NHPTimer::GetClockRate ();
26
+ } else {
27
+ return 0.0 ;
28
+ }
29
+ }
30
+
31
+ inline double HPMilliSecondsFloat (i64 cycles) {
32
+ if (cycles > 0 ) {
33
+ return double (cycles) * 1000.0 / NHPTimer::GetClockRate ();
34
+ } else {
35
+ return 0 ;
36
+ }
37
+ }
38
+
39
+ inline ui64 HPMilliSeconds (i64 cycles) {
40
+ return (ui64)HPMilliSecondsFloat (cycles);
41
+ }
42
+
43
+ inline ui64 HPMicroSecondsFloat (i64 cycles) {
44
+ if (cycles > 0 ) {
45
+ return double (cycles) * 1000000.0 / NHPTimer::GetClockRate ();
46
+ } else {
47
+ return 0 ;
48
+ }
49
+ }
50
+
51
+ inline ui64 HPMicroSeconds (i64 cycles) {
52
+ return (ui64)HPMicroSecondsFloat (cycles);
53
+ }
54
+
55
+ inline ui64 HPNanoSeconds (i64 cycles) {
56
+ if (cycles > 0 ) {
57
+ return ui64 (double (cycles) * 1000000000.0 / NHPTimer::GetClockRate ());
58
+ } else {
59
+ return 0 ;
60
+ }
61
+ }
62
+
63
+ inline ui64 HPCyclesNs (ui64 ns) {
64
+ return ui64 (NHPTimer::GetClockRate () * double (ns) / 1000000000.0 );
65
+ }
66
+
67
+ inline ui64 HPCyclesUs (ui64 us) {
68
+ return ui64 (NHPTimer::GetClockRate () * double (us) / 1000000.0 );
69
+ }
70
+
71
+ inline ui64 HPCyclesMs (ui64 ms) {
72
+ return ui64 (NHPTimer::GetClockRate () * double (ms) / 1000.0 );
73
+ }
74
+
75
+ class TLightBase {
76
+ protected:
77
+ TString Name;
78
+ ::NMonitoring::TDynamicCounters::TCounterPtr State; // Current state (0=OFF=green, 1=ON=red)
79
+ ::NMonitoring::TDynamicCounters::TCounterPtr Count; // Number of switches to ON state
80
+ ::NMonitoring::TDynamicCounters::TCounterPtr RedMs; // Time elapsed in ON state
81
+ ::NMonitoring::TDynamicCounters::TCounterPtr GreenMs; // Time elapsed in OFF state
82
+ private:
83
+ ui64 RedCycles = 0 ;
84
+ ui64 GreenCycles = 0 ;
85
+ NHPTimer::STime AdvancedTill = 0 ;
86
+ NHPTimer::STime LastNow = 0 ;
87
+ ui64 UpdateThreshold = 0 ;
88
+ public:
89
+ void Initialize (TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, const TString& name) {
90
+ Name = name;
91
+ State = counters->GetCounter (name + " _state" );
92
+ Count = counters->GetCounter (name + " _count" , true );
93
+ RedMs = counters->GetCounter (name + " _redMs" , true );
94
+ GreenMs = counters->GetCounter (name + " _greenMs" , true );
95
+ UpdateThreshold = HPCyclesMs (100 );
96
+ AdvancedTill = Now ();
97
+ }
98
+
99
+ void Initialize (TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, const TString& countName,
100
+ const TString& redMsName,const TString& greenMsName) {
101
+ Count = counters->GetCounter (countName, true );
102
+ RedMs = counters->GetCounter (redMsName, true );
103
+ GreenMs = counters->GetCounter (greenMsName, true );
104
+ UpdateThreshold = HPCyclesMs (100 );
105
+ AdvancedTill = Now ();
106
+ }
107
+
108
+ ui64 GetCount () const {
109
+ return *Count;
110
+ }
111
+
112
+ ui64 GetRedMs () const {
113
+ return *RedMs;
114
+ }
115
+
116
+ ui64 GetGreenMs () const {
117
+ return *GreenMs;
118
+ }
119
+ protected:
120
+ void Modify (bool state, bool prevState) {
121
+ if (state && !prevState) { // Switched to ON state
122
+ if (State) {
123
+ *State = true ;
124
+ }
125
+ (*Count)++;
126
+ return ;
127
+ }
128
+ if (!state && prevState) { // Switched to OFF state
129
+ if (State) {
130
+ *State = false ;
131
+ }
132
+ return ;
133
+ }
134
+ }
135
+
136
+ void Advance (bool state, NHPTimer::STime now) {
137
+ if (now == AdvancedTill) {
138
+ return ;
139
+ }
140
+ Elapsed (state, now - AdvancedTill);
141
+ if (RedCycles > UpdateThreshold) {
142
+ *RedMs += CutMs (RedCycles);
143
+ }
144
+ if (GreenCycles > UpdateThreshold) {
145
+ *GreenMs += CutMs (GreenCycles);
146
+ }
147
+ AdvancedTill = now;
148
+ }
149
+
150
+ NHPTimer::STime Now () {
151
+ // Avoid time going backwards
152
+ NHPTimer::STime now = HPNow ();
153
+ if (now < LastNow) {
154
+ now = LastNow;
155
+ }
156
+ LastNow = now;
157
+ return now;
158
+ }
159
+ private:
160
+ void Elapsed (bool state, ui64 cycles) {
161
+ if (state) {
162
+ RedCycles += cycles;
163
+ } else {
164
+ GreenCycles += cycles;
165
+ }
166
+ }
167
+
168
+ ui64 CutMs (ui64& src) {
169
+ ui64 ms = HPMilliSeconds (src);
170
+ ui64 cycles = HPCyclesMs (ms);
171
+ src -= cycles;
172
+ return ms;
173
+ }
174
+ };
175
+
176
+ // Thread-safe light
177
+ class TLight : public TLightBase {
178
+ private:
179
+ struct TItem {
180
+ bool State;
181
+ bool Filled;
182
+ TItem (bool state = false , bool filled = false )
183
+ : State(state)
184
+ , Filled(filled)
185
+ {}
186
+ };
187
+
188
+ // Cyclic buffer to enforce event ordering by seqno
189
+ TSpinLock Lock;
190
+ size_t HeadIdx = 0 ; // Index of current state
191
+ size_t FilledCount = 0 ;
192
+ ui16 Seqno = 0 ; // Current seqno
193
+ TStackVec<TItem, 32 > Data; // In theory should have not more than thread count items
194
+ public:
195
+ TLight () {
196
+ InitData ();
197
+ }
198
+
199
+ void Set (bool state, ui16 seqno) {
200
+ TGuard<TSpinLock> g (Lock);
201
+ Push (state, seqno);
202
+ bool prevState;
203
+ // Note that 'state' variable is being reused
204
+ NHPTimer::STime now = Now ();
205
+ while (Pop (state, prevState)) {
206
+ Modify (state, prevState);
207
+ Advance (prevState, now);
208
+ }
209
+ }
210
+
211
+ void Update () {
212
+ TGuard<TSpinLock> g (Lock);
213
+ Advance (Data[HeadIdx].State , Now ());
214
+ }
215
+
216
+ private:
217
+ void InitData (bool state = false , bool filled = false ) {
218
+ Data.clear ();
219
+ Data.emplace_back (state, filled);
220
+ Data.resize (32 );
221
+ HeadIdx = 0 ;
222
+ }
223
+
224
+ void Push (bool state, ui16 seqno) {
225
+ FilledCount++;
226
+ if (FilledCount == 1 ) { // First event must initialize seqno
227
+ Seqno = seqno;
228
+ InitData (state, true );
229
+ if (state) {
230
+ Modify (true , false );
231
+ }
232
+ return ;
233
+ }
234
+ Y_ABORT_UNLESS (seqno != Seqno, " ordering overflow or duplicate event headSeqno# %d seqno# %d state# %d filled# %d" ,
235
+ (int )Seqno, (int )seqno, (int )state, (int )CountFilled ());
236
+ ui16 diff = seqno;
237
+ diff -= Seqno; // Underflow is fine
238
+ size_t size = Data.size ();
239
+ if (size <= diff) { // Buffer is full -- extend and move wrapped part
240
+ Data.resize (size * 2 );
241
+ for (size_t i = 0 ; i < HeadIdx; i++) {
242
+ Data[size + i] = Data[i];
243
+ Data[i].Filled = false ;
244
+ }
245
+ }
246
+ TItem& item = Data[(HeadIdx + diff) % Data.size ()];
247
+ Y_ABORT_UNLESS (!item.Filled , " ordering overflow or duplicate event headSeqno# %d seqno# %d state# %d filled# %d" ,
248
+ (int )Seqno, (int )seqno, (int )state, (int )CountFilled ());
249
+ item.Filled = true ;
250
+ item.State = state;
251
+ }
252
+
253
+ bool Pop (bool & state, bool & prevState) {
254
+ size_t nextIdx = (HeadIdx + 1 ) % Data.size ();
255
+ TItem& head = Data[HeadIdx];
256
+ TItem& next = Data[nextIdx];
257
+ if (!head.Filled || !next.Filled ) {
258
+ return false ;
259
+ }
260
+ state = next.State ;
261
+ prevState = head.State ;
262
+ head.Filled = false ;
263
+ HeadIdx = nextIdx;
264
+ Seqno++; // Overflow is fine
265
+ FilledCount--;
266
+ if (FilledCount == 1 && Data.size () > 32 ) {
267
+ InitData (state, true );
268
+ }
269
+ return true ;
270
+ }
271
+
272
+ size_t CountFilled () const {
273
+ size_t ret = 0 ;
274
+ for (const TItem& item : Data) {
275
+ ret += item.Filled ;
276
+ }
277
+ return ret;
278
+ }
279
+ };
280
+
18
281
class TBurstmeter {
19
282
private:
20
283
TBucketQuoter<i64 , TSpinLock, THPTimerUs> Bucket;
0 commit comments