feat: Speedup adding batch of documents #20

Merged
merged 1 commit on May 22, 2025
1 change: 1 addition & 0 deletions rag/engine.go
@@ -7,6 +7,7 @@ import (
 
 type Engine interface {
 	Store(s string, metadata map[string]string) (engine.Result, error)
+	StoreDocuments(s []string, metadata map[string]string) ([]engine.Result, error)
 	Reset() error
 	Search(s string, similarEntries int) ([]types.Result, error)
 	Count() int
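For context, a minimal caller-side sketch of how the new batch method can replace a per-chunk loop over Store; the function name, eng, and chunks are hypothetical and not part of this change:

// Hypothetical usage sketch of the batch API declared above.
// Assumes some Engine implementation `eng` and a slice of text chunks.
func storeAllChunks(eng Engine, chunks []string) error {
	// One call replaces a loop of eng.Store(chunk, metadata) calls.
	results, err := eng.StoreDocuments(chunks, map[string]string{"type": "file"})
	if err != nil {
		return err
	}
	_ = results // one Result per input chunk, in the same order
	return nil
}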
29 changes: 29 additions & 0 deletions rag/engine/chromem.go
@@ -106,6 +106,35 @@ func (c *ChromemDB) Store(s string, metadata map[string]string) (Result, error)
 	}, nil
 }
 
+func (c *ChromemDB) StoreDocuments(s []string, metadata map[string]string) ([]Result, error) {
+	defer func() {
+		c.index += len(s)
+	}()
+
+	if len(s) == 0 {
+		return nil, fmt.Errorf("empty string")
+	}
+
+	results := make([]Result, len(s))
+	documents := make([]chromem.Document, len(s))
+	for i, content := range s {
+		documents[i] = chromem.Document{
+			Metadata: metadata,
+			Content:  content,
+			ID:       fmt.Sprint(c.index + i),
+		}
+		results[i] = Result{
+			ID: fmt.Sprint(c.index + i),
+		}
+	}
+
+	if err := c.collection.AddDocuments(context.Background(), documents, runtime.NumCPU()); err != nil {
+		return nil, err
+	}
+
+	return results, nil
+}
+
 func (c *ChromemDB) Delete(where map[string]string, whereDocuments map[string]string, ids ...string) error {
 	return c.collection.Delete(context.Background(), where, whereDocuments, ids...)
 }
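This is where the speedup comes from: instead of adding and embedding chunks one at a time through Store, the whole batch is handed to chromem's AddDocuments with runtime.NumCPU() as the concurrency argument, so embeddings for a batch can be computed in parallel. A rough, hypothetical way to compare the two paths, assuming the Engine interface from rag/engine.go above; eng, chunks, and the metadata literal are illustrative, and fmt, log, and time are assumed imported:

// Illustrative timing sketch only, not part of this PR.
func compareStorePaths(eng Engine, chunks []string) {
	meta := map[string]string{"type": "file"}

	start := time.Now()
	for _, c := range chunks {
		if _, err := eng.Store(c, meta); err != nil { // old path: one call per chunk
			log.Fatal(err)
		}
	}
	fmt.Println("per-chunk Store loop:", time.Since(start))

	start = time.Now()
	if _, err := eng.StoreDocuments(chunks, meta); err != nil { // new path: single batched call
		log.Fatal(err)
	}
	fmt.Println("single StoreDocuments call:", time.Since(start))
}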
12 changes: 12 additions & 0 deletions rag/engine/localai.go
@@ -31,6 +31,18 @@ func (db *LocalAIRAGDB) Count() int {
 	return 0
 }
 
+func (db *LocalAIRAGDB) StoreDocuments(s []string, metadata map[string]string) ([]Result, error) {
+	results := []Result{}
+	for _, content := range s {
+		result, err := db.Store(content, metadata)
+		if err != nil {
+			return nil, err
+		}
+		results = append(results, result)
+	}
+	return results, nil
+}
+
 func (db *LocalAIRAGDB) Store(s string, metadata map[string]string) (Result, error) {
 	resp, err := db.openaiClient.CreateEmbeddings(context.TODO(),
 		openai.EmbeddingRequestStrings{
19 changes: 11 additions & 8 deletions rag/persistency.go
@@ -197,6 +197,7 @@ func (db *PersistentKB) Store(entry string, metadata map[string]string) error {
 }
 
 func (db *PersistentKB) storeFile(entry string, metadata map[string]string) error {
+	xlog.Info("Storing file", "entry", entry)
 	fileName := filepath.Base(entry)
 
 	// copy file to assetDir (if it's a file)
@@ -217,6 +218,7 @@ func (db *PersistentKB) StoreOrReplace(entry string, metadata map[string]string) erro
 }
 
 func (db *PersistentKB) StoreOrReplace(entry string, metadata map[string]string) error {
+	xlog.Info("Storing or replacing entry", "entry", entry)
 	db.Lock()
 	defer db.Unlock()
 
@@ -234,22 +236,23 @@ func (db *PersistentKB) StoreOrReplace(entry string, metadata map[string]string)
 }
 
 func (db *PersistentKB) store(metadata map[string]string, files ...string) ([]engine.Result, error) {
+	xlog.Info("Storing files", "files", files)
 	results := []engine.Result{}
 
 	for _, c := range files {
 		e := filepath.Join(db.assetDir, filepath.Base(c))
 		pieces, err := chunkFile(e, db.maxChunkSize)
 		if err != nil {
 			return nil, err
 		}
-		for _, p := range pieces {
-			metadata["type"] = "file"
-			metadata["source"] = c
-			res, err := db.Engine.Store(p, metadata)
-			if err != nil {
-				return nil, err
-			}
-			results = append(results, res)
+		metadata["type"] = "file"
+		metadata["source"] = c
+		xlog.Info("Storing pieces", "pieces", pieces, "metadata", metadata)
+		res, err := db.Engine.StoreDocuments(pieces, metadata)
+		if err != nil {
+			return nil, err
 		}
+		results = append(results, res...)
+		db.index[c] = results
 	}
 
2 changes: 1 addition & 1 deletion rag/source_manager.go
@@ -125,7 +125,7 @@ func (sm *SourceManager) updateSource(collectionName string, source ExternalSour
 		return
 	}
 
-	xlog.Info("Content", "content", content)
+	//xlog.Info("Content", "content", content)
 
 	// Create a temporary file to store the content
 	sanitizedURL := sanitizeURL(source.URL)
8 changes: 7 additions & 1 deletion rag/sources/router.go
@@ -1,15 +1,21 @@
 package sources
 
-import "strings"
+import (
+	"strings"
+
+	"github.com/mudler/localrecall/pkg/xlog"
+)
 
 func SourceRouter(url string) (string, error) {
 
+	xlog.Info("Downloading content from", "url", url)
 	switch {
 	case strings.HasSuffix(url, "sitemap.xml"):
 		content, err := GetWebSitemapContent(url)
 		if err != nil {
 			return "", err
 		}
+		xlog.Info("Downloaded all content from sitemap", "url", url, "length", len(content))
 		return strings.Join(content, "\n"), nil
 	}
 