Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce SSTable sha256 checksums #689

Merged
merged 3 commits into from
Jan 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions badger/cmd/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,9 @@ func printInfo(dir, valueDir string) error {
})
for _, tableID := range tableIDs {
tableFile := table.IDToFilename(tableID)
file, ok := fileinfoByName[tableFile]
if ok {
tm, ok1 := manifest.Tables[tableID]
file, ok2 := fileinfoByName[tableFile]
if ok1 && ok2 {
fileinfoMarked[tableFile] = true
emptyString := ""
fileSize := file.Size()
Expand All @@ -180,8 +181,8 @@ func printInfo(dir, valueDir string) error {
}
levelSizes[level] += fileSize
// (Put level on every line to make it easier to process with sed/perl.)
fmt.Printf("[%25s] %-12s %6s L%d%s\n", dur(baseTime, file.ModTime()),
tableFile, hbytes(fileSize), level, emptyString)
fmt.Printf("[%25s] %-12s %6s L%d %x%s\n", dur(baseTime, file.ModTime()),
tableFile, hbytes(fileSize), level, tm.Checksum, emptyString)
} else {
fmt.Printf("%s [MISSING]\n", tableFile)
numMissing++
Expand Down
2 changes: 1 addition & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,7 +829,7 @@ func (db *DB) handleFlushTask(ft flushTask) error {
db.elog.Errorf("ERROR while syncing level directory: %v", dirSyncErr)
}

tbl, err := table.OpenTable(fd, db.opt.TableLoadingMode)
tbl, err := table.OpenTable(fd, db.opt.TableLoadingMode, nil)
if err != nil {
db.elog.Printf("ERROR while opening table: %v", err)
return err
Expand Down
32 changes: 20 additions & 12 deletions levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"math/rand"
"os"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -122,7 +123,7 @@ func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) {
tick := time.NewTicker(3 * time.Second)
defer tick.Stop()

for fileID, tableManifest := range mf.Tables {
for fileID, tf := range mf.Tables {
fname := table.NewFilename(fileID, db.opt.Dir)
select {
case <-tick.C:
Expand All @@ -137,7 +138,7 @@ func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) {
if fileID > maxFileID {
maxFileID = fileID
}
go func(fname string, level int) {
go func(fname string, tf TableManifest) {
var rerr error
defer func() {
throttle.Done(rerr)
Expand All @@ -149,16 +150,22 @@ func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) {
return
}

t, err := table.OpenTable(fd, db.opt.TableLoadingMode)
t, err := table.OpenTable(fd, db.opt.TableLoadingMode, tf.Checksum)
if err != nil {
rerr = errors.Wrapf(err, "Opening table: %q", fname)
if strings.HasPrefix(err.Error(), "CHECKSUM_MISMATCH:") {
db.opt.Errorf(err.Error())
db.opt.Errorf("Ignoring table %s", fd.Name())
// Do not set rerr. We will continue without this table.
} else {
rerr = errors.Wrapf(err, "Opening table: %q", fname)
}
return
}

mu.Lock()
tables[level] = append(tables[level], t)
tables[tf.Level] = append(tables[tf.Level], t)
mu.Unlock()
}(fname, int(tableManifest.Level))
}(fname, tf)
}
if err := throttle.Finish(); err != nil {
closeAllTables(tables)
Expand Down Expand Up @@ -226,7 +233,7 @@ func (s *levelsController) deleteLSMTree() (int, error) {
// Generate the manifest changes.
changes := []*pb.ManifestChange{}
for _, table := range all {
changes = append(changes, makeTableDeleteChange(table.ID()))
changes = append(changes, newDeleteChange(table.ID()))
}
changeSet := pb.ManifestChangeSet{Changes: changes}
if err := s.kv.manifest.addChanges(changeSet.Changes); err != nil {
Expand Down Expand Up @@ -486,7 +493,7 @@ func (s *levelsController) compactBuildTables(
return
}

tbl, err := table.OpenTable(fd, s.kv.opt.TableLoadingMode)
tbl, err := table.OpenTable(fd, s.kv.opt.TableLoadingMode, nil)
// decrRef is added below.
resultCh <- newTableResult{tbl, errors.Wrapf(err, "Unable to open table: %q", fd.Name())}
}(builder)
Expand Down Expand Up @@ -534,13 +541,14 @@ func (s *levelsController) compactBuildTables(
func buildChangeSet(cd *compactDef, newTables []*table.Table) pb.ManifestChangeSet {
changes := []*pb.ManifestChange{}
for _, table := range newTables {
changes = append(changes, makeTableCreateChange(table.ID(), cd.nextLevel.level))
changes = append(changes,
newCreateChange(table.ID(), cd.nextLevel.level, table.Checksum))
}
for _, table := range cd.top {
changes = append(changes, makeTableDeleteChange(table.ID()))
changes = append(changes, newDeleteChange(table.ID()))
}
for _, table := range cd.bot {
changes = append(changes, makeTableDeleteChange(table.ID()))
changes = append(changes, newDeleteChange(table.ID()))
}
return pb.ManifestChangeSet{Changes: changes}
}
Expand Down Expand Up @@ -748,7 +756,7 @@ func (s *levelsController) addLevel0Table(t *table.Table) error {
// the proper order. (That means this update happens before that of some compaction which
// deletes the table.)
err := s.kv.manifest.addChanges([]*pb.ManifestChange{
makeTableCreateChange(t.ID(), 0),
newCreateChange(t.ID(), 0, t.Checksum),
})
if err != nil {
return err
Expand Down
29 changes: 16 additions & 13 deletions manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import (
// reconstruct the manifest at startup.
type Manifest struct {
Levels []levelManifest
Tables map[uint64]tableManifest
Tables map[uint64]TableManifest

// Contains total number of creation and deletion changes in the manifest -- used to compute
// whether it'd be useful to rewrite the manifest.
Expand All @@ -54,7 +54,7 @@ func createManifest() Manifest {
levels := make([]levelManifest, 0)
return Manifest{
Levels: levels,
Tables: make(map[uint64]tableManifest),
Tables: make(map[uint64]TableManifest),
}
}

Expand All @@ -64,10 +64,11 @@ type levelManifest struct {
Tables map[uint64]struct{} // Set of table id's
}

// tableManifest contains information about a specific level
// TableManifest contains information about a specific table
// in the LSM tree.
type tableManifest struct {
Level uint8
type TableManifest struct {
Level uint8
Checksum []byte
}

// manifestFile holds the file pointer (and other info) about the manifest file, which is a log
Expand Down Expand Up @@ -98,7 +99,7 @@ const (
func (m *Manifest) asChanges() []*pb.ManifestChange {
changes := make([]*pb.ManifestChange, 0, len(m.Tables))
for id, tm := range m.Tables {
changes = append(changes, makeTableCreateChange(id, int(tm.Level)))
changes = append(changes, newCreateChange(id, int(tm.Level), tm.Checksum))
}
return changes
}
Expand Down Expand Up @@ -384,8 +385,9 @@ func applyManifestChange(build *Manifest, tc *pb.ManifestChange) error {
if _, ok := build.Tables[tc.Id]; ok {
return fmt.Errorf("MANIFEST invalid, table %d exists", tc.Id)
}
build.Tables[tc.Id] = tableManifest{
Level: uint8(tc.Level),
build.Tables[tc.Id] = TableManifest{
Level: uint8(tc.Level),
Checksum: append([]byte{}, tc.Checksum...),
}
for len(build.Levels) <= int(tc.Level) {
build.Levels = append(build.Levels, levelManifest{make(map[uint64]struct{})})
Expand Down Expand Up @@ -417,15 +419,16 @@ func applyChangeSet(build *Manifest, changeSet *pb.ManifestChangeSet) error {
return nil
}

func makeTableCreateChange(id uint64, level int) *pb.ManifestChange {
func newCreateChange(id uint64, level int, checksum []byte) *pb.ManifestChange {
return &pb.ManifestChange{
Id: id,
Op: pb.ManifestChange_CREATE,
Level: uint32(level),
Id: id,
Op: pb.ManifestChange_CREATE,
Level: uint32(level),
Checksum: checksum,
}
}

func makeTableDeleteChange(id uint64) *pb.ManifestChange {
func newDeleteChange(id uint64) *pb.ManifestChange {
return &pb.ManifestChange{
Id: id,
Op: pb.ManifestChange_DELETE,
Expand Down
14 changes: 7 additions & 7 deletions manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func TestOverlappingKeyRangeError(t *testing.T) {
lh0 := newLevelHandler(kv, 0)
lh1 := newLevelHandler(kv, 1)
f := buildTestTable(t, "k", 2)
t1, err := table.OpenTable(f, options.MemoryMap)
t1, err := table.OpenTable(f, options.MemoryMap, nil)
require.NoError(t, err)
defer t1.DecrRef()

Expand All @@ -190,7 +190,7 @@ func TestOverlappingKeyRangeError(t *testing.T) {
lc.runCompactDef(0, cd)

f = buildTestTable(t, "l", 2)
t2, err := table.OpenTable(f, options.MemoryMap)
t2, err := table.OpenTable(f, options.MemoryMap, nil)
require.NoError(t, err)
defer t2.DecrRef()
done = lh0.tryAddLevel0Table(t2)
Expand Down Expand Up @@ -221,14 +221,14 @@ func TestManifestRewrite(t *testing.T) {
require.Equal(t, 0, m.Deletions)

err = mf.addChanges([]*pb.ManifestChange{
makeTableCreateChange(0, 0),
newCreateChange(0, 0, nil),
})
require.NoError(t, err)

for i := uint64(0); i < uint64(deletionsThreshold*3); i++ {
ch := []*pb.ManifestChange{
makeTableCreateChange(i+1, 0),
makeTableDeleteChange(i),
newCreateChange(i+1, 0, nil),
newDeleteChange(i),
}
err := mf.addChanges(ch)
require.NoError(t, err)
Expand All @@ -238,7 +238,7 @@ func TestManifestRewrite(t *testing.T) {
mf = nil
mf, m, err = helpOpenOrCreateManifestFile(dir, false, deletionsThreshold)
require.NoError(t, err)
require.Equal(t, map[uint64]tableManifest{
uint64(deletionsThreshold * 3): {Level: 0},
require.Equal(t, map[uint64]TableManifest{
uint64(deletionsThreshold * 3): {Level: 0, Checksum: []byte{}},
}, m.Tables)
}
Loading