Skip to content

Commit

Permalink
commit bbolt transaction if there is any pending deleting operations
Browse files Browse the repository at this point in the history
Signed-off-by: Siyuan Zhang <sizhang@google.com>
  • Loading branch information
siyuanfoundation committed Jan 6, 2024
1 parent d0fb754 commit 8c14477
Showing 1 changed file with 21 additions and 2 deletions.
23 changes: 21 additions & 2 deletions server/storage/backend/batch_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,8 @@ func (t *batchTx) commit(stop bool) {

type batchTxBuffered struct {
batchTx
buf txWriteBuffer
buf txWriteBuffer
pendingDeleteOperations int
}

func newBatchTxBuffered(backend *backend) *batchTxBuffered {
Expand All @@ -310,7 +311,14 @@ func (t *batchTxBuffered) Unlock() {
t.buf.writeback(&t.backend.readTx.buf)
// gofail: var afterWritebackBuf struct{}
t.backend.readTx.Unlock()
if t.pending >= t.backend.batchLimit {
// We need to commit the transaction if
// 1. t.pending >= t.backend.batchLimit for performance,
// because We don't want buffer to grow too big as it has worse complexity.
// 2. there is any pending deleting for correctness,
// because batchTxBuffered doesn't keep track of deletions.
// Please also refer to
// https://github.com/etcd-io/etcd/pull/17119#issuecomment-1857547158
if t.pending >= t.backend.batchLimit || t.pendingDeleteOperations > 0 {
t.commit(false)
}
}
Expand Down Expand Up @@ -356,6 +364,7 @@ func (t *batchTxBuffered) unsafeCommit(stop bool) {
}

t.batchTx.commit(stop)
t.pendingDeleteOperations = 0

if !stop {
t.backend.readTx.tx = t.backend.begin(false)
Expand All @@ -371,3 +380,13 @@ func (t *batchTxBuffered) UnsafeSeqPut(bucket Bucket, key []byte, value []byte)
t.batchTx.UnsafeSeqPut(bucket, key, value)
t.buf.putSeq(bucket, key, value)
}

func (t *batchTxBuffered) UnsafeDelete(bucketType Bucket, key []byte) {
t.batchTx.UnsafeDelete(bucketType, key)
t.pendingDeleteOperations++
}

func (t *batchTxBuffered) UnsafeDeleteBucket(bucket Bucket) {
t.batchTx.UnsafeDeleteBucket(bucket)
t.pendingDeleteOperations++
}

0 comments on commit 8c14477

Please sign in to comment.