Skip to content

Commit

Permalink
Merge pull request ipfs#29 from libp2p/feat/faster-providers
Browse files Browse the repository at this point in the history
improve perf of providers storage and gc
  • Loading branch information
whyrusleeping authored Nov 15, 2016
2 parents 6d0b6be + b152b1e commit 853debe
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 42 deletions.
78 changes: 38 additions & 40 deletions providers/providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching)

// providersKeyPrefix is the datastore key prefix that mkProvKey prepends to
// every provider key and that the provider-key query uses as its Prefix.
const providersKeyPrefix = "/providers/"

// mkProvKey builds the datastore key under which the provider set of CID k is
// stored: providersKeyPrefix followed by the base32.RawStdEncoding encoding of
// k.Bytes().
//
// NOTE(review): this text is a diff rendering with the +/- markers stripped,
// so two versions are interleaved. The first signature/return pair wraps the
// result in a ds.Key; the second returns the plain string. Call sites that
// pass mkProvKey(k) directly as a query Prefix suggest the string form is the
// new one — confirm against the repository.
func mkProvKey(k *cid.Cid) ds.Key {
return ds.NewKey(providersKeyPrefix + base32.RawStdEncoding.EncodeToString(k.Bytes()))
func mkProvKey(k *cid.Cid) string {
return providersKeyPrefix + base32.RawStdEncoding.EncodeToString(k.Bytes())
}

func (pm *ProviderManager) Process() goprocess.Process {
Expand Down Expand Up @@ -112,7 +112,7 @@ func (pm *ProviderManager) getProvSet(k *cid.Cid) (*providerSet, error) {
}

func loadProvSet(dstore ds.Datastore, k *cid.Cid) (*providerSet, error) {
res, err := dstore.Query(dsq.Query{Prefix: mkProvKey(k).String()})
res, err := dstore.Query(dsq.Query{Prefix: mkProvKey(k)})
if err != nil {
return nil, err
}
Expand All @@ -123,13 +123,10 @@ func loadProvSet(dstore ds.Datastore, k *cid.Cid) (*providerSet, error) {
log.Error("got an error: ", e.Error)
continue
}
parts := strings.Split(e.Key, "/")
if len(parts) != 4 {
log.Warning("incorrectly formatted key: ", e.Key)
continue
}

decstr, err := base32.RawStdEncoding.DecodeString(parts[len(parts)-1])
lix := strings.LastIndex(e.Key, "/")

decstr, err := base32.RawStdEncoding.DecodeString(e.Key[lix+1:])
if err != nil {
log.Error("base32 decoding error: ", err)
continue
Expand Down Expand Up @@ -174,20 +171,20 @@ func (pm *ProviderManager) addProv(k *cid.Cid, p peer.ID) error {
}

// writeProviderEntry stores a provider record for peer p under CID k in dstore.
// The key is mkProvKey(k) extended with the base32.RawStdEncoding encoding of
// the peer ID bytes; the value is t.UnixNano() encoded as a varint. It returns
// whatever error dstore.Put returns.
//
// NOTE(review): this text is a diff rendering with the +/- markers stripped,
// so the dsk assignment and the Put call each appear twice: once building the
// key via ds.Key.ChildString, and once concatenating strings and wrapping the
// result with ds.NewKey only at Put time. Presumably the second of each pair
// is the new version — confirm against the repository.
func writeProviderEntry(dstore ds.Datastore, k *cid.Cid, p peer.ID, t time.Time) error {
dsk := mkProvKey(k).ChildString(base32.RawStdEncoding.EncodeToString([]byte(p)))
dsk := mkProvKey(k) + "/" + base32.RawStdEncoding.EncodeToString([]byte(p))

// 16 bytes of scratch space; only the first n bytes written by PutVarint are stored.
buf := make([]byte, 16)
n := binary.PutVarint(buf, t.UnixNano())

return dstore.Put(dsk, buf[:n])
return dstore.Put(ds.NewKey(dsk), buf[:n])
}

func (pm *ProviderManager) deleteProvSet(k *cid.Cid) error {
pm.providers.Remove(k.KeyString())

res, err := pm.dstore.Query(dsq.Query{
KeysOnly: true,
Prefix: mkProvKey(k).String(),
Prefix: mkProvKey(k),
})

entries, err := res.Rest()
Expand All @@ -204,44 +201,40 @@ func (pm *ProviderManager) deleteProvSet(k *cid.Cid) error {
return nil
}

// getAllProvKeys / getProvKeys enumerate the CIDs that have provider records
// in pm.dstore by querying every key under providersKeyPrefix.
//
// NOTE(review): this text is a diff rendering with the +/- markers stripped,
// so two versions are interleaved line by line:
//   - getAllProvKeys reads every result with res.Rest(), collects the decoded
//     CIDs in a cid set and returns seen.Keys().
//   - getProvKeys returns an iterator closure over res.Next(); each call
//     yields the next successfully decoded CID and true, or (nil, false) once
//     the results are exhausted.
// Presumably getProvKeys is the replacement — confirm against the repository.
//
// Both versions split each key on "/" and require exactly four parts, taking
// parts[2] as the base32-encoded CID; entries that fail the split, the base32
// decode or cid.Cast are logged and skipped.
//
// NOTE(review): the iterator version has no de-duplicating set, so it looks
// like a CID with several provider entries is yielded once per entry — verify
// that callers tolerate repeats.
func (pm *ProviderManager) getAllProvKeys() ([]*cid.Cid, error) {
func (pm *ProviderManager) getProvKeys() (func() (*cid.Cid, bool), error) {
res, err := pm.dstore.Query(dsq.Query{
KeysOnly: true,
KeysOnly: false,
Prefix: providersKeyPrefix,
})

if err != nil {
return nil, err
}

entries, err := res.Rest()
if err != nil {
return nil, err
}
iter := func() (*cid.Cid, bool) {
for e := range res.Next() {
parts := strings.Split(e.Key, "/")
if len(parts) != 4 {
log.Warningf("incorrectly formatted provider entry in datastore: %s", e.Key)
continue
}
decoded, err := base32.RawStdEncoding.DecodeString(parts[2])
if err != nil {
// NOTE(review): printf-style verbs are passed to log.Warning here, while the
// call above uses log.Warningf; presumably Warningf was intended — confirm
// against the logger's API.
log.Warning("error decoding base32 provider key: %s: %s", parts[2], err)
continue
}

seen := cid.NewSet()
for _, e := range entries {
parts := strings.Split(e.Key, "/")
if len(parts) != 4 {
log.Warning("incorrectly formatted provider entry in datastore")
continue
}
decoded, err := base32.RawStdEncoding.DecodeString(parts[2])
if err != nil {
log.Warning("error decoding base32 provider key")
continue
}
c, err := cid.Cast(decoded)
if err != nil {
// NOTE(review): same concern as above — a %s verb is passed to log.Warning
// rather than log.Warningf.
log.Warning("error casting key to cid from datastore key: %s", err)
continue
}

c, err := cid.Cast(decoded)
if err != nil {
log.Warning("error casting key to cid from datastore key")
continue
return c, true
}

seen.Add(c)
return nil, false
}

return seen.Keys(), nil
return iter, nil
}

func (pm *ProviderManager) run() {
Expand All @@ -261,12 +254,17 @@ func (pm *ProviderManager) run() {

gp.resp <- provs
case <-tick.C:
keys, err := pm.getAllProvKeys()
keys, err := pm.getProvKeys()
if err != nil {
log.Error("Error loading provider keys: ", err)
continue
}
for _, k := range keys {
for {
k, ok := keys()
if !ok {
break
}

provs, err := pm.getProvSet(k)
if err != nil {
log.Error("error loading known provset: ", err)
Expand Down
62 changes: 60 additions & 2 deletions providers/providers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ package providers
import (
"context"
"fmt"
"io/ioutil"
"os"
"testing"
"time"

cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
//lds "github.com/ipfs/go-ds-leveldb"
u "github.com/ipfs/go-ipfs-util"
peer "github.com/libp2p/go-libp2p-peer"
)
Expand Down Expand Up @@ -139,12 +142,67 @@ func TestProvidesExpire(t *testing.T) {
t.Fatal("providers map not cleaned up")
}

allprovs, err := p.getAllProvKeys()
proviter, err := p.getProvKeys()
if err != nil {
t.Fatal(err)
}

if len(allprovs) != 0 {
_, ok := proviter()
if ok {
t.Fatal("expected everything to be cleaned out of the datastore")
}
}

// Reference ioutil and os through the blank identifier. The only other uses
// of these packages visible here (ioutil.TempDir, os.RemoveAll) are inside the
// commented-out TestLargeProvidersSet below, so presumably these keep the
// imports from being reported as unused.
var _ = ioutil.NopCloser
var _ = os.DevNull

/* This can be used for profiling. It is kept commented out for now to avoid incurring extra CI time.
func TestLargeProvidersSet(t *testing.T) {
old := lruCacheSize
lruCacheSize = 10
defer func() { lruCacheSize = old }()
dirn, err := ioutil.TempDir("", "provtest")
if err != nil {
t.Fatal(err)
}
opts := &lds.Options{
NoSync: true,
Compression: 1,
}
lds, err := lds.NewDatastore(dirn, opts)
if err != nil {
t.Fatal(err)
}
_ = lds
defer func() {
os.RemoveAll(dirn)
}()
ctx := context.Background()
var peers []peer.ID
for i := 0; i < 3000; i++ {
peers = append(peers, peer.ID(fmt.Sprint(i)))
}
mid := peer.ID("myself")
p := NewProviderManager(ctx, mid, lds)
defer p.proc.Close()
var cids []*cid.Cid
for i := 0; i < 1000; i++ {
c := cid.NewCidV0(u.Hash([]byte(fmt.Sprint(i))))
cids = append(cids, c)
for _, pid := range peers {
p.AddProvider(ctx, c, pid)
}
}
for _, c := range cids {
_ = p.GetProviders(ctx, c)
}
}
//*/

0 comments on commit 853debe

Please sign in to comment.