Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 57 additions & 111 deletions ydb/core/tablet_flat/shared_cache_s3fifo.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,131 +19,64 @@ template <typename TPageTraits>
class TS3FIFOGhostPageQueue {
using TPageKey = typename TPageTraits::TPageKey;

struct TGhostPage {
TPageKey Key;
ui64 Size; // zero size is tombstone

TGhostPage(const TPageKey& key, ui64 size)
: Key(key)
, Size(size)
{}
};

struct TGhostPageHash {
using is_transparent = void;

inline size_t operator()(const TGhostPage* ghost) const {
return TPageTraits::GetHash(ghost->Key);
}

inline size_t operator()(const TPageKey& key) const {
return TPageTraits::GetHash(key);
}
};

struct TGhostPageEqual {
using is_transparent = void;

inline bool operator()(const TGhostPage* left, const TGhostPage* right) const {
return TPageTraits::Equals(left->Key, right->Key);
}

inline bool operator()(const TGhostPage* left, const TPageKey& right) const {
return TPageTraits::Equals(left->Key, right);
}
};

public:
TS3FIFOGhostPageQueue(ui64 limit)
: Limit(limit)
{}

void Add(const TPageKey& key, ui64 size) {
if (Y_UNLIKELY(size == 0)) {
Y_DEBUG_ABORT_S("Empty " << TPageTraits::ToString(key) << " page");
return;
}
bool Add(const TPageKey& key) {
size_t hash = TPageTraits::GetHash(key);

TGhostPage* ghost = &GhostsQueue.emplace_back(key, size);
if (Y_UNLIKELY(!GhostsSet.emplace(ghost).second)) {
GhostsQueue.pop_back();
Y_DEBUG_ABORT_S("Duplicated " << TPageTraits::ToString(key) << " page");
return;
if (GhostsSet.insert(hash).second) {
GhostsQueue.push_back(hash);
return true;
}

Size += ghost->Size;

EvictWhileFull();

return false;
}

bool Erase(const TPageKey& key, ui64 size) {
if (auto it = GhostsSet.find(key); it != GhostsSet.end()) {
TGhostPage* ghost = *it;
Y_DEBUG_ABORT_UNLESS(ghost->Size == size);
Y_ABORT_UNLESS(Size >= ghost->Size);
Size -= ghost->Size;
ghost->Size = 0; // mark as deleted
GhostsSet.erase(it);
return true;
void Limit(size_t limit) {
while (GhostsQueue.size() > limit) {
bool erased = GhostsSet.erase(GhostsQueue.front());
Y_DEBUG_ABORT_UNLESS(erased);
GhostsQueue.pop_front();
}
return false;
}
void UpdateLimit(ui64 limit) {
Limit = limit;
EvictWhileFull();

bool Contains(const TPageKey& key) {
size_t hash = TPageTraits::GetHash(key);
return GhostsSet.contains(hash);
}

TString Dump() const {
TStringBuilder result;
size_t count = 0;
ui64 size = 0;
for (auto it = GhostsQueue.begin(); it != GhostsQueue.end(); it++) {
const TGhostPage* ghost = &*it;
if (ghost->Size) { // isn't deleted
Y_DEBUG_ABORT_UNLESS(GhostsSet.contains(ghost));
if (count != 0) result << ", ";
result << "{" << TPageTraits::ToString(ghost->Key) << " " << ghost->Size << "b}";
count++;
size += ghost->Size;
}
for (size_t hash : GhostsQueue) {
Y_DEBUG_ABORT_UNLESS(GhostsSet.contains(hash));
if (count != 0) result << ", ";
result << hash;
count++;
}
Y_DEBUG_ABORT_UNLESS(GhostsSet.size() == count);
Y_DEBUG_ABORT_UNLESS(Size == size);
return result;
}

private:
void EvictWhileFull() {
while (!GhostsQueue.empty() && Size > Limit) {
TGhostPage* ghost = &GhostsQueue.front();
if (ghost->Size) { // isn't deleted
Y_ABORT_UNLESS(Size >= ghost->Size);
Size -= ghost->Size;
bool erased = GhostsSet.erase(ghost);
Y_ABORT_UNLESS(erased);
}
GhostsQueue.pop_front();
}
}

ui64 Limit;
ui64 Size = 0;
// TODO: store ghost withing PageMap
THashSet<TGhostPage*, TGhostPageHash, TGhostPageEqual> GhostsSet;
TDeque<TGhostPage> GhostsQueue;
// Note: only hashes are stored, all the collisions just ignored
THashSet<size_t> GhostsSet;
TDeque<size_t> GhostsQueue;
};

template <typename TPage, typename TPageTraits>
class TS3FIFOCache : public ICacheCache<TPage> {
using TPageKey = typename TPageTraits::TPageKey;

static const ui32 MaxMainQueueReinserts = 20;

struct TLimit {
ui64 TotalLimit;
ui64 SmallQueueLimit;
ui64 MainQueueLimit;

TLimit(ui64 limit)
: SmallQueueLimit(limit / 10)
: TotalLimit(limit)
, SmallQueueLimit(limit / 10)
, MainQueueLimit(limit - SmallQueueLimit)
{}
};
Expand All @@ -155,6 +88,7 @@ class TS3FIFOCache : public ICacheCache<TPage> {

ES3FIFOPageLocation Location;
TIntrusiveList<TPage> Queue;
ui64 Count = 0;
ui64 Size = 0;
};

Expand All @@ -163,7 +97,6 @@ class TS3FIFOCache : public ICacheCache<TPage> {
: Limit(limit)
, SmallQueue(ES3FIFOPageLocation::SmallQueue)
, MainQueue(ES3FIFOPageLocation::MainQueue)
, GhostQueue(limit)
{}

TIntrusiveList<TPage> EvictNext() override {
Expand Down Expand Up @@ -205,7 +138,6 @@ class TS3FIFOCache : public ICacheCache<TPage> {
const ES3FIFOPageLocation location = TPageTraits::GetLocation(page);
switch (location) {
case ES3FIFOPageLocation::None:
EraseGhost(page);
break;
case ES3FIFOPageLocation::SmallQueue:
Erase(SmallQueue, page);
Expand All @@ -222,7 +154,6 @@ class TS3FIFOCache : public ICacheCache<TPage> {

void UpdateLimit(ui64 limit) override {
Limit = limit;
GhostQueue.UpdateLimit(limit);
}

ui64 GetSize() const override {
Expand All @@ -242,6 +173,7 @@ class TS3FIFOCache : public ICacheCache<TPage> {
count++;
size += TPageTraits::GetSize(page);
}
Y_DEBUG_ABORT_UNLESS(queue.Count == count);
Y_DEBUG_ABORT_UNLESS(queue.Size == size);
};

Expand All @@ -257,26 +189,33 @@ class TS3FIFOCache : public ICacheCache<TPage> {

private:
TPage* EvictOneIfFull() {
while (true) {
if (!SmallQueue.Queue.Empty() && SmallQueue.Size > Limit.SmallQueueLimit) {
ui32 mainQueueReinserts = 0;

while (GetSize() > Limit.TotalLimit) {
if (SmallQueue.Size > Limit.SmallQueueLimit) {
TPage* page = Pop(SmallQueue);
if (ui32 frequency = TPageTraits::GetFrequency(page); frequency > 1) { // load inserts, first read touches, second read touches
TPageTraits::SetFrequency(page, 0);
Push(MainQueue, page);
} else {
if (frequency) TPageTraits::SetFrequency(page, 0);
if (frequency) { // the page is used only once
TPageTraits::SetFrequency(page, 0);
}
AddGhost(page);
return page;
}
} else if (!MainQueue.Queue.Empty() && MainQueue.Size > Limit.MainQueueLimit) {
} else {
TPage* page = Pop(MainQueue);
if (ui32 frequency = TPageTraits::GetFrequency(page); frequency > 0) {
if (ui32 frequency = TPageTraits::GetFrequency(page); frequency > 0 && mainQueueReinserts < MaxMainQueueReinserts) {
mainQueueReinserts++;
TPageTraits::SetFrequency(page, frequency - 1);
Push(MainQueue, page);
} else {
if (frequency) { // reinserts limit exceeded
TPageTraits::SetFrequency(page, 0);
}
return page;
}
} else {
break;
}
}

Expand All @@ -295,7 +234,7 @@ class TS3FIFOCache : public ICacheCache<TPage> {
TIntrusiveList<TPage> Insert(TPage* page) {
Y_DEBUG_ABORT_UNLESS(TPageTraits::GetLocation(page) == ES3FIFOPageLocation::None);

Push(EraseGhost(page) ? MainQueue : SmallQueue, page);
Push(IsGhost(page) ? MainQueue : SmallQueue, page);
TPageTraits::SetFrequency(page, 0);

TIntrusiveList<TPage> evictedList;
Expand All @@ -307,11 +246,13 @@ class TS3FIFOCache : public ICacheCache<TPage> {
}

TPage* Pop(TQueue& queue) {
Y_DEBUG_ABORT_UNLESS(!queue.Queue.Empty());
Y_ABORT_UNLESS(!queue.Queue.Empty());
Y_ABORT_UNLESS(TPageTraits::GetLocation(queue.Queue.Front()) == queue.Location);
Y_ABORT_UNLESS(queue.Count > 0);
Y_ABORT_UNLESS(queue.Size >= TPageTraits::GetSize(queue.Queue.Front()));

TPage* page = queue.Queue.PopFront();
queue.Count--;
queue.Size -= TPageTraits::GetSize(page);
TPageTraits::SetLocation(page, ES3FIFOPageLocation::None);

Expand All @@ -322,25 +263,30 @@ class TS3FIFOCache : public ICacheCache<TPage> {
Y_ABORT_UNLESS(TPageTraits::GetLocation(page) == ES3FIFOPageLocation::None);

queue.Queue.PushBack(page);
queue.Count++;
queue.Size += TPageTraits::GetSize(page);
TPageTraits::SetLocation(page, queue.Location);
}

void Erase(TQueue& queue, TPage* page) {
Y_ABORT_UNLESS(TPageTraits::GetLocation(page) == queue.Location);
Y_ABORT_UNLESS(queue.Count > 0);
Y_ABORT_UNLESS(queue.Size >= TPageTraits::GetSize(page));

page->Unlink();
queue.Count--;
queue.Size -= TPageTraits::GetSize(page);
TPageTraits::SetLocation(page, ES3FIFOPageLocation::None);
}

void AddGhost(const TPage* page) {
GhostQueue.Add(TPageTraits::GetKey(page), TPageTraits::GetSize(page));
if (GhostQueue.Add(TPageTraits::GetKey(page))) {
GhostQueue.Limit(SmallQueue.Count + MainQueue.Count);
}
}

bool EraseGhost(const TPage* page) {
return GhostQueue.Erase(TPageTraits::GetKey(page), TPageTraits::GetSize(page));
bool IsGhost(const TPage* page) {
return GhostQueue.Contains(TPageTraits::GetKey(page));
}

private:
Expand Down
Loading
Loading