Skip to content

Commit

Permalink
Merge pull request #459 from ipfs-force-community/opt/support-car-suffix
Browse files Browse the repository at this point in the history
opt: piece storage supports files with .car suffix
  • Loading branch information
LinZexiao authored Oct 10, 2023
2 parents 5035048 + e462c0b commit aba981a
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 21 deletions.
40 changes: 20 additions & 20 deletions piecestorage/storagemgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ type PieceStorageManager struct {
}

func NewPieceStorageManager(cfg *config.PieceStorage) (*PieceStorageManager, error) {
storages := make(map[string]IPieceStorage)
psm := &PieceStorageManager{
lk: sync.RWMutex{},
storages: make(map[string]IPieceStorage),
}

// todo: extract name check logic to a function and check blank in name

Expand All @@ -27,46 +30,42 @@ func NewPieceStorageManager(cfg *config.PieceStorage) (*PieceStorageManager, err
if fsCfg.Name == "" {
return nil, fmt.Errorf("fs piece storage name is empty, must set storage name in piece storage config `name=yourname`")
}
_, ok := storages[fsCfg.Name]
if ok {
return nil, fmt.Errorf("duplicate storage name: %s", fsCfg.Name)
}

st, err := NewFsPieceStorage(fsCfg)
if err != nil {
return nil, fmt.Errorf("unable to create fs piece storage %w", err)
}
storages[fsCfg.Name] = st

if err := psm.AddPieceStorage(st); err != nil {
return nil, err
}
}

for _, s3Cfg := range cfg.S3 {
// check if storage already exist in manager, and it's name is not empty
if s3Cfg.Name == "" {
return nil, fmt.Errorf("s3 pieceStorage name is empty, must set storage name in piece storage config `name=yourname`")
}
_, ok := storages[s3Cfg.Name]
if ok {
return nil, fmt.Errorf("duplicate storage name: %s", s3Cfg.Name)
}

st, err := NewS3PieceStorage(s3Cfg)
if err != nil {
return nil, fmt.Errorf("unable to create object piece storage %w", err)
}
storages[s3Cfg.Name] = st

if err := psm.AddPieceStorage(st); err != nil {
return nil, err
}
}
return &PieceStorageManager{
lk: sync.RWMutex{},
storages: storages,
}, nil

return psm, nil
}

