Skip to content

Commit 6a32b8a

Browse files
committed
add write capacity options
1 parent a3bfc41 commit 6a32b8a

File tree

3 files changed

+18
-9
lines changed

3 files changed

+18
-9
lines changed

db.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,6 @@ type DB struct {
9292
pub *publisher
9393
}
9494

95-
const (
96-
kvWriteChCapacity = 1000
97-
)
98-
9995
func (db *DB) replayFunction() func(Entry, valuePointer) error {
10096
type txnEntry struct {
10197
nk []byte
@@ -282,7 +278,7 @@ func Open(opt Options) (db *DB, err error) {
282278
db = &DB{
283279
imm: make([]*skl.Skiplist, 0, opt.NumMemtables),
284280
flushChan: make(chan flushTask, opt.NumMemtables),
285-
writeCh: make(chan *request, kvWriteChCapacity),
281+
writeCh: make(chan *request, opt.KVWriteCapacity),
286282
opt: opt,
287283
manifest: manifestFile,
288284
elog: elog,
@@ -344,7 +340,7 @@ func Open(opt Options) (db *DB, err error) {
344340
db.orc.readMark.Done(db.orc.nextTxnTs)
345341
db.orc.incrementNextTs()
346342

347-
db.writeCh = make(chan *request, kvWriteChCapacity)
343+
db.writeCh = make(chan *request, opt.KVWriteCapacity)
348344
db.closers.writes = y.NewCloser(1)
349345
go db.doWrites(db.closers.writes)
350346

@@ -710,6 +706,7 @@ func (db *DB) doWrites(lc *y.Closer) {
710706
reqLen := new(expvar.Int)
711707
y.PendingWrites.Set(db.opt.Dir, reqLen)
712708

709+
writeCapacity := 3 * db.opt.KVWriteCapacity
713710
reqs := make([]*request, 0, 10)
714711
for {
715712
var r *request
@@ -723,7 +720,7 @@ func (db *DB) doWrites(lc *y.Closer) {
723720
reqs = append(reqs, r)
724721
reqLen.Set(int64(len(reqs)))
725722

726-
if len(reqs) >= 3*kvWriteChCapacity {
723+
if len(reqs) >= writeCapacity {
727724
pendingCh <- struct{}{} // blocking.
728725
goto writeCase
729726
}

options.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@ type Options struct {
7272
// the same directory. Use this options with caution.
7373
BypassLockGuard bool
7474

75+
// KVWriteCapacity defines the capacity channel size for write
76+
KVWriteCapacity int
77+
7578
// Transaction start and commit timestamps are managed by end-user.
7679
// This is only useful for databases built on top of Badger (like Dgraph).
7780
// Not recommended for most users.
@@ -118,6 +121,7 @@ func DefaultOptions(path string) Options {
118121
Logger: defaultLogger,
119122
EventLogging: true,
120123
LogRotatesToFlush: 2,
124+
KVWriteCapacity: 1000,
121125
}
122126
}
123127

@@ -419,3 +423,11 @@ func (opt Options) WithBypassLockGuard(b bool) Options {
419423
opt.BypassLockGuard = b
420424
return opt
421425
}
426+
427+
// WithWriteCapacity returns a new Options value with WriteCapacity set to the given
428+
//
429+
// When write channel is full, Badger will block until write channel is flushed out.
430+
func (opt Options) WithWriteCapacity(writeCapacity int) Options {
431+
opt.KVWriteCapacity = writeCapacity
432+
return opt
433+
}

txn.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ func newOracle(opt Options) *oracle {
7272
txnMark: &y.WaterMark{Name: "badger.TxnTimestamp"},
7373
closer: y.NewCloser(2),
7474
}
75-
orc.readMark.Init(orc.closer, opt.EventLogging, kvWriteChCapacity)
76-
orc.txnMark.Init(orc.closer, opt.EventLogging, kvWriteChCapacity)
75+
orc.readMark.Init(orc.closer, opt.EventLogging, opt.KVWriteCapacity)
76+
orc.txnMark.Init(orc.closer, opt.EventLogging, opt.KVWriteCapacity)
7777
return orc
7878
}
7979

0 commit comments

Comments
 (0)