Skip to content
This repository was archived by the owner on Dec 7, 2023. It is now read-only.

Commit

Permalink
Merge pull request #244 from twelho/storage-fixes
Browse files Browse the repository at this point in the history
Actually stop the watcher dispatch thread, better formatting consistency
  • Loading branch information
luxas authored Jul 30, 2019
2 parents 0513f86 + 6774650 commit e5046d6
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 9 deletions.
13 changes: 6 additions & 7 deletions pkg/storage/sync/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (ss *SyncStorage) runAll(f func(storage.Storage) error) (err error) {
for i := 0; i < len(ss.storages); i++ {
if result := <-errC; result.error != nil {
if err == nil {
err = fmt.Errorf("SyncStorage: error in Storage %d: %v", result.int, result.error)
err = fmt.Errorf("SyncStorage: Error in Storage %d: %v", result.int, result.error)
} else {
err = fmt.Errorf("%v\n%29s %d: %v", err, "and error in Storage", result.int, result.error)
}
Expand All @@ -127,8 +127,8 @@ func (ss *SyncStorage) runAll(f func(storage.Storage) error) (err error) {
}

func (ss *SyncStorage) monitorFunc() {
log.Debug("SyncStorage: monitoring thread started")
defer log.Debug("SyncStorage: monitoring thread stopped")
log.Debug("SyncStorage: Monitoring thread started")
defer log.Debug("SyncStorage: Monitoring thread stopped")

// This is the internal client for propagating updates
c := client.NewClient(ss)
Expand All @@ -138,9 +138,8 @@ func (ss *SyncStorage) monitorFunc() {
// For now, only update the state on write when Ignite is running
for {
upd, ok := <-ss.eventStream
log.Debugf("SyncStorage: received update %v %t", upd, ok)
log.Debugf("SyncStorage: Received update %v %t", upd, ok)
if ok {

switch upd.Event {
case update.EventModify, update.EventCreate:
// First load the Object using the Storage given in the update,
Expand Down Expand Up @@ -169,9 +168,9 @@ func (ss *SyncStorage) monitorFunc() {
// updates as updateBuffer specifies.
select {
case ss.updateStream <- upd.Update:
log.Debugf("SyncStorage: sent update: %v", upd.Update)
log.Debugf("SyncStorage: Sent update: %v", upd.Update)
default:
log.Warn("SyncStorage: failed to send update, channel full")
log.Warn("SyncStorage: Failed to send update, channel full")
}
} else {
return
Expand Down
6 changes: 4 additions & 2 deletions pkg/storage/watch/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (w *watcher) dispatchFunc() {

for {
// Wait until we have a batch dispatched to us
w.batcher.ProcessBatch(func(key, val interface{}) bool {
if !w.batcher.ProcessBatch(func(key, val interface{}) bool {
filePath := key.(string)

// Concatenate all known events, and dispatch them to be handled one by one
Expand All @@ -209,7 +209,9 @@ func (w *watcher) dispatchFunc() {

// Continue traversing the map
return true
})
}) {
return // The batcher is closed, stop processing
}

log.Debug("Watcher: Dispatched events batch and reset the events cache")
}
Expand Down

0 comments on commit e5046d6

Please sign in to comment.