11 changes: 7 additions & 4 deletions index/scorch/introducer.go
@@ -24,6 +24,8 @@ import (
 	segment "github.com/blevesearch/scorch_segment_api/v2"
 )
 
+const introducer = "introducer"
+
 type segmentIntroduction struct {
 	id   uint64
 	data segment.Segment
@@ -50,10 +52,11 @@ type epochWatcher struct {
 func (s *Scorch) introducerLoop() {
 	defer func() {
 		if r := recover(); r != nil {
-			s.fireAsyncError(&AsyncPanicError{
-				Source: "introducer",
-				Path:   s.path,
-			})
+			s.fireAsyncError(NewScorchError(
+				introducer,
+				fmt.Sprintf("panic: %v, path: %s", r, s.path),
+				ErrAsyncPanic,
+			))
 		}
 
 		s.asyncTasks.Done()
24 changes: 18 additions & 6 deletions index/scorch/merge.go
@@ -29,13 +29,16 @@ import (
 	segment "github.com/blevesearch/scorch_segment_api/v2"
 )
 
+const merger = "merger"
+
 func (s *Scorch) mergerLoop() {
 	defer func() {
 		if r := recover(); r != nil {
-			s.fireAsyncError(&AsyncPanicError{
-				Source: "merger",
-				Path:   s.path,
-			})
+			s.fireAsyncError(NewScorchError(
+				merger,
+				fmt.Sprintf("panic: %v, path: %s", r, s.path),
+				ErrAsyncPanic,
+			))
 		}
 
 		s.asyncTasks.Done()
@@ -45,7 +48,11 @@ func (s *Scorch) mergerLoop() {
 	var ctrlMsg *mergerCtrl
 	mergePlannerOptions, err := s.parseMergePlannerOptions()
 	if err != nil {
-		s.fireAsyncError(fmt.Errorf("mergePlannerOption json parsing err: %v", err))
+		s.fireAsyncError(NewScorchError(
+			merger,
+			fmt.Sprintf("mergerPlannerOptions json parsing err: %v", err),
+			ErrOptionsParse,
+		))
 		return
 	}
 	ctrlMsgDflt := &mergerCtrl{ctx: context.Background(),
@@ -110,7 +117,12 @@ OUTER:
 				ctrlMsg = nil
 				break OUTER
 			}
-			s.fireAsyncError(fmt.Errorf("merging err: %v", err))
+
+			s.fireAsyncError(NewScorchError(
+				merger,
+				fmt.Sprintf("merging err: %v", err),
+				ErrPersist,
+			))
 			_ = ourSnapshot.DecRef()
 			atomic.AddUint64(&s.stats.TotFileMergeLoopErr, 1)
 			continue OUTER
35 changes: 27 additions & 8 deletions index/scorch/persister.go
@@ -38,6 +38,8 @@ import (
 	bolt "go.etcd.io/bbolt"
 )
 
+const persister = "persister"
+
 // DefaultPersisterNapTimeMSec is kept to zero as this helps in direct
 // persistence of segments with the default safe batch option.
 // If the default safe batch option results in high number of
@@ -95,10 +97,11 @@ type notificationChan chan struct{}
 func (s *Scorch) persisterLoop() {
 	defer func() {
 		if r := recover(); r != nil {
-			s.fireAsyncError(&AsyncPanicError{
-				Source: "persister",
-				Path:   s.path,
-			})
+			s.fireAsyncError(NewScorchError(
+				persister,
+				fmt.Sprintf("panic: %v, path: %s", r, s.path),
+				ErrAsyncPanic,
+			))
 		}
 
 		s.asyncTasks.Done()
@@ -112,7 +115,11 @@ func (s *Scorch) persisterLoop() {
 
 	po, err := s.parsePersisterOptions()
 	if err != nil {
-		s.fireAsyncError(fmt.Errorf("persisterOptions json parsing err: %v", err))
+		s.fireAsyncError(NewScorchError(
+			persister,
+			fmt.Sprintf("persisterOptions json parsing err: %v", err),
+			ErrOptionsParse,
+		))
 		return
 	}
 
@@ -173,7 +180,11 @@ OUTER:
 				// the retry attempt
 				unpersistedCallbacks = append(unpersistedCallbacks, ourPersistedCallbacks...)
 
-				s.fireAsyncError(fmt.Errorf("got err persisting snapshot: %v", err))
+				s.fireAsyncError(NewScorchError(
+					persister,
+					fmt.Sprintf("got err persisting snapshot: %v", err),
+					ErrPersist,
+				))
 				_ = ourSnapshot.DecRef()
 				atomic.AddUint64(&s.stats.TotPersistLoopErr, 1)
 				continue OUTER
@@ -1060,13 +1071,21 @@ func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket) (*SegmentSnapshot, error) {
 func (s *Scorch) removeOldData() {
 	removed, err := s.removeOldBoltSnapshots()
 	if err != nil {
-		s.fireAsyncError(fmt.Errorf("got err removing old bolt snapshots: %v", err))
+		s.fireAsyncError(NewScorchError(
+			persister,
+			fmt.Sprintf("got err removing old bolt snapshots: %v", err),
+			ErrCleanup,
+		))
 	}
 	atomic.AddUint64(&s.stats.TotSnapshotsRemovedFromMetaStore, uint64(removed))
 
 	err = s.removeOldZapFiles()
 	if err != nil {
-		s.fireAsyncError(fmt.Errorf("got err removing old zap files: %v", err))
+		s.fireAsyncError(NewScorchError(
+			persister,
+			fmt.Sprintf("got err removing old zap files: %v", err),
+			ErrCleanup,
+		))
 	}
 }
43 changes: 37 additions & 6 deletions index/scorch/scorch.go
@@ -88,14 +88,45 @@ type Scorch struct {
 	spatialPlugin index.SpatialAnalyzerPlugin
 }
 
-// AsyncPanicError is passed to scorch asyncErrorHandler when panic occurs in scorch background process
-type AsyncPanicError struct {
-	Source string
-	Path   string
+type ScorchErrorType string
+
+func (t ScorchErrorType) Error() string {
+	return string(t)
 }
 
+// ErrType values for ScorchError
+const (
+	ErrAsyncPanic   = ScorchErrorType("async panic error")
+	ErrPersist      = ScorchErrorType("persist error")
+	ErrCleanup      = ScorchErrorType("cleanup error")
+	ErrOptionsParse = ScorchErrorType("options parse error")
+)
+
+// ScorchError is passed to onAsyncError when errors are
+// fired from scorch background processes
+type ScorchError struct {
+	Source  string
+	ErrMsg  string
+	ErrType ScorchErrorType
+}
+
-func (e *AsyncPanicError) Error() string {
-	return fmt.Sprintf("%s panic when processing %s", e.Source, e.Path)
+func (e *ScorchError) Error() string {
+	return fmt.Sprintf("source: %s, %v: %s", e.Source, e.ErrType, e.ErrMsg)
 }
 
+// Lets the onAsyncError function verify what type of
+// error is fired using errors.Is(...). This lets the function
+// handle errors differently.
+func (e *ScorchError) Unwrap() error {
+	return e.ErrType
+}
+
+func NewScorchError(source, errMsg string, errType ScorchErrorType) error {
+	return &ScorchError{
+		Source:  source,
+		ErrMsg:  errMsg,
+		ErrType: errType,
+	}
+}
+
 type internalStats struct {
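The Unwrap method above is what makes errors.Is(...) useful on the consumer side: the wrapped ScorchErrorType can be matched against the exported Err* sentinels, and errors.As still recovers the concrete *ScorchError for the source loop. Below is a minimal, hypothetical sketch of an application-side handler; the handler name onAsyncError, the trivial main, and the bleve v2 import path are assumptions for illustration and are not part of this change, and registration of the callback with the index is not shown.

```go
package main

import (
	"errors"
	"log"

	"github.com/blevesearch/bleve/v2/index/scorch"
)

// onAsyncError is a hypothetical application callback for scorch's
// background-error notifications.
func onAsyncError(err error) {
	// errors.Is matches because ScorchError.Unwrap returns its ScorchErrorType.
	switch {
	case errors.Is(err, scorch.ErrAsyncPanic):
		log.Printf("background goroutine panicked: %v", err)
	case errors.Is(err, scorch.ErrOptionsParse):
		log.Printf("bad persister/merger options: %v", err)
	case errors.Is(err, scorch.ErrPersist):
		log.Printf("persist/merge failure: %v", err)
	case errors.Is(err, scorch.ErrCleanup):
		log.Printf("cleanup of old snapshots or files failed: %v", err)
	default:
		log.Printf("async error: %v", err)
	}

	// errors.As recovers the concrete *ScorchError to inspect which loop fired it.
	var se *scorch.ScorchError
	if errors.As(err, &se) {
		log.Printf("source loop: %s", se.Source)
	}
}

func main() {
	// Simulate an error the persister loop could fire.
	onAsyncError(scorch.NewScorchError("persister", "simulated failure", scorch.ErrPersist))
}
```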