Skip to content

Commit

Permalink
Merge pull request #10896 from filecoin-project/feat/chainindex-shard…
Browse files Browse the repository at this point in the history
…ed-mutex

feat: chainstore: sharded mutex for filling chain height index
  • Loading branch information
magik6k authored May 24, 2023
2 parents f99fbc1 + cad743e commit 8c8e287
Show file tree
Hide file tree
Showing 5 changed files with 266 additions and 15 deletions.
44 changes: 29 additions & 15 deletions chain/store/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,21 @@ package store

import (
"context"
"hash/maphash"
"os"
"strconv"
"sync"

"github.com/puzpuzpuz/xsync/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/shardedmutex"
)

var DefaultChainIndexCacheSize = 32 << 15
// DefaultChainIndexCacheSize no longer sets the maximum size, just the initial size of the map.
var DefaultChainIndexCacheSize = 1 << 15

func init() {
if s := os.Getenv("LOTUS_CHAIN_INDEX_CACHE"); s != "" {
Expand All @@ -27,20 +30,26 @@ func init() {
}

type ChainIndex struct {
indexCacheLk sync.Mutex
indexCache map[types.TipSetKey]*lbEntry
indexCache *xsync.MapOf[types.TipSetKey, *lbEntry]

fillCacheLock shardedmutex.ShardedMutexFor[types.TipSetKey]

loadTipSet loadTipSetFunc

skipLength abi.ChainEpoch
}
type loadTipSetFunc func(context.Context, types.TipSetKey) (*types.TipSet, error)

func maphashTSK(s maphash.Seed, tsk types.TipSetKey) uint64 {
return maphash.Bytes(s, tsk.Bytes())
}

func NewChainIndex(lts loadTipSetFunc) *ChainIndex {
return &ChainIndex{
indexCache: make(map[types.TipSetKey]*lbEntry, DefaultChainIndexCacheSize),
loadTipSet: lts,
skipLength: 20,
indexCache: xsync.NewTypedMapOfPresized[types.TipSetKey, *lbEntry](maphashTSK, DefaultChainIndexCacheSize),
fillCacheLock: shardedmutex.NewFor(maphashTSK, 32),
loadTipSet: lts,
skipLength: 20,
}
}

Expand All @@ -59,17 +68,23 @@ func (ci *ChainIndex) GetTipsetByHeight(ctx context.Context, from *types.TipSet,
return nil, xerrors.Errorf("failed to round down: %w", err)
}

ci.indexCacheLk.Lock()
defer ci.indexCacheLk.Unlock()
cur := rounded.Key()
for {
lbe, ok := ci.indexCache[cur]
lbe, ok := ci.indexCache.Load(cur) // check the cache
if !ok {
fc, err := ci.fillCache(ctx, cur)
if err != nil {
return nil, xerrors.Errorf("failed to fill cache: %w", err)
lk := ci.fillCacheLock.GetLock(cur)
lk.Lock() // if entry is missing, take the lock
lbe, ok = ci.indexCache.Load(cur) // check if someone else added it while we waited for lock
if !ok {
fc, err := ci.fillCache(ctx, cur)
if err != nil {
lk.Unlock()
return nil, xerrors.Errorf("failed to fill cache: %w", err)
}
lbe = fc
ci.indexCache.Store(cur, lbe)
}
lbe = fc
lk.Unlock()
}

if to == lbe.targetHeight {
Expand Down Expand Up @@ -137,7 +152,6 @@ func (ci *ChainIndex) fillCache(ctx context.Context, tsk types.TipSetKey) (*lbEn
targetHeight: skipTarget.Height(),
target: skipTarget.Key(),
}
ci.indexCache[tsk] = lbe

return lbe, nil
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ require (
github.com/open-rpc/meta-schema v0.0.0-20201029221707-1b72ef2ea333
github.com/polydawn/refmt v0.89.0
github.com/prometheus/client_golang v1.14.0
github.com/puzpuzpuz/xsync/v2 v2.4.0
github.com/raulk/clock v1.1.0
github.com/raulk/go-watchdog v1.3.0
github.com/stretchr/testify v1.8.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1478,6 +1478,8 @@ github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJf
github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY=
github.com/prometheus/statsd_exporter v0.21.0 h1:hA05Q5RFeIjgwKIYEdFd59xu5Wwaznf33yKI+pyX6T8=
github.com/prometheus/statsd_exporter v0.21.0/go.mod h1:rbT83sZq2V+p73lHhPZfMc3MLCHmSHelCh9hSGYNLTQ=
github.com/puzpuzpuz/xsync/v2 v2.4.0 h1:5sXAMHrtx1bg9nbRZTOn8T4MkWe5V+o8yKRH02Eznag=
github.com/puzpuzpuz/xsync/v2 v2.4.0/go.mod h1:gD2H2krq/w52MfPLE+Uy64TzJDVY7lP2znR9qmR35kU=
github.com/quic-go/qpack v0.4.0 h1:Cr9BXA1sQS2SmDUWjSofMPNKmvF6IiIfDRmgU0w1ZCo=
github.com/quic-go/qpack v0.4.0/go.mod h1:UZVnYIfi5GRk+zI9UMaCPsmZ2xKJP7XBUvVyT1Knj9A=
github.com/quic-go/qtls-go1-19 v0.3.2 h1:tFxjCFcTQzK+oMxG6Zcvp4Dq8dx4yD3dDiIiyc86Z5U=
Expand Down
75 changes: 75 additions & 0 deletions lib/shardedmutex/shardedmutex.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package shardedmutex

import (
"hash/maphash"
"sync"
)

const cacheline = 64

// padding a mutex to a cacheline improves performance as the cachelines are not contested
// name old time/op new time/op delta
// Locks-8 74.6ns ± 7% 12.3ns ± 2% -83.54% (p=0.000 n=20+18)
type paddedMutex struct {
mt sync.Mutex
pad [cacheline - 8]uint8
}

type ShardedMutex struct {
shards []paddedMutex
}

// New creates a new ShardedMutex with N shards
func New(nShards int) ShardedMutex {
if nShards < 1 {
panic("n_shards cannot be less than 1")
}
return ShardedMutex{
shards: make([]paddedMutex, nShards),
}
}

func (sm ShardedMutex) Shards() int {
return len(sm.shards)
}

func (sm ShardedMutex) Lock(shard int) {
sm.shards[shard].mt.Lock()
}

func (sm ShardedMutex) Unlock(shard int) {
sm.shards[shard].mt.Unlock()
}

func (sm ShardedMutex) GetLock(shard int) sync.Locker {
return &sm.shards[shard].mt
}

type ShardedMutexFor[K any] struct {
inner ShardedMutex

hasher func(maphash.Seed, K) uint64
seed maphash.Seed
}

func NewFor[K any](hasher func(maphash.Seed, K) uint64, nShards int) ShardedMutexFor[K] {
return ShardedMutexFor[K]{
inner: New(nShards),
hasher: hasher,
seed: maphash.MakeSeed(),
}
}

func (sm ShardedMutexFor[K]) shardFor(key K) int {
return int(sm.hasher(sm.seed, key) % uint64(len(sm.inner.shards)))
}

func (sm ShardedMutexFor[K]) Lock(key K) {
sm.inner.Lock(sm.shardFor(key))
}
func (sm ShardedMutexFor[K]) Unlock(key K) {
sm.inner.Unlock(sm.shardFor(key))
}
func (sm ShardedMutexFor[K]) GetLock(key K) sync.Locker {
return sm.inner.GetLock(sm.shardFor(key))
}
159 changes: 159 additions & 0 deletions lib/shardedmutex/shardedmutex_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package shardedmutex

import (
"fmt"
"hash/maphash"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
)

func TestLockingDifferentShardsDoesNotBlock(t *testing.T) {
shards := 16
sm := New(shards)
done := make(chan struct{})
go func() {
select {
case <-done:
return
case <-time.After(5 * time.Second):
panic("test locked up")
}
}()
for i := 0; i < shards; i++ {
sm.Lock(i)
}

close(done)
}
func TestLockingSameShardsBlocks(t *testing.T) {
shards := 16
sm := New(shards)
wg := sync.WaitGroup{}
wg.Add(shards)
ch := make(chan int, shards)

for i := 0; i < shards; i++ {
go func(i int) {
if i != 15 {
sm.Lock(i)
}
wg.Done()
wg.Wait()
sm.Lock((15 + i) % shards)
ch <- i
sm.Unlock(i)
}(i)
}

wg.Wait()
for i := 0; i < 2*shards; i++ {
runtime.Gosched()
}
for i := 0; i < shards; i++ {
if a := <-ch; a != i {
t.Errorf("got %d instead of %d", a, i)
}
}
}

func TestShardedByString(t *testing.T) {
shards := 16
sm := NewFor(maphash.String, shards)

wg1 := sync.WaitGroup{}
wg1.Add(shards * 20)
wg2 := sync.WaitGroup{}
wg2.Add(shards * 20)

active := atomic.Int32{}
max := atomic.Int32{}

for i := 0; i < shards*20; i++ {
go func(i int) {
wg1.Done()
wg1.Wait()
sm.Lock(fmt.Sprintf("goroutine %d", i))
activeNew := active.Add(1)
for {
curMax := max.Load()
if curMax >= activeNew {
break
}
if max.CompareAndSwap(curMax, activeNew) {
break
}
}
for j := 0; j < 100; j++ {
runtime.Gosched()
}
active.Add(-1)
sm.Unlock(fmt.Sprintf("goroutine %d", i))
wg2.Done()
}(i)
}

wg2.Wait()

if max.Load() != 16 {
t.Fatal("max load not achieved", max.Load())
}

}

func BenchmarkShardedMutex(b *testing.B) {
shards := 16
sm := New(shards)

done := atomic.Int32{}
go func() {
for {
sm.Lock(0)
sm.Unlock(0)
if done.Load() != 0 {
return
}
}
}()
for i := 0; i < 100; i++ {
runtime.Gosched()
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
sm.Lock(1)
sm.Unlock(1)
}
done.Add(1)
}

func BenchmarkShardedMutexOf(b *testing.B) {
shards := 16
sm := NewFor(maphash.String, shards)

str1 := "string1"
str2 := "string2"

done := atomic.Int32{}
go func() {
for {
sm.Lock(str1)
sm.Unlock(str1)
if done.Load() != 0 {
return
}
}
}()
for i := 0; i < 100; i++ {
runtime.Gosched()
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
sm.Lock(str2)
sm.Unlock(str2)
}
done.Add(1)
}

0 comments on commit 8c8e287

Please sign in to comment.