Skip to content

Commit 293e2b3

Browse files
authored
Merge a5af1fe into 6583b1b
2 parents 6583b1b + a5af1fe commit 293e2b3

File tree

11 files changed

+1153
-127
lines changed

11 files changed

+1153
-127
lines changed

ydb/core/driver_lib/run/kikimr_services_initializers.cpp

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1093,10 +1093,7 @@ void TSharedCacheInitializer::InitializeServices(
10931093
config->TotalAsyncQueueInFlyLimit = cfg.GetAsyncQueueInFlyLimit();
10941094
config->TotalScanQueueInFlyLimit = cfg.GetScanQueueInFlyLimit();
10951095
config->ReplacementPolicy = cfg.GetReplacementPolicy();
1096-
1097-
if (cfg.HasActivePagesReservationPercent()) {
1098-
config->ActivePagesReservationPercent = cfg.GetActivePagesReservationPercent();
1099-
}
1096+
config->ActivePagesReservationPercent = cfg.GetActivePagesReservationPercent();
11001097

11011098
TIntrusivePtr<::NMonitoring::TDynamicCounters> tabletGroup = GetServiceCounters(appData->Counters, "tablets");
11021099
TIntrusivePtr<::NMonitoring::TDynamicCounters> sausageGroup = tabletGroup->GetSubgroup("type", "S_CACHE");

ydb/core/protos/shared_cache.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ option java_package = "ru.yandex.kikimr.proto";
33

44
enum TReplacementPolicy {
55
ThreeLeveledLRU = 0;
6+
S3FIFO = 1;
67
}
78

89
message TSharedCacheConfig {

ydb/core/tablet_flat/shared_cache_events.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include "defs.h"
44
#include "flat_bio_events.h"
55
#include "shared_handle.h"
6+
#include <ydb/core/protos/shared_cache.pb.h>
67

78
#include <util/generic/map.h>
89
#include <util/generic/set.h>
@@ -24,6 +25,7 @@ namespace NSharedCache {
2425
EvRequest,
2526
EvResult,
2627
EvUpdated,
28+
EvReplacementPolicySwitch,
2729

2830
EvEnd
2931

@@ -127,6 +129,16 @@ namespace NSharedCache {
127129

128130
THashMap<TLogoBlobID, TActions> Actions;
129131
};
132+
133+
struct TEvReplacementPolicySwitch : public TEventLocal<TEvReplacementPolicySwitch, EvReplacementPolicySwitch> {
134+
using TReplacementPolicy = NKikimrSharedCache::TReplacementPolicy;
135+
136+
TReplacementPolicy ReplacementPolicy;
137+
138+
TEvReplacementPolicySwitch(TReplacementPolicy replacementPolicy)
139+
: ReplacementPolicy(replacementPolicy)
140+
{}
141+
};
130142
}
131143
}
132144

Lines changed: 342 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,342 @@
1+
#pragma once
2+
#include "defs.h"
3+
#include <ydb/core/util/cache_cache_iface.h>
4+
#include <ydb/library/yverify_stream/yverify_stream.h>
5+
#include <library/cpp/monlib/counters/counters.h>
6+
#include <library/cpp/monlib/dynamic_counters/counters.h>
7+
8+
namespace NKikimr::NCache {
9+
10+
// TODO: remove template args and make some page base class
11+
12+
enum class ES3FIFOPageLocation {
13+
None,
14+
SmallQueue,
15+
MainQueue
16+
};
17+
18+
template <typename TPageKey, typename TPageTraits>
19+
class TS3FIFOGhostPageQueue {
20+
struct TGhostPage {
21+
TPageKey Key;
22+
ui64 Size; // zero size is tombstone
23+
24+
TGhostPage(const TPageKey& key, ui64 size)
25+
: Key(key)
26+
, Size(size)
27+
{}
28+
};
29+
30+
struct TGhostPageHash {
31+
using is_transparent = void;
32+
33+
inline size_t operator()(const TGhostPage* ghost) const {
34+
return ghost->Key.GetHash();
35+
}
36+
37+
inline size_t operator()(const TPageKey& key) const {
38+
return key.GetHash();
39+
}
40+
};
41+
42+
struct TGhostPageEqual {
43+
using is_transparent = void;
44+
45+
inline bool operator()(const TGhostPage* left, const TGhostPage* right) const {
46+
return left->Key == right->Key;
47+
}
48+
49+
inline bool operator()(const TGhostPage* left, const TPageKey& right) const {
50+
return left->Key == right;
51+
}
52+
};
53+
54+
public:
55+
TS3FIFOGhostPageQueue(ui64 limit)
56+
: Limit(limit)
57+
{}
58+
59+
void Add(const TPageKey& key, ui64 size) {
60+
if (Y_UNLIKELY(size == 0)) {
61+
Y_DEBUG_ABORT_S("Empty " << key.ToString() << " page");
62+
return;
63+
}
64+
65+
TGhostPage* ghost = &GhostsQueue.emplace_back(key, size);
66+
if (Y_UNLIKELY(!GhostsSet.emplace(ghost).second)) {
67+
GhostsQueue.pop_back();
68+
Y_DEBUG_ABORT_S("Duplicated " << key.ToString() << " page");
69+
return;
70+
}
71+
72+
Size += ghost->Size;
73+
74+
EvictWhileFull();
75+
}
76+
77+
bool Erase(const TPageKey& key, ui64 size) {
78+
if (auto it = GhostsSet.find(key); it != GhostsSet.end()) {
79+
TGhostPage* ghost = *it;
80+
Y_DEBUG_ABORT_UNLESS(ghost->Size == size);
81+
Y_ABORT_UNLESS(Size >= ghost->Size);
82+
Size -= ghost->Size;
83+
ghost->Size = 0; // mark as deleted
84+
GhostsSet.erase(it);
85+
return true;
86+
}
87+
return false;
88+
}
89+
90+
void UpdateLimit(ui64 limit) {
91+
Limit = limit;
92+
EvictWhileFull();
93+
}
94+
95+
TString Dump() const {
96+
TStringBuilder result;
97+
size_t count = 0;
98+
ui64 size = 0;
99+
for (auto it = GhostsQueue.begin(); it != GhostsQueue.end(); it++) {
100+
const TGhostPage* ghost = &*it;
101+
if (ghost->Size) { // isn't deleted
102+
Y_DEBUG_ABORT_UNLESS(GhostsSet.contains(ghost));
103+
if (count != 0) result << ", ";
104+
result << "{" << ghost->Key.ToString() << " " << ghost->Size << "b}";
105+
count++;
106+
size += ghost->Size;
107+
}
108+
}
109+
Y_DEBUG_ABORT_UNLESS(GhostsSet.size() == count);
110+
Y_DEBUG_ABORT_UNLESS(Size == size);
111+
return result;
112+
}
113+
114+
private:
115+
void EvictWhileFull() {
116+
while (!GhostsQueue.empty() && Size > Limit) {
117+
TGhostPage* ghost = &GhostsQueue.front();
118+
if (ghost->Size) { // isn't deleted
119+
Y_ABORT_UNLESS(Size >= ghost->Size);
120+
Size -= ghost->Size;
121+
bool erased = GhostsSet.erase(ghost);
122+
Y_ABORT_UNLESS(erased);
123+
}
124+
GhostsQueue.pop_front();
125+
}
126+
}
127+
128+
ui64 Limit;
129+
ui64 Size = 0;
130+
// TODO: store ghost withing PageMap
131+
THashSet<TGhostPage*, TGhostPageHash, TGhostPageEqual> GhostsSet;
132+
TDeque<TGhostPage> GhostsQueue;
133+
};
134+
135+
template <typename TPage, typename TPageKey, typename TPageTraits>
136+
class TS3FIFOCache : public ICacheCache<TPage> {
137+
struct TLimit {
138+
ui64 SmallQueueLimit;
139+
ui64 MainQueueLimit;
140+
141+
TLimit(ui64 limit)
142+
: SmallQueueLimit(limit / 10)
143+
, MainQueueLimit(limit - SmallQueueLimit)
144+
{}
145+
};
146+
147+
struct TQueue {
148+
TQueue(ES3FIFOPageLocation location)
149+
: Location(location)
150+
{}
151+
152+
ES3FIFOPageLocation Location;
153+
TIntrusiveList<TPage> Queue;
154+
ui64 Size = 0;
155+
};
156+
157+
public:
158+
TS3FIFOCache(ui64 limit)
159+
: Limit(limit)
160+
, SmallQueue(ES3FIFOPageLocation::SmallQueue)
161+
, MainQueue(ES3FIFOPageLocation::MainQueue)
162+
, GhostQueue(limit)
163+
{}
164+
165+
TPage* EvictNext() override {
166+
if (SmallQueue.Queue.Empty() && MainQueue.Queue.Empty()) {
167+
return nullptr;
168+
}
169+
170+
// TODO: account passive pages inside the cache
171+
TLimit savedLimit = std::exchange(Limit, TLimit(SmallQueue.Size + MainQueue.Size - 1));
172+
173+
TPage* evictedPage = EvictOneIfFull();
174+
Y_DEBUG_ABORT_UNLESS(evictedPage);
175+
176+
Limit = savedLimit;
177+
178+
return evictedPage;
179+
}
180+
181+
TIntrusiveList<TPage> Touch(TPage* page) override {
182+
const ES3FIFOPageLocation location = TPageTraits::GetLocation(page);
183+
switch (location) {
184+
case ES3FIFOPageLocation::SmallQueue:
185+
case ES3FIFOPageLocation::MainQueue: {
186+
TouchFast(page);
187+
return {};
188+
}
189+
case ES3FIFOPageLocation::None:
190+
return Insert(page);
191+
default:
192+
Y_ABORT("Unknown page location");
193+
}
194+
}
195+
196+
void Erase(TPage* page) override {
197+
const ES3FIFOPageLocation location = TPageTraits::GetLocation(page);
198+
switch (location) {
199+
case ES3FIFOPageLocation::None:
200+
EraseGhost(page);
201+
break;
202+
case ES3FIFOPageLocation::SmallQueue:
203+
Erase(SmallQueue, page);
204+
break;
205+
case ES3FIFOPageLocation::MainQueue:
206+
Erase(MainQueue, page);
207+
break;
208+
default:
209+
Y_ABORT("Unknown page location");
210+
}
211+
212+
TPageTraits::SetFrequency(page, 0);
213+
}
214+
215+
void UpdateLimit(ui64 limit) override {
216+
Limit = limit;
217+
GhostQueue.UpdateLimit(limit);
218+
}
219+
220+
TString Dump() const {
221+
TStringBuilder result;
222+
223+
auto dump = [&](const TQueue& queue) {
224+
size_t count = 0;
225+
ui64 size = 0;
226+
for (auto it = queue.Queue.begin(); it != queue.Queue.end(); it++) {
227+
const TPage* page = &*it;
228+
if (count != 0) result << ", ";
229+
result << "{" << TPageTraits::GetKey(page).ToString() << " " << TPageTraits::GetFrequency(page) << "f " << TPageTraits::GetSize(page) << "b}";
230+
count++;
231+
size += TPageTraits::GetSize(page);
232+
}
233+
Y_DEBUG_ABORT_UNLESS(queue.Size == size);
234+
};
235+
236+
result << "SmallQueue: ";
237+
dump(SmallQueue);
238+
result << Endl << "MainQueue: ";
239+
dump(MainQueue);
240+
result << Endl << "GhostQueue: ";
241+
result << GhostQueue.Dump();
242+
243+
return result;
244+
}
245+
246+
private:
247+
TPage* EvictOneIfFull() {
248+
while (true) {
249+
if (!SmallQueue.Queue.Empty() && SmallQueue.Size > Limit.SmallQueueLimit) {
250+
TPage* page = Pop(SmallQueue);
251+
if (ui32 frequency = TPageTraits::GetFrequency(page); frequency > 1) { // load inserts, first read touches, second read touches
252+
Push(MainQueue, page);
253+
} else {
254+
if (frequency) TPageTraits::SetFrequency(page, 0);
255+
AddGhost(page);
256+
return page;
257+
}
258+
} else if (!MainQueue.Queue.Empty() && MainQueue.Size > Limit.MainQueueLimit) {
259+
TPage* page = Pop(MainQueue);
260+
if (ui32 frequency = TPageTraits::GetFrequency(page); frequency > 0) {
261+
TPageTraits::SetFrequency(page, frequency - 1);
262+
Push(MainQueue, page);
263+
} else {
264+
return page;
265+
}
266+
} else {
267+
break;
268+
}
269+
}
270+
271+
return nullptr;
272+
}
273+
274+
void TouchFast(TPage* page) {
275+
Y_DEBUG_ABORT_UNLESS(TPageTraits::GetLocation(page) != ES3FIFOPageLocation::None);
276+
277+
ui32 frequency = TPageTraits::GetFrequency(page);
278+
if (frequency < 3) {
279+
TPageTraits::SetFrequency(page, frequency + 1);
280+
}
281+
}
282+
283+
TIntrusiveList<TPage> Insert(TPage* page) {
284+
Y_DEBUG_ABORT_UNLESS(TPageTraits::GetLocation(page) == ES3FIFOPageLocation::None);
285+
286+
Push(EraseGhost(page) ? MainQueue : SmallQueue, page);
287+
TPageTraits::SetFrequency(page, 0);
288+
289+
TIntrusiveList<TPage> evictedList;
290+
while (TPage* evictedPage = EvictOneIfFull()) {
291+
evictedList.PushBack(evictedPage);
292+
}
293+
294+
return evictedList;
295+
}
296+
297+
TPage* Pop(TQueue& queue) {
298+
Y_DEBUG_ABORT_UNLESS(!queue.Queue.Empty());
299+
Y_ABORT_UNLESS(TPageTraits::GetLocation(queue.Queue.Front()) == queue.Location);
300+
Y_ABORT_UNLESS(queue.Size >= TPageTraits::GetSize(queue.Queue.Front()));
301+
302+
TPage* page = queue.Queue.PopFront();
303+
queue.Size -= TPageTraits::GetSize(page);
304+
TPageTraits::SetLocation(page, ES3FIFOPageLocation::None);
305+
306+
return page;
307+
}
308+
309+
void Push(TQueue& queue, TPage* page) {
310+
Y_ABORT_UNLESS(TPageTraits::GetLocation(page) == ES3FIFOPageLocation::None);
311+
312+
queue.Queue.PushBack(page);
313+
queue.Size += TPageTraits::GetSize(page);
314+
TPageTraits::SetLocation(page, queue.Location);
315+
}
316+
317+
void Erase(TQueue& queue, TPage* page) {
318+
Y_ABORT_UNLESS(TPageTraits::GetLocation(page) == queue.Location);
319+
Y_ABORT_UNLESS(queue.Size >= TPageTraits::GetSize(page));
320+
321+
page->Unlink();
322+
queue.Size -= TPageTraits::GetSize(page);
323+
TPageTraits::SetLocation(page, ES3FIFOPageLocation::None);
324+
}
325+
326+
void AddGhost(const TPage* page) {
327+
GhostQueue.Add(TPageTraits::GetKey(page), TPageTraits::GetSize(page));
328+
}
329+
330+
bool EraseGhost(const TPage* page) {
331+
return GhostQueue.Erase(TPageTraits::GetKey(page), TPageTraits::GetSize(page));
332+
}
333+
334+
private:
335+
TLimit Limit;
336+
TQueue SmallQueue;
337+
TQueue MainQueue;
338+
TS3FIFOGhostPageQueue<TPageKey, TPageTraits> GhostQueue;
339+
340+
};
341+
342+
}

0 commit comments

Comments
 (0)