Skip to content

Commit d1f9302

Browse files
authored
Introduce blocking ring queue on spinlock (required to add sleepping) (#4786)
1 parent 8a9629b commit d1f9302

File tree

2 files changed

+239
-1
lines changed

2 files changed

+239
-1
lines changed

ydb/library/actors/queues/activation_queue.h

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include "defs.h"
44
#include "mpmc_ring_queue.h"
5+
#include "mpmc_ring_queue_blocking.h"
56
#include <atomic>
67

78

@@ -20,7 +21,6 @@ class TRingActivationQueue {
2021
: IsMPSC(isMPSC)
2122
{}
2223

23-
2424
void Push(ui32 activation, ui64 revolvingCounter) {
2525
if (!IsNeedToWriteToOldQueue.load(std::memory_order_acquire)) {
2626
if (ActivationQueue.TryPush(activation)) {
@@ -57,4 +57,34 @@ class TRingActivationQueue {
5757

5858
};
5959

60+
class TBlockingActivationQueue {
61+
NThreading::TPadded<TMPMCBlockingRingQueue<20>> ActivationQueue;
62+
63+
public:
64+
TBlockingActivationQueue(bool)
65+
{}
66+
67+
68+
void Push(ui32 activation, ui64) {
69+
for (;;) {
70+
if (ActivationQueue.TryPush(activation)) {
71+
return;
72+
}
73+
}
74+
}
75+
76+
ui32 Pop(ui64) {
77+
std::optional<ui32> activation;
78+
activation = ActivationQueue.TryPop();
79+
if (activation) {
80+
return *activation;
81+
}
82+
return 0;
83+
}
84+
85+
void Stop() {
86+
ActivationQueue.Stop();
87+
}
88+
};
89+
6090
} // NActors
Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
#pragma once
2+
#include "defs.h"
3+
4+
#include <library/cpp/threading/chunk_queue/queue.h>
5+
6+
#include <atomic>
7+
#include <optional>
8+
9+
namespace NActors {
10+
11+
template <ui32 MaxSizeBits, ui32 ThreadBits=8>
12+
struct TMPMCBlockingRingQueue {
13+
static constexpr std::optional<ui32> TestSize = std::nullopt;
14+
static constexpr ui32 MaxSize = 1 << MaxSizeBits;
15+
static constexpr ui32 MaxThreadCount = 1 << ThreadBits;
16+
17+
struct TClaimedSlot {
18+
ui64 Slot;
19+
ui64 Generation;
20+
ui64 Head;
21+
std::optional<ui32> Value = std::nullopt;
22+
bool ByClaim = false;
23+
};
24+
25+
struct alignas(ui64) TSlot {
26+
static constexpr ui64 EmptyBit = 1ull << 63;
27+
static constexpr ui64 ClaimBit = 1ull << 62;
28+
static constexpr ui64 InternalBits = EmptyBit | ClaimBit;
29+
ui64 Generation = 0;
30+
ui64 Value = 0;
31+
bool IsEmpty = true;
32+
bool IsClaim = false;
33+
34+
static constexpr ui64 MakeEmpty(ui64 generation) {
35+
return EmptyBit | generation;
36+
}
37+
38+
static constexpr ui64 MakeClaim(ui64 generation) {
39+
return EmptyBit | ClaimBit | generation;
40+
}
41+
42+
static constexpr TSlot Recognise(ui64 slotValue) {
43+
if (slotValue & ClaimBit) {
44+
return {.Generation = (~InternalBits & slotValue), .IsEmpty=true, .IsClaim=true};
45+
}
46+
if (slotValue & EmptyBit) {
47+
return {.Generation = (~InternalBits & slotValue), .IsEmpty=true, .IsClaim=false};
48+
}
49+
return {.Value= (~InternalBits & slotValue), .IsEmpty=false, .IsClaim=false};
50+
}
51+
};
52+
53+
NThreading::TPadded<std::atomic<ui64>> Tail{0};
54+
NThreading::TPadded<std::atomic<ui64>> Head{0};
55+
NThreading::TPadded<TArrayHolder<std::atomic<ui64>>> Buffer;
56+
static constexpr ui32 ConvertIdx(ui32 idx) {
57+
idx = idx % MaxSize;
58+
if constexpr (TestSize && *TestSize < 0x100) {
59+
return idx;
60+
}
61+
62+
// 0, 16, 32, .., 240,
63+
// 1, 17, 33, .., 241,
64+
// ...
65+
// 15, 31, 63, ..., 255,
66+
// 16-32 cache lines
67+
return (idx & ~0xff) | ((idx & 0xf) << 4) | ((idx >> 4) & 0xf);
68+
}
69+
70+
NThreading::TPadded<std::atomic_bool> StopFlag = false;
71+
72+
TMPMCBlockingRingQueue()
73+
: Buffer(new std::atomic<ui64>[MaxSize])
74+
{
75+
for (ui32 idx = 0; idx < MaxSize; ++idx) {
76+
Buffer[idx] = TSlot::MakeEmpty(0);
77+
}
78+
}
79+
80+
~TMPMCBlockingRingQueue() {
81+
}
82+
83+
bool TryPush(ui32 val) {
84+
for (;;) {
85+
ui64 currentTail = Tail.fetch_add(1, std::memory_order_relaxed);
86+
ui32 generation = currentTail / MaxSize;
87+
88+
std::atomic<ui64> &currentSlot = Buffer[ConvertIdx(currentTail)];
89+
TSlot slot;
90+
ui64 expected = TSlot::MakeEmpty(generation);
91+
do {
92+
if (currentSlot.compare_exchange_strong(expected, val)) {
93+
return true;
94+
}
95+
slot = TSlot::Recognise(expected);
96+
} while (slot.Generation <= generation && slot.IsEmpty);
97+
98+
if (!slot.IsEmpty) {
99+
ui64 currentHead = Head.load(std::memory_order_acquire);
100+
if (currentHead + MaxSize <= currentTail + std::min<ui64>(1024, MaxSize - 1)) {
101+
return false;
102+
}
103+
}
104+
}
105+
}
106+
107+
void TryMoveTail(ui64 currentHead, ui64 currentTail) {
108+
while (currentTail <= currentHead) {
109+
if (Tail.compare_exchange_weak(currentTail, currentHead + 1)) {
110+
return;
111+
}
112+
}
113+
}
114+
115+
TClaimedSlot ClaimSlot() {
116+
for (;;) {
117+
ui64 currentHead = Head.fetch_add(1, std::memory_order_relaxed);
118+
ui32 idx = ConvertIdx(currentHead);
119+
ui32 generation = currentHead / MaxSize;
120+
auto &currentSlot = Buffer[idx];
121+
ui64 expected = currentSlot.load(std::memory_order_relaxed);
122+
TSlot expectedSlot = TSlot::Recognise(expected);
123+
if (!expectedSlot.IsEmpty) {
124+
if (currentSlot.compare_exchange_strong(expected, TSlot::MakeEmpty(generation + 1))) {
125+
return {idx, generation, currentHead, expectedSlot.Value, false};
126+
}
127+
SpinLockPause();
128+
continue;
129+
}
130+
131+
if (expectedSlot.Generation > generation) {
132+
SpinLockPause();
133+
continue;
134+
}
135+
136+
if (currentSlot.compare_exchange_strong(expected, TSlot::MakeClaim(generation))) {
137+
return {idx, generation, currentHead};
138+
}
139+
140+
expectedSlot = TSlot::Recognise(expected);
141+
if (!expectedSlot.IsEmpty) {
142+
if (currentSlot.compare_exchange_strong(expected, TSlot::MakeEmpty(generation + 1))) {
143+
return {idx, generation, currentHead, expectedSlot.Value, false};
144+
}
145+
}
146+
SpinLockPause();
147+
}
148+
}
149+
150+
std::optional<ui32> CheckSlot(TClaimedSlot &claimedSlot) {
151+
auto &currentSlot = Buffer[claimedSlot.Slot];
152+
ui64 expected = currentSlot.load(std::memory_order_acquire);
153+
auto slot = TSlot::Recognise(expected);
154+
if (slot.IsEmpty && slot.Generation <= claimedSlot.Generation) {
155+
return std::nullopt;
156+
} else if (slot.IsEmpty) {
157+
claimedSlot = ClaimSlot();
158+
if (claimedSlot.Value) {
159+
return claimedSlot.Value;
160+
}
161+
return std::nullopt;
162+
}
163+
if (currentSlot.compare_exchange_strong(expected, TSlot::MakeEmpty(claimedSlot.Generation + 1))) {
164+
return slot.Value;
165+
}
166+
return std::nullopt;
167+
}
168+
169+
std::optional<ui32> ForbidSlot(TClaimedSlot claimedSlot) {
170+
ui64 expected = TSlot::MakeClaim(claimedSlot.Generation);
171+
auto &currentSlot = Buffer[claimedSlot.Slot];
172+
if (currentSlot.compare_exchange_strong(expected, TSlot::MakeEmpty(claimedSlot.Generation + 1))) {
173+
TryMoveTail(claimedSlot.Head, Tail.load(std::memory_order_acquire));
174+
return std::nullopt;
175+
}
176+
auto slot = TSlot::Recognise(expected);
177+
if (slot.IsEmpty) {
178+
return std::nullopt;
179+
}
180+
if (currentSlot.compare_exchange_strong(expected, TSlot::MakeEmpty(claimedSlot.Generation + 1))) {
181+
return slot.Value;
182+
}
183+
return std::nullopt;
184+
}
185+
186+
std::optional<ui32> TryPop() {
187+
TClaimedSlot claimedSlot = ClaimSlot();
188+
if (claimedSlot.Value) {
189+
return claimedSlot.Value;
190+
}
191+
for (;;) {
192+
if (auto read = CheckSlot(claimedSlot)) {
193+
return read;
194+
}
195+
if (StopFlag.load(std::memory_order_acquire)) {
196+
break;
197+
}
198+
SpinLockPause();
199+
}
200+
return ForbidSlot(claimedSlot);
201+
}
202+
203+
void Stop() {
204+
StopFlag.store(true, std::memory_order_release);
205+
}
206+
};
207+
208+
} // NActors

0 commit comments

Comments
 (0)