Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement preallocated buffer #996

Merged
merged 33 commits into from
Sep 11, 2019
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
fe89341
A draft of the new buffer class which avoids reallocations.
manishrjain Jul 29, 2019
f9f508c
Replace bytes.buffer with custom buffer
ashish-goswami Jul 30, 2019
6f3144d
Implement reader
ashish-goswami Jul 30, 2019
a04f3bd
Implement WriteTo Interface for reader
ashish-goswami Jul 31, 2019
96e6ac2
Fix bugs in Read and WriteTo method of Reader
ashish-goswami Jul 31, 2019
d4aa567
Add builder benchmark
ashish-goswami Jul 31, 2019
2f56ace
Merge remote-tracking branch 'origin/master' into mrjn/zbuffer
ashish-goswami Aug 8, 2019
b2aed3d
Fix compilation failures
ashish-goswami Aug 8, 2019
5285c7e
Remove benchmark builder temp
ashish-goswami Aug 8, 2019
96528de
temp
ashish-goswami Aug 9, 2019
0b17def
Merge remote-tracking branch 'origin/master' into mrjn/zbuffer
ashish-goswami Aug 9, 2019
ca10f22
temp
ashish-goswami Aug 9, 2019
9d07d54
Minor changes
ashish-goswami Aug 10, 2019
4cf3040
Minor changes
ashish-goswami Aug 10, 2019
7fa634f
Remove the func within the benchmark
manishrjain Aug 10, 2019
ab2a884
Increase the size of page by 2 for every new page.
manishrjain Aug 10, 2019
cb7f6e5
Merge branch 'master' into mrjn/zbuffer
ashish-goswami Aug 20, 2019
250e136
Fix ReadAt method
ashish-goswami Aug 20, 2019
9e149d3
Modify ReadAt function and add comments to all functions
ashish-goswami Aug 20, 2019
02b25c7
Fix compilation errors
ashish-goswami Aug 20, 2019
574bb0a
Address review comments
ashish-goswami Aug 21, 2019
37159fc
Address review comments
ashish-goswami Aug 22, 2019
2e75e11
Add tests for PageBuffer
ashish-goswami Aug 23, 2019
42817f8
Add Reader and Truncate to PageBuffer
ashish-goswami Aug 26, 2019
8d996ba
Merge branch 'master' into mrjn/zbuffer
ashish-goswami Aug 26, 2019
333e8c5
Fix comments
ashish-goswami Aug 26, 2019
589e03d
Fix spellings y_test.go
ashish-goswami Aug 26, 2019
7055c56
Remove PageBuffer from TableBuilder
ashish-goswami Aug 27, 2019
ccc2b55
Refactor PageBuffer
ashish-goswami Sep 5, 2019
9760ab2
Minor Fixes
ashish-goswami Sep 6, 2019
44e998b
Merge branch 'master' into mrjn/zbuffer
ashish-goswami Sep 9, 2019
b76f897
Refactor PageBufferReader Read()
ashish-goswami Sep 9, 2019
5a7d172
Address review comments
ashish-goswami Sep 11, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ func buildTable(t *testing.T, keyValues [][]string) *os.File {
UserMeta: 0,
})
}
f.Write(b.Finish())
_, err = f.Write(b.Finish())
require.NoError(t, err, "unable to write to file.")
f.Close()
f, _ = y.OpenSyncedFile(filename, true)
return f
Expand Down
1 change: 1 addition & 0 deletions stream_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ func (w *sortedWriter) createTable(data []byte) error {
if _, err := fd.Write(data); err != nil {
return err
}

opts := table.Options{
LoadingMode: w.db.opt.TableLoadingMode,
ChkMode: w.db.opt.ChecksumVerificationMode,
Expand Down
26 changes: 25 additions & 1 deletion table/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ func TestTableIndex(t *testing.T) {
}
builder.Add(k, vs)
}
f.Write(builder.Finish())
_, err = f.Write(builder.Finish())
require.NoError(t, err, "unable to write to file")

opts = Options{LoadingMode: options.LoadToRAM, ChkMode: options.OnTableAndBlockRead}
tbl, err := OpenTable(f, opts)
Expand All @@ -76,3 +77,26 @@ func TestTableIndex(t *testing.T) {
}
})
}

