Skip to content

Commit c99f2e5

Browse files
Merge pull request Workiva#143 from Workiva/immutable_b_tree
Immutable b tree
2 parents 133c65a + ed5110a commit c99f2e5

15 files changed

+3278
-0
lines changed

README.md

+5
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,11 @@ immutable version. Unfortunately, to make the B-tree generic we require an
145145
interface and the most expensive operation in CPU profiling is the interface
146146
method which in turn calls into runtime.assertI2T. We need generics.
147147

148+
#### Immutable B Tree
149+
A btree based on two principals, immutablability and concurrency.
150+
Somewhat slow for single value lookups and puts, it is very fast for bulk operations.
151+
A persister can be injected to make this index persistent.
152+
148153
#### Ctrie
149154

150155
A concurrent, lock-free hash array mapped trie with efficient non-blocking

btree/immutable/add.go

+319
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,319 @@
1+
/*
2+
Copyright 2014 Workiva, LLC
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package btree
18+
19+
import (
20+
"runtime"
21+
"sort"
22+
"sync"
23+
24+
terr "github.com/Workiva/go-datastructures/threadsafe/err"
25+
)
26+
27+
func (t *Tr) AddItems(its ...*Item) ([]*Item, error) {
28+
if len(its) == 0 {
29+
return nil, nil
30+
}
31+
32+
keys := make(Keys, 0, len(its))
33+
for _, item := range its {
34+
keys = append(keys, &Key{Value: item.Value, Payload: item.Payload})
35+
}
36+
37+
overwrittens, err := t.add(keys)
38+
if err != nil {
39+
return nil, err
40+
}
41+
42+
return overwrittens.toItems(), nil
43+
}
44+
45+
func (t *Tr) add(keys Keys) (Keys, error) {
46+
if t.Root == nil {
47+
n := t.createRoot()
48+
t.Root = n.ID
49+
t.context.addNode(n)
50+
}
51+
52+
nodes, err := t.determinePaths(keys)
53+
if err != nil {
54+
return nil, err
55+
}
56+
57+
var overwrittens Keys
58+
59+
var wg sync.WaitGroup
60+
wg.Add(len(nodes))
61+
var treeLock sync.Mutex
62+
localOverwrittens := make([]Keys, len(nodes))
63+
tree := make(map[string]*path, runtime.NumCPU())
64+
lerr := terr.New()
65+
66+
i := 0
67+
for id, bundles := range nodes {
68+
go func(i int, id string, bundles []*nodeBundle) {
69+
defer wg.Done()
70+
if len(bundles) == 0 {
71+
return
72+
}
73+
74+
n, err := t.contextOrCachedNode(ID(id), true)
75+
if err != nil {
76+
lerr.Set(err)
77+
return
78+
}
79+
80+
if !t.context.nodeExists(n.ID) {
81+
n = n.copy()
82+
t.context.addNode(n)
83+
}
84+
85+
overwrittens, err := insertLastDimension(t, n, bundles)
86+
87+
if err != nil {
88+
lerr.Set(err)
89+
return
90+
}
91+
92+
localOverwrittens[i] = overwrittens
93+
path := bundles[0].path
94+
treeLock.Lock()
95+
tree[string(n.ID)] = path
96+
treeLock.Unlock()
97+
}(i, id, bundles)
98+
i++
99+
}
100+
101+
wg.Wait()
102+
103+
if lerr.Get() != nil {
104+
return nil, lerr.Get()
105+
}
106+
107+
t.walkupInsert(tree)
108+
109+
for _, chunk := range localOverwrittens {
110+
overwrittens = append(overwrittens, chunk...)
111+
}
112+
113+
t.Count += len(keys) - len(overwrittens)
114+
115+
return overwrittens, nil
116+
}
117+
118+
func (t *Tr) determinePaths(keys Keys) (map[string][]*nodeBundle, error) {
119+
chunks := splitKeys(keys, runtime.NumCPU())
120+
var wg sync.WaitGroup
121+
wg.Add(len(chunks))
122+
chunkPaths := make([]map[interface{}]*nodeBundle, len(chunks))
123+
lerr := terr.New()
124+
125+
for i := range chunks {
126+
go func(i int) {
127+
defer wg.Done()
128+
keys := chunks[i]
129+
if len(keys) == 0 {
130+
return
131+
}
132+
mp := make(map[interface{}]*nodeBundle, len(keys))
133+
for _, key := range keys {
134+
path, err := t.iterativeFind(
135+
key.Value, t.Root,
136+
)
137+
138+
if err != nil {
139+
lerr.Set(err)
140+
return
141+
}
142+
mp[key.Value] = &nodeBundle{path: path, k: key}
143+
}
144+
chunkPaths[i] = mp
145+
}(i)
146+
}
147+
148+
wg.Wait()
149+
150+
if lerr.Get() != nil {
151+
return nil, lerr.Get()
152+
}
153+
154+
nodes := make(map[string][]*nodeBundle, 10)
155+
for _, chunk := range chunkPaths {
156+
for _, pb := range chunk {
157+
nodes[string(pb.path.peek().n.ID)] = append(nodes[string(pb.path.pop().n.ID)], pb)
158+
}
159+
}
160+
161+
return nodes, nil
162+
}
163+
164+
func insertByMerge(comparator Comparator, n *Node, bundles []*nodeBundle) (Keys, error) {
165+
positions := make(map[interface{}]int, len(n.ChildValues))
166+
overwrittens := make(Keys, 0, 10)
167+
168+
for i, value := range n.ChildValues {
169+
positions[value] = i
170+
}
171+
172+
for _, bundle := range bundles {
173+
if i, ok := positions[bundle.k.Value]; ok {
174+
overwrittens = append(overwrittens, n.ChildKeys[i])
175+
n.ChildKeys[i] = bundle.k
176+
} else {
177+
n.ChildValues = append(n.ChildValues, bundle.k.Value)
178+
n.ChildKeys = append(n.ChildKeys, bundle.k)
179+
}
180+
}
181+
182+
nsw := &nodeSortWrapper{
183+
values: n.ChildValues,
184+
keys: n.ChildKeys,
185+
comparator: comparator,
186+
}
187+
188+
sort.Sort(nsw)
189+
190+
for i := 0; i < len(nsw.values); i++ {
191+
if nsw.values[i] != nil {
192+
nsw.values = nsw.values[i:]
193+
nsw.keys = nsw.keys[i:]
194+
break
195+
}
196+
197+
nsw.keys[i] = nil
198+
}
199+
200+
n.ChildValues = nsw.values
201+
n.ChildKeys = nsw.keys
202+
return overwrittens, nil
203+
}
204+
205+
func insertLastDimension(t *Tr, n *Node, bundles []*nodeBundle) (Keys, error) {
206+
if n.IsLeaf && len(bundles) >= n.lenValues()/16 { // Found through empirical testing, it appears that the memmoves are more sensitive when dealing with interface{}'s.
207+
return insertByMerge(t.config.Comparator, n, bundles)
208+
}
209+
210+
overwrittens := make(Keys, 0, len(bundles))
211+
for _, bundle := range bundles {
212+
overwritten := n.insert(t.config.Comparator, bundle.k)
213+
if overwritten != nil {
214+
overwrittens = append(overwrittens, overwritten)
215+
}
216+
}
217+
218+
return overwrittens, nil
219+
}
220+
221+
func (t *Tr) iterativeSplit(n *Node) Keys {
222+
keys := make(Keys, 0, 10)
223+
for n.needsSplit(t.config.NodeWidth) {
224+
leftValue, leftNode := n.splitAt(t.config.NodeWidth / 2)
225+
t.context.addNode(leftNode)
226+
keys = append(keys, &Key{UUID: leftNode.ID, Value: leftValue})
227+
}
228+
229+
return keys
230+
}
231+
232+
// walkupInsert walks up nodes during the insertion process and adds
233+
// any new keys due to splits. Each layer of the tree can have insertions
234+
// performed in parallel as splits are local changes.
235+
func (t *Tr) walkupInsert(nodes map[string]*path) error {
236+
mapping := make(map[string]*Node, len(nodes))
237+
238+
for len(nodes) > 0 {
239+
splitNodes := make(map[string]Keys)
240+
newNodes := make(map[string]*path)
241+
for id, path := range nodes {
242+
node := t.context.getNode(ID(id))
243+
244+
parentPath := path.pop()
245+
if parentPath == nil {
246+
t.Root = node.ID
247+
continue
248+
}
249+
250+
parent := parentPath.n
251+
newNode := mapping[string(parent.ID)]
252+
if newNode == nil {
253+
if !t.context.nodeExists(parent.ID) {
254+
cp := parent.copy()
255+
if string(t.Root) == string(parent.ID) {
256+
t.Root = cp.ID
257+
}
258+
259+
t.context.addNode(cp)
260+
mapping[string(parent.ID)] = cp
261+
parent = cp
262+
} else {
263+
newNode = t.context.getNode(parent.ID)
264+
mapping[string(parent.ID)] = newNode
265+
parent = newNode
266+
}
267+
} else {
268+
parent = newNode
269+
}
270+
271+
i := parentPath.i
272+
273+
parent.replaceKeyAt(&Key{UUID: node.ID}, i)
274+
splitNodes[string(parent.ID)] = append(splitNodes[string(parent.ID)], t.iterativeSplit(node)...)
275+
newNodes[string(parent.ID)] = path
276+
}
277+
278+
var wg sync.WaitGroup
279+
wg.Add(len(splitNodes))
280+
lerr := terr.New()
281+
282+
for id, keys := range splitNodes {
283+
go func(id ID, keys Keys) {
284+
defer wg.Done()
285+
n, err := t.contextOrCachedNode(id, true)
286+
if err != nil {
287+
lerr.Set(err)
288+
return
289+
}
290+
for _, key := range keys {
291+
n.insert(t.config.Comparator, key)
292+
}
293+
}(ID(id), keys)
294+
}
295+
296+
wg.Wait()
297+
298+
if lerr.Get() != nil {
299+
return lerr.Get()
300+
}
301+
302+
nodes = newNodes
303+
}
304+
305+
n := t.context.getNode(t.Root)
306+
for n.needsSplit(t.config.NodeWidth) {
307+
root := newNode()
308+
t.Root = root.ID
309+
t.context.addNode(root)
310+
root.appendChild(&Key{UUID: n.ID})
311+
keys := t.iterativeSplit(n)
312+
for _, key := range keys {
313+
root.insert(t.config.Comparator, key)
314+
}
315+
n = root
316+
}
317+
318+
return nil
319+
}

0 commit comments

Comments
 (0)