Skip to content

Commit

Permalink
Introduce SSTable sha256 checksums (#689)
Browse files Browse the repository at this point in the history
Add SHA256 checksums for SSTables to the MANIFEST. If a table no longer matches its stored checksum, an error is logged and that table is skipped when the database is opened.

Tested that it works with existing Badger directories created before this change. As new tables are created, Badger stores their checksums in the MANIFEST.

Modified `badger info` to show the checksums stored in the MANIFEST, so users can manually compare them against the output of `sha256sum <filename>` if needed.

Fixes #680.
  • Loading branch information
manishrjain authored Jan 20, 2019
1 parent 8de6464 commit 2017987
Show file tree
Hide file tree
Showing 9 changed files with 229 additions and 159 deletions.
9 changes: 5 additions & 4 deletions badger/cmd/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,9 @@ func printInfo(dir, valueDir string) error {
})
for _, tableID := range tableIDs {
tableFile := table.IDToFilename(tableID)
file, ok := fileinfoByName[tableFile]
if ok {
tm, ok1 := manifest.Tables[tableID]
file, ok2 := fileinfoByName[tableFile]
if ok1 && ok2 {
fileinfoMarked[tableFile] = true
emptyString := ""
fileSize := file.Size()
Expand All @@ -180,8 +181,8 @@ func printInfo(dir, valueDir string) error {
}
levelSizes[level] += fileSize
// (Put level on every line to make easier to process with sed/perl.)
fmt.Printf("[%25s] %-12s %6s L%d%s\n", dur(baseTime, file.ModTime()),
tableFile, hbytes(fileSize), level, emptyString)
fmt.Printf("[%25s] %-12s %6s L%d %x%s\n", dur(baseTime, file.ModTime()),
tableFile, hbytes(fileSize), level, tm.Checksum, emptyString)
} else {
fmt.Printf("%s [MISSING]\n", tableFile)
numMissing++
Expand Down
2 changes: 1 addition & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,7 +829,7 @@ func (db *DB) handleFlushTask(ft flushTask) error {
db.elog.Errorf("ERROR while syncing level directory: %v", dirSyncErr)
}

tbl, err := table.OpenTable(fd, db.opt.TableLoadingMode)
tbl, err := table.OpenTable(fd, db.opt.TableLoadingMode, nil)
if err != nil {
db.elog.Printf("ERROR while opening table: %v", err)
return err
Expand Down
32 changes: 20 additions & 12 deletions levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"math/rand"
"os"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -122,7 +123,7 @@ func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) {
tick := time.NewTicker(3 * time.Second)
defer tick.Stop()

for fileID, tableManifest := range mf.Tables {
for fileID, tf := range mf.Tables {
fname := table.NewFilename(fileID, db.opt.Dir)
select {
case <-tick.C:
Expand All @@ -137,7 +138,7 @@ func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) {
if fileID > maxFileID {
maxFileID = fileID
}
go func(fname string, level int) {
go func(fname string, tf TableManifest) {
var rerr error
defer func() {
throttle.Done(rerr)
Expand All @@ -149,16 +150,22 @@ func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) {
return
}

t, err := table.OpenTable(fd, db.opt.TableLoadingMode)
t, err := table.OpenTable(fd, db.opt.TableLoadingMode, tf.Checksum)
if err != nil {
rerr = errors.Wrapf(err, "Opening table: %q", fname)
if strings.HasPrefix(err.Error(), "CHECKSUM_MISMATCH:") {
db.opt.Errorf(err.Error())
db.opt.Errorf("Ignoring table %s", fd.Name())
// Do not set rerr. We will continue without this table.
} else {
rerr = errors.Wrapf(err, "Opening table: %q", fname)
}
return
}

mu.Lock()
tables[level] = append(tables[level], t)
tables[tf.Level] = append(tables[tf.Level], t)
mu.Unlock()
}(fname, int(tableManifest.Level))
}(fname, tf)
}
if err := throttle.Finish(); err != nil {
closeAllTables(tables)
Expand Down Expand Up @@ -226,7 +233,7 @@ func (s *levelsController) deleteLSMTree() (int, error) {
// Generate the manifest changes.
changes := []*pb.ManifestChange{}
for _, table := range all {
changes = append(changes, makeTableDeleteChange(table.ID()))
changes = append(changes, newDeleteChange(table.ID()))
}
changeSet := pb.ManifestChangeSet{Changes: changes}
if err := s.kv.manifest.addChanges(changeSet.Changes); err != nil {
Expand Down Expand Up @@ -486,7 +493,7 @@ func (s *levelsController) compactBuildTables(
return
}

tbl, err := table.OpenTable(fd, s.kv.opt.TableLoadingMode)
tbl, err := table.OpenTable(fd, s.kv.opt.TableLoadingMode, nil)
// decrRef is added below.
resultCh <- newTableResult{tbl, errors.Wrapf(err, "Unable to open table: %q", fd.Name())}
}(builder)
Expand Down Expand Up @@ -534,13 +541,14 @@ func (s *levelsController) compactBuildTables(
func buildChangeSet(cd *compactDef, newTables []*table.Table) pb.ManifestChangeSet {
changes := []*pb.ManifestChange{}
for _, table := range newTables {
changes = append(changes, makeTableCreateChange(table.ID(), cd.nextLevel.level))
changes = append(changes,
newCreateChange(table.ID(), cd.nextLevel.level, table.Checksum))
}
for _, table := range cd.top {
changes = append(changes, makeTableDeleteChange(table.ID()))
changes = append(changes, newDeleteChange(table.ID()))
}
for _, table := range cd.bot {
changes = append(changes, makeTableDeleteChange(table.ID()))
changes = append(changes, newDeleteChange(table.ID()))
}
return pb.ManifestChangeSet{Changes: changes}
}
Expand Down Expand Up @@ -748,7 +756,7 @@ func (s *levelsController) addLevel0Table(t *table.Table) error {
// the proper order. (That means this update happens before that of some compaction which
// deletes the table.)
err := s.kv.manifest.addChanges([]*pb.ManifestChange{
makeTableCreateChange(t.ID(), 0),
newCreateChange(t.ID(), 0, t.Checksum),
})
if err != nil {
return err
Expand Down
29 changes: 16 additions & 13 deletions manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import (
// reconstruct the manifest at startup.
type Manifest struct {
Levels []levelManifest
Tables map[uint64]tableManifest
Tables map[uint64]TableManifest

// Contains total number of creation and deletion changes in the manifest -- used to compute
// whether it'd be useful to rewrite the manifest.
Expand All @@ -54,7 +54,7 @@ func createManifest() Manifest {
levels := make([]levelManifest, 0)
return Manifest{
Levels: levels,
Tables: make(map[uint64]tableManifest),
Tables: make(map[uint64]TableManifest),
}
}

Expand All @@ -64,10 +64,11 @@ type levelManifest struct {
Tables map[uint64]struct{} // Set of table id's
}

// tableManifest contains information about a specific level
// TableManifest contains information about a specific table
// in the LSM tree.
type tableManifest struct {
Level uint8
type TableManifest struct {
Level uint8
Checksum []byte
}

// manifestFile holds the file pointer (and other info) about the manifest file, which is a log
Expand Down Expand Up @@ -98,7 +99,7 @@ const (
func (m *Manifest) asChanges() []*pb.ManifestChange {
changes := make([]*pb.ManifestChange, 0, len(m.Tables))
for id, tm := range m.Tables {
changes = append(changes, makeTableCreateChange(id, int(tm.Level)))
changes = append(changes, newCreateChange(id, int(tm.Level), tm.Checksum))
}
return changes
}
Expand Down Expand Up @@ -384,8 +385,9 @@ func applyManifestChange(build *Manifest, tc *pb.ManifestChange) error {
if _, ok := build.Tables[tc.Id]; ok {
return fmt.Errorf("MANIFEST invalid, table %d exists", tc.Id)
}
build.Tables[tc.Id] = tableManifest{
Level: uint8(tc.Level),
build.Tables[tc.Id] = TableManifest{
Level: uint8(tc.Level),
Checksum: append([]byte{}, tc.Checksum...),
}
for len(build.Levels) <= int(tc.Level) {
build.Levels = append(build.Levels, levelManifest{make(map[uint64]struct{})})
Expand Down Expand Up @@ -417,15 +419,16 @@ func applyChangeSet(build *Manifest, changeSet *pb.ManifestChangeSet) error {
return nil
}

func makeTableCreateChange(id uint64, level int) *pb.ManifestChange {
func newCreateChange(id uint64, level int, checksum []byte) *pb.ManifestChange {
return &pb.ManifestChange{
Id: id,
Op: pb.ManifestChange_CREATE,
Level: uint32(level),
Id: id,
Op: pb.ManifestChange_CREATE,
Level: uint32(level),
Checksum: checksum,
}
}

func makeTableDeleteChange(id uint64) *pb.ManifestChange {
func newDeleteChange(id uint64) *pb.ManifestChange {
return &pb.ManifestChange{
Id: id,
Op: pb.ManifestChange_DELETE,
Expand Down
14 changes: 7 additions & 7 deletions manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func TestOverlappingKeyRangeError(t *testing.T) {
lh0 := newLevelHandler(kv, 0)
lh1 := newLevelHandler(kv, 1)
f := buildTestTable(t, "k", 2)
t1, err := table.OpenTable(f, options.MemoryMap)
t1, err := table.OpenTable(f, options.MemoryMap, nil)
require.NoError(t, err)
defer t1.DecrRef()

Expand All @@ -190,7 +190,7 @@ func TestOverlappingKeyRangeError(t *testing.T) {
lc.runCompactDef(0, cd)

f = buildTestTable(t, "l", 2)
t2, err := table.OpenTable(f, options.MemoryMap)
t2, err := table.OpenTable(f, options.MemoryMap, nil)
require.NoError(t, err)
defer t2.DecrRef()
done = lh0.tryAddLevel0Table(t2)
Expand Down Expand Up @@ -221,14 +221,14 @@ func TestManifestRewrite(t *testing.T) {
require.Equal(t, 0, m.Deletions)

err = mf.addChanges([]*pb.ManifestChange{
makeTableCreateChange(0, 0),
newCreateChange(0, 0, nil),
})
require.NoError(t, err)

for i := uint64(0); i < uint64(deletionsThreshold*3); i++ {
ch := []*pb.ManifestChange{
makeTableCreateChange(i+1, 0),
makeTableDeleteChange(i),
newCreateChange(i+1, 0, nil),
newDeleteChange(i),
}
err := mf.addChanges(ch)
require.NoError(t, err)
Expand All @@ -238,7 +238,7 @@ func TestManifestRewrite(t *testing.T) {
mf = nil
mf, m, err = helpOpenOrCreateManifestFile(dir, false, deletionsThreshold)
require.NoError(t, err)
require.Equal(t, map[uint64]tableManifest{
uint64(deletionsThreshold * 3): {Level: 0},
require.Equal(t, map[uint64]TableManifest{
uint64(deletionsThreshold * 3): {Level: 0, Checksum: []byte{}},
}, m.Tables)
}
Loading

0 comments on commit 2017987

Please sign in to comment.