Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opt: piece storage supports files with .car suffix #459

Merged
merged 1 commit into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 20 additions & 20 deletions piecestorage/storagemgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ type PieceStorageManager struct {
}

func NewPieceStorageManager(cfg *config.PieceStorage) (*PieceStorageManager, error) {
storages := make(map[string]IPieceStorage)
psm := &PieceStorageManager{
lk: sync.RWMutex{},
storages: make(map[string]IPieceStorage),
}

// todo: extract name check logic to a function and check blank in name

Expand All @@ -27,46 +30,42 @@ func NewPieceStorageManager(cfg *config.PieceStorage) (*PieceStorageManager, err
if fsCfg.Name == "" {
return nil, fmt.Errorf("fs piece storage name is empty, must set storage name in piece storage config `name=yourname`")
}
_, ok := storages[fsCfg.Name]
if ok {
return nil, fmt.Errorf("duplicate storage name: %s", fsCfg.Name)
}

st, err := NewFsPieceStorage(fsCfg)
if err != nil {
return nil, fmt.Errorf("unable to create fs piece storage %w", err)
}
storages[fsCfg.Name] = st

if err := psm.AddPieceStorage(st); err != nil {
return nil, err
}
}

for _, s3Cfg := range cfg.S3 {
// check if storage already exist in manager, and it's name is not empty
if s3Cfg.Name == "" {
return nil, fmt.Errorf("s3 pieceStorage name is empty, must set storage name in piece storage config `name=yourname`")
}
_, ok := storages[s3Cfg.Name]
if ok {
return nil, fmt.Errorf("duplicate storage name: %s", s3Cfg.Name)
}

st, err := NewS3PieceStorage(s3Cfg)
if err != nil {
return nil, fmt.Errorf("unable to create object piece storage %w", err)
}
storages[s3Cfg.Name] = st

if err := psm.AddPieceStorage(st); err != nil {
return nil, err
}
}
return &PieceStorageManager{
lk: sync.RWMutex{},
storages: storages,
}, nil

return psm, nil
}

