Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions internal/client/client_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ func (b *batchCommandsBuilder) hasHighPriorityTask() bool {
func (b *batchCommandsBuilder) buildWithLimit(limit int64, collect func(id uint64, e *batchCommandsEntry),
) (*tikvpb.BatchCommandsRequest, map[string]*tikvpb.BatchCommandsRequest) {
count := int64(0)
exhaust := false
build := func(reqs []Item) {
for _, e := range reqs {
e := e.(*batchCommandsEntry)
Expand Down Expand Up @@ -148,17 +149,17 @@ func (b *batchCommandsBuilder) buildWithLimit(limit int64, collect func(id uint6
}
b.idAlloc++
}
exhaust = len(reqs) == 0
}
for (count < limit && b.entries.Len() > 0) || b.hasHighPriorityTask() {
n := limit
if limit == 0 {
n = 1
}
reqs := b.entries.Take(int(n))
if len(reqs) == 0 {
b.entries.Take(int(n), build)
if exhaust {
break
}
build(reqs)
}
var req *tikvpb.BatchCommandsRequest
if len(b.requests) > 0 {
Expand Down
22 changes: 17 additions & 5 deletions internal/client/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func (ps *prioritySlice) Pop() interface{} {
old := *ps
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak
*ps = old[0 : n-1]
return item
}
Expand Down Expand Up @@ -91,21 +92,32 @@ func (pq *PriorityQueue) pop() Item {
return e.(Item)
}

// Take returns the highest priority entries from the priority queue.
func (pq *PriorityQueue) Take(n int) []Item {
// Take returns the highest priority entries from the priority queue and run the provided function on them.
// ref https://github.com/tikv/client-go/issues/1834
// Take retrieves and removes the first n items from the priority queue and passes them to the callback function.
// If n is less than or equal to 0, the function returns without performing any operation.
// If n is greater than or equal to the queue length, all items are removed and passed to the callback.
// Otherwise, exactly n items are popped from the queue and passed to the callback.
// The callback function fn is responsible for processing the items; it receives a slice of Items.
// Ref issue: https://github.com/tikv/client-go/issues/1834
func (pq *PriorityQueue) Take(n int, cb func([]Item)) {
if n <= 0 {
return nil
return
}
if n >= pq.Len() {
ret := pq.ps
pq.ps = pq.ps[:0]
return ret
cb(ret)
for i := range ret {
ret[i] = nil
}
return
} else {
ret := make([]Item, n)
for i := 0; i < n; i++ {
ret[i] = pq.pop()
}
return ret
cb(ret)
}

}
Expand Down
54 changes: 51 additions & 3 deletions internal/client/priority_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,24 @@ func TestPriority(t *testing.T) {
re.Equal(uint64(5), aq.highestPriority())
aq.clean()
re.Equal(5, aq.Len())
arr := make([]Item, 0)
collect := func(items []Item) {
arr = arr[:0]
arr = append(arr, items...)
}

arr := aq.Take(1)
aq.Take(1, collect)
re.Len(arr, 1)
re.Equal(uint64(5), arr[0].priority())
re.Equal(uint64(4), aq.highestPriority())

arr = aq.Take(2)
aq.Take(2, collect)
re.Len(arr, 2)
re.Equal(uint64(4), arr[0].priority())
re.Equal(uint64(3), arr[1].priority())
re.Equal(uint64(2), aq.highestPriority())

arr = aq.Take(5)
aq.Take(5, collect)
re.Len(arr, 2)
re.Equal(uint64(2), arr[0].priority())
re.Equal(uint64(1), arr[1].priority())
Expand All @@ -71,3 +76,46 @@ func TestPriority(t *testing.T) {
hq := NewPriorityQueue()
testFunc(hq)
}

func TestPriorityQueueTakeAllLeavesReferencesInBackingArray(t *testing.T) {
re := require.New(t)
pq := NewPriorityQueue()
checkReferences := func() bool {
backing := pq.ps[len(pq.ps):cap(pq.ps)]
for _, v := range backing {
if v != nil {
return true
}
}
return false
}

for i := 0; i < 3; i++ {
pq.Push(&FakeItem{pri: uint64(i + 1)})
}
re.False(checkReferences(), "expected no references in backing array yet")

arr := make([]Item, 0)
collect := func(items []Item) {
arr = arr[:0]
arr = append(arr, items...)
}

// pop one item, should leave reference in backing array.
pq.Take(1, collect)
re.Len(pq.ps, 2)
re.Len(arr, 1)
re.False(checkReferences(), "expected no references in backing array yet")

// Take all items without clean, the references remain in the backing array.
pq.Take(pq.Len(), collect)
re.Empty(pq.ps)
re.False(checkReferences(), "expected no references in backing array yet")

for i := 0; i < 3; i++ {
pq.Push(&FakeItem{pri: uint64(i + 1), canceled: true})
}
pq.clean()
re.Empty(pq.ps)
re.False(checkReferences(), "expected no references in backing array yet")
}
Loading