diff --git a/lib/cli/config.go b/lib/cli/config.go index e5f71dea..a766fead 100644 --- a/lib/cli/config.go +++ b/lib/cli/config.go @@ -15,7 +15,6 @@ package cli import ( - "fmt" "net/http" "os" @@ -23,16 +22,16 @@ import ( ) const ( - configPrefix = "/api/admin/config" + configPrefix = "/api/admin/config/" ) -func getConfigCmd(ctx *Context, pathSuffix string) *cobra.Command { +func GetConfigCmd(ctx *Context) *cobra.Command { rootCmd := &cobra.Command{ - Use: pathSuffix, + Use: "config", + Short: "", } - path := fmt.Sprintf("%s/%s", configPrefix, pathSuffix) - // set config proxy + // set config { setProxy := &cobra.Command{ Use: "set", @@ -49,7 +48,7 @@ func getConfigCmd(ctx *Context, pathSuffix string) *cobra.Command { b = f } - resp, err := doRequest(cmd.Context(), ctx, http.MethodPut, path, b) + resp, err := doRequest(cmd.Context(), ctx, http.MethodPut, configPrefix, b) if err != nil { return err } @@ -60,13 +59,13 @@ func getConfigCmd(ctx *Context, pathSuffix string) *cobra.Command { rootCmd.AddCommand(setProxy) } - // get config proxy + // get config { getProxy := &cobra.Command{ Use: "get", } getProxy.RunE = func(cmd *cobra.Command, args []string) error { - resp, err := doRequest(cmd.Context(), ctx, http.MethodGet, path, nil) + resp, err := doRequest(cmd.Context(), ctx, http.MethodGet, configPrefix, nil) if err != nil { return err } @@ -79,13 +78,3 @@ func getConfigCmd(ctx *Context, pathSuffix string) *cobra.Command { return rootCmd } - -func GetConfigCmd(ctx *Context) *cobra.Command { - rootCmd := &cobra.Command{ - Use: "config", - Short: "", - } - rootCmd.AddCommand(getConfigCmd(ctx, "proxy")) - rootCmd.AddCommand(getConfigCmd(ctx, "log")) - return rootCmd -} diff --git a/lib/config/proxy.go b/lib/config/proxy.go index ffcd84b9..e7c357bb 100644 --- a/lib/config/proxy.go +++ b/lib/config/proxy.go @@ -140,6 +140,11 @@ func NewConfig(data []byte) (*Config, error) { return &cfg, nil } +func (cfg *Config) Clone() *Config { + newCfg := *cfg + return &newCfg +} + func (cfg *Config) Check() error { if cfg.Workdir == "" { d, err := os.Getwd() diff --git a/lib/util/cmd/syncer.go b/lib/util/cmd/syncer.go index bb2e6bdb..c04b052a 100644 --- a/lib/util/cmd/syncer.go +++ b/lib/util/cmd/syncer.go @@ -114,14 +114,7 @@ func (ws *AtomicWriteSyncer) setOutput(output closableSyncer) error { // Close closes logger. func (ws *AtomicWriteSyncer) Close() error { - var err error - ws.Lock() - if ws.output != nil { - err = ws.output.Close() - ws.output = nil - } - ws.Unlock() - return err + return ws.setOutput(nil) } // initFileLog initializes file based logging options. diff --git a/pkg/manager/config/config.go b/pkg/manager/config/config.go index fb220172..e3970df1 100644 --- a/pkg/manager/config/config.go +++ b/pkg/manager/config/config.go @@ -18,45 +18,22 @@ import ( "context" "encoding/json" "path" - "reflect" "github.com/pingcap/TiProxy/lib/config" "github.com/pingcap/TiProxy/lib/util/errors" "go.uber.org/zap" ) -var cfg2Path = map[reflect.Type]string{ - reflect.TypeOf(config.ProxyServerOnline{}): "proxy", - reflect.TypeOf(config.LogOnline{}): "log", -} - -func (e *ConfigManager) SetConfig(ctx context.Context, val any) error { - rf := reflect.TypeOf(val) - if rf.Kind() == reflect.Pointer { - rf = rf.Elem() - } - p, ok := cfg2Path[rf] - if !ok { - return errors.WithStack(errors.New("invalid type")) - } +func (e *ConfigManager) SetConfig(ctx context.Context, val *config.Config) error { c, err := json.Marshal(val) if err != nil { return errors.WithStack(err) } - return e.set(ctx, pathPrefixConfig, p, c) + return e.set(ctx, pathPrefixConfig, "all", c) } -func (e *ConfigManager) GetConfig(ctx context.Context, val any) error { - rf := reflect.TypeOf(val) - if rf.Kind() == reflect.Pointer { - rf = rf.Elem() - } - p, ok := cfg2Path[rf] - if !ok { - return errors.WithStack(errors.New("invalid type")) - } - - c, err := e.get(ctx, pathPrefixConfig, p) +func (e *ConfigManager) GetConfig(ctx context.Context, val *config.Config) error { + c, err := e.get(ctx, pathPrefixConfig, "all") if err != nil { return err } @@ -64,29 +41,18 @@ func (e *ConfigManager) GetConfig(ctx context.Context, val any) error { return json.Unmarshal(c.Value, val) } -func MakeConfigChan[T any](e *ConfigManager, initval *T) <-chan *T { - cfgchan := make(chan *T, 64) - rf := reflect.TypeOf(initval) - if rf.Kind() == reflect.Pointer { - rf = rf.Elem() - } - p, ok := cfg2Path[rf] - if !ok { - panic(errors.WithStack(errors.New("invalid type"))) - } - - _ = e.SetConfig(context.Background(), initval) - - e.Watch(path.Join(pathPrefixConfig, p), func(_ *zap.Logger, k KVEvent) { - var v T +func (e *ConfigManager) WatchConfig(ctx context.Context) <-chan *config.Config { + ch := make(chan *config.Config) + e.Watch(path.Join(pathPrefixConfig, "all"), func(_ *zap.Logger, k KVEvent) { + var c config.Config if k.Type == KVEventDel { - cfgchan <- &v + ch <- &c } else { - e := json.Unmarshal(k.Value, &v) + e := json.Unmarshal(k.Value, &c) if e == nil { - cfgchan <- &v + ch <- &c } } }) - return cfgchan + return ch } diff --git a/pkg/manager/config/manager.go b/pkg/manager/config/manager.go index 3f844f6e..89fbf8fc 100644 --- a/pkg/manager/config/manager.go +++ b/pkg/manager/config/manager.go @@ -79,6 +79,11 @@ func (srv *ConfigManager) Init(ctx context.Context, cfg *config.Config, logger * return a.Key < b.Key }) + // init config for other components + if err := srv.SetConfig(ctx, cfg); err != nil { + return err + } + var nctx context.Context nctx, srv.cancel = context.WithCancel(ctx) srv.wg.Run(func() { diff --git a/pkg/manager/logger/manager.go b/pkg/manager/logger/manager.go index 08b8fdb2..b27e29df 100644 --- a/pkg/manager/logger/manager.go +++ b/pkg/manager/logger/manager.go @@ -17,6 +17,7 @@ package logger import ( "context" "encoding/json" + "fmt" "github.com/pingcap/TiProxy/lib/config" "github.com/pingcap/TiProxy/lib/util/cmd" @@ -51,28 +52,32 @@ func NewLoggerManager(cfg *config.Log) (*LoggerManager, *zap.Logger, error) { } // Init starts a goroutine to watch configuration. -func (lm *LoggerManager) Init(cfgCh <-chan *config.LogOnline) { +func (lm *LoggerManager) Init(cfgch <-chan *config.Config) { ctx, cancel := context.WithCancel(context.Background()) + lm.cancel = cancel + lm.wg.Run(func() { - lm.watchCfg(ctx, cfgCh) + lm.watchCfg(ctx, cfgch) }) - lm.cancel = cancel } -func (lm *LoggerManager) watchCfg(ctx context.Context, cfgCh <-chan *config.LogOnline) { +func (lm *LoggerManager) watchCfg(ctx context.Context, cfgch <-chan *config.Config) { for { select { case <-ctx.Done(): return - case cfg := <-cfgCh: + case acfg := <-cfgch: + cfg := &acfg.Log.LogOnline + err := lm.updateLoggerCfg(cfg) if err != nil { bytes, merr := json.Marshal(cfg) - if merr != nil { - lm.logger.Error("update logger configuration failed", zap.NamedError("marshal_err", merr), zap.Error(err)) - continue - } - lm.logger.Error("update logger configuration failed", zap.String("cfg", string(bytes)), zap.Error(err)) + fmt.Printf("ggg %+v %+v\n", cfg, err) + lm.logger.Error("update logger configuration failed", + zap.NamedError("update error", err), + zap.String("cfg", string(bytes)), + zap.NamedError("cfg marshal error", merr), + ) } } } diff --git a/pkg/manager/logger/manager_test.go b/pkg/manager/logger/manager_test.go index 3f3fd89b..acc0720c 100644 --- a/pkg/manager/logger/manager_test.go +++ b/pkg/manager/logger/manager_test.go @@ -16,6 +16,7 @@ package logger import ( "context" + "fmt" "math/rand" "os" "path/filepath" @@ -33,15 +34,17 @@ import ( func TestUpdateCfg(t *testing.T) { dir := t.TempDir() fileName := filepath.Join(dir, "proxy.log") - cfg := &config.Log{ - Encoder: "tidb", - LogOnline: config.LogOnline{ - Level: "info", - LogFile: config.LogFile{ - Filename: fileName, - MaxSize: 1, - MaxDays: 2, - MaxBackups: 1, + cfg := &config.Config{ + Log: config.Log{ + Encoder: "tidb", + LogOnline: config.LogOnline{ + Level: "info", + LogFile: config.LogFile{ + Filename: fileName, + MaxSize: 1, + MaxDays: 2, + MaxBackups: 1, + }, }, }, } @@ -57,9 +60,9 @@ func TestUpdateCfg(t *testing.T) { cfg.LogFile.MaxBackups = 2 }, action: func(log *zap.Logger) { - msg := strings.Repeat("a", 800*1024) + msg := strings.Repeat("a", 600*1024) log.Info(msg) - msg = strings.Repeat("b", 800*1024) + msg = strings.Repeat("b", 600*1024) log.Error(msg) }, check: func(files []os.FileInfo) bool { @@ -81,7 +84,7 @@ func TestUpdateCfg(t *testing.T) { if len(files) != 1 { return false } - return files[0].Size() >= int64(2500*1024) + return files[0].Size() >= int64(5*500*1024) }, }, { @@ -115,25 +118,39 @@ func TestUpdateCfg(t *testing.T) { // Make sure the latest config also applies to cloned loggers. lg = lg.Named("another").With(zap.String("field", "test_field")) for i, test := range tests { + require.NoError(t, lg.Sync()) + + clonedCfg := cfg.Clone() + test.updateCfg(&clonedCfg.Log.LogOnline) + ch <- clonedCfg + + // 2rd will block due to watch channel of size 1 + // this ensured all old data are flushed by closing the older file logger + ch <- clonedCfg + + // delete old data after flushed err := os.RemoveAll(dir) require.NoError(t, err) - clonedCfg := cfg.LogOnline - test.updateCfg(&clonedCfg) - // Push it 2 times to make sure the first one has already taken affect. - ch <- &clonedCfg - ch <- &clonedCfg + // write new data test.action(lg) - // Backup files are removed by another goroutine, so there will be some delay. - // We check it multiple times until it succeeds. + // retry before new data are flushed timer := time.NewTimer(3 * time.Second) succeed := false for !succeed { select { case <-timer.C: - t.Fatalf("%dth case time out", i) - case <-time.After(10 * time.Millisecond): + bstr := new(strings.Builder) + logfiles := readLogFiles(t, dir) + e := int64(0) + for _, f := range logfiles { + fmt.Fprintf(bstr, "%s: %d\n", f.Name(), f.Size()) + e += f.Size() + } + fmt.Fprintf(bstr, "3#### %d\n", e) + t.Fatalf("%dth case time out:\n%s", i, bstr.String()) + case <-time.After(50 * time.Millisecond): logfiles := readLogFiles(t, dir) if test.check(logfiles) { succeed = true @@ -145,12 +162,11 @@ func TestUpdateCfg(t *testing.T) { } } -func setupLogManager(t *testing.T, cfg *config.Log) (*zap.Logger, chan *config.LogOnline) { - lm, lg, err := NewLoggerManager(cfg) +func setupLogManager(t *testing.T, cfg *config.Config) (*zap.Logger, chan<- *config.Config) { + lm, lg, err := NewLoggerManager(&cfg.Log) require.NoError(t, err) - ch := make(chan *config.LogOnline) + ch := make(chan *config.Config) lm.Init(ch) - t.Cleanup(func() { require.NoError(t, lm.Close()) }) @@ -179,15 +195,17 @@ func readLogFiles(t *testing.T, dir string) []os.FileInfo { func TestLogConcurrently(t *testing.T) { dir := t.TempDir() fileName := filepath.Join(dir, "proxy.log") - cfg := &config.Log{ - Encoder: "tidb", - LogOnline: config.LogOnline{ - Level: "info", - LogFile: config.LogFile{ - Filename: fileName, - MaxSize: 1, - MaxDays: 2, - MaxBackups: 3, + cfg := &config.Config{ + Log: config.Log{ + Encoder: "tidb", + LogOnline: config.LogOnline{ + Level: "info", + LogFile: config.LogFile{ + Filename: fileName, + MaxSize: 1, + MaxDays: 2, + MaxBackups: 3, + }, }, }, } @@ -210,16 +228,16 @@ func TestLogConcurrently(t *testing.T) { }) } wg.Run(func() { - newCfg := cfg.LogOnline + newCfg := cfg.Clone() for ctx.Err() == nil { - newCfg.LogFile.MaxDays = int(rand.Int31n(10)) - ch <- &newCfg + newCfg.Log.LogFile.MaxDays = int(rand.Int31n(10)) + ch <- newCfg time.Sleep(10 * time.Millisecond) - newCfg.LogFile.MaxBackups = int(rand.Int31n(10)) - ch <- &newCfg + newCfg.Log.LogFile.MaxBackups = int(rand.Int31n(10)) + ch <- newCfg time.Sleep(10 * time.Millisecond) - newCfg.LogFile.MaxSize = int(rand.Int31n(10)) - ch <- &newCfg + newCfg.Log.LogFile.MaxSize = int(rand.Int31n(10)) + ch <- newCfg time.Sleep(10 * time.Millisecond) } }) diff --git a/pkg/proxy/proxy.go b/pkg/proxy/proxy.go index b7ffbd0d..b21befba 100644 --- a/pkg/proxy/proxy.go +++ b/pkg/proxy/proxy.go @@ -88,15 +88,15 @@ func (s *SQLServer) reset(cfg *config.ProxyServerOnline) { s.mu.Unlock() } -func (s *SQLServer) Run(ctx context.Context, onlineProxyConfig <-chan *config.ProxyServerOnline) { +func (s *SQLServer) Run(ctx context.Context, cfgch <-chan *config.Config) { // Create another context because it still needs to run after graceful shutdown. ctx, s.cancelFunc = context.WithCancel(context.Background()) for { select { case <-ctx.Done(): return - case och := <-onlineProxyConfig: - s.reset(och) + case ach := <-cfgch: + s.reset(&ach.Proxy.ProxyServerOnline) default: conn, err := s.listener.Accept() if err != nil { diff --git a/pkg/server/api/config.go b/pkg/server/api/config.go index 632fa0d8..58d1db54 100644 --- a/pkg/server/api/config.go +++ b/pkg/server/api/config.go @@ -19,6 +19,7 @@ import ( "github.com/gin-gonic/gin" "github.com/pingcap/TiProxy/lib/config" + "github.com/pingcap/TiProxy/lib/util/errors" mgrcfg "github.com/pingcap/TiProxy/pkg/manager/config" "go.uber.org/zap" ) @@ -28,12 +29,8 @@ type configHttpHandler struct { cfgmgr *mgrcfg.ConfigManager } -type OnlineCfgTypes interface { - config.ProxyServerOnline | config.LogOnline -} - -func setConfig[T OnlineCfgTypes](h *configHttpHandler, c *gin.Context) { - cfg := new(T) +func (h *configHttpHandler) HandleSetConfig(c *gin.Context) { + cfg := new(config.Config) if c.ShouldBindJSON(cfg) != nil { c.JSON(http.StatusBadRequest, "bad config json") return @@ -48,10 +45,10 @@ func setConfig[T OnlineCfgTypes](h *configHttpHandler, c *gin.Context) { c.JSON(http.StatusOK, "") } -func getConfig[T OnlineCfgTypes](h *configHttpHandler, c *gin.Context) { - var cfg T +func (h *configHttpHandler) HandleGetConfig(c *gin.Context) { + var cfg config.Config err := h.cfgmgr.GetConfig(c, &cfg) - if err != nil { + if err != nil && !errors.Is(err, mgrcfg.ErrNoResults) { h.logger.Error("can not get config", zap.Error(err)) c.JSON(http.StatusInternalServerError, "can not get config") return @@ -62,16 +59,6 @@ func getConfig[T OnlineCfgTypes](h *configHttpHandler, c *gin.Context) { func registerConfig(group *gin.RouterGroup, logger *zap.Logger, cfgmgr *mgrcfg.ConfigManager) { h := &configHttpHandler{logger, cfgmgr} - group.PUT("/proxy", func(c *gin.Context) { - setConfig[config.ProxyServerOnline](h, c) - }) - group.GET("/proxy", func(c *gin.Context) { - getConfig[config.ProxyServerOnline](h, c) - }) - group.PUT("/log", func(c *gin.Context) { - setConfig[config.LogOnline](h, c) - }) - group.GET("/log", func(c *gin.Context) { - getConfig[config.LogOnline](h, c) - }) + group.PUT("/", h.HandleSetConfig) + group.GET("/", h.HandleGetConfig) } diff --git a/pkg/server/server.go b/pkg/server/server.go index dabc301f..f11ed1a3 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -63,8 +63,7 @@ type Server struct { // HTTP server HTTPListener net.Listener // L7 proxy - Proxy *proxy.SQLServer - proxyCh <-chan *config.ProxyServerOnline + Proxy *proxy.SQLServer } func NewServer(ctx context.Context, sctx *sctx.Context) (srv *Server, err error) { @@ -159,7 +158,7 @@ func NewServer(ctx context.Context, sctx *sctx.Context) (srv *Server, err error) return } - srv.LoggerManager.Init(mgrcfg.MakeConfigChan(srv.ConfigManager, &cfg.Log.LogOnline)) + srv.LoggerManager.Init(srv.ConfigManager.WatchConfig(ctx)) nscs, nerr := srv.ConfigManager.ListAllNamespace(ctx) if nerr != nil { @@ -217,10 +216,8 @@ func NewServer(ctx context.Context, sctx *sctx.Context) (srv *Server, err error) return } - srv.proxyCh = mgrcfg.MakeConfigChan(srv.ConfigManager, &cfg.Proxy.ProxyServerOnline) - srv.wg.Run(func() { - srv.Proxy.Run(ctx, srv.proxyCh) + srv.Proxy.Run(ctx, srv.ConfigManager.WatchConfig(ctx)) }) }