Skip to content

Commit 10be6ca

Browse files
committed
Fix manifest corruption dgraph-io#1756
1 parent 6a32b8a commit 10be6ca

File tree

2 files changed

+51
-7
lines changed

2 files changed

+51
-7
lines changed

manifest.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,11 @@ import (
2727
"path/filepath"
2828
"sync"
2929

30-
"github.com/dgraph-io/badger/pb"
31-
"github.com/dgraph-io/badger/y"
3230
"github.com/golang/protobuf/proto"
3331
"github.com/pkg/errors"
32+
33+
"github.com/dgraph-io/badger/pb"
34+
"github.com/dgraph-io/badger/y"
3435
)
3536

3637
// Manifest represents the contents of the MANIFEST file in a Badger store.
@@ -194,15 +195,15 @@ func (mf *manifestFile) addChanges(changesParam []*pb.ManifestChange) error {
194195

195196
// Maybe we could use O_APPEND instead (on certain file systems)
196197
mf.appendLock.Lock()
198+
defer mf.appendLock.Unlock()
199+
197200
if err := applyChangeSet(&mf.manifest, &changes); err != nil {
198-
mf.appendLock.Unlock()
199201
return err
200202
}
201203
// Rewrite manifest if it'd shrink by 1/10 and it's big enough to care
202204
if mf.manifest.Deletions > mf.deletionsRewriteThreshold &&
203205
mf.manifest.Deletions > manifestDeletionsRatio*(mf.manifest.Creations-mf.manifest.Deletions) {
204206
if err := mf.rewrite(); err != nil {
205-
mf.appendLock.Unlock()
206207
return err
207208
}
208209
} else {
@@ -211,15 +212,15 @@ func (mf *manifestFile) addChanges(changesParam []*pb.ManifestChange) error {
211212
binary.BigEndian.PutUint32(lenCrcBuf[4:8], crc32.Checksum(buf, y.CastagnoliCrcTable))
212213
buf = append(lenCrcBuf[:], buf...)
213214
if _, err := mf.fp.Write(buf); err != nil {
214-
mf.appendLock.Unlock()
215215
return err
216216
}
217217
}
218218

219-
mf.appendLock.Unlock()
220-
return y.FileSync(mf.fp)
219+
return syncFunc(mf.fp)
221220
}
222221

222+
var syncFunc = func(f *os.File) error { return y.FileSync(f) }
223+
223224
// Has to be 4 bytes. The value can never change, ever, anyway.
224225
var magicText = [4]byte{'B', 'd', 'g', 'r'}
225226

manifest_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ import (
2323
"os"
2424
"path/filepath"
2525
"sort"
26+
"sync"
2627
"testing"
28+
"time"
2729

2830
"golang.org/x/net/trace"
2931

@@ -240,3 +242,44 @@ func TestManifestRewrite(t *testing.T) {
240242
uint64(deletionsThreshold * 3): {Level: 0, Checksum: []byte{}},
241243
}, m.Tables)
242244
}
245+
246+
func TestConcurrentManifestCompaction(t *testing.T) {
247+
dir, err := ioutil.TempDir("", "badger-test")
248+
require.NoError(t, err)
249+
defer removeDir(dir)
250+
251+
// set this low so rewrites will happen more often
252+
deletionsThreshold := 1
253+
254+
// overwrite the sync function to make this race condition easily reproducible
255+
syncFunc = func(f *os.File) error {
256+
// effectively making the Sync() take around 1s makes this reproduce every time
257+
time.Sleep(1 * time.Second)
258+
return f.Sync()
259+
}
260+
261+
mf, _, err := helpOpenOrCreateManifestFile(dir, false, deletionsThreshold)
262+
require.NoError(t, err)
263+
264+
cs := &pb.ManifestChangeSet{}
265+
for i := uint64(0); i < 1000; i++ {
266+
cs.Changes = append(cs.Changes,
267+
newCreateChange(i, 0, nil),
268+
newDeleteChange(i),
269+
)
270+
}
271+
272+
// simulate 2 concurrent compaction threads
273+
n := 2
274+
wg := sync.WaitGroup{}
275+
wg.Add(n)
276+
for i := 0; i < n; i++ {
277+
go func() {
278+
defer wg.Done()
279+
require.NoError(t, mf.addChanges(cs.Changes))
280+
}()
281+
}
282+
wg.Wait()
283+
284+
require.NoError(t, mf.close())
285+
}

0 commit comments

Comments
 (0)