Skip to content

Commit e4b34b1

Browse files
committed
lock during store and replace
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
1 parent 24c6db1 commit e4b34b1

File tree

4 files changed

+36
-18
lines changed

4 files changed

+36
-18
lines changed

rag/collection.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func NewPersistentLocalAICollection(llmClient *openai.Client, apiURL, apiKey, co
5252

5353
// TODO: This does not work as we do not have .Reset().
5454
// The problem is that LocalAI stores are not persistent either and do not allow upserts.
55-
persistentKB.repopulate()
55+
persistentKB.Repopulate()
5656

5757
return persistentKB
5858
}

rag/persistency.go

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -120,11 +120,16 @@ func (db *PersistentKB) save() error {
120120
return os.WriteFile(db.path, data, 0644)
121121
}
122122

123-
// repopulate reinitializes the persistent knowledge base with the files that were added to it.
124-
func (db *PersistentKB) repopulate() error {
123+
func (db *PersistentKB) Count() int {
125124
db.Lock()
126125
defer db.Unlock()
127126

127+
return db.Engine.Count()
128+
}
129+
130+
// repopulate reinitializes the persistent knowledge base with the files that were added to it.
131+
func (db *PersistentKB) repopulate() error {
132+
128133
if err := db.Engine.Reset(); err != nil {
129134
return fmt.Errorf("failed to reset engine: %w", err)
130135
}
@@ -141,6 +146,13 @@ func (db *PersistentKB) repopulate() error {
141146
return nil
142147
}
143148

149+
func (db *PersistentKB) Repopulate() error {
150+
db.Lock()
151+
defer db.Unlock()
152+
153+
return db.repopulate()
154+
}
155+
144156
// Store stores an entry in the persistent knowledge base.
145157
func (db *PersistentKB) ListDocuments() []string {
146158
db.Lock()
@@ -173,6 +185,10 @@ func (db *PersistentKB) Store(entry string, metadata map[string]string) error {
173185
db.Lock()
174186
defer db.Unlock()
175187

188+
return db.storeFile(entry, metadata)
189+
}
190+
191+
func (db *PersistentKB) storeFile(entry string, metadata map[string]string) error {
176192
fileName := filepath.Base(entry)
177193

178194
// copy file to assetDir (if it's a file)
@@ -193,19 +209,17 @@ func (db *PersistentKB) Store(entry string, metadata map[string]string) error {
193209
}
194210

195211
func (db *PersistentKB) StoreOrReplace(entry string, metadata map[string]string) error {
196-
db.Lock()
197212
fileName := filepath.Base(entry)
198213
_, ok := db.index[fileName]
199-
db.Unlock()
200214
// Check if we have it already in the index
201215
if ok {
202216
xlog.Info("Data already exists for entry", "entry", entry, "index", db.index)
203-
if err := db.RemoveEntry(fileName); err != nil {
217+
if err := db.removeFileEntry(fileName); err != nil {
204218
return fmt.Errorf("failed to remove entry: %w", err)
205219
}
206220
}
207221

208-
return db.Store(entry, metadata)
222+
return db.storeFile(entry, metadata)
209223
}
210224

211225
func (db *PersistentKB) store(metadata map[string]string, files ...string) ([]engine.Result, error) {
@@ -231,8 +245,14 @@ func (db *PersistentKB) store(metadata map[string]string, files ...string) ([]en
231245
return results, nil
232246
}
233247

234-
// RemoveEntry removes an entry from the persistent knowledge base.
235248
func (db *PersistentKB) RemoveEntry(entry string) error {
249+
db.Lock()
250+
defer db.Unlock()
251+
252+
return db.removeFileEntry(entry)
253+
}
254+
255+
func (db *PersistentKB) removeFileEntry(entry string) error {
236256

237257
xlog.Info("Removing entry", "entry", entry)
238258
if os.Getenv("LOCALRECALL_REPOPULATE_DELETE") != "true" {
@@ -261,25 +281,20 @@ func (db *PersistentKB) RemoveEntry(entry string) error {
261281
}
262282
}
263283

264-
db.Lock()
265-
266284
xlog.Info("Deleting entry from index", "entry", entry)
267285
delete(db.index, entry)
268286

269287
xlog.Info("Removing entry from disk", "file", e)
270288
os.Remove(e)
271-
db.Unlock()
272289
return db.save()
273290
}
274291

275-
db.Lock()
276292
for e := range db.index {
277293
if e == entry {
278294
os.Remove(filepath.Join(db.assetDir, e))
279295
break
280296
}
281297
}
282-
db.Unlock()
283298

284299
// TODO: this is suboptimal, but currently chromem does not support deleting single entities
285300
return db.repopulate()

test/e2e/common_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ const (
2020
DefaultUpdateInterval = time.Hour
2121

2222
// TestTimeout is the default timeout for Eventually blocks
23-
TestTimeout = 5 * time.Second
23+
TestTimeout = 1 * time.Minute
2424

2525
// TestPollingInterval is the default polling interval for Eventually blocks
2626
TestPollingInterval = 500 * time.Millisecond

test/e2e/source_manager_test.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -247,16 +247,19 @@ var _ = Describe("SourceManager", func() {
247247
return kb.ListDocuments()
248248
}, 2*time.Minute, 5*time.Second).Should(HaveLen(1))
249249

250+
// Wait for initial content to be fetched
251+
Eventually(func() int {
252+
return kb.Count()
253+
}, 2*time.Minute, 5*time.Second).Should(Equal(25))
254+
250255
// Let it run for 2 minutes and check for duplicates
251256
Consistently(func() int {
252-
e, ok := kb.Engine.(*engine.ChromemDB)
253-
Expect(ok).To(BeTrue())
254-
return e.Count()
257+
return kb.Count()
255258
}, 3*time.Minute, 5*time.Second).Should(Equal(25))
256259

257260
// Verify that search results don't contain duplicates
258261
Consistently(func() bool {
259-
results, err := kb.Engine.Search("What is the Black-crowned barwing?", 3)
262+
results, err := kb.Engine.Search("What is the Black-crowned barwing?", 5)
260263
if err != nil {
261264
fmt.Println("Error searching for content", err)
262265
return false

0 commit comments

Comments
 (0)