Skip to content

Commit

Permalink
fix(replay) - Update head for LSM entires also (#1456)
Browse files Browse the repository at this point in the history
#1372 tried to fix the `replay from
start` issue but it partially fixed the issue. The head was not being updated
in case all the entries are inserted only in the LSM tree.
This commit fixes it.

(cherry picked from commit 4c8fe7f)
  • Loading branch information
Ibrahim Jarif committed Aug 18, 2020
1 parent 7d169b2 commit aef68f0
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 12 deletions.
22 changes: 22 additions & 0 deletions backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func TestBackupRestore1(t *testing.T) {
return nil
})
require.NoError(t, err)
require.Equal(t, db.orc.nextTs(), uint64(3))
}

func TestBackupRestore2(t *testing.T) {
Expand Down Expand Up @@ -163,6 +164,9 @@ func TestBackupRestore2(t *testing.T) {
err = db2.Load(&backup, 16)
require.NoError(t, err)

// Check nextTs is correctly set.
require.Equal(t, db1.orc.nextTs(), db2.orc.nextTs())

for i := byte(1); i < N; i++ {
err = db2.View(func(tx *Txn) error {
k := append(key1, i)
Expand Down Expand Up @@ -210,6 +214,9 @@ func TestBackupRestore2(t *testing.T) {
err = db3.Load(&backup, 16)
require.NoError(t, err)

// Check nextTs is correctly set.
require.Equal(t, db2.orc.nextTs(), db3.orc.nextTs())

for i := byte(1); i < N; i++ {
err = db3.View(func(tx *Txn) error {
k := append(key1, i)
Expand Down Expand Up @@ -325,6 +332,7 @@ func TestBackupRestore3(t *testing.T) {
N := 1000
entries := createEntries(N)

var db1NextTs uint64
// backup
{
db1, err := Open(DefaultOptions(filepath.Join(tmpdir, "backup1")))
Expand All @@ -335,6 +343,8 @@ func TestBackupRestore3(t *testing.T) {

_, err = db1.Backup(&bb, 0)
require.NoError(t, err)

db1NextTs = db1.orc.nextTs()
require.NoError(t, db1.Close())
}
require.True(t, len(entries) == N)
Expand All @@ -345,7 +355,9 @@ func TestBackupRestore3(t *testing.T) {
require.NoError(t, err)

defer db2.Close()
require.NotEqual(t, db1NextTs, db2.orc.nextTs())
require.NoError(t, db2.Load(&bb, 16))
require.Equal(t, db1NextTs, db2.orc.nextTs())

// verify
err = db2.View(func(txn *Txn) error {
Expand Down Expand Up @@ -383,6 +395,7 @@ func TestBackupLoadIncremental(t *testing.T) {
updates := make(map[int]byte)
var bb bytes.Buffer

var db1NextTs uint64
// backup
{
db1, err := Open(DefaultOptions(filepath.Join(tmpdir, "backup2")))
Expand Down Expand Up @@ -439,6 +452,9 @@ func TestBackupLoadIncremental(t *testing.T) {
require.NoError(t, err)
_, err = db1.Backup(&bb, since)
require.NoError(t, err)

db1NextTs = db1.orc.nextTs()

require.NoError(t, db1.Close())
}
require.True(t, len(entries) == N)
Expand All @@ -450,7 +466,9 @@ func TestBackupLoadIncremental(t *testing.T) {

defer db2.Close()

require.NotEqual(t, db1NextTs, db2.orc.nextTs())
require.NoError(t, db2.Load(&bb, 16))
require.Equal(t, db1NextTs, db2.orc.nextTs())

// verify
actual := make(map[int]byte)
Expand Down Expand Up @@ -517,6 +535,8 @@ func TestBackupBitClear(t *testing.T) {
_, err = db.Backup(bak, 0)
require.NoError(t, err)
require.NoError(t, bak.Close())

oldValue := db.orc.nextTs()
require.NoError(t, db.Close())

opt = getTestOptions(dir)
Expand All @@ -530,6 +550,8 @@ func TestBackupBitClear(t *testing.T) {
defer bak.Close()

require.NoError(t, db.Load(bak, 16))
// Ensure nextTs is still the same.
require.Equal(t, oldValue, db.orc.nextTs())

require.NoError(t, db.View(func(txn *Txn) error {
e, err := txn.Get(key)
Expand Down
37 changes: 27 additions & 10 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,11 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error {
} else {
nv = vp.Encode()
meta = meta | bitValuePointer
// Update vhead. If the crash happens while replay was in progess
// and the head is not updated, we will end up replaying all the
// files again.
db.updateHead([]valuePointer{vp})
}
// Update vhead. If the crash happens while replay was in progess
// and the head is not updated, we will end up replaying all the
// files starting from file zero, again.
db.updateHead([]valuePointer{vp})

v := y.ValueStruct{
Value: nv,
Expand Down Expand Up @@ -994,24 +994,41 @@ type flushTask struct {
dropPrefixes [][]byte
}

// handleFlushTask must be run serially.
func (db *DB) handleFlushTask(ft flushTask) error {
// There can be a scenario, when empty memtable is flushed. For example, memtable is empty and
// after writing request to value log, rotation count exceeds db.LogRotatesToFlush.
if ft.mt.Empty() {
func (db *DB) pushHead(ft flushTask) error {
// We don't need to store head pointer in the in-memory mode since we will
// never be replay anything.
if db.opt.InMemory {
return nil
}
// Ensure we never push a zero valued head pointer.
if ft.vptr.IsZero() {
return errors.New("Head should not be zero")
}

// Store badger head even if vptr is zero, need it for readTs
db.opt.Debugf("Storing value log head: %+v\n", ft.vptr)
db.opt.Debugf("Storing offset: %+v\n", ft.vptr)
val := ft.vptr.Encode()

// Pick the max commit ts, so in case of crash, our read ts would be higher than all the
// commits.
headTs := y.KeyWithTs(head, db.orc.nextTs())
ft.mt.Put(headTs, y.ValueStruct{Value: val})

return nil
}

// handleFlushTask must be run serially.
func (db *DB) handleFlushTask(ft flushTask) error {
// There can be a scenario, when empty memtable is flushed. For example, memtable is empty and
// after writing request to value log, rotation count exceeds db.LogRotatesToFlush.
if ft.mt.Empty() {
return nil
}

if err := db.pushHead(ft); err != nil {
return err
}

dk, err := db.registry.latestDataKey()
if err != nil {
return y.Wrapf(err, "failed to get datakey in db.handleFlushTask")
Expand Down
5 changes: 3 additions & 2 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2001,8 +2001,9 @@ func TestNoCrash(t *testing.T) {
}

db.Lock()
// make head to point to first file
db.vhead = valuePointer{0, 0, 0}
// make head to point to second file. We cannot make it point to the first
// vlog file because we cannot push a zero head pointer.
db.vhead = valuePointer{1, 0, 0}
db.Unlock()
db.Close()

Expand Down

0 comments on commit aef68f0

Please sign in to comment.