// BenchmarkBuilder measures the cost of building a table from ~1.3M fixed-size
// entries (roughly 64MB of data) using the table builder.
func BenchmarkBuilder(b *testing.B) {
	rand.Seed(time.Now().Unix())
	key := func(i int) []byte {
		return []byte(fmt.Sprintf("%032d", i))
	}

	val := make([]byte, 32)
	rand.Read(val)
	// val is already a []byte; the previous []byte(val) conversion was redundant.
	vs := y.ValueStruct{Value: val}

	keysCount := 1300000 // This number of entries consumes ~64MB of memory.
	for i := 0; i < b.N; i++ {
		opts := Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01}
		builder := NewTableBuilder(opts)

		// Use a distinct index name so the benchmark loop variable i is not shadowed.
		for j := 0; j < keysCount; j++ {
			builder.Add(key(j), vs)
		}

		_ = builder.Finish()
	}
}
12 changes: 8 additions & 4 deletions table/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ func buildTable(t *testing.T, keyValues [][]string) *os.File {
y.AssertTrue(len(kv) == 2)
b.Add(y.KeyWithTs([]byte(kv[0]), 0), y.ValueStruct{Value: []byte(kv[1]), Meta: 'A', UserMeta: 0})
}
f.Write(b.Finish())
_, err = f.Write(b.Finish())
require.NoError(t, err, "writing to file failed")
f.Close()
f, _ = y.OpenSyncedFile(filename, true)
return f
Expand Down Expand Up @@ -666,7 +667,8 @@ func TestTableBigValues(t *testing.T) {
builder.Add(key, vs)
}

f.Write(builder.Finish())
_, err = f.Write(builder.Finish())
require.NoError(t, err, "unable to write to file")
opts = Options{LoadingMode: options.LoadToRAM, ChkMode: options.OnTableAndBlockRead}
tbl, err := OpenTable(f, opts)
require.NoError(t, err, "unable to open table")
Expand Down Expand Up @@ -761,7 +763,8 @@ func BenchmarkReadMerged(b *testing.B) {
v := fmt.Sprintf("%d", id)
builder.Add([]byte(k), y.ValueStruct{Value: []byte(v), Meta: 123, UserMeta: 0})
}
f.Write(builder.Finish())
_, err = f.Write(builder.Finish())
require.NoError(b, err, "unable to write to file")
opts = Options{LoadingMode: options.LoadToRAM, ChkMode: options.OnTableAndBlockRead}
tbl, err := OpenTable(f, opts)
y.Check(err)
Expand Down Expand Up @@ -847,7 +850,8 @@ func getTableForBenchmarks(b *testing.B, count int) *Table {
builder.Add([]byte(k), y.ValueStruct{Value: []byte(v)})
}

f.Write(builder.Finish())
_, err = f.Write(builder.Finish())
require.NoError(b, err, "unable to write to file")
opts = Options{LoadingMode: options.LoadToRAM, ChkMode: options.NoVerification}
tbl, err := OpenTable(f, opts)
require.NoError(b, err, "unable to open table")
Expand Down
170 changes: 170 additions & 0 deletions y/y.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/binary"
"fmt"
"hash/crc32"
"io"
"math"
"os"
"reflect"
Expand Down Expand Up @@ -338,3 +339,172 @@ func BytesToU32Slice(b []byte) []uint32 {
hdr.Data = uintptr(unsafe.Pointer(&b[0]))
return u32s
}

// page struct contains one underlying buffer. It is kept as a struct (rather
// than a plain []byte) so that page-level helper methods can be added later.
type page struct {
	buf []byte
}

// PageBuffer consists of many pages. A page is a wrapper over []byte. PageBuffer can act as a
// replacement of bytes.Buffer. Instead of having a single underlying buffer, it has multiple
// underlying buffers. Hence it avoids any copy during reallocation (as happens in bytes.Buffer).
// PageBuffer allocates memory in pages. Once a page is full, it will allocate a page with double
// the size of the previous page. Its functions are not thread safe.
type PageBuffer struct {
	pages []*page

	length int // Total number of bytes stored across all pages.
	nextPageSize int // Capacity of the next page to be allocated (doubles each time).
}

// NewPageBuffer returns a new PageBuffer whose first page has capacity pageSize.
func NewPageBuffer(pageSize int) *PageBuffer {
	first := &page{buf: make([]byte, 0, pageSize)}
	return &PageBuffer{
		pages:        []*page{first},
		nextPageSize: 2 * pageSize,
	}
}

