Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion index.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (b *Batch) DeleteInternal(key []byte) {
// Size returns the total number of operations inside the batch
// including normal index operations and internal operations.
//
// Both counts are read through the maps' Len methods, each of which takes
// its own read lock, so the sum is not an atomic snapshot of the two maps.
func (b *Batch) Size() int {
	return b.internal.IndexOps.Len() + b.internal.InternalOps.Len()
}

// String prints a user friendly string representation of what
Expand Down
161 changes: 138 additions & 23 deletions index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"fmt"
"reflect"
"sync"

"github.com/blevesearch/bleve/document"
"github.com/blevesearch/bleve/index/store"
Expand Down Expand Up @@ -248,75 +249,189 @@ type DocIDReader interface {
Close() error
}

// IndexOpsMap is a mutex-guarded map from document ID to the document to be
// indexed. Callers in this file store a nil document to mark an ID for
// deletion.
//
// The zero value is ready to use. Because it embeds a sync.RWMutex, an
// IndexOpsMap must not be copied after first use.
type IndexOpsMap struct {
	sync.RWMutex
	m map[string]*document.Document
}

// Delete removes key from the map. Deleting a missing key is a no-op.
func (m *IndexOpsMap) Delete(key string) {
	m.Lock()
	defer m.Unlock()

	delete(m.m, key)
}

// Load returns the document stored under key and whether the key was present.
// A present key may hold a nil document.
func (m *IndexOpsMap) Load(key string) (*document.Document, bool) {
	m.RLock()
	defer m.RUnlock()

	v, ok := m.m[key]
	return v, ok
}

// Range calls f for every entry that was in the map when Range was called,
// in unspecified order, and stops early when f returns false.
//
// The entries are copied while holding the read lock and f is invoked after
// the lock is released. Copying only the map reference (as before) would let
// the iteration race with concurrent Store/Delete calls. Running f outside
// the lock also means f may safely call back into this map.
func (m *IndexOpsMap) Range(f func(key string, value *document.Document) bool) {
	m.RLock()
	keys := make([]string, 0, len(m.m))
	vals := make([]*document.Document, 0, len(m.m))
	for k, v := range m.m {
		keys = append(keys, k)
		vals = append(vals, v)
	}
	m.RUnlock()

	for i, k := range keys {
		if !f(k, vals[i]) {
			return
		}
	}
}

// Store sets the document for key, replacing any previous entry.
func (m *IndexOpsMap) Store(key string, value *document.Document) {
	m.Lock()
	defer m.Unlock()

	// Lazily allocate so that a zero-value IndexOpsMap does not panic on
	// its first write.
	if m.m == nil {
		m.m = make(map[string]*document.Document)
	}
	m.m[key] = value
}

// Len returns the number of entries currently in the map.
func (m *IndexOpsMap) Len() int {
	m.RLock()
	defer m.RUnlock()

	return len(m.m)
}

// reset discards all entries by swapping in a freshly allocated map.
func (m *IndexOpsMap) reset() {
	m.Lock()
	defer m.Unlock()

	m.m = make(map[string]*document.Document)
}

// InternalOpsMap is a mutex-guarded map from internal key to its raw value.
// Callers in this file store a nil value to mark a key for deletion.
//
// The zero value is ready to use. Because it embeds a sync.RWMutex, an
// InternalOpsMap must not be copied after first use.
type InternalOpsMap struct {
	sync.RWMutex
	m map[string][]byte
}

// Delete removes key from the map. Deleting a missing key is a no-op.
func (m *InternalOpsMap) Delete(key string) {
	m.Lock()
	defer m.Unlock()

	delete(m.m, key)
}

// Load returns the value stored under key and whether the key was present.
// A present key may hold a nil value.
func (m *InternalOpsMap) Load(key string) ([]byte, bool) {
	m.RLock()
	defer m.RUnlock()

	v, ok := m.m[key]
	return v, ok
}

// Range calls f for every entry that was in the map when Range was called,
// in unspecified order, and stops early when f returns false.
//
// The entries are copied while holding the read lock and f is invoked after
// the lock is released. Copying only the map reference (as before) would let
// the iteration race with concurrent Store/Delete calls. Running f outside
// the lock also means f may safely call back into this map.
func (m *InternalOpsMap) Range(f func(key string, value []byte) bool) {
	m.RLock()
	keys := make([]string, 0, len(m.m))
	vals := make([][]byte, 0, len(m.m))
	for k, v := range m.m {
		keys = append(keys, k)
		vals = append(vals, v)
	}
	m.RUnlock()

	for i, k := range keys {
		if !f(k, vals[i]) {
			return
		}
	}
}

// Store sets the value for key, replacing any previous entry.
func (m *InternalOpsMap) Store(key string, value []byte) {
	m.Lock()
	defer m.Unlock()

	// Lazily allocate so that a zero-value InternalOpsMap does not panic on
	// its first write.
	if m.m == nil {
		m.m = make(map[string][]byte)
	}
	m.m[key] = value
}

// Len returns the number of entries currently in the map.
func (m *InternalOpsMap) Len() int {
	m.RLock()
	defer m.RUnlock()

	return len(m.m)
}

// reset discards all entries by swapping in a freshly allocated map.
func (m *InternalOpsMap) reset() {
	m.Lock()
	defer m.Unlock()

	m.m = make(map[string][]byte)
}

type Batch struct {
IndexOps map[string]*document.Document
InternalOps map[string][]byte
IndexOps IndexOpsMap
InternalOps InternalOpsMap
}

// NewBatch returns an empty Batch whose operation maps are already allocated.
func NewBatch() *Batch {
	return &Batch{
		IndexOps: IndexOpsMap{
			m: make(map[string]*document.Document),
		},
		InternalOps: InternalOpsMap{
			m: make(map[string][]byte),
		},
	}
}

// Update records doc under doc.ID, replacing any operation already stored
// for that ID.
func (b *Batch) Update(doc *document.Document) {
	b.IndexOps.Store(doc.ID, doc)
}

// Delete records a delete for id by storing a nil document under it,
// replacing any operation already stored for that ID.
func (b *Batch) Delete(id string) {
	b.IndexOps.Store(id, nil)
}

// SetInternal records val under key in the internal operations, replacing
// any operation already stored for that key.
func (b *Batch) SetInternal(key, val []byte) {
	b.InternalOps.Store(string(key), val)
}

// DeleteInternal records a delete for key by storing a nil value under it,
// replacing any operation already stored for that key.
func (b *Batch) DeleteInternal(key []byte) {
	b.InternalOps.Store(string(key), nil)
}

func (b *Batch) String() string {
rv := fmt.Sprintf("Batch (%d ops, %d internal ops)\n", len(b.IndexOps), len(b.InternalOps))
for k, v := range b.IndexOps {
rv := fmt.Sprintf("Batch (%d ops, %d internal ops)\n", b.IndexOps.Len(), b.InternalOps.Len())

b.IndexOps.Range(func(k string, v *document.Document) bool {
if v != nil {
rv += fmt.Sprintf("\tINDEX - '%s'\n", k)
} else {
rv += fmt.Sprintf("\tDELETE - '%s'\n", k)
}
}
for k, v := range b.InternalOps {
return true
})
b.InternalOps.Range(func(k string, v []byte) bool {
if v != nil {
rv += fmt.Sprintf("\tSET INTERNAL - '%s'\n", k)
} else {
rv += fmt.Sprintf("\tDELETE INTERNAL - '%s'\n", k)
}
}
return true
})
return rv
}

// Reset empties both operation maps so the batch can be reused.
func (b *Batch) Reset() {
	b.IndexOps.reset()
	b.InternalOps.reset()
}

// Merge copies every operation from o into b. When both batches hold an
// operation for the same key, the one from o wins. o itself is not modified.
func (b *Batch) Merge(o *Batch) {
	o.IndexOps.Range(func(k string, v *document.Document) bool {
		b.IndexOps.Store(k, v)
		return true
	})
	o.InternalOps.Range(func(k string, v []byte) bool {
		b.InternalOps.Store(k, v)
		return true
	})
}

func (b *Batch) TotalDocSize() int {
var s int
for k, v := range b.IndexOps {
b.IndexOps.Range(func(k string, v *document.Document) bool {
if v != nil {
s += v.Size() + size.SizeOfString
}
s += len(k)
}
return true
})
return s
}

Expand Down
8 changes: 5 additions & 3 deletions index/scorch/introducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sync/atomic"

"github.com/RoaringBitmap/roaring"
"github.com/blevesearch/bleve/index"
"github.com/blevesearch/bleve/index/scorch/segment"
"github.com/blevesearch/bleve/index/scorch/segment/zap"
)
Expand All @@ -28,7 +29,7 @@ type segmentIntroduction struct {
data segment.Segment
obsoletes map[uint64]*roaring.Bitmap
ids []string
internal map[string][]byte
internal *index.InternalOpsMap

applied chan error
persisted chan error
Expand Down Expand Up @@ -200,13 +201,14 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
newSnapshot.internal[key] = oldVal
}
// set new values and apply deletes
for key, newVal := range next.internal {
next.internal.Range(func(key string, newVal []byte) bool {
if newVal != nil {
newSnapshot.internal[key] = newVal
} else {
delete(newSnapshot.internal, key)
}
}
return true
})

