*: fix memory leak introduced by timer.After #6720

Merged
merged 11 commits on Jul 3, 2023
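The change applies one pattern across the client and server packages: `time.After` called inside a retry loop or a long-lived `select` allocates a fresh timer on every iteration, and each of those timers stays reachable until it fires even when the `select` returns early, so hot loops pile up pending timers. Replacing the per-iteration `time.After` with a single reusable `time.Ticker` (or a pooled `time.Timer`) avoids that. Below is a minimal sketch of the before/after shape; the function names are illustrative and not part of this diff.

```go
package main

import (
	"context"
	"errors"
	"time"
)

// retryLeaky shows the pattern this PR removes: every iteration calls
// time.After, which allocates a timer that is not reclaimed until it
// fires, even if the select returns early via ctx.Done().
func retryLeaky(ctx context.Context, attempts int, f func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = f(); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return err
		case <-time.After(time.Second): // new timer allocated per iteration
		}
	}
	return err
}

// retryWithTicker shows the replacement: one ticker is created up front,
// reused on every iteration, and released by the deferred Stop.
func retryWithTicker(ctx context.Context, attempts int, f func() error) error {
	var err error
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for i := 0; i < attempts; i++ {
		if err = f(); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return err
		case <-ticker.C:
		}
	}
	return err
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	_ = retryWithTicker(ctx, 5, func() error { return errors.New("not ready") })
}
```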
4 changes: 3 additions & 1 deletion client/client.go
@@ -526,14 +526,16 @@ func newClientWithKeyspaceName(

func (c *client) initRetry(f func(s string) error, str string) error {
var err error
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for i := 0; i < c.option.maxRetryTimes; i++ {
if err = f(str); err == nil {
return nil
}
select {
case <-c.ctx.Done():
return err
case <-time.After(time.Second):
case <-ticker.C:
}
}
return errors.WithStack(err)
4 changes: 3 additions & 1 deletion client/pd_service_discovery.go
@@ -206,14 +206,16 @@ func (c *pdServiceDiscovery) Init() error {

func (c *pdServiceDiscovery) initRetry(f func() error) error {
var err error
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for i := 0; i < c.option.maxRetryTimes; i++ {
if err = f(); err == nil {
return nil
}
select {
case <-c.ctx.Done():
return err
case <-time.After(time.Second):
case <-ticker.C:
}
}
return errors.WithStack(err)
4 changes: 3 additions & 1 deletion client/resource_manager_client.go
@@ -389,6 +389,8 @@ func (c *client) tryResourceManagerConnect(ctx context.Context, connection *reso
err error
stream rmpb.ResourceManager_AcquireTokenBucketsClient
)
ticker := time.NewTicker(retryInterval)
defer ticker.Stop()
for i := 0; i < maxRetryTimes; i++ {
cc, err := c.resourceManagerClient()
if err != nil {
@@ -406,7 +408,7 @@ func (c *client) tryResourceManagerConnect(ctx context.Context, connection *reso
select {
case <-ctx.Done():
return err
case <-time.After(retryInterval):
case <-ticker.C:
}
}
return err
60 changes: 45 additions & 15 deletions client/tso_dispatcher.go
@@ -28,6 +28,7 @@ import (
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
"github.com/tikv/pd/client/tsoutil"
"github.com/tikv/pd/pkg/timerpool"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@@ -138,12 +139,27 @@ func (c *tsoClient) updateTSODispatcher() {
})
}

type deadline struct {
timer <-chan time.Time
// TSDeadline is used to watch the deadline of each tso request.
type TSDeadline struct {
timer *time.Timer
done chan struct{}
cancel context.CancelFunc
}

// NewTSDeadline creates a new TSDeadline.
func NewTSDeadline(
timeout time.Duration,
done chan struct{},
cancel context.CancelFunc,
) *TSDeadline {
timer := timerpool.GlobalTimerPool.Get(timeout)
return &TSDeadline{
timer: timer,
done: done,
cancel: cancel,
}
}

func (c *tsoClient) tsCancelLoop() {
defer c.wg.Done()

@@ -172,19 +188,21 @@ func (c *tsoClient) tsCancelLoop() {

func (c *tsoClient) watchTSDeadline(ctx context.Context, dcLocation string) {
if _, exist := c.tsDeadline.Load(dcLocation); !exist {
tsDeadlineCh := make(chan deadline, 1)
tsDeadlineCh := make(chan *TSDeadline, 1)
c.tsDeadline.Store(dcLocation, tsDeadlineCh)
go func(dc string, tsDeadlineCh <-chan deadline) {
go func(dc string, tsDeadlineCh <-chan *TSDeadline) {
for {
select {
case d := <-tsDeadlineCh:
select {
case <-d.timer:
case <-d.timer.C:
log.Error("[tso] tso request is canceled due to timeout", zap.String("dc-location", dc), errs.ZapError(errs.ErrClientGetTSOTimeout))
d.cancel()
timerpool.GlobalTimerPool.Put(d.timer)
case <-d.done:
continue
timerpool.GlobalTimerPool.Put(d.timer)
case <-ctx.Done():
timerpool.GlobalTimerPool.Put(d.timer)
return
}
case <-ctx.Done():
@@ -234,6 +252,8 @@ func (c *tsoClient) checkAllocator(
}()
cc, u := c.GetTSOAllocatorClientConnByDCLocation(dc)
healthCli := healthpb.NewHealthClient(cc)
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
// the pd/allocator leader changed, so we need to re-establish the stream
if u != url {
@@ -259,7 +279,7 @@ func (c *tsoClient) checkAllocator(
select {
case <-dispatcherCtx.Done():
return
case <-time.After(time.Second):
case <-ticker.C:
// To ensure we can get the latest allocator leader
// and once the leader is changed, we can exit this function.
_, u = c.GetTSOAllocatorClientConnByDCLocation(dc)
@@ -389,6 +409,15 @@ tsoBatchLoop:
if maxBatchWaitInterval >= 0 {
tbc.adjustBestBatchSize()
}
// Stop the timer if it's not stopped.
if !streamLoopTimer.Stop() {
select {
case <-streamLoopTimer.C: // try to drain from the channel
default:
}
}
// We need to be careful here; see more details in the comments of Timer.Reset.
// https://pkg.go.dev/time@master#Timer.Reset
streamLoopTimer.Reset(c.option.timeout)
// Choose a stream to send the TSO gRPC request.
streamChoosingLoop:
@@ -403,16 +432,19 @@ tsoBatchLoop:
if c.updateTSOConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) {
continue streamChoosingLoop
}
timer := time.NewTimer(retryInterval)
select {
case <-dispatcherCtx.Done():
timer.Stop()
return
case <-streamLoopTimer.C:
err = errs.ErrClientCreateTSOStream.FastGenByArgs(errs.RetryTimeoutErr)
log.Error("[tso] create tso stream error", zap.String("dc-location", dc), errs.ZapError(err))
c.svcDiscovery.ScheduleCheckMemberChanged()
c.finishRequest(tbc.getCollectedRequests(), 0, 0, 0, errors.WithStack(err))
timer.Stop()
continue tsoBatchLoop
case <-time.After(retryInterval):
case <-timer.C:
continue streamChoosingLoop
}
}
@@ -429,11 +461,7 @@ tsoBatchLoop:
}
}
done := make(chan struct{})
dl := deadline{
timer: time.After(c.option.timeout),
done: done,
cancel: cancel,
}
dl := NewTSDeadline(c.option.timeout, done, cancel)
tsDeadlineCh, ok := c.tsDeadline.Load(dc)
for !ok || tsDeadlineCh == nil {
c.scheduleCheckTSDeadline()
@@ -443,7 +471,7 @@ tsoBatchLoop:
select {
case <-dispatcherCtx.Done():
return
case tsDeadlineCh.(chan deadline) <- dl:
case tsDeadlineCh.(chan *TSDeadline) <- dl:
}
opts = extractSpanReference(tbc, opts[:0])
err = c.processRequests(stream, dc, tbc, opts)
@@ -558,6 +586,8 @@ func (c *tsoClient) tryConnectToTSO(
}
// retry several times before falling back to the follower when the network problem happens

ticker := time.NewTicker(retryInterval)
defer ticker.Stop()
for i := 0; i < maxRetryTimes; i++ {
c.svcDiscovery.ScheduleCheckMemberChanged()
cc, url = c.GetTSOAllocatorClientConnByDCLocation(dc)
@@ -587,7 +617,7 @@ func (c *tsoClient) tryConnectToTSO(
select {
case <-dispatcherCtx.Done():
return err
case <-time.After(retryInterval):
case <-ticker.C:
}
}

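The new TSDeadline code above relies on pkg/timerpool (timerpool.GlobalTimerPool.Get and Put), whose implementation is not shown in this diff. The following is a rough sketch of what such a pool can look like, assuming only the Get/Put API seen above; the actual pkg/timerpool added by this PR may differ in detail.

```go
// Package timerpool: illustrative sketch of a timer pool exposing the
// Get/Put API used by the TSDeadline code in client/tso_dispatcher.go.
package timerpool

import (
	"sync"
	"time"
)

// TimerPool recycles *time.Timer values so a new timer does not have to
// be allocated for every TSO deadline.
type TimerPool struct {
	pool sync.Pool
}

// GlobalTimerPool is a shared pool instance, matching the usage above.
var GlobalTimerPool TimerPool

// Get returns a timer that fires after d, reusing a pooled timer when
// one is available.
func (tp *TimerPool) Get(d time.Duration) *time.Timer {
	if v := tp.pool.Get(); v != nil {
		t := v.(*time.Timer)
		// Safe to Reset: Put always leaves the timer stopped and drained.
		t.Reset(d)
		return t
	}
	return time.NewTimer(d)
}

// Put stops the timer, drains its channel if it has already fired, and
// returns it to the pool for reuse.
func (tp *TimerPool) Put(t *time.Timer) {
	if !t.Stop() {
		select {
		case <-t.C:
		default:
		}
	}
	tp.pool.Put(t)
}
```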
8 changes: 6 additions & 2 deletions client/tso_service_discovery.go
@@ -209,14 +209,16 @@ func (c *tsoServiceDiscovery) retry(
maxRetryTimes int, retryInterval time.Duration, f func() error,
) error {
var err error
ticker := time.NewTicker(retryInterval)
defer ticker.Stop()
for i := 0; i < maxRetryTimes; i++ {
if err = f(); err == nil {
return nil
}
select {
case <-c.ctx.Done():
return err
case <-time.After(retryInterval):
case <-ticker.C:
}
}
return errors.WithStack(err)
@@ -245,11 +247,13 @@ func (c *tsoServiceDiscovery) startCheckMemberLoop() {

ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
ticker := time.NewTicker(memberUpdateInterval)
defer ticker.Stop()

for {
select {
case <-c.checkMembershipCh:
case <-time.After(memberUpdateInterval):
case <-ticker.C:
case <-ctx.Done():
log.Info("[tso] exit check member loop")
return
4 changes: 3 additions & 1 deletion client/tso_stream.go
@@ -87,10 +87,12 @@ func (b *tsoTSOStreamBuilder) build(
}

func checkStreamTimeout(ctx context.Context, cancel context.CancelFunc, done chan struct{}, timeout time.Duration) {
timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case <-done:
return
case <-time.After(timeout):
case <-timer.C:
cancel()
case <-ctx.Done():
}
14 changes: 13 additions & 1 deletion pkg/election/lease.go
@@ -109,6 +109,8 @@ func (l *lease) KeepAlive(ctx context.Context) {
timeCh := l.keepAliveWorker(ctx, l.leaseTimeout/3)

var maxExpire time.Time
timer := time.NewTimer(l.leaseTimeout)
defer timer.Stop()
for {
select {
case t := <-timeCh:
@@ -122,7 +124,17 @@ func (l *lease) KeepAlive(ctx context.Context) {
l.expireTime.Store(t)
}
}
case <-time.After(l.leaseTimeout):
// Stop the timer if it's not stopped.
if !timer.Stop() {
select {
case <-timer.C: // try to drain from the channel
default:
}
}
// We need to be careful here; see more details in the comments of Timer.Reset.
// https://pkg.go.dev/time@master#Timer.Reset
timer.Reset(l.leaseTimeout)
case <-timer.C:
log.Info("lease timeout", zap.Time("expire", l.expireTime.Load().(time.Time)), zap.String("purpose", l.Purpose))
return
case <-ctx.Done():
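Both the lease KeepAlive hunk above and the tsoBatchLoop hunk in client/tso_dispatcher.go use the same idiom required by the Timer.Reset documentation: Reset may only be called on a timer that is stopped and whose channel has been drained, and the non-blocking drain (the empty default case) covers the case where the timer fired but its value was never received. This is safe here because only the owning goroutine reads from the timer's channel. A hypothetical standalone helper capturing the idiom:

```go
package main

import "time"

// safeReset is a hypothetical helper (not part of this diff) capturing
// the stop-drain-reset idiom used in the hunks above.
func safeReset(t *time.Timer, d time.Duration) {
	// Stop the timer if it's not already stopped.
	if !t.Stop() {
		// The timer already fired; drain the value if it has not been
		// received yet. The default case avoids blocking when the
		// channel is already empty.
		select {
		case <-t.C:
		default:
		}
	}
	t.Reset(d)
}

func main() {
	t := time.NewTimer(time.Second)
	defer t.Stop()
	safeReset(t, 2*time.Second)
	<-t.C // fires roughly two seconds after the reset
}
```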
4 changes: 3 additions & 1 deletion pkg/mcs/tso/server/server.go
@@ -595,6 +595,8 @@ func (s *Server) waitAPIServiceReady() error {
ready bool
err error
)
ticker := time.NewTicker(retryIntervalWaitAPIService)
defer ticker.Stop()
for i := 0; i < maxRetryTimesWaitAPIService; i++ {
ready, err = s.isAPIServiceReady()
if err == nil && ready {
@@ -604,7 +606,7 @@ func (s *Server) waitAPIServiceReady() error {
select {
case <-s.ctx.Done():
return errors.New("context canceled while waiting api server ready")
case <-time.After(retryIntervalWaitAPIService):
case <-ticker.C:
}
}
if err != nil {
4 changes: 3 additions & 1 deletion pkg/mcs/utils/util.go
@@ -37,14 +37,16 @@ const (

// InitClusterID initializes the cluster ID.
func InitClusterID(ctx context.Context, client *clientv3.Client) (id uint64, err error) {
ticker := time.NewTicker(retryInterval)
defer ticker.Stop()
for i := 0; i < maxRetryTimes; i++ {
if clusterID, err := etcdutil.GetClusterID(client, clusterIDPath); err == nil && clusterID != 0 {
return clusterID, nil
}
select {
case <-ctx.Done():
return 0, err
case <-time.After(retryInterval):
case <-ticker.C:
}
}
return 0, errors.Errorf("failed to init cluster ID after retrying %d times", maxRetryTimes)
8 changes: 6 additions & 2 deletions pkg/replication/replication_mode.go
@@ -375,14 +375,18 @@ const (
// Run starts the background job.
func (m *ModeManager) Run(ctx context.Context) {
// Wait for a while right after startup, in case TiKV does not connect in time.
timer := time.NewTimer(idleTimeout)
defer timer.Stop()
select {
case <-time.After(idleTimeout):
case <-timer.C:
case <-ctx.Done():
return
}
ticker := time.NewTicker(tickInterval)
defer ticker.Stop()
for {
select {
case <-time.After(tickInterval):
case <-ticker.C:
case <-ctx.Done():
return
}
14 changes: 6 additions & 8 deletions pkg/schedule/coordinator.go
@@ -128,8 +128,8 @@ func (c *Coordinator) PatrolRegions() {
defer logutil.LogPanic()

defer c.wg.Done()
timer := time.NewTimer(c.cluster.GetOpts().GetPatrolRegionInterval())
defer timer.Stop()
ticker := time.NewTicker(c.cluster.GetOpts().GetPatrolRegionInterval())
defer ticker.Stop()

log.Info("Coordinator starts patrol regions")
start := time.Now()
@@ -139,8 +139,7 @@ func (c *Coordinator) PatrolRegions() {
)
for {
select {
case <-timer.C:
timer.Reset(c.cluster.GetOpts().GetPatrolRegionInterval())
case <-ticker.C:
case <-c.ctx.Done():
log.Info("patrol regions has been stopped")
return
@@ -848,12 +847,11 @@ func (c *Coordinator) runScheduler(s *scheduleController) {
defer c.wg.Done()
defer s.Scheduler.Cleanup(c.cluster)

timer := time.NewTimer(s.GetInterval())
defer timer.Stop()
ticker := time.NewTicker(s.GetInterval())
defer ticker.Stop()
for {
select {
case <-timer.C:
timer.Reset(s.GetInterval())
case <-ticker.C:
diagnosable := s.diagnosticRecorder.isAllowed()
if !s.AllowSchedule(diagnosable) {
continue