Skip to content

Commit

Permalink
Fix the way to take offset so that it can decode no matter the number…
Browse files Browse the repository at this point in the history
… of metrics is (#12)

* Add check if file to be mmapped is empty

* Stop using gzip

* Ignore empty partition

* Fix

* Use constant

* Reset everything
  • Loading branch information
nakabonne authored Jul 4, 2021
1 parent 167b85e commit a832e87
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 102 deletions.
7 changes: 7 additions & 0 deletions bstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ func (b *bstream) bytes() []byte {
return b.stream
}

// reset truncates the stream to zero length and clears the pending-bit
// counter, keeping the allocated backing array so future writes can
// reuse it without reallocating.
func (b *bstream) reset() {
	b.count = 0
	b.stream = b.stream[:0]
}

type bit bool

const (
Expand Down
13 changes: 9 additions & 4 deletions disk_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"

Expand Down Expand Up @@ -51,7 +52,8 @@ func openDiskPartition(dirPath string) (partition, error) {
}

// Map data to the memory
f, err := os.Open(filepath.Join(dirPath, dataFileName))
dataPath := filepath.Join(dirPath, dataFileName)
f, err := os.Open(dataPath)
if err != nil {
return nil, fmt.Errorf("failed to read data file: %w", err)
}
Expand All @@ -60,6 +62,9 @@ func openDiskPartition(dirPath string) (partition, error) {
if err != nil {
return nil, fmt.Errorf("failed to fetch file info: %w", err)
}
if info.Size() == 0 {
return nil, ErrNoDataPoints
}
mapped, err := syscall.Mmap(int(f.Fd()), int(info.Size()))
if err != nil {
return nil, fmt.Errorf("failed to perform mmap: %w", err)
Expand Down Expand Up @@ -95,20 +100,20 @@ func (d *diskPartition) selectDataPoints(metric string, labels []Label, start, e
return nil, ErrNoDataPoints
}
r := bytes.NewReader(d.mappedFile)
if _, err := r.Seek(mt.Offset, 0); err != nil {
if _, err := r.Seek(mt.Offset, io.SeekStart); err != nil {
return nil, fmt.Errorf("failed to seek: %w", err)
}
decoder, err := newSeriesDecoder(r)
if err != nil {
return nil, fmt.Errorf("failed to generate decoder: %w", err)
return nil, fmt.Errorf("failed to generate decoder for metric %q in %q: %w", name, d.dirPath, err)
}

// TODO: Use binary search to select points on disk
points := make([]*DataPoint, 0, mt.NumDataPoints)
for i := 0; i < int(mt.NumDataPoints); i++ {
point := &DataPoint{}
if err := decoder.decodePoint(point); err != nil {
return nil, fmt.Errorf("failed to decode point: %w", err)
return nil, fmt.Errorf("failed to decode point of metric %q in %q: %w", name, d.dirPath, err)
}
if point.Timestamp < start {
continue
Expand Down
53 changes: 26 additions & 27 deletions encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
package tstorage

import (
"bytes"
"compress/gzip"
"encoding/binary"
"fmt"
"io"
Expand All @@ -36,7 +34,7 @@ import (

type seriesEncoder interface {
encodePoint(point *DataPoint) error
compress() error
flush() error
}

func newSeriesEncoder(w io.Writer) seriesEncoder {
Expand Down Expand Up @@ -132,15 +130,26 @@ func (e *gorillaEncoder) encodePoint(point *DataPoint) error {
return nil
}

// compress compress the buffered-date and writes them into the backend io.Writer
func (e *gorillaEncoder) compress() error {
// FIXME: Compress with ZStandard instead of gzip

gzipWriter := gzip.NewWriter(e.w)
if _, err := gzipWriter.Write(e.buf.bytes()); err != nil {
return err
// flush writes the buffered bytes into the backend io.Writer
// and resets everything used for computation, so the encoder can be
// reused for the next series.
//
// It returns an error if writing to the backend writer fails; in that
// case the encoder state is left untouched.
func (e *gorillaEncoder) flush() error {
	// FIXME: Compress with ZStandard
	if _, err := e.w.Write(e.buf.bytes()); err != nil {
		return fmt.Errorf("failed to flush buffered bytes: %w", err)
	}

	// Reset all per-series state (timestamps, deltas, last value,
	// leading/trailing zero counts) back to their zero values.
	// NOTE: the original assigned e.v = 0 twice; the duplicate is removed.
	e.buf.reset()
	e.t0 = 0
	e.t1 = 0
	e.t = 0
	e.tDelta = 0
	e.v = 0
	e.leading = 0
	e.trailing = 0

	return nil
}

func (e *gorillaEncoder) writeVDelta(v float64) {
Expand Down Expand Up @@ -184,23 +193,13 @@ type seriesDecoder interface {

// newSeriesDecoder reads all remaining bytes from the given Reader and
// returns a decoder that iterates over the data points encoded in them.
//
// It returns an error if reading from r fails.
func newSeriesDecoder(r io.Reader) (seriesDecoder, error) {
	// FIXME: Stop copying entire bytes to make BReader
	b, err := io.ReadAll(r)
	if err != nil {
		return nil, fmt.Errorf("failed to read all bytes: %w", err)
	}
	return &gorillaDecoder{
		br: newBReader(b),
	}, nil
}

Expand All @@ -222,11 +221,11 @@ func (d *gorillaDecoder) decodePoint(dst *DataPoint) error {
if d.numRead == 0 {
t, err := binary.ReadVarint(&d.br)
if err != nil {
return err
return fmt.Errorf("failed to read Timestamp of T0: %w", err)
}
v, err := d.br.readBits(64)
if err != nil {
return err
return fmt.Errorf("failed to read Value of T0: %w", err)
}
d.t = t
d.v = math.Float64frombits(v)
Expand Down
21 changes: 15 additions & 6 deletions storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@ func NewStorage(opts ...Option) (Storage, error) {
}
path := filepath.Join(s.dataPath, f.Name())
part, err := openDiskPartition(path)
if errors.Is(err, ErrNoDataPoints) {
continue
}
if err != nil {
return nil, fmt.Errorf("failed to open disk partition for %s: %w", path, err)
}
Expand Down Expand Up @@ -386,6 +389,12 @@ func (s *storage) flushPartitions() error {
return fmt.Errorf("failed to compact memory partition into %s: %w", dir, err)
}
newPart, err := openDiskPartition(dir)
if errors.Is(err, ErrNoDataPoints) {
if err := s.partitionList.remove(part); err != nil {
return fmt.Errorf("failed to remove partition: %w", err)
}
continue
}
if err != nil {
return fmt.Errorf("failed to generate disk partition for %s: %w", dir, err)
}
Expand Down Expand Up @@ -420,19 +429,22 @@ func (s *storage) flush(dirPath string, m *memoryPartition) error {
s.logger.Printf("unknown value found\n")
return false
}
// FIXME: Change the way to get offset. Currently, the encoder doesn't write each time. So the returned value of f.Seek will be 0 anytime.
offset, err := f.Seek(io.SeekStart, 1)
offset, err := f.Seek(0, io.SeekCurrent)
if err != nil {
s.logger.Printf("failed to set file offset of metric %q: %v\n", mt.name, err)
return false
}
// TODO: Merge out-of-order data points
for _, p := range mt.points {
if err := encoder.encodePoint(p); err != nil {
s.logger.Printf("failed to encode a data point of %q: %v\n", mt.name, err)
s.logger.Printf("failed to encode a data point that metric is %q: %v\n", mt.name, err)
return false
}
}
if err := encoder.flush(); err != nil {
s.logger.Printf("failed to flush data points that metric is %q: %v", mt.name, err)
return false
}
metrics[mt.name] = diskMetric{
Name: mt.name,
Offset: offset,
Expand All @@ -442,9 +454,6 @@ func (s *storage) flush(dirPath string, m *memoryPartition) error {
}
return true
})
if err := encoder.compress(); err != nil {
return err
}

b, err := json.Marshal(&meta{
MinTimestamp: m.minTimestamp(),
Expand Down
Loading

0 comments on commit a832e87

Please sign in to comment.