diff --git a/pkg/storage/sync/storage.go b/pkg/storage/sync/storage.go index 57091a09b..7714837f3 100644 --- a/pkg/storage/sync/storage.go +++ b/pkg/storage/sync/storage.go @@ -116,7 +116,7 @@ func (ss *SyncStorage) runAll(f func(storage.Storage) error) (err error) { for i := 0; i < len(ss.storages); i++ { if result := <-errC; result.error != nil { if err == nil { - err = fmt.Errorf("SyncStorage: error in Storage %d: %v", result.int, result.error) + err = fmt.Errorf("SyncStorage: Error in Storage %d: %v", result.int, result.error) } else { err = fmt.Errorf("%v\n%29s %d: %v", err, "and error in Storage", result.int, result.error) } @@ -127,8 +127,8 @@ func (ss *SyncStorage) runAll(f func(storage.Storage) error) (err error) { } func (ss *SyncStorage) monitorFunc() { - log.Debug("SyncStorage: monitoring thread started") - defer log.Debug("SyncStorage: monitoring thread stopped") + log.Debug("SyncStorage: Monitoring thread started") + defer log.Debug("SyncStorage: Monitoring thread stopped") // This is the internal client for propagating updates c := client.NewClient(ss) @@ -138,9 +138,8 @@ func (ss *SyncStorage) monitorFunc() { // For now, only update the state on write when Ignite is running for { upd, ok := <-ss.eventStream - log.Debugf("SyncStorage: received update %v %t", upd, ok) + log.Debugf("SyncStorage: Received update %v %t", upd, ok) if ok { - switch upd.Event { case update.EventModify, update.EventCreate: // First load the Object using the Storage given in the update, @@ -169,9 +168,9 @@ func (ss *SyncStorage) monitorFunc() { // updates as updateBuffer specifies. select { case ss.updateStream <- upd.Update: - log.Debugf("SyncStorage: sent update: %v", upd.Update) + log.Debugf("SyncStorage: Sent update: %v", upd.Update) default: - log.Warn("SyncStorage: failed to send update, channel full") + log.Warn("SyncStorage: Failed to send update, channel full") } } else { return diff --git a/pkg/storage/watch/watch.go b/pkg/storage/watch/watch.go index ca8dac3e1..934d1c24a 100644 --- a/pkg/storage/watch/watch.go +++ b/pkg/storage/watch/watch.go @@ -199,7 +199,7 @@ func (w *watcher) dispatchFunc() { for { // Wait until we have a batch dispatched to us - w.batcher.ProcessBatch(func(key, val interface{}) bool { + if !w.batcher.ProcessBatch(func(key, val interface{}) bool { filePath := key.(string) // Concatenate all known events, and dispatch them to be handled one by one @@ -209,7 +209,9 @@ func (w *watcher) dispatchFunc() { // Continue traversing the map return true - }) + }) { + return // The batcher is closed, stop processing + } log.Debug("Watcher: Dispatched events batch and reset the events cache") }