newSnapshot.updateSize()
s.rootLock.Lock()
Expand Down
16 changes: 9 additions & 7 deletions index/scorch/scorch.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,13 +291,13 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) {
s.fireEvent(EventKindBatchIntroduction, time.Since(start))
}()

resultChan := make(chan *index.AnalysisResult, len(batch.IndexOps))
resultChan := make(chan *index.AnalysisResult, batch.IndexOps.Len())

var numUpdates uint64
var numDeletes uint64
var numPlainTextBytes uint64
var ids []string
for docID, doc := range batch.IndexOps {
batch.IndexOps.Range(func(docID string, doc *document.Document) bool {
if doc != nil {
// insert _id field
doc.AddField(document.NewTextFieldCustom("_id", nil, []byte(doc.ID), document.IndexField|document.StoreField, nil))
Expand All @@ -307,18 +307,20 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) {
numDeletes++
}
ids = append(ids, docID)
}
return true
})

// FIXME could sort ids list concurrent with analysis?

go func() {
for _, doc := range batch.IndexOps {
batch.IndexOps.Range(func(_ string, doc *document.Document) bool {
if doc != nil {
aw := index.NewAnalysisWork(s, doc, resultChan)
// put the work on the queue
s.analysisQueue.Queue(aw)
}
}
return true
})
}()

