Skip to content

Commit

Permalink
enhance: refactor the streaming node log (#39052)
Browse files Browse the repository at this point in the history
issue: #38399

Signed-off-by: chyezh <chyezh@outlook.com>
  • Loading branch information
chyezh authored Jan 8, 2025
1 parent 329795b commit 134952b
Show file tree
Hide file tree
Showing 16 changed files with 89 additions and 35 deletions.
11 changes: 10 additions & 1 deletion internal/streamingnode/server/resource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/stats"
tinspector "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

var r = &resourceImpl{} // singleton resource instance
var r = &resourceImpl{
logger: log.With(log.FieldModule(typeutil.StreamingNodeRole)),
} // singleton resource instance

// optResourceInit is the option to initialize the resource.
type optResourceInit func(r *resourceImpl)
Expand Down Expand Up @@ -93,6 +97,7 @@ func Resource() *resourceImpl {
// All utility on it is concurrent-safe and singleton.
type resourceImpl struct {
flusher flusher.Flusher
logger *log.MLogger
timestampAllocator idalloc.Allocator
idAllocator idalloc.Allocator
etcdClient *clientv3.Client
Expand Down Expand Up @@ -153,6 +158,10 @@ func (r *resourceImpl) TimeTickInspector() tinspector.TimeTickSyncInspector {
return r.timeTickInspector
}

func (r *resourceImpl) Logger() *log.MLogger {
return r.logger
}

// assertNotNil panics if the resource is nil.
func assertNotNil(v interface{}) {
iv := reflect.ValueOf(v)
Expand Down
5 changes: 4 additions & 1 deletion internal/streamingnode/server/resource/test_utility.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/stats"
tinspector "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/syncutil"
)

// InitForTest initializes the singleton of resources for test.
func InitForTest(t *testing.T, opts ...optResourceInit) {
r = &resourceImpl{}
r = &resourceImpl{
logger: log.With(),
}
for _, opt := range opts {
opt(r)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/cockroachdb/errors"
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/walmanager"
"github.com/milvus-io/milvus/internal/util/streamingutil/service/contextutil"
Expand Down Expand Up @@ -80,7 +81,7 @@ func CreateConsumeServer(walManager walmanager.Manager, streamServer streamingpb
}); err != nil {
// release the scanner to avoid resource leak.
if err := scanner.Close(); err != nil {
log.Warn("close scanner failed at create consume server", zap.Error(err))
resource.Resource().Logger().Warn("close scanner failed at create consume server", zap.Error(err))
}
return nil, err
}
Expand All @@ -89,9 +90,12 @@ func CreateConsumeServer(walManager walmanager.Manager, streamServer streamingpb
consumerID: 1,
scanner: scanner,
consumeServer: consumeServer,
logger: log.With(zap.String("channel", l.Channel().Name), zap.Int64("term", l.Channel().Term)), // Add trace info for all log.
closeCh: make(chan struct{}),
metrics: metrics,
logger: resource.Resource().Logger().With(
log.FieldComponent("consumer-server"),
zap.String("channel", l.Channel().Name),
zap.Int64("term", l.Channel().Term)), // Add trace info for all log.
closeCh: make(chan struct{}),
metrics: metrics,
}, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/cockroachdb/errors"
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/walmanager"
"github.com/milvus-io/milvus/internal/util/streamingutil/service/contextutil"
Expand Down Expand Up @@ -45,9 +46,12 @@ func CreateProduceServer(walManager walmanager.Manager, streamServer streamingpb
}
metrics := newProducerMetrics(l.Channel())
return &ProduceServer{
wal: l,
produceServer: produceServer,
logger: log.With(zap.String("channel", l.Channel().Name), zap.Int64("term", l.Channel().Term)),
wal: l,
produceServer: produceServer,
logger: resource.Resource().Logger().With(
log.FieldComponent("producer-server"),
zap.String("channel", l.Channel().Name),
zap.Int64("term", l.Channel().Term)),
produceMessageCh: make(chan *streamingpb.ProduceMessageResponse),
appendWG: sync.WaitGroup{},
metrics: metrics,
Expand Down
12 changes: 7 additions & 5 deletions internal/streamingnode/server/wal/adaptor/opener.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func adaptImplsToOpener(opener walimpls.OpenerImpls, builders []interceptors.Int
idAllocator: typeutil.NewIDAllocator(),
walInstances: typeutil.NewConcurrentMap[int64, wal.WAL](),
interceptorBuilders: builders,
logger: log.With(log.FieldComponent("opener")),
}
}

Expand All @@ -33,6 +34,7 @@ type openerAdaptorImpl struct {
idAllocator *typeutil.IDAllocator
walInstances *typeutil.ConcurrentMap[int64, wal.WAL] // store all wal instances allocated by these allocator.
interceptorBuilders []interceptors.InterceptorBuilder
logger *log.MLogger
}

// Open opens a wal instance for the channel.
Expand All @@ -43,24 +45,24 @@ func (o *openerAdaptorImpl) Open(ctx context.Context, opt *wal.OpenOption) (wal.
defer o.lifetime.Done()

id := o.idAllocator.Allocate()
log := log.With(zap.Any("channel", opt.Channel), zap.Int64("id", id))
logger := o.logger.With(zap.Any("channel", opt.Channel), zap.Int64("id", id))

l, err := o.opener.Open(ctx, &walimpls.OpenOption{
Channel: opt.Channel,
})
if err != nil {
log.Warn("open wal failed", zap.Error(err))
logger.Warn("open wal failed", zap.Error(err))
return nil, err
}

// wrap the wal into walExtend with cleanup function and interceptors.
wal := adaptImplsToWAL(l, o.interceptorBuilders, func() {
o.walInstances.Remove(id)
log.Info("wal deleted from opener")
logger.Info("wal deleted from opener")
})

o.walInstances.Insert(id, wal)
log.Info("new wal created")
logger.Info("new wal created")
return wal, nil
}

Expand All @@ -72,7 +74,7 @@ func (o *openerAdaptorImpl) Close() {
// close all wal instances.
o.walInstances.Range(func(id int64, l wal.WAL) bool {
l.Close()
log.Info("close wal by opener", zap.Int64("id", id), zap.Any("channel", l.Channel()))
o.logger.Info("close wal by opener", zap.Int64("id", id), zap.Any("channel", l.Channel()))
return true
})
// close the opener
Expand Down
6 changes: 5 additions & 1 deletion internal/streamingnode/server/wal/adaptor/scanner_adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ func newScannerAdaptor(
readOption.MesasgeHandler = defaultMessageHandler(make(chan message.ImmutableMessage))
}
options.GetFilterFunc(readOption.MessageFilter)
logger := log.With(zap.String("name", name), zap.String("channel", l.Channel().Name))
logger := resource.Resource().Logger().With(
log.FieldComponent("scanner"),
zap.String("name", name),
zap.String("channel", l.Channel().Name),
)
s := &scannerAdaptorImpl{
logger: logger,
innerWAL: l,
Expand Down
8 changes: 7 additions & 1 deletion internal/streamingnode/server/wal/adaptor/wal_adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/anypb"

"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil"
Expand Down Expand Up @@ -50,6 +51,10 @@ func adaptImplsToWAL(
cleanup: cleanup,
writeMetrics: metricsutil.NewWriteMetrics(basicWAL.Channel(), basicWAL.WALName()),
scanMetrics: metricsutil.NewScanMetrics(basicWAL.Channel()),
logger: resource.Resource().Logger().With(
log.FieldComponent("wal"),
zap.Any("channel", basicWAL.Channel()),
),
}
param.WAL.Set(wal)
return wal
Expand All @@ -68,6 +73,7 @@ type walAdaptorImpl struct {
cleanup func()
writeMetrics *metricsutil.WriteMetrics
scanMetrics *metricsutil.ScanMetrics
logger *log.MLogger
}

func (w *walAdaptorImpl) WALName() string {
Expand Down Expand Up @@ -193,7 +199,7 @@ func (w *walAdaptorImpl) Available() <-chan struct{} {

// Close overrides Scanner Close function.
func (w *walAdaptorImpl) Close() {
logger := log.With(zap.Any("channel", w.Channel()), zap.String("processing", "WALClose"))
logger := w.logger.With(zap.String("processing", "WALClose"))
logger.Info("wal begin to close, start graceful close...")
// graceful close the interceptors before wal closing.
w.interceptorBuildResult.GracefulCloseFunc()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/manager"
"github.com/milvus-io/milvus/pkg/log"
Expand All @@ -21,9 +22,12 @@ func (b *interceptorBuilder) Build(param interceptors.InterceptorBuildParam) int
assignManager := syncutil.NewFuture[*manager.PChannelSegmentAllocManager]()
ctx, cancel := context.WithCancel(context.Background())
segmentInterceptor := &segmentInterceptor{
ctx: ctx,
cancel: cancel,
logger: log.With(zap.Any("pchannel", param.WALImpls.Channel())),
ctx: ctx,
cancel: cancel,
logger: resource.Resource().Logger().With(
log.FieldComponent("segment-assigner"),
zap.Any("pchannel", param.WALImpls.Channel()),
),
assignManager: assignManager,
}
go segmentInterceptor.recoverPChannelManager(param)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/stats"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
Expand All @@ -30,6 +31,7 @@ func NewSealedInspector(n *stats.SealSignalNotifier) SealOperationInspector {
},
}),
triggerCh: make(chan string),
logger: resource.Resource().Logger().With(log.FieldComponent("segment-assigner")),
}
go s.background()
return s
Expand All @@ -43,6 +45,7 @@ type sealOperationInspectorImpl struct {
notifier *stats.SealSignalNotifier
backOffTimer *typeutil.BackoffTimer
triggerCh chan string
logger *log.MLogger
}

// TriggerSealWaited implements SealInspector.TriggerSealWaited.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ func newPartitionSegmentManager(
) *partitionSegmentManager {
return &partitionSegmentManager{
mu: sync.Mutex{},
logger: log.With(
logger: resource.Resource().Logger().With(
log.FieldComponent("segment-assigner"),
zap.Any("pchannel", pchannel),
zap.Any("pchannel", pchannel),
zap.String("vchannel", vchannel),
zap.Int64("collectionID", collectionID),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/policy"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil"
Expand Down Expand Up @@ -79,8 +80,11 @@ func buildNewPartitionManagers(
}
}
m := &partitionSegmentManagers{
mu: sync.Mutex{},
logger: log.With(zap.Any("pchannel", pchannel)),
mu: sync.Mutex{},
logger: resource.Resource().Logger().With(
log.FieldComponent("segment-assigner"),
zap.String("pchannel", pchannel.Name),
),
wal: wal,
pchannel: pchannel,
managers: managers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@ var _ inspector.TimeTickSyncOperator = &timeTickSyncOperator{}
func newTimeTickSyncOperator(param interceptors.InterceptorBuildParam) *timeTickSyncOperator {
ctx, cancel := context.WithCancel(context.Background())
return &timeTickSyncOperator{
ctx: ctx,
cancel: cancel,
logger: log.With(zap.Any("pchannel", param.WALImpls.Channel())),
ctx: ctx,
cancel: cancel,
logger: resource.Resource().Logger().With(
log.FieldComponent("timetick-sync"),
zap.Any("pchannel", param.WALImpls.Channel()),
),
pchannel: param.WALImpls.Channel(),
ready: make(chan struct{}),
interceptorBuildParam: param,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func NewTxnManager(pchannel types.PChannelInfo) *TxnManager {
sessions: make(map[message.TxnID]*TxnSession),
closed: nil,
metrics: metricsutil.NewTxnMetrics(pchannel.Name),
logger: resource.Resource().Logger().With(log.FieldComponent("txn-manager")),
}
}

Expand All @@ -35,6 +36,7 @@ type TxnManager struct {
sessions map[message.TxnID]*TxnSession
closed lifetime.SafeChan
metrics *metricsutil.TxnMetrics
logger *log.MLogger
}

// BeginNewTxn starts a new transaction with a session.
Expand Down Expand Up @@ -120,7 +122,7 @@ func (m *TxnManager) GracefulClose(ctx context.Context) error {
m.closed.Close()
}
}
log.Info("there's still txn session in txn manager, waiting for them to be consumed", zap.Int("session count", len(m.sessions)))
m.logger.Info("there's still txn session in txn manager, waiting for them to be consumed", zap.Int("session count", len(m.sessions)))
m.mu.Unlock()

select {
Expand Down
Loading

0 comments on commit 134952b

Please sign in to comment.