Skip to content

Commit

Permalink
wal: add Verify function to perform corruption check on wal contents
Browse files Browse the repository at this point in the history
Signed-off-by: Shreyas Rao <shreyas.sriganesh.rao@sap.com>
  • Loading branch information
shreyas-s-rao committed Mar 8, 2019
1 parent a943ad0 commit ba5ed69
Showing 1 changed file with 135 additions and 32 deletions.
167 changes: 135 additions & 32 deletions wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,66 +299,85 @@ func OpenForRead(lg *zap.Logger, dirpath string, snap walpb.Snapshot) (*WAL, err
}

func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) {
names, err := readWALNames(lg, dirpath)
names, nameIndex, err := selectWALFiles(lg, dirpath, snap)
if err != nil {
return nil, err
}

rs, ls, closer, err := openWALFiles(lg, dirpath, names, nameIndex, write)
if err != nil {
return nil, err
}

// create a WAL ready for reading
w := &WAL{
dir: dirpath,
start: snap,
decoder: newDecoder(rs...),
readClose: closer,
locks: ls,
}

if write {
// write reuses the file descriptors from read; don't close so
// WAL can append without dropping the file lock
w.readClose = nil
if _, _, err := parseWALName(filepath.Base(w.tail().Name())); err != nil {
closer()
return nil, err
}
w.fp = newFilePipeline(lg, w.dir, SegmentSizeBytes)
}

return w, nil
}

func selectWALFiles(lg *zap.Logger, dirpath string, snap walpb.Snapshot) (names []string, nameIndex int, err error) {
names, err = readWALNames(lg, dirpath)
if err != nil {
return
}

nameIndex, ok := searchIndex(lg, names, snap.Index)
if !ok || !isValidSeq(lg, names[nameIndex:]) {
return nil, ErrFileNotFound
err = ErrFileNotFound
return
}

// open the wal files
return
}

func openWALFiles(lg *zap.Logger, dirpath string, names []string, nameIndex int, write bool) (rs []io.Reader, ls []*fileutil.LockedFile, closer func() error, err error) {
var l *fileutil.LockedFile
var rf *os.File
rcs := make([]io.ReadCloser, 0)
rs := make([]io.Reader, 0)
ls := make([]*fileutil.LockedFile, 0)
rs = make([]io.Reader, 0)
ls = make([]*fileutil.LockedFile, 0)
for _, name := range names[nameIndex:] {
p := filepath.Join(dirpath, name)
if write {
l, err := fileutil.TryLockFile(p, os.O_RDWR, fileutil.PrivateFileMode)
l, err = fileutil.TryLockFile(p, os.O_RDWR, fileutil.PrivateFileMode)
if err != nil {
closeAll(rcs...)
return nil, err
return
}
ls = append(ls, l)
rcs = append(rcs, l)
} else {
rf, err := os.OpenFile(p, os.O_RDONLY, fileutil.PrivateFileMode)
rf, err = os.OpenFile(p, os.O_RDONLY, fileutil.PrivateFileMode)
if err != nil {
closeAll(rcs...)
return nil, err
return
}
ls = append(ls, nil)
rcs = append(rcs, rf)
}
rs = append(rs, rcs[len(rcs)-1])
}

closer := func() error { return closeAll(rcs...) }

// create a WAL ready for reading
w := &WAL{
lg: lg,
dir: dirpath,
start: snap,
decoder: newDecoder(rs...),
readClose: closer,
locks: ls,
}

if write {
// write reuses the file descriptors from read; don't close so
// WAL can append without dropping the file lock
w.readClose = nil
if _, _, err := parseWALName(filepath.Base(w.tail().Name())); err != nil {
closer()
return nil, err
}
w.fp = newFilePipeline(w.lg, w.dir, SegmentSizeBytes)
}
closer = func() error { return closeAll(rcs...) }

return w, nil
return
}

// ReadAll reads out records of the current WAL.
Expand Down Expand Up @@ -480,6 +499,90 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
return metadata, state, ents, err
}

// Verify reads through the records of the WAL and verifies that it is not corrupt.
// Unlike ReadAll, it does not return the entries in the WAL.
// It creates a new decoder to read through the records of the WAL.
// It cannot be called after a ReadAll call on the WAL.
// If it cannot read out the expected snap, it will return ErrSnapshotNotFound.
// If the loaded snap doesn't match with the expected one, it will
// return error ErrSnapshotMismatch.
func (w *WAL) Verify() error {
w.mu.Lock()
defer w.mu.Unlock()

var metadata []byte
var err error
var match bool

rec := &walpb.Record{}

// create a new decoder
names, nameIndex, err := selectWALFiles(w.lg, w.dir, w.start)
if err != nil {
return err
}
rs, _, closer, err := openWALFiles(w.lg, w.dir, names, nameIndex, false)
if err != nil {
return err
}
decoder := newDecoder(rs...)

for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
switch rec.Type {
case metadataType:
if metadata != nil && !bytes.Equal(metadata, rec.Data) {
return ErrMetadataConflict
}
metadata = rec.Data
case crcType:
crc := decoder.crc.Sum32()
// Current crc of decoder must match the crc of the record.
// We need not match 0 crc, since the decoder is a new one at this point.
if crc != 0 && rec.Validate(crc) != nil {
return ErrCRCMismatch
}
decoder.updateCRC(rec.Crc)
case snapshotType:
var snap walpb.Snapshot
pbutil.MustUnmarshal(&snap, rec.Data)
if snap.Index == w.start.Index {
if snap.Term != w.start.Term {
return ErrSnapshotMismatch
}
match = true
}
// We ignore all entry and state type records as these
// are not necessary for validating the WAL contents
case entryType:
case stateType:
default:
return fmt.Errorf("unexpected block type %d", rec.Type)
}
}

if closer != nil {
closer()
}

if !match {
return ErrSnapshotNotFound
}

if w.tail() != nil {
// We must read all of the entries if WAL is opened in write mode.
if err != io.EOF {
return err
}
} else {
// We do not have to read out all entries in read mode.
if err != io.EOF && err != io.ErrUnexpectedEOF {
return err
}
}

return nil
}

// cut closes current file written and creates a new one ready to append.
// cut first creates a temp wal file and writes necessary headers into it.
// Then cut atomically rename temp wal file to a wal file.
Expand Down

0 comments on commit ba5ed69

Please sign in to comment.