forked from hypermodeinc/badger
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Fix a rare edge case for value log truncate (hypermodeinc#602)
In a specific scenario, where a FS truncates the value log entries perfectly so that a txn's FIN entry is lost, while the rest of the transaction is not; AND this is the first transaction in the system; Badger can skip truncating this transaction on the first Replay (run when Badger is Open). But, will truncate after it has a few valid transactions. The reason is that when there's only one invalid txn, the valid offset of file becomes zero. It is hard to differentiate between a valid offset of zero, v/s an uninitialized offset. To fix this, we use the file size as a guide. If the file size is greater than the valid offset, we would truncate the file. This interferes with value log GC (and how we grow a value log file to double it's size due to how Windows treats mmapped files). So, we move the truncate logic to within replay function, and out of iterate, so it does not interfere with the functionality of value log GC. Also, move memory expensive tests to manual mode, and the tests write to disk instead of tmpfs. Fixes hypermodeinc#592 .
- Loading branch information
1 parent
dab8968
commit da4f1dc
Showing
4 changed files
with
369 additions
and
155 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,323 @@ | ||
/* | ||
* Copyright 2018 Dgraph Labs, Inc. and Contributors | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package badger | ||
|
||
import ( | ||
"flag" | ||
"fmt" | ||
"io/ioutil" | ||
"log" | ||
"math/rand" | ||
"os" | ||
"path" | ||
"regexp" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestTruncateVlogWithClose(t *testing.T) { | ||
key := func(i int) []byte { | ||
return []byte(fmt.Sprintf("%d%10d", i, i)) | ||
} | ||
data := func(l int) []byte { | ||
m := make([]byte, l) | ||
_, err := rand.Read(m) | ||
require.NoError(t, err) | ||
return m | ||
} | ||
|
||
dir, err := ioutil.TempDir("", "badger") | ||
require.NoError(t, err) | ||
defer os.RemoveAll(dir) | ||
|
||
opt := getTestOptions(dir) | ||
opt.SyncWrites = true | ||
opt.Truncate = true | ||
opt.ValueThreshold = 1 // Force all reads from value log. | ||
|
||
db, err := Open(opt) | ||
require.NoError(t, err) | ||
|
||
err = db.Update(func(txn *Txn) error { | ||
return txn.Set(key(0), data(4055)) | ||
}) | ||
require.NoError(t, err) | ||
|
||
// Close the DB. | ||
require.NoError(t, db.Close()) | ||
require.NoError(t, os.Truncate(path.Join(dir, "000000.vlog"), 4096)) | ||
|
||
// Reopen and write some new data. | ||
db, err = Open(opt) | ||
require.NoError(t, err) | ||
for i := 0; i < 32; i++ { | ||
err := db.Update(func(txn *Txn) error { | ||
return txn.Set(key(i), data(10)) | ||
}) | ||
require.NoError(t, err) | ||
} | ||
// Read it back to ensure that we can read it now. | ||
for i := 0; i < 32; i++ { | ||
err := db.View(func(txn *Txn) error { | ||
item, err := txn.Get(key(i)) | ||
require.NoError(t, err) | ||
val := getItemValue(t, item) | ||
require.Equal(t, 10, len(val)) | ||
return nil | ||
}) | ||
require.NoError(t, err) | ||
} | ||
require.NoError(t, db.Close()) | ||
|
||
// Reopen and read the data again. | ||
db, err = Open(opt) | ||
require.NoError(t, err) | ||
for i := 0; i < 32; i++ { | ||
err := db.View(func(txn *Txn) error { | ||
item, err := txn.Get(key(i)) | ||
require.NoError(t, err) | ||
val := getItemValue(t, item) | ||
require.Equal(t, 10, len(val)) | ||
return nil | ||
}) | ||
require.NoError(t, err) | ||
} | ||
require.NoError(t, db.Close()) | ||
} | ||
|
||
var manual = flag.Bool("manual", false, "Set when manually running some tests.") | ||
|
||
// The following 3 TruncateVlogNoClose tests should be run one after another. | ||
// None of these close the DB, simulating a crash. They should be run with a | ||
// script, which truncates the value log to 4096, lining up with the end of the | ||
// first entry in the txn. At <4096, it would cause the entry to be truncated | ||
// immediately, at >4096, same thing. | ||
func TestTruncateVlogNoClose(t *testing.T) { | ||
if !*manual { | ||
t.Skip("Skipping test meant to be run manually.") | ||
return | ||
} | ||
fmt.Println("running") | ||
dir := "p" | ||
opts := getTestOptions(dir) | ||
opts.SyncWrites = true | ||
opts.Truncate = true | ||
|
||
kv, err := Open(opts) | ||
require.NoError(t, err) | ||
key := func(i int) string { | ||
return fmt.Sprintf("%d%10d", i, i) | ||
} | ||
data := fmt.Sprintf("%4055d", 1) | ||
err = kv.Update(func(txn *Txn) error { | ||
return txn.Set([]byte(key(0)), []byte(data)) | ||
}) | ||
require.NoError(t, err) | ||
} | ||
func TestTruncateVlogNoClose2(t *testing.T) { | ||
if !*manual { | ||
t.Skip("Skipping test meant to be run manually.") | ||
return | ||
} | ||
dir := "p" | ||
opts := getTestOptions(dir) | ||
opts.SyncWrites = true | ||
opts.Truncate = true | ||
|
||
kv, err := Open(opts) | ||
require.NoError(t, err) | ||
key := func(i int) string { | ||
return fmt.Sprintf("%d%10d", i, i) | ||
} | ||
data := fmt.Sprintf("%10d", 1) | ||
for i := 32; i < 64; i++ { | ||
err := kv.Update(func(txn *Txn) error { | ||
return txn.Set([]byte(key(i)), []byte(data)) | ||
}) | ||
require.NoError(t, err) | ||
} | ||
for i := 32; i < 64; i++ { | ||
require.NoError(t, kv.View(func(txn *Txn) error { | ||
item, err := txn.Get([]byte(key(i))) | ||
require.NoError(t, err) | ||
val := getItemValue(t, item) | ||
require.NotNil(t, val) | ||
require.True(t, len(val) > 0) | ||
return nil | ||
})) | ||
} | ||
} | ||
func TestTruncateVlogNoClose3(t *testing.T) { | ||
if !*manual { | ||
t.Skip("Skipping test meant to be run manually.") | ||
return | ||
} | ||
fmt.Print("Running") | ||
dir := "p" | ||
opts := getTestOptions(dir) | ||
opts.SyncWrites = true | ||
opts.Truncate = true | ||
|
||
kv, err := Open(opts) | ||
require.NoError(t, err) | ||
key := func(i int) string { | ||
return fmt.Sprintf("%d%10d", i, i) | ||
} | ||
for i := 32; i < 64; i++ { | ||
require.NoError(t, kv.View(func(txn *Txn) error { | ||
item, err := txn.Get([]byte(key(i))) | ||
require.NoError(t, err) | ||
val := getItemValue(t, item) | ||
require.NotNil(t, val) | ||
require.True(t, len(val) > 0) | ||
return nil | ||
})) | ||
} | ||
} | ||
|
||
func TestBigKeyValuePairs(t *testing.T) { | ||
// This test takes too much memory. So, run separately. | ||
if !*manual { | ||
t.Skip("Skipping test meant to be run manually.") | ||
return | ||
} | ||
opts := DefaultOptions | ||
opts.MaxTableSize = 1 << 20 | ||
opts.ValueLogMaxEntries = 64 | ||
runBadgerTest(t, &opts, func(t *testing.T, db *DB) { | ||
bigK := make([]byte, 65001) | ||
bigV := make([]byte, db.opt.ValueLogFileSize+1) | ||
small := make([]byte, 65000) | ||
|
||
txn := db.NewTransaction(true) | ||
require.Regexp(t, regexp.MustCompile("Key.*exceeded"), txn.Set(bigK, small)) | ||
require.Regexp(t, regexp.MustCompile("Value.*exceeded"), txn.Set(small, bigV)) | ||
|
||
require.NoError(t, txn.Set(small, small)) | ||
require.Regexp(t, regexp.MustCompile("Key.*exceeded"), txn.Set(bigK, bigV)) | ||
|
||
require.NoError(t, db.View(func(txn *Txn) error { | ||
_, err := txn.Get(small) | ||
require.Equal(t, ErrKeyNotFound, err) | ||
return nil | ||
})) | ||
|
||
// Now run a longer test, which involves value log GC. | ||
data := fmt.Sprintf("%100d", 1) | ||
key := func(i int) string { | ||
return fmt.Sprintf("%65000d", i) | ||
} | ||
|
||
saveByKey := func(key string, value []byte) error { | ||
return db.Update(func(txn *Txn) error { | ||
return txn.Set([]byte(key), value) | ||
}) | ||
} | ||
|
||
getByKey := func(key string) error { | ||
return db.View(func(txn *Txn) error { | ||
item, err := txn.Get([]byte(key)) | ||
if err != nil { | ||
return err | ||
} | ||
return item.Value(func(val []byte) { | ||
if len(val) == 0 { | ||
log.Fatalf("key not found %q", len(key)) | ||
} | ||
}) | ||
}) | ||
} | ||
|
||
for i := 0; i < 32; i++ { | ||
if i < 30 { | ||
require.NoError(t, saveByKey(key(i), []byte(data))) | ||
} else { | ||
require.NoError(t, saveByKey(key(i), []byte(fmt.Sprintf("%100d", i)))) | ||
} | ||
} | ||
|
||
for j := 0; j < 5; j++ { | ||
for i := 0; i < 32; i++ { | ||
if i < 30 { | ||
require.NoError(t, saveByKey(key(i), []byte(data))) | ||
} else { | ||
require.NoError(t, saveByKey(key(i), []byte(fmt.Sprintf("%100d", i)))) | ||
} | ||
} | ||
} | ||
|
||
for i := 0; i < 32; i++ { | ||
require.NoError(t, getByKey(key(i))) | ||
} | ||
|
||
var loops int | ||
var err error | ||
for err == nil { | ||
err = db.RunValueLogGC(0.5) | ||
require.NotRegexp(t, regexp.MustCompile("truncate"), err) | ||
loops++ | ||
} | ||
t.Logf("Ran value log GC %d times. Last error: %v\n", loops, err) | ||
}) | ||
} | ||
|
||
// The following test checks for issue #585. | ||
func TestPushValueLogLimit(t *testing.T) { | ||
// This test takes too much memory. So, run separately. | ||
if !*manual { | ||
t.Skip("Skipping test meant to be run manually.") | ||
return | ||
} | ||
opt := DefaultOptions | ||
opt.ValueLogMaxEntries = 64 | ||
opt.ValueLogFileSize = 2 << 30 | ||
runBadgerTest(t, &opt, func(t *testing.T, db *DB) { | ||
data := []byte(fmt.Sprintf("%30d", 1)) | ||
key := func(i int) string { | ||
return fmt.Sprintf("%100d", i) | ||
} | ||
|
||
for i := 0; i < 32; i++ { | ||
if i == 4 { | ||
v := make([]byte, 2<<30) | ||
err := db.Update(func(txn *Txn) error { | ||
return txn.Set([]byte(key(i)), v) | ||
}) | ||
require.NoError(t, err) | ||
} else { | ||
err := db.Update(func(txn *Txn) error { | ||
return txn.Set([]byte(key(i)), data) | ||
}) | ||
require.NoError(t, err) | ||
} | ||
} | ||
|
||
for i := 0; i < 32; i++ { | ||
err := db.View(func(txn *Txn) error { | ||
item, err := txn.Get([]byte(key(i))) | ||
require.NoError(t, err, "Getting key: %s", key(i)) | ||
err = item.Value(func(v []byte) { | ||
_ = v | ||
}) | ||
require.NoError(t, err, "Getting value: %s", key(i)) | ||
return nil | ||
}) | ||
require.NoError(t, err) | ||
} | ||
}) | ||
} |
Oops, something went wrong.