Skip to content

Commit 2c601e0

Browse files
authored
Merge 750792d into 4ab249f
2 parents 4ab249f + 750792d commit 2c601e0

File tree

8 files changed

+631
-21
lines changed

8 files changed

+631
-21
lines changed

ydb/core/driver_lib/run/kikimr_services_initializers.cpp

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1089,10 +1089,7 @@ void TSharedCacheInitializer::InitializeServices(
10891089
config->TotalAsyncQueueInFlyLimit = cfg.GetAsyncQueueInFlyLimit();
10901090
config->TotalScanQueueInFlyLimit = cfg.GetScanQueueInFlyLimit();
10911091
config->ReplacementPolicy = cfg.GetReplacementPolicy();
1092-
1093-
if (cfg.HasActivePagesReservationPercent()) {
1094-
config->ActivePagesReservationPercent = cfg.GetActivePagesReservationPercent();
1095-
}
1092+
config->ActivePagesReservationPercent = cfg.GetActivePagesReservationPercent();
10961093

10971094
TIntrusivePtr<::NMonitoring::TDynamicCounters> tabletGroup = GetServiceCounters(appData->Counters, "tablets");
10981095
TIntrusivePtr<::NMonitoring::TDynamicCounters> sausageGroup = tabletGroup->GetSubgroup("type", "S_CACHE");

ydb/core/protos/shared_cache.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ option java_package = "ru.yandex.kikimr.proto";
33

44
enum TReplacementPolicy {
55
ThreeLeveledLRU = 0;
6+
S3FIFO = 1;
67
}
78

89
message TSharedCacheConfig {
Lines changed: 358 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,358 @@
1+
#pragma once
2+
#include "defs.h"
3+
#include <ydb/core/util/cache_cache_iface.h>
4+
#include <ydb/library/yverify_stream/yverify_stream.h>
5+
#include <library/cpp/monlib/counters/counters.h>
6+
#include <library/cpp/monlib/dynamic_counters/counters.h>
7+
#include <util/generic/ptr.h>
8+
#include <util/generic/intrlist.h>
9+
10+
namespace NKikimr::NCache {
11+
12+
// TODO: remove template args and make some page base class
13+
14+
template <typename TPageKey
15+
, typename TPageKeyHash
16+
, typename TPageKeyEqual>
17+
class TTS3FIFOGhostQueue {
18+
struct TGhostPage {
19+
TPageKey Key;
20+
ui64 Size; // zero size is tombstone
21+
22+
TGhostPage(const TPageKey& key, ui64 size)
23+
: Key(key)
24+
, Size(size)
25+
{}
26+
};
27+
28+
struct TGhostPageHash {
29+
inline size_t operator()(const TGhostPage* ghost) const {
30+
return TPageKeyHash()(ghost->Key);
31+
}
32+
};
33+
34+
struct TGhostPageEqual {
35+
inline bool operator()(const TGhostPage* left, const TGhostPage* right) const {
36+
return TPageKeyEqual()(left->Key, right->Key);
37+
}
38+
};
39+
40+
public:
41+
TTS3FIFOGhostQueue(ui64 limit)
42+
: Limit(limit)
43+
{}
44+
45+
void Add(const TPageKey& key, ui64 size) {
46+
if (Y_UNLIKELY(size == 0)) {
47+
Y_DEBUG_ABORT_S("Empty " << key.ToString() << " page");
48+
return;
49+
}
50+
51+
TGhostPage* ghost = &GhostsQueue.emplace_back(key, size);
52+
if (Y_UNLIKELY(!GhostsSet.emplace(ghost).second)) {
53+
GhostsQueue.pop_back();
54+
Y_DEBUG_ABORT_S("Duplicated " << key.ToString() << " page");
55+
}
56+
57+
Size += ghost->Size;
58+
59+
EvictWhileFull();
60+
}
61+
62+
bool Erase(const TPageKey& key, ui64 size) {
63+
TGhostPage key_(key, size);
64+
if (auto it = GhostsSet.find(&key_); it != GhostsSet.end()) {
65+
TGhostPage* ghost = const_cast<TGhostPage*>(*it);
66+
Y_DEBUG_ABORT_UNLESS(ghost->Size == size);
67+
Y_ABORT_UNLESS(Size >= ghost->Size);
68+
Size -= ghost->Size;
69+
ghost->Size = 0;// mark as deleted
70+
GhostsSet.erase(it);
71+
return true;
72+
}
73+
return false;
74+
}
75+
76+
void UpdateLimit(ui64 limit) {
77+
Limit = limit;
78+
EvictWhileFull();
79+
}
80+
81+
TString Dump() const {
82+
TStringBuilder result;
83+
size_t count = 0;
84+
ui64 size = 0;
85+
for (auto it = GhostsQueue.begin(); it != GhostsQueue.end(); it++) {
86+
const TGhostPage* ghost = &*it;
87+
if (ghost->Size) { // isn't deleted
88+
Y_DEBUG_ABORT_UNLESS(GhostsSet.contains(ghost));
89+
if (count != 0) result << ", ";
90+
result << "{" << ghost->Key.ToString() << " " << ghost->Size << "b}";
91+
count++;
92+
size += ghost->Size;
93+
}
94+
}
95+
Y_DEBUG_ABORT_UNLESS(GhostsSet.size() == count);
96+
Y_DEBUG_ABORT_UNLESS(Size == size);
97+
return result;
98+
}
99+
100+
private:
101+
void EvictWhileFull() {
102+
while (!GhostsQueue.empty() && Size > Limit) {
103+
TGhostPage* ghost = &GhostsQueue.front();
104+
if (ghost->Size) { // isn't deleted
105+
Y_ABORT_UNLESS(Size >= ghost->Size);
106+
Size -= ghost->Size;
107+
Y_DEBUG_ABORT_UNLESS(GhostsSet.erase(ghost));
108+
}
109+
GhostsQueue.pop_front();
110+
}
111+
}
112+
113+
ui64 Limit;
114+
ui64 Size = 0;
115+
THashSet<TGhostPage*, TGhostPageHash, TGhostPageEqual> GhostsSet;
116+
TDeque<TGhostPage> GhostsQueue;
117+
};
118+
119+
template <typename TPage
120+
, typename TPageKey
121+
, typename TPageKeyHash
122+
, typename TPageKeyEqual
123+
, typename TPageSize
124+
, typename TPageLocation
125+
, typename TPageFrequency
126+
>
127+
class TS3FIFOCache : public ICacheCache<TPage> {
128+
enum class ELocation {
129+
None,
130+
SmallQueue,
131+
MainQueue
132+
};
133+
134+
struct TLimit {
135+
ui64 SmallQueueLimit;
136+
ui64 MainQueueLimit;
137+
138+
TLimit(ui64 limit)
139+
: SmallQueueLimit(limit / 10)
140+
, MainQueueLimit(limit - SmallQueueLimit)
141+
{}
142+
};
143+
144+
struct TQueue {
145+
ELocation Location;
146+
TIntrusiveList<TPage> Queue;
147+
ui64 Size = 0;
148+
};
149+
150+
public:
151+
TS3FIFOCache(ui64 limit)
152+
: Limit(limit)
153+
, SmallQueue(ELocation::SmallQueue)
154+
, MainQueue(ELocation::MainQueue)
155+
, GhostQueue(limit)
156+
{}
157+
158+
TPage* EvictNext() override {
159+
if (SmallQueue.Queue.Empty() && MainQueue.Queue.Empty()) {
160+
return nullptr;
161+
}
162+
163+
// TODO: account passive pages inside the cache
164+
TLimit savedLimit = std::exchange(Limit, TLimit(SmallQueue.Size + MainQueue.Size - 1));
165+
166+
TPage* evictedPage = EvictOneIfFull();
167+
Y_DEBUG_ABORT_UNLESS(evictedPage);
168+
169+
Limit = savedLimit;
170+
171+
return evictedPage;
172+
}
173+
174+
TIntrusiveList<TPage> Touch(TPage* page) override {
175+
const ELocation location = GetLocation(page);
176+
switch (location) {
177+
case ELocation::SmallQueue:
178+
case ELocation::MainQueue: {
179+
TouchFast(page);
180+
return {};
181+
}
182+
case ELocation::None:
183+
return Insert(page);
184+
default:
185+
Y_ABORT("Unknown page location");
186+
}
187+
}
188+
189+
void Erase(TPage* page) override {
190+
const ELocation location = GetLocation(page);
191+
switch (location) {
192+
case ELocation::None:
193+
break;
194+
case ELocation::SmallQueue:
195+
Erase(SmallQueue, page);
196+
break;
197+
case ELocation::MainQueue:
198+
Erase(MainQueue, page);
199+
break;
200+
default:
201+
Y_ABORT("Unknown page location");
202+
}
203+
204+
SetFrequency(page, 0);
205+
EraseGhost(page);
206+
}
207+
208+
void UpdateLimit(ui64 limit) override {
209+
Limit = limit;
210+
GhostQueue.UpdateLimit(limit);
211+
}
212+
213+
TString Dump() const {
214+
TStringBuilder result;
215+
216+
auto dump = [&](const TQueue& queue) {
217+
size_t count = 0;
218+
ui64 size = 0;
219+
for (auto it = queue.Queue.begin(); it != queue.Queue.end(); it++) {
220+
const TPage* page = &*it;
221+
if (count != 0) result << ", ";
222+
result << "{" << GetKey(page).ToString() << " " << GetFrequency(page) << "f " << GetSize(page) << "b}";
223+
count++;
224+
size += GetSize(page);
225+
}
226+
Y_DEBUG_ABORT_UNLESS(queue.Size == size);
227+
};
228+
229+
result << "SmallQueue: ";
230+
dump(SmallQueue);
231+
result << Endl << "MainQueue: ";
232+
dump(MainQueue);
233+
result << Endl << "GhostQueue: ";
234+
result << GhostQueue.Dump();
235+
236+
return result;
237+
}
238+
239+
private:
240+
TPage* EvictOneIfFull() {
241+
while (true) {
242+
if (!SmallQueue.Queue.Empty() && SmallQueue.Size > Limit.SmallQueueLimit) {
243+
TPage* page = Pop(SmallQueue);
244+
if (GetFrequency(page) > 1) { // TODO: why 1?
245+
Push(MainQueue, page);
246+
} else {
247+
AddGhost(page);
248+
return page;
249+
}
250+
} else if (!MainQueue.Queue.Empty() && MainQueue.Size > Limit.MainQueueLimit) {
251+
TPage* page = Pop(MainQueue);
252+
if (ui32 frequency = GetFrequency(page); frequency > 0) {
253+
SetFrequency(page, frequency - 1);
254+
Push(MainQueue, page);
255+
} else {
256+
return page;
257+
}
258+
} else {
259+
break;
260+
}
261+
}
262+
263+
return nullptr;
264+
}
265+
266+
void TouchFast(TPage* page) {
267+
Y_DEBUG_ABORT_UNLESS(GetLocation(page) != ELocation::None);
268+
269+
ui32 frequency = GetFrequency(page);
270+
if (frequency < 3) {
271+
SetFrequency(page, frequency + 1);
272+
}
273+
}
274+
275+
TIntrusiveList<TPage> Insert(TPage* page) {
276+
Y_DEBUG_ABORT_UNLESS(GetLocation(page) == ELocation::None);
277+
278+
Push(EraseGhost(page) ? MainQueue : SmallQueue, page);
279+
SetFrequency(page, 0);
280+
281+
TIntrusiveList<TPage> evictedList;
282+
while (TPage* evictedPage = EvictOneIfFull()) {
283+
evictedList.PushBack(evictedPage);
284+
}
285+
286+
return evictedList;
287+
}
288+
289+
TPage* Pop(TQueue& queue) {
290+
Y_DEBUG_ABORT_UNLESS(!queue.Queue.Empty());
291+
Y_ABORT_UNLESS(GetLocation(queue.Queue.Front()) == queue.Location);
292+
Y_DEBUG_ABORT_UNLESS(queue.Size >= GetSize(queue.Queue.Front()));
293+
294+
TPage* page = queue.Queue.PopFront();
295+
queue.Size -= GetSize(page);
296+
SetLocation(page, ELocation::None);
297+
298+
return page;
299+
}
300+
301+
void Push(TQueue& queue, TPage* page) {
302+
Y_ABORT_UNLESS(GetLocation(page) == ELocation::None);
303+
304+
queue.Queue.PushBack(page);
305+
queue.Size += GetSize(page);
306+
SetLocation(page, queue.Location);
307+
}
308+
309+
void Erase(TQueue& queue, TPage* page) {
310+
Y_ABORT_UNLESS(GetLocation(page) == queue.Location);
311+
Y_DEBUG_ABORT_UNLESS(queue.Size >= GetSize(page));
312+
313+
page->Unlink();
314+
queue.Size -= GetSize(page);
315+
SetLocation(page, ELocation::None);
316+
}
317+
318+
void AddGhost(const TPage* page) {
319+
GhostQueue.Add(GetKey(page), GetSize(page));
320+
}
321+
322+
bool EraseGhost(const TPage* page) {
323+
return GhostQueue.Erase(GetKey(page), GetSize(page));
324+
}
325+
326+
TPageKey GetKey(const TPage* page) const {
327+
return TPageKey::Get(page);
328+
}
329+
330+
ui64 GetSize(const TPage* page) const {
331+
return TPageSize::Get(page);
332+
}
333+
334+
ELocation GetLocation(const TPage* page) const {
335+
return static_cast<ELocation>(TPageLocation::Get(page));
336+
}
337+
338+
void SetLocation(TPage* page, ELocation location) const {
339+
TPageLocation::Set(page, static_cast<ui32>(location));
340+
}
341+
342+
ui32 GetFrequency(const TPage* page) const {
343+
return TPageFrequency::Get(page);
344+
}
345+
346+
void SetFrequency(TPage* page, ui32 frequency) const {
347+
TPageFrequency::Set(page, frequency);
348+
}
349+
350+
private:
351+
TLimit Limit;
352+
TQueue SmallQueue;
353+
TQueue MainQueue;
354+
TTS3FIFOGhostQueue<TPageKey, TPageKeyHash, TPageKeyEqual> GhostQueue;
355+
356+
};
357+
358+
}

0 commit comments

Comments
 (0)