Skip to content

Commit

Permalink
*: Using the global singleton for pd client and tikv client, and fix …
Browse files Browse the repository at this point in the history
…pd client freeze (#1217) #1220
  • Loading branch information
ti-srebot authored Dec 16, 2020
1 parent a8bafea commit bbb6aa5
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 25 deletions.
6 changes: 5 additions & 1 deletion cdc/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/security"
"github.com/pingcap/ticdc/pkg/util"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
"go.etcd.io/etcd/mvcc"
Expand All @@ -49,6 +50,7 @@ type processorOpts struct {
// Capture represents a Capture server, it monitors the changefeed information in etcd and schedules Task on it.
type Capture struct {
etcdClient kv.CDCEtcdClient
pdCli pd.Client
credential *security.Credential

processors map[string]*processor
Expand All @@ -67,6 +69,7 @@ type Capture struct {
func NewCapture(
ctx context.Context,
pdEndpoints []string,
pdCli pd.Client,
credential *security.Credential,
advertiseAddr string,
opts *processorOpts,
Expand Down Expand Up @@ -126,6 +129,7 @@ func NewCapture(
election: elec,
info: info,
opts: opts,
pdCli: pdCli,
}

return
Expand Down Expand Up @@ -252,7 +256,7 @@ func (c *Capture) assignTask(ctx context.Context, task *Task) (*processor, error
zap.String("changefeed", task.ChangeFeedID))

p, err := runProcessor(
ctx, c.credential, c.session, *cf, task.ChangeFeedID, *c.info, task.CheckpointTS, c.opts.flushCheckpointInterval)
ctx, c.pdCli, c.credential, c.session, *cf, task.ChangeFeedID, *c.info, task.CheckpointTS, c.opts.flushCheckpointInterval)
if err != nil {
log.Error("run processor failed",
zap.String("changefeed", task.ChangeFeedID),
Expand Down
6 changes: 3 additions & 3 deletions cdc/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ func (s *ownerSuite) TestHandleAdmin(c *check.C) {
defer sink.Close() //nolint:errcheck
sampleCF.sink = sink

capture, err := NewCapture(ctx, []string{s.clientURL.String()},
capture, err := NewCapture(ctx, []string{s.clientURL.String()}, nil,
&security.Credential{}, "127.0.0.1:12034", &processorOpts{flushCheckpointInterval: time.Millisecond * 200})
c.Assert(err, check.IsNil)
err = capture.Campaign(ctx)
Expand Down Expand Up @@ -806,7 +806,7 @@ func (s *ownerSuite) TestWatchCampaignKey(c *check.C) {
defer s.TearDownTest(c)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
capture, err := NewCapture(ctx, []string{s.clientURL.String()},
capture, err := NewCapture(ctx, []string{s.clientURL.String()}, nil,
&security.Credential{}, "127.0.0.1:12034", &processorOpts{})
c.Assert(err, check.IsNil)
err = capture.Campaign(ctx)
Expand Down Expand Up @@ -864,7 +864,7 @@ func (s *ownerSuite) TestCleanUpStaleTasks(c *check.C) {
defer cancel()
addr := "127.0.0.1:12034"
ctx = util.PutCaptureAddrInCtx(ctx, addr)
capture, err := NewCapture(ctx, []string{s.clientURL.String()},
capture, err := NewCapture(ctx, []string{s.clientURL.String()}, nil,
&security.Credential{}, addr, &processorOpts{})
c.Assert(err, check.IsNil)
err = s.client.PutCaptureInfo(ctx, capture.info, capture.session.Lease())
Expand Down
12 changes: 3 additions & 9 deletions cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ const (
schemaStorageGCLag = time.Minute * 20
)

var fNewPDCli = pd.NewClientWithContext

type processor struct {
id string
captureInfo model.CaptureInfo
Expand Down Expand Up @@ -156,6 +154,7 @@ func (t *tableInfo) safeStop() (stopped bool, checkpointTs model.Ts) {
// newProcessor creates and returns a processor for the specified change feed
func newProcessor(
ctx context.Context,
pdCli pd.Client,
credential *security.Credential,
session *concurrency.Session,
changefeed model.ChangeFeedInfo,
Expand All @@ -167,12 +166,6 @@ func newProcessor(
flushCheckpointInterval time.Duration,
) (*processor, error) {
etcdCli := session.Client()
endpoints := session.Client().Endpoints()
pdCli, err := fNewPDCli(ctx, endpoints, credential.PDSecurityOption())
if err != nil {
return nil, errors.Annotatef(
cerror.WrapError(cerror.ErrNewProcessorFailed, err), "create pd client failed, addr: %v", endpoints)
}
cdcEtcdCli := kv.NewCDCEtcdClient(ctx, etcdCli)
limitter := puller.NewBlurResourceLimmter(defaultMemBufferCapacity)

Expand Down Expand Up @@ -1242,6 +1235,7 @@ func (p *processor) isStopped() bool {
// runProcessor creates a new processor then starts it.
func runProcessor(
ctx context.Context,
pdCli pd.Client,
credential *security.Credential,
session *concurrency.Session,
info model.ChangeFeedInfo,
Expand All @@ -1268,7 +1262,7 @@ func runProcessor(
cancel()
return nil, errors.Trace(err)
}
processor, err := newProcessor(ctx, credential, session, info, sink,
processor, err := newProcessor(ctx, pdCli, credential, session, info, sink,
changefeedID, captureInfo, checkpointTs, errCh, flushCheckpointInterval)
if err != nil {
cancel()
Expand Down
25 changes: 13 additions & 12 deletions cdc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,18 @@ func (s *Server) Run(ctx context.Context) error {
if err != nil {
return err
}

kvStore, err := kv.CreateTiStore(strings.Join(s.pdEndpoints, ","), s.opts.credential)
if err != nil {
return errors.Trace(err)
}
defer func() {
err := kvStore.Close()
if err != nil {
log.Warn("kv store close failed", zap.Error(err))
}
}()
ctx = util.PutKVStorageInCtx(ctx, kvStore)
// When a capture suicided, restart it
for {
if err := s.run(ctx); cerror.ErrCaptureSuicide.NotEqual(err) {
Expand Down Expand Up @@ -306,20 +318,9 @@ func (s *Server) campaignOwnerLoop(ctx context.Context) error {
func (s *Server) run(ctx context.Context) (err error) {
ctx = util.PutCaptureAddrInCtx(ctx, s.opts.advertiseAddr)
ctx = util.PutTimezoneInCtx(ctx, s.opts.timezone)
kvStore, err := kv.CreateTiStore(strings.Join(s.pdEndpoints, ","), s.opts.credential)
if err != nil {
return errors.Trace(err)
}
defer func() {
err := kvStore.Close()
if err != nil {
log.Warn("kv store close failed", zap.Error(err))
}
}()
ctx = util.PutKVStorageInCtx(ctx, kvStore)

procOpts := &processorOpts{flushCheckpointInterval: s.opts.processorFlushInterval}
capture, err := NewCapture(ctx, s.pdEndpoints, s.opts.credential, s.opts.advertiseAddr, procOpts)
capture, err := NewCapture(ctx, s.pdEndpoints, s.pdClient, s.opts.credential, s.opts.advertiseAddr, procOpts)
if err != nil {
return err
}
Expand Down

0 comments on commit bbb6aa5

Please sign in to comment.