
*: Using the global singleton for pd client and tikv client, and fix pd client freeze #1217

Merged (2 commits) on Dec 16, 2020
6 changes: 5 additions & 1 deletion cdc/capture.go
@@ -27,6 +27,7 @@ import (
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/security"
"github.com/pingcap/ticdc/pkg/util"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
"go.etcd.io/etcd/mvcc"
@@ -49,6 +50,7 @@ type processorOpts struct {
// Capture represents a Capture server, it monitors the changefeed information in etcd and schedules Task on it.
type Capture struct {
etcdClient kv.CDCEtcdClient
pdCli pd.Client
credential *security.Credential

processors map[string]*processor
@@ -67,6 +69,7 @@ type Capture struct {
func NewCapture(
ctx context.Context,
pdEndpoints []string,
pdCli pd.Client,
credential *security.Credential,
advertiseAddr string,
opts *processorOpts,
@@ -126,6 +129,7 @@ func NewCapture(
election: elec,
info: info,
opts: opts,
pdCli: pdCli,
}

return
@@ -245,7 +249,7 @@ func (c *Capture) assignTask(ctx context.Context, task *Task) (*processor, error
zap.String("changefeed", task.ChangeFeedID))

p, err := runProcessor(
ctx, c.credential, c.session, *cf, task.ChangeFeedID, *c.info, task.CheckpointTS, c.opts.flushCheckpointInterval)
ctx, c.pdCli, c.credential, c.session, *cf, task.ChangeFeedID, *c.info, task.CheckpointTS, c.opts.flushCheckpointInterval)
if err != nil {
log.Error("run processor failed",
zap.String("changefeed", task.ChangeFeedID),
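Taken together, the cdc/capture.go changes replace per-processor PD client creation with injection: NewCapture now accepts a pd.Client, Capture keeps it in the new pdCli field, and assignTask forwards it to runProcessor. Below is a minimal, self-contained sketch of that wiring, not the ticdc source; PDClient, fakePD, and the simplified signatures are stand-in types for pd.Client and the real capture/processor types.

```go
// Sketch of the injection pattern; all types here are simplified stand-ins.
package main

import (
	"context"
	"fmt"
)

// PDClient stands in for pd.Client from github.com/tikv/pd/client.
type PDClient interface {
	GetClusterID(ctx context.Context) uint64
}

type fakePD struct{}

func (fakePD) GetClusterID(ctx context.Context) uint64 { return 42 }

// Capture keeps the injected client in a field, mirroring the new pdCli field.
type Capture struct {
	pdCli PDClient
}

// NewCapture receives the process-wide client instead of building its own.
func NewCapture(pdCli PDClient) *Capture {
	return &Capture{pdCli: pdCli}
}

// assignTask hands the same client down to the processor it starts.
func (c *Capture) assignTask(ctx context.Context, changefeedID string) {
	runProcessor(ctx, c.pdCli, changefeedID)
}

func runProcessor(ctx context.Context, pdCli PDClient, changefeedID string) {
	fmt.Printf("processor %s sees cluster %d\n", changefeedID, pdCli.GetClusterID(ctx))
}

func main() {
	pdCli := fakePD{}      // created once by whoever owns the process (the server in this PR)
	c := NewCapture(pdCli) // injected, never dialed inside the capture
	c.assignTask(context.Background(), "changefeed-1")
}
```

The point of the pattern is that the capture never decides how the PD client is built; whoever owns the process-wide singleton constructs it once and passes it down.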
6 changes: 3 additions & 3 deletions cdc/owner_test.go
@@ -504,7 +504,7 @@ func (s *ownerSuite) TestHandleAdmin(c *check.C) {
defer sink.Close() //nolint:errcheck
sampleCF.sink = sink

capture, err := NewCapture(ctx, []string{s.clientURL.String()},
capture, err := NewCapture(ctx, []string{s.clientURL.String()}, nil,
&security.Credential{}, "127.0.0.1:12034", &processorOpts{flushCheckpointInterval: time.Millisecond * 200})
c.Assert(err, check.IsNil)
err = capture.Campaign(ctx)
@@ -806,7 +806,7 @@ func (s *ownerSuite) TestWatchCampaignKey(c *check.C) {
defer s.TearDownTest(c)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
capture, err := NewCapture(ctx, []string{s.clientURL.String()},
capture, err := NewCapture(ctx, []string{s.clientURL.String()}, nil,
&security.Credential{}, "127.0.0.1:12034", &processorOpts{})
c.Assert(err, check.IsNil)
err = capture.Campaign(ctx)
@@ -864,7 +864,7 @@ func (s *ownerSuite) TestCleanUpStaleTasks(c *check.C) {
defer cancel()
addr := "127.0.0.1:12034"
ctx = util.PutCaptureAddrInCtx(ctx, addr)
capture, err := NewCapture(ctx, []string{s.clientURL.String()},
capture, err := NewCapture(ctx, []string{s.clientURL.String()}, nil,
&security.Credential{}, addr, &processorOpts{})
c.Assert(err, check.IsNil)
err = s.client.PutCaptureInfo(ctx, capture.info, capture.session.Lease())
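The cdc/owner_test.go changes are mechanical: each NewCapture call gains a nil argument for the new pd.Client parameter, which appears safe because these tests exercise campaign and stale-task cleanup paths rather than anything that talks to PD.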
12 changes: 3 additions & 9 deletions cdc/processor.go
@@ -66,8 +66,6 @@ const (
schemaStorageGCLag = time.Minute * 20
)

var fNewPDCli = pd.NewClientWithContext

type processor struct {
id string
captureInfo model.CaptureInfo
@@ -156,6 +154,7 @@ func (t *tableInfo) safeStop() (stopped bool, checkpointTs model.Ts) {
// newProcessor creates and returns a processor for the specified change feed
func newProcessor(
ctx context.Context,
pdCli pd.Client,
credential *security.Credential,
session *concurrency.Session,
changefeed model.ChangeFeedInfo,
@@ -167,12 +166,6 @@
flushCheckpointInterval time.Duration,
) (*processor, error) {
etcdCli := session.Client()
endpoints := session.Client().Endpoints()
pdCli, err := fNewPDCli(ctx, endpoints, credential.PDSecurityOption())
if err != nil {
return nil, errors.Annotatef(
cerror.WrapError(cerror.ErrNewProcessorFailed, err), "create pd client failed, addr: %v", endpoints)
}
cdcEtcdCli := kv.NewCDCEtcdClient(ctx, etcdCli)
limitter := puller.NewBlurResourceLimmter(defaultMemBufferCapacity)

@@ -1258,6 +1251,7 @@ func (p *processor) isStopped() bool {
// runProcessor creates a new processor then starts it.
func runProcessor(
ctx context.Context,
pdCli pd.Client,
credential *security.Credential,
session *concurrency.Session,
info model.ChangeFeedInfo,
@@ -1284,7 +1278,7 @@
cancel()
return nil, errors.Trace(err)
}
processor, err := newProcessor(ctx, credential, session, info, sink,
processor, err := newProcessor(ctx, pdCli, credential, session, info, sink,
changefeedID, captureInfo, checkpointTs, errCh, flushCheckpointInterval)
if err != nil {
cancel()
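In cdc/processor.go the PR removes the fNewPDCli indirection (pd.NewClientWithContext) and the block in newProcessor that dialed PD using the etcd session's endpoints; newProcessor and runProcessor now take the shared pd.Client from the capture instead. Dialing PD inside newProcessor meant a fresh client, and fresh connections, for every changefeed task; reusing one injected client removes that churn, which is presumably what the "pd client freeze" fix in the title refers to. A before/after sketch with stand-in types, where dialPD plays the role of pd.NewClientWithContext:

```go
// Sketch only: simplified stand-ins, not the ticdc source.
package main

import (
	"context"
	"errors"
	"fmt"
)

type PDClient interface{ Close() }

type stubPD struct{}

func (stubPD) Close() {}

// dialPD stands in for pd.NewClientWithContext; imagine it opens real gRPC
// connections, which is what made per-processor creation expensive.
func dialPD(ctx context.Context, endpoints []string) (PDClient, error) {
	if len(endpoints) == 0 {
		return nil, errors.New("no PD endpoints")
	}
	return stubPD{}, nil
}

type processor struct{ pdCli PDClient }

// Before: every processor dialed PD itself.
func newProcessorOld(ctx context.Context, endpoints []string) (*processor, error) {
	pdCli, err := dialPD(ctx, endpoints)
	if err != nil {
		return nil, fmt.Errorf("create pd client failed, addr: %v: %w", endpoints, err)
	}
	return &processor{pdCli: pdCli}, nil
}

// After: the caller injects the shared client; no extra connections per changefeed.
func newProcessorNew(ctx context.Context, pdCli PDClient) (*processor, error) {
	return &processor{pdCli: pdCli}, nil
}

func main() {
	ctx := context.Background()
	shared, _ := dialPD(ctx, []string{"127.0.0.1:2379"})
	p1, _ := newProcessorNew(ctx, shared)
	p2, _ := newProcessorNew(ctx, shared)
	fmt.Println(p1.pdCli == p2.pdCli) // true: both processors reuse one client
}
```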
25 changes: 13 additions & 12 deletions cdc/server.go
@@ -238,6 +238,18 @@ func (s *Server) Run(ctx context.Context) error {
if err != nil {
return err
}

kvStore, err := kv.CreateTiStore(strings.Join(s.pdEndpoints, ","), s.opts.credential)
if err != nil {
return errors.Trace(err)
}
defer func() {
err := kvStore.Close()
if err != nil {
log.Warn("kv store close failed", zap.Error(err))
}
}()
ctx = util.PutKVStorageInCtx(ctx, kvStore)
// When a capture suicided, restart it
for {
if err := s.run(ctx); cerror.ErrCaptureSuicide.NotEqual(err) {
@@ -306,20 +318,9 @@ func (s *Server) campaignOwnerLoop(ctx context.Context) error {
func (s *Server) run(ctx context.Context) (err error) {
ctx = util.PutCaptureAddrInCtx(ctx, s.opts.advertiseAddr)
ctx = util.PutTimezoneInCtx(ctx, s.opts.timezone)
kvStore, err := kv.CreateTiStore(strings.Join(s.pdEndpoints, ","), s.opts.credential)
if err != nil {
return errors.Trace(err)
}
defer func() {
err := kvStore.Close()
if err != nil {
log.Warn("kv store close failed", zap.Error(err))
}
}()
ctx = util.PutKVStorageInCtx(ctx, kvStore)

procOpts := &processorOpts{flushCheckpointInterval: s.opts.processorFlushInterval}
capture, err := NewCapture(ctx, s.pdEndpoints, s.opts.credential, s.opts.advertiseAddr, procOpts)
capture, err := NewCapture(ctx, s.pdEndpoints, s.pdClient, s.opts.credential, s.opts.advertiseAddr, procOpts)
if err != nil {
return err
}
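Finally, cdc/server.go hoists kv.CreateTiStore out of Server.run, which is re-entered every time a capture suicides, and into Server.Run, which executes once per process; the capture-restart loop then reuses the same TiKV store, and NewCapture is handed s.pdClient, the server-level PD client. A small self-contained sketch of the resulting shape, with string fields standing in for the real pd.Client and TiKV storage handles (not the ticdc source):

```go
// Sketch of the restart behaviour: expensive clients are created outside the loop.
package main

import (
	"context"
	"errors"
	"fmt"
)

var errCaptureSuicide = errors.New("capture suicide")

type Server struct {
	pdClient string // stand-in for the shared pd.Client singleton
	kvStore  string // stand-in for the shared TiKV storage
	restarts int
}

// Run sets up the shared resources once, then restarts captures as needed,
// mirroring how the PR moves kv.CreateTiStore out of run() and into Run().
func (s *Server) Run(ctx context.Context) error {
	s.kvStore = "tikv-store" // previously recreated inside run() on every restart
	for {
		if err := s.run(ctx); !errors.Is(err, errCaptureSuicide) {
			return err
		}
		fmt.Println("capture suicided, restarting with the same clients")
	}
}

func (s *Server) run(ctx context.Context) error {
	s.restarts++
	// NewCapture-equivalent: reuse the injected clients instead of dialing again.
	fmt.Printf("capture #%d reuses %s and %s\n", s.restarts, s.pdClient, s.kvStore)
	if s.restarts < 3 {
		return errCaptureSuicide
	}
	return nil
}

func main() {
	s := &Server{pdClient: "pd-client"}
	fmt.Println(s.Run(context.Background()))
}
```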