Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement FastWaterMark #96

Merged
merged 1 commit into from
Jul 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,9 +240,8 @@ func Open(opt Options) (db *DB, err error) {
isManaged: opt.managedTxns,
nextCommit: 1,
commits: make(map[uint64]uint64),
readMark: y.WaterMark{},
readMark: y.NewFastWaterMark(),
}
orc.readMark.Init()

db = &DB{
imm: make([]*table.MemTable, 0, opt.NumMemtables),
Expand Down
12 changes: 8 additions & 4 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1146,7 +1146,7 @@ func TestMinReadTs(t *testing.T) {
}
time.Sleep(time.Millisecond)
require.Equal(t, uint64(10), db.orc.readTs())
min := db.orc.readMark.MinReadTs()
min := db.orc.readMark.MinReadTS()
require.Equal(t, uint64(9), min)

readTxn := db.NewTransaction(false)
Expand All @@ -1157,18 +1157,22 @@ func TestMinReadTs(t *testing.T) {
}
require.Equal(t, uint64(20), db.orc.readTs())
time.Sleep(time.Millisecond)
require.Equal(t, min, db.orc.readMark.MinReadTs())
require.Equal(t, min, db.orc.readMark.MinReadTS())
readTxn.Discard()
time.Sleep(time.Millisecond)
require.Equal(t, uint64(19), db.orc.readMark.MinReadTs())
require.Equal(t, uint64(10), db.orc.readMark.MinReadTS())
// The minReadTS can only be increased when a newer txn is done.
readTxn = db.NewTransaction(false)
readTxn.Discard()
require.Equal(t, uint64(20), db.orc.readMark.MinReadTS())

for i := 0; i < 10; i++ {
db.View(func(txn *Txn) error {
return nil
})
}
time.Sleep(time.Millisecond)
require.Equal(t, uint64(20), db.orc.readMark.MinReadTs())
require.Equal(t, uint64(20), db.orc.readMark.MinReadTS())
})
}

