Skip to content

Commit def5474

Browse files
authored
Merge pull request #703 from michaelwilner/resend-request-on-filestore-fix-deadlock
Iterate messages in filestore opens a separate file to avoid deadlock
2 parents bc1d3ac + c93c8d9 commit def5474

File tree

1 file changed

+29
-15
lines changed

1 file changed

+29
-15
lines changed

store/file/file_store.go

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -329,14 +329,8 @@ func (store *fileStore) SaveMessage(seqNum int, msg []byte) error {
329329
return fmt.Errorf("unable to write to file: %s: %s", store.bodyFname, err.Error())
330330
}
331331
if store.fileSync {
332-
if err := store.bodyFile.Sync(); err != nil {
333-
return fmt.Errorf("unable to flush file: %s: %s", store.bodyFname, err.Error())
334-
}
335-
if err := store.headerFile.Sync(); err != nil {
336-
return fmt.Errorf("unable to flush file: %s: %s", store.headerFname, err.Error())
337-
}
332+
return store.syncBodyAndHeaderFilesLocked()
338333
}
339-
340334
return nil
341335
}
342336

@@ -348,24 +342,44 @@ func (store *fileStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []
348342
return store.IncrNextSenderMsgSeqNum()
349343
}
350344

351-
func (store *fileStore) IterateMessages(beginSeqNum, endSeqNum int, cb func([]byte) error) error {
352-
store.fileMu.Lock()
353-
defer store.fileMu.Unlock()
354-
355-
// Sync files and seek to start of header file
345+
func (store *fileStore) syncBodyAndHeaderFilesLocked() error {
356346
if err := store.bodyFile.Sync(); err != nil {
357347
return fmt.Errorf("unable to flush file: %s: %s", store.bodyFname, err.Error())
358348
} else if err = store.headerFile.Sync(); err != nil {
359349
return fmt.Errorf("unable to flush file: %s: %s", store.headerFname, err.Error())
360-
} else if _, err = store.headerFile.Seek(0, io.SeekStart); err != nil {
350+
}
351+
return nil
352+
}
353+
354+
func (store *fileStore) IterateMessages(beginSeqNum, endSeqNum int, cb func([]byte) error) error {
355+
// Sync files
356+
store.fileMu.Lock()
357+
err := store.syncBodyAndHeaderFilesLocked()
358+
store.fileMu.Unlock()
359+
if err != nil {
360+
return err
361+
}
362+
363+
// Open a read only view to body and header file
364+
bodyFile, err := openOrCreateFile(store.bodyFname, 0440)
365+
if err != nil {
366+
return err
367+
}
368+
defer func() { _ = bodyFile.Close() }()
369+
headerFile, err := openOrCreateFile(store.headerFname, 0440)
370+
if err != nil {
371+
return err
372+
}
373+
defer func() { _ = headerFile.Close() }()
374+
if _, err = headerFile.Seek(0, io.SeekStart); err != nil {
361375
return fmt.Errorf("unable to seek to start of file: %s: %s", store.headerFname, err.Error())
362376
}
363377

364378
// Iterate over the header file
365379
for {
366380
var seqNum, size int
367381
var offset int64
368-
if cnt, err := fmt.Fscanf(store.headerFile, "%d,%d,%d\n", &seqNum, &offset, &size); err != nil {
382+
if cnt, err := fmt.Fscanf(headerFile, "%d,%d,%d\n", &seqNum, &offset, &size); err != nil {
369383
if errors.Is(err, io.EOF) {
370384
break
371385
}
@@ -379,7 +393,7 @@ func (store *fileStore) IterateMessages(beginSeqNum, endSeqNum int, cb func([]by
379393
}
380394
// Otherwise process the file
381395
msg := make([]byte, size)
382-
if _, err := store.bodyFile.ReadAt(msg, offset); err != nil {
396+
if _, err := bodyFile.ReadAt(msg, offset); err != nil {
383397
return fmt.Errorf("unable to read from file: %s: %s", store.bodyFname, err.Error())
384398
} else if err = cb(msg); err != nil {
385399
return err

0 commit comments

Comments
 (0)