Skip to content

Commit

Permalink
This is an automated cherry-pick of #9661
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
asddongmen authored and ti-chi-bot committed Sep 8, 2023
1 parent 2ab025e commit d824aa0
Show file tree
Hide file tree
Showing 11 changed files with 489 additions and 78 deletions.
14 changes: 13 additions & 1 deletion cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/version"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/client/v3/concurrency"
"go.etcd.io/etcd/server/v3/mvcc"
"go.uber.org/zap"
Expand Down Expand Up @@ -81,6 +82,7 @@ type captureImpl struct {
liveness model.Liveness
config *config.ServerConfig

pdClient pd.Client
pdEndpoints []string
ownerMu sync.Mutex
owner owner.Owner
Expand Down Expand Up @@ -132,7 +134,11 @@ func NewCapture(pdEndpoints []string,
grpcService *p2p.ServerWrapper,
tableActorSystem *system.System,
sortEngineMangerFactory *factory.SortEngineFactory,
<<<<<<< HEAD

Check failure on line 137 in cdc/capture/capture.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: unexpected <<, expecting )
sorterSystem *ssystem.System,
=======

Check failure on line 139 in cdc/capture/capture.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: unexpected ==, expecting )
pdClient pd.Client,
>>>>>>> 445268837e (etcd, pd (ticdc): refine pdClient and etcdClient initialization (#9661))

Check failure on line 141 in cdc/capture/capture.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: unexpected >>, expecting )

Check failure on line 141 in cdc/capture/capture.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

exponent has no digits

Check failure on line 141 in cdc/capture/capture.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

invalid character U+0023 '#'
) Capture {
conf := config.GetGlobalServerConfig()
return &captureImpl{
Expand All @@ -146,12 +152,18 @@ func NewCapture(pdEndpoints []string,
newProcessorManager: processor.NewManager,
newOwner: owner.NewOwner,
info: &model.CaptureInfo{},
<<<<<<< HEAD

useSortEngine: sortEngineMangerFactory != nil,
sortEngineFactory: sortEngineMangerFactory,
sorterSystem: sorterSystem,

migrator: migrate.NewMigrator(etcdClient, pdEndpoints, conf),
=======
sortEngineFactory: sortEngineMangerFactory,
migrator: migrate.NewMigrator(etcdClient, pdEndpoints, conf),
pdClient: pdClient,
>>>>>>> 445268837e (etcd, pd (ticdc): refine pdClient and etcdClient initialization (#9661))

Check failure on line 166 in cdc/capture/capture.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

exponent has no digits

Check failure on line 166 in cdc/capture/capture.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

invalid character U+0023 '#'
}
}

Expand Down Expand Up @@ -217,7 +229,7 @@ func (c *captureImpl) reset(ctx context.Context) error {
c.upstreamManager.Close()
}
c.upstreamManager = upstream.NewManager(ctx, c.EtcdClient.GetGCServiceID())
_, err = c.upstreamManager.AddDefaultUpstream(c.pdEndpoints, c.config.Security)
_, err = c.upstreamManager.AddDefaultUpstream(c.pdEndpoints, c.config.Security, c.pdClient)
if err != nil {
return errors.Trace(err)
}
Expand Down
110 changes: 63 additions & 47 deletions cdc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,11 @@ import (
"github.com/pingcap/tiflow/pkg/util"
p2pProto "github.com/pingcap/tiflow/proto/p2p"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/client/pkg/v3/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/net/netutil"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/keepalive"
)

const (
Expand Down Expand Up @@ -87,6 +83,7 @@ type server struct {
grpcService *p2p.ServerWrapper
statusServer *http.Server
etcdClient etcd.CDCEtcdClient
<<<<<<< HEAD
pdEndpoints []string

tableActorSystem *system.System
Expand All @@ -95,6 +92,14 @@ type server struct {
useEventSortEngine bool
sortEngineFactory *factory.SortEngineFactory
sorterSystem *ssystem.System
=======
// pdClient is the default upstream PD client.
// The PD acts as a metadata management service for TiCDC.
pdClient pd.Client
pdAPIClient pdutil.PDAPIClient
pdEndpoints []string
sortEngineFactory *factory.SortEngineFactory
>>>>>>> 445268837e (etcd, pd (ticdc): refine pdClient and etcdClient initialization (#9661))
}

// New creates a server instance.
Expand Down Expand Up @@ -140,35 +145,21 @@ func New(pdEndpoints []string) (*server, error) {
func (s *server) prepare(ctx context.Context) error {
conf := config.GetGlobalServerConfig()

grpcTLSOption, err := conf.Security.ToGRPCDialOption()
tlsConfig, err := conf.Security.ToTLSConfig()
if err != nil {
return errors.Trace(err)
}

tlsConfig, err := conf.Security.ToTLSConfig()
grpcTLSOption, err := conf.Security.ToGRPCDialOption()
if err != nil {
return errors.Trace(err)
}

logConfig := logutil.DefaultZapLoggerConfig
logConfig.Level = zap.NewAtomicLevelAt(zapcore.ErrorLevel)

log.Info("create etcdCli", zap.Strings("endpoints", s.pdEndpoints))
// we do not pass a `context` to the etcd client,
// to prevent it's cancelled when the server is closing.
// For example, when the non-owner node goes offline,
// it would resign the campaign key which was put by call `campaign`,
// if this is not done due to the passed context cancelled,
// the key will be kept for the lease TTL, which is 10 seconds,
// then cause the new owner cannot be elected immediately after the old owner offline.
// see https://github.com/etcd-io/etcd/blob/525d53bd41/client/v3/concurrency/election.go#L98
etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: s.pdEndpoints,
TLS: tlsConfig,
LogConfig: &logConfig,
DialTimeout: 5 * time.Second,
AutoSyncInterval: 30 * time.Second,
DialOptions: []grpc.DialOption{
log.Info("create pd client", zap.Strings("endpoints", s.pdEndpoints))
s.pdClient, err = pd.NewClientWithContext(
ctx, s.pdEndpoints, conf.Security.PDSecurityOption(),
// the default `timeout` is 3s, maybe too small if the pd is busy,
// set to 10s to avoid frequent timeout.
pd.WithCustomTimeoutOption(10*time.Second),
pd.WithGRPCDialOptions(
grpcTLSOption,
grpc.WithBlock(),
grpc.WithConnectParams(grpc.ConnectParams{
Expand All @@ -180,12 +171,24 @@ func (s *server) prepare(ctx context.Context) error {
},
MinConnectTimeout: 3 * time.Second,
}),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 20 * time.Second,
}),
},
})
))
if err != nil {
return errors.Trace(err)
}
s.pdAPIClient, err = pdutil.NewPDAPIClient(s.pdClient, conf.Security)
if err != nil {
return errors.Trace(err)
}
log.Info("create etcdCli", zap.Strings("endpoints", s.pdEndpoints))
// we do not pass a `context` to create a the etcd client,
// to prevent it's cancelled when the server is closing.
// For example, when the non-owner node goes offline,
// it would resign the campaign key which was put by call `campaign`,
// if this is not done due to the passed context cancelled,
// the key will be kept for the lease TTL, which is 10 seconds,
// then cause the new owner cannot be elected immediately after the old owner offline.
// see https://github.com/etcd-io/etcd/blob/525d53bd41/client/v3/concurrency/election.go#L98
etcdCli, err := etcd.CreateRawEtcdClient(tlsConfig, grpcTLSOption, s.pdEndpoints...)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -196,6 +199,15 @@ func (s *server) prepare(ctx context.Context) error {
}
s.etcdClient = cdcEtcdClient

// Collect all endpoints from pd here to make the server more robust.
// Because in some scenarios, the deployer may only provide one pd endpoint,
// this will cause the TiCDC server to fail to restart when some pd node is down.
allPDEndpoints, err := s.pdAPIClient.CollectMemberEndpoints(ctx)
if err != nil {
return errors.Trace(err)
}
s.pdEndpoints = append(s.pdEndpoints, allPDEndpoints...)

err = s.initDir(ctx)
if err != nil {
return errors.Trace(err)
Expand All @@ -209,10 +221,15 @@ func (s *server) prepare(ctx context.Context) error {
return errors.Trace(err)
}

<<<<<<< HEAD
s.capture = capture.NewCapture(
s.pdEndpoints, cdcEtcdClient, s.grpcService,
s.tableActorSystem, s.sortEngineFactory, s.sorterSystem)

=======
s.capture = capture.NewCapture(s.pdEndpoints, cdcEtcdClient,
s.grpcService, s.sortEngineFactory, s.pdClient)
>>>>>>> 445268837e (etcd, pd (ticdc): refine pdClient and etcdClient initialization (#9661))
return nil
}

Expand Down Expand Up @@ -340,18 +357,7 @@ func (s *server) startStatusHTTP(serverCtx context.Context, lis net.Listener) er
return nil
}

func (s *server) etcdHealthChecker(ctx context.Context) error {
conf := config.GetGlobalServerConfig()
grpcClient, err := pd.NewClientWithContext(ctx, s.pdEndpoints, conf.Security.PDSecurityOption())
if err != nil {
return errors.Trace(err)
}
pc, err := pdutil.NewPDAPIClient(grpcClient, conf.Security)
if err != nil {
return errors.Trace(err)
}
defer pc.Close()

func (s *server) upstreamPDHealthChecker(ctx context.Context) error {
ticker := time.NewTicker(time.Second * 3)
defer ticker.Stop()

Expand All @@ -360,15 +366,15 @@ func (s *server) etcdHealthChecker(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
endpoints, err := pc.CollectMemberEndpoints(ctx)
endpoints, err := s.pdAPIClient.CollectMemberEndpoints(ctx)
if err != nil {
log.Warn("etcd health check: cannot collect all members", zap.Error(err))
continue
}
for _, endpoint := range endpoints {
start := time.Now()
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
if err := pc.Healthy(ctx, endpoint); err != nil {
if err := s.pdAPIClient.Healthy(ctx, endpoint); err != nil {
log.Warn("etcd health check error",
zap.String("endpoint", endpoint), zap.Error(err))
}
Expand All @@ -389,15 +395,21 @@ func (s *server) etcdHealthChecker(ctx context.Context) error {
func (s *server) run(ctx context.Context) (err error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer s.pdAPIClient.Close()

wg, cctx := errgroup.WithContext(ctx)

wg.Go(func() error {
return s.capture.Run(cctx)
})

<<<<<<< HEAD
wg.Go(func() error {
return s.etcdHealthChecker(cctx)
=======
eg.Go(func() error {
return s.upstreamPDHealthChecker(egCtx)
>>>>>>> 445268837e (etcd, pd (ticdc): refine pdClient and etcdClient initialization (#9661))
})

wg.Go(func() error {
Expand Down Expand Up @@ -460,6 +472,10 @@ func (s *server) Close() {
}
s.tcpServer = nil
}

if s.pdClient != nil {
s.pdClient.Close()
}
}

func (s *server) stopActorSystems() {
Expand Down
Loading

0 comments on commit d824aa0

Please sign in to comment.