Skip to content

Commit

Permalink
engine: Support byte reader of objects
Browse files Browse the repository at this point in the history
Previously, the local object storage engine always decoded the requested
objects. In cases where it was necessary to obtain a binary object, an
excessive decoding-encoding round was performed.

Now `GetBytes` method to access object binary is provided. The method
allows to optimize allocations via optional parameter.

Refs #2316. Refs #2317.

Signed-off-by: Leonard Lyubich <leonard@morphbits.io>
  • Loading branch information
cthulhu-rider committed Feb 20, 2024
1 parent e90d28c commit 494a3fd
Show file tree
Hide file tree
Showing 18 changed files with 497 additions and 113 deletions.
6 changes: 6 additions & 0 deletions cmd/blobovnicza-to-peapod/blobovniczatree/get.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package blobovniczatree

import (
"errors"
"fmt"
"path/filepath"

Expand All @@ -9,6 +10,7 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -141,3 +143,7 @@ func (b *Blobovniczas) getObject(blz *blobovnicza.Blobovnicza, prm blobovnicza.G

return common.GetRes{Object: obj, RawData: data}, nil
}

func (b *Blobovniczas) GetBytes(_ oid.Address, _ func(ln int) []byte) ([]byte, error) {
return nil, errors.New("unimplemented Blobovniczas.GetBytes")
}
5 changes: 5 additions & 0 deletions pkg/local_object_storage/blobstor/common/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)

// Storage represents key-value object storage.
Expand All @@ -20,6 +21,10 @@ type Storage interface {
// This function MUST be called before Open.
SetReportErrorFunc(f func(string, error))

// TODO: docs
// GetBytes reads object by address and returns it in a canonical NeoFS binary
// format. Optional allocation func allows to replace the built-in make one.
GetBytes(addr oid.Address, alloc func(ln int) []byte) ([]byte, error)
Get(GetPrm) (GetRes, error)
GetRange(GetRangePrm) (GetRangeRes, error)
Exists(ExistsPrm) (ExistsRes, error)
Expand Down
12 changes: 11 additions & 1 deletion pkg/local_object_storage/blobstor/compression/compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,22 @@ func (c *Config) NeedsCompression(obj *objectSDK.Object) bool {
return c.Enabled
}

// IsCompressed checks whether given data is compressed.
func (c *Config) IsCompressed(data []byte) bool {
return len(data) >= 4 && bytes.Equal(data[:4], zstdFrameMagic)
}

// Decompress decompresses data if it starts with the magic
// and returns data untouched otherwise.
func (c *Config) Decompress(data []byte) ([]byte, error) {
if len(data) < 4 || !bytes.Equal(data[:4], zstdFrameMagic) {
if !c.IsCompressed(data) {
return data, nil
}
return c.DecompressForce(data)
}

// DecompressForce decompresses given compressed data.
func (c *Config) DecompressForce(data []byte) ([]byte, error) {
return c.decoder.DecodeAll(data, nil)
}

Expand Down
56 changes: 56 additions & 0 deletions pkg/local_object_storage/blobstor/fstree/fstree.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"crypto/sha256"
"errors"
"fmt"
"io"
"io/fs"
"math"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -297,6 +299,60 @@ func (t *FSTree) Get(prm common.GetPrm) (common.GetRes, error) {
return common.GetRes{Object: obj, RawData: data}, nil
}

// TODO: docs
func (t *FSTree) GetBytes(addr oid.Address, alloc func(ln int) []byte) ([]byte, error) {
p := t.treePath(addr)

f, err := os.Open(p)
if err != nil {
if errors.Is(err, fs.ErrNotExist) {
return nil, logicerr.Wrap(apistatus.ObjectNotFound{})
}
return nil, fmt.Errorf("open object file %q: %w", p, err)
}

fi, err := f.Stat()
if err != nil {
return nil, fmt.Errorf("stat object file %q: %w", p, err)
}
sz := fi.Size()
if sz > math.MaxInt {
return nil, fmt.Errorf("too big object file %d > %d", sz, math.MaxInt)
}
if sz == 0 {
return nil, nil
}

var b []byte
if alloc != nil {
b = alloc(int(sz))
} else {
b = make([]byte, sz)
}

_, err = io.ReadFull(f, b)
if err != nil {
if errors.Is(err, io.EOF) {
err = io.ErrUnexpectedEOF
}
return b, fmt.Errorf("read all %d bytes from object file %q: %w", sz, p, err)
}

if !t.IsCompressed(b) {
return b, nil
}

dec, err := t.DecompressForce(b)
if err != nil {
if cap(dec) > cap(b) {
b = dec
}
return b, fmt.Errorf("decompress object file data %q: %w", p, err)
}

return dec, nil
}

// GetRange implements common.Storage.
func (t *FSTree) GetRange(prm common.GetRangePrm) (common.GetRangeRes, error) {
res, err := t.Get(common.GetPrm{Address: prm.Address})
Expand Down
33 changes: 33 additions & 0 deletions pkg/local_object_storage/blobstor/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)

// Get reads the object from b.
Expand All @@ -30,3 +31,35 @@ func (b *BlobStor) Get(prm common.GetPrm) (common.GetRes, error) {
}
return b.storage[0].Storage.Get(prm)
}

// TODO: docs
func (b *BlobStor) GetBytes(addr oid.Address, subStorageID []byte, alloc func(ln int) []byte) ([]byte, error) {
b.modeMtx.RLock()
defer b.modeMtx.RUnlock()

var bs []byte
if subStorageID == nil {
allocReuse := func(ln int) []byte {
if cap(bs) >= ln {
return bs[:ln]
}
if alloc != nil {
return alloc(ln)
}
return make([]byte, ln)
}
for i := range b.storage {
var err error
bs, err = b.storage[i].Storage.GetBytes(addr, allocReuse)
if err == nil || !errors.As(err, new(apistatus.ObjectNotFound)) {
return bs, err
}
}

return nil, logicerr.Wrap(apistatus.ObjectNotFound{})
}
if len(subStorageID) == 0 {
return b.storage[len(b.storage)-1].Storage.GetBytes(addr, alloc)
}
return b.storage[0].Storage.GetBytes(addr, alloc)
}
17 changes: 17 additions & 0 deletions pkg/local_object_storage/blobstor/internal/blobstortest/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,22 @@ func TestGet(t *testing.T, cons Constructor, min, max uint64) {
res, err = s.Get(gPrm)
require.NoError(t, err)
require.Equal(t, objects[i].raw, res.RawData)

// Binary.
b, err := s.GetBytes(objects[i].addr, nil /* make */)
require.NoError(t, err)
require.Equal(t, objects[i].raw, b)

b2, err := s.GetBytes(objects[i].addr, func(ln int) []byte {
if cap(b) >= ln {
return b[:ln]
}
return make([]byte, ln)
})
require.NoError(t, err)
require.Equal(t, objects[i].raw, b2)
if len(b) > 0 {
require.Equal(t, &b[0], &b2[0])
}
}
}
47 changes: 47 additions & 0 deletions pkg/local_object_storage/blobstor/peapod/peapod.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,53 @@ func (x *Peapod) Get(prm common.GetPrm) (common.GetRes, error) {
return common.GetRes{Object: obj, RawData: data}, err
}

