Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements: Manual Memory allocation via Calloc #1459

Merged
merged 28 commits into from
Aug 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
1291ce8
Revert "Revert "Compress/Encrypt Blocks in the background (#1227)" (#…
manishrjain Aug 11, 2020
f2e9b48
Revert "Revert "Buffer pool for decompression (#1308)" (#1408)"
manishrjain Aug 11, 2020
707323d
Revert "Revert "fix: Fix race condition in block.incRef (#1337)" (#14…
manishrjain Aug 11, 2020
54aa796
Compression, Decompression and some Encryption is using Calloc.
manishrjain Aug 11, 2020
b8e2016
Add Calloc functions
manishrjain Aug 11, 2020
f0b712c
Rename to Calloc. And also add a debug for number of blocks
manishrjain Aug 11, 2020
f70c560
The allocations and deallocations match up
manishrjain Aug 11, 2020
ea70a96
A bit more debugging
manishrjain Aug 11, 2020
3e660bb
Switch table builder to use Calloc.
manishrjain Aug 12, 2020
e254ecc
Switch levels.go compaction to only build 5 tables at a time.
manishrjain Aug 12, 2020
f416452
Have a dedicated compactor for L0 and L1.
manishrjain Aug 12, 2020
878d066
Add pool/calloc benchmark
Aug 12, 2020
f698845
Move Calloc and Free to y
Aug 14, 2020
1c1e17c
Fix numCompactor error
Aug 14, 2020
233be8e
Handle nil table in compaction and test fixes
Aug 14, 2020
378491d
Revert "Revert "add assert to check integer overflow for table size (…
Aug 14, 2020
84df817
Make level multiplier 15
Aug 14, 2020
915b0a5
Don't use Rand, instead use local rand instance.
manishrjain Aug 14, 2020
d5527e7
Pre-allocate 1.2x of Block Size for decompression.
manishrjain Aug 14, 2020
37b8b9e
Bring in latest Ristretto. Deal with rejected blocks.
manishrjain Aug 17, 2020
b73c7a4
Deal with when Logger is nil
manishrjain Aug 17, 2020
7aad718
Bring master in
manishrjain Aug 17, 2020
7233753
Address review comments
Aug 18, 2020
ff36cd3
Move WithLoggerLevel next to WithLogger in options.
manishrjain Aug 18, 2020
e5878e7
fix race condition in newtables
Aug 19, 2020
08fe986
Fix table tests
Aug 19, 2020
edb0a77
Fix persistDiscardStats test
Aug 19, 2020
263a3c7
Merge branch 'master' into mrjn/calloc
Aug 19, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions badger/cmd/bank.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func toSlice(bal uint64) []byte {
}

func getBalance(txn *badger.Txn, account int) (uint64, error) {
item, err := txn.Get(key(account))
item, err := get(txn, key(account))
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -197,14 +197,33 @@ func diff(a, b []account) string {

var errFailure = errors.New("test failed due to balance mismatch")

// get function will fetch the value for the key "k" either by using the
// txn.Get API or the iterator.Seek API.
func get(txn *badger.Txn, k []byte) (*badger.Item, error) {
if rand.Int()%2 == 0 {
return txn.Get(k)
}

iopt := badger.DefaultIteratorOptions
// PrefectValues is expensive. We don't need it here.
iopt.PrefetchValues = false
it := txn.NewIterator(iopt)
defer it.Close()
it.Seek(k)
if it.Valid() {
return it.Item(), nil
}
return nil, badger.ErrKeyNotFound
}

// seekTotal retrives the total of all accounts by seeking for each account key.
func seekTotal(txn *badger.Txn) ([]account, error) {
expected := uint64(numAccounts) * uint64(initialBal)
var accounts []account

var total uint64
for i := 0; i < numAccounts; i++ {
item, err := txn.Get(key(i))
item, err := get(txn, key(i))
if err != nil {
log.Printf("Error for account: %d. err=%v. key=%q\n", i, err, key(i))
return accounts, err
Expand Down Expand Up @@ -343,7 +362,11 @@ func runTest(cmd *cobra.Command, args []string) error {
WithNumMemtables(2).
// Do not GC any versions, because we need them for the disect..
WithNumVersionsToKeep(int(math.MaxInt32)).
WithValueThreshold(1) // Make all values go to value log
WithValueThreshold(1). // Make all values go to value log
WithCompression(options.ZSTD).
WithKeepL0InMemory(false).
WithMaxCacheSize(10 << 20)

if mmap {
opts = opts.WithTableLoadingMode(options.MemoryMap)
}
Expand Down
1 change: 0 additions & 1 deletion badger/cmd/write_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,6 @@ func reportStats(c *y.Closer, db *badger.DB) {
if showKeysCount {
showKeysStats(db)
}

// fetch directory contents
if showDir {
err := filepath.Walk(sstDir, func(path string, info os.FileInfo, err error) error {
Expand Down
11 changes: 9 additions & 2 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error {
func Open(opt Options) (db *DB, err error) {
// It's okay to have zero compactors which will disable all compactions but
// we cannot have just one compactor otherwise we will end up with all data
// one level 2.
// on level 2.
if opt.NumCompactors == 1 {
return nil, errors.New("Cannot have 1 compactor. Need at least 2")
}
Expand Down Expand Up @@ -324,6 +324,12 @@ func Open(opt Options) (db *DB, err error) {
MaxCost: int64(float64(opt.MaxCacheSize) * 0.95),
BufferItems: 64,
Metrics: true,
OnEvict: func(i *ristretto.Item) {
table.BlockEvictHandler(i.Value)
},
OnReject: func(i *ristretto.Item) {
table.BlockEvictHandler(i.Value)
},
}
db.blockCache, err = ristretto.NewCache(&config)
if err != nil {
Expand Down Expand Up @@ -986,6 +992,7 @@ func buildL0Table(ft flushTask, bopts table.Options) []byte {
defer iter.Close()
b := table.NewTableBuilder(bopts)
defer b.Close()

var vp valuePointer
for iter.SeekToFirst(); iter.Valid(); iter.Next() {
if len(ft.dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), ft.dropPrefixes) {
Expand All @@ -997,7 +1004,7 @@ func buildL0Table(ft flushTask, bopts table.Options) []byte {
}
b.Add(iter.Key(), iter.Value(), vp.Len)
}
return b.Finish()
return b.Finish(true)
}

type flushTask struct {
Expand Down
11 changes: 4 additions & 7 deletions db2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ func createTableWithRange(t *testing.T, db *DB, start, end int) *table.Table {
fd, err := y.CreateSyncedFile(table.NewFilename(fileID, db.opt.Dir), true)
require.NoError(t, err)

_, err = fd.Write(b.Finish())
_, err = fd.Write(b.Finish(false))
require.NoError(t, err, "unable to write to file")

tab, err := table.OpenTable(fd, bopts)
Expand Down Expand Up @@ -670,16 +670,13 @@ func TestL0GCBug(t *testing.T) {
// Simulate a crash by not closing db1 but releasing the locks.
if db1.dirLockGuard != nil {
require.NoError(t, db1.dirLockGuard.release())
db1.dirLockGuard = nil
}
if db1.valueDirGuard != nil {
require.NoError(t, db1.valueDirGuard.release())
db1.valueDirGuard = nil
}
for _, f := range db1.vlog.filesMap {
require.NoError(t, f.fd.Close())
}
require.NoError(t, db1.registry.Close())
require.NoError(t, db1.lc.close())
require.NoError(t, db1.manifest.close())
require.NoError(t, db1.Close())

db2, err := Open(opts)
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.12
require (
github.com/DataDog/zstd v1.4.1
github.com/cespare/xxhash v1.1.0
github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de
github.com/dgraph-io/ristretto v0.0.4-0.20200817124926-18e279725890
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2
github.com/dustin/go-humanize v1.0.0
github.com/golang/protobuf v1.3.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de h1:t0UHb5vdojIDUqktM6+xJAfScFBsVpXZmqC9dsgJmeA=
github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E=
github.com/dgraph-io/ristretto v0.0.4-0.20200817124926-18e279725890 h1:/6pLcQq2GNdLPOotXztuLDXYRPraTIzZMPiJW8HzAwg=
github.com/dgraph-io/ristretto v0.0.4-0.20200817124926-18e279725890/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
Expand Down
8 changes: 4 additions & 4 deletions key_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func validRegistry(fp *os.File, encryptionKey []byte) error {
}
if len(encryptionKey) > 0 {
// Decrypting sanity text.
if eSanityText, err = y.XORBlock(eSanityText, encryptionKey, iv); err != nil {
if eSanityText, err = y.XORBlockAllocate(eSanityText, encryptionKey, iv); err != nil {
return y.Wrapf(err, "During validRegistry")
}
}
Expand Down Expand Up @@ -200,7 +200,7 @@ func (kri *keyRegistryIterator) next() (*pb.DataKey, error) {
}
if len(kri.encryptionKey) > 0 {
// Decrypt the key if the storage key exists.
if dataKey.Data, err = y.XORBlock(dataKey.Data, kri.encryptionKey, dataKey.Iv); err != nil {
if dataKey.Data, err = y.XORBlockAllocate(dataKey.Data, kri.encryptionKey, dataKey.Iv); err != nil {
return nil, y.Wrapf(err, "While decrypting datakey in keyRegistryIterator.next")
}
}
Expand Down Expand Up @@ -254,7 +254,7 @@ func WriteKeyRegistry(reg *KeyRegistry, opt KeyRegistryOptions) error {
eSanity := sanityText
if len(opt.EncryptionKey) > 0 {
var err error
eSanity, err = y.XORBlock(eSanity, opt.EncryptionKey, iv)
eSanity, err = y.XORBlockAllocate(eSanity, opt.EncryptionKey, iv)
if err != nil {
return y.Wrapf(err, "Error while encrpting sanity text in WriteKeyRegistry")
}
Expand Down Expand Up @@ -395,7 +395,7 @@ func storeDataKey(buf *bytes.Buffer, storageKey []byte, k *pb.DataKey) error {
return nil
}
var err error
k.Data, err = y.XORBlock(k.Data, storageKey, k.Iv)
k.Data, err = y.XORBlockAllocate(k.Data, storageKey, k.Iv)
return err
}
// In memory datakey will be plain text so encrypting before storing to the disk.
Expand Down
9 changes: 6 additions & 3 deletions levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ nextTable:
numKeys, numSkips, time.Since(timeStart))
if builder.Empty() {
// Cleanup builder resources:
builder.Finish()
builder.Finish(false)
builder.Close()
continue
}
Expand All @@ -677,7 +677,7 @@ nextTable:
return nil, errors.Wrapf(err, "While opening new table: %d", fileID)
}

if _, err := fd.Write(builder.Finish()); err != nil {
if _, err := fd.Write(builder.Finish(false)); err != nil {
return nil, errors.Wrapf(err, "Unable to write to file: %d", fileID)
}
tbl, err := table.OpenTable(fd, bopts)
Expand All @@ -688,7 +688,7 @@ nextTable:
var tbl *table.Table
var err error
if s.kv.opt.InMemory {
tbl, err = table.OpenInMemoryTable(builder.Finish(), fileID, &bopts)
tbl, err = table.OpenInMemoryTable(builder.Finish(true), fileID, &bopts)
} else {
tbl, err = build(fileID)
}
Expand All @@ -700,6 +700,9 @@ nextTable:

mu.Lock()
newTables = append(newTables, tbl)
num := atomic.LoadInt32(&table.NumBlocks)
allocs := float64(atomic.LoadInt64(&y.NumAllocs)) / float64((1 << 20))
s.kv.opt.Debugf("Num Blocks: %d. Num Allocs (MB): %.2f\n", num, allocs)
mu.Unlock()
}(builder)
}
Expand Down
4 changes: 2 additions & 2 deletions levels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func createAndOpen(db *DB, td []keyValVersion, level int) {
panic(err)
}

if _, err = fd.Write(b.Finish()); err != nil {
if _, err = fd.Write(b.Finish(false)); err != nil {
panic(err)
}
tab, err := table.OpenTable(fd, opts)
Expand Down Expand Up @@ -740,7 +740,7 @@ func createEmptyTable(db *DB) *table.Table {
b.Add(y.KeyWithTs([]byte("foo"), 1), y.ValueStruct{}, 0)

// Open table in memory to avoid adding changes to manifest file.
tab, err := table.OpenInMemoryTable(b.Finish(), db.lc.reserveFileID(), &opts)
tab, err := table.OpenInMemoryTable(b.Finish(true), db.lc.reserveFileID(), &opts)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func buildTable(t *testing.T, keyValues [][]string, bopts table.Options) *os.Fil
UserMeta: 0,
}, 0)
}
_, err = f.Write(b.Finish())
_, err = f.Write(b.Finish(false))
require.NoError(t, err, "unable to write to file.")
f.Close()
f, _ = y.OpenSyncedFile(filename, true)
Expand Down
29 changes: 15 additions & 14 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func DefaultOptions(path string) Options {
Dir: path,
ValueDir: path,
LevelOneSize: 256 << 20,
LevelSizeMultiplier: 10,
LevelSizeMultiplier: 15,
TableLoadingMode: options.MemoryMap,
ValueLogLoadingMode: options.MemoryMap,
// table.MemoryMap to mmap() the tables.
Expand Down Expand Up @@ -176,6 +176,7 @@ func DefaultOptions(path string) Options {

func buildTableOptions(opt Options) table.Options {
return table.Options{
TableSize: uint64(opt.MaxTableSize),
BlockSize: opt.BlockSize,
BloomFalsePositive: opt.BloomFalsePositive,
LoadBloomsOnOpen: opt.LoadBloomsOnOpen,
Expand Down Expand Up @@ -230,17 +231,6 @@ func (opt Options) WithValueDir(val string) Options {
return opt
}

// WithLoggingLevel returns a new Options value with logging level of the
// default logger set to the given value.
// LoggingLevel sets the level of logging. It should be one of DEBUG, INFO,
// WARNING or ERROR levels.
//
// The default value of LoggingLevel is INFO.
func (opt Options) WithLoggingLevel(val loggingLevel) Options {
opt.Logger = defaultLogger(val)
return opt
}

// WithSyncWrites returns a new Options value with SyncWrites set to the given value.
//
// When SyncWrites is true all writes are synced to disk. Setting this to false would achieve better
Expand Down Expand Up @@ -318,6 +308,17 @@ func (opt Options) WithLogger(val Logger) Options {
return opt
}

// WithLoggingLevel returns a new Options value with logging level of the
// default logger set to the given value.
// LoggingLevel sets the level of logging. It should be one of DEBUG, INFO,
// WARNING or ERROR levels.
//
// The default value of LoggingLevel is INFO.
func (opt Options) WithLoggingLevel(val loggingLevel) Options {
opt.Logger = defaultLogger(val)
return opt
}

// WithMaxTableSize returns a new Options value with MaxTableSize set to the given value.
//
// MaxTableSize sets the maximum size in bytes for each LSM table or file.
Expand All @@ -335,7 +336,7 @@ func (opt Options) WithMaxTableSize(val int64) Options {
// Once a level grows to be larger than this ratio allowed, the compaction process will be
// triggered.
//
// The default value of LevelSizeMultiplier is 10.
// The default value of LevelSizeMultiplier is 15.
func (opt Options) WithLevelSizeMultiplier(val int) Options {
opt.LevelSizeMultiplier = val
return opt
Expand Down Expand Up @@ -460,7 +461,7 @@ func (opt Options) WithValueLogMaxEntries(val uint32) Options {
// NumCompactors sets the number of compaction workers to run concurrently.
// Setting this to zero stops compactions, which could eventually cause writes to block forever.
//
// The default value of NumCompactors is 2. One is dedicated just for L0.
// The default value of NumCompactors is 2. One is dedicated just for L0 and L1.
func (opt Options) WithNumCompactors(val int) Options {
opt.NumCompactors = val
return opt
Expand Down
3 changes: 2 additions & 1 deletion stream_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ func (w *sortedWriter) send(done bool) error {
return err
}
go func(builder *table.Builder) {
defer builder.Close()
err := w.createTable(builder)
w.throttle.Done(err)
}(w.builder)
Expand Down Expand Up @@ -410,7 +411,7 @@ func (w *sortedWriter) Done() error {
}

func (w *sortedWriter) createTable(builder *table.Builder) error {
data := builder.Finish()
data := builder.Finish(w.db.opt.InMemory)
if len(data) == 0 {
return nil
}
Expand Down
Loading