Skip to content
This repository has been archived by the owner on Jun 19, 2023. It is now read-only.

Commit

Permalink
Merge pull request #67 from ipfs/frrist/arc-cache-stripe
Browse files Browse the repository at this point in the history
fix(arc): striped locking on last byte of CID
  • Loading branch information
Stebalien authored May 4, 2021
2 parents fb07d7b + 8d1f7bf commit d90561b
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 13 deletions.
70 changes: 67 additions & 3 deletions arc_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package blockstore

import (
"context"
"sync"

lru "github.com/hashicorp/golang-lru"
blocks "github.com/ipfs/go-block-format"
Expand All @@ -17,7 +18,9 @@ type cacheSize int
// size. This provides block access-time improvements, allowing
// to short-cut many searches without querying the underlying datastore.
type arccache struct {
cache *lru.TwoQueueCache
cache *lru.TwoQueueCache
lks [256]sync.RWMutex

blockstore Blockstore
viewer Viewer

Expand All @@ -42,11 +45,27 @@ func newARCCachedBS(ctx context.Context, bs Blockstore, lruSize int) (*arccache,
return c, nil
}

func (b *arccache) getLock(k cid.Cid) *sync.RWMutex {
return &b.lks[mutexKey(k)]
}

func mutexKey(k cid.Cid) uint8 {
return k.KeyString()[len(k.KeyString())-1]
}

func (b *arccache) DeleteBlock(k cid.Cid) error {
if !k.Defined() {
return nil
}

if has, _, ok := b.queryCache(k); ok && !has {
return nil
}

lk := b.getLock(k)
lk.Lock()
defer lk.Unlock()

b.cache.Remove(k) // Invalidate cache before deleting.
err := b.blockstore.DeleteBlock(k)
if err == nil {
Expand All @@ -56,9 +75,18 @@ func (b *arccache) DeleteBlock(k cid.Cid) error {
}

func (b *arccache) Has(k cid.Cid) (bool, error) {
if !k.Defined() {
return false, nil
}

if has, _, ok := b.queryCache(k); ok {
return has, nil
}

lk := b.getLock(k)
lk.RLock()
defer lk.RUnlock()

has, err := b.blockstore.Has(k)
if err != nil {
return false, err
Expand All @@ -68,6 +96,10 @@ func (b *arccache) Has(k cid.Cid) (bool, error) {
}

func (b *arccache) GetSize(k cid.Cid) (int, error) {
if !k.Defined() {
return -1, ErrNotFound
}

if has, blockSize, ok := b.queryCache(k); ok {
if !has {
// don't have it, return
Expand All @@ -79,6 +111,11 @@ func (b *arccache) GetSize(k cid.Cid) (int, error) {
}
// we have it but don't know the size, ask the datastore.
}

lk := b.getLock(k)
lk.RLock()
defer lk.RUnlock()

blockSize, err := b.blockstore.GetSize(k)
if err == ErrNotFound {
b.cacheHave(k, false)
Expand All @@ -100,7 +137,6 @@ func (b *arccache) View(k cid.Cid, callback func([]byte) error) error {
}

if !k.Defined() {
log.Error("undefined cid in arc cache")
return ErrNotFound
}

Expand All @@ -110,19 +146,26 @@ func (b *arccache) View(k cid.Cid, callback func([]byte) error) error {
return ErrNotFound
}

lk := b.getLock(k)
lk.RLock()
defer lk.RUnlock()

return b.viewer.View(k, callback)
}

func (b *arccache) Get(k cid.Cid) (blocks.Block, error) {
if !k.Defined() {
log.Error("undefined cid in arc cache")
return nil, ErrNotFound
}

if has, _, ok := b.queryCache(k); ok && !has {
return nil, ErrNotFound
}

lk := b.getLock(k)
lk.RLock()
defer lk.RUnlock()

bl, err := b.blockstore.Get(k)
if bl == nil && err == ErrNotFound {
b.cacheHave(k, false)
Expand All @@ -137,6 +180,10 @@ func (b *arccache) Put(bl blocks.Block) error {
return nil
}

lk := b.getLock(bl.Cid())
lk.Lock()
defer lk.Unlock()

err := b.blockstore.Put(bl)
if err == nil {
b.cacheSize(bl.Cid(), len(bl.RawData()))
Expand All @@ -145,14 +192,31 @@ func (b *arccache) Put(bl blocks.Block) error {
}

func (b *arccache) PutMany(bs []blocks.Block) error {
mxs := [256]*sync.RWMutex{}
var good []blocks.Block
for _, block := range bs {
// call put on block if result is inconclusive or we are sure that
// the block isn't in storage
if has, _, ok := b.queryCache(block.Cid()); !ok || (ok && !has) {
good = append(good, block)
mxs[mutexKey(block.Cid())] = &b.lks[mutexKey(block.Cid())]
}
}

for _, mx := range mxs {
if mx != nil {
mx.Lock()
}
}

defer func() {
for _, mx := range mxs {
if mx != nil {
mx.Unlock()
}
}
}()

err := b.blockstore.PutMany(good)
if err != nil {
return err
Expand Down
38 changes: 28 additions & 10 deletions arc_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,16 +246,34 @@ func TestDifferentKeyObjectsWork(t *testing.T) {
}

func TestPutManyCaches(t *testing.T) {
arc, _, cd := createStores(t)
arc.PutMany([]blocks.Block{exampleBlock})
t.Run("happy path PutMany", func(t *testing.T) {
arc, _, cd := createStores(t)
arc.PutMany([]blocks.Block{exampleBlock})

trap("has hit datastore", cd, t)
arc.Has(exampleBlock.Cid())
arc.GetSize(exampleBlock.Cid())
untrap(cd)
arc.DeleteBlock(exampleBlock.Cid())

arc.Put(exampleBlock)
trap("PunMany has hit datastore", cd, t)
arc.PutMany([]blocks.Block{exampleBlock})
})

trap("has hit datastore", cd, t)
arc.Has(exampleBlock.Cid())
arc.GetSize(exampleBlock.Cid())
untrap(cd)
arc.DeleteBlock(exampleBlock.Cid())
t.Run("PutMany with duplicates", func(t *testing.T) {
arc, _, cd := createStores(t)
arc.PutMany([]blocks.Block{exampleBlock, exampleBlock})

trap("has hit datastore", cd, t)
arc.Has(exampleBlock.Cid())
arc.GetSize(exampleBlock.Cid())
untrap(cd)
arc.DeleteBlock(exampleBlock.Cid())

arc.Put(exampleBlock)
trap("PunMany has hit datastore", cd, t)
arc.PutMany([]blocks.Block{exampleBlock})
})

arc.Put(exampleBlock)
trap("PunMany has hit datastore", cd, t)
arc.PutMany([]blocks.Block{exampleBlock})
}

0 comments on commit d90561b

Please sign in to comment.