From 993db77121f5d5ab676afef4399c5c22d2a5f604 Mon Sep 17 00:00:00 2001 From: Vincent Lee Date: Fri, 22 Feb 2019 17:48:11 +0800 Subject: [PATCH] fix test case: should wait old connection closed --- bench/multi_bench/multi_bench.go | 4 ++-- nsqdserver/protocol_v2.go | 3 +++ nsqdserver/protocol_v2_test.go | 13 ++++++++++--- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/bench/multi_bench/multi_bench.go b/bench/multi_bench/multi_bench.go index cbb356704..6e4c3f65b 100644 --- a/bench/multi_bench/multi_bench.go +++ b/bench/multi_bench/multi_bench.go @@ -1157,8 +1157,8 @@ func pubPipelineWorker(td time.Duration, globalPubMgr *nsq.TopicProducerMgr, top waitCh <- 1 cost := time.Since(s).Nanoseconds() - if cost > time.Millisecond.Nanoseconds()*100 { - log.Printf("pub id : %v slow:%v\n", traceID, cost) + if cost > time.Millisecond.Nanoseconds()*80 { + log.Printf("pub id : %v slow:%v, %v\n", traceID, cost, len(waitCh)) } addLatencyCounter(cost) } diff --git a/nsqdserver/protocol_v2.go b/nsqdserver/protocol_v2.go index d42d87189..464c4d9b7 100644 --- a/nsqdserver/protocol_v2.go +++ b/nsqdserver/protocol_v2.go @@ -637,6 +637,9 @@ func (p *protocolV2) messagePump(client *nsqd.ClientV2, startedChan chan bool, matched = !matched } if !matched { + if nsqd.NsqLogger().Level() >= levellogger.LOG_DETAIL { + nsqd.NsqLogger().Debugf("channel %v filtered message %v", subChannel.GetName(), nsqd.PrintMessageNoBody(msg)) + } subChannel.ConfirmBackendQueue(msg) subChannel.CleanWaitingRequeueChan(msg) subChannel.TryRefreshChannelEnd() diff --git a/nsqdserver/protocol_v2_test.go b/nsqdserver/protocol_v2_test.go index c251320ad..62786026f 100644 --- a/nsqdserver/protocol_v2_test.go +++ b/nsqdserver/protocol_v2_test.go @@ -4332,7 +4332,7 @@ func TestSubOrderedWithFilter(t *testing.T) { opts := nsqdNs.NewOptions() opts.Logger = newTestLogger(t) - opts.LogLevel = 1 + opts.LogLevel = 2 opts.SyncTimeout = time.Minute if testing.Verbose() { opts.LogLevel = 4 @@ -4351,7 +4351,7 @@ func TestSubOrderedWithFilter(t *testing.T) { Ext: true, } topic.SetDynamicInfo(topicDynConf, nil) - topic.GetChannel("ordered_ch") + channel := topic.GetChannel("ordered_ch") clientParams := make(map[string]interface{}) clientParams["client_id"] = "client_b" @@ -4371,8 +4371,15 @@ func TestSubOrderedWithFilter(t *testing.T) { } // since no match, will recv no message - time.Sleep(time.Second) + for { + time.Sleep(time.Second) + if channel.Depth() == 0 { + break + } + } conn.Close() + // wait old connection exit + time.Sleep(time.Second) for i := 0; i < 10; i++ { topic.PutMessage(nsqdNs.NewMessage(0, []byte("second"))) }