diff --git a/kv.go b/db.go
similarity index 93%
rename from kv.go
rename to db.go
index 457c922c6..338966961 100644
--- a/kv.go
+++ b/db.go
@@ -49,9 +49,9 @@ type closers struct {
 	valueGC *y.Closer
 }
 
-// KV provides the various functions required to interact with Badger.
-// KV is thread-safe.
-type KV struct {
+// DB provides the various functions required to interact with Badger.
+// DB is thread-safe.
+type DB struct {
 	sync.RWMutex // Guards list of inmemory tables, not individual reads and writes.
 
 	dirLockGuard *DirectoryLockGuard
@@ -81,7 +81,7 @@ const (
 	kvWriteChCapacity = 1000
 )
 
-func replayFunction(out *KV) func(entry, valuePointer) error {
+func replayFunction(out *DB) func(entry, valuePointer) error {
 	type txnEntry struct {
 		nk []byte
 		v  y.ValueStruct
@@ -163,8 +163,8 @@ func replayFunction(out *KV) func(entry, valuePointer) error {
 	}
 }
 
-// NewKV returns a new KV object.
-func NewKV(optParam *Options) (out *KV, err error) {
+// Open returns a new DB object.
+func Open(optParam *Options) (out *DB, err error) {
 	// Make a copy early and fill in maxBatchSize
 	opt := *optParam
 	opt.maxBatchSize = (15 * opt.MaxTableSize) / 100
@@ -229,13 +229,13 @@ func NewKV(optParam *Options) (out *KV, err error) {
 	}
 	heap.Init(&gs.commitMark)
 
-	out = &KV{
+	out = &DB{
 		imm:           make([]*skl.Skiplist, 0, opt.NumMemtables),
 		flushChan:     make(chan flushTask, opt.NumMemtables),
 		writeCh:       make(chan *request, kvWriteChCapacity),
 		opt:           opt,
 		manifest:      manifestFile,
-		elog:          trace.NewEventLog("Badger", "KV"),
+		elog:          trace.NewEventLog("Badger", "DB"),
 		dirLockGuard:  dirLockGuard,
 		valueDirGuard: valueDirLockGuard,
 		txnState:      gs,
@@ -309,9 +309,9 @@ func NewKV(optParam *Options) (out *KV, err error) {
 	return out, nil
 }
 
-// Close closes a KV. It's crucial to call it to ensure all the pending updates
+// Close closes a DB. It's crucial to call it to ensure all the pending updates
 // make their way to disk.
-func (s *KV) Close() (err error) {
+func (s *DB) Close() (err error) {
 	s.elog.Printf("Closing database")
 	// Stop value GC first.
 	s.closers.valueGC.SignalAndWait()
@@ -321,7 +321,7 @@ func (s *KV) Close() (err error) {
 
 	// Now close the value log.
 	if vlogErr := s.vlog.Close(); err == nil {
-		err = errors.Wrap(vlogErr, "KV.Close")
+		err = errors.Wrap(vlogErr, "DB.Close")
 	}
 
 	// Make sure that block writer is done pushing stuff into memtable!
@@ -364,7 +364,7 @@ func (s *KV) Close() (err error) {
 	s.elog.Printf("Compaction finished")
 
 	if lcErr := s.lc.close(); err == nil {
-		err = errors.Wrap(lcErr, "KV.Close")
+		err = errors.Wrap(lcErr, "DB.Close")
 	}
 	s.elog.Printf("Waiting for closer")
 	s.closers.updateSize.SignalAndWait()
@@ -372,25 +372,25 @@ func (s *KV) Close() (err error) {
 	s.elog.Finish()
 
 	if guardErr := s.dirLockGuard.Release(); err == nil {
-		err = errors.Wrap(guardErr, "KV.Close")
+		err = errors.Wrap(guardErr, "DB.Close")
 	}
 	if s.valueDirGuard != nil {
 		if guardErr := s.valueDirGuard.Release(); err == nil {
-			err = errors.Wrap(guardErr, "KV.Close")
+			err = errors.Wrap(guardErr, "DB.Close")
 		}
 	}
 	if manifestErr := s.manifest.close(); err == nil {
-		err = errors.Wrap(manifestErr, "KV.Close")
+		err = errors.Wrap(manifestErr, "DB.Close")
 	}
 
 	// Fsync directories to ensure that lock file, and any other removed files whose directory
 	// we haven't specifically fsynced, are guaranteed to have their directory entry removal
 	// persisted to disk.
 	if syncErr := syncDir(s.opt.Dir); err == nil {
-		err = errors.Wrap(syncErr, "KV.Close")
+		err = errors.Wrap(syncErr, "DB.Close")
 	}
 	if syncErr := syncDir(s.opt.ValueDir); err == nil {
-		err = errors.Wrap(syncErr, "KV.Close")
+		err = errors.Wrap(syncErr, "DB.Close")
 	}
 
 	return err
@@ -417,7 +417,7 @@ func syncDir(dir string) error {
 }
 
 // getMemtables returns the current memtables and get references.
-func (s *KV) getMemTables() ([]*skl.Skiplist, func()) {
+func (s *DB) getMemTables() ([]*skl.Skiplist, func()) {
 	s.RLock()
 	defer s.RUnlock()
 
@@ -442,7 +442,7 @@ func (s *KV) getMemTables() ([]*skl.Skiplist, func()) {
 
 // get returns the value in memtable or disk for given key.
 // Note that value will include meta byte.
-func (s *KV) get(key []byte) (y.ValueStruct, error) {
+func (s *DB) get(key []byte) (y.ValueStruct, error) {
 	tables, decr := s.getMemTables() // Lock should be released.
 	defer decr()
 
@@ -457,7 +457,7 @@ func (s *KV) get(key []byte) (y.ValueStruct, error) {
 	return s.lc.get(key)
 }
 
-func (s *KV) updateOffset(ptrs []valuePointer) {
+func (s *DB) updateOffset(ptrs []valuePointer) {
 	var ptr valuePointer
 	for i := len(ptrs) - 1; i >= 0; i-- {
 		p := ptrs[i]
@@ -482,11 +482,11 @@ var requestPool = sync.Pool{
 	},
 }
 
-func (s *KV) shouldWriteValueToLSM(e entry) bool {
+func (s *DB) shouldWriteValueToLSM(e entry) bool {
 	return len(e.Value) < s.opt.ValueThreshold
 }
 
-func (s *KV) writeToLSM(b *request) error {
+func (s *DB) writeToLSM(b *request) error {
 	if len(b.Ptrs) != len(b.Entries) {
 		return errors.Errorf("Ptrs and Entries don't match: %+v", b)
 	}
@@ -516,7 +516,7 @@ func (s *KV) writeToLSM(b *request) error {
 }
 
 // writeRequests is called serially by only one goroutine.
-func (s *KV) writeRequests(reqs []*request) error {
+func (s *DB) writeRequests(reqs []*request) error {
 	if len(reqs) == 0 {
 		return nil
 	}
@@ -565,7 +565,7 @@ func (s *KV) writeRequests(reqs []*request) error {
 	return nil
 }
 
-func (s *KV) doWrites(lc *y.Closer) {
+func (s *DB) doWrites(lc *y.Closer) {
 	defer lc.Done()
 	pendingCh := make(chan struct{}, 1)
 
@@ -625,7 +625,7 @@ func (s *KV) doWrites(lc *y.Closer) {
 	}
 }
 
-func (s *KV) sendToWriteCh(entries []*entry) (*request, error) {
+func (s *DB) sendToWriteCh(entries []*entry) (*request, error) {
 	var count, size int64
 	for _, e := range entries {
 		size += int64(s.opt.estimateSize(e))
@@ -650,7 +650,7 @@ func (s *KV) sendToWriteCh(entries []*entry) (*request, error) {
 // batchSet applies a list of badger.Entry. If a request level error occurs it
 // will be returned.
 //   Check(kv.BatchSet(entries))
-func (s *KV) batchSet(entries []*entry) error {
+func (s *DB) batchSet(entries []*entry) error {
 	req, err := s.sendToWriteCh(entries)
 	if err != nil {
 		return err
@@ -669,7 +669,7 @@ func (s *KV) batchSet(entries []*entry) error {
 //   err := kv.BatchSetAsync(entries, func(err error)) {
 //      Check(err)
 //   }
-func (s *KV) batchSetAsync(entries []*entry, f func(error)) error {
+func (s *DB) batchSetAsync(entries []*entry, f func(error)) error {
 	req, err := s.sendToWriteCh(entries)
 	if err != nil {
 		return err
@@ -688,7 +688,7 @@ func (s *KV) batchSetAsync(entries []*entry, f func(error)) error {
 var errNoRoom = errors.New("No room for write")
 
 // ensureRoomForWrite is always called serially.
-func (s *KV) ensureRoomForWrite() error {
+func (s *DB) ensureRoomForWrite() error {
 	var err error
 	s.Lock()
 	defer s.Unlock()
@@ -696,7 +696,7 @@ func (s *KV) ensureRoomForWrite() error {
 		return nil
 	}
 
-	y.AssertTrue(s.mt != nil) // A nil mt indicates that KV is being closed.
+	y.AssertTrue(s.mt != nil) // A nil mt indicates that DB is being closed.
 	select {
 	case s.flushChan <- flushTask{s.mt, s.vptr}:
 		s.elog.Printf("Flushing value log to disk if async mode.")
@@ -743,7 +743,7 @@ type flushTask struct {
 	vptr valuePointer
 }
 
-func (s *KV) flushMemtable(lc *y.Closer) error {
+func (s *DB) flushMemtable(lc *y.Closer) error {
 	defer lc.Done()
 
 	for ft := range s.flushChan {
@@ -818,7 +818,7 @@ func exists(path string) (bool, error) {
 	return true, err
 }
 
-func (s *KV) updateSize(lc *y.Closer) {
+func (s *DB) updateSize(lc *y.Closer) {
 	defer lc.Done()
 
 	metricsTicker := time.NewTicker(5 * time.Minute)
@@ -882,11 +882,11 @@
 // the cost of increased activity on the LSM tree. discardRatio must be in the range (0.0, 1.0),
 // both endpoints excluded, otherwise an ErrInvalidRequest is returned.
 //
-// Only one GC is allowed at a time. If another value log GC is running, or KV has been closed, this
+// Only one GC is allowed at a time. If another value log GC is running, or DB has been closed, this
 // would return an ErrRejected.
 //
 // Note: Every time GC is run, it would produce a spike of activity on the LSM tree.
-func (s *KV) RunValueLogGC(discardRatio float64) error {
+func (s *DB) RunValueLogGC(discardRatio float64) error {
 	if discardRatio >= 1.0 || discardRatio <= 0.0 {
 		return ErrInvalidRequest
 	}
diff --git a/kv_test.go b/db_test.go
similarity index 95%
rename from kv_test.go
rename to db_test.go
index 1b2c02389..82a35bc40 100644
--- a/kv_test.go
+++ b/db_test.go
@@ -43,7 +43,7 @@ func getTestOptions(dir string) *Options {
 	return opt
 }
 
-func getItemValue(t *testing.T, item *KVItem) (val []byte) {
+func getItemValue(t *testing.T, item *Item) (val []byte) {
 	v, err := item.Value()
 	if err != nil {
 		t.Error(err)
@@ -54,13 +54,13 @@ func getItemValue(t *testing.T, item *KVItem) (val []byte) {
 	return v
 }
 
-func txnSet(t *testing.T, kv *KV, key []byte, val []byte, meta byte) {
+func txnSet(t *testing.T, kv *DB, key []byte, val []byte, meta byte) {
 	txn := kv.NewTransaction(true)
 	require.NoError(t, txn.Set(key, val, meta))
 	require.NoError(t, txn.Commit(nil))
 }
 
-func txnDelete(t *testing.T, kv *KV, key []byte) {
+func txnDelete(t *testing.T, kv *DB, key []byte) {
 	txn := kv.NewTransaction(true)
 	require.NoError(t, txn.Delete(key))
 	require.NoError(t, txn.Commit(nil))
@@ -70,7 +70,7 @@ func TestWrite(t *testing.T) {
 	dir, err := ioutil.TempDir("", "badger")
 	require.NoError(t, err)
 	defer os.RemoveAll(dir)
-	kv, err := NewKV(getTestOptions(dir))
+	kv, err := Open(getTestOptions(dir))
 	require.NoError(t, err)
 	defer kv.Close()
 
@@ -83,7 +83,7 @@ func TestConcurrentWrite(t *testing.T) {
 	dir, err := ioutil.TempDir("", "badger")
 	require.NoError(t, err)
 	defer os.RemoveAll(dir)
-	kv, _ := NewKV(getTestOptions(dir))
+	kv, _ := Open(getTestOptions(dir))
 	defer kv.Close()
 
 	// Not a benchmark. Just a simple test for concurrent writes.
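
[Editor's note, not part of the patch] A minimal sketch of the renamed entry point in use, assuming the github.com/dgraph-io/badger import path of this era and the *Options signature shown in the db.go hunks above; paths are placeholders:

package main

import (
	"log"

	"github.com/dgraph-io/badger"
)

func main() {
	opt := badger.DefaultOptions // Open copies this early, per the diff above
	opt.Dir = "/tmp/badger"
	opt.ValueDir = "/tmp/badger"

	db, err := badger.Open(&opt) // formerly: badger.NewKV(&opt)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close() // Close flushes pending updates to disk
}
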
@@ -138,7 +138,7 @@ func TestGet(t *testing.T) {
 	dir, err := ioutil.TempDir("", "badger")
 	require.NoError(t, err)
 	defer os.RemoveAll(dir)
-	kv, err := NewKV(getTestOptions(dir))
+	kv, err := Open(getTestOptions(dir))
 	if err != nil {
 		t.Error(err)
 	}
@@ -190,7 +190,7 @@ func TestExists(t *testing.T) {
 	dir, err := ioutil.TempDir("", "badger")
 	require.NoError(t, err)
 	defer os.RemoveAll(dir)
-	kv, err := NewKV(getTestOptions(dir))
+	kv, err := Open(getTestOptions(dir))
 	if err != nil {
 		t.Error(err)
 	}
@@ -233,7 +233,7 @@ func TestGetMore(t *testing.T) {
 	dir, err := ioutil.TempDir("", "badger")
 	require.NoError(t, err)
 	defer os.RemoveAll(dir)
-	kv, err := NewKV(getTestOptions(dir))
+	kv, err := Open(getTestOptions(dir))
 	if err != nil {
 		t.Error(err)
 		t.Fail()
@@ -341,7 +341,7 @@ func TestExistsMore(t *testing.T) {
 	dir, err := ioutil.TempDir("", "badger")
 	require.NoError(t, err)
 	defer os.RemoveAll(dir)
-	kv, err := NewKV(getTestOptions(dir))
+	kv, err := Open(getTestOptions(dir))
 	if err != nil {
 		t.Error(err)
 		t.Fail()
@@ -414,7 +414,7 @@ func TestIterate2Basic(t *testing.T) {
 	dir, err := ioutil.TempDir("", "badger")
 	require.NoError(t, err)
 	defer os.RemoveAll(dir)
-	kv, _ := NewKV(getTestOptions(dir))
+	kv, _ := Open(getTestOptions(dir))
 	defer kv.Close()
 
 	bkey := func(i int) []byte {
@@ -483,7 +483,7 @@ func TestLoad(t *testing.T) {
 	defer os.RemoveAll(dir)
 	n := 10000
 	{
-		kv, _ := NewKV(getTestOptions(dir))
+		kv, _ := Open(getTestOptions(dir))
 		for i := 0; i < n; i++ {
 			if (i % 10000) == 0 {
 				fmt.Printf("Putting i=%d\n", i)
@@ -494,7 +494,7 @@ func TestLoad(t *testing.T) {
 		kv.Close()
 	}
 
-	kv, err := NewKV(getTestOptions(dir))
+	kv, err := Open(getTestOptions(dir))
 	require.NoError(t, err)
 	require.Equal(t, uint64(10001), kv.txnState.readTs())
 	for i := 0; i < n; i++ {
@@ -538,7 +538,7 @@ func TestIterateDeleted(t *testing.T) {
 	opt.SyncWrites = true
 	opt.Dir = dir
 	opt.ValueDir = dir
-	ps, err := NewKV(&opt)
+	ps, err := Open(&opt)
 	require.NoError(t, err)
 	defer ps.Close()
 	txnSet(t, ps, []byte("Key1"), []byte("Value1"), 0x00)
@@ -589,7 +589,7 @@ func TestIterateDeleted(t *testing.T) {
 }
 
 func TestDirNotExists(t *testing.T) {
-	_, err := NewKV(getTestOptions("not-exists"))
+	_, err := Open(getTestOptions("not-exists"))
 	require.Error(t, err)
 }
 
@@ -601,7 +601,7 @@ func TestDeleteWithoutSyncWrite(t *testing.T) {
 	*opt = DefaultOptions
 	opt.Dir = dir
 	opt.ValueDir = dir
-	kv, err := NewKV(opt)
+	kv, err := Open(opt)
 	if err != nil {
 		t.Error(err)
 		t.Fail()
@@ -614,7 +614,7 @@ func TestDeleteWithoutSyncWrite(t *testing.T) {
 	kv.Close()
 
 	// Reopen KV
-	kv, err = NewKV(opt)
+	kv, err = Open(opt)
 	if err != nil {
 		t.Error(err)
 		t.Fail()
@@ -633,10 +633,10 @@ func TestPidFile(t *testing.T) {
 	require.NoError(t, err)
 	defer os.RemoveAll(dir)
 	options := getTestOptions(dir)
-	kv1, err := NewKV(options)
+	kv1, err := Open(options)
 	require.NoError(t, err)
 	defer kv1.Close()
-	_, err = NewKV(options)
+	_, err = Open(options)
 	require.Error(t, err)
 	require.Contains(t, err.Error(), "Another process is using this Badger database")
 }
@@ -647,7 +647,7 @@ func TestBigKeyValuePairs(t *testing.T) {
 	defer os.RemoveAll(dir)
 
 	opt := getTestOptions(dir)
-	kv, err := NewKV(opt)
+	kv, err := Open(opt)
 	require.NoError(t, err)
 
 	bigK := make([]byte, maxKeySize+1)
@@ -673,7 +673,7 @@ func TestIteratorPrefetchSize(t *testing.T) {
 	dir, err := ioutil.TempDir("", "badger")
 	require.NoError(t, err)
 	defer os.RemoveAll(dir)
-	kv, _ := NewKV(getTestOptions(dir))
+	kv, _ := Open(getTestOptions(dir))
 	defer kv.Close()
 
 	bkey := func(i int) []byte {
@@ -720,7 +720,7 @@ func TestSetIfAbsentAsync(t *testing.T) {
 	dir, err := ioutil.TempDir("", "badger")
 	require.NoError(t, err)
 	defer os.RemoveAll(dir)
-	kv, _ := NewKV(getTestOptions(dir))
+	kv, _ := Open(getTestOptions(dir))
 	bkey := func(i int) []byte {
 		return []byte(fmt.Sprintf("%09d", i))
 	}
@@ -741,7 +741,7 @@ func TestSetIfAbsentAsync(t *testing.T) {
 	}
 	require.NoError(t, kv.Close())
 
-	kv, err = NewKV(getTestOptions(dir))
+	kv, err = Open(getTestOptions(dir))
 	require.NoError(t, err)
 
 	opt := DefaultIteratorOptions
@@ -763,7 +763,7 @@ func TestGetSetRace(t *testing.T) {
 	dir, err := ioutil.TempDir("", "badger")
 	require.NoError(t, err)
 	defer os.RemoveAll(dir)
-	kv, _ := NewKV(getTestOptions(dir))
+	kv, _ := Open(getTestOptions(dir))
 
 	data := make([]byte, 4096)
 	_, err = rand.Read(data)
diff --git a/errors.go b/errors.go
index 802c595a0..4fd83874c 100644
--- a/errors.go
+++ b/errors.go
@@ -65,7 +65,7 @@ var (
 		"Value log GC attempt didn't result in any cleanup")
 
 	// ErrRejected is returned if a value log GC is called either while another GC is running, or
-	// after KV::Close has been called.
+	// after DB::Close has been called.
 	ErrRejected = errors.New("Value log GC request rejected")
 
 	// ErrInvalidRequest is returned if the user request is invalid.
diff --git a/iterator.go b/iterator.go
index 1f74d6cb0..d0380b904 100644
--- a/iterator.go
+++ b/iterator.go
@@ -32,36 +32,36 @@ const (
 	prefetched
 )
 
-// KVItem is returned during iteration. Both the Key() and Value() output is only valid until
+// Item is returned during iteration. Both the Key() and Value() outputs are only valid until
 // iterator.Next() is called.
-type KVItem struct {
+type Item struct {
 	status   prefetchStatus
 	err      error
 	wg       sync.WaitGroup
-	kv       *KV
+	db       *DB
 	key      []byte
 	vptr     []byte
 	meta     byte
 	userMeta byte
 	val      []byte
 	slice    *y.Slice // Used only during prefetching.
-	next     *KVItem
+	next     *Item
 	version  uint64
 	txn      *Txn
 }
 
-func (item *KVItem) ToString() string {
+func (item *Item) ToString() string {
 	return fmt.Sprintf("key=%q, version=%d, meta=%x", item.Key(), item.Version(), item.meta)
 }
 
 // Key returns the key. Remember to copy if you need to access it outside the iteration loop.
-func (item *KVItem) Key() []byte {
+func (item *Item) Key() []byte {
 	return item.key
 }
 
 // Version returns the commit timestamp of the item.
-func (item *KVItem) Version() uint64 {
+func (item *Item) Version() uint64 {
 	return item.version
 }
 
@@ -73,7 +73,7 @@ func (item *KVItem) Version() uint64 {
 //
 // Remember to parse or copy it if you need to reuse it. DO NOT modify or
 // append to this slice; it would result in a panic.
-func (item *KVItem) Value() ([]byte, error) {
+func (item *Item) Value() ([]byte, error) {
 	item.wg.Wait()
 	if item.status == prefetched {
 		return item.val, item.err
@@ -85,7 +85,7 @@ func (item *KVItem) Value() ([]byte, error) {
 	return buf, err
 }
 
-func (item *KVItem) hasValue() bool {
+func (item *Item) hasValue() bool {
 	if item.meta == 0 && item.vptr == nil {
 		// key not found
 		return false
@@ -97,7 +97,7 @@ func (item *KVItem) hasValue() bool {
 	return true
 }
 
-func (item *KVItem) yieldItemValue() ([]byte, func(), error) {
+func (item *Item) yieldItemValue() ([]byte, func(), error) {
 	if !item.hasValue() {
 		return nil, nil, nil
 	}
@@ -114,7 +114,7 @@ func (item *KVItem) yieldItemValue() ([]byte, func(), error) {
 
 	var vp valuePointer
 	vp.Decode(item.vptr)
-	return item.kv.vlog.Read(vp)
+	return item.db.vlog.Read(vp)
 }
 
 func runCallback(cb func()) {
@@ -123,7 +123,7 @@ func runCallback(cb func()) {
 	}
 }
 
-func (item *KVItem) prefetchValue() {
+func (item *Item) prefetchValue() {
 	val, cb, err := item.yieldItemValue()
 	defer runCallback(cb)
 
@@ -142,7 +142,7 @@ func (item *KVItem) prefetchValue() {
 // This can be called while iterating through a store to quickly estimate the
 // size of a range of key-value pairs (without fetching the corresponding
 // values).
-func (item *KVItem) EstimatedSize() int64 {
+func (item *Item) EstimatedSize() int64 {
 	if !item.hasValue() {
 		return 0
 	}
@@ -156,17 +156,17 @@ func (item *KVItem) EstimatedSize() int64 {
 
 // UserMeta returns the userMeta set by the user. Typically, this byte, optionally set by the user
 // is used to interpret the value.
-func (item *KVItem) UserMeta() byte {
+func (item *Item) UserMeta() byte {
 	return item.userMeta
 }
 
 // TODO: Switch this to use linked list container in Go.
 type list struct {
-	head *KVItem
-	tail *KVItem
+	head *Item
+	tail *Item
 }
 
-func (l *list) push(i *KVItem) {
+func (l *list) push(i *Item) {
 	i.next = nil
 	if l.tail == nil {
 		l.head = i
@@ -177,7 +177,7 @@ func (l *list) push(i *KVItem) {
 	l.tail = i
 }
 
-func (l *list) pop() *KVItem {
+func (l *list) pop() *Item {
 	if l.head == nil {
 		return nil
 	}
@@ -217,24 +217,24 @@ type Iterator struct {
 	readTs uint64
 	opt    IteratorOptions
 
-	item  *KVItem
+	item  *Item
 	data  list
 	waste list
 
 	lastKey []byte // Used to skip over multiple versions of the same key.
 }
 
-func (it *Iterator) newItem() *KVItem {
+func (it *Iterator) newItem() *Item {
 	item := it.waste.pop()
 	if item == nil {
-		item = &KVItem{slice: new(y.Slice), kv: it.txn.kv, txn: it.txn}
+		item = &Item{slice: new(y.Slice), db: it.txn.db, txn: it.txn}
 	}
 	return item
 }
 
 // Item returns pointer to the current KVItem.
 // This item is only valid until it.Next() gets called.
-func (it *Iterator) Item() *KVItem {
+func (it *Iterator) Item() *Item {
 	tx := it.txn
 	if tx.update {
 		// Track reads if this is an update txn.
@@ -256,7 +256,7 @@ func (it *Iterator) ValidForPrefix(prefix []byte) bool {
 func (it *Iterator) Close() {
 	it.iitr.Close()
 	// TODO: We could handle this error.
-	_ = it.txn.kv.vlog.decrIteratorCount()
+	_ = it.txn.db.vlog.decrIteratorCount()
 }
 
 // Next would advance the iterator by one. Always check it.Valid() after a Next()
@@ -288,7 +288,7 @@ func (it *Iterator) parseItem() bool {
 	mi := it.iitr
 	key := mi.Key()
 
-	setItem := func(item *KVItem) {
+	setItem := func(item *Item) {
 		if it.item == nil {
 			it.item = item
 		} else {
@@ -362,7 +362,7 @@ FILL:
 	return true
 }
 
-func (it *Iterator) fill(item *KVItem) {
+func (it *Iterator) fill(item *Item) {
 	vs := it.iitr.Value()
 	item.meta = vs.Meta
 	item.userMeta = vs.UserMeta
diff --git a/level_handler.go b/level_handler.go
index 0dbd4201d..63bc25556 100644
--- a/level_handler.go
+++ b/level_handler.go
@@ -40,7 +40,7 @@ type levelHandler struct {
 	level        int
 	strLevel     string
 	maxTotalSize int64
-	kv           *KV
+	db           *DB
 }
 
 func (s *levelHandler) getTotalSize() int64 {
@@ -155,11 +155,11 @@ func decrRefs(tables []*table.Table) error {
 	return nil
 }
 
-func newLevelHandler(kv *KV, level int) *levelHandler {
+func newLevelHandler(db *DB, level int) *levelHandler {
 	return &levelHandler{
 		level:    level,
 		strLevel: fmt.Sprintf("l%d", level),
-		kv:       kv,
+		db:       db,
 	}
 }
 
@@ -169,7 +169,7 @@ func (s *levelHandler) tryAddLevel0Table(t *table.Table) bool {
 	// Need lock as we may be deleting the first table during a level 0 compaction.
 	s.Lock()
 	defer s.Unlock()
-	if len(s.tables) >= s.kv.opt.NumLevelZeroTablesStall {
+	if len(s.tables) >= s.db.opt.NumLevelZeroTablesStall {
 		return false
 	}
 
diff --git a/levels.go b/levels.go
index 726a0dda2..07b891e8b 100644
--- a/levels.go
+++ b/levels.go
@@ -37,7 +37,7 @@ type levelsController struct {
 
 	// The following are initialized once and const.
 	levels []*levelHandler
-	kv     *KV
+	kv     *DB
 
 	nextFileID uint64 // Atomic
 
@@ -55,7 +55,7 @@ var (
 // revertToManifest checks that all necessary table files exist and removes all table files not
 // referenced by the manifest. idMap is a set of table file id's that were read from the directory
 // listing.
-func revertToManifest(kv *KV, mf *Manifest, idMap map[uint64]struct{}) error {
+func revertToManifest(kv *DB, mf *Manifest, idMap map[uint64]struct{}) error {
 	// 1. Check all files in manifest exist.
 	for id := range mf.Tables {
 		if _, ok := idMap[id]; !ok {
@@ -77,7 +77,7 @@ func revertToManifest(kv *KV, mf *Manifest, idMap map[uint64]struct{}) error {
 	return nil
 }
 
-func newLevelsController(kv *KV, mf *Manifest) (*levelsController, error) {
+func newLevelsController(kv *DB, mf *Manifest) (*levelsController, error) {
 	y.AssertTrue(kv.opt.NumLevelZeroTablesStall > kv.opt.NumLevelZeroTables)
 	s := &levelsController{
 		kv: kv,
diff --git a/manifest_test.go b/manifest_test.go
index 6f631aae5..df699d295 100644
--- a/manifest_test.go
+++ b/manifest_test.go
@@ -41,7 +41,7 @@ func TestManifestBasic(t *testing.T) {
 
 	opt := getTestOptions(dir)
 	{
-		kv, err := NewKV(opt)
+		kv, err := Open(opt)
 		require.NoError(t, err)
 		n := 5000
 		for i := 0; i < n; i++ {
@@ -56,7 +56,7 @@ func TestManifestBasic(t *testing.T) {
 		require.NoError(t, kv.Close())
 	}
 
-	kv, err := NewKV(opt)
+	kv, err := Open(opt)
 	require.NoError(t, err)
 
 	require.NoError(t, kv.View(func(txn *Txn) error {
@@ -76,7 +76,7 @@ func helpTestManifestFileCorruption(t *testing.T, off int64, errorContent string
 
 	opt := getTestOptions(dir)
 	{
-		kv, err := NewKV(opt)
+		kv, err := Open(opt)
 		require.NoError(t, err)
 		require.NoError(t, kv.Close())
 	}
@@ -86,7 +86,7 @@ func helpTestManifestFileCorruption(t *testing.T, off int64, errorContent string
 	_, err = fp.WriteAt([]byte{'X'}, off)
 	require.NoError(t, err)
 	require.NoError(t, fp.Close())
-	kv, err := NewKV(opt)
+	kv, err := Open(opt)
 	defer func() {
 		if kv != nil {
 			kv.Close()
@@ -163,7 +163,7 @@ func TestOverlappingKeyRangeError(t *testing.T) {
 	opt := DefaultOptions
 	opt.Dir = dir
 	opt.ValueDir = dir
-	kv, err := NewKV(&opt)
+	kv, err := Open(&opt)
 	require.NoError(t, err)
 
 	lh0 := newLevelHandler(kv, 0)
diff --git a/transaction.go b/transaction.go
index bc5978563..97b7f9b94 100644
--- a/transaction.go
+++ b/transaction.go
@@ -168,7 +168,7 @@ type Txn struct {
 
 	pendingWrites map[string]*entry // cache stores any writes done by txn.
 
-	kv        *KV
+	db        *DB
 	callbacks []func()
 	discarded bool
 }
@@ -189,8 +189,8 @@ func (txn *Txn) Set(key, val []byte, userMeta byte) error {
 		return ErrEmptyKey
 	} else if len(key) > maxKeySize {
 		return exceedsMaxKeySizeError(key)
-	} else if int64(len(val)) > txn.kv.opt.ValueLogFileSize {
-		return exceedsMaxValueSizeError(val, txn.kv.opt.ValueLogFileSize)
+	} else if int64(len(val)) > txn.db.opt.ValueLogFileSize {
+		return exceedsMaxValueSizeError(val, txn.db.opt.ValueLogFileSize)
 	}
 	fp := farm.Fingerprint64(key) // Avoid dealing with byte arrays.
@@ -230,9 +230,9 @@ func (txn *Txn) Delete(key []byte) error {
 	return nil
 }
 
-// Get looks for key and returns a KVItem.
+// Get looks for key and returns an Item.
 // If key is not found, ErrKeyNotFound is returned.
-func (txn *Txn) Get(key []byte) (item KVItem, rerr error) {
+func (txn *Txn) Get(key []byte) (item Item, rerr error) {
 	if len(key) == 0 {
 		return item, ErrEmptyKey
 	} else if txn.discarded {
@@ -248,7 +248,7 @@ func (txn *Txn) Get(key []byte) (item KVItem, rerr error) {
 			item.key = key
 			item.status = prefetched
 			item.version = txn.readTs
-			// We probably don't need to set KV on item here.
+			// We probably don't need to set db on item here.
 			return item, nil
 		}
 		// Only track reads if this is update txn. No need to track read if txn serviced it
@@ -258,9 +258,9 @@ func (txn *Txn) Get(key []byte) (item KVItem, rerr error) {
 	}
 
 	seek := y.KeyWithTs(key, txn.readTs)
-	vs, err := txn.kv.get(seek)
+	vs, err := txn.db.get(seek)
 	if err != nil {
-		return item, errors.Wrapf(err, "KV::Get key: %q", key)
+		return item, errors.Wrapf(err, "DB::Get key: %q", key)
 	}
 	if vs.Value == nil && vs.Meta == 0 {
 		return item, ErrKeyNotFound
@@ -273,7 +273,7 @@ func (txn *Txn) Get(key []byte) (item KVItem, rerr error) {
 	item.version = vs.Version
 	item.meta = vs.Meta
 	item.userMeta = vs.UserMeta
-	item.kv = txn.kv
+	item.db = txn.db
 	item.vptr = vs.Value
 	item.txn = txn
 	return item, nil
@@ -289,7 +289,7 @@ func (txn *Txn) Discard() {
 		cb()
 	}
 	if txn.update {
-		txn.kv.txnState.decrRef()
+		txn.db.txnState.decrRef()
 	}
 }
 
@@ -313,7 +313,7 @@ func (txn *Txn) Commit(callback func(error)) error {
 		return nil // Nothing to do.
 	}
 
-	state := txn.kv.txnState
+	state := txn.db.txnState
 	commitTs := state.newCommitTs(txn)
 	if commitTs == 0 {
 		return ErrConflict
@@ -340,9 +340,9 @@ func (txn *Txn) Commit(callback func(error)) error {
 
 		// TODO: What if some of the txns successfully make it to value log, but others fail.
 		// Nothing gets updated to LSM, until a restart happens.
-		return txn.kv.batchSet(entries)
+		return txn.db.batchSet(entries)
 	}
-	return txn.kv.batchSetAsync(entries, callback)
+	return txn.db.batchSetAsync(entries, callback)
 }
 
 func (txn *Txn) CommitAt(commitTs uint64, callback func(error)) error {
@@ -372,14 +372,14 @@ func (txn *Txn) CommitAt(commitTs uint64, callback func(error)) error {
 //   itr.Close()
 // TODO: Move this usage to README.
 func (txn *Txn) NewIterator(opt IteratorOptions) *Iterator {
-	tables, decr := txn.kv.getMemTables()
+	tables, decr := txn.db.getMemTables()
 	defer decr()
-	txn.kv.vlog.incrIteratorCount()
+	txn.db.vlog.incrIteratorCount()
 	var iters []y.Iterator
 	for i := 0; i < len(tables); i++ {
 		iters = append(iters, tables[i].NewUniIterator(opt.Reverse))
 	}
-	iters = txn.kv.lc.appendIterators(iters, opt.Reverse) // This will increment references.
+	iters = txn.db.lc.appendIterators(iters, opt.Reverse) // This will increment references.
 	res := &Iterator{
 		txn:  txn,
 		iitr: y.NewMergeIterator(iters, opt.Reverse),
@@ -389,27 +389,27 @@ func (txn *Txn) NewIterator(opt IteratorOptions) *Iterator {
 	return res
 }
 
-func (kv *KV) NewTransaction(update bool) *Txn {
+func (db *DB) NewTransaction(update bool) *Txn {
 	txn := &Txn{
 		update: update,
-		kv:     kv,
-		readTs: kv.txnState.readTs(),
+		db:     db,
+		readTs: db.txnState.readTs(),
 	}
 	if update {
 		txn.pendingWrites = make(map[string]*entry)
-		txn.kv.txnState.addRef()
+		txn.db.txnState.addRef()
 	}
 	return txn
 }
 
-func (kv *KV) NewTransactionAt(readTs uint64, update bool) *Txn {
-	txn := kv.NewTransaction(update)
+func (db *DB) NewTransactionAt(readTs uint64, update bool) *Txn {
+	txn := db.NewTransaction(update)
 	txn.readTs = readTs
 	return txn
 }
 
-func (kv *KV) View(callback func(txn *Txn) error) error {
-	txn := kv.NewTransaction(false)
+func (db *DB) View(callback func(txn *Txn) error) error {
+	txn := db.NewTransaction(false)
 	defer txn.Discard()
 
 	return callback(txn)
diff --git a/transaction_test.go b/transaction_test.go
index 4dba74708..f6921937d 100644
--- a/transaction_test.go
+++ b/transaction_test.go
@@ -30,7 +30,7 @@ func TestTxnSimple(t *testing.T) {
 	dir, err := ioutil.TempDir("", "badger")
 	require.NoError(t, err)
 	defer os.RemoveAll(dir)
-	kv, err := NewKV(getTestOptions(dir))
+	kv, err := Open(getTestOptions(dir))
 	require.NoError(t, err)
 	defer kv.Close()
 
@@ -56,7 +56,7 @@ func TestTxnVersions(t *testing.T) {
 	dir, err := ioutil.TempDir("", "badger")
 	require.NoError(t, err)
 	defer os.RemoveAll(dir)
-	kv, err := NewKV(getTestOptions(dir))
+	kv, err := Open(getTestOptions(dir))
 	require.NoError(t, err)
 	defer kv.Close()
 
@@ -161,7 +161,7 @@ func TestTxnWriteSkew(t *testing.T) {
 	dir, err := ioutil.TempDir("", "badger")
 	require.NoError(t, err)
 	defer os.RemoveAll(dir)
-	kv, err := NewKV(getTestOptions(dir))
+	kv, err := Open(getTestOptions(dir))
 	require.NoError(t, err)
 	defer kv.Close()
 
@@ -232,7 +232,7 @@ func TestTxnIterationEdgeCase(t *testing.T) {
 	dir, err := ioutil.TempDir("", "badger")
 	require.NoError(t, err)
 	defer os.RemoveAll(dir)
-	kv, err := NewKV(getTestOptions(dir))
+	kv, err := Open(getTestOptions(dir))
 	require.NoError(t, err)
 	defer kv.Close()
 
@@ -314,7 +314,7 @@ func TestTxnIterationEdgeCase2(t *testing.T) {
	dir, err := ioutil.TempDir("", "badger")
 	require.NoError(t, err)
 	defer os.RemoveAll(dir)
-	kv, err := NewKV(getTestOptions(dir))
+	kv, err := Open(getTestOptions(dir))
 	require.NoError(t, err)
 	defer kv.Close()
 
@@ -408,7 +408,7 @@ func TestTxnIterationEdgeCase3(t *testing.T) {
 	dir, err := ioutil.TempDir("", "badger")
 	require.NoError(t, err)
 	defer os.RemoveAll(dir)
-	kv, err := NewKV(getTestOptions(dir))
+	kv, err := Open(getTestOptions(dir))
 	require.NoError(t, err)
 	defer kv.Close()
 
@@ -473,7 +473,7 @@ func TestTxnManaged(t *testing.T) {
 	dir, err := ioutil.TempDir("", "badger")
 	require.NoError(t, err)
 	defer os.RemoveAll(dir)
-	kv, err := NewKV(getTestOptions(dir))
+	kv, err := Open(getTestOptions(dir))
 	require.NoError(t, err)
 	defer kv.Close()
 
diff --git a/util.go b/util.go
index f371201a0..88fd74dcc 100644
--- a/util.go
+++ b/util.go
@@ -50,7 +50,7 @@ func (s *levelHandler) getSummary(sum *summary) {
 	}
 }
 
-func (s *KV) validate() error { return s.lc.validate() }
+func (s *DB) validate() error { return s.lc.validate() }
 
 func (s *levelsController) validate() error {
 	for _, l := range s.levels {
diff --git a/value.go b/value.go
index 72d6244d5..f94cbf95a 100644
--- a/value.go
+++ b/value.go
@@ -439,7 +439,7 @@ type valueLog struct {
 	// A refcount of iterators -- when this hits zero, we can delete the filesToBeDeleted.
 	numActiveIterators int
 
-	kv                *KV
+	kv                *DB
 	maxFid            uint32
 	writableLogOffset uint32
 	opt               Options
@@ -532,7 +532,7 @@ func (vlog *valueLog) createVlogFile(fid uint32) (*logFile, error) {
 	return lf, nil
 }
 
-func (vlog *valueLog) Open(kv *KV, opt *Options) error {
+func (vlog *valueLog) Open(kv *DB, opt *Options) error {
 	vlog.dirPath = opt.ValueDir
 	vlog.opt = *opt
 	vlog.kv = kv
diff --git a/value_test.go b/value_test.go
index 86c2511c1..00c080c4e 100644
--- a/value_test.go
+++ b/value_test.go
@@ -32,7 +32,7 @@ func TestValueBasic(t *testing.T) {
 	y.Check(err)
 	defer os.RemoveAll(dir)
 
-	kv, _ := NewKV(getTestOptions(dir))
+	kv, _ := Open(getTestOptions(dir))
 	defer kv.Close()
 
 	log := &kv.vlog
@@ -89,7 +89,7 @@ func TestValueGC(t *testing.T) {
 	opt := getTestOptions(dir)
 	opt.ValueLogFileSize = 1 << 20
 
-	kv, _ := NewKV(opt)
+	kv, _ := Open(opt)
 	defer kv.Close()
 
 	sz := 32 << 10
@@ -140,7 +140,7 @@ func TestValueGC2(t *testing.T) {
 	opt := getTestOptions(dir)
 	opt.ValueLogFileSize = 1 << 20
 
-	kv, _ := NewKV(opt)
+	kv, _ := Open(opt)
 	defer kv.Close()
 
 	sz := 32 << 10
@@ -214,7 +214,7 @@ func TestValueGC3(t *testing.T) {
 	opt := getTestOptions(dir)
 	opt.ValueLogFileSize = 1 << 20
 
-	kv, err := NewKV(opt)
+	kv, err := Open(opt)
 	require.NoError(t, err)
 	defer kv.Close()
 
@@ -287,7 +287,7 @@ func TestValueGC4(t *testing.T) {
 	opt := getTestOptions(dir)
 	opt.ValueLogFileSize = 1 << 20
 
-	kv, _ := NewKV(opt)
+	kv, _ := Open(opt)
 	defer kv.Close()
 
 	sz := 128 << 10 // 5 entries per value log file.
@@ -357,7 +357,7 @@ func TestChecksums(t *testing.T) {
 	// Set up SST with K1=V1
 	opts := getTestOptions(dir)
 	opts.ValueLogFileSize = 100 * 1024 * 1024 // 100Mb
-	kv, err := NewKV(opts)
+	kv, err := Open(opts)
 	require.NoError(t, err)
 	require.NoError(t, kv.Close())
 
@@ -384,7 +384,7 @@ func TestChecksums(t *testing.T) {
 	require.NoError(t, ioutil.WriteFile(vlogFilePath(dir, 0), buf, 0777))
 
 	// K1 should exist, but K2 shouldn't.
-	kv, err = NewKV(opts)
+	kv, err = Open(opts)
 	require.NoError(t, err)
 
 	require.NoError(t, kv.View(func(txn *Txn) error {
@@ -406,7 +406,7 @@ func TestChecksums(t *testing.T) {
 
 	// The vlog should contain K0 and K3 (K1 and k2 was lost when Badger started up
 	// last due to checksum failure).
-	kv, err = NewKV(opts)
+	kv, err = Open(opts)
 	require.NoError(t, err)
 
 	{
@@ -439,7 +439,7 @@ func TestPartialAppendToValueLog(t *testing.T) {
 	// Create skeleton files.
 	opts := getTestOptions(dir)
 	opts.ValueLogFileSize = 100 * 1024 * 1024 // 100Mb
-	kv, err := NewKV(opts)
+	kv, err := Open(opts)
 	require.NoError(t, err)
 	require.NoError(t, kv.Close())
 
@@ -467,7 +467,7 @@ func TestPartialAppendToValueLog(t *testing.T) {
 	require.NoError(t, ioutil.WriteFile(vlogFilePath(dir, 0), buf, 0777))
 
 	// Badger should now start up
-	kv, err = NewKV(opts)
+	kv, err = Open(opts)
 	require.NoError(t, err)
 
 	require.NoError(t, kv.View(func(txn *Txn) error {
@@ -485,7 +485,7 @@ func TestPartialAppendToValueLog(t *testing.T) {
 	// When K3 is set, it should be persisted after a restart.
 	txnSet(t, kv, k3, v3, 0)
 	require.NoError(t, kv.Close())
-	kv, err = NewKV(getTestOptions(dir))
+	kv, err = Open(getTestOptions(dir))
 	require.NoError(t, err)
 	checkKeys(t, kv, [][]byte{k3})
 	require.NoError(t, kv.Close())
@@ -498,7 +498,7 @@ func TestValueLogTrigger(t *testing.T) {
 	opt := getTestOptions(dir)
 	opt.ValueLogFileSize = 1 << 20
 
-	kv, err := NewKV(opt)
+	kv, err := Open(opt)
 	require.NoError(t, err)
 
 	// Write a lot of data, so it creates some work for valug log GC.
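
[Editor's note, not part of the patch] A minimal sketch of the renamed value log GC entry point, assuming a *badger.DB obtained via badger.Open as above; the helper name runGC is hypothetical:

func runGC(db *badger.DB) {
	// discardRatio must lie strictly between 0.0 and 1.0, otherwise
	// ErrInvalidRequest is returned. ErrRejected comes back if another GC
	// is already running or the DB has been closed.
	if err := db.RunValueLogGC(0.5); err != nil {
		log.Printf("value log GC: %v", err)
	}
}
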
@@ -535,7 +535,7 @@ func TestValueLogTrigger(t *testing.T) {
 	require.NoError(t, kv.Close())
 
 	err = kv.RunValueLogGC(0.5)
-	require.Equal(t, ErrRejected, err, "Error should be returned after closing KV.")
+	require.Equal(t, ErrRejected, err, "Error should be returned after closing DB.")
 }
 
 func createVlog(t *testing.T, entries []*entry) []byte {
@@ -545,7 +545,7 @@ func createVlog(t *testing.T, entries []*entry) []byte {
 
 	opts := getTestOptions(dir)
 	opts.ValueLogFileSize = 100 * 1024 * 1024 // 100Mb
-	kv, err := NewKV(opts)
+	kv, err := Open(opts)
 	require.NoError(t, err)
 	txnSet(t, kv, entries[0].Key, entries[0].Value, entries[0].Meta)
 	entries = entries[1:]
@@ -562,7 +562,7 @@ func createVlog(t *testing.T, entries []*entry) []byte {
 	return buf
 }
 
-func checkKeys(t *testing.T, kv *KV, keys [][]byte) {
+func checkKeys(t *testing.T, kv *DB, keys [][]byte) {
 	i := 0
 	txn := kv.NewTransaction(false)
 	iter := txn.NewIterator(IteratorOptions{})
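
[Editor's note, not part of the patch] A minimal sketch of the renamed transaction API end to end, assuming a *badger.DB from badger.Open and the signatures shown in the transaction.go hunks above (Set takes a userMeta byte, Commit takes an optional callback, Get returns an Item rather than the old KVItem); the function name example is hypothetical:

func example(db *badger.DB) error {
	// Update txn: formerly kv.NewTransaction(true). Commit(nil) commits
	// synchronously; passing a callback commits asynchronously.
	txn := db.NewTransaction(true)
	defer txn.Discard()
	if err := txn.Set([]byte("key1"), []byte("value1"), 0); err != nil {
		return err
	}
	if err := txn.Commit(nil); err != nil {
		return err
	}

	// Read-only path through DB.View. Txn.Get now returns an Item value;
	// Key() and Value() are only valid until the iterator/txn advances.
	return db.View(func(txn *badger.Txn) error {
		item, err := txn.Get([]byte("key1"))
		if err != nil {
			return err
		}
		val, err := item.Value()
		if err != nil {
			return err
		}
		fmt.Printf("key1=%s (version %d)\n", val, item.Version())
		return nil
	})
}
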