Skip to content

Commit

Permalink
sorter: provide more information on sorter IO errors (#1969) (#1976)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jul 6, 2021
1 parent 7d30157 commit 564be42
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 24 deletions.
58 changes: 36 additions & 22 deletions cdc/puller/sorter/file_backend.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2020 PingCAP, Inc.
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
cerrors "github.com/pingcap/ticdc/pkg/errors"
"go.uber.org/zap"
)

Expand All @@ -46,12 +47,12 @@ type fileBackEnd struct {
func newFileBackEnd(fileName string, serde serializerDeserializer) (*fileBackEnd, error) {
f, err := os.Create(fileName)
if err != nil {
return nil, errors.Trace(err)
return nil, errors.Trace(wrapIOError(err))
}

err = f.Close()
if err != nil {
return nil, errors.Trace(err)
return nil, errors.Trace(wrapIOError(err))
}

log.Debug("new FileSorterBackEnd created", zap.String("filename", fileName))
Expand All @@ -65,7 +66,7 @@ func newFileBackEnd(fileName string, serde serializerDeserializer) (*fileBackEnd
func (f *fileBackEnd) reader() (backEndReader, error) {
fd, err := os.OpenFile(f.fileName, os.O_RDWR, 0o644)
if err != nil {
return nil, errors.Trace(err)
return nil, errors.Trace(wrapIOError(err))
}

atomic.AddInt64(&openFDCount, 1)
Expand All @@ -74,7 +75,7 @@ func (f *fileBackEnd) reader() (backEndReader, error) {
failpoint.Inject("sorterDebug", func() {
info, err := fd.Stat()
if err != nil {
failpoint.Return(nil, errors.Trace(err))
failpoint.Return(nil, errors.Trace(wrapIOError(err)))
}
totalSize = info.Size()
})
Expand All @@ -94,7 +95,7 @@ func (f *fileBackEnd) reader() (backEndReader, error) {

err = ret.readHeader()
if err != nil {
return nil, errors.Trace(err)
return nil, errors.Trace(wrapIOError(err))
}

return ret, nil
Expand All @@ -103,7 +104,7 @@ func (f *fileBackEnd) reader() (backEndReader, error) {
func (f *fileBackEnd) writer() (backEndWriter, error) {
fd, err := os.OpenFile(f.fileName, os.O_TRUNC|os.O_RDWR, 0o644)
if err != nil {
return nil, errors.Trace(err)
return nil, errors.Trace(wrapIOError(err))
}

atomic.AddInt64(&openFDCount, 1)
Expand All @@ -122,7 +123,7 @@ func (f *fileBackEnd) writer() (backEndWriter, error) {

err = ret.writeFileHeader()
if err != nil {
return nil, errors.Trace(err)
return nil, errors.Trace(wrapIOError(err))
}

return ret, nil
Expand All @@ -142,10 +143,10 @@ func (f *fileBackEnd) free() error {
err := os.Remove(f.fileName)
if err != nil {
failpoint.Inject("sorterDebug", func() {
failpoint.Return(errors.Trace(err))
failpoint.Return(errors.Trace(wrapIOError(err)))
})
// ignore this error in production to provide some resilience
log.Warn("fileBackEnd: failed to remove file", zap.Error(err))
log.Warn("fileBackEnd: failed to remove file", zap.Error(wrapIOError(err)))
}

return nil
Expand Down Expand Up @@ -223,7 +224,7 @@ func (r *fileBackEndReader) readNext() (*model.PolymorphicEvent, error) {
}
return nil, nil
}
return nil, errors.Trace(err)
return nil, errors.Trace(wrapIOError(err))
}

if m != blockMagic {
Expand All @@ -233,7 +234,7 @@ func (r *fileBackEndReader) readNext() (*model.PolymorphicEvent, error) {
var size uint32
err = binary.Read(r.reader, binary.LittleEndian, &size)
if err != nil {
return nil, errors.Trace(err)
return nil, errors.Trace(wrapIOError(err))
}

if cap(r.rawBytesBuf) < int(size) {
Expand All @@ -245,7 +246,7 @@ func (r *fileBackEndReader) readNext() (*model.PolymorphicEvent, error) {
// short reads are possible with bufio, hence the need for io.ReadFull
n, err := io.ReadFull(r.reader, r.rawBytesBuf)
if err != nil {
return nil, errors.Trace(err)
return nil, errors.Trace(wrapIOError(err))
}

if n != int(size) {
Expand Down Expand Up @@ -296,7 +297,7 @@ func (r *fileBackEndReader) resetAndClose() error {
failpoint.Inject("sorterDebug", func() {
info, err1 := r.f.Stat()
if err1 != nil {
failpoint.Return(errors.Trace(err))
failpoint.Return(errors.Trace(wrapIOError(err)))
}

log.Info("file debug info", zap.String("filename", info.Name()),
Expand Down Expand Up @@ -350,7 +351,7 @@ func (w *fileBackEndWriter) writeNext(event *model.PolymorphicEvent) error {
var err error
w.rawBytesBuf, err = w.backEnd.serde.marshal(event, w.rawBytesBuf)
if err != nil {
return errors.Trace(err)
return errors.Trace(wrapIOError(err))
}

size := len(w.rawBytesBuf)
Expand All @@ -360,20 +361,20 @@ func (w *fileBackEndWriter) writeNext(event *model.PolymorphicEvent) error {

err = binary.Write(w.writer, binary.LittleEndian, uint32(blockMagic))
if err != nil {
return errors.Trace(err)
return errors.Trace(wrapIOError(err))
}

err = binary.Write(w.writer, binary.LittleEndian, uint32(size))
if err != nil {
return errors.Trace(err)
return errors.Trace(wrapIOError(err))
}

// short writes are possible with bufio
offset := 0
for offset < size {
n, err := w.writer.Write(w.rawBytesBuf[offset:])
if err != nil {
return errors.Trace(err)
return errors.Trace(wrapIOError(err))
}
offset += n
}
Expand Down Expand Up @@ -402,24 +403,24 @@ func (w *fileBackEndWriter) flushAndClose() error {

err := w.writer.Flush()
if err != nil {
return errors.Trace(err)
return errors.Trace(wrapIOError(err))
}

_, err = w.f.Seek(numFileEntriesOffset, 0 /* relative to the beginning of the file */)
if err != nil {
return errors.Trace(err)
return errors.Trace(wrapIOError(err))
}

// write the total number of entries in the file to the header
err = binary.Write(w.f, binary.LittleEndian, uint64(w.eventsWritten))
if err != nil {
return errors.Trace(err)
return errors.Trace(wrapIOError(err))
}

err = w.f.Close()
if err != nil {
failpoint.Inject("sorterDebug", func() {
failpoint.Return(errors.Trace(err))
failpoint.Return(errors.Trace(wrapIOError(err)))
})
log.Warn("fileBackEndReader: could not close file", zap.Error(err))
return nil
Expand All @@ -435,3 +436,16 @@ func (w *fileBackEndWriter) flushAndClose() error {

return nil
}

// wrapIOError converts filesystem-related errors into ErrUnifiedSorterIOError.
// It should be applied to an error that is about to be returned to a caller
// outside this file and that could have been caused by the filesystem.
//
// Only errors whose root cause is an *os.PathError are converted; the text of
// the original error is passed along as the argument of the new error. Any
// other error (for example io.EOF) is returned unchanged.
func wrapIOError(err error) error {
	if _, isPathErr := errors.Cause(err).(*os.PathError); !isPathErr {
		return err
	}
	// We don't generate stack in this helper function to avoid confusion.
	return cerrors.ErrUnifiedSorterIOError.FastGenByArgs(err.Error())
}
64 changes: 64 additions & 0 deletions cdc/puller/sorter/file_backend_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package sorter

import (
"io"
"os"

"github.com/pingcap/check"
"github.com/pingcap/ticdc/cdc/model"
cerrors "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/util/testleak"
)

// fileBackendSuite groups the unit tests for the file-based sorter backend.
type fileBackendSuite struct{}

// Register the suite with the check framework through SerialSuites.
var _ = check.SerialSuites(&fileBackendSuite{})

// TestWrapIOError checks that wrapIOError decorates a filesystem error with a
// user-facing hint and leaves unrelated errors alone.
func (s *fileBackendSuite) TestWrapIOError(c *check.C) {
	defer testleak.AfterTest(c)()

	// Writing to /dev/full is expected to fail with a "no space" error,
	// which gives us a real filesystem error to wrap.
	devFull, openErr := os.OpenFile("/dev/full", os.O_RDWR, 0)
	c.Assert(openErr, check.IsNil)
	defer devFull.Close() //nolint:errcheck

	_, writeErr := devFull.WriteString("test")
	wrappedErr := wrapIOError(writeErr)
	// The message must contain both the configuration hint and the
	// underlying cause.
	c.Assert(wrappedErr, check.ErrorMatches, ".*review the settings.*no space.*")

	// io.EOF is not a filesystem error and must be passed through as-is.
	passedThrough := wrapIOError(io.EOF)
	c.Assert(passedThrough, check.Equals, io.EOF)
}

// TestNoSpace drives a fileBackEnd writer against /dev/full and checks that
// the resulting failure surfaces as ErrUnifiedSorterIOError with an
// informative message.
func (s *fileBackendSuite) TestNoSpace(c *check.C) {
	defer testleak.AfterTest(c)()

	backEnd := &fileBackEnd{
		fileName: "/dev/full",
		serde:    &msgPackGenSerde{},
	}
	writer, err := backEnd.writer()
	c.Assert(err, check.IsNil)

	// Due to write buffering, `writeNext` might not return an error when the
	// filesystem is full; in that case the error is expected from the flush.
	if err = writer.writeNext(model.NewPolymorphicEvent(generateMockRawKV(0))); err == nil {
		err = writer.flushAndClose()
	}

	c.Assert(err, check.ErrorMatches, ".*review the settings.*no space.*")
	c.Assert(cerrors.ErrUnifiedSorterIOError.Equal(err), check.IsTrue)
}
2 changes: 1 addition & 1 deletion errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ unified sorter backend is terminating

["CDC:ErrUnifiedSorterIOError"]
error = '''
unified sorter IO error
unified sorter IO error. Make sure your sort-dir is configured correctly by passing a valid argument or toml file to `cdc server`, or if you use TiUP, review the settings in `tiup cluster edit-config`. Details: %s
'''

["CDC:ErrUnknownKVEventType"]
Expand Down
2 changes: 1 addition & 1 deletion pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ var (
ErrUnifiedSorterBackendTerminating = errors.Normalize("unified sorter backend is terminating", errors.RFCCodeText("CDC:ErrUnifiedSorterBackendTerminating"))
ErrIllegalUnifiedSorterParameter = errors.Normalize("illegal parameter for unified sorter: %s", errors.RFCCodeText("CDC:ErrIllegalUnifiedSorterParameter"))
ErrAsyncIOCancelled = errors.Normalize("asynchronous IO operation is cancelled. Internal use only, report a bug if seen in log", errors.RFCCodeText("CDC:ErrAsyncIOCancelled"))
ErrUnifiedSorterIOError = errors.Normalize("unified sorter IO error", errors.RFCCodeText("CDC:ErrUnifiedSorterIOError"))
ErrUnifiedSorterIOError = errors.Normalize("unified sorter IO error. Make sure your sort-dir is configured correctly by passing a valid argument or toml file to `cdc server`, or if you use TiUP, review the settings in `tiup cluster edit-config`. Details: %s", errors.RFCCodeText("CDC:ErrUnifiedSorterIOError"))
ErrConflictingFileLocks = errors.Normalize("file lock conflict: %s", errors.RFCCodeText("ErrConflictingFileLocks"))
ErrSortDirLockError = errors.Normalize("error encountered when locking sort-dir", errors.RFCCodeText("ErrSortDirLockError"))

Expand Down

0 comments on commit 564be42

Please sign in to comment.