Skip to content

Commit

Permalink
manager: add logger manager (#88)
Browse files Browse the repository at this point in the history
  • Loading branch information
djshow832 authored Sep 21, 2022
1 parent ea3cd6f commit 31a5604
Show file tree
Hide file tree
Showing 4 changed files with 519 additions and 0 deletions.
58 changes: 58 additions & 0 deletions pkg/manager/logger/builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package logger

import (
"sync"

"github.com/pingcap/TiProxy/lib/config"
"github.com/pingcap/TiProxy/lib/util/cmd"
"github.com/pingcap/log"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

var registerEncoders sync.Once

func buildEncoder(cfg *config.Log) (zapcore.Encoder, error) {
zapcfg := zap.NewDevelopmentConfig()
zapcfg.Encoding = cfg.Encoder
zapcfg.DisableStacktrace = true
encoder, err := log.NewTextEncoder(&log.Config{})
if err != nil {
return nil, err
}
registerEncoders.Do(func() {
zap.RegisterEncoder("tidb", func(cfg zapcore.EncoderConfig) (zapcore.Encoder, error) {
return encoder, nil
})
zap.RegisterEncoder("newtidb", func(cfg zapcore.EncoderConfig) (zapcore.Encoder, error) {
return cmd.NewTiDBEncoder(cfg), nil
})
})
return encoder, nil
}

func buildLevel(cfg *config.Log) (zap.AtomicLevel, error) {
return zap.ParseAtomicLevel(cfg.Level)
}

func buildSyncer(cfg *config.Log) (*AtomicWriteSyncer, error) {
syncer := &AtomicWriteSyncer{}
if err := syncer.Rebuild(cfg); err != nil {
return nil, err
}
return syncer, nil
}
110 changes: 110 additions & 0 deletions pkg/manager/logger/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package logger

import (
"context"
"encoding/json"

"github.com/pingcap/TiProxy/lib/config"
"github.com/pingcap/TiProxy/lib/util/waitgroup"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

// LoggerManager updates log configurations online.
type LoggerManager struct {
// The logger used by LoggerManager itself to log.
logger *zap.Logger
encoder zapcore.Encoder
syncer *AtomicWriteSyncer
level zap.AtomicLevel
cancel context.CancelFunc
wg waitgroup.WaitGroup
}

// NewLoggerManager creates a new LoggerManager.
func NewLoggerManager(cfg *config.Log) (*LoggerManager, error) {
lm := &LoggerManager{}
var err error
if lm.encoder, err = buildEncoder(cfg); err != nil {
return nil, err
}
if lm.syncer, err = buildSyncer(cfg); err != nil {
return nil, err
}
if lm.level, err = buildLevel(cfg); err != nil {
return nil, err
}
return lm, nil
}

// BuildLogger returns a new logger with the same syncer.
func (lm *LoggerManager) BuildLogger() *zap.Logger {
return zap.New(zapcore.NewCore(lm.encoder, lm.syncer, lm.level),
zap.ErrorOutput(lm.syncer),
zap.AddStacktrace(zapcore.FatalLevel))
}

// Init starts a goroutine to watch configuration.
func (lm *LoggerManager) Init(logger *zap.Logger, cfgCh chan *config.Log) {
lm.logger = logger
ctx, cancel := context.WithCancel(context.Background())
lm.wg.Run(func() {
lm.watchCfg(ctx, cfgCh)
})
lm.cancel = cancel
}

func (lm *LoggerManager) watchCfg(ctx context.Context, cfgCh chan *config.Log) {
for {
select {
case <-ctx.Done():
return
case cfg := <-cfgCh:
err := lm.updateLoggerCfg(cfg)
if err != nil {
bytes, merr := json.Marshal(cfg)
if merr != nil {
lm.logger.Error("update logger configuration failed", zap.NamedError("marshal_err", merr), zap.Error(err))
continue
}
lm.logger.Error("update logger configuration failed", zap.String("cfg", string(bytes)), zap.Error(err))
}
}
}
}

func (lm *LoggerManager) updateLoggerCfg(cfg *config.Log) error {
// encoder cannot be configured dynamically, because Core.With always clones the encoder.
if err := lm.syncer.Rebuild(cfg); err != nil {
return err
}
if level, err := zapcore.ParseLevel(cfg.Level); err != nil {
return err
} else {
lm.level.SetLevel(level)
}
return nil
}

// Close releases all resources.
func (lm *LoggerManager) Close() error {
if lm.cancel != nil {
lm.cancel()
}
lm.wg.Wait()
return lm.syncer.Close()
}
207 changes: 207 additions & 0 deletions pkg/manager/logger/manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package logger

import (
"context"
"math/rand"
"os"
"path/filepath"
"strings"
"testing"
"time"

"github.com/pingcap/TiProxy/lib/config"
"github.com/pingcap/TiProxy/lib/util/waitgroup"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

// Test that the configuration change takes affect.
func TestUpdateCfg(t *testing.T) {
dir := t.TempDir()
fileName := filepath.Join(dir, "proxy.log")
cfg := &config.Log{
Level: "info",
Encoder: "tidb",
LogFile: config.LogFile{
Filename: fileName,
MaxSize: 1,
MaxDays: 2,
MaxBackups: 1,
},
}

tests := []struct {
updateCfg func(cfg *config.Log)
action func(log *zap.Logger)
check func(files []os.FileInfo)
}{
{
updateCfg: func(cfg *config.Log) {
cfg.Level = "error"
cfg.LogFile.MaxBackups = 2
},
action: func(log *zap.Logger) {
msg := strings.Repeat("a", 800*1024)
log.Info(msg)
msg = strings.Repeat("b", 800*1024)
log.Error(msg)
},
check: func(files []os.FileInfo) {
require.Equal(t, 1, len(files))
},
},
{
updateCfg: func(cfg *config.Log) {
cfg.LogFile.MaxSize = 3
cfg.LogFile.MaxBackups = 5
},
action: func(log *zap.Logger) {
for i := 0; i < 5; i++ {
msg := strings.Repeat("a", 500*1024)
log.Info(msg)
}
},
check: func(files []os.FileInfo) {
require.Equal(t, 1, len(files))
require.GreaterOrEqual(t, files[0].Size(), int64(2500*1024))
},
},
{
updateCfg: func(cfg *config.Log) {
cfg.LogFile.MaxBackups = 2
},
action: func(log *zap.Logger) {
for i := 0; i < 15; i++ {
msg := strings.Repeat("a", 300*1024)
log.Info(msg)
}
},
check: func(files []os.FileInfo) {
require.Equal(t, 3, len(files))
},
},
{
updateCfg: func(cfg *config.Log) {
cfg.LogFile.Filename = ""
},
action: func(log *zap.Logger) {
log.Info("a")
},
check: func(files []os.FileInfo) {
require.Equal(t, 0, len(files))
},
},
}

lg, ch := setupLogManager(t, cfg)
// Make sure the latest config also applies to cloned loggers.
lg = lg.Named("another").With(zap.String("field", "test_field"))
for _, test := range tests {
err := os.RemoveAll(dir)
require.NoError(t, err)

clonedCfg := *cfg
test.updateCfg(&clonedCfg)
// Push it 2 times to make sure the first one has already taken affect.
ch <- &clonedCfg
ch <- &clonedCfg
test.action(lg)
logfiles := readLogFiles(t, dir)
test.check(logfiles)
}
}

func setupLogManager(t *testing.T, cfg *config.Log) (*zap.Logger, chan *config.Log) {
lm, err := NewLoggerManager(cfg)
require.NoError(t, err)
lg := lm.BuildLogger()
ch := make(chan *config.Log)
lm.Init(lg, ch)

t.Cleanup(func() {
require.NoError(t, lm.Close())
})
return lg, ch
}

func readLogFiles(t *testing.T, dir string) []os.FileInfo {
entries, err := os.ReadDir(dir)
// The directory may not exist when the output is stdout.
if err != nil {
require.ErrorIs(t, err, os.ErrNotExist)
return nil
}
files := make([]os.FileInfo, 0, len(entries))
for _, entry := range entries {
if info, err := entry.Info(); err == nil {
files = append(files, info)
} else {
require.ErrorIs(t, err, os.ErrNotExist)
}
}
return files
}

// Test that the manager won't panic or hang when loggers log concurrently.
func TestLogConcurrently(t *testing.T) {
dir := t.TempDir()
fileName := filepath.Join(dir, "proxy.log")
cfg := &config.Log{
Level: "info",
Encoder: "tidb",
LogFile: config.LogFile{
Filename: fileName,
MaxSize: 1,
MaxDays: 2,
MaxBackups: 3,
},
}

lg, ch := setupLogManager(t, cfg)
var wg waitgroup.WaitGroup
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
for i := 0; i < 5; i++ {
wg.Run(func() {
for ctx.Err() == nil {
lg = lg.Named("test_name")
lg.Info("test_info")
lg.Warn("test_warn")
lg.Error("test_error")
lg = lg.With(zap.String("with", "test_with"))
lg.Info("test_info")
lg.Warn("test_warn")
lg.Error("test_error")
}
})
}
wg.Run(func() {
newCfg := *cfg
for ctx.Err() == nil {
newCfg.LogFile.MaxDays = int(rand.Int31n(10))
ch <- &newCfg
time.Sleep(10 * time.Millisecond)
newCfg.LogFile.MaxBackups = int(rand.Int31n(10))
ch <- &newCfg
time.Sleep(10 * time.Millisecond)
newCfg.LogFile.MaxSize = int(rand.Int31n(10))
ch <- &newCfg
time.Sleep(10 * time.Millisecond)
}
})
wg.Wait()
cancel()
}
Loading

0 comments on commit 31a5604

Please sign in to comment.