Skip to content

Commit

Permalink
implement FastWaterMark (#96)
Browse files Browse the repository at this point in the history
  • Loading branch information
coocood authored Jul 16, 2019
1 parent 4b97d99 commit 37ea3d3
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 12 deletions.
3 changes: 1 addition & 2 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,9 +240,8 @@ func Open(opt Options) (db *DB, err error) {
isManaged: opt.managedTxns,
nextCommit: 1,
commits: make(map[uint64]uint64),
readMark: y.WaterMark{},
readMark: y.NewFastWaterMark(),
}
orc.readMark.Init()

db = &DB{
imm: make([]*table.MemTable, 0, opt.NumMemtables),
Expand Down
12 changes: 8 additions & 4 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1146,7 +1146,7 @@ func TestMinReadTs(t *testing.T) {
}
time.Sleep(time.Millisecond)
require.Equal(t, uint64(10), db.orc.readTs())
min := db.orc.readMark.MinReadTs()
min := db.orc.readMark.MinReadTS()
require.Equal(t, uint64(9), min)

readTxn := db.NewTransaction(false)
Expand All @@ -1157,18 +1157,22 @@ func TestMinReadTs(t *testing.T) {
}
require.Equal(t, uint64(20), db.orc.readTs())
time.Sleep(time.Millisecond)
require.Equal(t, min, db.orc.readMark.MinReadTs())
require.Equal(t, min, db.orc.readMark.MinReadTS())
readTxn.Discard()
time.Sleep(time.Millisecond)
require.Equal(t, uint64(19), db.orc.readMark.MinReadTs())
require.Equal(t, uint64(10), db.orc.readMark.MinReadTS())
// The minReadTS can only be increased by a newer txn being done.
readTxn = db.NewTransaction(false)
readTxn.Discard()
require.Equal(t, uint64(20), db.orc.readMark.MinReadTS())

for i := 0; i < 10; i++ {
db.View(func(txn *Txn) error {
return nil
})
}
time.Sleep(time.Millisecond)
require.Equal(t, uint64(20), db.orc.readMark.MinReadTs())
require.Equal(t, uint64(20), db.orc.readMark.MinReadTS())
})
}

Expand Down
2 changes: 1 addition & 1 deletion levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func (lc *levelsController) compactBuildTables(level int, cd compactDef, limiter
// Pick up the currently pending transactions' min readTs, so we can discard versions below this
// readTs. We should never discard any versions starting from above this timestamp, because that
// would affect the snapshot view guarantee provided by transactions.
minReadTs := lc.kv.orc.readMark.MinReadTs()
minReadTs := lc.kv.orc.readMark.MinReadTS()

var filter CompactionFilter
if lc.kv.opt.CompactionFilterFactory != nil {
Expand Down
11 changes: 6 additions & 5 deletions transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type oracle struct {
writeLock sync.Mutex
nextCommit uint64

readMark y.WaterMark
readMark *y.FastWaterMark

// commits stores a key fingerprint and latest commit counter for it.
// refCount is used to clear out commits map to avoid a memory blowup.
Expand Down Expand Up @@ -135,6 +135,7 @@ func (o *oracle) doneCommit(cts uint64) {

// Txn represents a Badger transaction.
type Txn struct {
wmNode *y.WaterMarkNode
readTs uint64
commitTs uint64

Expand Down Expand Up @@ -416,7 +417,7 @@ func (txn *Txn) Discard() {
panic("Unclosed iterator at time of Txn.Discard.")
}
txn.discarded = true
txn.db.orc.readMark.Done(txn.readTs)
txn.db.orc.readMark.Done(txn.wmNode)

if txn.update {
txn.db.orc.decrRef()
Expand Down Expand Up @@ -512,15 +513,15 @@ func (db *DB) NewTransaction(update bool) *Txn {
// DB is read-only, force read-only transaction.
update = false
}

readTS := db.orc.readTs()
txn := &Txn{
update: update,
db: db,
readTs: db.orc.readTs(),
count: 1, // One extra entry for BitFin.
size: int64(len(txnKey) + 10), // Some buffer for the extra entry.
}
db.orc.readMark.Begin(txn.readTs)
txn.wmNode = db.orc.readMark.Begin(readTS)
txn.readTs = txn.wmNode.ReadTS
if update {
txn.pendingWrites = make(map[string]*Entry)
txn.db.orc.addRef()
Expand Down
55 changes: 55 additions & 0 deletions y/wartermark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package y

import (
"sync/atomic"
"testing"
"unsafe"

"github.com/stretchr/testify/require"
)

// TestFastWaterMark exercises Begin/Done from a single goroutine and checks
// that ReadTS never goes backwards, that MinReadTS only moves when the node
// being finished has no live older node, and that Done unlinks the dead nodes
// behind the finished node.
//
// Value comparisons use require.Equal so a failure reports the expected and
// actual timestamps; pointer comparisons stay as plain boolean checks.
func TestFastWaterMark(t *testing.T) {
	fwm := NewFastWaterMark()
	node := fwm.Begin(100)
	fwm.Done(node)
	require.Equal(t, uint64(100), fwm.MinReadTS())

	t1 := fwm.Begin(103)
	t2 := fwm.Begin(102)
	// The hint 102 is below the current head's ReadTS (103), so it is raised.
	require.Equal(t, uint64(103), t2.ReadTS)
	t3 := fwm.Begin(105)

	// t1 is older than t2 and still alive, so finishing t2 must not move
	// MinReadTS, and t2 keeps pointing at t1.
	fwm.Done(t2)
	require.Equal(t, uint64(100), fwm.MinReadTS())
	require.True(t, t2.next == unsafe.Pointer(t1))

	// Every node older than t1 is dead, so MinReadTS becomes t1's ReadTS.
	fwm.Done(t1)
	require.Equal(t, uint64(103), fwm.MinReadTS())

	// t3 is the newest node; all older nodes are dead and get unlinked.
	fwm.Done(t3)
	require.Equal(t, uint64(105), fwm.MinReadTS())
	require.True(t, t3.next == nil)
}

// counter hands out increasing timestamps to the benchmarks in this file.
var counter uint64

// BenchmarkFastWaterMark measures a Begin immediately followed by Done on a
// shared FastWaterMark, driven from parallel goroutines.
func BenchmarkFastWaterMark(b *testing.B) {
	mark := NewFastWaterMark()
	b.ReportAllocs()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			ts := atomic.AddUint64(&counter, 1)
			mark.Done(mark.Begin(ts))
		}
	})
}

// BenchmarkWaterMark runs the same Begin/Done pair against the original
// WaterMark so its numbers can be compared with BenchmarkFastWaterMark.
func BenchmarkWaterMark(b *testing.B) {
	mark := new(WaterMark)
	mark.Init()
	b.ReportAllocs()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			ts := atomic.AddUint64(&counter, 1)
			mark.Begin(ts)
			mark.Done(ts)
		}
	})
}
67 changes: 67 additions & 0 deletions y/watermark.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package y
import (
"container/heap"
"sync/atomic"
"unsafe"
)

type uint64Heap []uint64
Expand Down Expand Up @@ -118,3 +119,69 @@ func (w *WaterMark) process() {
}
}
}

