Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Added storage size based retention method and new metrics #343

Merged
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## master / unreleased
- [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller wal files. For example using tmpfs on a RPI to minimise the SD card wear out from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed.
- [FEATURE] Size base retention through `Options.MaxBytes`. As part of this change added new metrics - `prometheus_tsdb_storage_blocks_bytes_total`, `prometheus_tsdb_size_retentions_total`, `prometheus_tsdb_time_retentions_total` and new public interface `SizeReader: Size() int64`
- [REMOVED] `PrefixMatcher` is considered unused so was removed.
- [CLEANUP] `Options.WALFlushInterval` is removed as it wasn't used anywhere.

Expand Down
38 changes: 36 additions & 2 deletions block.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"path/filepath"
"sync"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/tsdb/chunkenc"
Expand Down Expand Up @@ -140,6 +142,12 @@ type Appendable interface {
Appender() Appender
}

// SizeReader returns the size of the object in bytes.
type SizeReader interface {
// Size returns the size in bytes.
Size() int64
}

// BlockMeta provides meta information about a block.
type BlockMeta struct {
// Unique identifier for the block and its contents. Changes on compaction.
Expand All @@ -166,6 +174,7 @@ type BlockStats struct {
NumSeries uint64 `json:"numSeries,omitempty"`
NumChunks uint64 `json:"numChunks,omitempty"`
NumTombstones uint64 `json:"numTombstones,omitempty"`
NumBytes int64 `json:"numBytes,omitempty"`
}

// BlockDesc describes a block by ULID and time range.
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -257,7 +266,10 @@ type Block struct {

// OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
// to instantiate chunk structs.
func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) {
func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (*Block, error) {
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
if logger == nil {
logger = log.NewNopLogger()
}
meta, err := readMetaFile(dir)
if err != nil {
return nil, err
Expand All @@ -272,11 +284,20 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) {
return nil, err
}

tr, err := readTombstones(dir)
tr, tsr, err := readTombstones(dir)
if err != nil {
return nil, err
}

// TODO refactor to set this at block creation time as
gouthamve marked this conversation as resolved.
Show resolved Hide resolved
// that would be the logical place for a block size to be calculated.
bs := blockSize(cr, ir, tsr)
meta.Stats.NumBytes = bs
err = writeMetaFile(dir, meta)
if err != nil {
level.Warn(logger).Log("msg", "couldn't write the meta file for the block size", "block", dir, "err", err)
}

pb := &Block{
dir: dir,
meta: *meta,
Expand All @@ -288,6 +309,16 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) {
return pb, nil
}

func blockSize(rr ...SizeReader) int64 {
var total int64
for _, r := range rr {
if r != nil {
total += r.Size()
}
}
return total
}

// Close closes the on-disk block. It blocks as long as there are readers reading from the block.
func (pb *Block) Close() error {
pb.mtx.Lock()
Expand Down Expand Up @@ -315,6 +346,9 @@ func (pb *Block) Dir() string { return pb.dir }
// Meta returns meta information about the block.
func (pb *Block) Meta() BlockMeta { return pb.meta }

// Size returns the number of bytes that the block takes up.
func (pb *Block) Size() int64 { return pb.meta.Stats.NumBytes }

// ErrClosing is returned when a block is in the process of being closed.
var ErrClosing = errors.New("block is closing")

Expand Down
4 changes: 2 additions & 2 deletions block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ func TestSetCompactionFailed(t *testing.T) {
defer os.RemoveAll(tmpdir)

blockDir := createBlock(t, tmpdir, 0, 0, 0)
b, err := OpenBlock(blockDir, nil)
b, err := OpenBlock(nil, blockDir, nil)
testutil.Ok(t, err)
testutil.Equals(t, false, b.meta.Compaction.Failed)
testutil.Ok(t, b.setCompactionFailed())
testutil.Equals(t, true, b.meta.Compaction.Failed)
testutil.Ok(t, b.Close())

b, err = OpenBlock(blockDir, nil)
b, err = OpenBlock(nil, blockDir, nil)
testutil.Ok(t, err)
testutil.Equals(t, true, b.meta.Compaction.Failed)
testutil.Ok(t, b.Close())
Expand Down
24 changes: 15 additions & 9 deletions chunks/chunks.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,17 +284,15 @@ func (b realByteSlice) Sub(start, end int) ByteSlice {
// Reader implements a SeriesReader for a serialized byte stream
// of series data.
type Reader struct {
// The underlying bytes holding the encoded series data.
bs []ByteSlice

// Closers for resources behind the byte slices.
cs []io.Closer

bs []ByteSlice // The underlying bytes holding the encoded series data.
cs []io.Closer // Closers for resources behind the byte slices.
size int64 // The total size of bytes in the reader.
pool chunkenc.Pool
}

func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, error) {
cr := Reader{pool: pool, bs: bs, cs: cs}
var totalSize int64

for i, b := range cr.bs {
if b.Len() < 4 {
Expand All @@ -304,7 +302,9 @@ func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, err
if m := binary.BigEndian.Uint32(b.Range(0, 4)); m != MagicChunks {
return nil, errors.Errorf("invalid magic number %x", m)
}
totalSize += int64(b.Len())
}
cr.size = totalSize
return &cr, nil
}

Expand All @@ -327,9 +327,10 @@ func NewDirReader(dir string, pool chunkenc.Pool) (*Reader, error) {
pool = chunkenc.NewPool()
}

var bs []ByteSlice
var cs []io.Closer

var (
bs []ByteSlice
cs []io.Closer
)
for _, fn := range files {
f, err := fileutil.OpenMmapFile(fn)
if err != nil {
Expand All @@ -345,6 +346,11 @@ func (s *Reader) Close() error {
return closeAll(s.cs...)
}

// Size returns the size of the chunks.
func (s *Reader) Size() int64 {
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
return s.size
}

func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) {
var (
seq = int(ref >> 32)
Expand Down
2 changes: 1 addition & 1 deletion compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u

if b == nil {
var err error
b, err = OpenBlock(d, c.chunkPool)
b, err = OpenBlock(c.logger, d, c.chunkPool)
if err != nil {
return uid, err
}
Expand Down
Loading