Skip to content

Commit 54d1022

Browse files
author
Dan Laine
authored
merkleDB -- add inner heap type to syncWorkHeap (ava-labs#1582)
1 parent 6dad1d4 commit 54d1022

File tree

2 files changed

+170
-133
lines changed

2 files changed

+170
-133
lines changed

x/sync/syncworkheap.go

Lines changed: 38 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -12,21 +12,23 @@ import (
1212
"github.com/google/btree"
1313
)
1414

15-
var _ heap.Interface = (*syncWorkHeap)(nil)
15+
var _ heap.Interface = (*innerHeap)(nil)
1616

1717
type heapItem struct {
1818
workItem *syncWorkItem
1919
heapIndex int
2020
}
2121

22+
type innerHeap []*heapItem
23+
2224
// A priority queue of syncWorkItems.
2325
// Note that work item ranges never overlap.
2426
// Supports range merging and priority updating.
2527
// Not safe for concurrent use.
2628
type syncWorkHeap struct {
2729
// Max heap of items by priority.
2830
// i.e. heap.Pop returns highest priority item.
29-
priorityHeap []*heapItem
31+
innerHeap innerHeap
3032
// The heap items sorted by range start.
3133
// A nil start is considered to be the smallest.
3234
sortedItems *btree.BTreeG[*heapItem]
@@ -35,7 +37,6 @@ type syncWorkHeap struct {
3537

3638
func newSyncWorkHeap() *syncWorkHeap {
3739
return &syncWorkHeap{
38-
priorityHeap: make([]*heapItem, 0),
3940
sortedItems: btree.NewG(
4041
2,
4142
func(a, b *heapItem) bool {
@@ -56,7 +57,10 @@ func (wh *syncWorkHeap) Insert(item *syncWorkItem) {
5657
return
5758
}
5859

59-
heap.Push(wh, &heapItem{workItem: item})
60+
wrappedItem := &heapItem{workItem: item}
61+
62+
heap.Push(&wh.innerHeap, wrappedItem)
63+
wh.sortedItems.ReplaceOrInsert(wrappedItem)
6064
}
6165

6266
// Pops and returns a work item from the heap.
@@ -65,7 +69,9 @@ func (wh *syncWorkHeap) GetWork() *syncWorkItem {
6569
if wh.closed || wh.Len() == 0 {
6670
return nil
6771
}
68-
return heap.Pop(wh).(*heapItem).workItem
72+
item := heap.Pop(&wh.innerHeap).(*heapItem)
73+
wh.sortedItems.Delete(item)
74+
return item.workItem
6975
}
7076

7177
// Insert the item into the heap, merging it with existing items
@@ -97,7 +103,7 @@ func (wh *syncWorkHeap) MergeInsert(item *syncWorkItem) {
97103
// merged into [beforeItem.start, item.end]
98104
beforeItem.workItem.end = item.end
99105
beforeItem.workItem.priority = math.Max(item.priority, beforeItem.workItem.priority)
100-
heap.Fix(wh, beforeItem.heapIndex)
106+
heap.Fix(&wh.innerHeap, beforeItem.heapIndex)
101107
mergedBefore = beforeItem
102108
}
103109
return false
@@ -113,7 +119,7 @@ func (wh *syncWorkHeap) MergeInsert(item *syncWorkItem) {
113119
// [item.start, afterItem.end].
114120
afterItem.workItem.start = item.start
115121
afterItem.workItem.priority = math.Max(item.priority, afterItem.workItem.priority)
116-
heap.Fix(wh, afterItem.heapIndex)
122+
heap.Fix(&wh.innerHeap, afterItem.heapIndex)
117123
mergedAfter = afterItem
118124
}
119125
return false
@@ -128,61 +134,54 @@ func (wh *syncWorkHeap) MergeInsert(item *syncWorkItem) {
128134
wh.remove(mergedAfter)
129135
// update the priority
130136
mergedBefore.workItem.priority = math.Max(mergedBefore.workItem.priority, mergedAfter.workItem.priority)
131-
heap.Fix(wh, mergedBefore.heapIndex)
137+
heap.Fix(&wh.innerHeap, mergedBefore.heapIndex)
132138
}
133139

134140
// nothing was merged, so add new item to the heap
135141
if mergedBefore == nil && mergedAfter == nil {
136142
// We didn't merge [item] with an existing one; put it in the heap.
137-
heap.Push(wh, &heapItem{workItem: item})
143+
wh.Insert(item)
138144
}
139145
}
140146

141147
// Deletes [item] from the heap.
142148
func (wh *syncWorkHeap) remove(item *heapItem) {
143-
oldIndex := item.heapIndex
144-
newLength := len(wh.priorityHeap) - 1
149+
heap.Remove(&wh.innerHeap, item.heapIndex)
145150

146-
// swap with last item, delete item, then fix heap if required
147-
wh.Swap(newLength, item.heapIndex)
148-
wh.priorityHeap[newLength] = nil
149-
wh.priorityHeap = wh.priorityHeap[:newLength]
150-
151-
// the item was already the last item, so nothing needs to be fixed
152-
if oldIndex != newLength {
153-
heap.Fix(wh, oldIndex)
154-
}
155151
wh.sortedItems.Delete(item)
156152
}
157153

154+
func (wh *syncWorkHeap) Len() int {
155+
return wh.innerHeap.Len()
156+
}
157+
158158
// below this line are the implementations required for heap.Interface
159159

160-
func (wh *syncWorkHeap) Len() int {
161-
return len(wh.priorityHeap)
160+
func (h innerHeap) Len() int {
161+
return len(h)
162162
}
163163

164-
func (wh *syncWorkHeap) Less(i int, j int) bool {
165-
return wh.priorityHeap[i].workItem.priority > wh.priorityHeap[j].workItem.priority
164+
func (h innerHeap) Less(i int, j int) bool {
165+
return h[i].workItem.priority > h[j].workItem.priority
166166
}
167167

168-
func (wh *syncWorkHeap) Swap(i int, j int) {
169-
wh.priorityHeap[i], wh.priorityHeap[j] = wh.priorityHeap[j], wh.priorityHeap[i]
170-
wh.priorityHeap[i].heapIndex = i
171-
wh.priorityHeap[j].heapIndex = j
168+
func (h innerHeap) Swap(i int, j int) {
169+
h[i], h[j] = h[j], h[i]
170+
h[i].heapIndex = i
171+
h[j].heapIndex = j
172172
}
173173

174-
func (wh *syncWorkHeap) Pop() interface{} {
175-
newLength := len(wh.priorityHeap) - 1
176-
value := wh.priorityHeap[newLength]
177-
wh.priorityHeap[newLength] = nil
178-
wh.priorityHeap = wh.priorityHeap[:newLength]
179-
wh.sortedItems.Delete(value)
180-
return value
174+
func (h *innerHeap) Pop() interface{} {
175+
old := *h
176+
n := len(old)
177+
item := old[n-1]
178+
old[n-1] = nil
179+
*h = old[0 : n-1]
180+
return item
181181
}
182182

183-
func (wh *syncWorkHeap) Push(x interface{}) {
183+
func (h *innerHeap) Push(x interface{}) {
184184
item := x.(*heapItem)
185-
item.heapIndex = len(wh.priorityHeap)
186-
wh.priorityHeap = append(wh.priorityHeap, item)
187-
wh.sortedItems.ReplaceOrInsert(item)
185+
item.heapIndex = len(*h)
186+
*h = append(*h, item)
188187
}

0 commit comments

Comments
 (0)