Skip to content

fix: update lastUpdate correctly #21

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions rag/persistency.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (

// CollectionState represents the persistent state of a collection
type CollectionState struct {
ExternalSources []ExternalSource `json:"external_sources"`
ExternalSources []*ExternalSource `json:"external_sources"`
Index map[string][]engine.Result `json:"index"`
}

Expand All @@ -29,7 +29,7 @@ type PersistentKB struct {
path string
assetDir string
maxChunkSize int
sources []ExternalSource
sources []*ExternalSource

index map[string][]engine.Result
}
Expand All @@ -48,7 +48,7 @@ func loadDB(path string) (*CollectionState, error) {
if err := json.Unmarshal(data, &legacyFiles); err != nil {
return nil, err
}
state.ExternalSources = []ExternalSource{}
state.ExternalSources = []*ExternalSource{}
state.Index = map[string][]engine.Result{}
}

Expand All @@ -68,7 +68,7 @@ func NewPersistentCollectionKB(stateFile, assetDir string, store Engine, maxChun
Engine: store,
assetDir: assetDir,
maxChunkSize: maxChunkSize,
sources: []ExternalSource{},
sources: []*ExternalSource{},
index: map[string][]engine.Result{},
}
persistentKB.Lock()
Expand Down Expand Up @@ -104,7 +104,7 @@ func (db *PersistentKB) Reset() error {
for f := range db.index {
os.Remove(filepath.Join(db.assetDir, f))
}
db.sources = []ExternalSource{}
db.sources = []*ExternalSource{}
db.index = map[string][]engine.Result{}
db.save()
db.Unlock()
Expand Down Expand Up @@ -368,14 +368,14 @@ func chunkFile(fpath string, maxchunksize int) ([]string, error) {
}

// GetExternalSources returns the list of external sources for this collection
func (db *PersistentKB) GetExternalSources() []ExternalSource {
func (db *PersistentKB) GetExternalSources() []*ExternalSource {
db.Lock()
defer db.Unlock()
return db.sources
}

// AddExternalSource adds an external source to the collection
func (db *PersistentKB) AddExternalSource(source ExternalSource) error {
func (db *PersistentKB) AddExternalSource(source *ExternalSource) error {
db.Lock()
defer db.Unlock()

Expand Down
17 changes: 10 additions & 7 deletions rag/source_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ type ExternalSource struct {

// SourceManager manages external sources for collections
type SourceManager struct {
sources map[string][]ExternalSource // collection name -> sources
collections map[string]*PersistentKB // collection name -> collection
sources map[string][]*ExternalSource // collection name -> sources
collections map[string]*PersistentKB // collection name -> collection
mu sync.RWMutex
ctx context.Context
cancel context.CancelFunc
Expand All @@ -33,7 +33,7 @@ type SourceManager struct {
func NewSourceManager() *SourceManager {
ctx, cancel := context.WithCancel(context.Background())
return &SourceManager{
sources: make(map[string][]ExternalSource),
sources: make(map[string][]*ExternalSource),
collections: make(map[string]*PersistentKB),
ctx: ctx,
cancel: cancel,
Expand Down Expand Up @@ -72,14 +72,14 @@ func (sm *SourceManager) AddSource(collectionName, url string, updateInterval ti
}

// Add the source to the collection's persistent storage
if err := collection.AddExternalSource(source); err != nil {
if err := collection.AddExternalSource(&source); err != nil {
return err
}

sm.sources[collectionName] = append(sm.sources[collectionName], source)
sm.sources[collectionName] = append(sm.sources[collectionName], &source)

// Trigger an immediate update
go sm.updateSource(collectionName, source, collection)
go sm.updateSource(collectionName, &source, collection)

return nil
}
Expand Down Expand Up @@ -116,7 +116,10 @@ func (sm *SourceManager) RemoveSource(collectionName, url string) error {
}

// updateSource updates a single source
func (sm *SourceManager) updateSource(collectionName string, source ExternalSource, collection *PersistentKB) {
func (sm *SourceManager) updateSource(collectionName string, source *ExternalSource, collection *PersistentKB) {

// update LastUpdate
source.LastUpdate = time.Now()

xlog.Info("Updating source", "url", source.URL)
content, err := sources.SourceRouter(source.URL)
Expand Down
6 changes: 3 additions & 3 deletions test/e2e/persistency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ var _ = Describe("Persistency", func() {
LastUpdate: time.Now(),
}

err := kb.AddExternalSource(source)
err := kb.AddExternalSource(&source)
Expect(err).To(BeNil())

sources := kb.GetExternalSources()
Expand All @@ -144,10 +144,10 @@ var _ = Describe("Persistency", func() {
LastUpdate: time.Now(),
}

err := kb.AddExternalSource(source)
err := kb.AddExternalSource(&source)
Expect(err).To(BeNil())

err = kb.AddExternalSource(source)
err = kb.AddExternalSource(&source)
Expect(err).ToNot(BeNil())
})
})
Expand Down
4 changes: 2 additions & 2 deletions test/e2e/source_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ var _ = Describe("SourceManager", func() {
UpdateInterval: DefaultUpdateInterval,
LastUpdate: time.Now(),
}
err := kb.AddExternalSource(source)
err := kb.AddExternalSource(&source)
Expect(err).To(BeNil())

// Register the collection
Expand Down Expand Up @@ -142,7 +142,7 @@ var _ = Describe("SourceManager", func() {
sourceManager.Start()

// Wait for at least one update cycle and verify the source was updated
Eventually(func() []rag.ExternalSource {
Eventually(func() []*rag.ExternalSource {
return kb.GetExternalSources()
}, TestTimeout, TestPollingInterval).Should(HaveLen(1))

Expand Down