diff --git a/.travis.yml b/.travis.yml index d71464e2d..79c7fff9e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -11,7 +11,7 @@ sudo: false script: - curl https://raw.githubusercontent.com/golang/dep/master/install.sh | sh - dep ensure - - travis_wait ./test.sh + - travis_wait 25 ./test.sh notifications: email: false diff --git a/Gopkg.lock b/Gopkg.lock index f915331d8..1df563803 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -266,12 +266,12 @@ revision = "7f31f4b264ec95ca86f90649fe4dca4a2a690f4a" [[projects]] - digest = "1:871f13c531ddcd27db330b10c1fc44051aa20ac81b5aeefe1900473382740444" + digest = "1:678f62e1137708588ffc59ed8b5eda4228e4696638da6ec97119b0070515766e" name = "github.com/youzan/go-nsq" packages = ["."] pruneopts = "UT" - revision = "04069106044ab415a8327a45404e1238de03fa89" - version = "v1.3.1" + revision = "4b00af35f37f4c6b137ba055c45419f97e3a1536" + version = "v1.3.2-HA" [[projects]] branch = "master" diff --git a/Gopkg.toml b/Gopkg.toml index 7e6c5b7da..33c226126 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -127,7 +127,7 @@ [[constraint]] name = "github.com/youzan/go-nsq" - version = "1.3.1" + version = "1.3.2-HA" [[constraint]] branch = "master" diff --git a/apps/nsqd/nsqd.go b/apps/nsqd/nsqd.go index b6cabaf51..d86f5ee64 100644 --- a/apps/nsqd/nsqd.go +++ b/apps/nsqd/nsqd.go @@ -164,6 +164,7 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet { flagSet.Duration("queue-scan-refresh-interval", opts.QueueScanRefreshInterval, "scan refresh interval for new channels") flagSet.Int("queue-scan-selection-count", opts.QueueScanSelectionCount, "select count for each scan") flagSet.Int("queue-scan-worker-pool-max", opts.QueueScanWorkerPoolMax, "the max scan worker pool") + flagSet.Int("queue-topic-job-worker-pool-max", opts.QueueTopicJobWorkerPoolMax, "the max scan worker pool") flagSet.Float64("queue-scan-dirty-percent", opts.QueueScanDirtyPercent, "retry scan immediately if dirty percent happened in last scan") flagSet.Bool("allow-zan-test-skip", opts.AllowZanTestSkip, "allow zan test message filter in new created channel & channels under newly upgraded topic") flagSet.Int("default-commit-buf", int(opts.DefaultCommitBuf), "the default commit buffer for topic data") diff --git a/bench/multi_bench/multi_bench.go b/bench/multi_bench/multi_bench.go index b3850a4cf..6e4c3f65b 100644 --- a/bench/multi_bench/multi_bench.go +++ b/bench/multi_bench/multi_bench.go @@ -5,6 +5,7 @@ import ( "encoding/binary" "errors" "flag" + "fmt" "log" "math/rand" "os" @@ -37,7 +38,9 @@ var ( concurrency = flagSet.Int("c", 100, "concurrency of goroutine") pubPoolSize = flagSet.Int("pub-pool", 1, "producer pool size") benchCase = flagSet.String("bench-case", "simple", "which bench should run (simple/benchpub/benchsub/benchdelaysub/checkdata/benchlookup/benchreg/consumeoffset/checkdata2)") + pipelineSize = flagSet.Int("pipeline", 0, "pipeline size") channelNum = flagSet.Int("ch_num", 1, "the channel number under each topic") + ephemeral = flagSet.Bool("ephemeral", false, "use ephemeral channel for test") trace = flagSet.Bool("trace", false, "enable the trace of pub and sub") ordered = flagSet.Bool("ordered", false, "enable ordered sub") retryBackground = flagSet.Bool("retry-background", false, "retry pub in background") @@ -79,6 +82,82 @@ var pubTraceFailedList map[string]map[uint64]int64 var maxDelayTs int64 var myRand = rand.New(rand.NewSource(time.Now().Unix())) +var latencyDistribute [32]int64 + +func addLatencyCounter(cost int64) { + index := cost / 1000 / 1000 + if index < 100 { + index = index / 10 + } 
else if index < 1000 { + index = 9 + index/100 + } else if index < 10000 { + index = 19 + index/1000 + } else { + index = 29 + } + atomic.AddInt64(&latencyDistribute[index], 1) +} + +func printLatencyStats() { + for i, v := range latencyDistribute { + if i == 0 { + fmt.Printf("latency <100ms:") + } else if i == 10 { + fmt.Printf("\nlatency 100ms ~ 999ms:") + } else if i == 20 { + fmt.Printf("\nlatency > 1s:") + } + fmt.Printf("%d: %v, ", i, v) + } + fmt.Printf("\n") +} + +func printTotalQPSAndLatencyStats(start time.Time, latency bool) { + end := time.Now() + duration := end.Sub(start) + tmc := atomic.LoadInt64(&totalMsgCount) + log.Printf("duration: %s - %.03fmb/s - %.03fops/s - %.03fus/op\n", + duration, + float64(tmc*int64(*size))/duration.Seconds()/1024/1024, + float64(tmc)/duration.Seconds(), + float64(duration/time.Microsecond)/(float64(tmc)+0.01)) + + log.Printf("total count: %v, total error : %v\n", tmc, atomic.LoadInt64(&totalErrCount)) + if latency { + printLatencyStats() + } +} + +func printPeriodQPSAndLatencyStats(start time.Time, latency bool, done chan int) { + go func() { + prevMsgCount := int64(0) + prevStart := start + for { + select { + case <-done: + return + default: + } + time.Sleep(time.Second * 5) + end := time.Now() + duration := end.Sub(prevStart) + currentTmc := atomic.LoadInt64(¤tMsgCount) + tmc := currentTmc - prevMsgCount + prevMsgCount = currentTmc + prevStart = time.Now() + log.Printf("duration: %s - %.03fmb/s - %.03fops/s - %.03fus/op\n", + duration, + float64(tmc*int64(*size))/duration.Seconds()/1024/1024, + float64(tmc)/duration.Seconds(), + float64(duration/time.Microsecond)/(float64(tmc)+0.01)) + + if latency { + printLatencyStats() + } + } + }() +} + type ByMsgOffset []*nsq.Message func (self ByMsgOffset) Len() int { @@ -128,8 +207,19 @@ func startBenchPub(msg []byte, batch [][]byte) { log.Printf("lookup connect error : %v", err) return } - + pubIDList := make(map[string]*int64) + // received max continuous trace id + subReceivedMaxTraceIDList := make(map[string]*int64) for _, t := range topics { + init := time.Now().UnixNano() + pubIDList[t] = &init + subInit := init + subReceivedMaxTraceIDList[t] = &subInit + topicMutex[t] = &sync.Mutex{} + } + for _, t := range topics { + v := pubIDList[t] + atomic.AddInt64(v, 1) if *trace { var id nsq.NewMessageID var offset uint64 @@ -156,10 +246,18 @@ func startBenchPub(msg []byte, batch [][]byte) { rdyChan := make(chan int) for j := 0; j < *concurrency; j++ { wg.Add(1) - go func() { + go func(index int) { defer wg.Done() - pubWorker(*runfor, pubMgr, topics[j%len(topics)], *batchSize, batch, rdyChan, goChan, false) - }() + mutex.Lock() + curTopic := topics[index%len(topics)] + counter := pubIDList[curTopic] + mutex.Unlock() + if *pipelineSize > 1 { + pubPipelineWorker(*runfor, pubMgr, topics[j%len(topics)], counter, rdyChan, goChan) + } else { + pubWorker(*runfor, pubMgr, topics[j%len(topics)], counter, batch, rdyChan, goChan, false) + } + }(j) <-rdyChan } @@ -175,37 +273,12 @@ func startBenchPub(msg []byte, batch [][]byte) { start := time.Now() close(goChan) - go func() { - prevMsgCount := int64(0) - prevStart := start - for { - time.Sleep(time.Second * 5) - end := time.Now() - duration := end.Sub(prevStart) - currentTmc := atomic.LoadInt64(¤tMsgCount) - tmc := currentTmc - prevMsgCount - prevMsgCount = currentTmc - prevStart = time.Now() - log.Printf("duration: %s - %.03fmb/s - %.03fops/s - %.03fus/op\n", - duration, - float64(tmc*int64(*size))/duration.Seconds()/1024/1024, - float64(tmc)/duration.Seconds(), - 
float64(duration/time.Microsecond)/(float64(tmc)+0.01)) - - } - - }() + done := make(chan int) + printPeriodQPSAndLatencyStats(start, true, done) wg.Wait() - end := time.Now() - duration := end.Sub(start) - tmc := atomic.LoadInt64(&totalMsgCount) - log.Printf("duration: %s - %.03fmb/s - %.03fops/s - %.03fus/op\n", - duration, - float64(tmc*int64(*size))/duration.Seconds()/1024/1024, - float64(tmc)/duration.Seconds(), - float64(duration/time.Microsecond)/(float64(tmc)+0.01)) - log.Printf("total count: %v, total error : %v\n", tmc, atomic.LoadInt64(&totalErrCount)) + close(done) + printTotalQPSAndLatencyStats(start, true) } func startBenchSub() { @@ -224,7 +297,11 @@ func startBenchSub() { for chIndex := 0; chIndex < *channelNum; chIndex++ { wg.Add(1) go func(id int, topic string, chSuffix string) { - subWorker(quitChan, *runfor, *lookupAddress, topic, topic+"_ch"+chSuffix, rdyChan, goChan, id) + chName := topic + "_ch" + chSuffix + if *ephemeral { + chName = chName + "#ephemeral" + } + subWorker(quitChan, *runfor, *lookupAddress, topic, chName, rdyChan, goChan, id) wg.Done() }(j, topics[j%len(topics)], strconv.Itoa(chIndex)) <-rdyChan @@ -244,38 +321,14 @@ func startBenchSub() { start := time.Now() close(goChan) close(quitChan) - go func() { - prevMsgCount := int64(0) - prevStart := start - for { - time.Sleep(time.Second * 5) - end := time.Now() - duration := end.Sub(prevStart) - currentTmc := atomic.LoadInt64(&totalSubMsgCount) - tmc := currentTmc - prevMsgCount - prevMsgCount = currentTmc - prevStart = time.Now() - log.Printf("duration: %s - %.03fmb/s - %.03fops/s - %.03fus/op\n", - duration, - float64(tmc*int64(*size))/duration.Seconds()/1024/1024, - float64(tmc)/duration.Seconds(), - float64(duration/time.Microsecond)/(float64(tmc)+0.01)) - - } - }() + done := make(chan int) + printPeriodQPSAndLatencyStats(start, false, done) wg.Wait() - end := time.Now() - duration := end.Sub(start) - tmc := atomic.LoadInt64(&totalSubMsgCount) - log.Printf("duration: %s - %.03fmb/s - %.03fops/s - %.03fus/op", - duration, - float64(tmc*int64(*size))/duration.Seconds()/1024/1024, - float64(tmc)/duration.Seconds(), - float64(duration/time.Microsecond)/(float64(tmc)+0.01)) - log.Printf("total count: %v\n", tmc) + close(done) + printTotalQPSAndLatencyStats(start, false) } func startSimpleTest(msg []byte, batch [][]byte) { @@ -383,14 +436,14 @@ func startCheckData2() { rdyChan := make(chan int) for j := 0; j < *concurrency; j++ { wg.Add(1) - go func() { + go func(index int) { defer wg.Done() mutex.Lock() - curTopic := topics[j%len(topics)] + curTopic := topics[index%len(topics)] counter := pubIDList[curTopic] mutex.Unlock() - pubWorker2(*runfor, pubMgr, curTopic, counter, rdyChan, goChan) - }() + pubWorker(*runfor, pubMgr, curTopic, counter, nil, rdyChan, goChan, false) + }(j) <-rdyChan } @@ -492,8 +545,19 @@ func startCheckData(msg []byte, batch [][]byte, testDelay bool) { log.Printf("lookup connect error : %v", err) return } - + pubIDList := make(map[string]*int64) + // received max continuous trace id + subReceivedMaxTraceIDList := make(map[string]*int64) + for _, t := range topics { + init := time.Now().UnixNano() + pubIDList[t] = &init + subInit := init + subReceivedMaxTraceIDList[t] = &subInit + topicMutex[t] = &sync.Mutex{} + } for _, t := range topics { + v := pubIDList[t] + atomic.AddInt64(v, 1) if *trace { var id nsq.NewMessageID var offset uint64 @@ -523,10 +587,14 @@ func startCheckData(msg []byte, batch [][]byte, testDelay bool) { rdyChan := make(chan int) for j := 0; j < *concurrency; j++ { 
wg.Add(1) - go func() { + go func(index int) { defer wg.Done() - pubWorker(*runfor, pubMgr, topics[j%len(topics)], *batchSize, batch, rdyChan, goChan, testDelay) - }() + mutex.Lock() + curTopic := topics[index%len(topics)] + counter := pubIDList[curTopic] + mutex.Unlock() + pubWorker(*runfor, pubMgr, topics[j%len(topics)], counter, batch, rdyChan, goChan, testDelay) + }(j) <-rdyChan } @@ -1001,8 +1069,7 @@ func main() { } } -func pubWorker(td time.Duration, globalPubMgr *nsq.TopicProducerMgr, topicName string, batchSize int, -batch [][]byte, rdyChan chan int, goChan chan int, testDelay bool) { +func getPubMgr(globalPubMgr *nsq.TopicProducerMgr, rdyChan chan int) *nsq.TopicProducerMgr { var pubMgr *nsq.TopicProducerMgr var err error if *useSinglePubMgr { @@ -1010,99 +1077,183 @@ batch [][]byte, rdyChan chan int, goChan chan int, testDelay bool) { } else { pubMgr, err = nsq.NewTopicProducerMgr(topics, config) if err != nil { - log.Printf("init error : %v", err) + log.Printf("init pub mgr error : %v", err) close(rdyChan) - return + return nil } pubMgr.SetLogger(log.New(os.Stderr, "", log.LstdFlags), nsq.LogLevelInfo) err = pubMgr.ConnectToNSQLookupd(*lookupAddress) if err != nil { log.Printf("lookup connect error : %v", err) close(rdyChan) - return + return nil } } + return pubMgr +} +func pubPipelineWorker(td time.Duration, globalPubMgr *nsq.TopicProducerMgr, topicName string, pubIDCounter *int64, + rdyChan chan int, goChan chan int) { + pubMgr := getPubMgr(globalPubMgr, rdyChan) + if pubMgr == nil { + return + } rdyChan <- 1 <-goChan var msgCount int64 endTime := time.Now().Add(td) - traceIDs := make([]uint64, len(batch)) - var traceResp pubResp - pubIDCounter := int64(0) + responseCh := make(chan *nsq.ProducerTransaction, *pipelineSize) + waitCh := make(chan int, *pipelineSize) + stopCh := make(chan int) + defer close(stopCh) + go func() { + for { + select { + case rsp := <-responseCh: + var tid int64 + if len(rsp.Args) > 0 { + tid = rsp.Args[0].(int64) + } + if rsp.Error != nil { + log.Printf("pub id : %v error :%v\n", tid, rsp.Error) + } + if len(rsp.ResponseData) < 2 || string(rsp.ResponseData) != "OK" { + log.Printf("pub id : %v invalid response :%s\n", tid, rsp.ResponseData) + } + select { + case <-waitCh: + case <-stopCh: + return + } + case <-stopCh: + return + } + } + }() for { - if time.Now().After(endTime) { + s := time.Now() + if s.After(endTime) { break } if (*sleepfor).Nanoseconds() > int64(10000) { time.Sleep(*sleepfor) } + traceID := atomic.AddInt64(pubIDCounter, 1) + data := make([]byte, 8) + binary.BigEndian.PutUint64(data, uint64(traceID)) + + err := pubMgr.PublishAsync(topicName, data, responseCh, traceID) + if err != nil { + log.Printf("pub id : %v error :%v\n", traceID, err) + atomic.AddInt64(&totalErrCount, 1) + time.Sleep(time.Second) + continue + } + msgCount += int64(1) + atomic.AddInt64(¤tMsgCount, int64(1)) + if time.Now().After(endTime) { + break + } + waitCh <- 1 + + cost := time.Since(s).Nanoseconds() + if cost > time.Millisecond.Nanoseconds()*80 { + log.Printf("pub id : %v slow:%v, %v\n", traceID, cost, len(waitCh)) + } + addLatencyCounter(cost) + } + atomic.AddInt64(&totalMsgCount, msgCount) +} + +func pubWorker(td time.Duration, globalPubMgr *nsq.TopicProducerMgr, topicName string, pubIDCounter *int64, + batch [][]byte, + rdyChan chan int, goChan chan int, testDelay bool) { + + var err error + pubMgr := getPubMgr(globalPubMgr, rdyChan) + if pubMgr == nil { + return + } + rdyChan <- 1 + <-goChan + var msgCount int64 + endTime := time.Now().Add(td) + var 
traceResp pubResp + mutex.Lock() + failedList, ok := pubTraceFailedList[topicName] + if !ok { + failedList = make(map[uint64]int64) + pubTraceFailedList[topicName] = failedList + } + failedLocker := topicMutex[topicName] + mutex.Unlock() + traceIDs := make([]uint64, len(batch)) - traceID := atomic.AddInt64(&pubIDCounter, 1) - traceData := make([]byte, 8) - binary.BigEndian.PutUint64(traceData, uint64(traceID)) + for { + s := time.Now() + if s.After(endTime) { + break + } + if (*sleepfor).Nanoseconds() > int64(10000) { + time.Sleep(*sleepfor) + } - singleMsg := batch[0] + traceID := atomic.AddInt64(pubIDCounter, 1) + data := make([]byte, 8) + binary.BigEndian.PutUint64(data, uint64(traceID)) + if len(batch) > 0 { + data = batch[0] + } if testDelay && atomic.LoadInt64(¤tMsgCount)%int64(*delayPercent) == 0 { delayDuration := time.Second * time.Duration(1+myRand.Intn(*maxDelaySecs)) - delayTs := int(time.Now().Add(delayDuration).Unix()) + delayTs := int(s.Add(delayDuration).Unix()) if int64(delayTs) > atomic.LoadInt64(&maxDelayTs) { atomic.StoreInt64(&maxDelayTs, int64(delayTs)) } - singleMsg = []byte("delay-" + strconv.Itoa(delayTs)) + data = []byte("delay-" + strconv.Itoa(delayTs)) } if *trace { - if batchSize == 1 { + if *batchSize == 1 { if *ordered { - traceResp.id, traceResp.offset, traceResp.rawSize, err = pubMgr.PublishOrdered(topicName, traceData, singleMsg) + traceResp.id, traceResp.offset, traceResp.rawSize, err = pubMgr.PublishOrdered(topicName, data, data) } else { - traceResp.id, traceResp.offset, traceResp.rawSize, err = pubMgr.PublishAndTrace(topicName, uint64(traceID), singleMsg) + traceResp.id, traceResp.offset, traceResp.rawSize, err = pubMgr.PublishAndTrace(topicName, uint64(traceID), data) } } else { traceResp.id, traceResp.offset, traceResp.rawSize, err = pubMgr.MultiPublishAndTrace(topicName, traceIDs, batch) } + if err != nil { - log.Println("pub error :" + err.Error()) + log.Printf("pub id : %v error :%v\n", traceID, err) + failedLocker.Lock() + failedList[uint64(traceID)] = 1 + failedLocker.Unlock() atomic.AddInt64(&totalErrCount, 1) time.Sleep(time.Second) continue } - - pidStr := getPartitionID(traceResp.id) - mutex.Lock() - topicResp, ok := pubRespCheck[topicName+pidStr] - if !ok { - topicResp = make(map[uint64]pubResp) - pubRespCheck[topicName+pidStr] = topicResp - } - oldResp, ok := topicResp[uint64(traceResp.id)] - if ok { - log.Printf("got the same id with mpub: %v\n", traceResp) - if oldResp != traceResp { - log.Printf("response not the same old %v, new:%v\n", oldResp, traceResp) - } - } else { - topicResp[uint64(traceResp.id)] = traceResp - } - mutex.Unlock() } else { - var err error - if batchSize == 1 { + if *batchSize == 1 { if *retryBackground { - err = pubMgr.PublishAndRetryBackground(topicName, singleMsg) + err = pubMgr.PublishAndRetryBackground(topicName, data) } else { - err = pubMgr.Publish(topicName, singleMsg) + err = pubMgr.Publish(topicName, data) } } else { err = pubMgr.MultiPublish(topicName, batch) } if err != nil { - log.Println("pub error :" + err.Error()) + log.Printf("pub id : %v error :%v\n", traceID, err) + failedLocker.Lock() + failedList[uint64(traceID)] = 1 + failedLocker.Unlock() atomic.AddInt64(&totalErrCount, 1) time.Sleep(time.Second) continue } } + cost := time.Since(s).Nanoseconds() + addLatencyCounter(cost) msgCount += int64(len(batch)) atomic.AddInt64(¤tMsgCount, int64(len(batch))) if time.Now().After(endTime) { @@ -1193,7 +1344,7 @@ func (c *consumeHandler) HandleMessage(message *nsq.Message) error { } func subWorker(quitChan 
chan int, td time.Duration, lookupAddr string, topic string, channel string, -rdyChan chan int, goChan chan int, id int) { + rdyChan chan int, goChan chan int, id int) { consumer, err := nsq.NewConsumer(topic, channel, config) if err != nil { panic(err.Error()) @@ -1271,7 +1422,7 @@ func (c *consumeTraceIDHandler) HandleMessage(message *nsq.Message) error { } func subWorker2(quitChan chan int, td time.Duration, lookupAddr string, topic string, channel string, -subIDCounter *int64, subTraceWaiting map[uint64]*nsq.Message, locker *sync.Mutex, rdyChan chan int, goChan chan int, id int) { + subIDCounter *int64, subTraceWaiting map[uint64]*nsq.Message, locker *sync.Mutex, rdyChan chan int, goChan chan int, id int) { consumer, err := nsq.NewConsumer(topic, channel, config) if err != nil { panic(err.Error()) @@ -1299,91 +1450,3 @@ subIDCounter *int64, subTraceWaiting map[uint64]*nsq.Message, locker *sync.Mutex consumer.ConnectToNSQLookupd(lookupAddr) <-done } - -func pubWorker2(td time.Duration, globalPubMgr *nsq.TopicProducerMgr, topicName string, pubIDCounter *int64, rdyChan chan int, goChan chan int) { - var pubMgr *nsq.TopicProducerMgr - var err error - if *useSinglePubMgr { - pubMgr = globalPubMgr - } else { - pubMgr, err = nsq.NewTopicProducerMgr(topics, config) - if err != nil { - log.Printf("init pub mgr error : %v", err) - close(rdyChan) - return - } - pubMgr.SetLogger(log.New(os.Stderr, "", log.LstdFlags), nsq.LogLevelInfo) - err = pubMgr.ConnectToNSQLookupd(*lookupAddress) - if err != nil { - log.Printf("lookup connect error : %v", err) - close(rdyChan) - return - } - } - - rdyChan <- 1 - <-goChan - var msgCount int64 - endTime := time.Now().Add(td) - var traceResp pubResp - mutex.Lock() - failedList, ok := pubTraceFailedList[topicName] - if !ok { - failedList = make(map[uint64]int64) - pubTraceFailedList[topicName] = failedList - } - failedLocker := topicMutex[topicName] - mutex.Unlock() - - for { - if time.Now().After(endTime) { - break - } - if (*sleepfor).Nanoseconds() > int64(10000) { - time.Sleep(*sleepfor) - } - - traceID := atomic.AddInt64(pubIDCounter, 1) - data := make([]byte, 8) - binary.BigEndian.PutUint64(data, uint64(traceID)) - if *trace { - if *ordered { - traceResp.id, traceResp.offset, traceResp.rawSize, err = pubMgr.PublishOrdered(topicName, data, data) - } else { - traceResp.id, traceResp.offset, traceResp.rawSize, err = pubMgr.PublishAndTrace(topicName, uint64(traceID), data) - } - if err != nil { - log.Printf("pub id : %v error :%v\n", traceID, err) - failedLocker.Lock() - failedList[uint64(traceID)] = 1 - failedLocker.Unlock() - atomic.AddInt64(&totalErrCount, 1) - time.Sleep(time.Second) - continue - } - } else { - var err error - if *retryBackground { - err = pubMgr.PublishAndRetryBackground(topicName, data) - } else { - err = pubMgr.Publish(topicName, data) - } - if err != nil { - log.Printf("pub id : %v error :%v\n", traceID, err) - failedLocker.Lock() - failedList[uint64(traceID)] = 1 - failedLocker.Unlock() - - atomic.AddInt64(&totalErrCount, 1) - time.Sleep(time.Second) - continue - } - } - msgCount += 1 - atomic.AddInt64(¤tMsgCount, 1) - if time.Now().After(endTime) { - break - } - } - atomic.AddInt64(&totalMsgCount, msgCount) -} \ No newline at end of file diff --git a/consistence/commitlog.go b/consistence/commitlog.go index f1afdcc5f..3e6339605 100644 --- a/consistence/commitlog.go +++ b/consistence/commitlog.go @@ -36,7 +36,7 @@ var ( ErrCommitLogCleanKeepMin = errors.New("commit log clean should keep some data") ) -var LOGROTATE_NUM = 2000000 +var 
LOGROTATE_NUM = 500000 var MIN_KEEP_LOG_ITEM = 1000 var bp sync.Pool diff --git a/consistence/coordinator_rpc.go b/consistence/coordinator_rpc.go index b0a80b5c3..cf9f4567c 100644 --- a/consistence/coordinator_rpc.go +++ b/consistence/coordinator_rpc.go @@ -356,11 +356,11 @@ func (self *NsqdCoordRpcServer) UpdateTopicInfo(rpcTopicReq *RpcAdminTopicInfo) if !ok { self.nsqdCoord.checkLocalTopicMagicCode(&rpcTopicReq.TopicPartitionMetaInfo, true) - // TODO: need handle fix mode here + tryFix := ForceFixLeaderData var localErr error - tpCoord, localErr = NewTopicCoordinator(rpcTopicReq.Name, rpcTopicReq.Partition, + tpCoord, localErr = NewTopicCoordinatorWithFixMode(rpcTopicReq.Name, rpcTopicReq.Partition, GetTopicPartitionBasePath(self.dataRootPath, rpcTopicReq.Name, rpcTopicReq.Partition), - rpcTopicReq.SyncEvery, rpcTopicReq.OrderedMulti) + rpcTopicReq.SyncEvery, rpcTopicReq.OrderedMulti, tryFix) if localErr != nil || tpCoord == nil { self.nsqdCoord.coordMutex.Unlock() ret = *ErrLocalInitTopicCoordFailed diff --git a/consistence/nsqd_coordinator.go b/consistence/nsqd_coordinator.go index 4a74c5303..0a98b89b9 100644 --- a/consistence/nsqd_coordinator.go +++ b/consistence/nsqd_coordinator.go @@ -595,7 +595,7 @@ func (self *NsqdCoordinator) forceCleanTopicData(topicName string, partition int func (self *NsqdCoordinator) initLocalTopicCoord(topicInfo *TopicPartitionMetaInfo, topicLeaderSession *TopicLeaderSession, - basepath string, forceFixLeader bool) (*TopicCoordinator, *nsqd.Topic, error) { + basepath string, forceFixCommitLog bool) (*TopicCoordinator, *nsqd.Topic, error) { self.coordMutex.Lock() defer self.coordMutex.Unlock() coords, ok := self.topicCoords[topicInfo.Name] @@ -612,7 +612,7 @@ func (self *NsqdCoordinator) initLocalTopicCoord(topicInfo *TopicPartitionMetaIn topicName := topicInfo.Name partition := topicInfo.Partition tc, err = NewTopicCoordinatorWithFixMode(topicInfo.Name, topicInfo.Partition, basepath, - topicInfo.SyncEvery, topicInfo.OrderedMulti, forceFixLeader) + topicInfo.SyncEvery, topicInfo.OrderedMulti, forceFixCommitLog) if err != nil { coordLog.Infof("failed to get topic coordinator:%v-%v, err:%v", topicName, partition, err) return nil, nil, err @@ -745,7 +745,8 @@ func (self *NsqdCoordinator) loadLocalTopicData() error { if err != nil { coordLog.Infof("failed to get topic leader info:%v-%v, err:%v", topicName, partition, err) } - _, _, loadErr := self.initLocalTopicCoord(topicInfo, topicLeaderSession, basepath, forceFixLeader) + fixCommitLog := ForceFixLeaderData + _, _, loadErr := self.initLocalTopicCoord(topicInfo, topicLeaderSession, basepath, fixCommitLog) if loadErr != nil { coordLog.Infof("topic %v coord init error: %v", topicInfo.GetTopicDesp(), loadErr.Error()) continue @@ -825,64 +826,68 @@ func (self *NsqdCoordinator) loadLocalTopicData() error { func checkAndFixLocalLogQueueEnd(tc *coordData, localLogQ ILocalLogQueue, logMgr *TopicCommitLogMgr, tryFixEnd bool, forceFix bool) error { + if logMgr == nil || localLogQ == nil { + return nil + } tname := tc.topicInfo.GetTopicDesp() - if logMgr != nil && localLogQ != nil { - logIndex, logOffset, logData, err := logMgr.GetLastCommitLogOffsetV2() - if err != nil { - if err != ErrCommitLogEOF { - coordLog.Errorf("commit log is corrupted: %v", err) - return err + logIndex, logOffset, logData, err := logMgr.GetLastCommitLogOffsetV2() + if err != nil { + if err != ErrCommitLogEOF { + coordLog.Errorf("commit log is corrupted: %v", err) + return err + } + + coordLog.Infof("no commit last log data : %v", err) + return 
nil + } + coordLog.Infof("current topic %v log: %v:%v, %v", + tname, logIndex, logOffset, logData) + + if !forceFix && !tryFixEnd { + return nil + } + localErr := localLogQ.ResetBackendEndNoLock(nsqd.BackendOffset(logData.MsgOffset+int64(logData.MsgSize)), + logData.MsgCnt+int64(logData.MsgNum)-1) + if localErr == nil { + return nil + } + coordLog.Errorf("topic %v reset local queue backend failed: %v", tname, localErr) + if !forceFix { + return localErr + } + realEnd := localLogQ.TotalDataSize() + cntNum, _ := logMgr.ConvertToCountIndex(logIndex, logOffset) + for { + cntNum-- + logIndex, logOffset, localErr = logMgr.ConvertToOffsetIndex(cntNum) + if localErr != nil { + coordLog.Errorf("topic %v try fix failed: %v , %v", tname, localErr, cntNum) + break + } + logData, localErr = logMgr.GetCommitLogFromOffsetV2(logIndex, logOffset) + if localErr != nil { + coordLog.Errorf("topic %v try fix failed: %v , %v:%v", tname, localErr, logIndex, logOffset) + break + } + if logData.MsgOffset+int64(logData.MsgSize) <= realEnd { + localErr = localLogQ.ResetBackendEndNoLock(nsqd.BackendOffset(logData.MsgOffset+int64(logData.MsgSize)), + logData.MsgCnt+int64(logData.MsgNum)-1) + if localErr != nil { + coordLog.Infof("topic %v reset local queue failed: %v at %v:%v", tname, localErr, logIndex, logOffset) } else { - coordLog.Infof("no commit last log data : %v", err) - } - } else { - coordLog.Infof("current topic %v log: %v:%v, %v", - tname, logIndex, logOffset, logData) - if tryFixEnd || forceFix { - localErr := localLogQ.ResetBackendEndNoLock(nsqd.BackendOffset(logData.MsgOffset+int64(logData.MsgSize)), - logData.MsgCnt+int64(logData.MsgNum)-1) + coordLog.Warningf("topic %v fix local queue to: %v, %v, commit log: %v:%v:%v", tname, + logData, realEnd, logIndex, logOffset, cntNum) + _, localErr = logMgr.TruncateToOffsetV2(logIndex, logOffset+int64(GetLogDataSize())) if localErr != nil { - coordLog.Errorf("topic %v reset local queue backend failed: %v", tname, localErr) - if forceFix { - realEnd := localLogQ.TotalDataSize() - cntNum, _ := logMgr.ConvertToCountIndex(logIndex, logOffset) - for { - cntNum-- - logIndex, logOffset, localErr = logMgr.ConvertToOffsetIndex(cntNum) - if localErr != nil { - coordLog.Errorf("topic %v try fix failed: %v , %v", tname, localErr, cntNum) - panic(localErr) - } - logData, localErr = logMgr.GetCommitLogFromOffsetV2(logIndex, logOffset) - if localErr != nil { - coordLog.Errorf("topic %v try fix failed: %v , %v:%v", tname, localErr, logIndex, logOffset) - panic(localErr) - } - if logData.MsgOffset+int64(logData.MsgSize) <= realEnd { - localErr = localLogQ.ResetBackendEndNoLock(nsqd.BackendOffset(logData.MsgOffset+int64(logData.MsgSize)), - logData.MsgCnt+int64(logData.MsgNum)-1) - if localErr != nil { - coordLog.Infof("topic %v reset local queue failed: %v at %v:%v", tname, localErr, logIndex, logOffset) - } else { - coordLog.Warningf("topic %v fix local queue to: %v, %v, commit log: %v:%v:%v", tname, - logData, realEnd, logIndex, logOffset, cntNum) - _, localErr = logMgr.TruncateToOffsetV2(logIndex, logOffset+int64(GetLogDataSize())) - if localErr != nil { - coordLog.Errorf("topic %v reset local queue failed: %v, at %v:%v", tname, - localErr, logIndex, logOffset) - } else { - return nil - } - } - } - } - } - return localErr + coordLog.Errorf("topic %v reset local queue failed: %v, at %v:%v", tname, + localErr, logIndex, logOffset) + } else { + return nil } } } } - return nil + return localErr } func checkAndFixLocalLogQueueData(tc *coordData, @@ -903,39 +908,38 @@ func 
checkAndFixLocalLogQueueData(tc *coordData, snap := localLogQ.GetDiskQueueSnapshot() for { err = snap.SeekTo(nsqd.BackendOffset(log.MsgOffset)) - if err != nil { - coordLog.Warningf("topic %v log start %v should be fixed: %v, %v", tc.topicInfo.GetTopicDesp(), logStart, log, err) - // try fix start - if err == nsqd.ErrReadQueueAlreadyCleaned { - start := snap.GetQueueReadStart() - logStart.SegmentStartOffset = GetNextLogOffset(logStart.SegmentStartOffset) - if log.MsgOffset+int64(log.MsgSize) < int64(start.Offset()) { - matchIndex, matchOffset, _, err := logMgr.SearchLogDataByMsgOffset(int64(start.Offset())) - if err != nil { - coordLog.Infof("search log failed: %v", err) - } else if matchIndex > logStart.SegmentStartIndex || - (matchIndex == logStart.SegmentStartIndex && matchOffset > logStart.SegmentStartOffset) { - logStart.SegmentStartIndex = matchIndex - logStart.SegmentStartOffset = matchOffset - } - } - err = logMgr.CleanOldData(logStart.SegmentStartIndex, logStart.SegmentStartOffset) - if err != nil { - // maybe the diskqueue data corrupt, we need sync from leader - coordLog.Errorf("clean log failed : %v, %v", logStart, err) - return err - } - logStart, log, err = logMgr.GetLogStartInfo() + if err == nil { + break + } + coordLog.Warningf("topic %v log start %v should be fixed: %v, %v", tc.topicInfo.GetTopicDesp(), logStart, log, err) + // try fix start + if err == nsqd.ErrReadQueueAlreadyCleaned { + start := snap.GetQueueReadStart() + logStart.SegmentStartOffset = GetNextLogOffset(logStart.SegmentStartOffset) + if log.MsgOffset+int64(log.MsgSize) < int64(start.Offset()) { + matchIndex, matchOffset, _, err := logMgr.SearchLogDataByMsgOffset(int64(start.Offset())) if err != nil { - return err + coordLog.Infof("search log failed: %v", err) + } else if matchIndex > logStart.SegmentStartIndex || + (matchIndex == logStart.SegmentStartIndex && matchOffset > logStart.SegmentStartOffset) { + logStart.SegmentStartIndex = matchIndex + logStart.SegmentStartOffset = matchOffset } - coordLog.Warningf("topic %v log start fixed to: %v, %v", tc.topicInfo.GetTopicDesp(), logStart, log) - } else { - coordLog.Errorf("read disk failed at log start: %v, %v, %v", logStart, log, err) + } + err = logMgr.CleanOldData(logStart.SegmentStartIndex, logStart.SegmentStartOffset) + if err != nil { + // maybe the diskqueue data corrupt, we need sync from leader + coordLog.Errorf("clean log failed : %v, %v", logStart, err) + return err + } + logStart, log, err = logMgr.GetLogStartInfo() + if err != nil { return err } + coordLog.Warningf("topic %v log start fixed to: %v, %v", tc.topicInfo.GetTopicDesp(), logStart, log) } else { - break + coordLog.Errorf("read disk failed at log start: %v, %v, %v", logStart, log, err) + return err } } if endFixErr != nil { diff --git a/consistence/nsqlookup_coordinator.go b/consistence/nsqlookup_coordinator.go index 5d164c0c1..1ef28be50 100644 --- a/consistence/nsqlookup_coordinator.go +++ b/consistence/nsqlookup_coordinator.go @@ -339,6 +339,7 @@ func (self *NsqLookupCoordinator) notifyTopicsToSingleNsqdForReload(topics []Top } func (self *NsqLookupCoordinator) notifyTopicsToAllNsqdForReload(topics []TopicPartitionMetaInfo) { + coordLog.Infof("notify all topics %v for all nodes ", len(topics)) for _, v := range topics { select { case <-self.stopChan: @@ -1416,7 +1417,6 @@ func (self *NsqLookupCoordinator) notifySingleNsqdForTopicReload(topicInfo Topic } func (self *NsqLookupCoordinator) notifyAllNsqdsForTopicReload(topicInfo TopicPartitionMetaInfo) *CoordErr { - coordLog.Infof("reload 
topic %v for all nodes ", topicInfo.GetTopicDesp()) rpcErr := self.notifyISRTopicMetaInfo(&topicInfo) if rpcErr != nil { coordLog.Infof("failed to notify topic %v info : %v", topicInfo.GetTopicDesp(), rpcErr) diff --git a/nsqadmin/http.go b/nsqadmin/http.go index ab3bc0fda..cfbc93514 100644 --- a/nsqadmin/http.go +++ b/nsqadmin/http.go @@ -861,21 +861,17 @@ func (s *httpServer) searchMessageTrace(w http.ResponseWriter, req *http.Request for index, m := range resultList.LogDataDtos { idx := index items := make([]TraceLogItemInfo, 0) - extraJsonStr, _ := strconv.Unquote(m.Extra) - err := json.Unmarshal([]byte(extraJsonStr), &items) + err = json.Unmarshal([]byte(m.Extra), &items) + // try compatible if err != nil || len(items) == 0 { - s.ctx.nsqadmin.logf("msg extra invalid: %v: %v, %v", m.Extra, m.Extra1, err) - err = json.Unmarshal([]byte(m.Extra), &items) - // try compatible + err = json.Unmarshal([]byte(m.Extra1), &items) if err != nil || len(items) == 0 { + s.ctx.nsqadmin.logf("msg extra invalid: %v: %v, %v", m.Extra, m.Extra1, err) extraJsonStr, _ := strconv.Unquote(m.Extra1) err = json.Unmarshal([]byte(extraJsonStr), &items) if err != nil || len(items) == 0 { s.ctx.nsqadmin.logf("msg extra1 invalid: %v, %v", m.Extra1, err) - err = json.Unmarshal([]byte(m.Extra1), &items) - if err != nil || len(items) == 0 { - continue - } + continue } } } @@ -954,6 +950,9 @@ func (s *httpServer) searchMessageTrace(w http.ResponseWriter, req *http.Request jsv.DC = v.DC logDataForJs = append(logDataForJs, jsv) } + if len(warnMessages) > 0 && requestMsgID > 0 { + needGetRequestMsg = true + } //s.ctx.nsqadmin.logf("sorted msg trace data : %v", logDataFilterEmpty) var requestMsg string requestMsgDC := make(map[string]string) @@ -973,8 +972,9 @@ func (s *httpServer) searchMessageTrace(w http.ResponseWriter, req *http.Request msgBody, _, err := s.ci.GetNSQDMessageByID(*producer, topicName, strconv.Itoa(pid), requestMsgID) if err != nil { s.ctx.nsqadmin.logf("get msg %v data failed : %v", requestMsgID, err) - warnMessages = append(warnMessages, err.Error()) + //warnMessages = append(warnMessages, err.Error()) } else { + s.ctx.nsqadmin.logf("get msg %v data : %v", requestMsgID, msgBody) if hasMultiDC { requestMsgDC[producer.DC] = msgBody var buf bytes.Buffer diff --git a/nsqd/channel.go b/nsqd/channel.go index 763494f59..d43f51f04 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -460,14 +460,18 @@ func (c *Channel) SetOrdered(enable bool) { default: } } else { - if c.GetClientsCount() == 0 { - atomic.StoreInt32(&c.requireOrder, 0) - select { - case c.tryReadBackend <- true: - default: - } - } else { - nsqLog.Logf("can not set ordered to false while the channel is still consuming by client") + c.RLock() + defer c.RUnlock() + if !c.IsOrdered() { + return + } + for _, c := range c.clients { + c.Exit() + } + atomic.StoreInt32(&c.requireOrder, 0) + select { + case c.tryReadBackend <- true: + default: } } } @@ -571,7 +575,7 @@ func (c *Channel) exit(deleted bool) error { <-c.exitSyncChan // write anything leftover to disk - c.Flush() + c.Flush(true) if deleted { // empty the queue (deletes the backend files, too) if c.GetDelayedQueue() != nil { @@ -597,13 +601,13 @@ func (c *Channel) skipChannelToEnd() (BackendQueueEnd, error) { return e, nil } -func (c *Channel) Flush() error { +func (c *Channel) Flush(fsync bool) error { if c.ephemeral { return nil } d, ok := c.backend.(*diskQueueReader) if ok { - d.Flush() + d.Flush(fsync) } return nil } @@ -707,6 +711,7 @@ func (c *Channel) doSkip(skipped bool) error { } } 
else { atomic.StoreInt32(&c.skipped, 0) + c.TryRefreshChannelEnd() } return nil } @@ -1049,6 +1054,14 @@ func (c *Channel) internalFinishMessage(clientID int64, clientAddr string, return offset, cnt, changed, msg, nil } +// if some message is skipped, we should try refresh channel end +// to get more possible new messages, since the end will only be updated when new message come in first time +func (c *Channel) TryRefreshChannelEnd() { + if c.IsWaitingMoreDiskData() { + c.moreDataCallback(c) + } +} + func (c *Channel) ContinueConsumeForOrder() { if c.IsOrdered() && atomic.LoadInt32(&c.needNotifyRead) == 1 { select { @@ -1979,11 +1992,20 @@ LOOP: } //let timer sync to update backend in replicas' channels - if c.IsSkipped() || c.shouldSkipZanTest(msg) { + needSkip := c.IsSkipped() + isZanTestSkip := false + if !needSkip { + isZanTestSkip = c.shouldSkipZanTest(msg) + needSkip = isZanTestSkip + } + if needSkip { if msg.DelayedType == ChannelDelayed { c.ConfirmDelayedMessage(msg) } else { c.ConfirmBackendQueue(msg) + if isZanTestSkip { + c.TryRefreshChannelEnd() + } } c.CleanWaitingRequeueChan(msg) continue LOOP diff --git a/nsqd/channel_test.go b/nsqd/channel_test.go index b6d1c190e..6468e1f77 100644 --- a/nsqd/channel_test.go +++ b/nsqd/channel_test.go @@ -65,7 +65,7 @@ func TestPutMessage(t *testing.T) { var id MessageID msg := NewMessage(id, []byte("test")) topic.PutMessage(msg) - topic.flush(true) + topic.ForceFlush() outputMsg := <-channel1.clientMsgChan equal(t, msg.ID, outputMsg.ID) @@ -89,7 +89,7 @@ func TestPutMessage2Chan(t *testing.T) { var id MessageID msg := NewMessage(id, []byte("test")) topic.PutMessage(msg) - topic.flush(true) + topic.flushBuffer(true) outputMsg1 := <-channel1.clientMsgChan equal(t, msg.ID, outputMsg1.ID) @@ -235,7 +235,7 @@ func TestChannelSkip(t *testing.T) { msgBytes := []byte(strconv.Itoa(10)) msg := NewMessage(msgId, msgBytes) _, backendOffsetMid, _, _, _ := topic.PutMessage(msg) - topic.flush(true) + topic.ForceFlush() equal(t, channel.Depth(), int64(11)) msgs = make([]*Message, 0, 9) @@ -247,7 +247,8 @@ func TestChannelSkip(t *testing.T) { msgs = append(msgs, msg) } topic.PutMessages(msgs) - topic.flush(true) + topic.flushBuffer(true) + time.Sleep(time.Millisecond) equal(t, channel.Depth(), int64(20)) //skip forward to message 10 @@ -288,13 +289,13 @@ func TestChannelInitWithOldStart(t *testing.T) { msgs = append(msgs, msg) } topic.PutMessages(msgs) - topic.flush(true) + topic.flushBuffer(true) var msgId MessageID msgBytes := []byte(strconv.Itoa(10)) msg := NewMessage(msgId, msgBytes) _, backendOffsetMid, _, _, _ := topic.PutMessage(msg) - topic.flush(true) + topic.ForceFlush() equal(t, channel.Depth(), int64(11)) channel.SetConsumeOffset(backendOffsetMid, 10, true) time.Sleep(time.Second) @@ -311,7 +312,7 @@ func TestChannelInitWithOldStart(t *testing.T) { msgs = append(msgs, msg) } _, putOffset, _, _, putEnd, _ := topic.PutMessages(msgs) - topic.flush(true) + topic.ForceFlush() t.Log(putEnd) t.Log(putOffset) @@ -319,7 +320,7 @@ func TestChannelInitWithOldStart(t *testing.T) { msgBytes = []byte(strconv.Itoa(1001)) msg = NewMessage(msgId2, msgBytes) topic.PutMessage(msg) - topic.flush(true) + topic.ForceFlush() channel2.skipChannelToEnd() channel3.SetConsumeOffset(putEnd.Offset(), putEnd.TotalMsgCnt(), true) @@ -370,7 +371,7 @@ func TestChannelResetReadEnd(t *testing.T) { msgBytes := []byte(strconv.Itoa(10)) msg := NewMessage(msgId, msgBytes) _, backendOffsetMid, _, _, _ := topic.PutMessage(msg) - topic.flush(true) + topic.ForceFlush() equal(t, 
channel.Depth(), int64(11)) msgs = make([]*Message, 0, 9) @@ -382,7 +383,8 @@ func TestChannelResetReadEnd(t *testing.T) { msgs = append(msgs, msg) } topic.PutMessages(msgs) - topic.flush(true) + topic.flushBuffer(true) + time.Sleep(time.Millisecond) equal(t, channel.Depth(), int64(20)) //skip forward to message 10 @@ -430,7 +432,7 @@ func TestChannelDepthTimestamp(t *testing.T) { msgs = append(msgs, msg) } topic.PutMessages(msgs) - topic.flush(true) + topic.ForceFlush() lastDepthTs := int64(0) for i := 0; i < 9; i++ { @@ -446,6 +448,85 @@ func TestChannelDepthTimestamp(t *testing.T) { equal(t, channel.DepthTimestamp(), int64(0)) } +func TestChannelUpdateEndWhenNeed(t *testing.T) { + // put will try update channel end if channel need more data + // and channel will try get newest end while need more data (no new put) + // consider below: + // 1. put 1,2,3 update channel end to 3 + // 2. consume 1 + // 3. put 4, no need update end + // 4. consume 2, 3 + // 5. check consume 4 without topic flush + // 6. consume end and put 5 + // 7. check consume 5 without flush + opts := NewOptions() + opts.SyncEvery = 100 + opts.LogLevel = 2 + opts.SyncTimeout = time.Second * 10 + opts.Logger = newTestLogger(t) + if testing.Verbose() { + opts.LogLevel = 4 + SetLogger(opts.Logger) + } + _, _, nsqd := mustStartNSQD(opts) + defer os.RemoveAll(opts.DataPath) + defer nsqd.Exit() + + topicName := "test_channel_end_update" + strconv.Itoa(int(time.Now().Unix())) + topic := nsqd.GetTopicIgnPart(topicName) + channel := topic.GetChannel("channel") + + msgs := make([]*Message, 0, 9) + for i := 0; i < 10; i++ { + var msgId MessageID + msgBytes := []byte(strconv.Itoa(i + 11)) + msg := NewMessage(msgId, msgBytes) + time.Sleep(time.Millisecond) + msgs = append(msgs, msg) + } + topic.PutMessages(msgs) + topic.flushBuffer(true) + + for i := 0; i < 5; i++ { + msgOutput := <-channel.clientMsgChan + channel.StartInFlightTimeout(msgOutput, NewFakeConsumer(0), "", opts.MsgTimeout) + channel.ConfirmBackendQueue(msgOutput) + t.Logf("consume %v", string(msgOutput.Body)) + } + for i := 0; i < 5; i++ { + var id MessageID + msg := NewMessage(id, []byte("test")) + topic.PutMessage(msg) + } + for i := 0; i < 10; i++ { + select { + case msgOutput := <-channel.clientMsgChan: + channel.StartInFlightTimeout(msgOutput, NewFakeConsumer(0), "", opts.MsgTimeout) + channel.ConfirmBackendQueue(msgOutput) + t.Logf("consume %v", string(msgOutput.Body)) + case <-time.After(time.Second): + t.Fatalf("timeout consume new messages") + } + } + for i := 0; i < 5; i++ { + var id MessageID + msg := NewMessage(id, []byte("test")) + topic.PutMessage(msg) + time.Sleep(time.Millisecond) + } + for i := 0; i < 5; i++ { + select { + case msgOutput := <-channel.clientMsgChan: + channel.StartInFlightTimeout(msgOutput, NewFakeConsumer(0), "", opts.MsgTimeout) + channel.ConfirmBackendQueue(msgOutput) + t.Logf("consume %v", string(msgOutput.Body)) + case <-time.After(time.Second): + t.Fatalf("timeout consume new messages") + } + } + // test new conn consume start from end queue before new message puts +} + func TestRangeTree(t *testing.T) { //tr := NewIntervalTree() tr := NewIntervalSkipList() diff --git a/nsqd/client_v2.go b/nsqd/client_v2.go index f37699fda..597ff1234 100644 --- a/nsqd/client_v2.go +++ b/nsqd/client_v2.go @@ -134,6 +134,7 @@ type ClientV2 struct { isExtendSupport int32 TagMsgChannel chan *Message extFilter ExtFilterData + PubStats *ClientPubStats } func NewClientV2(id int64, conn net.Conn, opts *Options, tls *tls.Config) *ClientV2 { diff --git 
a/nsqd/delay_queue.go b/nsqd/delay_queue.go index b3342c121..7b0a60101 100644 --- a/nsqd/delay_queue.go +++ b/nsqd/delay_queue.go @@ -762,7 +762,7 @@ func (q *DelayQueue) put(m *Message, rawData []byte, trace bool, checkSize int64 syncEvery := atomic.LoadInt64(&q.SyncEvery) if syncEvery == 1 || dend.TotalMsgCnt()-atomic.LoadInt64(&q.lastSyncCnt) >= syncEvery { - q.flush(true) + q.flush(false) } return m.ID, offset, writeBytes, dend, nil diff --git a/nsqd/diskqueue_reader.go b/nsqd/diskqueue_reader.go index 5b63346d6..1b67e1ac1 100644 --- a/nsqd/diskqueue_reader.go +++ b/nsqd/diskqueue_reader.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "io" - "math/rand" "os" "path" "sync" @@ -14,7 +13,6 @@ import ( "time" "github.com/youzan/nsq/internal/levellogger" - "github.com/youzan/nsq/internal/util" ) const ( @@ -22,6 +20,10 @@ const ( readBufferSize = 1024 * 4 ) +var errInvalidMetaFileData = errors.New("invalid meta file data") +var diskMagicEndBytes = []byte{0xae, 0x83} +var testCrash = false + var ( ErrReadQueueAlreadyCleaned = errors.New("the queue position has been cleaned") ErrConfirmSizeInvalid = errors.New("Confirm data size invalid.") @@ -236,7 +238,7 @@ func (d *diskQueueReader) exit(deleted bool) error { d.readFile.Close() d.readFile = nil } - d.sync() + d.sync(true) if deleted { d.skipToEndofQueue() err := os.Remove(d.metaDataFileName(false)) @@ -265,19 +267,21 @@ func (d *diskQueueReader) ConfirmRead(offset BackendOffset, cnt int64) error { if oldConfirm != d.confirmedQueueInfo.Offset() { d.needSync = true if d.syncEvery == 1 { - d.sync() + d.sync(false) } } return err } -func (d *diskQueueReader) Flush() { +func (d *diskQueueReader) Flush(fsync bool) { d.Lock() defer d.Unlock() if d.exitFlag == 1 { return } - d.internalUpdateEnd(nil, false) + if d.needSync { + d.sync(fsync) + } } func (d *diskQueueReader) ResetReadToConfirmed() (BackendQueueEnd, error) { @@ -292,7 +296,7 @@ func (d *diskQueueReader) ResetReadToConfirmed() (BackendQueueEnd, error) { if old != d.confirmedQueueInfo.Offset() { d.needSync = true if d.syncEvery == 1 { - d.sync() + d.sync(false) } } } @@ -320,7 +324,7 @@ func (d *diskQueueReader) ResetReadToOffset(offset BackendOffset, cnt int64) (Ba if err == nil { if old != d.confirmedQueueInfo.Offset() { d.needSync = true - d.sync() + d.sync(false) } } @@ -359,7 +363,7 @@ func (d *diskQueueReader) SkipReadToOffset(offset BackendOffset, cnt int64) (Bac if old != d.confirmedQueueInfo.Offset() { d.needSync = true if d.syncEvery == 1 { - d.sync() + d.sync(false) } } } @@ -385,7 +389,7 @@ func (d *diskQueueReader) SkipReadToEnd() (BackendQueueEnd, error) { if old != d.confirmedQueueInfo.Offset() { d.needSync = true if d.syncEvery == 1 { - d.sync() + d.sync(false) } } } @@ -404,9 +408,9 @@ func (d *diskQueueReader) isReadToEnd() bool { if d.IsWaitingMoreData() { return true } - d.Lock() + d.RLock() hasDiskData := d.queueEndInfo.EndOffset.GreatThan(&d.readQueueInfo.EndOffset) - d.Unlock() + d.RUnlock() return !hasDiskData } @@ -897,6 +901,7 @@ CheckFileOpen: if d.readQueueInfo.EndOffset.FileNum == d.queueEndInfo.EndOffset.FileNum { currentFileEnd = d.queueEndInfo.EndOffset.Pos } else if d.readQueueInfo.EndOffset.FileNum < d.queueEndInfo.EndOffset.FileNum { + // TODO: maybe cache stats if filenum unchanged to speed up next readOne stat, result.Err = d.readFile.Stat() if result.Err == nil { currentFileEnd = stat.Size() @@ -1017,8 +1022,8 @@ CheckFileOpen: } // sync fsyncs the current writeFile and persists metadata -func (d *diskQueueReader) sync() error { - err := 
d.persistMetaData() +func (d *diskQueueReader) sync(fsync bool) error { + err := d.persistMetaData(fsync) if err != nil { return err } @@ -1027,6 +1032,23 @@ func (d *diskQueueReader) sync() error { return nil } +func checkMetaFileEnd(f *os.File) error { + endFlag := make([]byte, len(diskMagicEndBytes)) + n, err := f.Read(endFlag) + if err != nil { + if err == io.EOF { + // old meta file + } else { + nsqLog.Errorf("reader meta end error, need fix: %v", err.Error()) + return errInvalidMetaFileData + } + } else if !bytes.Equal(endFlag[:n], diskMagicEndBytes) { + nsqLog.Errorf("reader meta end data invalid: %v need fix: %v", n, endFlag) + return errInvalidMetaFileData + } + return nil +} + // retrieveMetaData initializes state from the filesystem func (d *diskQueueReader) retrieveMetaData() error { var f *os.File @@ -1047,6 +1069,10 @@ func (d *diskQueueReader) retrieveMetaData() error { nsqLog.Infof("fscanf new meta file err : %v", errV2) return errV2 } + err = checkMetaFileEnd(fV2) + if err != nil { + nsqLog.Errorf("reader (%v) meta invalid, need fix: %v", d.readerMetaName, err.Error()) + } } else { nsqLog.Infof("new meta file err : %v", errV2) @@ -1060,15 +1086,21 @@ func (d *diskQueueReader) retrieveMetaData() error { _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", &d.queueEndInfo.totalMsgCnt, &d.confirmedQueueInfo.EndOffset.FileNum, &d.confirmedQueueInfo.EndOffset.Pos, &d.confirmedQueueInfo.virtualEnd, - &d.queueEndInfo.EndOffset.FileNum, &d.queueEndInfo.EndOffset.Pos, &d.queueEndInfo.virtualEnd) + &d.queueEndInfo.EndOffset.FileNum, &d.queueEndInfo.EndOffset.Pos, &d.queueEndInfo.virtualEnd, + ) if err != nil { return err } + err = checkMetaFileEnd(f) + if err != nil { + nsqLog.Errorf("reader (%v) meta invalid, need fix: %v", d.readerMetaName, err.Error()) + } + if d.confirmedQueueInfo.virtualEnd == d.queueEndInfo.virtualEnd { d.confirmedQueueInfo.totalMsgCnt = d.queueEndInfo.totalMsgCnt } - d.persistMetaData() + d.persistMetaData(false) } if d.confirmedQueueInfo.TotalMsgCnt() == 0 && d.confirmedQueueInfo.Offset() != BackendOffset(0) { nsqLog.Warningf("reader (%v) count is missing, need fix: %v", d.readerMetaName, d.confirmedQueueInfo) @@ -1085,37 +1117,82 @@ func (d *diskQueueReader) retrieveMetaData() error { d.readQueueInfo = d.confirmedQueueInfo d.updateDepth() - return nil + return err +} + +func preWriteMetaEnd(f *os.File) error { + // error can be ignored since we just make sure end with non-magic + f.Seek(-1*int64(len(diskMagicEndBytes)), os.SEEK_END) + _, err := f.Write(make([]byte, len(diskMagicEndBytes))) + if err != nil { + return err + } + _, err = f.Seek(0, os.SEEK_SET) + return err +} + +func (d *diskQueueReader) writeMeta(f *os.File, perr error) (int, error) { + if perr != nil { + return 0, perr + } + n, err := fmt.Fprintf(f, "%d\n%d\n%d,%d,%d\n%d,%d,%d\n", + d.confirmedQueueInfo.TotalMsgCnt(), + d.queueEndInfo.totalMsgCnt, + d.confirmedQueueInfo.EndOffset.FileNum, d.confirmedQueueInfo.EndOffset.Pos, d.confirmedQueueInfo.Offset(), + d.queueEndInfo.EndOffset.FileNum, d.queueEndInfo.EndOffset.Pos, d.queueEndInfo.Offset()) + return n, err +} + +func writeMetaEnd(f *os.File, perr error, pos int) (int, error) { + if perr != nil { + return 0, perr + } + // write magic end + n, err := f.Write(diskMagicEndBytes) + pos += n + f.Truncate(int64(pos)) + return n, err } // persistMetaData atomically writes state to the filesystem -func (d *diskQueueReader) persistMetaData() error { +func (d *diskQueueReader) persistMetaData(fsync bool) error { var f *os.File var err error + var n 
int + pos := 0 fileName := d.metaDataFileName(true) - tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int()) - // write to tmp file - f, err = os.OpenFile(tmpFileName, os.O_RDWR|os.O_CREATE, 0644) + s := time.Now() + f, err = os.OpenFile(fileName, os.O_RDWR|os.O_CREATE, 0644) if err != nil { return err } - _, err = fmt.Fprintf(f, "%d\n%d\n%d,%d,%d\n%d,%d,%d\n", - d.confirmedQueueInfo.TotalMsgCnt(), - d.queueEndInfo.totalMsgCnt, - d.confirmedQueueInfo.EndOffset.FileNum, d.confirmedQueueInfo.EndOffset.Pos, d.confirmedQueueInfo.Offset(), - d.queueEndInfo.EndOffset.FileNum, d.queueEndInfo.EndOffset.Pos, d.queueEndInfo.Offset()) + err = preWriteMetaEnd(f) + cost1 := time.Since(s) + + if testCrash { + return errors.New("test crash") + } + n, err = d.writeMeta(f, err) + pos += n + _, err = writeMetaEnd(f, err, pos) + cost2 := time.Since(s) + if err != nil { - f.Close() - return err + nsqLog.Errorf("reader (%v) meta write failed, need fix: %v", d.readerMetaName, err.Error()) + } else if fsync { + f.Sync() } - f.Sync() f.Close() - + cost3 := time.Since(s) + if cost3 >= time.Second/10 { + nsqLog.Logf("reader (%v) meta perist slow : %v,%v,%v", d.readerMetaName, cost1, cost2, cost3) + } + return err // atomically rename - return util.AtomicRename(tmpFileName, fileName) + //return util.AtomicRename(tmpFileName, fileName) } func (d *diskQueueReader) metaDataFileName(newVer bool) string { @@ -1167,9 +1244,6 @@ func (d *diskQueueReader) handleReadError() { func (d *diskQueueReader) internalUpdateEnd(endPos *diskQueueEndInfo, forceReload bool) (bool, error) { if endPos == nil { - if d.needSync { - d.sync() - } return false, nil } if forceReload { diff --git a/nsqd/diskqueue_reader_test.go b/nsqd/diskqueue_reader_test.go index b3aacdf72..e08b46e4c 100644 --- a/nsqd/diskqueue_reader_test.go +++ b/nsqd/diskqueue_reader_test.go @@ -384,3 +384,80 @@ func TestDiskQueueSnapshotReader(t *testing.T) { test.Equal(t, 100, len(data)) // remove some begin of queue, and test queue start } + +func TestDiskQueueReaderMetaInvalid(t *testing.T) { + nsqLog.Logger = newTestLogger(t) + dqName := "test_disk_queue" + strconv.Itoa(int(time.Now().Unix())) + tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + test.Nil(t, err) + defer os.RemoveAll(tmpDir) + queue, _ := NewDiskQueueWriter(dqName, tmpDir, 1024, 4, 1<<10, 1) + dqWriter := queue.(*diskQueueWriter) + defer dqWriter.Close() + test.NotNil(t, dqWriter) + + msg := []byte("test") + msgNum := 1000 + for i := 0; i < msgNum; i++ { + dqWriter.Put(msg) + } + dqWriter.Flush(false) + end := dqWriter.GetQueueWriteEnd() + test.Nil(t, err) + + dqReader := newDiskQueueReader(dqName, dqName, tmpDir, 1024, 4, 1<<10, 1, 2*time.Second, nil, true) + dqReader.UpdateQueueEnd(end, false) + msgOut, _ := dqReader.TryReadOne() + equal(t, msgOut.Data, msg) + test.Equal(t, msgOut.Offset+BackendOffset(msgOut.MovedSize), dqReader.(*diskQueueReader).readQueueInfo.Offset()) + test.Equal(t, msgOut.CurCnt, dqReader.(*diskQueueReader).readQueueInfo.TotalMsgCnt()) + dqReader.(*diskQueueReader).sync(false) + fname := dqReader.(*diskQueueReader).metaDataFileName(true) + t.Log(fname) + dqReader.Close() + + tmpf, err := os.OpenFile(fname, os.O_RDWR, 0644) + test.Nil(t, err) + fs, _ := tmpf.Stat() + t.Log(fs.Size()) + magic := make([]byte, len(diskMagicEndBytes)) + n, err := tmpf.Read(magic) + test.Equal(t, n, len(diskMagicEndBytes)) + t.Log(magic) + noff, err := tmpf.Seek(-1*int64(len(diskMagicEndBytes)), 2) + test.Nil(t, err) + t.Log(noff) + magic = make([]byte, 
len(diskMagicEndBytes)) + n, err = tmpf.Read(magic) + t.Log(magic) + test.Equal(t, n, len(diskMagicEndBytes)) + test.Equal(t, diskMagicEndBytes, magic) + + dqReader = newDiskQueueReader(dqName, dqName, tmpDir, 1024, 4, 1<<10, 1, 2*time.Second, nil, true) + err = dqReader.(*diskQueueReader).retrieveMetaData() + test.Nil(t, err) + + tmpf.Truncate(fs.Size() - 1) + err = dqReader.(*diskQueueReader).retrieveMetaData() + test.NotNil(t, err) + tmpf.Seek(-1*int64(len(diskMagicEndBytes))+1, 2) + tmpf.Write([]byte("01")) + err = dqReader.(*diskQueueReader).retrieveMetaData() + test.NotNil(t, err) + tmpf.Seek(-1*int64(len(diskMagicEndBytes)), 2) + tmpf.Write(diskMagicEndBytes) + err = dqReader.(*diskQueueReader).retrieveMetaData() + test.Nil(t, err) + + testCrash = true + err = dqReader.(*diskQueueReader).persistMetaData(false) + test.NotNil(t, err) + testCrash = false + err = dqReader.(*diskQueueReader).retrieveMetaData() + test.NotNil(t, err) + + err = dqReader.(*diskQueueReader).persistMetaData(false) + test.Nil(t, err) + err = dqReader.(*diskQueueReader).retrieveMetaData() + test.Nil(t, err) +} diff --git a/nsqd/diskqueue_writer.go b/nsqd/diskqueue_writer.go index 227682f38..01ba07839 100644 --- a/nsqd/diskqueue_writer.go +++ b/nsqd/diskqueue_writer.go @@ -107,9 +107,10 @@ func newDiskQueueWriter(name string, dataPath string, maxBytesPerFile int64, } // no need to lock here, nothing else could possibly be touching this instance - err := d.retrieveMetaData() + err := d.retrieveMetaData(!readOnly) if err != nil && !os.IsNotExist(err) { nsqLog.LogErrorf("diskqueue(%s) failed to retrieveMetaData - %s", d.name, err) + return &d, err } err = d.initQueueReadStart() if err != nil && !os.IsNotExist(err) { @@ -118,12 +119,35 @@ func newDiskQueueWriter(name string, dataPath string, maxBytesPerFile int64, } if !readOnly { + if d.diskQueueStart.EndOffset.GreatThan(&d.diskWriteEnd.EndOffset) || + d.diskQueueStart.Offset() > d.diskWriteEnd.Offset() { + nsqLog.LogErrorf("diskqueue(%s) queue start invalid: %v", d.name, d.diskQueueStart) + if d.diskWriteEnd.EndOffset.FileNum == 0 && + d.diskWriteEnd.EndOffset.Pos == 0 { + // auto fix this case + d.diskQueueStart.EndOffset = d.diskWriteEnd.EndOffset + } else { + return &d, ErrNeedFixQueueStart + } + } d.saveExtraMeta() } return &d, nil } +func (d *diskQueueWriter) tryFixData() error { + d.Lock() + defer d.Unlock() + // queue start may be invalid after crash, so we can fix by manual to delete meta and reload it + err := d.initQueueReadStart() + if err != nil && !os.IsNotExist(err) { + return err + } + d.saveExtraMeta() + return nil +} + func (d *diskQueueWriter) SetBufSize(s int64) { atomic.StoreInt64(&d.bufSize, s) } @@ -140,7 +164,6 @@ func (d *diskQueueWriter) PutV2(data []byte) (BackendOffset, int32, diskQueueEnd if dend != nil { e = *dend } - d.needSync = true d.Unlock() return offset, writeBytes, e, werr } @@ -157,7 +180,6 @@ func (d *diskQueueWriter) PutRawV2(data []byte, msgCnt int32) (BackendOffset, in if dend != nil { e = *dend } - d.needSync = true d.Unlock() return offset, writeBytes, e, werr } @@ -175,7 +197,6 @@ func (d *diskQueueWriter) Put(data []byte) (BackendOffset, int32, int64, error) if dend != nil { e = *dend } - d.needSync = true d.Unlock() return offset, writeBytes, e.TotalMsgCnt(), werr } @@ -230,8 +251,13 @@ func (d *diskQueueWriter) ResetWriteWithQueueStart(queueStart BackendQueueEnd) e defer d.Unlock() nsqLog.Warningf("DISKQUEUE %v reset the queue start from %v:%v to new queue start: %v", d.name, d.diskQueueStart, d.diskWriteEnd, 
queueStart) + d.cleanOldData() + if queueStart.Offset() == 0 && d.diskQueueStart.Offset() == 0 { + d.diskWriteEnd.EndOffset.FileNum = 0 + d.diskWriteEnd.EndOffset.Pos = 0 + } d.diskQueueStart = d.diskWriteEnd d.diskQueueStart.virtualEnd = queueStart.Offset() d.diskQueueStart.totalMsgCnt = queueStart.TotalMsgCnt() @@ -239,6 +265,7 @@ func (d *diskQueueWriter) ResetWriteWithQueueStart(queueStart BackendQueueEnd) e d.diskReadEnd = d.diskWriteEnd nsqLog.Warningf("DISKQUEUE %v new queue start : %v:%v", d.name, d.diskQueueStart, d.diskWriteEnd) + d.persistMetaData(true) d.saveExtraMeta() return nil } @@ -379,7 +406,7 @@ func (d *diskQueueWriter) closeCurrentFile() { d.bufferWriter.Flush() } if d.diskReadEnd.EndOffset.GreatThan(&d.diskWriteEnd.EndOffset) { - nsqLog.LogWarningf("DISKQUEUE(%s): old read is greater: %v, %v", d.name, + nsqLog.Logf("DISKQUEUE(%s): old read is greater: %v, %v", d.name, d.diskReadEnd, d.diskWriteEnd) } d.diskReadEnd = d.diskWriteEnd @@ -404,6 +431,7 @@ func (d *diskQueueWriter) truncateDiskQueueToWriteEnd() { tmpFile.Close() } } + d.persistMetaData(true) cleanNum := d.diskWriteEnd.EndOffset.FileNum + 1 for { fileName := d.fileName(cleanNum) @@ -553,6 +581,8 @@ func (d *diskQueueWriter) Empty() error { func (d *diskQueueWriter) deleteAllFiles(deleted bool) error { d.cleanOldData() + d.persistMetaData(true) + d.saveExtraMeta() if deleted { nsqLog.Logf("DISKQUEUE(%s): deleting meta file", d.name) @@ -561,29 +591,14 @@ func (d *diskQueueWriter) deleteAllFiles(deleted bool) error { nsqLog.LogErrorf("diskqueue(%s) failed to remove metadata file - %s", d.name, innerErr) return innerErr } - cleanStartFileNum := d.diskQueueStart.EndOffset.FileNum - MAX_QUEUE_OFFSET_META_DATA_KEEP - 1 - if cleanStartFileNum < 0 { - cleanStartFileNum = 0 - } os.Remove(d.extraMetaFileName()) - for i := cleanStartFileNum; i <= d.diskWriteEnd.EndOffset.FileNum; i++ { - fName := d.fileName(i) + ".offsetmeta.dat" - innerErr := os.Remove(fName) - nsqLog.Logf("DISKQUEUE(%s): removed offset meta file: %v", d.name, fName) - if innerErr != nil && !os.IsNotExist(innerErr) { - nsqLog.LogErrorf("diskqueue(%s) failed to remove offset meta file %v - %s", d.name, fName, innerErr) - } - } } return nil } func (d *diskQueueWriter) cleanOldData() error { - d.closeCurrentFile() - d.saveFileOffsetMeta() - cleanStartFileNum := d.diskQueueStart.EndOffset.FileNum - MAX_QUEUE_OFFSET_META_DATA_KEEP - 1 if cleanStartFileNum < 0 { cleanStartFileNum = 0 @@ -594,6 +609,13 @@ func (d *diskQueueWriter) cleanOldData() error { nsqLog.Logf("DISKQUEUE(%s): removed data file: %v", d.name, fn) if innerErr != nil && !os.IsNotExist(innerErr) { nsqLog.LogErrorf("diskqueue(%s) failed to remove data file - %s", d.name, innerErr) + } else { + fName := d.fileName(i) + ".offsetmeta.dat" + innerErr := os.Remove(fName) + nsqLog.Logf("DISKQUEUE(%s): removed offset meta file: %v", d.name, fName) + if innerErr != nil && !os.IsNotExist(innerErr) { + nsqLog.LogErrorf("diskqueue(%s) failed to remove offset meta file %v - %s", d.name, fName, innerErr) + } } } @@ -601,7 +623,6 @@ func (d *diskQueueWriter) cleanOldData() error { d.diskWriteEnd.EndOffset.Pos = 0 d.diskReadEnd = d.diskWriteEnd d.diskQueueStart = d.diskWriteEnd - d.saveExtraMeta() return nil } @@ -683,6 +704,7 @@ func (d *diskQueueWriter) writeOne(data []byte, isRaw bool, msgCnt int32) (Backe } } + d.needSync = true dataLen := int32(len(data)) if !isRaw { if dataLen < d.minMsgSize || dataLen > d.maxMsgSize { @@ -741,6 +763,7 @@ func (d *diskQueueWriter) writeOne(data []byte, isRaw bool, 
msgCnt int32) (Backe d.diskWriteEnd.EndOffset.FileNum++ d.diskWriteEnd.EndOffset.Pos = 0 d.diskReadEnd = d.diskWriteEnd + d.needSync = true } return writeOffset, int32(totalBytes), &d.diskWriteEnd, err @@ -802,7 +825,7 @@ func (d *diskQueueWriter) sync(fsync bool) error { d.diskReadEnd = d.diskWriteEnd - err := d.persistMetaData() + err := d.persistMetaData(fsync) if err != nil { return err } @@ -919,7 +942,7 @@ func (d *diskQueueWriter) saveExtraMeta() error { } // retrieveMetaData initializes state from the filesystem -func (d *diskQueueWriter) retrieveMetaData() error { +func (d *diskQueueWriter) retrieveMetaData(fix bool) error { var f *os.File var err error @@ -939,36 +962,72 @@ func (d *diskQueueWriter) retrieveMetaData() error { } atomic.StoreInt64(&d.diskWriteEnd.totalMsgCnt, totalCnt) d.diskReadEnd = d.diskWriteEnd + err = checkMetaFileEnd(f) + if err != nil && fix { + // recovery from tmp meta file + tmpFileName := fmt.Sprintf("%s.tmp", fileName) + badFileName := fmt.Sprintf("%s.%d.corrupt", fileName, rand.Int()) + nsqLog.Errorf("meta file corrupt to %v, try recover meta from file %v ", badFileName, tmpFileName) + err2 := util.AtomicRename(fileName, badFileName) + if err2 != nil { + nsqLog.Warningf("%v failed rename to %v : %v", fileName, badFileName, err2.Error()) + } else { + err2 = util.AtomicRename(tmpFileName, fileName) + if err2 != nil { + nsqLog.Errorf("%v failed recover to %v : %v", tmpFileName, fileName, err2.Error()) + util.AtomicRename(badFileName, fileName) + } + } + } + return err +} - return nil +// used for recovery if the normal meta data file corrupt. +func (d *diskQueueWriter) persistTmpMetaData() error { + fileName := d.metaDataFileName() + tmpFileName := fmt.Sprintf("%s.tmp", fileName) + f, err := os.OpenFile(tmpFileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) + if err != nil { + return err + } + _, err = fmt.Fprintf(f, "%d\n%d,%d,%d\n", + atomic.LoadInt64(&d.diskWriteEnd.totalMsgCnt), + d.diskWriteEnd.EndOffset.FileNum, d.diskWriteEnd.EndOffset.Pos, d.diskWriteEnd.Offset()) + + f.Close() + return err } // persistMetaData atomically writes state to the filesystem -func (d *diskQueueWriter) persistMetaData() error { +func (d *diskQueueWriter) persistMetaData(fsync bool) error { var f *os.File var err error + var n int + pos := 0 + err = d.persistTmpMetaData() + if err != nil { + return err + } fileName := d.metaDataFileName() - tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int()) - - // write to tmp file - f, err = os.OpenFile(tmpFileName, os.O_RDWR|os.O_CREATE, 0644) + f, err = os.OpenFile(fileName, os.O_RDWR|os.O_CREATE, 0644) if err != nil { return err } + err = preWriteMetaEnd(f) - _, err = fmt.Fprintf(f, "%d\n%d,%d,%d\n", + n, err = fmt.Fprintf(f, "%d\n%d,%d,%d\n", atomic.LoadInt64(&d.diskWriteEnd.totalMsgCnt), d.diskWriteEnd.EndOffset.FileNum, d.diskWriteEnd.EndOffset.Pos, d.diskWriteEnd.Offset()) - if err != nil { - f.Close() - return err + pos += n + + _, err = writeMetaEnd(f, err, pos) + if fsync && err == nil { + f.Sync() } - f.Sync() - f.Close() - // atomically rename - return util.AtomicRename(tmpFileName, fileName) + f.Close() + return err } func (d *diskQueueWriter) metaDataFileName() string { diff --git a/nsqd/diskqueue_writer_test.go b/nsqd/diskqueue_writer_test.go index 0686fa53c..cc95f0151 100644 --- a/nsqd/diskqueue_writer_test.go +++ b/nsqd/diskqueue_writer_test.go @@ -489,6 +489,12 @@ func TestDiskQueueWriterRollbackAndResetEnd(t *testing.T) { leftPos := int64(end.Offset()) - 1024*dqWriter.diskWriteEnd.EndOffset.FileNum equal(t, 
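The persistence scheme introduced above keeps a plain ".tmp" copy as a recovery source and finishes the real meta file with a trailing magic marker, so a torn write is detectable on the next load. Below is a simplified sketch of the same idea (write the tmp copy, write the main file ending in a marker, verify or recover on read); the marker bytes and helpers are invented and do not match checkMetaFileEnd/preWriteMetaEnd/writeMetaEnd exactly:

package sketch

import (
	"bytes"
	"fmt"
	"os"
)

var metaEndMarker = []byte("#END") // stand-in for diskMagicEndBytes

// persistMeta writes a recovery copy first, then the real file with a
// trailing marker; fsync of the real file is optional, as in the patch.
func persistMeta(fileName, body string, fsync bool) error {
	if err := os.WriteFile(fileName+".tmp", []byte(body), 0644); err != nil {
		return err
	}
	data := append([]byte(body), metaEndMarker...)
	f, err := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
	if err != nil {
		return err
	}
	defer f.Close()
	if _, err := f.Write(data); err != nil {
		return err
	}
	if fsync {
		return f.Sync()
	}
	return nil
}

// loadMeta checks the trailing marker; if it is missing, the corrupt file is
// moved aside and the .tmp copy is promoted, mirroring retrieveMetaData(fix).
func loadMeta(fileName string) (string, error) {
	data, err := os.ReadFile(fileName)
	if err != nil {
		return "", err
	}
	if !bytes.HasSuffix(data, metaEndMarker) {
		bad := fmt.Sprintf("%s.corrupt", fileName)
		if err := os.Rename(fileName, bad); err == nil {
			if err := os.Rename(fileName+".tmp", fileName); err != nil {
				os.Rename(bad, fileName) // recovery failed, restore original
			}
		}
		return "", fmt.Errorf("meta file %s missing end marker", fileName)
	}
	return string(bytes.TrimSuffix(data, metaEndMarker)), nil
}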
end.(*diskQueueEndInfo).EndOffset.Pos, leftPos) equal(t, end.(*diskQueueEndInfo).EndOffset.Pos, dqWriter.diskWriteEnd.EndOffset.Pos) + // test reopen + dqWriter.Close() + queue, _ = NewDiskQueueWriter(dqName, tmpDir, 1024, 4, 1<<10, 1) + dqWriter = queue.(*diskQueueWriter) + newEnd := dqWriter.GetQueueWriteEnd() + test.Equal(t, true, newEnd.IsSame(end)) resetOffset := int64((len(msg) + 4) * totalCnt / 2) err = dqWriter.ResetWriteEnd(BackendOffset(resetOffset), int64(totalCnt/2)) @@ -559,7 +565,7 @@ func TestDiskQueueWriterInitWithQueueStart(t *testing.T) { test.Equal(t, int64(0), oldStart.EndOffset.FileNum) dqWriter.Close() - queue, err = NewDiskQueueWriter(dqName, tmpDir, 1024, 4, 1<<10, 1) + queue, err = NewDiskQueueWriter(dqName, tmpDir, int64(fileMaxSize), 4, 1<<10, 1) test.Nil(t, err) dqWriter = queue.(*diskQueueWriter) test.Equal(t, oldStart, dqWriter.diskQueueStart) @@ -572,15 +578,17 @@ func TestDiskQueueWriterInitWithQueueStart(t *testing.T) { test.Equal(t, oldStart.EndOffset.FileNum*int64(cntInFile), newStart.TotalMsgCnt()) dqWriter.Close() - queue, err = NewDiskQueueWriter(dqName, tmpDir, 1024, 4, 1<<10, 1) + queue, err = NewDiskQueueWriter(dqName, tmpDir, int64(fileMaxSize), 4, 1<<10, 1) test.Nil(t, err) dqWriter = queue.(*diskQueueWriter) test.Equal(t, newStart, dqWriter.GetQueueReadStart()) dqWriter.cleanOldData() + dqWriter.persistMetaData(false) + dqWriter.saveExtraMeta() test.Equal(t, dqWriter.GetQueueReadStart(), dqWriter.GetQueueWriteEnd()) newStart = dqWriter.GetQueueReadStart() dqWriter.Close() - queue, err = NewDiskQueueWriter(dqName, tmpDir, 1024, 4, 1<<10, 1) + queue, err = NewDiskQueueWriter(dqName, tmpDir, int64(fileMaxSize), 4, 1<<10, 1) test.Nil(t, err) dqWriter = queue.(*diskQueueWriter) test.Equal(t, newStart, dqWriter.GetQueueReadStart()) @@ -847,3 +855,55 @@ func benchmarkDiskQueueReaderGet(size int64, b *testing.B) { dqReader.TryReadOne() } } + +func TestDiskQueueWriterInvalidMeta(t *testing.T) { + l := newTestLogger(t) + nsqLog.Logger = l + dqName := "test_disk_queue_invalid_meta" + strconv.Itoa(int(time.Now().Unix())) + tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + // require a non-zero message length for the corrupt (len 0) test below + dq, _ := NewDiskQueueWriter(dqName, tmpDir, 1000, 10, 1<<10, 1) + defer dq.Close() + + msg := make([]byte, 123) // 127 bytes per message, 8 (1016 bytes) messages per file + for i := 0; i < 25; i++ { + dq.Put(msg) + } + dq.Flush(true) + dq.(*diskQueueWriter).sync(true) + + dqFn := dq.(*diskQueueWriter).metaDataFileName() + t.Log(dqFn) + + tmpf, err := os.OpenFile(dqFn, os.O_RDWR|os.O_CREATE, 0644) + test.Nil(t, err) + fs, _ := tmpf.Stat() + t.Log(fs.Size()) + + noff, err := tmpf.Seek(-1*int64(len(diskMagicEndBytes)), 2) + test.Nil(t, err) + t.Log(noff) + magic := make([]byte, len(diskMagicEndBytes)) + n, err := tmpf.Read(magic) + t.Log(magic) + test.Equal(t, n, len(diskMagicEndBytes)) + test.Equal(t, diskMagicEndBytes, magic) + + err = dq.(*diskQueueWriter).retrieveMetaData(false) + test.Nil(t, err) + tmpf.Truncate(fs.Size() - 1) + err = dq.(*diskQueueWriter).retrieveMetaData(false) + test.NotNil(t, err) + tmpf.Seek(-1*int64(len(diskMagicEndBytes))+1, 2) + tmpf.Write([]byte("01")) + err = dq.(*diskQueueWriter).retrieveMetaData(false) + test.NotNil(t, err) + tmpf.Seek(-1*int64(len(diskMagicEndBytes)), 2) + tmpf.Write(diskMagicEndBytes) + err = dq.(*diskQueueWriter).retrieveMetaData(false) + test.Nil(t, err) +} diff --git 
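TestDiskQueueWriterInvalidMeta above exercises recovery by truncating and rewriting the trailing magic bytes in place. A small test-side helper in the same spirit, hypothetical rather than taken from the test, that corrupts and later restores the tail of a file:

package sketch

import "os"

// corruptTail overwrites the last n bytes of path with zero bytes and returns
// the original bytes so the caller can restore them afterwards.
func corruptTail(path string, n int) ([]byte, error) {
	f, err := os.OpenFile(path, os.O_RDWR, 0644)
	if err != nil {
		return nil, err
	}
	defer f.Close()
	fi, err := f.Stat()
	if err != nil {
		return nil, err
	}
	off := fi.Size() - int64(n)
	orig := make([]byte, n)
	if _, err := f.ReadAt(orig, off); err != nil {
		return nil, err
	}
	if _, err := f.WriteAt(make([]byte, n), off); err != nil {
		return nil, err
	}
	return orig, nil
}

// restoreTail puts the saved bytes back at the end of the file.
func restoreTail(path string, tail []byte) error {
	f, err := os.OpenFile(path, os.O_RDWR, 0644)
	if err != nil {
		return err
	}
	defer f.Close()
	fi, err := f.Stat()
	if err != nil {
		return err
	}
	_, err = f.WriteAt(tail, fi.Size()-int64(len(tail)))
	return err
}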
a/nsqd/nsqd.go b/nsqd/nsqd.go index 2e7d970fc..d3596fccb 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -15,6 +15,7 @@ import ( "time" "github.com/bitly/go-simplejson" + "github.com/spaolacci/murmur3" "github.com/youzan/nsq/internal/clusterinfo" "github.com/youzan/nsq/internal/dirlock" "github.com/youzan/nsq/internal/http_api" @@ -31,6 +32,10 @@ const ( TLSRequired ) +const ( + jobQueueChLen = 100 +) + type errStore struct { err error } @@ -53,6 +58,7 @@ type INsqdNotify interface { NotifyStateChanged(v interface{}, needPersist bool) ReqToEnd(*Channel, *Message, time.Duration) error NotifyScanDelayed(*Channel) + PushTopicJob(*Topic, func()) } type ReqToEndFunc func(*Channel, *Message, time.Duration) error @@ -70,7 +76,9 @@ type NSQD struct { topicMap map[string]map[int]*Topic magicCodeMutex sync.Mutex - poolSize int + poolSize int + topicJobPoolSize int + topicJobChList [16]chan func() MetaNotifyChan chan interface{} OptsNotificationChan chan struct{} @@ -115,6 +123,9 @@ func New(opts *Options) *NSQD { persistNotifyCh: make(chan struct{}, 2), persistClosed: make(chan struct{}), } + for i := 0; i < len(n.topicJobChList); i++ { + n.topicJobChList[i] = make(chan func(), jobQueueChLen) + } n.SwapOpts(opts) n.errValue.Store(errStore{}) @@ -251,6 +262,7 @@ func (n *NSQD) GetTopicMapCopy() []*Topic { func (n *NSQD) Start() { n.waitGroup.Wrap(func() { n.queueScanLoop() }) + n.waitGroup.Wrap(func() { n.queueTopicJobLoop() }) n.persistWaitGroup.Wrap(func() { n.persistLoop() }) } @@ -758,26 +770,12 @@ type responseData struct { // func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan responseData, closeCh chan int) { idealPoolSize := int(float64(num) * 0.25) - if idealPoolSize < 1 { - idealPoolSize = 1 - } else if idealPoolSize > n.GetOpts().QueueScanWorkerPoolMax { + if idealPoolSize > n.GetOpts().QueueScanWorkerPoolMax { idealPoolSize = n.GetOpts().QueueScanWorkerPoolMax } - for { - if idealPoolSize == n.poolSize { - break - } else if idealPoolSize < n.poolSize { - // contract - closeCh <- 1 - n.poolSize-- - } else { - // expand - n.waitGroup.Wrap(func() { - n.queueScanWorker(workCh, responseCh, closeCh) - }) - n.poolSize++ - } - } + n.poolSize = n.resizeWorkerPool(idealPoolSize, n.poolSize, closeCh, func(cc chan int) { + n.queueScanWorker(workCh, responseCh, cc) + }) } // queueScanWorker receives work (in the form of a channel) from queueScanLoop @@ -919,6 +917,98 @@ exit: fastTimer.Stop() } +func (n *NSQD) resizeWorkerPool(idealPoolSize int, actualSize int, closeCh chan int, workerFunc func(chan int)) int { + if idealPoolSize < 1 { + idealPoolSize = 1 + } + for { + if idealPoolSize == actualSize { + break + } else if idealPoolSize < actualSize { + // contract + closeCh <- 1 + actualSize-- + } else { + // expand + n.waitGroup.Wrap(func() { + workerFunc(closeCh) + }) + actualSize++ + } + } + return actualSize +} + +func (n *NSQD) resizeTopicJobPool(tnum int, jobCh chan func(), closeCh chan int) { + idealPoolSize := int(float64(tnum) * 0.1) + if idealPoolSize > n.GetOpts().QueueTopicJobWorkerPoolMax { + idealPoolSize = n.GetOpts().QueueTopicJobWorkerPoolMax + } + n.topicJobPoolSize = n.resizeWorkerPool(idealPoolSize, n.topicJobPoolSize, closeCh, func(cc chan int) { + n.topicJobLoop(jobCh, cc) + }) +} + +func (n *NSQD) PushTopicJob(t *Topic, job func()) { + h := int(murmur3.Sum32([]byte(t.GetFullName()))) + index := h % len(n.topicJobChList) + for i := 0; i < len(n.topicJobChList); i++ { + ch := n.topicJobChList[(index+i)%len(n.topicJobChList)] + select { + case ch <- job: + 
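resizeWorkerPool above factors the grow/shrink loop out of resizePool so both the scan pool and the new topic-job pool can reuse it: expanding spawns another worker on the wait group, contracting sends one token on the close channel per surplus worker. A standalone sketch of that loop using sync.WaitGroup directly:

package sketch

import "sync"

// resizeWorkerPool adjusts a pool of identical workers to ideal, returning
// the new size. Each worker is expected to exit when it receives from closeCh.
func resizeWorkerPool(ideal, actual int, wg *sync.WaitGroup, closeCh chan int, worker func(chan int)) int {
	if ideal < 1 {
		ideal = 1
	}
	for actual != ideal {
		if ideal < actual {
			closeCh <- 1 // ask one worker to stop
			actual--
		} else {
			wg.Add(1)
			go func() {
				defer wg.Done()
				worker(closeCh)
			}()
			actual++
		}
	}
	return actual
}

// jobWorker is shaped the way the pool expects: run jobs until told to stop.
func jobWorker(jobCh chan func(), closeCh chan int) {
	for {
		select {
		case job := <-jobCh:
			job()
		case <-closeCh:
			return
		}
	}
}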
return + default: + } + } + nsqLog.Logf("%v topic job push ignored: %v", t.GetFullName(), job) +} + +func (n *NSQD) topicJobLoop(jobCh chan func(), closeCh chan int) { + for { + select { + case job := <-jobCh: + job() + case <-closeCh: + return + } + } +} + +func (n *NSQD) queueTopicJobLoop() { + closeCh := make(chan int) + topics := n.GetTopicMapCopy() + refreshTicker := time.NewTicker(n.GetOpts().QueueScanRefreshInterval) + aggJobCh := make(chan func(), len(n.topicJobChList)*jobQueueChLen+1) + for i := 0; i < len(n.topicJobChList); i++ { + go func(c chan func()) { + for { + select { + case job := <-c: + aggJobCh <- job + case <-n.exitChan: + return + } + } + }(n.topicJobChList[i]) + } + n.resizeTopicJobPool(len(topics), aggJobCh, closeCh) + for { + select { + case <-refreshTicker.C: + topics := n.GetTopicMapCopy() + n.resizeTopicJobPool(len(topics), aggJobCh, closeCh) + continue + case <-n.exitChan: + goto exit + } + } + +exit: + nsqLog.Logf("QUEUE topic job loop: closing") + close(closeCh) + refreshTicker.Stop() +} + func (n *NSQD) IsAuthEnabled() bool { return len(n.GetOpts().AuthHTTPAddresses) != 0 } diff --git a/nsqd/options.go b/nsqd/options.go index e01e37f3d..f556f4c6c 100644 --- a/nsqd/options.go +++ b/nsqd/options.go @@ -41,11 +41,12 @@ type Options struct { SyncEvery int64 `flag:"sync-every"` SyncTimeout time.Duration `flag:"sync-timeout"` - QueueScanInterval time.Duration `flag:"queue-scan-interval"` - QueueScanRefreshInterval time.Duration `flag:"queue-scan-refresh-interval"` - QueueScanSelectionCount int `flag:"queue-scan-selection-count"` - QueueScanWorkerPoolMax int `flag:"queue-scan-worker-pool-max"` - QueueScanDirtyPercent float64 `flag:"queue-scan-dirty-percent"` + QueueScanInterval time.Duration `flag:"queue-scan-interval"` + QueueScanRefreshInterval time.Duration `flag:"queue-scan-refresh-interval"` + QueueScanSelectionCount int `flag:"queue-scan-selection-count"` + QueueScanWorkerPoolMax int `flag:"queue-scan-worker-pool-max"` + QueueTopicJobWorkerPoolMax int `flag:"queue-topic-job-worker-pool-max"` + QueueScanDirtyPercent float64 `flag:"queue-scan-dirty-percent"` // msg and command options MsgTimeout time.Duration `flag:"msg-timeout" arg:"60s"` @@ -134,11 +135,12 @@ func NewOptions() *Options { SyncEvery: 2500, SyncTimeout: 2 * time.Second, - QueueScanInterval: 500 * time.Millisecond, - QueueScanRefreshInterval: 5 * time.Second, - QueueScanSelectionCount: 20, - QueueScanWorkerPoolMax: 4, - QueueScanDirtyPercent: 0.25, + QueueScanInterval: 500 * time.Millisecond, + QueueScanRefreshInterval: 5 * time.Second, + QueueScanSelectionCount: 20, + QueueScanWorkerPoolMax: 16, + QueueTopicJobWorkerPoolMax: 100, + QueueScanDirtyPercent: 0.25, MsgTimeout: 60 * time.Second, MaxMsgTimeout: 15 * time.Minute, diff --git a/nsqd/stats.go b/nsqd/stats.go index 167c55332..2c63c780c 100644 --- a/nsqd/stats.go +++ b/nsqd/stats.go @@ -145,6 +145,14 @@ type ClientPubStats struct { LastPubTs int64 `json:"last_pub_ts"` } +func (cps *ClientPubStats) IncrCounter(count int64, hasErr bool) { + if hasErr { + atomic.AddInt64(&cps.ErrCount, count) + } else { + atomic.AddInt64(&cps.PubCount, count) + } +} + type ClientStats struct { // TODO: deprecated, remove in 1.0 Name string `json:"name"` @@ -438,33 +446,29 @@ func (self *DetailStatsInfo) UpdateTopicMsgStats(msgSize int64, latency int64) { } } -func (self *DetailStatsInfo) UpdatePubClientStats(remote string, agent string, protocol string, count int64, hasErr bool) { +func (self *DetailStatsInfo) InitPubClientStats(remote string, agent string, 
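PushTopicJob spreads per-topic work across a fixed array of buffered channels by hashing the topic name, then probes the remaining channels without blocking and drops the job if every queue is full, rather than stalling the publish path. A sketch of that dispatch, using FNV from the standard library in place of murmur3:

package sketch

import (
	"hash/fnv"
	"log"
)

const jobChanLen = 100

type jobDispatcher struct {
	chans [16]chan func()
}

func newJobDispatcher() *jobDispatcher {
	d := &jobDispatcher{}
	for i := range d.chans {
		d.chans[i] = make(chan func(), jobChanLen)
	}
	return d
}

// push hashes the key to pick a home channel, then tries every channel once
// without blocking; a job that cannot be queued anywhere is simply dropped.
func (d *jobDispatcher) push(key string, job func()) {
	h := fnv.New32a()
	h.Write([]byte(key))
	index := int(h.Sum32() % uint32(len(d.chans)))
	for i := 0; i < len(d.chans); i++ {
		select {
		case d.chans[(index+i)%len(d.chans)] <- job:
			return
		default:
		}
	}
	log.Printf("topic job for %s dropped: all job queues full", key)
}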
protocol string) *ClientPubStats { self.Lock() defer self.Unlock() s, ok := self.clientPubStats[remote] - if !ok { - // too much clients pub to this topic - // we just ignore stats - if len(self.clientPubStats) > maxPubClientStats { - nsqLog.Debugf("too much pub client : %v", len(self.clientPubStats)) - return - } - s = &ClientPubStats{ - RemoteAddress: remote, - UserAgent: agent, - Protocol: protocol, - } - // only update ts for new client connection, to avoid too much time.Now() call - // - s.LastPubTs = time.Now().Unix() - self.clientPubStats[remote] = s + if ok { + return s } - - if hasErr { - s.ErrCount++ - } else { - s.PubCount += count + // too much clients pub to this topic + // we just ignore stats + if len(self.clientPubStats) > maxPubClientStats { + nsqLog.Debugf("too much pub client : %v", len(self.clientPubStats)) + return nil + } + s = &ClientPubStats{ + RemoteAddress: remote, + UserAgent: agent, + Protocol: protocol, } + // only update ts for new client connection, to avoid too much time.Now() call + // + s.LastPubTs = time.Now().Unix() + self.clientPubStats[remote] = s + return s } func (self *DetailStatsInfo) RemovePubStats(remote string, protocol string) { @@ -545,7 +549,6 @@ func (self *DetailStatsInfo) SaveHistory(fileName string) error { nsqLog.LogWarningf("failed to save history stats: %v", err) return err } - f.Sync() f.Close() err = util.AtomicRename(tmpFileName, fileName) diff --git a/nsqd/topic.go b/nsqd/topic.go index 6e3dfa4a9..b4afb124f 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -214,7 +214,6 @@ func NewTopicWithExt(topicName string, part int, ext bool, ordered bool, opt *Op if err == ErrNeedFixQueueStart { t.SetDataFixState(true) } else { - t.MarkAsRemoved() return nil } } @@ -575,10 +574,10 @@ func (t *Topic) UpdateCommittedOffset(offset BackendQueueEnd) { if syncEvery == 1 || offset.TotalMsgCnt()-atomic.LoadInt64(&t.lastSyncCnt) >= syncEvery { if !t.IsWriteDisabled() { - t.flush(true) + t.flushBuffer(true) } } else { - t.flushForChannels() + t.notifyChEndChanged(false) } } @@ -815,7 +814,7 @@ func (t *Topic) RollbackNoLock(vend BackendOffset, diffCnt uint64) error { dend, err := t.backend.RollbackWriteV2(vend, diffCnt) if err == nil { t.UpdateCommittedOffset(&dend) - t.updateChannelsEnd(true) + t.updateChannelsEnd(true, true) } return err } @@ -831,7 +830,7 @@ func (t *Topic) ResetBackendEndNoLock(vend BackendOffset, totalCnt int64) error nsqLog.LogErrorf("reset backend to %v error: %v", vend, err) } else { t.UpdateCommittedOffset(&dend) - t.updateChannelsEnd(true) + t.updateChannelsEnd(true, true) } return err @@ -872,20 +871,8 @@ func (t *Topic) flushForChannelMoreData(c *Channel) { } hasData := t.backend.FlushBuffer() if hasData { - e := t.backend.GetQueueReadEnd() - curCommit := t.GetCommitted() - // if not committed, we need wait to notify channel. - if curCommit != nil && e.Offset() > curCommit.Offset() { - e = curCommit - } - err := c.UpdateQueueEnd(e, false) - if err != nil { - if err != ErrExiting { - nsqLog.Logf( - "failed to update topic end to channel(%s) - %s", - c.name, err) - } - } + e := t.getCommittedEnd() + updateChannelEnd(false, e, c) } } @@ -894,11 +881,15 @@ func (t *Topic) ForceFlushForChannels() { // no need sync to disk, since sync is heavy IO. 
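With this change UpdateCommittedOffset no longer walks every channel inline: below the sync threshold it calls notifyChEndChanged (defined just below), which pushes a closure onto the shared topic-job pool instead of flushing on the publish path. A trimmed-down sketch of that hand-off; the interface here merely stands in for the PushTopicJob method added to INsqdNotify:

package sketch

// jobPusher is a stand-in for the PushTopicJob method on INsqdNotify.
type jobPusher interface {
	PushTopicJob(name string, job func())
}

type topicSketch struct {
	name        string
	notify      jobPusher
	syncEvery   int64
	lastSyncCnt int64
}

// onCommitted decides between an immediate buffer flush and a deferred,
// pool-executed channel-end notification, roughly as UpdateCommittedOffset does.
func (t *topicSketch) onCommitted(totalMsgCnt int64, flushBuffer func(), notifyChannels func()) {
	if t.syncEvery == 1 || totalMsgCnt-t.lastSyncCnt >= t.syncEvery {
		t.lastSyncCnt = totalMsgCnt
		flushBuffer() // heavy path: flush now so channels can read recent writes
		return
	}
	// light path: let a topic-job worker update waiting channels later
	t.notify.PushTopicJob(t.name, notifyChannels)
}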
hasData := t.backend.FlushBuffer() if hasData { - t.updateChannelsEnd(false) + t.updateChannelsEnd(false, false) } } -func (t *Topic) flushForChannels() { +func (t *Topic) notifyChEndChanged(force bool) { + t.nsqdNotify.PushTopicJob(t, func() { t.flushForChannels(force) }) +} + +func (t *Topic) flushForChannels(forceUpdate bool) { if t.IsWriteDisabled() { return } @@ -911,10 +902,12 @@ func (t *Topic) flushForChannels() { } } t.channelLock.RUnlock() + hasData := false if needFlush { - // flush buffer only to allow the channel read recent write - // no need sync to disk, since sync is heavy IO. - t.ForceFlushForChannels() + hasData = t.backend.FlushBuffer() + } + if hasData || forceUpdate { + t.updateChannelsEnd(false, forceUpdate) } } @@ -1029,8 +1022,8 @@ func (t *Topic) PutMessagesNoLock(msgs []*Message) (MessageID, BackendOffset, in // PutMessages writes multiple Messages to the queue func (t *Topic) PutMessages(msgs []*Message) (MessageID, BackendOffset, int32, int64, BackendQueueEnd, error) { t.Lock() + defer t.Unlock() firstMsgID, firstOffset, batchBytes, totalCnt, dend, err := t.PutMessagesNoLock(msgs) - t.Unlock() return firstMsgID, firstOffset, batchBytes, totalCnt, dend, err } @@ -1061,8 +1054,21 @@ func (t *Topic) put(m *Message, trace bool, checkSize int64) (MessageID, Backend return m.ID, offset, writeBytes, dend, nil } -func (t *Topic) updateChannelsEnd(forceReload bool) { - s := time.Now() +func updateChannelEnd(forceReload bool, e BackendQueueEnd, ch *Channel) { + if e == nil { + return + } + err := ch.UpdateQueueEnd(e, forceReload) + if err != nil { + if err != ErrExiting { + nsqLog.LogErrorf( + "failed to update topic end to channel(%s) - %s", + ch.name, err) + } + } +} + +func (t *Topic) getCommittedEnd() BackendQueueEnd { e := t.backend.GetQueueReadEnd() curCommit := t.GetCommitted() // if not committed, we need wait to notify channel. 
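getCommittedEnd caps what channels may observe at the committed offset, and the updateChannelsEnd rewrite in the next hunk only refreshes channels that are actually waiting for more data unless the update is forced. Both rules in isolation, with simplified types:

package sketch

type queueEnd struct {
	Offset int64
	MsgCnt int64
}

type channelSketch struct {
	name        string
	waitingMore bool
	end         queueEnd
}

// committedEnd clamps the backend read end to the committed position, so a
// channel never sees data that is not yet committed.
func committedEnd(readEnd, committed queueEnd) queueEnd {
	if readEnd.Offset > committed.Offset {
		return committed
	}
	return readEnd
}

// updateChannelsEnd pushes the new end only to channels that are blocked
// waiting for data, unless forceUpdate asks for everyone to be refreshed.
func updateChannelsEnd(chs []*channelSketch, e queueEnd, forceUpdate bool) {
	for _, ch := range chs {
		if forceUpdate || ch.waitingMore {
			ch.end = e
		}
	}
}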
@@ -1072,29 +1078,23 @@ func (t *Topic) updateChannelsEnd(forceReload bool) { } e = curCommit } + return e +} + +func (t *Topic) updateChannelsEnd(forceReload bool, forceUpdate bool) { + s := time.Now() + e := t.getCommittedEnd() t.channelLock.RLock() if e != nil { for _, channel := range t.channelMap { - oldEnd := channel.GetChannelEnd() - err := channel.UpdateQueueEnd(e, forceReload) - if err != nil { - if err != ErrExiting { - nsqLog.LogErrorf( - "failed to update topic end to channel(%s) - %s", - channel.name, err) - } - } else { - if e.Offset() < oldEnd.Offset() { - nsqLog.LogWarningf( - "update topic %v new end is less than old channel(%s) - %v, %v", t.GetTopicName(), - channel.name, oldEnd, e) - } + if forceUpdate || channel.IsWaitingMoreData() { + updateChannelEnd(forceReload, e, channel) } } } t.channelLock.RUnlock() cost := time.Now().Sub(s) - if cost > time.Second { + if cost > time.Second/2 { nsqLog.LogWarningf("topic(%s): update channels end cost: %v", t.GetFullName(), cost) } } @@ -1167,7 +1167,8 @@ func (t *Topic) exit(deleted bool) error { } // write anything leftover to disk - t.flush(true) + t.flushData() + t.updateChannelsEnd(false, true) nsqLog.Logf("[TRACE_DATA] exiting topic end: %v, cnt: %v", t.TotalDataSize(), t.TotalMessageCnt()) t.SaveChannelMeta() t.channelLock.RLock() @@ -1193,9 +1194,10 @@ func (t *Topic) IsWriteDisabled() bool { } func (t *Topic) DisableForSlave() { - atomic.StoreInt32(&t.writeDisabled, 1) - nsqLog.Logf("[TRACE_DATA] while disable topic %v end: %v, cnt: %v, queue start: %v", t.GetFullName(), - t.TotalDataSize(), t.TotalMessageCnt(), t.backend.GetQueueReadStart()) + if atomic.CompareAndSwapInt32(&t.writeDisabled, 0, 1) { + nsqLog.Logf("[TRACE_DATA] while disable topic %v end: %v, cnt: %v, queue start: %v", t.GetFullName(), + t.TotalDataSize(), t.TotalMessageCnt(), t.backend.GetQueueReadStart()) + } t.channelLock.RLock() for _, c := range t.channelMap { c.DisableConsume(true) @@ -1214,7 +1216,9 @@ func (t *Topic) DisableForSlave() { } func (t *Topic) EnableForMaster() { - nsqLog.Logf("[TRACE_DATA] while enable topic %v end: %v, cnt: %v", t.GetFullName(), t.TotalDataSize(), t.TotalMessageCnt()) + if atomic.CompareAndSwapInt32(&t.writeDisabled, 1, 0) { + nsqLog.Logf("[TRACE_DATA] while enable topic %v end: %v, cnt: %v", t.GetFullName(), t.TotalDataSize(), t.TotalMessageCnt()) + } t.channelLock.RLock() for _, c := range t.channelMap { c.DisableConsume(false) @@ -1227,7 +1231,6 @@ func (t *Topic) EnableForMaster() { c.GetConfirmed(), c.Depth(), c.backend.GetQueueReadEnd(), curRead) } t.channelLock.RUnlock() - atomic.StoreInt32(&t.writeDisabled, 0) // notify re-register to lookup t.nsqdNotify.NotifyStateChanged(t, false) } @@ -1245,15 +1248,18 @@ func (t *Topic) ForceFlush() { } s := time.Now() - t.flush(true) + t.flushData() + e := t.getCommittedEnd() cost := time.Now().Sub(s) if cost > time.Second { nsqLog.LogWarningf("topic(%s): flush cost: %v", t.GetFullName(), cost) } s = time.Now() + useFsync := t.option.UseFsync t.channelLock.RLock() for _, channel := range t.channelMap { - channel.Flush() + updateChannelEnd(false, e, channel) + channel.Flush(useFsync) } t.channelLock.RUnlock() cost = time.Now().Sub(s) @@ -1262,9 +1268,16 @@ func (t *Topic) ForceFlush() { } } -func (t *Topic) flush(notifyChan bool) error { +func (t *Topic) flushBuffer(notifyCh bool) error { + hasData := t.backend.FlushBuffer() + if notifyCh && hasData { + t.notifyChEndChanged(true) + } + return nil +} + +func (t *Topic) flushData() (error, bool) { syncEvery := 
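DisableForSlave and EnableForMaster now flip writeDisabled with a compare-and-swap, so the verbose TRACE_DATA logging fires only on a real state transition instead of on every repeated call. The gate on its own:

package sketch

import (
	"log"
	"sync/atomic"
)

type writeState struct {
	disabled int32
}

// disable returns true only for the call that actually performed the 0 -> 1
// transition; repeated calls are cheap no-ops for the logging side.
func (s *writeState) disable() bool {
	if atomic.CompareAndSwapInt32(&s.disabled, 0, 1) {
		log.Printf("write disabled")
		return true
	}
	return false
}

func (s *writeState) enable() bool {
	if atomic.CompareAndSwapInt32(&s.disabled, 1, 0) {
		log.Printf("write enabled")
		return true
	}
	return false
}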
atomic.LoadInt64(&t.dynamicConf.SyncEvery) - // TODO: if replication is 1 we may need fsync useFsync := syncEvery == 1 || t.option.UseFsync if t.GetDelayedQueue() != nil { @@ -1273,26 +1286,20 @@ func (t *Topic) flush(notifyChan bool) error { ok := atomic.CompareAndSwapInt32(&t.needFlush, 1, 0) if !ok { - if notifyChan { - t.updateChannelsEnd(false) - } - return nil + return nil, false } atomic.StoreInt64(&t.lastSyncCnt, t.backend.GetQueueWriteEnd().TotalMsgCnt()) err := t.backend.Flush(useFsync) if err != nil { nsqLog.LogErrorf("failed flush: %v", err) - return err - } - if notifyChan { - t.updateChannelsEnd(false) + return err, false } - return err + return err, true } func (t *Topic) PrintCurrentStats() { - nsqLog.Logf("topic(%s) status: write end %v", t.GetFullName(), t.backend.GetQueueWriteEnd()) + nsqLog.Logf("topic(%s) status: start: %v, write end %v", t.GetFullName(), t.backend.GetQueueReadStart(), t.backend.GetQueueWriteEnd()) t.channelLock.RLock() for _, ch := range t.channelMap { nsqLog.Logf("channel(%s) depth: %v, confirmed: %v, debug: %v", ch.GetName(), ch.Depth(), @@ -1479,3 +1486,10 @@ func (t *Topic) UpdateDelayedQueueConsumedState(ts int64, keyList RecentKeyList, return dq.UpdateConsumedState(ts, keyList, cntList, channelCntList) } + +// after crash, some topic meta need to be fixed by manual +func (t *Topic) TryFixData() error { + t.backend.tryFixData() + // TODO: fix channel meta + return nil +} diff --git a/nsqd/topic_test.go b/nsqd/topic_test.go index 94e5fd257..0489385e3 100644 --- a/nsqd/topic_test.go +++ b/nsqd/topic_test.go @@ -269,6 +269,8 @@ func TestTopicPutChannelWait(t *testing.T) { test.Equal(t, topic.backend.GetQueueReadEnd(), channel.GetChannelEnd()) msg.ID = 0 topic.PutMessage(msg) + // wait channel end notify done + time.Sleep(time.Millisecond) test.Equal(t, false, channel.IsWaitingMoreData()) test.Equal(t, topic.backend.GetQueueReadEnd(), topic.backend.GetQueueWriteEnd()) test.Equal(t, topic.backend.GetQueueReadEnd(), channel.GetChannelEnd()) @@ -282,11 +284,13 @@ func TestTopicPutChannelWait(t *testing.T) { test.Equal(t, true, channel.IsWaitingMoreData()) msg.ID = 0 topic.PutMessage(msg) + time.Sleep(time.Millisecond) test.Equal(t, false, channel.IsWaitingMoreData()) test.Equal(t, topic.backend.GetQueueReadEnd(), topic.backend.GetQueueWriteEnd()) test.Equal(t, topic.backend.GetQueueReadEnd(), channel.GetChannelEnd()) msg.ID = 0 topic.PutMessage(msg) + time.Sleep(time.Millisecond) test.NotEqual(t, topic.backend.GetQueueReadEnd(), topic.backend.GetQueueWriteEnd()) test.Equal(t, topic.backend.GetQueueReadEnd(), channel.GetChannelEnd()) } @@ -465,12 +469,31 @@ func TestTopicCleanOldDataByRetentionDayWithResetStart(t *testing.T) { topic.DisableForSlave() err := topic.ResetBackendWithQueueStartNoLock(0, 0) test.Nil(t, err) + writeEnd := topic.backend.GetQueueWriteEnd() + test.Equal(t, BackendOffset(0), writeEnd.Offset()) + test.Equal(t, int64(0), writeEnd.TotalMsgCnt()) readStart = *(topic.backend.GetQueueReadStart().(*diskQueueEndInfo)) - test.Equal(t, int64(1), readStart.EndOffset.FileNum) + test.Equal(t, BackendOffset(0), readStart.Offset()) + test.Equal(t, int64(0), readStart.TotalMsgCnt()) err = topic.ResetBackendWithQueueStartNoLock(0, 0) test.Nil(t, err) + writeEnd = topic.backend.GetQueueWriteEnd() readStart = *(topic.backend.GetQueueReadStart().(*diskQueueEndInfo)) - test.Equal(t, int64(2), readStart.EndOffset.FileNum) + test.Equal(t, BackendOffset(0), readStart.Offset()) + test.Equal(t, int64(0), readStart.TotalMsgCnt()) + test.Equal(t, 
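flushData above reports whether anything was actually flushed: a compare-and-swap on the needFlush flag short-circuits redundant flushes, and fsync is only requested when every write must be durable (SyncEvery of 1 or the explicit UseFsync option). A sketch of that control flow with the backend call abstracted away:

package sketch

import "sync/atomic"

type flusher struct {
	needFlush int32 // set to 1 by the write path, cleared by flushData
	syncEvery int64
	useFsync  bool
	// flushBackend is a stand-in for backend.Flush(fsync).
	flushBackend func(fsync bool) error
}

// flushData returns (err, flushed), matching the patch's signature; callers
// only notify channels when something was actually written out.
func (f *flusher) flushData() (error, bool) {
	if !atomic.CompareAndSwapInt32(&f.needFlush, 1, 0) {
		return nil, false // nothing new since the last flush
	}
	useFsync := f.syncEvery == 1 || f.useFsync
	if err := f.flushBackend(useFsync); err != nil {
		return err, false
	}
	return nil, true
}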
BackendOffset(0), writeEnd.Offset()) + test.Equal(t, int64(0), writeEnd.TotalMsgCnt()) + + nsqd.CloseExistingTopic("test", 0) + topic = nsqd.GetTopic("test", 0, false) + topic.dynamicConf.AutoCommit = 1 + topic.dynamicConf.SyncEvery = 10 + + writeEnd2 := topic.backend.GetQueueWriteEnd() + readStart2 := topic.backend.GetQueueReadStart().(*diskQueueEndInfo) + test.Equal(t, true, writeEnd2.IsSame(writeEnd)) + test.Equal(t, true, readStart2.IsSame(&readStart)) + topic.EnableForMaster() msgNum := 1000 channel := topic.GetChannel("ch") @@ -481,7 +504,8 @@ func TestTopicCleanOldDataByRetentionDayWithResetStart(t *testing.T) { var dend BackendQueueEnd for i := 0; i < msgNum; i++ { msg.ID = 0 - _, _, msgSize, dend, _ = topic.PutMessage(msg) + _, _, msgSize, dend, err = topic.PutMessage(msg) + test.Nil(t, err) msg.Timestamp = time.Now().Add(-1 * time.Hour * 24 * time.Duration(4-dend.(*diskQueueEndInfo).EndOffset.FileNum)).UnixNano() } topic.ForceFlush() @@ -489,7 +513,6 @@ func TestTopicCleanOldDataByRetentionDayWithResetStart(t *testing.T) { fileNum := topic.backend.diskWriteEnd.EndOffset.FileNum test.Equal(t, int64(readStart.EndOffset.FileNum), topic.backend.GetQueueReadStart().(*diskQueueEndInfo).EndOffset.FileNum) - test.Equal(t, true, fileNum == 2) for i := 0; i < msgNum; i++ { msg := <-channel.clientMsgChan channel.ConfirmBackendQueue(msg) @@ -578,6 +601,23 @@ func TestTopicResetWithQueueStart(t *testing.T) { test.Equal(t, int64(0), newEnd.EndOffset.Pos) test.Equal(t, resetStart.Offset(), channel.GetConfirmed().Offset()) test.Equal(t, resetStart.TotalMsgCnt(), channel.GetChannelEnd().TotalMsgCnt()) + newReadStart := topic.backend.GetQueueReadStart().(*diskQueueEndInfo) + test.Equal(t, resetStart.Offset(), newReadStart.Offset()) + test.Equal(t, resetStart.TotalMsgCnt(), newReadStart.TotalMsgCnt()) + + test.Equal(t, true, newEnd.IsSame(newReadStart)) + + // test reopen + nsqd.CloseExistingTopic("test", 0) + topic = nsqd.GetTopic("test", 0, false) + topic.dynamicConf.AutoCommit = 1 + topic.dynamicConf.SyncEvery = 10 + channel = topic.GetChannel("ch") + test.NotNil(t, channel) + newEnd2 := topic.backend.GetQueueWriteEnd().(*diskQueueEndInfo) + newReadStart2 := topic.backend.GetQueueReadStart().(*diskQueueEndInfo) + test.Equal(t, true, newEnd.IsSame(newEnd2)) + test.Equal(t, true, newReadStart.IsSame(newReadStart2)) for i := 0; i < msgNum; i++ { msg.ID = 0 @@ -606,6 +646,24 @@ func TestTopicResetWithQueueStart(t *testing.T) { test.Equal(t, int64(0), newEnd.EndOffset.Pos) test.Equal(t, resetStart.Offset(), channel.GetConfirmed().Offset()) test.Equal(t, resetStart.TotalMsgCnt(), channel.GetChannelEnd().TotalMsgCnt()) + + newReadStart = topic.backend.GetQueueReadStart().(*diskQueueEndInfo) + test.Equal(t, resetStart.Offset(), newReadStart.Offset()) + test.Equal(t, resetStart.TotalMsgCnt(), newReadStart.TotalMsgCnt()) + test.Equal(t, true, newEnd.IsSame(newReadStart)) + + // test reopen + nsqd.CloseExistingTopic("test", 0) + topic = nsqd.GetTopic("test", 0, false) + topic.dynamicConf.AutoCommit = 1 + topic.dynamicConf.SyncEvery = 10 + channel = topic.GetChannel("ch") + test.NotNil(t, channel) + newEnd2 = topic.backend.GetQueueWriteEnd().(*diskQueueEndInfo) + newReadStart2 = topic.backend.GetQueueReadStart().(*diskQueueEndInfo) + test.Equal(t, true, newEnd.IsSame(newEnd2)) + test.Equal(t, true, newReadStart.IsSame(newReadStart2)) + for i := 0; i < msgNum; i++ { msg.ID = 0 _, _, _, dend, _ = topic.PutMessage(msg) diff --git a/nsqdserver/http.go b/nsqdserver/http.go index ebe55dac2..f2fe841d0 100644 --- 
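The nsqdserver/http.go hunk that follows registers POST /topic/fixdata, a thin wrapper that resolves the topic from the query string and calls TryFixData on it. A standard-library sketch of the same shape; the real server goes through httprouter and http_api.Decorate, and the lookup function here is assumed, not the nsqd API:

package sketch

import (
	"fmt"
	"net/http"
)

type fixableTopic interface {
	TryFixData() error
}

// lookupTopic is assumed to resolve the "topic" and "partition" query
// parameters to a local topic, as getExistingTopicFromQuery does.
func fixDataHandler(lookupTopic func(r *http.Request) (fixableTopic, error)) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "POST required", http.StatusMethodNotAllowed)
			return
		}
		t, err := lookupTopic(r)
		if err != nil {
			http.Error(w, err.Error(), http.StatusNotFound)
			return
		}
		if err := t.TryFixData(); err != nil {
			http.Error(w, fmt.Sprintf("fix failed: %v", err), http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusOK)
	}
}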
a/nsqdserver/http.go +++ b/nsqdserver/http.go @@ -90,6 +90,7 @@ func newHTTPServer(ctx *context, tlsEnabled bool, tlsRequired bool) *httpServer router.Handle("GET", "/delayqueue/backupto", http_api.Decorate(s.doDelayedQueueBackupTo, log, http_api.V1Stream)) router.Handle("POST", "/topic/greedyclean", http_api.Decorate(s.doGreedyCleanTopic, log, http_api.V1)) + router.Handle("POST", "/topic/fixdata", http_api.Decorate(s.doFixTopicData, log, http_api.V1)) //router.Handle("POST", "/topic/delete", http_api.Decorate(s.doDeleteTopic, http_api.DeprecatedAPI, log, http_api.V1)) router.Handle("POST", "/disable/write", http_api.Decorate(s.doDisableClusterWrite, log, http_api.V1)) @@ -242,6 +243,15 @@ func (s *httpServer) getExistingTopicFromQuery(req *http.Request) (url.Values, * return reqParams, topic, nil } +func (s *httpServer) doFixTopicData(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { + _, localTopic, err := s.getExistingTopicFromQuery(req) + if err != nil { + return nil, err + } + localTopic.TryFixData() + return nil, nil +} + func (s *httpServer) doGreedyCleanTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { _, localTopic, err := s.getExistingTopicFromQuery(req) if err != nil { diff --git a/nsqdserver/protocol_v2.go b/nsqdserver/protocol_v2.go index 15297d51b..464c4d9b7 100644 --- a/nsqdserver/protocol_v2.go +++ b/nsqdserver/protocol_v2.go @@ -625,6 +625,7 @@ func (p *protocolV2) messagePump(client *nsqd.ClientV2, startedChan chan bool, // and the reader keep moving forward. offset, confirmedCnt, changed := subChannel.ConfirmBackendQueue(msg) subChannel.CleanWaitingRequeueChan(msg) + subChannel.TryRefreshChannelEnd() if changed && p.ctx.nsqdCoord != nil { p.ctx.nsqdCoord.SetChannelConsumeOffsetToCluster(subChannel, int64(offset), confirmedCnt, true) } @@ -636,8 +637,12 @@ func (p *protocolV2) messagePump(client *nsqd.ClientV2, startedChan chan bool, matched = !matched } if !matched { + if nsqd.NsqLogger().Level() >= levellogger.LOG_DETAIL { + nsqd.NsqLogger().Debugf("channel %v filtered message %v", subChannel.GetName(), nsqd.PrintMessageNoBody(msg)) + } subChannel.ConfirmBackendQueue(msg) subChannel.CleanWaitingRequeueChan(msg) + subChannel.TryRefreshChannelEnd() subChannel.ContinueConsumeForOrder() continue } @@ -1391,7 +1396,9 @@ func (p *protocolV2) preparePub(client *nsqd.ClientV2, params [][]byte, maxBody if err := p.CheckAuth(client, "PUB", topicName, ""); err != nil { return bodyLen, nil, err } - // mpub + if client.PubStats == nil { + client.PubStats = topic.GetDetailStats().InitPubClientStats(client.String(), client.UserAgent, "tcp") + } return bodyLen, topic, nil } @@ -1451,14 +1458,15 @@ func internalPubAsync(clientTimer *time.Timer, msgBody *bytes.Buffer, topic *nsq ExtContent: extContent, StartPub: time.Now(), } - if clientTimer == nil { - clientTimer = time.NewTimer(time.Second * 5) - } else { - clientTimer.Reset(time.Second * 5) - } + select { case topic.GetWaitChan() <- info: default: + if clientTimer == nil { + clientTimer = time.NewTimer(time.Second * 5) + } else { + clientTimer.Reset(time.Second * 5) + } select { case topic.GetWaitChan() <- info: case <-topic.QuitChan(): @@ -1566,59 +1574,58 @@ func (p *protocolV2) internalPubExtAndTrace(client *nsqd.ClientV2, params [][]by if needTraceRsp || atomic.LoadInt32(&topic.EnableTrace) == 1 { asyncAction = false } - if p.ctx.checkForMasterWrite(topicName, partition) { - if !topic.IsExt() && extContent.ExtVersion() != ext.NO_EXT_VER { - if 
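internalPubAsync above now attempts the non-blocking send first and only allocates or resets the five-second timer when the wait channel is already full, keeping timer churn off the common fast path. The pattern in isolation:

package sketch

import (
	"errors"
	"time"
)

var errPubTimeout = errors.New("pub to wait channel timed out")

// enqueueWithLazyTimeout sends item on ch, creating or reusing the timeout
// timer only when the fast, non-blocking attempt fails.
func enqueueWithLazyTimeout(ch chan<- int, item int, timer *time.Timer, quit <-chan struct{}) (*time.Timer, error) {
	select {
	case ch <- item:
		return timer, nil // fast path: no timer touched at all
	default:
	}
	if timer == nil {
		timer = time.NewTimer(5 * time.Second)
	} else {
		timer.Reset(5 * time.Second)
	}
	select {
	case ch <- item:
		return timer, nil
	case <-quit:
		return timer, errors.New("exiting")
	case <-timer.C:
		return timer, errPubTimeout
	}
}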
p.ctx.getOpts().AllowExtCompatible { - filterIllegalZanTestHeader(topicName, jsonHeader) - } - canIgnoreExt := canIgnoreJsonHeader(topicName, jsonHeader) - if p.ctx.getOpts().AllowExtCompatible && canIgnoreExt { - extContent = ext.NewNoExt() - nsqd.NsqLogger().Debugf("ext content ignored in topic: %v", topicName) - } else { - nsqd.NsqLogger().Infof("ext content not supported in topic: %v", topicName) - return nil, protocol.NewClientErr(nil, ext.E_EXT_NOT_SUPPORT, - fmt.Sprintf("ext content not supported in topic %v", topicName)) - } + if !topic.IsExt() && extContent.ExtVersion() != ext.NO_EXT_VER { + if p.ctx.getOpts().AllowExtCompatible { + filterIllegalZanTestHeader(topicName, jsonHeader) } - id := nsqd.MessageID(0) - offset := nsqd.BackendOffset(0) - rawSize := int32(0) - if asyncAction { - err = internalPubAsync(client.PubTimeout, messageBodyBuffer, topic, extContent) + canIgnoreExt := canIgnoreJsonHeader(topicName, jsonHeader) + if p.ctx.getOpts().AllowExtCompatible && canIgnoreExt { + extContent = ext.NewNoExt() + nsqd.NsqLogger().Debugf("ext content ignored in topic: %v", topicName) } else { - id, offset, rawSize, _, err = p.ctx.PutMessage(topic, realBody, extContent, traceID) + nsqd.NsqLogger().Infof("ext content not supported in topic: %v", topicName) + return nil, protocol.NewClientErr(nil, ext.E_EXT_NOT_SUPPORT, + fmt.Sprintf("ext content not supported in topic %v", topicName)) } - //p.ctx.setHealth(err) - if err != nil { - topic.GetDetailStats().UpdatePubClientStats(client.String(), client.UserAgent, "tcp", 1, true) - nsqd.NsqLogger().LogErrorf("topic %v put message failed: %v", topic.GetFullName(), err) - if clusterErr, ok := err.(*consistence.CommonCoordErr); ok { - if !clusterErr.IsLocalErr() { - return nil, protocol.NewClientErr(err, FailedOnNotWritable, "") - } - } - return nil, protocol.NewClientErr(err, "E_PUB_FAILED", err.Error()) + } + id := nsqd.MessageID(0) + offset := nsqd.BackendOffset(0) + rawSize := int32(0) + if asyncAction { + err = internalPubAsync(client.PubTimeout, messageBodyBuffer, topic, extContent) + } else { + id, offset, rawSize, _, err = p.ctx.PutMessage(topic, realBody, extContent, traceID) + } + //p.ctx.setHealth(err) + if err != nil { + if client.PubStats != nil { + client.PubStats.IncrCounter(1, true) } - topic.GetDetailStats().UpdatePubClientStats(client.String(), client.UserAgent, "tcp", 1, false) - cost := time.Now().UnixNano() - startPub - topic.GetDetailStats().UpdateTopicMsgStats(int64(len(realBody)), cost/1000) - - if traceID != 0 || atomic.LoadInt32(&topic.EnableTrace) == 1 || nsqd.NsqLogger().Level() >= levellogger.LOG_DETAIL { - nsqd.GetMsgTracer().TracePubClient(topic.GetTopicName(), topic.GetTopicPart(), traceID, id, offset, client.String()) + nsqd.NsqLogger().LogErrorf("topic %v put message failed: %v, from: %v", topic.GetFullName(), err, client.String()) + if !p.ctx.checkForMasterWrite(topicName, partition) { + topic.DisableForSlave() + return nil, protocol.NewClientErr(err, FailedOnNotLeader, "") } - if needTraceRsp { - return getTracedReponse(id, traceID, offset, rawSize) + if clusterErr, ok := err.(*consistence.CommonCoordErr); ok { + if !clusterErr.IsLocalErr() { + return nil, protocol.NewClientErr(err, FailedOnNotWritable, "") + } } - return okBytes, nil - } else { - topic.GetDetailStats().UpdatePubClientStats(client.String(), client.UserAgent, "tcp", 1, true) - //forward to master of topic - nsqd.NsqLogger().LogDebugf("should put to master: %v, from %v", - topic.GetFullName(), client.String()) - topic.DisableForSlave() - return 
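The stats rework around here replaces the per-message UpdatePubClientStats (a map lookup under a lock on every publish) with a one-time InitPubClientStats during PUB/MPUB preparation; the returned object is cached on the client and bumped with atomic adds, and may be nil when too many publishers are tracked. A sketch of that split:

package sketch

import (
	"sync"
	"sync/atomic"
)

type pubStats struct {
	remote   string
	pubCount int64
	errCount int64
}

// incr is the hot-path update: lock-free once the stats object is cached.
func (s *pubStats) incr(count int64, hasErr bool) {
	if hasErr {
		atomic.AddInt64(&s.errCount, count)
	} else {
		atomic.AddInt64(&s.pubCount, count)
	}
}

type statsRegistry struct {
	sync.Mutex
	clients map[string]*pubStats
	max     int
}

// initClient is called once per connection; it may return nil when too many
// publishers are tracked, so callers must tolerate a missing stats object.
func (r *statsRegistry) initClient(remote string) *pubStats {
	r.Lock()
	defer r.Unlock()
	if s, ok := r.clients[remote]; ok {
		return s
	}
	if len(r.clients) > r.max {
		return nil
	}
	s := &pubStats{remote: remote}
	r.clients[remote] = s
	return s
}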
nil, protocol.NewClientErr(err, FailedOnNotLeader, "") + return nil, protocol.NewClientErr(err, "E_PUB_FAILED", err.Error()) + } + if client.PubStats != nil { + client.PubStats.IncrCounter(1, false) + } + cost := time.Now().UnixNano() - startPub + topic.GetDetailStats().UpdateTopicMsgStats(int64(len(realBody)), cost/1000) + + if traceID != 0 || atomic.LoadInt32(&topic.EnableTrace) == 1 || nsqd.NsqLogger().Level() >= levellogger.LOG_DETAIL { + nsqd.GetMsgTracer().TracePubClient(topic.GetTopicName(), topic.GetTopicPart(), traceID, id, offset, client.String()) } + if needTraceRsp { + return getTracedReponse(id, traceID, offset, rawSize) + } + return okBytes, nil } func (p *protocolV2) internalMPUBEXTAndTrace(client *nsqd.ClientV2, params [][]byte, mpubExt bool, traceEnable bool) ([]byte, error) { @@ -1648,7 +1655,9 @@ func (p *protocolV2) internalMPUBEXTAndTrace(client *nsqd.ClientV2, params [][]b if err != nil { topic.IncrPubFailed() incrServerPubFailed() - topic.GetDetailStats().UpdatePubClientStats(client.String(), client.UserAgent, "tcp", int64(len(messages)), true) + if client.PubStats != nil { + client.PubStats.IncrCounter(int64(len(messages)), true) + } nsqd.NsqLogger().LogErrorf("topic %v put message failed: %v", topic.GetFullName(), err) if clusterErr, ok := err.(*consistence.CommonCoordErr); ok { @@ -1658,7 +1667,9 @@ func (p *protocolV2) internalMPUBEXTAndTrace(client *nsqd.ClientV2, params [][]b } return nil, protocol.NewFatalClientErr(err, "E_MPUB_FAILED", err.Error()) } - topic.GetDetailStats().UpdatePubClientStats(client.String(), client.UserAgent, "tcp", int64(len(messages)), false) + if client.PubStats != nil { + client.PubStats.IncrCounter(int64(len(messages)), false) + } cost := time.Now().UnixNano() - startPub topic.GetDetailStats().BatchUpdateTopicLatencyStats(cost/int64(time.Microsecond), int64(len(messages))) if !traceEnable { @@ -1668,7 +1679,9 @@ func (p *protocolV2) internalMPUBEXTAndTrace(client *nsqd.ClientV2, params [][]b } else { topic.IncrPubFailed() incrServerPubFailed() - topic.GetDetailStats().UpdatePubClientStats(client.String(), client.UserAgent, "tcp", int64(len(messages)), true) + if client.PubStats != nil { + client.PubStats.IncrCounter(int64(len(messages)), true) + } //forward to master of topic nsqd.NsqLogger().LogDebugf("should put to master: %v, from %v", topic.GetFullName(), client.String()) diff --git a/nsqdserver/protocol_v2_test.go b/nsqdserver/protocol_v2_test.go index 3958a72c1..62786026f 100644 --- a/nsqdserver/protocol_v2_test.go +++ b/nsqdserver/protocol_v2_test.go @@ -145,6 +145,18 @@ func readValidate(t *testing.T, conn io.ReadWriter, f int32, d string) []byte { } } +func closeConnAfterTimeout(conn net.Conn, to time.Duration, stop chan int) { + go func() { + select { + case <-time.After(to): + conn.Close() + case <-stop: + return + } + + }() +} + func recvNextMsgAndCheckClientMsg(t *testing.T, conn io.ReadWriter, expLen int, expTraceID uint64, autoFin bool) *nsq.Message { for { resp, err := nsq.ReadResponse(conn) @@ -264,6 +276,7 @@ func recvNextMsgAndCheckExt(t *testing.T, conn io.ReadWriter, test.Nil(t, err) if expLen > 0 { test.Equal(t, expLen, len(msgOut.Body)) + t.Logf("msg body: %v", string(msgOut.Body)) } if expTraceID > 0 { traceID := binary.BigEndian.Uint64(msgOut.ID[8:]) @@ -4319,7 +4332,12 @@ func TestSubOrderedWithFilter(t *testing.T) { opts := nsqdNs.NewOptions() opts.Logger = newTestLogger(t) - opts.LogLevel = 1 + opts.LogLevel = 2 + opts.SyncTimeout = time.Minute + if testing.Verbose() { + opts.LogLevel = 4 + 
nsqdNs.SetLogger(opts.Logger) + } tcpAddr, _, nsqd, nsqdServer := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) defer nsqdServer.Exit() @@ -4329,11 +4347,11 @@ func TestSubOrderedWithFilter(t *testing.T) { topic := nsqd.GetTopicIgnPart(topicName) topicDynConf := nsqdNs.TopicDynamicConf{ AutoCommit: 1, - SyncEvery: 1, + SyncEvery: 100, Ext: true, } topic.SetDynamicInfo(topicDynConf, nil) - topic.GetChannel("ordered_ch") + channel := topic.GetChannel("ordered_ch") clientParams := make(map[string]interface{}) clientParams["client_id"] = "client_b" @@ -4353,8 +4371,15 @@ func TestSubOrderedWithFilter(t *testing.T) { } // since no match, will recv no message - time.Sleep(time.Second) + for { + time.Sleep(time.Second) + if channel.Depth() == 0 { + break + } + } conn.Close() + // wait old connection exit + time.Sleep(time.Second) for i := 0; i < 10; i++ { topic.PutMessage(nsqdNs.NewMessage(0, []byte("second"))) } @@ -4369,8 +4394,10 @@ func TestSubOrderedWithFilter(t *testing.T) { _, err = nsq.Ready(1).WriteTo(conn) test.Equal(t, err, nil) defer conn.Close() + closeConnAfterTimeout(conn, time.Second*10, nil) for i := 0; i < 10; i++ { msgOut := recvNextMsgAndCheckExt(t, conn, 0, msg.TraceID, true, true) + test.NotNil(t, msgOut) msgOut.Body = msgOut.Body[12:] test.Equal(t, msgOut.Body, []byte("second")) }
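The reworked TestSubOrderedWithFilter waits for the channel to actually drain instead of sleeping a fixed second, and bounds the final blocking reads with closeConnAfterTimeout. A small helper in that spirit, polling a condition with an overall deadline (the deadline is an addition here; the loop in the test has none):

package sketch

import (
	"errors"
	"time"
)

// waitUntil polls cond every interval until it returns true or the deadline
// passes; tests can use it in place of a fixed time.Sleep.
func waitUntil(cond func() bool, interval, deadline time.Duration) error {
	timeout := time.After(deadline)
	tick := time.NewTicker(interval)
	defer tick.Stop()
	for {
		if cond() {
			return nil
		}
		select {
		case <-tick.C:
		case <-timeout:
			return errors.New("condition not met before deadline")
		}
	}
}

In the test above this would read roughly waitUntil(func() bool { return channel.Depth() == 0 }, time.Second, 30*time.Second).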