Skip to content

Commit 6ed45ae

Browse files
authored
fix(manifest): fix manifest corruption due to race condition in concurrent compactions (#1756)
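`manifestFile.rewrite` closes the manifest file, so a `manifest.fp.Sync()` call that runs concurrently, after `appendLock` has been released, can fail. When that happens the manifest has already been updated but the set of tables has not, and the compaction process enters an infinite loop that keeps failing with the `MANIFEST removes non-existing table` error. This PR fixes the race by holding `appendLock` until the sync has completed.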
1 parent cba20b9 commit 6ed45ae

File tree

2 files changed

+48
-6
lines changed

2 files changed

+48
-6
lines changed

manifest.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -212,19 +212,17 @@ func (mf *manifestFile) addChanges(changesParam []*pb.ManifestChange) error {
212212
}
213213
// Maybe we could use O_APPEND instead (on certain file systems)
214214
mf.appendLock.Lock()
215+
defer mf.appendLock.Unlock()
215216
if err := applyChangeSet(&mf.manifest, &changes); err != nil {
216-
mf.appendLock.Unlock()
217217
return err
218218
}
219219
if mf.inMemory {
220-
mf.appendLock.Unlock()
221220
return nil
222221
}
223222
// Rewrite manifest if it'd shrink by 1/10 and it's big enough to care
224223
if mf.manifest.Deletions > mf.deletionsRewriteThreshold &&
225224
mf.manifest.Deletions > manifestDeletionsRatio*(mf.manifest.Creations-mf.manifest.Deletions) {
226225
if err := mf.rewrite(); err != nil {
227-
mf.appendLock.Unlock()
228226
return err
229227
}
230228
} else {
@@ -233,15 +231,16 @@ func (mf *manifestFile) addChanges(changesParam []*pb.ManifestChange) error {
233231
binary.BigEndian.PutUint32(lenCrcBuf[4:8], crc32.Checksum(buf, y.CastagnoliCrcTable))
234232
buf = append(lenCrcBuf[:], buf...)
235233
if _, err := mf.fp.Write(buf); err != nil {
236-
mf.appendLock.Unlock()
237234
return err
238235
}
239236
}
240237

241-
mf.appendLock.Unlock()
242-
return mf.fp.Sync()
238+
return syncFunc(mf.fp)
243239
}
244240

241+
// syncFunc syncs the manifest file to disk. It is a package-level variable so that tests can replace it to inject fake filesystem latency.
242+
var syncFunc = func(f *os.File) error { return f.Sync() }
243+
245244
// Has to be 4 bytes. The value can never change, ever, anyway.
246245
var magicText = [4]byte{'B', 'd', 'g', 'r'}
247246

manifest_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ import (
2424
"os"
2525
"path/filepath"
2626
"sort"
27+
"sync"
2728
"testing"
29+
"time"
2830

2931
otrace "go.opencensus.io/trace"
3032

@@ -245,3 +247,44 @@ func TestManifestRewrite(t *testing.T) {
245247
uint64(deletionsThreshold * 3): {Level: 0},
246248
}, m.Tables)
247249
}
250+
251+
func TestConcurrentManifestCompaction(t *testing.T) {
252+
dir, err := ioutil.TempDir("", "badger-test")
253+
require.NoError(t, err)
254+
defer removeDir(dir)
255+
256+
// set this low so rewrites will happen more often
257+
deletionsThreshold := 1
258+
259+
// overwrite the sync function to make this race condition easily reproducible
260+
syncFunc = func(f *os.File) error {
261+
// effectively making the Sync() take around 1s makes this reproduce every time
262+
time.Sleep(1 * time.Second)
263+
return f.Sync()
264+
}
265+
266+
mf, _, err := helpOpenOrCreateManifestFile(dir, false, 0, deletionsThreshold)
267+
require.NoError(t, err)
268+
269+
cs := &pb.ManifestChangeSet{}
270+
for i := uint64(0); i < 1000; i++ {
271+
cs.Changes = append(cs.Changes,
272+
newCreateChange(i, 0, 0, 0),
273+
newDeleteChange(i),
274+
)
275+
}
276+
277+
// simulate 2 concurrent compaction threads
278+
n := 2
279+
wg := sync.WaitGroup{}
280+
wg.Add(n)
281+
for i := 0; i < n; i++ {
282+
go func() {
283+
defer wg.Done()
284+
require.NoError(t, mf.addChanges(cs.Changes))
285+
}()
286+
}
287+
wg.Wait()
288+
289+
require.NoError(t, mf.close())
290+
}

0 commit comments

Comments
 (0)