// Write writes data to PageBuffer b. It returns the number of bytes written and any error
// encountered (always nil).
func (b *PageBuffer) Write(data []byte) (int, error) {
	written := 0
	for {
		last := b.pages[len(b.pages)-1]

		// Fill whatever free capacity remains in the last page.
		n := copy(last.buf[len(last.buf):cap(last.buf)], data)
		last.buf = last.buf[:len(last.buf)+n]
		b.length += n
		written += n
		data = data[n:]

		if len(data) == 0 {
			return written, nil
		}

		// The last page is full; allocate a fresh page twice the size of the previous one.
		b.pages = append(b.pages, &page{buf: make([]byte, 0, b.nextPageSize)})
		b.nextPageSize *= 2
	}
}

// WriteByte appends a single byte to the PageBuffer and returns any error encountered.
func (b *PageBuffer) WriteByte(data byte) error {
	one := []byte{data} // Delegates to Write; can be specialized later if needed.
	_, err := b.Write(one)
	return err
}

// Len reports the total number of bytes stored in the PageBuffer.
func (b *PageBuffer) Len() int { return b.length }

// pageForOffset returns the index of the page containing offset and the index within that
// page's buffer. It requires offset < b.length (asserted).
func (b *PageBuffer) pageForOffset(offset int) (int, int) {
	AssertTrue(offset < b.length)

	remaining := offset
	for i, p := range b.pages {
		if remaining < len(p.buf) {
			return i, remaining
		}
		remaining -= len(p.buf)
	}
	// Unreachable while the length invariant holds; mirrors the original zero-value fallthrough.
	return 0, 0
}

// Truncate shrinks the PageBuffer to length n (n must be < b.Len(), per pageForOffset).
func (b *PageBuffer) Truncate(n int) {
	idx, within := b.pageForOffset(n)
	// For simplicity, discard the pages past the one containing offset n; they could
	// instead be kept for reuse.
	b.pages = b.pages[:idx+1]
	last := b.pages[idx]
	last.buf = last.buf[:within]
	b.length = n
}

// Bytes flattens the whole buffer into a single freshly allocated []byte.
func (b *PageBuffer) Bytes() []byte {
	out := make([]byte, b.length)
	n := 0
	for _, p := range b.pages {
		n += copy(out[n:], p.buf)
	}
	return out
}

// WriteTo writes the whole buffer to w, one page at a time. It returns the number of bytes
// written and the first error encountered, if any.
func (b *PageBuffer) WriteTo(w io.Writer) (int64, error) {
	var total int64
	for _, p := range b.pages {
		n, err := w.Write(p.buf)
		total += int64(n)
		if err != nil {
			return total, err
		}
	}
	return total, nil
}

// NewReaderAt returns a reader that starts reading at the given offset in the page buffer.
func (b *PageBuffer) NewReaderAt(offset int) *PageBufferReader {
	pageIdx, startIdx := b.pageForOffset(offset)
	r := &PageBufferReader{
		buf:      b,
		pageIdx:  pageIdx,
		startIdx: startIdx,
	}
	return r
}

// PageBufferReader is a reader for PageBuffer. It tracks its position as a page index plus
// an index within that page, advanced by Read.
type PageBufferReader struct {
	buf *PageBuffer // Underlying page buffer.
	pageIdx int // Index of the page it will read from next.
	startIdx int // Index inside buf.pages[pageIdx] from where it will start reading.
}

// Read reads up to len(p) bytes into p. It returns the number of bytes read; if nothing
// could be read, it returns io.EOF.
func (r *PageBufferReader) Read(p []byte) (int, error) {
	// Check if there is enough to Read.
	pc := len(r.buf.pages)

	read := 0
	for r.pageIdx < pc && read < len(p) {
		cp := r.buf.pages[r.pageIdx] // Current Page.
		endIdx := len(cp.buf) // Last index up to which we can read from this page.

		n := copy(p[read:], cp.buf[r.startIdx:endIdx])
		read += n
		r.startIdx += n

		// Instead of len(cp.buf), we compare with cap(cp.buf). This ensures that we move to
		// the next page only when the current page has been read to its full capacity.
		// Reading from the last page is an edge case: we don't want to move past the last
		// page while it is not yet full, since (presumably) more bytes may still be written
		// into it — TODO confirm against writer usage.
		if r.startIdx >= cap(cp.buf) {
			// We should move to next page.
			r.pageIdx++
			r.startIdx = 0
			continue
		}

		// When the last page is not full to its capacity and we have read all data up to its
		// length, just break out of the loop.
		if r.pageIdx == pc-1 {
			break
		}
	}

	if read == 0 {
		return read, io.EOF
	}

	return read, nil
}
Loading