Expand Down
2 changes: 1 addition & 1 deletion levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func (lc *levelsController) compactBuildTables(level int, cd compactDef, limiter
// Pick up the currently pending transactions' min readTs, so we can discard versions below this
// readTs. We should never discard any versions starting from above this timestamp, because that
// would affect the snapshot view guarantee provided by transactions.
minReadTs := lc.kv.orc.readMark.MinReadTs()
minReadTs := lc.kv.orc.readMark.MinReadTS()

var filter CompactionFilter
if lc.kv.opt.CompactionFilterFactory != nil {
Expand Down
11 changes: 6 additions & 5 deletions transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type oracle struct {
writeLock sync.Mutex
nextCommit uint64

readMark y.WaterMark
readMark *y.FastWaterMark

// commits stores a key fingerprint and latest commit counter for it.
// refCount is used to clear out commits map to avoid a memory blowup.
Expand Down Expand Up @@ -135,6 +135,7 @@ func (o *oracle) doneCommit(cts uint64) {

// Txn represents a Badger transaction.
type Txn struct {
wmNode *y.WaterMarkNode
readTs uint64
commitTs uint64

Expand Down Expand Up @@ -416,7 +417,7 @@ func (txn *Txn) Discard() {
panic("Unclosed iterator at time of Txn.Discard.")
}
txn.discarded = true
txn.db.orc.readMark.Done(txn.readTs)
txn.db.orc.readMark.Done(txn.wmNode)

if txn.update {
txn.db.orc.decrRef()
Expand Down Expand Up @@ -512,15 +513,15 @@ func (db *DB) NewTransaction(update bool) *Txn {
// DB is read-only, force read-only transaction.
update = false
}

readTS := db.orc.readTs()
txn := &Txn{
update: update,
db: db,
readTs: db.orc.readTs(),
count: 1, // One extra entry for BitFin.
size: int64(len(txnKey) + 10), // Some buffer for the extra entry.
}
db.orc.readMark.Begin(txn.readTs)
txn.wmNode = db.orc.readMark.Begin(readTS)
txn.readTs = txn.wmNode.ReadTS
if update {
txn.pendingWrites = make(map[string]*Entry)
txn.db.orc.addRef()
Expand Down
55 changes: 55 additions & 0 deletions y/wartermark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package y

import (
"sync/atomic"
"testing"
"unsafe"

"github.com/stretchr/testify/require"
)

// TestFastWaterMark drives a FastWaterMark through a single-goroutine
// Begin/Done sequence and checks three things: the value reported by
// MinReadTS, the raising of a too-small readTS hint in Begin, and the
// next-pointer compaction performed by Done.
//
// Timestamp checks use require.Equal with typed constants so that a failure
// reports both the expected and the actual value. Pointer checks stay as
// plain comparisons.
func TestFastWaterMark(t *testing.T) {
	fwm := NewFastWaterMark()
	node := fwm.Begin(100)
	fwm.Done(node)
	require.Equal(t, uint64(100), fwm.MinReadTS())

	t1 := fwm.Begin(103)
	t2 := fwm.Begin(102)
	// The hint 102 is below the head's ReadTS (103), so Begin raises it.
	require.Equal(t, uint64(103), t2.ReadTS)
	t3 := fwm.Begin(105)
	fwm.Done(t2)
	// t1 is older than t2 and still alive, so the minimum must not move.
	require.Equal(t, uint64(100), fwm.MinReadTS())
	require.True(t, t2.next == unsafe.Pointer(t1))
	fwm.Done(t1)
	require.Equal(t, uint64(103), fwm.MinReadTS())
	fwm.Done(t3)
	require.Equal(t, uint64(105), fwm.MinReadTS())
	// Every older node is dead, so Done walked t3.next to the end of the list.
	require.True(t, t3.next == nil)
}

// counter is the timestamp source shared by both benchmarks below; each
// iteration bumps it with atomic.AddUint64 and uses the result as the readTS.
var counter uint64

// BenchmarkFastWaterMark measures a Begin immediately followed by Done on a
// single FastWaterMark shared by all parallel benchmark goroutines.
func BenchmarkFastWaterMark(b *testing.B) {
	b.ReportAllocs()
	mark := NewFastWaterMark()
	worker := func(pb *testing.PB) {
		for pb.Next() {
			ts := atomic.AddUint64(&counter, 1)
			mark.Done(mark.Begin(ts))
		}
	}
	b.RunParallel(worker)
}

// BenchmarkWaterMark measures the same Begin/Done pairing against the
// original WaterMark so the two implementations can be compared.
func BenchmarkWaterMark(b *testing.B) {
	b.ReportAllocs()
	mark := &WaterMark{}
	mark.Init()
	worker := func(pb *testing.PB) {
		for pb.Next() {
			ts := atomic.AddUint64(&counter, 1)
			mark.Begin(ts)
			mark.Done(ts)
		}
	}
	b.RunParallel(worker)
}
67 changes: 67 additions & 0 deletions y/watermark.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package y
import (
"container/heap"
"sync/atomic"
"unsafe"
)

// uint64Heap is a slice of uint64 values.
// NOTE(review): presumably the heap.Interface implementation used with
// container/heap (imported above); its methods are not visible here — confirm.
type uint64Heap []uint64
Expand Down Expand Up @@ -118,3 +119,69 @@ func (w *WaterMark) process() {
}
}
}

// FastWaterMark tracks in-flight read timestamps with a lock-free singly
// linked list of WaterMarkNode values and publishes the minimum read
// timestamp through MinReadTS. Create one with NewFastWaterMark; the zero
// value has a nil head and is not usable.
type FastWaterMark struct {
	// minReadTS is read and written only with 64-bit atomic operations. It is
	// deliberately the first field: on 32-bit platforms sync/atomic requires
	// 64-bit alignment for such words and only guarantees it for the first
	// word of an allocated struct. Placing it after the pointer-sized head
	// would leave it at offset 4 there.
	minReadTS uint64
	// head points to the most recently added *WaterMarkNode. It is accessed
	// with atomic pointer operations.
	head unsafe.Pointer
}

// WaterMarkNode is one entry in a FastWaterMark's linked list. Begin returns
// it and the same node must be handed back to Done.
type WaterMarkNode struct {
	// ReadTS is the timestamp chosen by Begin: the caller's hint, raised to
	// the previous head's ReadTS when the hint is smaller.
	ReadTS uint64
	// isAlive is 1 from Begin until Done stores 0; accessed atomically.
	isAlive uint64
	// next points to the *WaterMarkNode that was the list head when this node
	// was added. Done may advance it past nodes that are already dead.
	next unsafe.Pointer
}

// NewFastWaterMark returns a FastWaterMark whose list holds a single sentinel
// node. The sentinel is a zero WaterMarkNode (ReadTS 0, not alive, nil next),
// so the head pointer is never nil when Begin dereferences it.
func NewFastWaterMark() *FastWaterMark {
	sentinel := new(WaterMarkNode)
	wm := new(FastWaterMark)
	wm.head = unsafe.Pointer(sentinel)
	return wm
}

// Begin registers a new reader and returns its node, which must later be
// passed to Done. The original intent is to keep SST files from being deleted
// while they are still being read.
//
// readTSHint is only a hint: the timestamp actually assigned is
// max(readTSHint, current head's ReadTS), so ReadTS never decreases along the
// list. Callers must use the returned node's ReadTS as their read timestamp.
// MinReadTS stays less than or equal to that ReadTS until Done is called.
func (wm *FastWaterMark) Begin(readTSHint uint64) *WaterMarkNode {
	node := &WaterMarkNode{isAlive: 1}
	for {
		oldHead := atomic.LoadPointer(&wm.head)

		// Keep ReadTS monotonically increasing from older to newer nodes.
		ts := readTSHint
		if headTS := (*WaterMarkNode)(oldHead).ReadTS; headTS >= readTSHint {
			ts = headTS
		}
		node.ReadTS = ts
		node.next = oldHead

		// Publish the node; if another Begin won the race, retry against the
		// new head.
		if atomic.CompareAndSwapPointer(&wm.head, oldHead, unsafe.Pointer(node)) {
			return node
		}
	}
}

// Done marks n as finished. It walks from n.next towards older nodes,
// skipping those that are already dead. If it reaches the end of the list,
// every older node is dead and MinReadTS is set to n.ReadTS. If it stops at a
// node that is still alive, MinReadTS is left unchanged. When any dead nodes
// were skipped, n.next is repointed past them so later walks are shorter.
// n is marked dead only after those updates.
func (wm *FastWaterMark) Done(n *WaterMarkNode) {
	older := (*WaterMarkNode)(atomic.LoadPointer(&n.next))
	skipped := false
	for older != nil && atomic.LoadUint64(&older.isAlive) == 0 {
		older = (*WaterMarkNode)(atomic.LoadPointer(&older.next))
		skipped = true
	}
	if older == nil {
		// Nothing older is still alive.
		atomic.StoreUint64(&wm.minReadTS, n.ReadTS)
	}
	if skipped {
		atomic.StorePointer(&n.next, unsafe.Pointer(older))
	}
	atomic.StoreUint64(&n.isAlive, 0)
}

// MinReadTS returns the minimum read timestamp currently recorded, loaded
// atomically. Per the original comment, versions older than this can be
// safely deleted.
func (wm *FastWaterMark) MinReadTS() uint64 {
	ts := atomic.LoadUint64(&wm.minReadTS)
	return ts
}