Skip to content

Commit 3afd93e

Browse files
committed
fix(embedded/tbtree): add marker entry to store timestamp when tree is empty
Signed-off-by: Stefano Scafiti <stefano.scafiti96@gmail.com>
1 parent cf9a5d8 commit 3afd93e

File tree

3 files changed

+161
-13
lines changed

3 files changed

+161
-13
lines changed

embedded/tbtree/snapshot.go

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -442,8 +442,15 @@ func (n *innerNode) writeTo(nw, hw io.Writer, writeOpts *WriteOpts, buf []byte)
442442
buf[bi] = InnerNodeType
443443
bi++
444444

445-
binary.BigEndian.PutUint16(buf[bi:], uint16(len(n.nodes)))
446-
bi += 2
445+
if n.tsMutated() {
446+
binary.BigEndian.PutUint16(buf[bi:], uint16(len(n.nodes)+1))
447+
bi += 2
448+
449+
bi += writeNodeRefToWithOffset(n, math.MaxInt64, math.MaxInt64, buf[bi:])
450+
} else {
451+
binary.BigEndian.PutUint16(buf[bi:], uint16(len(n.nodes)))
452+
bi += 2
453+
}
447454

448455
for i, c := range n.nodes {
449456
n := writeNodeRefToWithOffset(c, offsets[i], minOffsets[i], buf[bi:])
@@ -505,11 +512,23 @@ func (l *leafNode) writeTo(nw, hw io.Writer, writeOpts *WriteOpts, buf []byte) (
505512
buf[bi] = LeafNodeType
506513
bi++
507514

508-
binary.BigEndian.PutUint16(buf[bi:], uint16(len(l.values)))
509-
bi += 2
515+
if l.tsMutated() {
516+
// NOTE: we store a marker entry to remember the highest timestamp seen
510517

511-
accH := int64(0)
518+
binary.BigEndian.PutUint16(buf[bi:], uint16(len(l.values)+1))
519+
bi += 2
512520

521+
binary.BigEndian.PutUint16(buf[bi:], 0)
522+
bi += 2
523+
524+
binary.BigEndian.PutUint64(buf[bi:], l._ts)
525+
bi += 8
526+
} else {
527+
binary.BigEndian.PutUint16(buf[bi:], uint16(len(l.values)))
528+
bi += 2
529+
}
530+
531+
accH := int64(0)
513532
for _, v := range l.values {
514533
timedValue := v.timedValues[0]
515534

embedded/tbtree/tbtree.go

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,7 @@ type node interface {
240240
minKey() []byte
241241
ts() uint64
242242
setTs(ts uint64) (node, error)
243+
tsMutated() bool
243244
size() (int, error)
244245
mutated() bool
245246
offset() int64 // only valid when !mutated()
@@ -825,7 +826,6 @@ func (t *TBtree) readNodeFrom(r *appendable.Reader) (node, error) {
825826
n.off = off
826827
return n, nil
827828
}
828-
829829
return nil, ErrReadingFileContent
830830
}
831831

@@ -837,7 +837,7 @@ func (t *TBtree) readInnerNodeFrom(r *appendable.Reader) (*innerNode, error) {
837837

838838
n := &innerNode{
839839
t: t,
840-
nodes: make([]node, childCount),
840+
nodes: make([]node, 0, childCount),
841841
_minOff: math.MaxInt64,
842842
}
843843

@@ -847,7 +847,13 @@ func (t *TBtree) readInnerNodeFrom(r *appendable.Reader) (*innerNode, error) {
847847
return nil, err
848848
}
849849

850-
n.nodes[c] = nref
850+
// marker node
851+
if nref.off == math.MaxInt64 {
852+
n._ts = nref.ts()
853+
continue
854+
}
855+
856+
n.nodes = append(n.nodes, nref)
851857

852858
if n._ts < nref._ts {
853859
n._ts = nref._ts
@@ -857,7 +863,6 @@ func (t *TBtree) readInnerNodeFrom(r *appendable.Reader) (*innerNode, error) {
857863
n._minOff = nref._minOff
858864
}
859865
}
860-
861866
return n, nil
862867
}
863868

@@ -905,7 +910,7 @@ func (t *TBtree) readLeafNodeFrom(r *appendable.Reader) (*leafNode, error) {
905910

906911
l := &leafNode{
907912
t: t,
908-
values: make([]*leafValue, valueCount),
913+
values: make([]*leafValue, 0, valueCount),
909914
}
910915

911916
for c := 0; c < int(valueCount); c++ {
@@ -914,6 +919,16 @@ func (t *TBtree) readLeafNodeFrom(r *appendable.Reader) (*leafNode, error) {
914919
return nil, err
915920
}
916921

922+
if ksize == 0 {
923+
ts, err := r.ReadUint64()
924+
if err != nil {
925+
return nil, err
926+
}
927+
928+
l._ts = ts
929+
continue
930+
}
931+
917932
key := make([]byte, ksize)
918933
_, err = r.Read(key)
919934
if err != nil {
@@ -953,13 +968,12 @@ func (t *TBtree) readLeafNodeFrom(r *appendable.Reader) (*leafNode, error) {
953968
hCount: hCount,
954969
}
955970

956-
l.values[c] = leafValue
971+
l.values = append(l.values, leafValue)
957972

958973
if l._ts < ts {
959974
l._ts = ts
960975
}
961976
}
962-
963977
return l, nil
964978
}
965979

@@ -1607,7 +1621,6 @@ func (t *TBtree) IncreaseTs(ts uint64) error {
16071621
_, _, err := t.flushTree(t.cleanupPercentage, false, false, "increaseTs")
16081622
return err
16091623
}
1610-
16111624
return nil
16121625
}
16131626

@@ -2097,6 +2110,15 @@ func (n *innerNode) size() (int, error) {
20972110
return size, nil
20982111
}
20992112

2113+
func (l *innerNode) tsMutated() bool {
2114+
for _, nd := range l.nodes {
2115+
if nd.ts() >= l.ts() {
2116+
return false
2117+
}
2118+
}
2119+
return true
2120+
}
2121+
21002122
func (n *innerNode) mutated() bool {
21012123
return n.mut
21022124
}
@@ -2265,6 +2287,10 @@ func (r *nodeRef) size() (int, error) {
22652287
return n.size()
22662288
}
22672289

2290+
func (r *nodeRef) tsMutated() bool {
2291+
return false
2292+
}
2293+
22682294
func (r *nodeRef) mutated() bool {
22692295
return false
22702296
}
@@ -2600,6 +2626,15 @@ func (l *leafNode) size() (int, error) {
26002626
return size, nil
26012627
}
26022628

2629+
func (l *leafNode) tsMutated() bool {
2630+
for _, v := range l.values {
2631+
if v.timedValues[0].Ts >= l.ts() {
2632+
return false
2633+
}
2634+
}
2635+
return true
2636+
}
2637+
26032638
func (l *leafNode) mutated() bool {
26042639
return l.mut
26052640
}

embedded/tbtree/tbtree_test.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1314,6 +1314,100 @@ func TestTBTreeIncreaseTs(t *testing.T) {
13141314
require.ErrorIs(t, err, ErrAlreadyClosed)
13151315
}
13161316

1317+
func TestTBTreeFlushAfterIncreaseTs(t *testing.T) {
1318+
dir := t.TempDir()
1319+
t.Cleanup(func() {
1320+
os.RemoveAll(dir)
1321+
})
1322+
1323+
tbtree, err := Open(dir, DefaultOptions())
1324+
require.NoError(t, err)
1325+
1326+
t.Run("increase ts to empty tree", func(t *testing.T) {
1327+
err = tbtree.IncreaseTs(100)
1328+
require.NoError(t, err)
1329+
1330+
_, _, err = tbtree.Flush()
1331+
require.NoError(t, err)
1332+
1333+
err = tbtree.Close()
1334+
require.NoError(t, err)
1335+
1336+
tbtree, err = Open(dir, DefaultOptions())
1337+
require.NoError(t, err)
1338+
1339+
require.Equal(t, tbtree.Ts(), uint64(100))
1340+
})
1341+
1342+
insertValues := func(n int) {
1343+
for i := 0; i < n; i++ {
1344+
var buf [4]byte
1345+
binary.BigEndian.PutUint32(buf[:], uint32(i))
1346+
1347+
err := tbtree.Insert(
1348+
buf[:],
1349+
buf[:],
1350+
)
1351+
require.NoError(t, err)
1352+
}
1353+
}
1354+
1355+
t.Run("increase ts after insertions", func(t *testing.T) {
1356+
insertValues(10)
1357+
require.Equal(t, uint64(110), tbtree.Ts())
1358+
1359+
err := tbtree.IncreaseTs(200)
1360+
require.NoError(t, err)
1361+
1362+
_, _, err = tbtree.Flush()
1363+
require.NoError(t, err)
1364+
1365+
err = tbtree.Close()
1366+
require.NoError(t, err)
1367+
1368+
tbtree, err = Open(dir, DefaultOptions())
1369+
require.NoError(t, err)
1370+
1371+
require.Equal(t, uint64(200), tbtree.Ts())
1372+
1373+
ln, isLeaf := tbtree.root.(*leafNode)
1374+
require.True(t, isLeaf)
1375+
1376+
require.Equal(t, uint64(200), ln.ts())
1377+
require.Len(t, ln.values, 10)
1378+
1379+
for _, v := range ln.values {
1380+
require.Less(t, v.timedValue().Ts, ln.ts())
1381+
}
1382+
})
1383+
1384+
t.Run("increase ts after more insertions", func(t *testing.T) {
1385+
insertValues(1000)
1386+
require.Equal(t, uint64(1200), tbtree.Ts())
1387+
1388+
err := tbtree.IncreaseTs(2500)
1389+
require.NoError(t, err)
1390+
1391+
_, _, err = tbtree.Flush()
1392+
require.NoError(t, err)
1393+
1394+
err = tbtree.Close()
1395+
require.NoError(t, err)
1396+
1397+
tbtree, err = Open(dir, DefaultOptions())
1398+
require.NoError(t, err)
1399+
1400+
require.Equal(t, uint64(2500), tbtree.Ts())
1401+
1402+
nd, isInner := tbtree.root.(*innerNode)
1403+
require.True(t, isInner)
1404+
1405+
for _, child := range nd.nodes {
1406+
require.Less(t, child.ts(), nd.ts())
1407+
}
1408+
})
1409+
}
1410+
13171411
func BenchmarkRandomInsertion(b *testing.B) {
13181412
seed := rand.NewSource(time.Now().UnixNano())
13191413
rnd := rand.New(seed)

0 commit comments

Comments
 (0)