// TODO: docs
func (x *Peapod) GetBytes(addr oid.Address, alloc func(ln int) []byte) ([]byte, error) {
var b []byte

err := x.bolt.View(func(tx *bbolt.Tx) error {
bktRoot := tx.Bucket(rootBucket)
if bktRoot == nil {
return errMissingRootBucket
}

val := bktRoot.Get(keyForObject(addr))
if val == nil {
return apistatus.ErrObjectNotFound
}

if alloc != nil {
b = alloc(len(val))
} else {
b = make([]byte, len(val))
}
copy(b, val)

return nil
})
if err != nil {
if errors.Is(err, apistatus.ErrObjectNotFound) {
return nil, logicerr.Wrap(err)
}
return nil, fmt.Errorf("exec read-only BoltDB transaction: %w", err)
}

// copy-paste from FSTree
if !x.compress.IsCompressed(b) {
return b, nil
}

dec, err := x.compress.DecompressForce(b)
if err != nil {
if cap(dec) > cap(b) {
b = dec
}
return b, fmt.Errorf("decompress object BoltDB data: %w", err)
}

return dec, nil
}

// GetRange works like Get but reads specific payload range.
func (x *Peapod) GetRange(prm common.GetRangePrm) (common.GetRangeRes, error) {
// copy-paste from FSTree
Expand Down
35 changes: 35 additions & 0 deletions pkg/local_object_storage/blobstor/peapod/peapod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,3 +274,38 @@ func TestPeapod_IterateAddresses(t *testing.T) {
require.NoError(t, err)
require.Equal(t, mSrc, mDst)
}

func TestPeapod_GetBytes(t *testing.T) {
ppd := newTestPeapod(t)
addr := oidtest.Address()
obj := objecttest.Object(t)

objBin, err := obj.Marshal()
require.NoError(t, err)

b, err := ppd.GetBytes(addr, nil /* make */)
require.ErrorIs(t, err, apistatus.ErrObjectNotFound)
require.Nil(t, b)

_, err = ppd.Put(common.PutPrm{
Address: addr,
RawData: objBin,
})
require.NoError(t, err)

b, err = ppd.GetBytes(addr, nil /* make */)
require.NoError(t, err)
require.Equal(t, objBin, b)

b2, err := ppd.GetBytes(addr, func(ln int) []byte {
if cap(b) >= ln {
return b[:ln]
}
return make([]byte, ln)
})
require.NoError(t, err)
require.Equal(t, objBin, b2)
if len(b) > 0 {
require.Equal(t, &b[0], &b2[0])
}
}
11 changes: 6 additions & 5 deletions pkg/local_object_storage/engine/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

const errSmallSize = 256

func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32) (*StorageEngine, string, [2]*shard.ID) {
func newEngine(t testing.TB, dir string, opts ...Option) (*StorageEngine, string, [2]*shard.ID) {
if dir == "" {
var err error

Expand All @@ -31,10 +31,7 @@ func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32)
t.Cleanup(func() { _ = os.RemoveAll(dir) })
}

e := New(
WithLogger(zaptest.NewLogger(t)),
WithShardPoolSize(1),
WithErrorThreshold(errThreshold))
e := New(append([]Option{WithShardPoolSize(1)}, opts...)...)

var ids [2]*shard.ID
var err error
Expand All @@ -60,6 +57,10 @@ func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32)
return e, dir, ids
}

func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32) (*StorageEngine, string, [2]*shard.ID) {
return newEngine(t, dir, WithLogger(zaptest.NewLogger(t)), WithErrorThreshold(errThreshold))
}

func TestErrorReporting(t *testing.T) {
t.Run("ignore errors by default", func(t *testing.T) {
e, dir, id := newEngineWithErrorThreshold(t, "", 0)
Expand Down
Loading

0 comments on commit 494a3fd

Please sign in to comment.