func (p *PieceStorageManager) FindStorageForRead(ctx context.Context, s string) (IPieceStorage, error) {
var storages []IPieceStorage
_ = p.EachPieceStorage(func(st IPieceStorage) error {
has, err := st.Has(ctx, s)
if err != nil {
log.Warnf("got error while check avaibale in storage: %s", err.Error())
log.Warnf("got error while check available in storage: %s", err.Error())
return nil
}
if has {
Expand Down Expand Up @@ -120,7 +119,7 @@ func (p *PieceStorageManager) AddMemPieceStorage(s IPieceStorage) {
p.lk.Lock()
defer p.lk.Unlock()

p.storages[s.GetName()] = s
p.storages[s.GetName()] = newStoreWrapper(s)
}

func (p *PieceStorageManager) AddPieceStorage(s IPieceStorage) error {
Expand All @@ -132,7 +131,8 @@ func (p *PieceStorageManager) AddPieceStorage(s IPieceStorage) error {
if ok {
return fmt.Errorf("duplicate storage name: %s", s.GetName())
}
p.storages[s.GetName()] = s
p.storages[s.GetName()] = newStoreWrapper(s)

return nil
}

Expand Down Expand Up @@ -183,7 +183,7 @@ func (p *PieceStorageManager) ListStorageInfos() types.PieceStorageInfos {
}
switch st.Type() {
case S3:
cfg := st.(*s3PieceStorage).s3Cfg
cfg := st.(*storeWrapper).IPieceStorage.(*s3PieceStorage).s3Cfg
s3 = append(s3, types.S3Storage{
Name: cfg.Name,
EndPoint: cfg.EndPoint,
Expand All @@ -194,7 +194,7 @@ func (p *PieceStorageManager) ListStorageInfos() types.PieceStorageInfos {
})

case FS:
cfg := st.(*fsPieceStorage).fsCfg
cfg := st.(*storeWrapper).IPieceStorage.(*fsPieceStorage).fsCfg
fs = append(fs, types.FsStorage{
Name: cfg.Name,
Path: cfg.Path,
Expand Down
26 changes: 25 additions & 1 deletion piecestorage/storagemgr_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package piecestorage

import (
"context"
"fmt"
"math"
"os"
"path/filepath"
"testing"

"github.com/filecoin-project/venus/venus-shared/types/market"
Expand Down Expand Up @@ -104,7 +106,7 @@ func TestRandSelect(t *testing.T) {
for i := 0; i < 1000; i++ {
st, err := psm.FindStorageForWrite(1024 * 1024)
assert.Nil(t, err)
selectName = append(selectName, st.(*MemPieceStore).Name)
selectName = append(selectName, st.(*storeWrapper).IPieceStorage.(*MemPieceStore).Name)
}
assert.Contains(t, selectName, "1")
assert.Contains(t, selectName, "2")
Expand Down Expand Up @@ -149,3 +151,25 @@ func TestEachStorage(t *testing.T) {
assert.NotNil(t, err)
assert.Equal(t, 2, count)
}

func TestMultiFormatFiles(t *testing.T) {
assert.Equal(t, []string{"test", "test.car"}, extendPiece("test"))
assert.Equal(t, []string{"test", "test.car"}, extendPiece("test.car"))

tmpDir := t.TempDir()
psm, err := NewPieceStorageManager(&config.PieceStorage{
Fs: []*config.FsPieceStorage{
{Name: "test", Path: tmpDir},
},
})
assert.Nil(t, err)

assert.NoError(t, os.WriteFile(filepath.Join(tmpDir, "test"), []byte("xxx"), 0777))
assert.NoError(t, os.WriteFile(filepath.Join(tmpDir, "test2"+carSuffix), []byte("xxx"), 0777))

ctx := context.Background()
for _, name := range []string{"test", "test2"} {
_, err = psm.FindStorageForRead(ctx, name)
assert.NoError(t, err)
}
}
106 changes: 106 additions & 0 deletions piecestorage/store_wrapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package piecestorage

import (
"context"
"io"
"strings"

"github.com/filecoin-project/dagstore/mount"
)

const carSuffix = ".car"

var _ IPieceStorage = (*storeWrapper)(nil)

type storeWrapper struct {
IPieceStorage
}

func newStoreWrapper(s IPieceStorage) IPieceStorage {
return &storeWrapper{IPieceStorage: s}
}

func extendPiece(s string) []string {
if strings.HasSuffix(s, carSuffix) {
return []string{strings.Split(s, carSuffix)[0], s}
}
return []string{s, s + carSuffix}
}

func (sw *storeWrapper) Len(ctx context.Context, s string) (int64, error) {
var l int64
var err error
for _, name := range extendPiece(s) {
l, err = sw.IPieceStorage.Len(ctx, name)
if err == nil {
return l, nil
}

}

return l, err
}

func (sw *storeWrapper) Has(ctx context.Context, s string) (bool, error) {
var has bool
var err error
for _, name := range extendPiece(s) {
has, err = sw.IPieceStorage.Has(ctx, name)
if err == nil && has {
return has, nil
}
}

return has, err
}

func (sw *storeWrapper) GetReaderCloser(ctx context.Context, s string) (io.ReadCloser, error) {
var rc io.ReadCloser
var err error
for _, name := range extendPiece(s) {
rc, err = sw.IPieceStorage.GetReaderCloser(ctx, name)
if err == nil {
return rc, nil
}
}

return rc, err
}

func (sw *storeWrapper) GetMountReader(ctx context.Context, s string) (mount.Reader, error) {
var reader mount.Reader
var err error
for _, name := range extendPiece(s) {
reader, err = sw.IPieceStorage.GetMountReader(ctx, name)
if err == nil {
return reader, nil
}
}

return reader, err
}

func (sw *storeWrapper) GetRedirectUrl(ctx context.Context, s string) (string, error) {
var url string
var err error
for _, name := range extendPiece(s) {
url, err = sw.IPieceStorage.GetRedirectUrl(ctx, name)
if err == nil {
return url, nil
}
}

return url, err
}

func (sw *storeWrapper) Validate(s string) error {
var err error
for _, name := range extendPiece(s) {
err = sw.IPieceStorage.Validate(name)
if err == nil {
return nil
}
}

return err
}
Loading