Combined locking with exclusive bit #38

Merged: 10 commits, Dec 2, 2022
780 changes: 390 additions & 390 deletions cachelib/allocator/CacheAllocator-inl.h

Large diffs are not rendered by default.

160 changes: 93 additions & 67 deletions cachelib/allocator/CacheAllocator.h
@@ -1389,7 +1389,7 @@ class CacheAllocator : public CacheBase {
double slabsApproxFreePercentage(TierId tid) const;

// wrapper around Item's refcount and active handle tracking
FOLLY_ALWAYS_INLINE void incRef(Item& it);
FOLLY_ALWAYS_INLINE RefcountWithFlags::incResult incRef(Item& it, bool failIfMoving);
FOLLY_ALWAYS_INLINE RefcountWithFlags::Value decRef(Item& it);
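
The incRef change is central to this PR: taking a reference can now fail when the item is marked moving, so readers back off instead of racing with an eviction or slab move. Below is a minimal, self-contained model of a refcount combined with a moving bit in one atomic word; the class name, enum values, and bit layout are made up for illustration and are not cachelib's RefcountWithFlags.

#include <atomic>
#include <cstdint>

// Toy model: a refcount and a "moving" bit packed into one atomic word.
class ToyRefFlags {
 public:
  enum class IncResult { kOk, kFailedMoving };

  // Take a reference; optionally fail when the moving bit is set.
  IncResult incRef(bool failIfMoving) {
    uint32_t cur = bits_.load(std::memory_order_relaxed);
    while (true) {
      if (failIfMoving && (cur & kMovingBit)) {
        return IncResult::kFailedMoving;
      }
      // compare_exchange_weak reloads `cur` on failure, so the check above
      // always sees a fresh value on the next iteration.
      if (bits_.compare_exchange_weak(cur, cur + 1,
                                      std::memory_order_acq_rel)) {
        return IncResult::kOk;
      }
    }
  }

  // Drop a reference and return the remaining count.
  uint32_t decRef() {
    return (bits_.fetch_sub(1, std::memory_order_acq_rel) - 1) & kRefMask;
  }

 private:
  static constexpr uint32_t kMovingBit = 1u << 31;
  static constexpr uint32_t kRefMask = kMovingBit - 1;
  std::atomic<uint32_t> bits_{0};
};

int main() {
  ToyRefFlags r;
  // Nothing is moving, so the reference is granted and dropped cleanly.
  return r.incRef(true) == ToyRefFlags::IncResult::kOk && r.decRef() == 0 ? 0 : 1;
}
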

// drops the refcount and if needed, frees the allocation back to the memory
@@ -1440,6 +1440,12 @@
bool nascent = false,
const Item* toRecycle = nullptr);

// Must be called by the thread that called markExclusive and
// succeeded. After this call, the item is unlinked from the Access and
// MM containers. The item is no longer marked as exclusive and its
// refcount is 0 - it is available for recycling.
void unlinkItemExclusive(Item& it);

// acquires a handle on the item. returns an empty handle if it is null.
// @param it pointer to an item
// @return WriteHandle return a handle to this item
@@ -1550,17 +1556,17 @@
// @return handle to the parent item if the validations pass
// otherwise, an empty Handle is returned.
//
ReadHandle validateAndGetParentHandleForChainedMoveLocked(
WriteHandle validateAndGetParentHandleForChainedMoveLocked(
const ChainedItem& item, const Key& parentKey);

// Given an existing item, allocate a new one for the
// existing one to later be moved into.
//
// @param oldItem the item we want to allocate a new item for
// @param item reference to the item we want to allocate a new item for
//
// @return handle to the newly allocated item
//
WriteHandle allocateNewItemForOldItem(const Item& oldItem);
WriteHandle allocateNewItemForOldItem(const Item& item);

// internal helper that grabs a refcounted handle to the item. This does
// not record the access to reflect in the mmContainer.
@@ -1614,18 +1620,16 @@
// @param oldItem Reference to the item being moved
// @param newItemHdl Reference to the handle of the new item being moved into
//
// @return the handle to the oldItem if the move was completed
// and the oldItem can be recycled.
// Otherwise an empty handle is returned.
template <typename P>
WriteHandle moveRegularItemWithSync(Item& oldItem, WriteHandle& newItemHdl, P&& predicate);
// @return true If the move was completed, and the containers were updated
// successfully.
void moveRegularItemWithSync(Item& oldItem, WriteHandle& newItemHdl);

// Moves a regular item to a different slab. This should only be used during
// slab release after the item's exclusive bit has been set. The user-supplied
// callback is responsible for copying the contents and fixing the semantics
// of the chained item.
//
// @param oldItem Reference to the item being moved
// @param oldItem item being moved
// @param newItemHdl Reference to the handle of the new item being moved into
//
// @return true If the move was completed, and the containers were updated
@@ -1789,9 +1793,9 @@ class CacheAllocator : public CacheBase {
// handle to the item. On failure an empty handle.
WriteHandle tryEvictToNextMemoryTier(TierId tid, PoolId pid, Item& item, bool fromBgThread);

bool tryPromoteToNextMemoryTier(TierId tid, PoolId pid, Item& item, bool fromBgThread);
WriteHandle tryPromoteToNextMemoryTier(TierId tid, PoolId pid, Item& item, bool fromBgThread);

bool tryPromoteToNextMemoryTier(Item& item, bool fromBgThread);
WriteHandle tryPromoteToNextMemoryTier(Item& item, bool fromBgThread);

// Try to move the item down to the next memory tier
//
@@ -1878,22 +1882,23 @@ class CacheAllocator : public CacheBase {

// @return true when successfully marked as moving,
// false when this item has already been freed
bool markExclusiveForSlabRelease(const SlabReleaseContext& ctx,
void* alloc,
util::Throttler& throttler);
bool markMovingForSlabRelease(const SlabReleaseContext& ctx,
void* alloc,
util::Throttler& throttler);

// "Move" (by copying) the content in this item to another memory
// location by invoking the move callback.
//
//
// @param ctx slab release context
// @param item old item to be moved elsewhere
// @param oldItem old item to be moved elsewhere
// @param handle handle to the item or to it's parent (if chained)
// @param throttler slow this function down so as not to take too much cpu
//
// @return true if the item has been moved
// false if we have exhausted moving attempts
bool moveForSlabRelease(const SlabReleaseContext& ctx,
Item& item,
Item& oldItem,
util::Throttler& throttler);

// "Move" (by copying) the content in this item to another memory
@@ -1929,6 +1934,8 @@ class CacheAllocator : public CacheBase {
// handle on failure. caller can retry.
WriteHandle evictChainedItemForSlabRelease(ChainedItem& item);

typename NvmCacheT::PutToken createPutToken(Item& item);

// Helper function to remove an item if the predicate is true.
//
// @return last handle to the item on success. empty handle on failure.
@@ -1966,8 +1973,10 @@ class CacheAllocator : public CacheBase {
candidates.reserve(batch);

size_t tries = 0;
mmContainer.withEvictionIterator([&tries, &candidates, &batch, this](auto &&itr){
while (candidates.size() < batch && (config_.maxEvictionPromotionHotness == 0 || tries < config_.maxEvictionPromotionHotness) && itr) {
mmContainer.withEvictionIterator([&tries, &candidates, &batch, &mmContainer, this](auto &&itr) {
while (candidates.size() < batch &&
(config_.maxEvictionPromotionHotness == 0 || tries < config_.maxEvictionPromotionHotness) &&
itr) {
tries++;
Item* candidate = itr.get();
XDCHECK(candidate);
@@ -1976,7 +1985,8 @@ class CacheAllocator : public CacheBase {
throw std::runtime_error("Not supported for chained items");
}

if (candidate->getRefCount() == 0 && candidate->markExclusive()) {
if (candidate->markMoving(true)) {
mmContainer.remove(itr);
candidates.push_back(candidate);
}

@@ -1985,37 +1995,45 @@ class CacheAllocator : public CacheBase {
});

for (Item *candidate : candidates) {
{
auto toReleaseHandle =
evictNormalItem(*candidate,
true /* skipIfTokenInvalid */, true /* from BG thread */);
// destroy toReleaseHandle. The item won't be released to the allocator
// since we marked it as exclusive.
}
auto ref = candidate->unmarkExclusive();

if (ref == 0u) {
if (candidate->hasChainedItem()) {
(*stats_.chainedItemEvictions)[pid][cid].inc();
} else {
(*stats_.regularItemEvictions)[pid][cid].inc();
}
auto evictedToNext = tryEvictToNextMemoryTier(*candidate, true /* from BgThread */);
if (!evictedToNext) {
auto token = createPutToken(*candidate);

auto ret = candidate->markExclusiveWhenMoving();
XDCHECK(ret);

unlinkItemExclusive(*candidate);
// wake up any readers that wait for the move to complete
// it's safe to do now, as we have the item marked exclusive and
// no other reader can be added to the waiters list
wakeUpWaiters(candidate->getKey(), WriteHandle{});

if (token.isValid() && shouldWriteToNvmCacheExclusive(*candidate)) {
nvmCache_->put(*candidate, std::move(token));
}
} else {
evictions++;
XDCHECK(!evictedToNext->isExclusive() && !evictedToNext->isMoving());
XDCHECK(!candidate->isExclusive() && !candidate->isMoving());
XDCHECK(!candidate->isAccessible());
XDCHECK(candidate->getKey() == evictedToNext->getKey());

evictions++;
// it's safe to recycle the item here as there are no more
// references and the item could not have been marked as moving
// by another thread since it is detached from the MMContainer.
auto res = releaseBackToAllocator(*candidate, RemoveContext::kEviction,
/* isNascent */ false);
XDCHECK(res == ReleaseRes::kReleased);
wakeUpWaiters(candidate->getKey(), std::move(evictedToNext));
}
XDCHECK(!candidate->isExclusive() && !candidate->isMoving());

if (candidate->hasChainedItem()) {
(*stats_.chainedItemEvictions)[pid][cid].inc();
} else {
if (candidate->hasChainedItem()) {
stats_.evictFailParentAC.inc();
} else {
stats_.evictFailAC.inc();
}
(*stats_.regularItemEvictions)[pid][cid].inc();
}

// it's safe to recycle the item here as there are no more
// references and the item could not have been marked as moving
// by another thread since it is detached from the MMContainer.
auto res = releaseBackToAllocator(*candidate, RemoveContext::kEviction,
/* isNascent */ false);
XDCHECK(res == ReleaseRes::kReleased);
}
return evictions;
}
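
The eviction loop above is a two-phase pattern: candidates are marked moving and detached from the MM container while its lock is held (inside withEvictionIterator), and the expensive work (NVM put, unlinking, freeing) happens afterwards without that lock. A self-contained sketch of the same pattern with toy types follows; none of these names are cachelib code, they only illustrate the locking discipline.

#include <atomic>
#include <cstddef>
#include <list>
#include <mutex>
#include <vector>

struct ToyItem {
  int key;
  std::atomic<bool> moving{false};
  // Succeeds for exactly one caller, mirroring markMoving(true) on an
  // unreferenced item.
  bool markMoving() {
    bool expected = false;
    return moving.compare_exchange_strong(expected, true);
  }
};

// Phase 1: under the LRU lock, mark up to `batch` items moving and detach
// them, so no new reference can be taken. Phase 2: evict outside the lock.
size_t evictBatch(std::mutex& lruLock, std::list<ToyItem*>& lru, size_t batch) {
  std::vector<ToyItem*> candidates;
  {
    std::lock_guard<std::mutex> g(lruLock);
    for (auto it = lru.begin();
         it != lru.end() && candidates.size() < batch;) {
      if ((*it)->markMoving()) {
        candidates.push_back(*it);
        it = lru.erase(it);  // plays the role of mmContainer.remove(itr)
      } else {
        ++it;
      }
    }
  }
  size_t evictions = 0;
  for (ToyItem* item : candidates) {
    // ... copy to a lower tier / NVM, unlink from the access container ...
    delete item;  // safe: marked moving and detached, so no other thread
    ++evictions;  // can acquire a handle or re-mark it
  }
  return evictions;
}

int main() {
  std::mutex lock;
  std::list<ToyItem*> lru{new ToyItem{1}, new ToyItem{2}, new ToyItem{3}};
  size_t n = evictBatch(lock, lru, 2);    // evicts two of the three items
  for (ToyItem* rest : lru) delete rest;  // clean up the survivor
  return n == 2 ? 0 : 1;
}
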
@@ -2028,7 +2046,7 @@ class CacheAllocator : public CacheBase {

size_t tries = 0;

mmContainer.withPromotionIterator([&tries, &candidates, &batch, this](auto &&itr){
mmContainer.withPromotionIterator([&tries, &candidates, &batch, &mmContainer, this](auto &&itr){
while (candidates.size() < batch && (config_.maxEvictionPromotionHotness == 0 || tries < config_.maxEvictionPromotionHotness) && itr) {
tries++;
Item* candidate = itr.get();
@@ -2038,10 +2056,9 @@ class CacheAllocator : public CacheBase {
throw std::runtime_error("Not supported for chained items");
}


// TODO: only allow it for read-only items?
// or implement mvcc
if (!candidate->isExpired() && candidate->markExclusive()) {
if (candidate->markMoving(true)) {
candidates.push_back(candidate);
}

@@ -2051,18 +2068,28 @@

for (Item *candidate : candidates) {
auto promoted = tryPromoteToNextMemoryTier(*candidate, true);
auto ref = candidate->unmarkExclusive();
if (promoted)
if (promoted) {
promotions++;

if (ref == 0u) {
// stats_.promotionMoveSuccess.inc();
auto res = releaseBackToAllocator(*candidate, RemoveContext::kEviction,
/* isNascent */ false);
XDCHECK(res == ReleaseRes::kReleased);
removeFromMMContainer(*candidate);
XDCHECK(!candidate->isExclusive() && !candidate->isMoving());
// it's safe to recycle the item here as there are no more
// references and the item could not have been marked as moving
// by another thread since it is detached from the MMContainer.
auto res = releaseBackToAllocator(*candidate, RemoveContext::kEviction,
/* isNascent */ false);
XDCHECK(res == ReleaseRes::kReleased);
} else {
// we failed to allocate a new item; this item is no longer moving
auto ref = candidate->unmarkMoving();
if (UNLIKELY(ref == 0)) {
const auto res =
releaseBackToAllocator(*candidate,
RemoveContext::kNormal, false);
XDCHECK(res == ReleaseRes::kReleased);
}
}

}

return promotions;
}

@@ -2173,18 +2200,14 @@ class CacheAllocator : public CacheBase {
std::optional<bool> saveNvmCache();
void saveRamCache();

static bool itemExclusivePredicate(const Item& item) {
return item.getRefCount() == 0;
static bool itemSlabMovePredicate(const Item& item) {
return item.isMoving() && item.getRefCount() == 0;
}

static bool itemExpiryPredicate(const Item& item) {
return item.getRefCount() == 1 && item.isExpired();
}

static bool parentEvictForSlabReleasePredicate(const Item& item) {
return item.getRefCount() == 1 && !item.isExclusive();
}
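
The new itemSlabMovePredicate above tightens the old itemExclusivePredicate: a zero refcount alone is no longer enough, the item must also carry the moving mark that the slab-release path set. A tiny illustration with a stub item type; StubItem is purely illustrative and not the cachelib Item.

#include <cassert>

// Stub exposing only the two accessors the predicate reads.
struct StubItem {
  int refCount = 0;
  bool moving = false;
  int getRefCount() const { return refCount; }
  bool isMoving() const { return moving; }
};

static bool itemSlabMovePredicate(const StubItem& item) {
  return item.isMoving() && item.getRefCount() == 0;
}

int main() {
  StubItem idle;              // refcount 0, but nobody claimed it for moving
  StubItem claimed{0, true};  // refcount 0 and marked moving
  assert(!itemSlabMovePredicate(idle));
  assert(itemSlabMovePredicate(claimed));
  return 0;
}
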

std::unique_ptr<Deserializer> createDeserializer();

// Execute func on each item. `func` can throw exception but must ensure
@@ -2234,8 +2257,9 @@ class CacheAllocator : public CacheBase {
return memoryTierConfigs.size();
}

bool addWaitContextForMovingItem(
folly::StringPiece key, std::shared_ptr<WaitContext<ReadHandle>> waiter);
WriteHandle handleWithWaitContextForMovingItem(Item& item);

size_t wakeUpWaiters(folly::StringPiece key, WriteHandle&& handle);

class MoveCtx {
public:
@@ -2260,6 +2284,8 @@ class CacheAllocator : public CacheBase {
waiters.push_back(std::move(waiter));
}

size_t numWaiters() const { return waiters.size(); }

private:
// notify all pending waiters that are waiting for the fetch.
void wakeUpWaiters() {
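The wait-context plumbing changed in this hunk (handleWithWaitContextForMovingItem, wakeUpWaiters, MoveCtx::numWaiters) is what lets a reader that hits a moving item park until the mover publishes the new copy, instead of spinning or failing. Below is a standard-library-only model of that idea, with waiters keyed by item key and fulfilled in one shot by the moving thread. It is a sketch of the concept, not the cachelib MoveCtx implementation.

#include <future>
#include <mutex>
#include <string>
#include <unordered_map>

// One pending move per key; readers attach futures, the mover fulfills them.
class MoveWaiters {
 public:
  // Called by a reader that found the item marked moving. The first caller
  // for a key creates the context.
  std::shared_future<std::string> waitFor(const std::string& key) {
    std::lock_guard<std::mutex> g(lock_);
    Ctx& ctx = ctx_[key];
    ++ctx.numWaiters;
    return ctx.shared;
  }

  // Called by the mover once the new copy is linked (an empty value would
  // signal a failed move). Returns how many readers were waiting, mirroring
  // MoveCtx::numWaiters().
  size_t wakeUpWaiters(const std::string& key, std::string value) {
    std::lock_guard<std::mutex> g(lock_);
    auto it = ctx_.find(key);
    if (it == ctx_.end()) {
      return 0;
    }
    size_t n = it->second.numWaiters;
    it->second.promise.set_value(std::move(value));  // wakes every waiter
    ctx_.erase(it);  // futures stay valid: the shared state outlives the entry
    return n;
  }

 private:
  struct Ctx {
    std::promise<std::string> promise;
    std::shared_future<std::string> shared{promise.get_future().share()};
    size_t numWaiters = 0;
  };
  std::mutex lock_;
  std::unordered_map<std::string, Ctx> ctx_;
};

A reader would call waiters.waitFor(key).wait() after losing the incRef race with a mover, and the mover would call waiters.wakeUpWaiters(key, newLocation) once the replacement item is linked, the same shape as wakeUpWaiters(candidate->getKey(), std::move(evictedToNext)) in the batch eviction code above.
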
39 changes: 22 additions & 17 deletions cachelib/allocator/CacheItem-inl.h
@@ -232,8 +232,28 @@ bool CacheItem<CacheTrait>::isExclusive() const noexcept {
}

template <typename CacheTrait>
bool CacheItem<CacheTrait>::isOnlyExclusive() const noexcept {
return ref_.isOnlyExclusive();
bool CacheItem<CacheTrait>::markExclusiveWhenMoving() {
return ref_.markExclusiveWhenMoving();
}

template <typename CacheTrait>
bool CacheItem<CacheTrait>::markMoving(bool failIfRefNotZero) {
return ref_.markMoving(failIfRefNotZero);
}

template <typename CacheTrait>
RefcountWithFlags::Value CacheItem<CacheTrait>::unmarkMoving() noexcept {
return ref_.unmarkMoving();
}

template <typename CacheTrait>
bool CacheItem<CacheTrait>::isMoving() const noexcept {
return ref_.isMoving();
}

template <typename CacheTrait>
bool CacheItem<CacheTrait>::isOnlyMoving() const noexcept {
return ref_.isOnlyMoving();
}
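
markMoving/unmarkMoving replace the old exclusive-bit helpers on CacheItem. The contract the allocator code relies on: markMoving(true) succeeds only if the refcount is zero and the bit is not already set, and unmarkMoving returns the remaining refcount so the caller knows whether it is the last owner and must free the item (the failure path in the promotion batch above does exactly this). A compact, self-contained model of that contract follows; the class and bit layout are illustrative only, not the real RefcountWithFlags.

#include <atomic>
#include <cassert>
#include <cstdint>

class ToyMovingRef {
 public:
  // Set the moving bit; optionally require the refcount to be zero.
  bool markMoving(bool failIfRefNotZero) {
    uint32_t cur = bits_.load(std::memory_order_relaxed);
    while (true) {
      if ((cur & kMovingBit) ||
          (failIfRefNotZero && (cur & kRefMask) != 0)) {
        return false;
      }
      if (bits_.compare_exchange_weak(cur, cur | kMovingBit,
                                      std::memory_order_acq_rel)) {
        return true;
      }
    }
  }

  // Clear the moving bit and report the refcount that remains; zero means
  // the caller is the last owner and must release the item.
  uint32_t unmarkMoving() noexcept {
    return bits_.fetch_and(~kMovingBit, std::memory_order_acq_rel) & kRefMask;
  }

  bool isMoving() const noexcept {
    return (bits_.load() & kMovingBit) != 0;
  }
  bool isOnlyMoving() const noexcept {
    return bits_.load() == kMovingBit;
  }

 private:
  static constexpr uint32_t kMovingBit = 1u << 31;
  static constexpr uint32_t kRefMask = kMovingBit - 1;
  std::atomic<uint32_t> bits_{0};
};

int main() {
  ToyMovingRef r;
  assert(r.markMoving(true));     // refcount is 0, so the mark succeeds
  assert(r.isOnlyMoving());
  assert(r.unmarkMoving() == 0);  // 0 refs left: the caller must free it
  return 0;
}
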

template <typename CacheTrait>
@@ -266,21 +286,6 @@ bool CacheItem<CacheTrait>::isNvmEvicted() const noexcept {
return ref_.isNvmEvicted();
}

template <typename CacheTrait>
void CacheItem<CacheTrait>::markIncomplete() noexcept {
ref_.markIncomplete();
}

template <typename CacheTrait>
void CacheItem<CacheTrait>::unmarkIncomplete() noexcept {
ref_.unmarkIncomplete();
}

template <typename CacheTrait>
bool CacheItem<CacheTrait>::isIncomplete() const noexcept {
return ref_.isIncomplete();
}

template <typename CacheTrait>
void CacheItem<CacheTrait>::markIsChainedItem() noexcept {
XDCHECK(!hasChainedItem());