Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement preallocated buffer #996

Merged
merged 33 commits into from
Sep 11, 2019
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
fe89341
A draft of the new buffer class which avoids reallocations.
manishrjain Jul 29, 2019
f9f508c
Replace bytes.buffer with custom buffer
ashish-goswami Jul 30, 2019
6f3144d
Implement reader
ashish-goswami Jul 30, 2019
a04f3bd
Implement WriteTo Interface for reader
ashish-goswami Jul 31, 2019
96e6ac2
Fix bugs in Read and WriteTo method of Reader
ashish-goswami Jul 31, 2019
d4aa567
Add builder benchmark
ashish-goswami Jul 31, 2019
2f56ace
Merge remote-tracking branch 'origin/master' into mrjn/zbuffer
ashish-goswami Aug 8, 2019
b2aed3d
Fix compilation failures
ashish-goswami Aug 8, 2019
5285c7e
Remove benchmark builder temp
ashish-goswami Aug 8, 2019
96528de
temp
ashish-goswami Aug 9, 2019
0b17def
Merge remote-tracking branch 'origin/master' into mrjn/zbuffer
ashish-goswami Aug 9, 2019
ca10f22
temp
ashish-goswami Aug 9, 2019
9d07d54
Minor changes
ashish-goswami Aug 10, 2019
4cf3040
Minor changes
ashish-goswami Aug 10, 2019
7fa634f
Remove the func within the benchmark
manishrjain Aug 10, 2019
ab2a884
Increase the size of page by 2 for every new page.
manishrjain Aug 10, 2019
cb7f6e5
Merge branch 'master' into mrjn/zbuffer
ashish-goswami Aug 20, 2019
250e136
Fix ReadAt method
ashish-goswami Aug 20, 2019
9e149d3
Modify ReadAt function and add comments to all functons
ashish-goswami Aug 20, 2019
02b25c7
Fix compilation errors
ashish-goswami Aug 20, 2019
574bb0a
Address review comments
ashish-goswami Aug 21, 2019
37159fc
Address review comments
ashish-goswami Aug 22, 2019
2e75e11
Add tests for PageBuffer
ashish-goswami Aug 23, 2019
42817f8
Add Reader and Truncate to PageBuffer
ashish-goswami Aug 26, 2019
8d996ba
Merge branch 'master' into mrjn/zbuffer
ashish-goswami Aug 26, 2019
333e8c5
Fix comments
ashish-goswami Aug 26, 2019
589e03d
Fix spellings y_test.go
ashish-goswami Aug 26, 2019
7055c56
Remove PageBuffer from TableBuilder
ashish-goswami Aug 27, 2019
ccc2b55
Refactor PageBuffer
ashish-goswami Sep 5, 2019
9760ab2
Minor Fixes
ashish-goswami Sep 6, 2019
44e998b
Merge branch 'master' into mrjn/zbuffer
ashish-goswami Sep 9, 2019
b76f897
Refactor PageBufferReader Read()
ashish-goswami Sep 9, 2019
5a7d172
Address review comments
ashish-goswami Sep 11, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,7 @@ func writeLevel0Table(ft flushTask, f io.Writer, bopts table.Options) error {
}
b.Add(iter.Key(), iter.Value())
}
_, err := f.Write(b.Finish())
_, err := b.Finish().WriteTo(f)
return err
}

