Skip to content

Commit

Permalink
fix: Return io error other than NotExist refreshing config (milvus-…
Browse files Browse the repository at this point in the history
…io#38924)

Related to milvus-io#38923

This PR:

- Check whether `os.Stat` config file error is io.ErrNotExist
- Panic when get config return error during Milvus initialization

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
  • Loading branch information
congqixia committed Jan 8, 2025
1 parent b2b7cca commit 798288e
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 28 deletions.
7 changes: 6 additions & 1 deletion pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ package config

import (
"fmt"
"log"
"strings"

"github.com/cockroachdb/errors"
"github.com/spf13/cast"
"go.uber.org/zap"

"github.com/milvus-io/milvus/pkg/util/typeutil"
)
Expand All @@ -44,7 +46,10 @@ func Init(opts ...Option) (*Manager, error) {
sourceManager := NewManager()
if o.FileInfo != nil {
s := NewFileSource(o.FileInfo)
sourceManager.AddSource(s)
err := sourceManager.AddSource(s)
if err != nil {
log.Fatal("failed to add FileSource config", zap.Error(err))
}
}
if o.EnvKeyFormatter != nil {
sourceManager.AddSource(NewEnvSource(o.EnvKeyFormatter))
Expand Down
17 changes: 13 additions & 4 deletions pkg/config/file_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,29 +123,38 @@ func (fs *FileSource) loadFromFile() error {
configFiles = fs.files
fs.RUnlock()

notExistsNum := 0
for _, configFile := range configFiles {
if _, err := os.Stat(configFile); err != nil {
continue
if os.IsNotExist(err) {
notExistsNum++
continue
}
return err
}

ext := filepath.Ext(configFile)
if len(ext) == 0 || (ext[1:] != "yaml" && ext[1:] != "yml") {
return fmt.Errorf("Unsupported Config Type: " + ext)
return fmt.Errorf("Unsupported Config Type: %s", ext)
}

data, err := os.ReadFile(configFile)
if err != nil {
return errors.Wrap(err, "Read config failed: "+configFile)
return errors.Wrapf(err, "Read config failed: %s", configFile)
}

var config map[string]interface{}
err = yaml.Unmarshal(data, &config)
if err != nil {
return errors.Wrap(err, "unmarshal yaml file "+configFile+" failed")
return errors.Wrapf(err, "unmarshal yaml file %s failed", configFile)
}

flattenAndMergeMap("", config, newConfig)
}
// not allow all config files missing, return error for this case
if notExistsNum == len(configFiles) {
return errors.Newf("all config files not exists, files: %v", configFiles)
}

return fs.update(newConfig)
}
Expand Down
53 changes: 32 additions & 21 deletions pkg/config/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func TestOnEvent(t *testing.T) {

dir, _ := os.MkdirTemp("", "milvus")
yamlFile := path.Join(dir, "milvus.yaml")
os.WriteFile(yamlFile, []byte("a.b: \"\""), 0o600)
mgr, _ := Init(WithEnvSource(formatKey),
WithFilesSource(&FileInfo{
Files: []string{yamlFile},
Expand All @@ -147,31 +148,41 @@ func TestOnEvent(t *testing.T) {
RefreshInterval: 10 * time.Millisecond,
}))
os.WriteFile(yamlFile, []byte("a.b: aaa"), 0o600)
time.Sleep(time.Second)
value, err := mgr.GetConfig("a.b")
assert.NoError(t, err)
assert.Equal(t, value, "aaa")
assert.Eventually(t, func() bool {
value, err := mgr.GetConfig("a.b")
assert.NoError(t, err)
return value == "aaa"
}, time.Second*5, time.Second)

ctx := context.Background()
client.KV.Put(ctx, "test/config/a/b", "bbb")
time.Sleep(time.Second)
value, err = mgr.GetConfig("a.b")
assert.NoError(t, err)
assert.Equal(t, value, "bbb")

assert.Eventually(t, func() bool {
value, err := mgr.GetConfig("a.b")
assert.NoError(t, err)
return value == "bbb"
}, time.Second*5, time.Second)

client.KV.Put(ctx, "test/config/a/b", "ccc")
time.Sleep(time.Second)
value, err = mgr.GetConfig("a.b")
assert.NoError(t, err)
assert.Equal(t, value, "ccc")
assert.Eventually(t, func() bool {
value, err := mgr.GetConfig("a.b")
assert.NoError(t, err)
return value == "ccc"
}, time.Second*5, time.Second)

os.WriteFile(yamlFile, []byte("a.b: ddd"), 0o600)
time.Sleep(time.Second)
value, err = mgr.GetConfig("a.b")
assert.NoError(t, err)
assert.Equal(t, value, "ccc")
assert.Eventually(t, func() bool {
value, err := mgr.GetConfig("a.b")
assert.NoError(t, err)
return value == "ccc"
}, time.Second*5, time.Second)

client.KV.Delete(ctx, "test/config/a/b")
time.Sleep(time.Second)
value, err = mgr.GetConfig("a.b")
assert.NoError(t, err)
assert.Equal(t, value, "ddd")
assert.Eventually(t, func() bool {
value, err := mgr.GetConfig("a.b")
assert.NoError(t, err)
return value == "ddd"
}, time.Second*5, time.Second)
}

func TestDeadlock(t *testing.T) {
Expand Down Expand Up @@ -206,6 +217,7 @@ func TestCachedConfig(t *testing.T) {

dir, _ := os.MkdirTemp("", "milvus")
yamlFile := path.Join(dir, "milvus.yaml")
os.WriteFile(yamlFile, []byte("a.b: aaa"), 0o600)
mgr, _ := Init(WithEnvSource(formatKey),
WithFilesSource(&FileInfo{
Files: []string{yamlFile},
Expand All @@ -218,7 +230,6 @@ func TestCachedConfig(t *testing.T) {
}))
// test get cached value from file
{
os.WriteFile(yamlFile, []byte("a.b: aaa"), 0o600)
time.Sleep(time.Second)
_, exist := mgr.GetCachedValue("a.b")
assert.False(t, exist)
Expand Down
3 changes: 1 addition & 2 deletions pkg/config/refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ func (r *refresher) refreshPeriodically(name string) {
case <-ticker.C:
err := r.fetchFunc()
if err != nil {
log.Error("can not pull configs", zap.Error(err))
r.stop()
log.WithRateGroup("refresher", 1, 60).RatedWarn(60, "can not pull configs", zap.Error(err))
}
case <-r.intervalDone:
log.Info("stop refreshing configurations", zap.String("source", name))
Expand Down

0 comments on commit 798288e

Please sign in to comment.