Skip to content

Commit

Permalink
Merge branch 'release-4.0' into cherry-pick-1969-to-release-4.0
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei authored Jul 6, 2021
2 parents f379a3a + f6c3a6a commit 300af32
Show file tree
Hide file tree
Showing 91 changed files with 7,777 additions and 2,337 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ check: check-copyright fmt lint check-static tidy errdoc check-leaktest-added

coverage:
GO111MODULE=off go get github.com/wadey/gocovmerge
gocovmerge "$(TEST_DIR)"/cov.* | grep -vE ".*.pb.go|$(CDC_PKG)/testing_utils/.*|$(CDC_PKG)/cdc/kv/testing.go|$(CDC_PKG)/cdc/sink/simple_mysql_tester.go|.*.__failpoint_binding__.go" > "$(TEST_DIR)/all_cov.out"
gocovmerge "$(TEST_DIR)"/cov.* | grep -vE ".*.pb.go|$(CDC_PKG)/testing_utils/.*|$(CDC_PKG)/cdc/kv/testing.go|$(CDC_PKG)/cdc/entry/schema_test_helper.go|$(CDC_PKG)/cdc/sink/simple_mysql_tester.go|.*.__failpoint_binding__.go" > "$(TEST_DIR)/all_cov.out"
grep -vE ".*.pb.go|$(CDC_PKG)/testing_utils/.*|$(CDC_PKG)/cdc/kv/testing.go|$(CDC_PKG)/cdc/sink/simple_mysql_tester.go|.*.__failpoint_binding__.go" "$(TEST_DIR)/cov.unit.out" > "$(TEST_DIR)/unit_cov.out"
ifeq ("$(JenkinsCI)", "1")
GO111MODULE=off go get github.com/mattn/goveralls
Expand Down
47 changes: 24 additions & 23 deletions cdc/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import (
"sync"
"time"

"github.com/pingcap/ticdc/pkg/version"

"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand All @@ -28,10 +26,13 @@ import (
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/cdc/processor"
"github.com/pingcap/ticdc/pkg/config"
cdcContext "github.com/pingcap/ticdc/pkg/context"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/orchestrator"
"github.com/pingcap/ticdc/pkg/security"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/ticdc/pkg/version"
tidbkv "github.com/pingcap/tidb/kv"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
Expand All @@ -43,16 +44,11 @@ import (
"google.golang.org/grpc/backoff"
)

// captureOpts records options for capture
type captureOpts struct {
flushCheckpointInterval time.Duration
captureSessionTTL int
}

// Capture represents a Capture server, it monitors the changefeed information in etcd and schedules Task on it.
type Capture struct {
etcdClient kv.CDCEtcdClient
pdCli pd.Client
kvStorage tidbkv.Storage
credential *security.Credential

processorManager *processor.Manager
Expand All @@ -66,19 +62,18 @@ type Capture struct {
session *concurrency.Session
election *concurrency.Election

opts *captureOpts
closed chan struct{}
}

// NewCapture returns a new Capture instance
func NewCapture(
ctx context.Context,
stdCtx context.Context,
pdEndpoints []string,
pdCli pd.Client,
credential *security.Credential,
advertiseAddr string,
opts *captureOpts,
kvStorage tidbkv.Storage,
) (c *Capture, err error) {
conf := config.GetGlobalServerConfig()
credential := conf.Security
tlsConfig, err := credential.ToTLSConfig()
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -92,7 +87,7 @@ func NewCapture(
etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: pdEndpoints,
TLS: tlsConfig,
Context: ctx,
Context: stdCtx,
LogConfig: &logConfig,
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{
Expand All @@ -113,20 +108,20 @@ func NewCapture(
return nil, errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "new etcd client")
}
sess, err := concurrency.NewSession(etcdCli,
concurrency.WithTTL(opts.captureSessionTTL))
concurrency.WithTTL(conf.CaptureSessionTTL))
if err != nil {
return nil, errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "create capture session")
}
elec := concurrency.NewElection(sess, kv.CaptureOwnerKey)
cli := kv.NewCDCEtcdClient(ctx, etcdCli)
cli := kv.NewCDCEtcdClient(stdCtx, etcdCli)
id := uuid.New().String()
info := &model.CaptureInfo{
ID: id,
AdvertiseAddr: advertiseAddr,
AdvertiseAddr: conf.AdvertiseAddr,
Version: version.ReleaseVersion,
}
processorManager := processor.NewManager(pdCli, credential, info)
log.Info("creating capture", zap.String("capture-id", id), util.ZapFieldCapture(ctx))
processorManager := processor.NewManager()
log.Info("creating capture", zap.String("capture-id", id), util.ZapFieldCapture(stdCtx))

c = &Capture{
processors: make(map[string]*oldProcessor),
Expand All @@ -135,8 +130,8 @@ func NewCapture(
session: sess,
election: elec,
info: info,
opts: opts,
pdCli: pdCli,
kvStorage: kvStorage,
processorManager: processorManager,
closed: make(chan struct{}),
}
Expand All @@ -150,13 +145,19 @@ func (c *Capture) Run(ctx context.Context) (err error) {
// TODO: we'd better to add some wait mechanism to ensure no routine is blocked
defer cancel()
defer close(c.closed)

ctx = cdcContext.NewContext(ctx, &cdcContext.GlobalVars{
PDClient: c.pdCli,
KVStorage: c.kvStorage,
CaptureInfo: c.info,
})
err = c.register(ctx)
if err != nil {
return errors.Trace(err)
}
if config.NewReplicaImpl {
sessionCli := c.session.Client()
etcdWorker, err := orchestrator.NewEtcdWorker(kv.NewCDCEtcdClient(ctx, sessionCli).Client, kv.EtcdKeyBase, c.processorManager, processor.NewGlobalState(c.info.ID))
etcdWorker, err := orchestrator.NewEtcdWorker(kv.NewCDCEtcdClient(ctx, sessionCli).Client, kv.EtcdKeyBase, c.processorManager, model.NewGlobalState())
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -306,9 +307,9 @@ func (c *Capture) assignTask(ctx context.Context, task *Task) (*oldProcessor, er
log.Info("run processor",
zap.String("capture-id", c.info.ID), util.ZapFieldCapture(ctx),
zap.String("changefeed", task.ChangeFeedID))

conf := config.GetGlobalServerConfig()
p, err := runProcessorImpl(
ctx, c.pdCli, c.credential, c.session, *cf, task.ChangeFeedID, *c.info, task.CheckpointTS, c.opts.flushCheckpointInterval)
ctx, c.pdCli, c.credential, c.session, *cf, task.ChangeFeedID, *c.info, task.CheckpointTS, time.Duration(conf.ProcessorFlushInterval))
if err != nil {
log.Error("run processor failed",
zap.String("changefeed", task.ChangeFeedID),
Expand Down
Loading

0 comments on commit 300af32

Please sign in to comment.