Expand Down
2 changes: 1 addition & 1 deletion db2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ func createTableWithRange(t *testing.T, db *DB, start, end int) *table.Table {
fd, err := y.CreateSyncedFile(table.NewFilename(fileID, db.opt.Dir), true)
require.NoError(t, err)

_, err = fd.Write(b.Finish())
_, err = b.Finish().WriteTo(fd)
require.NoError(t, err, "unable to write to file")

opts := table.Options{LoadingMode: options.LoadToRAM, ChkMode: options.NoVerification}
Expand Down
8 changes: 6 additions & 2 deletions levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,15 +571,19 @@ func (s *levelsController) compactBuildTables(
s.kv.opt.Debugf("LOG Compact. Added %d keys. Skipped %d keys. Iteration took: %v",
numKeys, numSkips, time.Since(timeStart))
build := func(fileID uint64) (*table.Table, error) {
fd, err := y.CreateSyncedFile(table.NewFilename(fileID, s.kv.opt.Dir), true)
fd, err := y.CreateSyncedFile(table.NewFilename(fileID, s.kv.opt.Dir), false)
if err != nil {
return nil, errors.Wrapf(err, "While opening new table: %d", fileID)
}

if _, err := fd.Write(builder.Finish()); err != nil {
if _, err := builder.Finish().WriteTo(fd); err != nil {
return nil, errors.Wrapf(err, "Unable to write to file: %d", fileID)
}

if err := fd.Sync(); err != nil {
return nil, errors.Wrapf(err, "Unable to sync file: %d", fileID)
}

opts := table.Options{
LoadingMode: s.kv.opt.TableLoadingMode,
ChkMode: s.kv.opt.ChecksumVerificationMode,
Expand Down
2 changes: 1 addition & 1 deletion manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func buildTable(t *testing.T, keyValues [][]string) *os.File {
UserMeta: 0,
})
}
f.Write(b.Finish())
y.Check2(b.Finish().WriteTo(f))
f.Close()
f, _ = y.OpenSyncedFile(filename, true)
return f
Expand Down
15 changes: 9 additions & 6 deletions stream_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,8 @@ func (w *sortedWriter) send() error {
return err
}
go func(builder *table.Builder) {
data := builder.Finish()
err := w.createTable(data)
buf := builder.Finish()
err := w.createTable(buf)
w.throttle.Done(err)
}(w.builder)

Expand All @@ -319,16 +319,19 @@ func (w *sortedWriter) Done() error {
return w.send()
}

func (w *sortedWriter) createTable(data []byte) error {
if len(data) == 0 {
func (w *sortedWriter) createTable(buf *y.Buffer) error {
if buf.Len() == 0 {
return nil
}
fileID := w.db.lc.reserveFileID()
fd, err := y.CreateSyncedFile(table.NewFilename(fileID, w.db.opt.Dir), true)
fd, err := y.CreateSyncedFile(table.NewFilename(fileID, w.db.opt.Dir), false)
if err != nil {
return err
}
if _, err := fd.Write(data); err != nil {
if _, err := buf.WriteTo(fd); err != nil {
return err
}
if err := fd.Sync(); err != nil {
return err
}
opts := table.Options{
Expand Down
12 changes: 6 additions & 6 deletions table/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (h header) Size() int { return headerSize }
// Builder is used in building a table.
type Builder struct {
// Typically tens or hundreds of meg. This is for one single file.
buf *bytes.Buffer
buf *y.Buffer

baseKey []byte // Base key for the current block.
baseOffset uint32 // Offset for the current block.
Expand All @@ -75,7 +75,7 @@ type Builder struct {
// NewTableBuilder makes a new TableBuilder.
func NewTableBuilder(opts Options) *Builder {
return &Builder{
buf: newBuffer(1 << 20),
buf: y.NewBuffer(1 << 20),
tableIndex: &pb.TableIndex{},
keyHashes: make([]uint64, 0, 1024), // Avoid some malloc calls.
opt: &opts,
Expand Down Expand Up @@ -126,7 +126,7 @@ func (b *Builder) addHelper(key []byte, v y.ValueStruct) {
b.buf.Write(h.Encode())
b.buf.Write(diffKey) // We only need to store the key difference.

v.EncodeTo(b.buf)
v.EncodeToYBuf(b.buf)
}

/*
Expand All @@ -144,7 +144,7 @@ func (b *Builder) finishBlock() {
b.buf.Write(y.U32SliceToBytes(b.entryOffsets))
b.buf.Write(y.U32ToBytes(uint32(len(b.entryOffsets))))

blockBuf := b.buf.Bytes()[b.baseOffset:] // Store checksum for current block.
blockBuf := b.buf.ReadAt(int(b.baseOffset), -1)
b.writeChecksum(blockBuf)

// TODO(Ashish):Add padding: If we want to make block as multiple of OS pages, we can
Expand Down Expand Up @@ -222,7 +222,7 @@ The table structure looks like
| Index | Index Size | Checksum | Checksum Size |
+---------+------------+-----------+---------------+
*/
func (b *Builder) Finish() []byte {
func (b *Builder) Finish() *y.Buffer {
bf := z.NewBloomFilter(float64(len(b.keyHashes)), b.opt.BloomFalsePositive)
for _, h := range b.keyHashes {
bf.Add(h)
Expand All @@ -244,7 +244,7 @@ func (b *Builder) Finish() []byte {
y.Check(err)

b.writeChecksum(index)
return b.buf.Bytes()
return b.buf
}

func (b *Builder) writeChecksum(data []byte) {
Expand Down
41 changes: 38 additions & 3 deletions table/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/dgraph-io/badger/options"
"github.com/dgraph-io/badger/y"
"github.com/stretchr/testify/require"
)

func TestTableIndex(t *testing.T) {
Expand Down Expand Up @@ -63,7 +62,8 @@ func TestTableIndex(t *testing.T) {
}
builder.Add(k, vs)
}
f.Write(builder.Finish())
_, err = builder.Finish().WriteTo(f)
require.NoError(t, err, "unable to write to file")

opts = Options{LoadingMode: options.LoadToRAM, ChkMode: options.OnTableAndBlockRead}
tbl, err := OpenTable(f, opts)
Expand All @@ -76,3 +76,38 @@ func TestTableIndex(t *testing.T) {
}
})
}

// BenchmarkBuilder measures how long it takes to add keysCount key-value
// pairs to a freshly created table Builder and then finish it. Every key is a
// zero-padded 32-character decimal string and all keys share one random
// 32-byte value. Key formatting happens inside the timed loop, so it is part
// of the reported cost. Nothing is written to disk; the finished buffer is
// discarded.
func BenchmarkBuilder(b *testing.B) {
	rand.Seed(time.Now().Unix())
	key := func(i int) []byte {
		return []byte(fmt.Sprintf("%032d", i))
	}

	val := make([]byte, 32)
	// math/rand's Read is documented to always return a nil error, so the
	// result is discarded explicitly rather than silently.
	_, _ = rand.Read(val)
	vs := y.ValueStruct{Value: val}

	const keysCount = 1300000

	// Exclude the one-time setup above from the measurement.
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		opts := Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01}
		builder := NewTableBuilder(opts)

		// Use a distinct index name so the outer benchmark counter is not shadowed.
		for j := 0; j < keysCount; j++ {
			builder.Add(key(j), vs)
		}

		_ = builder.Finish()
	}
}
41 changes: 37 additions & 4 deletions table/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ func buildTable(t *testing.T, keyValues [][]string) *os.File {
y.AssertTrue(len(kv) == 2)
b.Add(y.KeyWithTs([]byte(kv[0]), 0), y.ValueStruct{Value: []byte(kv[1]), Meta: 'A', UserMeta: 0})
}
f.Write(b.Finish())
_, err = b.Finish().WriteTo(f)
require.NoError(t, err, "writing to file failed")
f.Close()
f, _ = y.OpenSyncedFile(filename, true)
return f
Expand Down Expand Up @@ -666,7 +667,8 @@ func TestTableBigValues(t *testing.T) {
builder.Add(key, vs)
}

f.Write(builder.Finish())
_, err = builder.Finish().WriteTo(f)
require.NoError(t, err, "unable to write to file")
opts = Options{LoadingMode: options.LoadToRAM, ChkMode: options.OnTableAndBlockRead}
tbl, err := OpenTable(f, opts)
require.NoError(t, err, "unable to open table")
Expand Down Expand Up @@ -761,7 +763,8 @@ func BenchmarkReadMerged(b *testing.B) {
v := fmt.Sprintf("%d", id)
builder.Add([]byte(k), y.ValueStruct{Value: []byte(v), Meta: 123, UserMeta: 0})
}
f.Write(builder.Finish())
_, err = builder.Finish().WriteTo(f)
require.NoError(b, err, "unable to write to file")
opts = Options{LoadingMode: options.LoadToRAM, ChkMode: options.OnTableAndBlockRead}
tbl, err := OpenTable(f, opts)
y.Check(err)
Expand Down Expand Up @@ -847,9 +850,39 @@ func getTableForBenchmarks(b *testing.B, count int) *Table {
builder.Add([]byte(k), y.ValueStruct{Value: []byte(v)})
}

f.Write(builder.Finish())
_, err = builder.Finish().WriteTo(f)
require.NoError(b, err, "unable to write to file")
opts = Options{LoadingMode: options.LoadToRAM, ChkMode: options.NoVerification}
tbl, err := OpenTable(f, opts)
require.NoError(b, err, "unable to open table")
return tbl
}

// BenchmarkFilewrite compares two ways of writing 100 chunks of 1 KiB to a
// temporary .sst file on every iteration:
//
//   - "sync-flag": the file is opened with y.OpenSyncedFile(filename, true) and
//     no explicit Sync call is made (presumably the flag requests synchronous
//     writes at open time; confirm against y.OpenSyncedFile).
//   - "fysnc": the file is opened with y.OpenSyncedFile(filename, false) and
//     f.Sync() is called once after each batch of 100 writes.
//
// The sub-benchmark names are kept exactly as they were so that existing
// -bench filters keep matching.
func BenchmarkFilewrite(b *testing.B) {
	a := bytes.Repeat([]byte("a"), 1024)
	rand.Seed(time.Now().Unix())

	// open creates a uniquely named temp file and returns it together with a
	// cleanup function. The sub-benchmark function is invoked several times
	// with growing b.N, and each invocation creates a new file; without the
	// cleanup every one of them would leak a descriptor and leave a file
	// behind in the temp directory.
	open := func(b *testing.B, sync bool) (*os.File, func()) {
		filename := fmt.Sprintf("%s%s%d.sst", os.TempDir(), string(os.PathSeparator), rand.Int63())
		f, err := y.OpenSyncedFile(filename, sync)
		require.NoError(b, err)
		return f, func() {
			// Stop the clock first so closing and unlinking are not measured.
			b.StopTimer()
			// Best-effort cleanup: a failure here does not affect the numbers.
			_ = f.Close()
			_ = os.Remove(filename)
		}
	}

	b.Run("sync-flag", func(b *testing.B) {
		f, cleanup := open(b, true)
		defer cleanup()

		b.ResetTimer() // do not count file creation
		for i := 0; i < b.N; i++ {
			for j := 0; j < 100; j++ {
				_, err := f.Write(a)
				require.NoError(b, err)
			}
		}
	})

	b.Run("fysnc", func(b *testing.B) {
		f, cleanup := open(b, false)
		defer cleanup()

		b.ResetTimer() // do not count file creation
		for i := 0; i < b.N; i++ {
			for j := 0; j < 100; j++ {
				_, err := f.Write(a)
				require.NoError(b, err)
			}
			// One explicit flush per batch of 100 writes.
			require.NoError(b, f.Sync())
		}
	})
}
12 changes: 12 additions & 0 deletions y/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,18 @@ func (v *ValueStruct) EncodeTo(buf *bytes.Buffer) {
buf.Write(v.Value)
}

// EncodeToYBuf should be kept in sync with the Encode function above. The reason
// this function exists is to avoid creating byte arrays per key-value pair in
// table/builder.go.
func (v *ValueStruct) EncodeToYBuf(buf *Buffer) {
buf.WriteByte(v.Meta)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error return value of buf.WriteByte is not checked (from errcheck)

buf.WriteByte(v.UserMeta)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error return value of buf.WriteByte is not checked (from errcheck)

var enc [binary.MaxVarintLen64]byte
sz := binary.PutUvarint(enc[:], v.ExpiresAt)
buf.Write(enc[:sz])

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error return value of buf.Write is not checked (from errcheck)

buf.Write(v.Value)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error return value of buf.Write is not checked (from errcheck)

}

// Iterator is an interface for a basic iterator.
type Iterator interface {
Next()
Expand Down
Loading