Skip to content

Commit

Permalink
observer(ticdc): reduce observer logs when error happens (#8360)
Browse files Browse the repository at this point in the history
close #8354
  • Loading branch information
amyangfei authored Feb 28, 2023
1 parent f527780 commit dca26ee
Showing 1 changed file with 15 additions and 2 deletions.
17 changes: 15 additions & 2 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@ package owner

import (
"context"
"fmt"
"strings"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/errno"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
Expand Down Expand Up @@ -1045,13 +1048,23 @@ func (c *changefeed) checkUpstream() (skip bool, err error) {
// observer, if needed run it in an independent goroutine with 5s timeout.
func (c *changefeed) tickDownstreamObserver(ctx context.Context) {
if time.Since(c.observerLastTick.Load()) > downstreamObserverTickDuration {
c.observerLastTick.Store(time.Now())
select {
case <-ctx.Done():
return
default:
}
go func() {
cctx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
if err := c.downstreamObserver.Tick(cctx); err != nil {
log.Error("backend observer tick error", zap.Error(err))
// Prometheus is not deployed, it happens in non production env.
if strings.Contains(err.Error(),
fmt.Sprintf(":%d", errno.ErrPrometheusAddrIsNotSet)) {
return
}
log.Warn("backend observer tick error", zap.Error(err))
}
c.observerLastTick.Store(time.Now())
}()
}
}

0 comments on commit dca26ee

Please sign in to comment.