// wait for analysis result
Expand Down Expand Up @@ -355,7 +357,7 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) {
atomic.AddUint64(&s.stats.TotBatchesEmpty, 1)
}

err = s.prepareSegment(newSegment, ids, batch.InternalOps)
err = s.prepareSegment(newSegment, ids, &batch.InternalOps)
if err != nil {
if newSegment != nil {
_ = newSegment.Close()
Expand All @@ -375,7 +377,7 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) {
}

func (s *Scorch) prepareSegment(newSegment segment.Segment, ids []string,
internalOps map[string][]byte) error {
internalOps *index.InternalOpsMap) error {

// new introduction
introduction := &segmentIntroduction{
Expand Down
4 changes: 2 additions & 2 deletions index/upsidedown/benchmark_common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func CommonBenchmarkIndexBatch(b *testing.B, storeName string, storeConfig map[s
batch := index.NewBatch()
for j := 0; j < 1000; j++ {
if j%batchSize == 0 {
if len(batch.IndexOps) > 0 {
if batch.IndexOps.Len() > 0 {
err := idx.Batch(batch)
if err != nil {
b.Fatal(err)
Expand All @@ -129,7 +129,7 @@ func CommonBenchmarkIndexBatch(b *testing.B, storeName string, storeConfig map[s
batch.Update(indexDocument)
}
// close last batch
if len(batch.IndexOps) > 0 {
if batch.IndexOps.Len() > 0 {
err := idx.Batch(batch)
if err != nil {
b.Fatal(err)
Expand Down
Loading