Skip to content

Commit

Permalink
fix test case: should wait old connection closed
Browse files Browse the repository at this point in the history
  • Loading branch information
absolute8511 committed Feb 22, 2019
1 parent 350de76 commit 993db77
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 5 deletions.
4 changes: 2 additions & 2 deletions bench/multi_bench/multi_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -1157,8 +1157,8 @@ func pubPipelineWorker(td time.Duration, globalPubMgr *nsq.TopicProducerMgr, top
waitCh <- 1

cost := time.Since(s).Nanoseconds()
if cost > time.Millisecond.Nanoseconds()*100 {
log.Printf("pub id : %v slow:%v\n", traceID, cost)
if cost > time.Millisecond.Nanoseconds()*80 {
log.Printf("pub id : %v slow:%v, %v\n", traceID, cost, len(waitCh))
}
addLatencyCounter(cost)
}
Expand Down
3 changes: 3 additions & 0 deletions nsqdserver/protocol_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,9 @@ func (p *protocolV2) messagePump(client *nsqd.ClientV2, startedChan chan bool,
matched = !matched
}
if !matched {
if nsqd.NsqLogger().Level() >= levellogger.LOG_DETAIL {
nsqd.NsqLogger().Debugf("channel %v filtered message %v", subChannel.GetName(), nsqd.PrintMessageNoBody(msg))
}
subChannel.ConfirmBackendQueue(msg)
subChannel.CleanWaitingRequeueChan(msg)
subChannel.TryRefreshChannelEnd()
Expand Down
13 changes: 10 additions & 3 deletions nsqdserver/protocol_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4332,7 +4332,7 @@ func TestSubOrderedWithFilter(t *testing.T) {

opts := nsqdNs.NewOptions()
opts.Logger = newTestLogger(t)
opts.LogLevel = 1
opts.LogLevel = 2
opts.SyncTimeout = time.Minute
if testing.Verbose() {
opts.LogLevel = 4
Expand All @@ -4351,7 +4351,7 @@ func TestSubOrderedWithFilter(t *testing.T) {
Ext: true,
}
topic.SetDynamicInfo(topicDynConf, nil)
topic.GetChannel("ordered_ch")
channel := topic.GetChannel("ordered_ch")

clientParams := make(map[string]interface{})
clientParams["client_id"] = "client_b"
Expand All @@ -4371,8 +4371,15 @@ func TestSubOrderedWithFilter(t *testing.T) {
}

// since no match, will recv no message
time.Sleep(time.Second)
for {
time.Sleep(time.Second)
if channel.Depth() == 0 {
break
}
}
conn.Close()
// wait old connection exit
time.Sleep(time.Second)
for i := 0; i < 10; i++ {
topic.PutMessage(nsqdNs.NewMessage(0, []byte("second")))
}
Expand Down

0 comments on commit 993db77

Please sign in to comment.