Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve series segment recovery #10002

Merged
merged 1 commit into from
Jun 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions tsdb/series_segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (s *SeriesSegment) InitForWrite() (err error) {
// Only calculcate segment data size if writing.
for s.size = uint32(SeriesSegmentHeaderSize); s.size < uint32(len(s.data)); {
flag, _, _, sz := ReadSeriesEntry(s.data[s.size:])
if flag == 0 {
if !IsValidSeriesEntryFlag(flag) {
break
}
s.size += uint32(sz)
Expand Down Expand Up @@ -237,7 +237,7 @@ func (s *SeriesSegment) MaxSeriesID() uint64 {
func (s *SeriesSegment) ForEachEntry(fn func(flag uint8, id uint64, offset int64, key []byte) error) error {
for pos := uint32(SeriesSegmentHeaderSize); pos < uint32(len(s.data)); {
flag, id, key, sz := ReadSeriesEntry(s.data[pos:])
if flag == 0 {
if !IsValidSeriesEntryFlag(flag) {
break
}

Expand Down Expand Up @@ -368,7 +368,7 @@ func (hdr *SeriesSegmentHeader) WriteTo(w io.Writer) (n int64, err error) {
func ReadSeriesEntry(data []byte) (flag uint8, id uint64, key []byte, sz int64) {
// If flag byte is zero then no more entries exist.
flag, data = uint8(data[0]), data[1:]
if flag == 0 {
if !IsValidSeriesEntryFlag(flag) {
return 0, 0, nil, 1
}

Expand Down Expand Up @@ -396,3 +396,13 @@ func AppendSeriesEntry(dst []byte, flag uint8, id uint64, key []byte) []byte {
}
return dst
}

// IsValidSeriesEntryFlag returns true if flag is valid.
func IsValidSeriesEntryFlag(flag byte) bool {
switch flag {
case SeriesEntryInsertFlag, SeriesEntryTombstoneFlag:
return true
default:
return false
}
}
44 changes: 44 additions & 0 deletions tsdb/series_segment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package tsdb_test

import (
"bytes"
"os"
"path/filepath"
"testing"

Expand Down Expand Up @@ -140,6 +141,49 @@ func TestSeriesSegmentHeader(t *testing.T) {
}
}

func TestSeriesSegment_PartialWrite(t *testing.T) {
dir, cleanup := MustTempDir()
defer cleanup()

// Create a new initial segment (4mb) and initialize for writing.
segment, err := tsdb.CreateSeriesSegment(0, filepath.Join(dir, "0000"))
if err != nil {
t.Fatal(err)
} else if err := segment.InitForWrite(); err != nil {
t.Fatal(err)
}
defer segment.Close()

// Write two entries.
if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 1, tsdb.AppendSeriesKey(nil, []byte("A"), nil))); err != nil {
t.Fatal(err)
} else if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 2, tsdb.AppendSeriesKey(nil, []byte("B"), nil))); err != nil {
t.Fatal(err)
}
sz := segment.Size()
entrySize := len(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 2, tsdb.AppendSeriesKey(nil, []byte("B"), nil)))

// Close segment.
if err := segment.Close(); err != nil {
t.Fatal(err)
}

// Truncate at each point and reopen.
for i := entrySize; i > 0; i-- {
if err := os.Truncate(filepath.Join(dir, "0000"), sz-int64(entrySize-i)); err != nil {
t.Fatal(err)
}
segment := tsdb.NewSeriesSegment(0, filepath.Join(dir, "0000"))
if err := segment.Open(); err != nil {
t.Fatal(err)
} else if err := segment.InitForWrite(); err != nil {
t.Fatal(err)
} else if err := segment.Close(); err != nil {
t.Fatal(err)
}
}
}

func TestJoinSeriesOffset(t *testing.T) {
if offset := tsdb.JoinSeriesOffset(0x1234, 0x56789ABC); offset != 0x123456789ABC {
t.Fatalf("unexpected offset: %x", offset)
Expand Down