Skip to content

Commit

Permalink
Merge pull request #7830 from influxdata/jw-cache-snapshot
Browse files Browse the repository at this point in the history
Cache snapshotting performance improvements
  • Loading branch information
jwilder authored Jan 12, 2017
2 parents bcdb0a7 + 11f2645 commit 7e7f0d0
Show file tree
Hide file tree
Showing 5 changed files with 228 additions and 71 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ The stress tool `influx_stress` will be removed in a subsequent release. We reco
- [#7323](https://github.com/influxdata/influxdb/pull/7323): Allow add items to array config via ENV
- [#4619](https://github.com/influxdata/influxdb/issues/4619): Support subquery execution in the query language.
- [#7326](https://github.com/influxdata/influxdb/issues/7326): Verbose output for SSL connection errors.
- [#7830](https://github.com/influxdata/influxdb/pull/7830): Cache snapshotting performance improvements

### Bugfixes

Expand Down
73 changes: 37 additions & 36 deletions tsdb/engine/tsm1/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,23 +166,34 @@ const (
statCacheWriteDropped = "writeDropped"
)

// storer is the interface that descibes a cache's store.
type storer interface {
entry(key string) (*entry, bool) // Get an entry by its key.
write(key string, values Values) error // Write an entry to the store.
add(key string, entry *entry) // Add a new entry to the store.
remove(key string) // Remove an entry from the store.
keys(sorted bool) []string // Return an optionally sorted slice of entry keys.
apply(f func(string, *entry) error) error // Apply f to all entries in the store in parallel.
applySerial(f func(string, *entry) error) error // Apply f to all entries in serial.
reset() // Reset the store to an initial unused state.
}

// Cache maintains an in-memory store of Values for a set of keys.
type Cache struct {
// TODO(edd): size is protected by mu but due to a bug in atomic size needs
// to be the first word in the struct, as that's the only place where you're
// guaranteed to be 64-bit aligned on a 32 bit system. See:
// https://golang.org/pkg/sync/atomic/#pkg-note-BUG
size uint64
commit sync.Mutex
// Due to a bug in atomic size needs to be the first word in the struct, as
// that's the only place where you're guaranteed to be 64-bit aligned on a
// 32 bit system. See: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
size uint64
snapshotSize uint64

mu sync.RWMutex
store *ring
store storer
maxSize uint64

// snapshots are the cache objects that are currently being written to tsm files
// they're kept in memory while flushing so they can be queried along with the cache.
// they are read only and should never be modified
snapshot *Cache
snapshotSize uint64
snapshotting bool

// This number is the number of pending or failed WriteSnaphot attempts since the last successful one.
Expand Down Expand Up @@ -280,13 +291,9 @@ func (c *Cache) WriteMulti(values map[string][]Value) error {
addedSize += uint64(Values(v).Size())
}

// Set everything under one RLock. We'll optimistially set size here, and
// then decrement it later if there is a write error.
c.increaseSize(addedSize)
limit := c.maxSize
n := c.Size() + atomic.LoadUint64(&c.snapshotSize) + addedSize

// Enough room in the cache?
limit := c.maxSize // maxSize is safe for reading without a lock.
n := c.Size() + atomic.LoadUint64(&c.snapshotSize) + addedSize
if limit > 0 && n > limit {
atomic.AddInt64(&c.stats.WriteErr, 1)
return ErrCacheMemorySizeLimitExceeded(n, limit)
Expand All @@ -297,6 +304,8 @@ func (c *Cache) WriteMulti(values map[string][]Value) error {
store := c.store
c.mu.RUnlock()

// We'll optimistially set size here, and then decrement it for write errors.
c.increaseSize(addedSize)
for k, v := range values {
if err := store.write(k, v); err != nil {
// The write failed, hold onto the error and adjust the size delta.
Expand Down Expand Up @@ -345,35 +354,26 @@ func (c *Cache) Snapshot() (*Cache, error) {
}
}

// Append the current cache values to the snapshot. Because we're accessing
// the Cache we need to call f on each partition in serial.
if err := c.store.applySerial(func(k string, e *entry) error {
e.mu.RLock()
defer e.mu.RUnlock()
snapshotEntry, ok := c.snapshot.store.entry(k)
if ok {
if err := snapshotEntry.add(e.values); err != nil {
return err
}
} else {
c.snapshot.store.add(k, e)
snapshotEntry = e
}
atomic.AddUint64(&c.snapshotSize, uint64(Values(e.values).Size()))
return nil
}); err != nil {
return nil, err
// Did a prior snapshot exist that failed? If so, return the existing
// snapshot to retry.
if c.snapshot.Size() > 0 {
return c.snapshot, nil
}

snapshotSize := c.Size() // record the number of bytes written into a snapshot
c.snapshot.store, c.store = c.store, c.snapshot.store
snapshotSize := c.Size()

// Save the size of the snapshot on the snapshot cache
atomic.StoreUint64(&c.snapshot.size, snapshotSize)
// Save the size of the snapshot on the live cache
atomic.StoreUint64(&c.snapshotSize, snapshotSize)

// Reset the cache's store.
c.store.reset()
atomic.StoreUint64(&c.size, 0)
c.lastSnapshot = time.Now()

c.updateMemSize(-int64(snapshotSize)) // decrement the number of bytes in cache
c.updateCachedBytes(snapshotSize) // increment the number of bytes added to the snapshot
c.updateCachedBytes(snapshotSize) // increment the number of bytes added to the snapshot
c.updateSnapshots()

return c.snapshot, nil
Expand Down Expand Up @@ -401,14 +401,15 @@ func (c *Cache) ClearSnapshot(success bool) {

if success {
c.snapshotAttempts = 0
atomic.StoreUint64(&c.snapshotSize, 0)
c.updateMemSize(-int64(atomic.LoadUint64(&c.snapshotSize))) // decrement the number of bytes in cache

// Reset the snapshot's store, and reset the snapshot to a fresh Cache.
c.snapshot.store.reset()
c.snapshot = &Cache{
store: c.snapshot.store,
}

atomic.StoreUint64(&c.snapshotSize, 0)
c.updateSnapshots()
}
}
Expand Down
97 changes: 95 additions & 2 deletions tsdb/engine/tsm1/cache_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tsm1

import (
"errors"
"fmt"
"io/ioutil"
"math"
Expand Down Expand Up @@ -99,6 +100,49 @@ func TestCache_CacheWriteMulti(t *testing.T) {
}
}

// Tests that the cache stats and size are correctly maintained during writes.
func TestCache_WriteMulti_Stats(t *testing.T) {
limit := uint64(1)
c := NewCache(limit, "")
ms := NewTestStore()
c.store = ms

// Not enough room in the cache.
v := NewValue(1, 1.0)
values := map[string][]Value{"foo": []Value{v, v}}
if got, exp := c.WriteMulti(values), ErrCacheMemorySizeLimitExceeded(uint64(v.Size()*2), limit); !reflect.DeepEqual(got, exp) {
t.Fatalf("got %q, expected %q", got, exp)
}

// Fail one of the values in the write.
c = NewCache(50, "")
c.store = ms

ms.writef = func(key string, v Values) error {
if key == "foo" {
return errors.New("write failed")
}
return nil
}

values = map[string][]Value{"foo": []Value{v, v}, "bar": []Value{v}}
if got, exp := c.WriteMulti(values), errors.New("write failed"); !reflect.DeepEqual(got, exp) {
t.Fatalf("got %v, expected %v", got, exp)
}

// Cache size decreased correctly.
if got, exp := c.Size(), uint64(16); got != exp {
t.Fatalf("got %v, expected %v", got, exp)
}

// Write stats updated
if got, exp := c.stats.WriteDropped, int64(1); got != exp {
t.Fatalf("got %v, expected %v", got, exp)
} else if got, exp := c.stats.WriteErr, int64(1); got != exp {
t.Fatalf("got %v, expected %v", got, exp)
}
}

func TestCache_CacheWriteMulti_TypeConflict(t *testing.T) {
v0 := NewValue(1, 1.0)
v1 := NewValue(2, 2.0)
Expand Down Expand Up @@ -388,6 +432,32 @@ func TestCache_CacheSnapshot(t *testing.T) {
}
}

// Tests that Snapshot updates statistics correctly.
func TestCache_Snapshot_Stats(t *testing.T) {
limit := uint64(16)
c := NewCache(limit, "")

values := map[string][]Value{"foo": []Value{NewValue(1, 1.0)}}
if err := c.WriteMulti(values); err != nil {
t.Fatal(err)
}

_, err := c.Snapshot()
if err != nil {
t.Fatal(err)
}

// Store size should have been reset.
if got, exp := c.Size(), uint64(0); got != exp {
t.Fatalf("got %v, expected %v", got, exp)
}

// Cached bytes should have been increased.
if got, exp := c.stats.CachedBytes, int64(16); got != exp {
t.Fatalf("got %v, expected %v", got, exp)
}
}

func TestCache_CacheEmptySnapshot(t *testing.T) {
c := NewCache(512, "")

Expand Down Expand Up @@ -425,7 +495,7 @@ func TestCache_CacheWriteMemoryExceeded(t *testing.T) {
t.Fatalf("cache keys incorrect after writes, exp %v, got %v", exp, keys)
}
if err := c.Write("bar", Values{v1}); err == nil || !strings.Contains(err.Error(), "cache-max-memory-size") {
t.Fatalf("wrong error writing key bar to cache")
t.Fatalf("wrong error writing key bar to cache: %v", err)
}

// Grab snapshot, write should still fail since we're still using the memory.
Expand All @@ -434,7 +504,7 @@ func TestCache_CacheWriteMemoryExceeded(t *testing.T) {
t.Fatalf("failed to snapshot cache: %v", err)
}
if err := c.Write("bar", Values{v1}); err == nil || !strings.Contains(err.Error(), "cache-max-memory-size") {
t.Fatalf("wrong error writing key bar to cache")
t.Fatalf("wrong error writing key bar to cache: %v", err)
}

// Clear the snapshot and the write should now succeed.
Expand Down Expand Up @@ -695,6 +765,29 @@ func mustMarshalEntry(entry WALEntry) (WalEntryType, []byte) {
return entry.Type(), snappy.Encode(b, b)
}

// TestStore implements the storer interface and can be used to mock out a
// Cache's storer implememation.
type TestStore struct {
entryf func(key string) (*entry, bool)
writef func(key string, values Values) error
addf func(key string, entry *entry)
removef func(key string)
keysf func(sorted bool) []string
applyf func(f func(string, *entry) error) error
applySerialf func(f func(string, *entry) error) error
resetf func()
}

func NewTestStore() *TestStore { return &TestStore{} }
func (s *TestStore) entry(key string) (*entry, bool) { return s.entryf(key) }
func (s *TestStore) write(key string, values Values) error { return s.writef(key, values) }
func (s *TestStore) add(key string, entry *entry) { s.addf(key, entry) }
func (s *TestStore) remove(key string) { s.removef(key) }
func (s *TestStore) keys(sorted bool) []string { return s.keysf(sorted) }
func (s *TestStore) apply(f func(string, *entry) error) error { return s.applyf(f) }
func (s *TestStore) applySerial(f func(string, *entry) error) error { return s.applySerialf(f) }
func (s *TestStore) reset() { s.resetf() }

var fvSize = uint64(NewValue(1, float64(1)).Size())

func BenchmarkCacheFloatEntries(b *testing.B) {
Expand Down
Loading

0 comments on commit 7e7f0d0

Please sign in to comment.