Skip to content

Commit

Permalink
[nspcc-dev#1319] blobstor: Compress big objects in a streaming fashion
Browse files Browse the repository at this point in the history
Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
  • Loading branch information
fyrchik authored and aprasolova committed Oct 19, 2022
1 parent 8150d48 commit b5f3df0
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 6 deletions.
17 changes: 17 additions & 0 deletions pkg/local_object_storage/blobstor/fstree/fstree.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,23 @@ func (t *FSTree) Put(addr *addressSDK.Address, data []byte) error {
return os.WriteFile(p, data, t.Permissions)
}

// PutStream puts executes handler on a file opened for write.
func (t *FSTree) PutStream(addr *addressSDK.Address, handler func(*os.File) error) error {
p := t.treePath(addr)

if err := util.MkdirAllX(filepath.Dir(p), t.Permissions); err != nil {
return err
}

f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, t.Permissions)
if err != nil {
return err
}
defer f.Close()

return handler(f)
}

// Get returns object from storage by address.
func (t *FSTree) Get(addr *addressSDK.Address) ([]byte, error) {
p := t.treePath(addr)
Expand Down
24 changes: 18 additions & 6 deletions pkg/local_object_storage/blobstor/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package blobstor

import (
"fmt"
"os"
"strings"

"github.com/klauspost/compress/zstd"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
Expand Down Expand Up @@ -73,13 +75,19 @@ func (b *BlobStor) NeedsCompression(obj *objectSDK.Object) bool {
func (b *BlobStor) PutRaw(addr *addressSDK.Address, data []byte, compress bool) (*PutRes, error) {
big := b.isBig(data)

if compress {
data = b.compressor(data)
}

if big {
// save object in shallow dir
err := b.fsTree.Put(addr, data)
var err error
if compress {
err = b.fsTree.PutStream(addr, func(f *os.File) error {
enc, _ := zstd.NewWriter(f) // nil error if no options are provided
if _, err := enc.Write(data); err != nil {
return err
}
return enc.Close()
})
} else {
err = b.fsTree.Put(addr, data)
}
if err != nil {
return nil, err
}
Expand All @@ -89,6 +97,10 @@ func (b *BlobStor) PutRaw(addr *addressSDK.Address, data []byte, compress bool)
return new(PutRes), nil
}

if compress {
data = b.compressor(data)
}

// save object in blobovnicza
res, err := b.blobovniczas.put(addr, data)
if err != nil {
Expand Down

0 comments on commit b5f3df0

Please sign in to comment.