// FastWaterMark tracks the minimum read timestamp still in use without taking
// a lock. Active reads are kept in a singly linked list of WaterMarkNode
// values that is only ever modified with sync/atomic operations.
type FastWaterMark struct {
	// minReadTS is read and written with 64-bit atomic operations. It must be
	// the first field: on 32-bit platforms only the first word of an allocated
	// struct is guaranteed to be 64-bit aligned, and placing it after the
	// pointer-sized head would make those atomic calls panic there.
	minReadTS uint64
	// head points at the most recently begun *WaterMarkNode.
	head unsafe.Pointer
}

// WaterMarkNode is one entry in the FastWaterMark linked list. Begin returns
// it, and the same node is passed back to Done when the read is finished.
type WaterMarkNode struct {
// ReadTS is the timestamp assigned by Begin; it is never lower than the
// ReadTS of the node that was at the head of the list at that moment.
ReadTS uint64
// isAlive is 1 for a node created by Begin and is set to 0 at the end of
// Done. It is accessed with sync/atomic.
isAlive uint64
// next points at the next older *WaterMarkNode, or is nil at the end of the
// list. Done may move it forward past nodes that are already dead.
next unsafe.Pointer
}

// NewFastWaterMark returns a FastWaterMark whose list holds a single
// zero-valued sentinel node (ReadTS 0, not alive, no successor), so the head
// pointer is never nil.
func NewFastWaterMark() *FastWaterMark {
	sentinel := new(WaterMarkNode)
	wm := new(FastWaterMark)
	wm.head = unsafe.Pointer(sentinel)
	return wm
}

// Begin registers a new read and returns the node tracking it, which keeps
// SST files needed by that read from being deleted.
// Callers must use the returned WaterMarkNode.ReadTS rather than readTSHint:
// the hint is raised to the current head's ReadTS when it is lower, so the
// timestamps in the list never decrease.
// MinReadTS stays less than or equal to that ReadTS until Done is called.
func (wm *FastWaterMark) Begin(readTSHint uint64) *WaterMarkNode {
	node := new(WaterMarkNode)
	node.isAlive = 1
	for {
		cur := atomic.LoadPointer(&wm.head)
		ts := readTSHint
		// Never go backwards relative to the node currently at the head.
		if headTS := (*WaterMarkNode)(cur).ReadTS; headTS >= ts {
			ts = headTS
		}
		node.ReadTS = ts
		node.next = cur
		// Publish the node; if another Begin won the race, retry against the
		// new head.
		if !atomic.CompareAndSwapPointer(&wm.head, cur, unsafe.Pointer(node)) {
			continue
		}
		return node
	}
}

// Done marks n as finished. If every node older than n is already dead,
// MinReadTS is raised to n.ReadTS; otherwise MinReadTS is left unchanged.
// Dead nodes directly behind n are unlinked by pointing n.next past them.
func (wm *FastWaterMark) Done(n *WaterMarkNode) {
	first := (*WaterMarkNode)(atomic.LoadPointer(&n.next))

	// Walk towards older nodes, stopping at the first live one or at the end
	// of the list.
	cur := first
	for cur != nil && atomic.LoadUint64(&cur.isAlive) == 0 {
		cur = (*WaterMarkNode)(atomic.LoadPointer(&cur.next))
	}

	if cur == nil {
		// No older node is alive, so n.ReadTS is the new minimum.
		atomic.StoreUint64(&wm.minReadTS, n.ReadTS)
	}
	if cur != first {
		// At least one dead node was skipped; bypass the dead run.
		atomic.StorePointer(&n.next, unsafe.Pointer(cur))
	}
	// Mark n dead last, after its next pointer has been updated.
	atomic.StoreUint64(&n.isAlive, 0)
}

// MinReadTS reports the minimum read timestamp currently recorded as in use;
// files holding only older versions can be safely deleted.
func (wm *FastWaterMark) MinReadTS() uint64 {
	ts := atomic.LoadUint64(&wm.minReadTS)
	return ts
}

0 comments on commit 37ea3d3

Please sign in to comment.