Skip to content

feat: allow scaling of the search service #11029

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions services/search/pkg/config/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ type Engine struct {
// EngineBleve configures the bleve engine
type EngineBleve struct {
Datapath string `yaml:"data_path" env:"SEARCH_ENGINE_BLEVE_DATA_PATH" desc:"The directory where the filesystem will store search data. If not defined, the root directory derives from $OCIS_BASE_DATA_PATH/search." introductionVersion:"pre5.0"`
Scale bool `yaml:"scale" env:"SEARCH_ENGINE_BLEVE_SCALE" desc:"Enable scaling of the search index (bleve). If set to 'true', the instance of the search service will no longer have exclusive write access to the index. Note when scaling search, all instances of the search service must be set to true! For 'false', which is the default, the running search service will lock out other processes trying to access the index as long it is running." introductionVersion:"%%NEXT%%"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Scale bool `yaml:"scale" env:"SEARCH_ENGINE_BLEVE_SCALE" desc:"Enable scaling of the search index (bleve). If set to 'true', the instance of the search service will no longer have exclusive write access to the index. Note when scaling search, all instances of the search service must be set to true! For 'false', which is the default, the running search service will lock out other processes trying to access the index as long it is running." introductionVersion:"%%NEXT%%"`
Scale bool `yaml:"scale" env:"SEARCH_ENGINE_BLEVE_SCALE" desc:"Enable scaling of the search index (bleve). If set to 'true', the instance of the search service will no longer have exclusive write access to the index. Note when scaling search, all instances of the search service must be set to true! For 'false', which is the default, the running search service will lock out other processes trying to access the index exclusively as long it is running." introductionVersion:"%%NEXT%%"`

I missed one word 😅

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure... with the new wording, it seems other processes might be able to access the index as long as they aren't requesting exclusive access.

To clarify, oCIS' search service will request exclusive access (write mode), which will cause other processes to be locked out regardless of the access type.
Also, with the Scale flag active, the difference is that oCIS will connect to and disconnect from the index multiple times, using different access modes (including exclusive access). Exclusive access will be requested (depending on the operation) and granted if possible, but the connection won't last long, so other connections will be able to access the index afterwards.

}
129 changes: 85 additions & 44 deletions services/search/pkg/engine/bleve.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"math"
"path"
"path/filepath"
"reflect"
"strings"
"time"
Expand All @@ -31,44 +30,44 @@ import (
searchMessage "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/search/v0"
searchService "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/search/v0"
"github.com/owncloud/ocis/v2/services/search/pkg/content"
bleveEngine "github.com/owncloud/ocis/v2/services/search/pkg/engine/bleve"
searchQuery "github.com/owncloud/ocis/v2/services/search/pkg/query"
)

// Bleve represents a search engine which utilizes bleve to search and store resources.
type Bleve struct {
// index is a bleve index handle that methods reach through b.index.
// NOTE(review): in this diff view this looks like the pre-change field that
// indexGetter replaces — confirm before treating both fields as current.
index bleve.Index
// indexGetter hands out the index to operate on; GetIndex returns the index,
// a close function and an error.
indexGetter bleveEngine.IndexGetter
// queryCreator turns the query string of a search request into a bleve query.
queryCreator searchQuery.Creator[query.Query]
}

// NewBleveIndex opens the bleve index stored in the "bleve" directory below
// root. When no index exists at that path yet, a new one is created there
// using the mapping produced by BuildBleveMapping.
//
// The given root path must exist.
//
// Any result of bleve.Open other than bleve.ErrorIndexPathDoesNotExist,
// success included, is handed back to the caller unchanged. Errors from
// building the mapping or from creating the index are returned with a nil
// index.
func NewBleveIndex(root string) (bleve.Index, error) {
	destination := filepath.Join(root, "bleve")

	index, err := bleve.Open(destination)
	// errors.Is expects the error under inspection first and the sentinel
	// second; with the arguments reversed a wrapped sentinel is never matched.
	if !errors.Is(err, bleve.ErrorIndexPathDoesNotExist) {
		return index, err
	}

	m, err := BuildBleveMapping()
	if err != nil {
		return nil, err
	}

	index, err = bleve.New(destination, m)
	if err != nil {
		// Return a literal nil so the caller never sees a half-built index.
		return nil, err
	}

	return index, nil
}

// NewBleveEngine creates a new Bleve instance
// NOTE(review): diff view — the signature directly below takes a bleve.Index
// and appears to be the pre-change version; the second signature further down
// takes an index getter and looks like its replacement.
func NewBleveEngine(index bleve.Index, queryCreator searchQuery.Creator[query.Query]) *Bleve {
// If scalable is set to true, one connection to the index is created and
// closed per operation, so multiple operations can be executed in parallel.
// If set to false, only one write connection is created for the whole
// service, which will lock the index for other processes. In this case,
// you must close the engine yourself.
//
// NOTE(review): neither signature has a `scalable` parameter; presumably the
// behavior described above is decided by the indexGetter implementation that
// is passed in — confirm and reword this comment accordingly.
func NewBleveEngine(indexGetter bleveEngine.IndexGetter, queryCreator searchQuery.Creator[query.Query]) *Bleve {
// The arguments are stored as-is on the returned engine.
return &Bleve{
index: index,
indexGetter: indexGetter,
queryCreator: queryCreator,
}
}

// Close obtains an index from the index getter and closes it.
//
// When the index getter hands out a new instance on every call, only that
// newly returned instance is closed; any other instances that might still be
// in use are left untouched.
//
// The method is therefore useful with the "memory" and "persistent" index
// getters (not with "persistentScale").
func (b *Bleve) Close() error {
	// The close function returned by the getter is deliberately dropped: the
	// index itself is closed, whatever the getter implementation is.
	idx, _, getErr := b.indexGetter.GetIndex()
	if getErr != nil {
		return getErr
	}

	return idx.Close()
}

// BuildBleveMapping builds a bleve index mapping which can be used for indexing
func BuildBleveMapping() (mapping.IndexMapping, error) {
nameMapping := bleve.NewTextFieldMapping()
Expand Down Expand Up @@ -123,6 +122,12 @@ func BuildBleveMapping() (mapping.IndexMapping, error) {
// Search executes a search request operation within the index.
// Returns a SearchIndexResponse object or an error.
func (b *Bleve) Search(ctx context.Context, sir *searchService.SearchIndexRequest) (*searchService.SearchIndexResponse, error) {
bleveIndex, closeFn, err := b.indexGetter.GetIndex(bleveEngine.ReadOnly(true))
if err != nil {
return nil, err
}
defer closeFn()

createdQuery, err := b.queryCreator.Create(sir.Query)
if err != nil {
if searchQuery.IsValidationError(err) {
Expand Down Expand Up @@ -169,7 +174,7 @@ func (b *Bleve) Search(ctx context.Context, sir *searchService.SearchIndexReques
}

bleveReq.Fields = []string{"*"}
res, err := b.index.Search(bleveReq)
res, err := bleveIndex.Search(bleveReq)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -237,19 +242,31 @@ func (b *Bleve) Search(ctx context.Context, sir *searchService.SearchIndexReques

// Upsert indexes or stores Resource data fields.
//
// The resource r is indexed under id. The index is obtained from the index
// getter; the close function returned with it is deferred so it runs when
// Upsert returns. An error from GetIndex is returned unchanged.
func (b *Bleve) Upsert(id string, r Resource) error {
// NOTE(review): diff view — the next line uses the b.index field directly and
// appears to be the pre-change body; the lines after it look like its
// replacement. Confirm before reading both as one function body.
return b.index.Index(id, r)
bleveIndex, closeFn, err := b.indexGetter.GetIndex()
if err != nil {
return err
}
defer closeFn()

return bleveIndex.Index(id, r)
}

// Move updates the resource location and all of its necessary fields.
func (b *Bleve) Move(id string, parentid string, target string) error {
r, err := b.getResource(id)
bleveIndex, closeFn, err := b.indexGetter.GetIndex()
if err != nil {
return err
}
defer closeFn()

r, err := b.getResource(bleveIndex, id)
if err != nil {
return err
}
currentPath := r.Path
nextPath := utils.MakeRelativePath(target)

r, err = b.updateEntity(id, func(r *Resource) {
r, err = b.updateEntity(bleveIndex, id, func(r *Resource) {
r.Path = nextPath
r.Name = path.Base(nextPath)
r.ParentID = parentid
Expand All @@ -266,13 +283,13 @@ func (b *Bleve) Move(id string, parentid string, target string) error {
bleveReq := bleve.NewSearchRequest(q)
bleveReq.Size = math.MaxInt
bleveReq.Fields = []string{"*"}
res, err := b.index.Search(bleveReq)
res, err := bleveIndex.Search(bleveReq)
if err != nil {
return err
}

for _, h := range res.Hits {
_, err := b.updateEntity(h.ID, func(r *Resource) {
_, err := b.updateEntity(bleveIndex, h.ID, func(r *Resource) {
r.Path = strings.Replace(r.Path, currentPath, nextPath, 1)
})
if err != nil {
Expand All @@ -289,29 +306,53 @@ func (b *Bleve) Move(id string, parentid string, target string) error {
// Instead of removing the resource, it just marks it as deleted.
// This can be undone.
//
// The index is obtained from the index getter and passed to setDeleted with
// deleted set to true; the close function returned with the index is deferred
// so it runs when Delete returns.
func (b *Bleve) Delete(id string) error {
// NOTE(review): diff view — the next line calls setDeleted without an index
// argument and appears to be the pre-change body; the lines after it look
// like its replacement. Confirm.
return b.setDeleted(id, true)
bleveIndex, closeFn, err := b.indexGetter.GetIndex()
if err != nil {
return err
}
defer closeFn()

return b.setDeleted(bleveIndex, id, true)
}

// Restore is the counterpart to Delete.
// It restores the resource which makes it available again.
//
// The index is obtained from the index getter and passed to setDeleted with
// deleted set to false; the close function returned with the index is
// deferred so it runs when Restore returns.
func (b *Bleve) Restore(id string) error {
// NOTE(review): diff view — the next line calls setDeleted without an index
// argument and appears to be the pre-change body; the lines after it look
// like its replacement. Confirm.
return b.setDeleted(id, false)
bleveIndex, closeFn, err := b.indexGetter.GetIndex()
if err != nil {
return err
}
defer closeFn()

return b.setDeleted(bleveIndex, id, false)
}

// Purge removes a resource from the index, irreversible operation.
//
// The index is obtained from the index getter and the document with the given
// id is deleted from it; the close function returned with the index is
// deferred so it runs when Purge returns.
func (b *Bleve) Purge(id string) error {
// NOTE(review): diff view — the next line uses the b.index field directly and
// appears to be the pre-change body; the lines after it look like its
// replacement. Confirm.
return b.index.Delete(id)
bleveIndex, closeFn, err := b.indexGetter.GetIndex()
if err != nil {
return err
}
defer closeFn()

return bleveIndex.Delete(id)
}

// DocCount returns the number of resources in the index.
//
// The index is requested from the index getter with the ReadOnly(true)
// option; the close function returned with it is deferred so it runs when
// DocCount returns. If GetIndex fails, 0 and that error are returned.
func (b *Bleve) DocCount() (uint64, error) {
// NOTE(review): diff view — the next line uses the b.index field directly and
// appears to be the pre-change body; the lines after it look like its
// replacement. Confirm.
return b.index.DocCount()
bleveIndex, closeFn, err := b.indexGetter.GetIndex(bleveEngine.ReadOnly(true))
if err != nil {
return 0, err
}
defer closeFn()

return bleveIndex.DocCount()
}

func (b *Bleve) getResource(id string) (*Resource, error) {
func (b *Bleve) getResource(bleveIndex bleve.Index, id string) (*Resource, error) {
req := bleve.NewSearchRequest(bleve.NewDocIDQuery([]string{id}))
req.Fields = []string{"*"}
res, err := b.index.Search(req)
res, err := bleveIndex.Search(req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -446,19 +487,19 @@ func getPhotoValue[T any](fields map[string]interface{}) *T {
return nil
}

// updateEntity loads the resource with the given id via getResource, applies
// mutateFunc to it and indexes the result again under the resource's own ID.
// It returns the mutated resource together with the error of that index call;
// if the lookup fails, nil and the lookup error are returned.
//
// NOTE(review): diff view — this span shows two signatures and paired
// statements. The variants that use b.index / getResource(id) appear to be the
// pre-change lines, the ones taking an explicit bleveIndex their replacement.
// Confirm before reading them as one function body.
func (b *Bleve) updateEntity(id string, mutateFunc func(r *Resource)) (*Resource, error) {
it, err := b.getResource(id)
func (b *Bleve) updateEntity(bleveIndex bleve.Index, id string, mutateFunc func(r *Resource)) (*Resource, error) {
it, err := b.getResource(bleveIndex, id)
if err != nil {
return nil, err
}

// The caller-supplied function mutates the loaded resource in place.
mutateFunc(it)

return it, b.index.Index(it.ID, it)
return it, bleveIndex.Index(it.ID, it)
}

func (b *Bleve) setDeleted(id string, deleted bool) error {
it, err := b.updateEntity(id, func(r *Resource) {
func (b *Bleve) setDeleted(bleveIndex bleve.Index, id string, deleted bool) error {
it, err := b.updateEntity(bleveIndex, id, func(r *Resource) {
r.Deleted = deleted
})
if err != nil {
Expand All @@ -473,13 +514,13 @@ func (b *Bleve) setDeleted(id string, deleted bool) error {
bleveReq := bleve.NewSearchRequest(q)
bleveReq.Size = math.MaxInt
bleveReq.Fields = []string{"*"}
res, err := b.index.Search(bleveReq)
res, err := bleveIndex.Search(bleveReq)
if err != nil {
return err
}

for _, h := range res.Hits {
_, err := b.updateEntity(h.ID, func(r *Resource) {
_, err := b.updateEntity(bleveIndex, h.ID, func(r *Resource) {
r.Deleted = deleted
})
if err != nil {
Expand Down
Loading