func (p *PieceStorageManager) FindStorageForRead(ctx context.Context, s string) (IPieceStorage, error) {
var storages []IPieceStorage
_ = p.EachPieceStorage(func(st IPieceStorage) error {
has, err := st.Has(ctx, s)
if err != nil {
log.Warnf("got error while check avaibale in storage: %s", err.Error())
log.Warnf("got error while check available in storage: %s", err.Error())
return nil
}
if has {
Expand Down Expand Up @@ -120,7 +119,7 @@ func (p *PieceStorageManager) AddMemPieceStorage(s IPieceStorage) {
p.lk.Lock()
defer p.lk.Unlock()

p.storages[s.GetName()] = s
p.storages[s.GetName()] = newStoreWrapper(s)
}

func (p *PieceStorageManager) AddPieceStorage(s IPieceStorage) error {
Expand All @@ -132,7 +131,8 @@ func (p *PieceStorageManager) AddPieceStorage(s IPieceStorage) error {
if ok {
return fmt.Errorf("duplicate storage name: %s", s.GetName())
}
p.storages[s.GetName()] = s
p.storages[s.GetName()] = newStoreWrapper(s)

return nil
}

Expand Down Expand Up @@ -183,7 +183,7 @@ func (p *PieceStorageManager) ListStorageInfos() types.PieceStorageInfos {
}
switch st.Type() {
case S3:
cfg := st.(*s3PieceStorage).s3Cfg
cfg := st.(*storeWrapper).IPieceStorage.(*s3PieceStorage).s3Cfg
s3 = append(s3, types.S3Storage{
Name: cfg.Name,
EndPoint: cfg.EndPoint,
Expand All @@ -194,7 +194,7 @@ func (p *PieceStorageManager) ListStorageInfos() types.PieceStorageInfos {
})

case FS:
cfg := st.(*fsPieceStorage).fsCfg
cfg := st.(*storeWrapper).IPieceStorage.(*fsPieceStorage).fsCfg
fs = append(fs, types.FsStorage{
Name: cfg.Name,
Path: cfg.Path,
Expand Down
26 changes: 25 additions & 1 deletion piecestorage/storagemgr_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package piecestorage

import (
"context"
"fmt"
"math"
"os"
"path/filepath"
"testing"

"github.com/filecoin-project/venus/venus-shared/types/market"
Expand Down Expand Up @@ -104,7 +106,7 @@ func TestRandSelect(t *testing.T) {
for i := 0; i < 1000; i++ {
st, err := psm.FindStorageForWrite(1024 * 1024)
assert.Nil(t, err)
selectName = append(selectName, st.(*MemPieceStore).Name)
selectName = append(selectName, st.(*storeWrapper).IPieceStorage.(*MemPieceStore).Name)
}
assert.Contains(t, selectName, "1")
assert.Contains(t, selectName, "2")
Expand Down Expand Up @@ -149,3 +151,25 @@ func TestEachStorage(t *testing.T) {
assert.NotNil(t, err)
assert.Equal(t, 2, count)
}

func TestMultiFormatFiles(t *testing.T) {
assert.Equal(t, []string{"test", "test.car"}, extendPiece("test"))
assert.Equal(t, []string{"test", "test.car"}, extendPiece("test.car"))

tmpDir := t.TempDir()
psm, err := NewPieceStorageManager(&config.PieceStorage{
Fs: []*config.FsPieceStorage{
{Name: "test", Path: tmpDir},
},
})
assert.Nil(t, err)

assert.NoError(t, os.WriteFile(filepath.Join(tmpDir, "test"), []byte("xxx"), 0777))
assert.NoError(t, os.WriteFile(filepath.Join(tmpDir, "test2"+carSuffix), []byte("xxx"), 0777))

ctx := context.Background()
for _, name := range []string{"test", "test2"} {
_, err = psm.FindStorageForRead(ctx, name)
assert.NoError(t, err)
}
}
106 changes: 106 additions & 0 deletions piecestorage/store_wrapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package piecestorage

import (
"context"
"io"
"strings"

"github.com/filecoin-project/dagstore/mount"
)

const carSuffix = ".car"

var _ IPieceStorage = (*storeWrapper)(nil)

type storeWrapper struct {
IPieceStorage
}

func newStoreWrapper(s IPieceStorage) IPieceStorage {
return &storeWrapper{IPieceStorage: s}
}

func extendPiece(s string) []string {
if strings.HasSuffix(s, carSuffix) {
return []string{strings.Split(s, carSuffix)[0], s}
}
return []string{s, s + carSuffix}
}

func (sw *storeWrapper) Len(ctx context.Context, s string) (int64, error) {
var l int64
var err error
for _, name := range extendPiece(s) {
l, err = sw.IPieceStorage.Len(ctx, name)
if err == nil {
return l, nil
}

}

return l, err
}

func (sw *storeWrapper) Has(ctx context.Context, s string) (bool, error) {
var has bool
var err error
for _, name := range extendPiece(s) {
has, err = sw.IPieceStorage.Has(ctx, name)
if err == nil && has {
return has, nil
}
}

return has, err
}

func (sw *storeWrapper) GetReaderCloser(ctx context.Context, s string) (io.ReadCloser, error) {
var rc io.ReadCloser
var err error
for _, name := range extendPiece(s) {
rc, err = sw.IPieceStorage.GetReaderCloser(ctx, name)
if err == nil {
return rc, nil
}
}

return rc, err
}

func (sw *storeWrapper) GetMountReader(ctx context.Context, s string) (mount.Reader, error) {
var reader mount.Reader
var err error
for _, name := range extendPiece(s) {
reader, err = sw.IPieceStorage.GetMountReader(ctx, name)
if err == nil {
return reader, nil
}
}

return reader, err
}

func (sw *storeWrapper) GetRedirectUrl(ctx context.Context, s string) (string, error) {
var url string
var err error
for _, name := range extendPiece(s) {
url, err = sw.IPieceStorage.GetRedirectUrl(ctx, name)
if err == nil {
return url, nil
}
}

return url, err
}

func (sw *storeWrapper) Validate(s string) error {
var err error
for _, name := range extendPiece(s) {
err = sw.IPieceStorage.Validate(name)
if err == nil {
return nil
}
}

return err
}

0 comments on commit aba981a

Please sign in to comment.