125 changes: 125 additions & 0 deletions go/libraries/doltcore/doltdb/gc_test.go
@@ -16,6 +16,8 @@ package doltdb_test

import (
"context"
"errors"
"os"
"testing"

"github.com/dolthub/go-mysql-server/sql"
@@ -28,7 +30,13 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/ref"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle"
"github.com/dolthub/dolt/go/libraries/utils/filesys"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/nbs"
"github.com/dolthub/dolt/go/store/prolly"
"github.com/dolthub/dolt/go/store/prolly/tree"
"github.com/dolthub/dolt/go/store/types"
"github.com/dolthub/dolt/go/store/val"
)

func TestGarbageCollection(t *testing.T) {
@@ -40,6 +48,8 @@ func TestGarbageCollection(t *testing.T) {
testGarbageCollection(t, gct)
})
}

t.Run("HasCacheDataCorruption", testGarbageCollectionHasCacheDataCorruptionBugFix)
}

type stage struct {
@@ -140,3 +150,118 @@ func testGarbageCollection(t *testing.T, test gcTest) {
require.NoError(t, err)
assert.Equal(t, test.expected, actual)
}

// In September 2023, we found that a failure to handle the `hasCache` in
// `*NomsBlockStore` appropriately while cleaning up a memtable into which
// dangling references had been written could result in writing chunks to a
// database which referenced non-existent chunks.
//
// The general pattern was to get new chunk addresses into the hasCache without
// them being written to the store, and then to have an incoming chunk add a
// reference to a missing chunk. At that time, we would clear the memtable,
// since it had invalid chunks in it, but we wouldn't purge the hasCache. Later
// writes which attempted to reference the chunks which had made it into the
// hasCache would incorrectly succeed.
//
// One such concrete pattern for doing this is implemented below. We do:
//
// 1) Put a new chunk to the database -- C1.
//
// 2) Run a GC.
//
// 3) Put a new chunk to the database -- C2.
//
// 4) Call NBS.Commit() with a stale last hash.Hash. This causes us to cache C2
// as present in the store, but it does not get written to disk, because the
// optimistic concurrency control on the value of the current root hash fails.
//
// 5) Put a chunk referencing C1 to the database -- R1.
//
// 6) Call NBS.Commit(). This causes ErrDanglingRef. C1 was written before the
// GC and is no longer in the store. C2 is also cleared from the pending write
// set.
//
// 7) Put a chunk referencing C2 to the database -- R2.
//
// 8) Call NBS.Commit(). This should fail, since R2 references C2 and C2 is not
// in the store. However, before the fix, C2 was still in the cache as a result
// of step #4, and so this did not fail: R2 got written to disk with a dangling
// reference to C2.
func testGarbageCollectionHasCacheDataCorruptionBugFix(t *testing.T) {
ctx := context.Background()

d, err := os.MkdirTemp(t.TempDir(), "hascachetest-")
require.NoError(t, err)

ddb, err := doltdb.LoadDoltDB(ctx, types.Format_DOLT, "file://"+d, filesys.LocalFS)
require.NoError(t, err)
defer ddb.Close()

err = ddb.WriteEmptyRepo(ctx, "main", "Aaron Son", "aaron@dolthub.com")
require.NoError(t, err)

root, err := ddb.NomsRoot(ctx)
require.NoError(t, err)

ns := ddb.NodeStore()

c1 := newIntMap(t, ctx, ns, 1, 1)
_, err = ns.Write(ctx, c1.Node())
require.NoError(t, err)

err = ddb.GC(ctx, nil)
require.NoError(t, err)

c2 := newIntMap(t, ctx, ns, 2, 2)
_, err = ns.Write(ctx, c2.Node())
require.NoError(t, err)

success, err := ddb.CommitRoot(ctx, c2.HashOf(), c2.HashOf())
require.NoError(t, err)
require.False(t, success, "committing the root with a last hash which does not match the current root must fail")

r1 := newAddrMap(t, ctx, ns, "r1", c1.HashOf())
_, err = ns.Write(ctx, r1.Node())
require.NoError(t, err)

success, err = ddb.CommitRoot(ctx, root, root)
require.True(t, errors.Is(err, nbs.ErrDanglingRef), "committing a reference to just-collected c1 must fail with ErrDanglingRef")

r2 := newAddrMap(t, ctx, ns, "r2", c2.HashOf())
_, err = ns.Write(ctx, r2.Node())
require.NoError(t, err)

success, err = ddb.CommitRoot(ctx, root, root)
require.True(t, errors.Is(err, nbs.ErrDanglingRef), "committing a reference to c2, which was erased with the ErrDanglingRef above, must also fail with ErrDanglingRef")
}

func newIntMap(t *testing.T, ctx context.Context, ns tree.NodeStore, k, v int8) prolly.Map {
desc := val.NewTupleDescriptor(val.Type{
Enc: val.Int8Enc,
Nullable: false,
})

tb := val.NewTupleBuilder(desc)
tb.PutInt8(0, k)
keyTuple := tb.Build(ns.Pool())

tb.PutInt8(0, v)
valueTuple := tb.Build(ns.Pool())

m, err := prolly.NewMapFromTuples(ctx, ns, desc, desc, keyTuple, valueTuple)
require.NoError(t, err)
return m
}

func newAddrMap(t *testing.T, ctx context.Context, ns tree.NodeStore, key string, h hash.Hash) prolly.AddressMap {
m, err := prolly.NewEmptyAddressMap(ns)
require.NoError(t, err)

editor := m.Editor()
err = editor.Add(ctx, key, h)
require.NoError(t, err)

m, err = editor.Flush(ctx)
require.NoError(t, err)

return m
}
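For readers following the mechanics, the sketch below illustrates the invariant the fix enforces: presence results for pending references reach the has-cache only after the memtable that produced them has been persisted, and a dangling reference discards the memtable outright. It is a minimal, self-contained illustration; store, memTable, pendingRef, and errDanglingRef are hypothetical stand-ins rather than the NBS types, and the real code paths are tableSet.append, addPendingRefsToHasCache, and handlePossibleDanglingRefError in the store.go changes below.

// Minimal sketch (hypothetical types, not the NBS API) of the ordering the fix
// enforces: presence results for pending refs reach the cache only after the
// memtable that produced them persists; a dangling ref discards the memtable.
package main

import (
	"errors"
	"fmt"
)

var errDanglingRef = errors.New("dangling ref")

type pendingRef struct {
	addr string
	has  bool // set by the presence check while persisting
}

type memTable struct {
	chunks      map[string][]byte
	pendingRefs []*pendingRef
}

type store struct {
	persisted map[string][]byte   // chunks durably in the store
	hasCache  map[string]struct{} // addresses recently confirmed present
	mt        *memTable
}

// persistMemTable plays the role of tableSet.append: every pending ref must be
// satisfied either by the store or by the chunks being written alongside it.
func (s *store) persistMemTable() error {
	for _, r := range s.mt.pendingRefs {
		_, inStore := s.persisted[r.addr]
		_, inBatch := s.mt.chunks[r.addr]
		r.has = inStore || inBatch
		if !r.has {
			return fmt.Errorf("%w: %s", errDanglingRef, r.addr)
		}
	}
	for a, c := range s.mt.chunks {
		s.persisted[a] = c
	}
	return nil
}

// flush shows the fixed ordering: hasCache is updated only after a successful
// persist, and a dangling ref throws the whole memtable away so nothing from
// it can poison later commits.
func (s *store) flush() error {
	if err := s.persistMemTable(); err != nil {
		if errors.Is(err, errDanglingRef) {
			s.mt = nil // discard the memtable; hasCache is left untouched
		}
		return err
	}
	for _, r := range s.mt.pendingRefs {
		if r.has {
			s.hasCache[r.addr] = struct{}{}
		}
	}
	s.mt = nil
	return nil
}

func main() {
	s := &store{persisted: map[string][]byte{}, hasCache: map[string]struct{}{}}
	s.mt = &memTable{
		chunks:      map[string][]byte{"r2": []byte("points at c2")},
		pendingRefs: []*pendingRef{{addr: "c2"}}, // c2 was never persisted
	}
	fmt.Println(s.flush())       // dangling ref: c2
	fmt.Println(len(s.hasCache)) // 0 -- c2 never entered the cache
}

Running the sketch reports the dangling-ref error with an empty cache, which is the same property the HasCacheDataCorruption test above asserts against the real store.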
13 changes: 0 additions & 13 deletions go/store/nbs/generational_chunk_store.go
@@ -16,7 +16,6 @@ package nbs

import (
"context"
"fmt"
"io"
"path/filepath"
"strings"
@@ -154,18 +153,6 @@ func (gcs *GenerationalNBS) hasMany(recs []hasRecord) (absent hash.HashSet, err
return gcs.oldGen.hasMany(recs)
}

func (gcs *GenerationalNBS) errorIfDangling(ctx context.Context, addrs hash.HashSet) error {
absent, err := gcs.HasMany(ctx, addrs)
if err != nil {
return err
}
if len(absent) != 0 {
s := absent.String()
return fmt.Errorf("Found dangling references to %s", s)
}
return nil
}

// Put caches c in the ChunkSource. Upon return, c must be visible to
// subsequent Get and Has calls, but must not be persistent until a call
// to Flush(). Put may be called concurrently with other calls to Put(),
106 changes: 40 additions & 66 deletions go/store/nbs/store.go
@@ -705,6 +705,36 @@ func (nbs *NomsBlockStore) putChunk(ctx context.Context, c chunks.Chunk, getAddr
return nil
}

// When we have chunks with dangling references in our memtable, we have to
// throw away the entire memtable.
func (nbs *NomsBlockStore) handlePossibleDanglingRefError(err error) {
if errors.Is(err, ErrDanglingRef) {
nbs.mt = nil
}
}

// Writes to a Dolt database typically involve mutating some tuple maps and
// then mutating the top-level address map which points to all the branch heads
// and working sets. Each internal node of the address map can have many
// references and many of them typically change quite slowly. We keep a cache
// of recently written references which we know are in the database so that we
// don't have to check the table file indexes for these chunks when we write
// references to them again in the near future.
//
// This cache needs to be treated in a principled manner. The integrity checks
// that we run against a set of chunks we are attempting to write consider
// the to-be-written chunks themselves as also being in the database. This is
// correct, assuming that all the chunks are written at the same time. However,
// we should not add the results of those presence checks to the cache until
// those chunks actually land in the database.
func (nbs *NomsBlockStore) addPendingRefsToHasCache() {
for _, e := range nbs.mt.pendingRefs {
if e.has {
nbs.hasCache.Add(*e.a, struct{}{})
}
}
}

func (nbs *NomsBlockStore) addChunk(ctx context.Context, ch chunks.Chunk, addrs hash.HashSet, checker refCheck) (bool, error) {
if err := ctx.Err(); err != nil {
return false, err
@@ -725,11 +755,10 @@ func (nbs *NomsBlockStore) addChunk(ctx context.Context, ch chunks.Chunk, addrs
if addChunkRes == chunkNotAdded {
ts, err := nbs.tables.append(ctx, nbs.mt, checker, nbs.hasCache, nbs.stats)
if err != nil {
if errors.Is(err, ErrDanglingRef) {
nbs.mt = nil
}
nbs.handlePossibleDanglingRefError(err)
return false, err
}
nbs.addPendingRefsToHasCache()
nbs.tables = ts
nbs.mt = newMemTable(nbs.mtSize)
addChunkRes = nbs.mt.addChunk(a, ch.Data())
@@ -757,7 +786,6 @@ type refCheck func(reqs []hasRecord) (hash.HashSet, error)
func (nbs *NomsBlockStore) errorIfDangling(root hash.Hash, checker refCheck) error {
if !root.IsEmpty() {
a := addr(root)
// We use |Get| here, since it updates recency of the entry.
if _, ok := nbs.hasCache.Get(a); !ok {
var hr [1]hasRecord
hr[0].a = &a
@@ -771,32 +799,6 @@ func (nbs *NomsBlockStore) errorIfDangling(root hash.Hash, checker refCheck) err
nbs.hasCache.Add(a, struct{}{})
}
}

if nbs.mt == nil || nbs.mt.pendingRefs == nil {
return nil // no pending refs to check
}

for i := range nbs.mt.pendingRefs {
// All of these are going to be |Add|ed after the call. We use
// |Contains| to check here so the frequency count only gets
// bumped once.
if nbs.hasCache.Contains(*nbs.mt.pendingRefs[i].a) {
nbs.mt.pendingRefs[i].has = true
}
}

sort.Sort(hasRecordByPrefix(nbs.mt.pendingRefs))
absent, err := checker(nbs.mt.pendingRefs)
if err != nil {
return err
} else if absent.Size() > 0 {
return fmt.Errorf("%w: found dangling references to %s", ErrDanglingRef, absent.String())
}

for _, e := range nbs.mt.pendingRefs {
nbs.hasCache.Add(*e.a, struct{}{})
}

return nil
}

@@ -1130,39 +1132,6 @@ func (nbs *NomsBlockStore) commit(ctx context.Context, current, last hash.Hash,
return true, nil
}

// check for dangling references in |nbs.mt|
if err = nbs.errorIfDangling(current, checker); err != nil {
if errors.Is(err, ErrDanglingRef) {
nbs.mt = nil
}
return false, err
}

// This is unfortunate. We want to serialize commits to the same store
// so that we avoid writing a bunch of unreachable small tables which result
// from optimistic lock failures. However, this means that the time to
// write tables is included in "commit" time and if all commits are
// serialized, it means a lot more waiting.
// "non-trivial" tables are persisted here, outside of the commit-lock.
// all other tables are persisted in updateManifest()
if nbs.mt != nil {
cnt, err := nbs.mt.count()
if err != nil {
return false, err
}

if cnt > preflushChunkCount {
ts, err := nbs.tables.append(ctx, nbs.mt, checker, nbs.hasCache, nbs.stats)
if err != nil {
if errors.Is(err, ErrDanglingRef) {
nbs.mt = nil
}
return false, err
}
nbs.tables, nbs.mt = ts, nil
}
}

nbs.mm.LockForUpdate()
defer func() {
unlockErr := nbs.mm.UnlockForUpdate()
@@ -1233,11 +1202,10 @@ func (nbs *NomsBlockStore) updateManifest(ctx context.Context, current, last has
if cnt > 0 {
ts, err := nbs.tables.append(ctx, nbs.mt, checker, nbs.hasCache, nbs.stats)
if err != nil {
if errors.Is(err, ErrDanglingRef) {
nbs.mt = nil
}
nbs.handlePossibleDanglingRefError(err)
return err
}
nbs.addPendingRefsToHasCache()
nbs.tables, nbs.mt = ts, nil
}
}
@@ -1250,6 +1218,12 @@ func (nbs *NomsBlockStore) updateManifest(ctx context.Context, current, last has
return errOptimisticLockFailedTables
}

// check for dangling reference to the new root
if err = nbs.errorIfDangling(current, checker); err != nil {
nbs.handlePossibleDanglingRefError(err)
return err
}

specs, err := nbs.tables.toSpecs()
if err != nil {
return err
4 changes: 0 additions & 4 deletions go/store/nbs/table_set.go
@@ -302,10 +302,6 @@ func (ts tableSet) append(ctx context.Context, mt *memTable, checker refCheck, h
return tableSet{}, fmt.Errorf("%w: found dangling references to %s", ErrDanglingRef, absent.String())
}

for _, e := range mt.pendingRefs {
hasCache.Add(*e.a, struct{}{})
}

cs, err := ts.p.Persist(ctx, mt, ts, stats)
if err != nil {
return tableSet{}, err