From aaa7785efeeca8a60902002ec202301f4cf614ab Mon Sep 17 00:00:00 2001 From: congqixia Date: Wed, 8 Jan 2025 12:00:56 +0800 Subject: [PATCH] fix: Return io error other than `NotExist` refreshing config (#38924) Related to #38923 This PR: - Check whether `os.Stat` config file error is io.ErrNotExist - Panic when get config return error during Milvus initialization --------- Signed-off-by: Congqi Xia --- pkg/config/config.go | 7 ++++- pkg/config/file_source.go | 11 +++++++- pkg/config/manager_test.go | 53 +++++++++++++++++++++++--------------- pkg/config/refresher.go | 3 +-- 4 files changed, 49 insertions(+), 25 deletions(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index fc93c086f74d4..46eea0beab1fd 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -17,9 +17,11 @@ package config import ( + "log" "strings" "github.com/cockroachdb/errors" + "go.uber.org/zap" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -38,7 +40,10 @@ func Init(opts ...Option) (*Manager, error) { sourceManager := NewManager() if o.FileInfo != nil { s := NewFileSource(o.FileInfo) - sourceManager.AddSource(s) + err := sourceManager.AddSource(s) + if err != nil { + log.Fatal("failed to add FileSource config", zap.Error(err)) + } } if o.EnvKeyFormatter != nil { sourceManager.AddSource(NewEnvSource(o.EnvKeyFormatter)) diff --git a/pkg/config/file_source.go b/pkg/config/file_source.go index e8402efe6b6ad..fb4bd18750298 100644 --- a/pkg/config/file_source.go +++ b/pkg/config/file_source.go @@ -123,9 +123,14 @@ func (fs *FileSource) loadFromFile() error { configFiles = fs.files fs.RUnlock() + notExistsNum := 0 for _, configFile := range configFiles { if _, err := os.Stat(configFile); err != nil { - continue + if os.IsNotExist(err) { + notExistsNum++ + continue + } + return err } yamlReader.SetConfigFile(configFile) @@ -161,6 +166,10 @@ func (fs *FileSource) loadFromFile() error { newConfig[formatKey(key)] = str } } + // not allow all config files missing, return error for this case + if notExistsNum == len(configFiles) { + return errors.Newf("all config files not exists, files: %v", configFiles) + } return fs.update(newConfig) } diff --git a/pkg/config/manager_test.go b/pkg/config/manager_test.go index 0fccc876af509..c2afac823314c 100644 --- a/pkg/config/manager_test.go +++ b/pkg/config/manager_test.go @@ -136,6 +136,7 @@ func TestOnEvent(t *testing.T) { dir, _ := os.MkdirTemp("", "milvus") yamlFile := path.Join(dir, "milvus.yaml") + os.WriteFile(yamlFile, []byte("a.b: \"\""), 0o600) mgr, _ := Init(WithEnvSource(formatKey), WithFilesSource(&FileInfo{ Files: []string{yamlFile}, @@ -147,31 +148,41 @@ func TestOnEvent(t *testing.T) { RefreshInterval: 10 * time.Millisecond, })) os.WriteFile(yamlFile, []byte("a.b: aaa"), 0o600) - time.Sleep(time.Second) - value, err := mgr.GetConfig("a.b") - assert.NoError(t, err) - assert.Equal(t, value, "aaa") + assert.Eventually(t, func() bool { + value, err := mgr.GetConfig("a.b") + assert.NoError(t, err) + return value == "aaa" + }, time.Second*5, time.Second) + ctx := context.Background() client.KV.Put(ctx, "test/config/a/b", "bbb") - time.Sleep(time.Second) - value, err = mgr.GetConfig("a.b") - assert.NoError(t, err) - assert.Equal(t, value, "bbb") + + assert.Eventually(t, func() bool { + value, err := mgr.GetConfig("a.b") + assert.NoError(t, err) + return value == "bbb" + }, time.Second*5, time.Second) + client.KV.Put(ctx, "test/config/a/b", "ccc") - time.Sleep(time.Second) - value, err = mgr.GetConfig("a.b") - assert.NoError(t, err) - assert.Equal(t, value, "ccc") + assert.Eventually(t, func() bool { + value, err := mgr.GetConfig("a.b") + assert.NoError(t, err) + return value == "ccc" + }, time.Second*5, time.Second) + os.WriteFile(yamlFile, []byte("a.b: ddd"), 0o600) - time.Sleep(time.Second) - value, err = mgr.GetConfig("a.b") - assert.NoError(t, err) - assert.Equal(t, value, "ccc") + assert.Eventually(t, func() bool { + value, err := mgr.GetConfig("a.b") + assert.NoError(t, err) + return value == "ccc" + }, time.Second*5, time.Second) + client.KV.Delete(ctx, "test/config/a/b") - time.Sleep(time.Second) - value, err = mgr.GetConfig("a.b") - assert.NoError(t, err) - assert.Equal(t, value, "ddd") + assert.Eventually(t, func() bool { + value, err := mgr.GetConfig("a.b") + assert.NoError(t, err) + return value == "ddd" + }, time.Second*5, time.Second) } func TestDeadlock(t *testing.T) { @@ -206,6 +217,7 @@ func TestCachedConfig(t *testing.T) { dir, _ := os.MkdirTemp("", "milvus") yamlFile := path.Join(dir, "milvus.yaml") + os.WriteFile(yamlFile, []byte("a.b: aaa"), 0o600) mgr, _ := Init(WithEnvSource(formatKey), WithFilesSource(&FileInfo{ Files: []string{yamlFile}, @@ -218,7 +230,6 @@ func TestCachedConfig(t *testing.T) { })) // test get cached value from file { - os.WriteFile(yamlFile, []byte("a.b: aaa"), 0o600) time.Sleep(time.Second) _, exist := mgr.GetCachedValue("a.b") assert.False(t, exist) diff --git a/pkg/config/refresher.go b/pkg/config/refresher.go index 2a403f5ed5156..2cdca1a0c51e0 100644 --- a/pkg/config/refresher.go +++ b/pkg/config/refresher.go @@ -70,8 +70,7 @@ func (r *refresher) refreshPeriodically(name string) { case <-ticker.C: err := r.fetchFunc() if err != nil { - log.Error("can not pull configs", zap.Error(err)) - r.stop() + log.WithRateGroup("refresher", 1, 60).RatedWarn(60, "can not pull configs", zap.Error(err)) } case <-r.intervalDone: log.Info("stop refreshing configurations", zap.String("source", name))