Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

stream: avoid repeated SetSyncPull for chunk, change batch size #1993

Merged
merged 3 commits into from
Nov 29, 2019
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion network/stream/sync_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,15 @@ import (
const (
syncStreamName = "SYNC"
cacheCapacity = 10000
setCacheCapacity = 200000 // 200000 * 32 = ~6.4mb mem footprint, 200K chunks ~=800 megs of data
maxBinZeroSyncPeers = 3
)

var (
setCacheMissCount = metrics.GetOrRegisterCounter("network.stream.sync_provider.set.cachemiss", nil)
setCacheHitCount = metrics.GetOrRegisterCounter("network.stream.sync_provider.set.cachehit", nil)
)

type syncProvider struct {
netStore *storage.NetStore // netstore
kad *network.Kademlia // kademlia
Expand All @@ -50,6 +56,8 @@ type syncProvider struct {
quit chan struct{} // shutdown
cacheMtx sync.RWMutex // synchronization primitive to protect cache
cache *lru.Cache // cache to minimize load on netstore
setCacheMtx sync.RWMutex // set cache mutex
setCache *lru.Cache // cache to reduce load on localstore to not set the same chunk as synced
acud marked this conversation as resolved.
Show resolved Hide resolved
logger log.Logger // logger that appends the base address to loglines
binZeroSem chan struct{} // semaphore to limit number of syncing peers on bin 0
}
Expand All @@ -63,6 +71,10 @@ func NewSyncProvider(ns *storage.NetStore, kad *network.Kademlia, autostart bool
if err != nil {
panic(err)
}
sc, err := lru.New(setCacheCapacity)
if err != nil {
panic(err)
}

return &syncProvider{
netStore: ns,
Expand All @@ -72,6 +84,7 @@ func NewSyncProvider(ns *storage.NetStore, kad *network.Kademlia, autostart bool
name: syncStreamName,
quit: make(chan struct{}),
cache: c,
setCache: sc,
logger: log.New("base", hex.EncodeToString(kad.BaseAddr()[:16])),
binZeroSem: make(chan struct{}, maxBinZeroSyncPeers),
}
Expand Down Expand Up @@ -190,11 +203,31 @@ func (s *syncProvider) Get(ctx context.Context, addr ...chunk.Address) ([]chunk.

// Set the supplied addrs as synced in order to allow for garbage collection
func (s *syncProvider) Set(ctx context.Context, addrs ...chunk.Address) error {
err := s.netStore.Set(ctx, chunk.ModeSetSyncPull, addrs...)
var chunksToSet []chunk.Address

s.setCacheMtx.RLock()
for _, addr := range addrs {
if _, ok := s.setCache.Get(addr); !ok {
chunksToSet = append(chunksToSet, addr)
setCacheMissCount.Inc(1)
} else {
setCacheHitCount.Inc(1)
}
}
s.setCacheMtx.RUnlock()

err := s.netStore.Set(ctx, chunk.ModeSetSyncPull, chunksToSet...)
if err != nil {
metrics.GetOrRegisterCounter("syncProvider.set-sync-err", nil).Inc(1)
return err
}

s.setCacheMtx.Lock()
defer s.setCacheMtx.Unlock()

for _, addr := range chunksToSet {
s.setCache.Add(addr, struct{}{})
}
return nil
}

Expand Down