Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions index/scorch/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,8 @@ func (s *Scorch) planMergeAtSnapshot(ctx context.Context,

atomic.AddUint64(&s.stats.TotFileMergeZapBeg, 1)
prevBytesReadTotal := cumulateBytesRead(segmentsToMerge)
newDocNums, _, err := s.segPlugin.Merge(segmentsToMerge, docsToDrop, path,
cw.cancelCh, s)
newDocNums, _, err := s.segPlugin.MergeEx(segmentsToMerge, docsToDrop, path,
cw.cancelCh, s, s.segmentConfig)
atomic.AddUint64(&s.stats.TotFileMergeZapEnd, 1)

fileMergeZapTime := uint64(time.Since(fileMergeZapStartTime))
Expand All @@ -358,7 +358,7 @@ func (s *Scorch) planMergeAtSnapshot(ctx context.Context,
return fmt.Errorf("merging failed: %v", err)
}

seg, err = s.segPlugin.Open(path)
seg, err = s.segPlugin.OpenEx(path, s.segmentConfig)
if err != nil {
s.unmarkIneligibleForRemoval(filename)
atomic.AddUint64(&s.stats.TotFileMergePlanTasksErr, 1)
Expand Down Expand Up @@ -469,7 +469,7 @@ func (s *Scorch) mergeSegmentBases(snapshot *IndexSnapshot,
path := s.path + string(os.PathSeparator) + filename

newDocNums, _, err :=
s.segPlugin.Merge(sbs, sbsDrops, path, s.closeCh, s)
s.segPlugin.MergeEx(sbs, sbsDrops, path, s.closeCh, s, s.segmentConfig)

atomic.AddUint64(&s.stats.TotMemMergeZapEnd, 1)

Expand All @@ -484,7 +484,7 @@ func (s *Scorch) mergeSegmentBases(snapshot *IndexSnapshot,
return nil, 0, err
}

seg, err := s.segPlugin.Open(path)
seg, err := s.segPlugin.OpenEx(path, s.segmentConfig)
if err != nil {
atomic.AddUint64(&s.stats.TotMemMergeErr, 1)
return nil, 0, err
Expand Down
4 changes: 2 additions & 2 deletions index/scorch/persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ func (s *Scorch) persistSnapshotDirect(snapshot *IndexSnapshot) (err error) {
}
}()
for segmentID, path := range newSegmentPaths {
newSegments[segmentID], err = s.segPlugin.Open(path)
newSegments[segmentID], err = s.segPlugin.OpenEx(path, s.segmentConfig)
if err != nil {
return fmt.Errorf("error opening new segment at %s, %v", path, err)
}
Expand Down Expand Up @@ -872,7 +872,7 @@ func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket) (*SegmentSnapshot, erro
return nil, fmt.Errorf("segment path missing")
}
segmentPath := s.path + string(os.PathSeparator) + string(pathBytes)
segment, err := s.segPlugin.Open(segmentPath)
segment, err := s.segPlugin.OpenEx(segmentPath, s.segmentConfig)
if err != nil {
return nil, fmt.Errorf("error opening bolt segment: %v", err)
}
Expand Down
4 changes: 3 additions & 1 deletion index/scorch/scorch.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Scorch struct {
readOnly bool
version uint8
config map[string]interface{}
segmentConfig map[string]interface{}
analysisQueue *index.AnalysisQueue
path string

Expand Down Expand Up @@ -121,6 +122,7 @@ func NewScorch(storeName string,
forceMergeRequestCh: make(chan *mergerCtrl, 1),
segPlugin: defaultSegmentPlugin,
copyScheduled: map[string]int{},
segmentConfig: make(map[string]interface{}),
}

forcedSegmentType, forcedSegmentVersion, err := configForceSegmentTypeVersion(config)
Expand Down Expand Up @@ -441,7 +443,7 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) {
stats := newFieldStats()

if len(analysisResults) > 0 {
newSegment, bufBytes, err = s.segPlugin.New(analysisResults)
newSegment, bufBytes, err = s.segPlugin.NewEx(analysisResults, s.segmentConfig)
if err != nil {
return err
}
Expand Down
8 changes: 8 additions & 0 deletions index/scorch/segment_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,14 @@ type SegmentPlugin interface {
// New takes a set of Documents and turns them into a new Segment
New(results []index.Document) (segment.Segment, uint64, error)

NewEx(results []index.Document, config map[string]interface{}) (segment.Segment, uint64, error)

// Open attempts to open the file at the specified path and
// return the corresponding Segment
Open(path string) (segment.Segment, error)

OpenEx(path string, config map[string]interface{}) (segment.Segment, error)

// Merge takes a set of Segments, and creates a new segment on disk at
// the specified path.
// Drops is a set of bitmaps (one for each segment) indicating which
Expand All @@ -67,6 +71,10 @@ type SegmentPlugin interface {
Merge(segments []segment.Segment, drops []*roaring.Bitmap, path string,
closeCh chan struct{}, s segment.StatsReporter) (
[][]uint64, uint64, error)

MergeEx(segments []segment.Segment, drops []*roaring.Bitmap, path string,
closeCh chan struct{}, s segment.StatsReporter, config map[string]interface{}) (
[][]uint64, uint64, error)
}

var supportedSegmentPlugins map[string]map[uint32]SegmentPlugin
Expand Down