Skip to content

Commit

Permalink
TSM: TSMReader.Close blocks until reads complete
Browse files Browse the repository at this point in the history
  • Loading branch information
Jacob Marble committed Apr 30, 2018
1 parent fa24142 commit 0529429
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 13 deletions.
7 changes: 6 additions & 1 deletion coordinator/points_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package coordinator

import (
"errors"
"fmt"
"sort"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -308,7 +309,11 @@ func (w *PointsWriter) WritePointsPrivileged(database, retentionPolicy string, c
ch := make(chan error, len(shardMappings.Points))
for shardID, points := range shardMappings.Points {
go func(shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) {
ch <- w.writeToShard(shard, database, retentionPolicy, points)
err := w.writeToShard(shard, database, retentionPolicy, points)
if err == tsdb.ErrShardDeletion {
err = tsdb.PartialWriteError{Reason: fmt.Sprintf("shard %d is pending deletion", shard.ID), Dropped: len(points)}
}
ch <- err
}(shardMappings.Shards[shardID], database, retentionPolicy, points)
}

Expand Down
5 changes: 4 additions & 1 deletion tsdb/engine/tsm1/file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,10 @@ func (f *FileStore) Close() error {
defer f.mu.Unlock()

for _, file := range f.files {
file.Close()
err := file.Close()
if err != nil {
return err
}
}

f.lastFileStats = nil
Expand Down
16 changes: 10 additions & 6 deletions tsdb/engine/tsm1/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ var nilOffset = []byte{255, 255, 255, 255}
// TSMReader is a reader for a TSM file.
type TSMReader struct {
// refs is the count of active references to this reader.
refs int64
refs int64
refsWG sync.WaitGroup

mu sync.RWMutex

Expand Down Expand Up @@ -398,13 +399,11 @@ func (t *TSMReader) Type(key []byte) (byte, error) {

// Close closes the TSMReader.
func (t *TSMReader) Close() error {
t.refsWG.Wait()

t.mu.Lock()
defer t.mu.Unlock()

if t.InUse() {
return ErrFileInUse
}

if err := t.accessor.close(); err != nil {
return err
}
Expand All @@ -417,13 +416,15 @@ func (t *TSMReader) Close() error {
// there are no more references.
func (t *TSMReader) Ref() {
atomic.AddInt64(&t.refs, 1)
t.refsWG.Add(1)
}

// Unref removes a usage record of this TSMReader. If the Reader was closed
// by another goroutine while there were active references, the file will
// be closed and remove
func (t *TSMReader) Unref() {
atomic.AddInt64(&t.refs, -1)
t.refsWG.Done()
}

// InUse returns whether the TSMReader currently has any active references.
Expand Down Expand Up @@ -455,7 +456,10 @@ func (t *TSMReader) remove() error {
}

if path != "" {
os.RemoveAll(path)
err := os.RemoveAll(path)
if err != nil {
return err
}
}

if err := t.tombstoner.Delete(); err != nil {
Expand Down
4 changes: 0 additions & 4 deletions tsdb/engine/tsm1/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1828,10 +1828,6 @@ func TestTSMReader_References(t *testing.T) {

r.Ref()

if err := r.Close(); err != ErrFileInUse {
t.Fatalf("expected error closing reader: %v", err)
}

if err := r.Remove(); err != ErrFileInUse {
t.Fatalf("expected error removing reader: %v", err)
}
Expand Down
4 changes: 3 additions & 1 deletion tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ var (
ErrShardNotFound = fmt.Errorf("shard not found")
// ErrStoreClosed is returned when trying to use a closed Store.
ErrStoreClosed = fmt.Errorf("store is closed")
// ErrShardDeletion is returned when trying to create a shard that is being deleted
ErrShardDeletion = errors.New("shard is being deleted")
)

// Statistics gathered by the store.
Expand Down Expand Up @@ -523,7 +525,7 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64, en
// Shard may be undergoing a pending deletion. While the shard can be
// recreated, it must wait for the pending delete to finish.
if _, ok := s.pendingShardDeletes[shardID]; ok {
return fmt.Errorf("shard %d is pending deletion and cannot be created again until finished", shardID)
return ErrShardDeletion
}

// Create the db and retention policy directories if they don't exist.
Expand Down

0 comments on commit 0529429

Please sign in to comment.