From 134952b6c52ed9d149dbe8e933245b437918efd2 Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Wed, 8 Jan 2025 11:30:56 +0800 Subject: [PATCH] enhance: refactor the streaming node log (#39052) issue: #38399 Signed-off-by: chyezh --- .../streamingnode/server/resource/resource.go | 11 ++++++++++- .../streamingnode/server/resource/test_utility.go | 5 ++++- .../service/handler/consumer/consume_server.go | 12 ++++++++---- .../service/handler/producer/produce_server.go | 10 +++++++--- .../streamingnode/server/wal/adaptor/opener.go | 12 +++++++----- .../server/wal/adaptor/scanner_adaptor.go | 6 +++++- .../server/wal/adaptor/wal_adaptor.go | 8 +++++++- .../server/wal/interceptors/segment/builder.go | 10 +++++++--- .../wal/interceptors/segment/inspector/impls.go | 3 +++ .../segment/manager/partition_manager.go | 4 +++- .../segment/manager/partition_managers.go | 8 ++++++-- .../timetick/timetick_sync_operator.go | 9 ++++++--- .../server/wal/interceptors/txn/txn_manager.go | 4 +++- .../server/walmanager/manager_impl.go | 15 +++++++++------ .../server/walmanager/wal_lifetime.go | 4 ++-- .../server/walmanager/wal_lifetime_test.go | 3 ++- 16 files changed, 89 insertions(+), 35 deletions(-) diff --git a/internal/streamingnode/server/resource/resource.go b/internal/streamingnode/server/resource/resource.go index 06edb5a5cd32b..0626d9de28648 100644 --- a/internal/streamingnode/server/resource/resource.go +++ b/internal/streamingnode/server/resource/resource.go @@ -12,10 +12,14 @@ import ( "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/stats" tinspector "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/syncutil" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) -var r = &resourceImpl{} // singleton resource instance +var r = &resourceImpl{ + logger: 
log.With(log.FieldModule(typeutil.StreamingNodeRole)), +} // singleton resource instance // optResourceInit is the option to initialize the resource. type optResourceInit func(r *resourceImpl) @@ -93,6 +97,7 @@ func Resource() *resourceImpl { // All utility on it is concurrent-safe and singleton. type resourceImpl struct { flusher flusher.Flusher + logger *log.MLogger timestampAllocator idalloc.Allocator idAllocator idalloc.Allocator etcdClient *clientv3.Client @@ -153,6 +158,10 @@ func (r *resourceImpl) TimeTickInspector() tinspector.TimeTickSyncInspector { return r.timeTickInspector } +func (r *resourceImpl) Logger() *log.MLogger { + return r.logger +} + // assertNotNil panics if the resource is nil. func assertNotNil(v interface{}) { iv := reflect.ValueOf(v) diff --git a/internal/streamingnode/server/resource/test_utility.go b/internal/streamingnode/server/resource/test_utility.go index 3fddc19b893f2..a287d856933d1 100644 --- a/internal/streamingnode/server/resource/test_utility.go +++ b/internal/streamingnode/server/resource/test_utility.go @@ -10,12 +10,15 @@ import ( "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/stats" tinspector "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/syncutil" ) // InitForTest initializes the singleton of resources for test. 
func InitForTest(t *testing.T, opts ...optResourceInit) { - r = &resourceImpl{} + r = &resourceImpl{ + logger: log.With(), + } for _, opt := range opts { opt(r) } diff --git a/internal/streamingnode/server/service/handler/consumer/consume_server.go b/internal/streamingnode/server/service/handler/consumer/consume_server.go index 4d1a2bb74b693..45cf3db8c7b3e 100644 --- a/internal/streamingnode/server/service/handler/consumer/consume_server.go +++ b/internal/streamingnode/server/service/handler/consumer/consume_server.go @@ -6,6 +6,7 @@ import ( "github.com/cockroachdb/errors" "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/streamingnode/server/resource" "github.com/milvus-io/milvus/internal/streamingnode/server/wal" "github.com/milvus-io/milvus/internal/streamingnode/server/walmanager" "github.com/milvus-io/milvus/internal/util/streamingutil/service/contextutil" @@ -80,7 +81,7 @@ func CreateConsumeServer(walManager walmanager.Manager, streamServer streamingpb }); err != nil { // release the scanner to avoid resource leak. if err := scanner.Close(); err != nil { - log.Warn("close scanner failed at create consume server", zap.Error(err)) + resource.Resource().Logger().Warn("close scanner failed at create consume server", zap.Error(err)) } return nil, err } @@ -89,9 +90,12 @@ func CreateConsumeServer(walManager walmanager.Manager, streamServer streamingpb consumerID: 1, scanner: scanner, consumeServer: consumeServer, - logger: log.With(zap.String("channel", l.Channel().Name), zap.Int64("term", l.Channel().Term)), // Add trace info for all log. - closeCh: make(chan struct{}), - metrics: metrics, + logger: resource.Resource().Logger().With( + log.FieldComponent("consumer-server"), + zap.String("channel", l.Channel().Name), + zap.Int64("term", l.Channel().Term)), // Add trace info for all log. 
+ closeCh: make(chan struct{}), + metrics: metrics, }, nil } diff --git a/internal/streamingnode/server/service/handler/producer/produce_server.go b/internal/streamingnode/server/service/handler/producer/produce_server.go index 366e79534d0d1..0bd9b35721dae 100644 --- a/internal/streamingnode/server/service/handler/producer/produce_server.go +++ b/internal/streamingnode/server/service/handler/producer/produce_server.go @@ -7,6 +7,7 @@ import ( "github.com/cockroachdb/errors" "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/streamingnode/server/resource" "github.com/milvus-io/milvus/internal/streamingnode/server/wal" "github.com/milvus-io/milvus/internal/streamingnode/server/walmanager" "github.com/milvus-io/milvus/internal/util/streamingutil/service/contextutil" @@ -45,9 +46,12 @@ func CreateProduceServer(walManager walmanager.Manager, streamServer streamingpb } metrics := newProducerMetrics(l.Channel()) return &ProduceServer{ - wal: l, - produceServer: produceServer, - logger: log.With(zap.String("channel", l.Channel().Name), zap.Int64("term", l.Channel().Term)), + wal: l, + produceServer: produceServer, + logger: resource.Resource().Logger().With( + log.FieldComponent("producer-server"), + zap.String("channel", l.Channel().Name), + zap.Int64("term", l.Channel().Term)), produceMessageCh: make(chan *streamingpb.ProduceMessageResponse), appendWG: sync.WaitGroup{}, metrics: metrics, diff --git a/internal/streamingnode/server/wal/adaptor/opener.go b/internal/streamingnode/server/wal/adaptor/opener.go index de1b7963c586a..8fc2d8ba45ca3 100644 --- a/internal/streamingnode/server/wal/adaptor/opener.go +++ b/internal/streamingnode/server/wal/adaptor/opener.go @@ -23,6 +23,7 @@ func adaptImplsToOpener(opener walimpls.OpenerImpls, builders []interceptors.Int idAllocator: typeutil.NewIDAllocator(), walInstances: typeutil.NewConcurrentMap[int64, wal.WAL](), interceptorBuilders: builders, + logger: log.With(log.FieldComponent("opener")), } } @@ -33,6 +34,7 @@ type 
openerAdaptorImpl struct { idAllocator *typeutil.IDAllocator walInstances *typeutil.ConcurrentMap[int64, wal.WAL] // store all wal instances allocated by these allocator. interceptorBuilders []interceptors.InterceptorBuilder + logger *log.MLogger } // Open opens a wal instance for the channel. @@ -43,24 +45,24 @@ func (o *openerAdaptorImpl) Open(ctx context.Context, opt *wal.OpenOption) (wal. defer o.lifetime.Done() id := o.idAllocator.Allocate() - log := log.With(zap.Any("channel", opt.Channel), zap.Int64("id", id)) + logger := o.logger.With(zap.Any("channel", opt.Channel), zap.Int64("id", id)) l, err := o.opener.Open(ctx, &walimpls.OpenOption{ Channel: opt.Channel, }) if err != nil { - log.Warn("open wal failed", zap.Error(err)) + logger.Warn("open wal failed", zap.Error(err)) return nil, err } // wrap the wal into walExtend with cleanup function and interceptors. wal := adaptImplsToWAL(l, o.interceptorBuilders, func() { o.walInstances.Remove(id) - log.Info("wal deleted from opener") + logger.Info("wal deleted from opener") }) o.walInstances.Insert(id, wal) - log.Info("new wal created") + logger.Info("new wal created") return wal, nil } @@ -72,7 +74,7 @@ func (o *openerAdaptorImpl) Close() { // close all wal instances. 
o.walInstances.Range(func(id int64, l wal.WAL) bool { l.Close() - log.Info("close wal by opener", zap.Int64("id", id), zap.Any("channel", l.Channel())) + o.logger.Info("close wal by opener", zap.Int64("id", id), zap.Any("channel", l.Channel())) return true }) // close the opener diff --git a/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go b/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go index 6e293353c40e4..1e4844e6d7207 100644 --- a/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go +++ b/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go @@ -35,7 +35,11 @@ func newScannerAdaptor( readOption.MesasgeHandler = defaultMessageHandler(make(chan message.ImmutableMessage)) } options.GetFilterFunc(readOption.MessageFilter) - logger := log.With(zap.String("name", name), zap.String("channel", l.Channel().Name)) + logger := resource.Resource().Logger().With( + log.FieldComponent("scanner"), + zap.String("name", name), + zap.String("channel", l.Channel().Name), + ) s := &scannerAdaptorImpl{ logger: logger, innerWAL: l, diff --git a/internal/streamingnode/server/wal/adaptor/wal_adaptor.go b/internal/streamingnode/server/wal/adaptor/wal_adaptor.go index c84ec8b63aba4..17e0909b4e7a4 100644 --- a/internal/streamingnode/server/wal/adaptor/wal_adaptor.go +++ b/internal/streamingnode/server/wal/adaptor/wal_adaptor.go @@ -6,6 +6,7 @@ import ( "go.uber.org/zap" "google.golang.org/protobuf/types/known/anypb" + "github.com/milvus-io/milvus/internal/streamingnode/server/resource" "github.com/milvus-io/milvus/internal/streamingnode/server/wal" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil" @@ -50,6 +51,10 @@ func adaptImplsToWAL( cleanup: cleanup, writeMetrics: metricsutil.NewWriteMetrics(basicWAL.Channel(), basicWAL.WALName()), scanMetrics: metricsutil.NewScanMetrics(basicWAL.Channel()), + logger: resource.Resource().Logger().With( + 
log.FieldComponent("wal"), + zap.Any("channel", basicWAL.Channel()), + ), } param.WAL.Set(wal) return wal @@ -68,6 +73,7 @@ type walAdaptorImpl struct { cleanup func() writeMetrics *metricsutil.WriteMetrics scanMetrics *metricsutil.ScanMetrics + logger *log.MLogger } func (w *walAdaptorImpl) WALName() string { @@ -193,7 +199,7 @@ func (w *walAdaptorImpl) Available() <-chan struct{} { // Close overrides Scanner Close function. func (w *walAdaptorImpl) Close() { - logger := log.With(zap.Any("channel", w.Channel()), zap.String("processing", "WALClose")) + logger := w.logger.With(zap.String("processing", "WALClose")) logger.Info("wal begin to close, start graceful close...") // graceful close the interceptors before wal closing. w.interceptorBuildResult.GracefulCloseFunc() diff --git a/internal/streamingnode/server/wal/interceptors/segment/builder.go b/internal/streamingnode/server/wal/interceptors/segment/builder.go index b2ef3cafa8407..14578bf357af7 100644 --- a/internal/streamingnode/server/wal/interceptors/segment/builder.go +++ b/internal/streamingnode/server/wal/interceptors/segment/builder.go @@ -5,6 +5,7 @@ import ( "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/streamingnode/server/resource" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/manager" "github.com/milvus-io/milvus/pkg/log" @@ -21,9 +22,12 @@ func (b *interceptorBuilder) Build(param interceptors.InterceptorBuildParam) int assignManager := syncutil.NewFuture[*manager.PChannelSegmentAllocManager]() ctx, cancel := context.WithCancel(context.Background()) segmentInterceptor := &segmentInterceptor{ - ctx: ctx, - cancel: cancel, - logger: log.With(zap.Any("pchannel", param.WALImpls.Channel())), + ctx: ctx, + cancel: cancel, + logger: resource.Resource().Logger().With( + log.FieldComponent("segment-assigner"), + zap.Any("pchannel", param.WALImpls.Channel()), + ), assignManager: 
assignManager, } go segmentInterceptor.recoverPChannelManager(param) diff --git a/internal/streamingnode/server/wal/interceptors/segment/inspector/impls.go b/internal/streamingnode/server/wal/interceptors/segment/inspector/impls.go index de8ed3e119b2f..5a95f177ecc7b 100644 --- a/internal/streamingnode/server/wal/interceptors/segment/inspector/impls.go +++ b/internal/streamingnode/server/wal/interceptors/segment/inspector/impls.go @@ -6,6 +6,7 @@ import ( "github.com/milvus-io/milvus/internal/streamingnode/server/resource" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/stats" + "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/syncutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -30,6 +31,7 @@ func NewSealedInspector(n *stats.SealSignalNotifier) SealOperationInspector { }, }), triggerCh: make(chan string), + logger: resource.Resource().Logger().With(log.FieldComponent("segment-assigner")), } go s.background() return s @@ -43,6 +45,7 @@ type sealOperationInspectorImpl struct { notifier *stats.SealSignalNotifier backOffTimer *typeutil.BackoffTimer triggerCh chan string + logger *log.MLogger } // TriggerSealWaited implements SealInspector.TriggerSealWaited. 
diff --git a/internal/streamingnode/server/wal/interceptors/segment/manager/partition_manager.go b/internal/streamingnode/server/wal/interceptors/segment/manager/partition_manager.go index bce92f57960d6..59be40f37c39a 100644 --- a/internal/streamingnode/server/wal/interceptors/segment/manager/partition_manager.go +++ b/internal/streamingnode/server/wal/interceptors/segment/manager/partition_manager.go @@ -36,7 +36,8 @@ func newPartitionSegmentManager( ) *partitionSegmentManager { return &partitionSegmentManager{ mu: sync.Mutex{}, - logger: log.With( + logger: resource.Resource().Logger().With( + log.FieldComponent("segment-assigner"), zap.Any("pchannel", pchannel), zap.String("vchannel", vchannel), zap.Int64("collectionID", collectionID), diff --git a/internal/streamingnode/server/wal/interceptors/segment/manager/partition_managers.go b/internal/streamingnode/server/wal/interceptors/segment/manager/partition_managers.go index c4269cc8636c7..02ec9996e58e1 100644 --- a/internal/streamingnode/server/wal/interceptors/segment/manager/partition_managers.go +++ b/internal/streamingnode/server/wal/interceptors/segment/manager/partition_managers.go @@ -7,6 +7,7 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/internal/proto/rootcoordpb" + "github.com/milvus-io/milvus/internal/streamingnode/server/resource" "github.com/milvus-io/milvus/internal/streamingnode/server/wal" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/policy" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil" @@ -79,8 +80,11 @@ func buildNewPartitionManagers( } } m := &partitionSegmentManagers{ - mu: sync.Mutex{}, - logger: log.With(zap.Any("pchannel", pchannel)), + mu: sync.Mutex{}, + logger: resource.Resource().Logger().With( + log.FieldComponent("segment-assigner"), + zap.String("pchannel", pchannel.Name), + ), wal: wal, pchannel: pchannel, managers: managers, diff --git 
a/internal/streamingnode/server/wal/interceptors/timetick/timetick_sync_operator.go b/internal/streamingnode/server/wal/interceptors/timetick/timetick_sync_operator.go index 2fe9465b06ad7..d3bc977212495 100644 --- a/internal/streamingnode/server/wal/interceptors/timetick/timetick_sync_operator.go +++ b/internal/streamingnode/server/wal/interceptors/timetick/timetick_sync_operator.go @@ -27,9 +27,12 @@ var _ inspector.TimeTickSyncOperator = &timeTickSyncOperator{} func newTimeTickSyncOperator(param interceptors.InterceptorBuildParam) *timeTickSyncOperator { ctx, cancel := context.WithCancel(context.Background()) return &timeTickSyncOperator{ - ctx: ctx, - cancel: cancel, - logger: log.With(zap.Any("pchannel", param.WALImpls.Channel())), + ctx: ctx, + cancel: cancel, + logger: resource.Resource().Logger().With( + log.FieldComponent("timetick-sync"), + zap.Any("pchannel", param.WALImpls.Channel()), + ), pchannel: param.WALImpls.Channel(), ready: make(chan struct{}), interceptorBuildParam: param, diff --git a/internal/streamingnode/server/wal/interceptors/txn/txn_manager.go b/internal/streamingnode/server/wal/interceptors/txn/txn_manager.go index e8a7f8d96fe09..0aba8bdd6a5f1 100644 --- a/internal/streamingnode/server/wal/interceptors/txn/txn_manager.go +++ b/internal/streamingnode/server/wal/interceptors/txn/txn_manager.go @@ -24,6 +24,7 @@ func NewTxnManager(pchannel types.PChannelInfo) *TxnManager { sessions: make(map[message.TxnID]*TxnSession), closed: nil, metrics: metricsutil.NewTxnMetrics(pchannel.Name), + logger: resource.Resource().Logger().With(log.FieldComponent("txn-manager")), } } @@ -35,6 +36,7 @@ type TxnManager struct { sessions map[message.TxnID]*TxnSession closed lifetime.SafeChan metrics *metricsutil.TxnMetrics + logger *log.MLogger } // BeginNewTxn starts a new transaction with a session. 
@@ -120,7 +122,7 @@ func (m *TxnManager) GracefulClose(ctx context.Context) error { m.closed.Close() } } - log.Info("there's still txn session in txn manager, waiting for them to be consumed", zap.Int("session count", len(m.sessions))) + m.logger.Info("there's still txn session in txn manager, waiting for them to be consumed", zap.Int("session count", len(m.sessions))) m.mu.Unlock() select { diff --git a/internal/streamingnode/server/walmanager/manager_impl.go b/internal/streamingnode/server/walmanager/manager_impl.go index 087e8bce44910..774a5982afa0c 100644 --- a/internal/streamingnode/server/walmanager/manager_impl.go +++ b/internal/streamingnode/server/walmanager/manager_impl.go @@ -5,6 +5,7 @@ import ( "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/streamingnode/server/resource" "github.com/milvus-io/milvus/internal/streamingnode/server/wal" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/registry" "github.com/milvus-io/milvus/internal/util/streamingutil/status" @@ -19,7 +20,7 @@ var errWALManagerClosed = status.NewOnShutdownError("wal manager is closed") // OpenManager create a wal manager. func OpenManager() (Manager, error) { walName := util.MustSelectWALName() - log.Info("open wal manager", zap.String("walName", walName)) + resource.Resource().Logger().Info("open wal manager", zap.String("walName", walName)) opener, err := registry.MustGetBuilder(walName).Build() if err != nil { return nil, err @@ -33,6 +34,7 @@ func newManager(opener wal.Opener) Manager { lifetime: typeutil.NewGenericLifetime[managerState](managerOpenable | managerRemoveable | managerGetable), wltMap: typeutil.NewConcurrentMap[string, *walLifetime](), opener: opener, + logger: resource.Resource().Logger().With(log.FieldComponent("wal-manager")), } } @@ -42,6 +44,7 @@ type managerImpl struct { wltMap *typeutil.ConcurrentMap[string, *walLifetime] opener wal.Opener // wal allocator + logger *log.MLogger } // Open opens a wal instance for the channel on this Manager. 
@@ -53,10 +56,10 @@ func (m *managerImpl) Open(ctx context.Context, channel types.PChannelInfo) (err defer func() { m.lifetime.Done() if err != nil { - log.Warn("open wal failed", zap.Error(err), zap.String("channel", channel.Name), zap.Int64("term", channel.Term)) + m.logger.Warn("open wal failed", zap.Error(err), zap.String("channel", channel.Name), zap.Int64("term", channel.Term)) return } - log.Info("open wal success", zap.String("channel", channel.Name), zap.Int64("term", channel.Term)) + m.logger.Info("open wal success", zap.String("channel", channel.Name), zap.Int64("term", channel.Term)) }() return m.getWALLifetime(channel.Name).Open(ctx, channel) @@ -71,10 +74,10 @@ func (m *managerImpl) Remove(ctx context.Context, channel types.PChannelInfo) (e defer func() { m.lifetime.Done() if err != nil { - log.Warn("remove wal failed", zap.Error(err), zap.String("channel", channel.Name), zap.Int64("term", channel.Term)) + m.logger.Warn("remove wal failed", zap.Error(err), zap.String("channel", channel.Name), zap.Int64("term", channel.Term)) return } - log.Info("remove wal success", zap.String("channel", channel.Name), zap.Int64("term", channel.Term)) + m.logger.Info("remove wal success", zap.String("channel", channel.Name), zap.Int64("term", channel.Term)) }() return m.getWALLifetime(channel.Name).Remove(ctx, channel.Term) @@ -144,7 +147,7 @@ func (m *managerImpl) getWALLifetime(channel string) *walLifetime { } // Perform a cas here. - newWLT := newWALLifetime(m.opener, channel) + newWLT := newWALLifetime(m.opener, channel, m.logger) wlt, loaded := m.wltMap.GetOrInsert(channel, newWLT) // if loaded, lifetime is exist, close the redundant lifetime. 
if loaded { diff --git a/internal/streamingnode/server/walmanager/wal_lifetime.go b/internal/streamingnode/server/walmanager/wal_lifetime.go index 616c1bc7c4b07..803db935b8835 100644 --- a/internal/streamingnode/server/walmanager/wal_lifetime.go +++ b/internal/streamingnode/server/walmanager/wal_lifetime.go @@ -13,7 +13,7 @@ import ( ) // newWALLifetime create a WALLifetime with opener. -func newWALLifetime(opener wal.Opener, channel string) *walLifetime { +func newWALLifetime(opener wal.Opener, channel string, logger *log.MLogger) *walLifetime { ctx, cancel := context.WithCancel(context.Background()) l := &walLifetime{ ctx: ctx, @@ -22,7 +22,7 @@ func newWALLifetime(opener wal.Opener, channel string) *walLifetime { finish: make(chan struct{}), opener: opener, statePair: newWALStatePair(), - logger: log.With(zap.String("channel", channel)), + logger: logger.With(zap.String("channel", channel)), } go l.backgroundTask() return l diff --git a/internal/streamingnode/server/walmanager/wal_lifetime_test.go b/internal/streamingnode/server/walmanager/wal_lifetime_test.go index a14464df8b594..32d63f5d75b5b 100644 --- a/internal/streamingnode/server/walmanager/wal_lifetime_test.go +++ b/internal/streamingnode/server/walmanager/wal_lifetime_test.go @@ -13,6 +13,7 @@ import ( "github.com/milvus-io/milvus/internal/streamingnode/server/resource" "github.com/milvus-io/milvus/internal/streamingnode/server/wal" internaltypes "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/streaming/util/types" "github.com/milvus-io/milvus/pkg/util/syncutil" ) @@ -47,7 +48,7 @@ func TestWALLifetime(t *testing.T) { return l, nil }) - wlt := newWALLifetime(opener, channel) + wlt := newWALLifetime(opener, channel, log.With()) assert.Nil(t, wlt.GetWAL()) // Test open.