diff --git a/cdc/owner.go b/cdc/owner.go index 47c9da69121..300859d9a67 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -67,13 +67,9 @@ type Owner struct { const cdcServiceSafePointID = "ticdc" // NewOwner creates a new Owner instance -func NewOwner(sess *concurrency.Session, gcTTL int64) (*Owner, error) { +func NewOwner(pdClient pd.Client, sess *concurrency.Session, gcTTL int64) (*Owner, error) { cli := kv.NewCDCEtcdClient(sess.Client()) endpoints := sess.Client().Endpoints() - pdClient, err := pd.NewClient(endpoints, pd.SecurityOption{}) - if err != nil { - return nil, errors.Trace(err) - } owner := &Owner{ done: make(chan struct{}), diff --git a/cdc/processor.go b/cdc/processor.go index 21efc7a7bd9..59337dff7c9 100644 --- a/cdc/processor.go +++ b/cdc/processor.go @@ -60,7 +60,7 @@ const ( ) var ( - fNewPDCli = pd.NewClient + fNewPDCli = pd.NewClientWithContext fNewTsRWriter = createTsRWriter ) @@ -128,7 +128,7 @@ func newProcessor( checkpointTs uint64) (*processor, error) { etcdCli := session.Client() endpoints := session.Client().Endpoints() - pdCli, err := fNewPDCli(endpoints, pd.SecurityOption{}) + pdCli, err := fNewPDCli(ctx, endpoints, pd.SecurityOption{}) if err != nil { return nil, errors.Annotatef(err, "create pd client failed, addr: %v", endpoints) } diff --git a/cdc/server.go b/cdc/server.go index 93e1b145085..d456b42f543 100644 --- a/cdc/server.go +++ b/cdc/server.go @@ -21,10 +21,13 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" + pd "github.com/pingcap/pd/v4/client" "github.com/pingcap/ticdc/pkg/util" "go.etcd.io/etcd/mvcc" "go.uber.org/zap" "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + "google.golang.org/grpc/backoff" ) const ( @@ -102,6 +105,8 @@ type Server struct { capture *Capture owner *Owner statusServer *http.Server + pdClient pd.Client + pdEndpoints []string } // NewServer creates a Server instance. @@ -128,7 +133,31 @@ func NewServer(opt ...ServerOption) (*Server, error) { // Run runs the server. func (s *Server) Run(ctx context.Context) error { - err := s.startStatusHTTP() + s.pdEndpoints = strings.Split(s.opts.pdEndpoints, ",") + pdClient, err := pd.NewClientWithContext( + ctx, s.pdEndpoints, pd.SecurityOption{}, + pd.WithGRPCDialOptions( + grpc.WithBlock(), + grpc.WithConnectParams(grpc.ConnectParams{ + Backoff: backoff.Config{ + BaseDelay: time.Second, + Multiplier: 1.1, + Jitter: 0.1, + MaxDelay: 3 * time.Second, + }, + MinConnectTimeout: 3 * time.Second, + }), + )) + if err != nil { + return errors.Trace(err) + } + s.pdClient = pdClient + + err = util.CheckClusterVersion(ctx, s.pdClient, s.pdEndpoints[0]) + if err != nil { + return err + } + err = s.startStatusHTTP() if err != nil { return err } @@ -157,7 +186,7 @@ func (s *Server) campaignOwnerLoop(ctx context.Context) error { continue } log.Info("campaign owner successfully", zap.String("capture", s.capture.info.ID)) - owner, err := NewOwner(s.capture.session, s.opts.gcTTL) + owner, err := NewOwner(s.pdClient, s.capture.session, s.opts.gcTTL) if err != nil { log.Warn("create new owner failed", zap.Error(err)) continue @@ -182,8 +211,7 @@ func (s *Server) campaignOwnerLoop(ctx context.Context) error { } func (s *Server) run(ctx context.Context) (err error) { - capture, err := NewCapture( - ctx, strings.Split(s.opts.pdEndpoints, ","), s.opts.advertiseAddr) + capture, err := NewCapture(ctx, s.pdEndpoints, s.opts.advertiseAddr) if err != nil { return err } diff --git a/cmd/client.go b/cmd/client.go index cdd889918a4..6bb5a9c62ac 100644 --- a/cmd/client.go +++ b/cmd/client.go @@ -13,6 +13,7 @@ import ( pd "github.com/pingcap/pd/v4/client" "github.com/pingcap/ticdc/cdc/kv" "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/pkg/util" "github.com/spf13/cobra" "go.etcd.io/etcd/clientv3" "google.golang.org/grpc" @@ -132,6 +133,12 @@ func newCliCommand() *cobra.Command { if err != nil { return errors.Annotate(err, "fail to open PD client") } + ctx, cancel := contextTimeout() + defer cancel() + err = util.CheckClusterVersion(ctx, pdCli, cliPdAddr) + if err != nil { + return err + } return nil }, diff --git a/cmd/server.go b/cmd/server.go index 8d0935d2a91..7f14253365e 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -33,7 +33,7 @@ func init() { rootCmd.AddCommand(serverCmd) serverCmd.Flags().StringVar(&serverPdAddr, "pd", "http://127.0.0.1:2379", "Set the PD endpoints to use. Use `,` to separate multiple PDs") - serverCmd.Flags().StringVar(&address, "addr", "0.0.0.0:8300", "Set the listening address") + serverCmd.Flags().StringVar(&address, "addr", "127.0.0.1:8300", "Set the listening address") serverCmd.Flags().StringVar(&advertiseAddr, "advertise-addr", "", "Set the advertise listening address for client communication") serverCmd.Flags().StringVar(&timezone, "tz", "System", "Specify time zone of TiCDC cluster") serverCmd.Flags().Int64Var(&gcTTL, "gc-ttl", cdc.DefaultCDCGCSafePointTTL, "CDC GC safepoint TTL duration, specified in seconds") diff --git a/go.mod b/go.mod index 3d5e415c2ed..62ed93e7164 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/biogo/store v0.0.0-20190426020002-884f370e325d github.com/cenkalti/backoff v2.2.1+incompatible github.com/chzyer/readline v0.0.0-20171208011716-f6d7a1f6fbf3 + github.com/coreos/go-semver v0.2.0 github.com/davecgh/go-spew v1.1.1 github.com/edwingeng/deque v0.0.0-20191220032131-8596380dee17 github.com/go-sql-driver/mysql v1.4.1 diff --git a/go.sum b/go.sum index 899cf6916d6..055e67b8601 100644 --- a/go.sum +++ b/go.sum @@ -399,6 +399,7 @@ github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237 h1:HQagqIiBm github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= +github.com/rogpeppe/go-internal v1.3.0 h1:RR9dF3JtopPvtkroDZuVD7qquD0bnHlKSqaQhgwt8yk= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= diff --git a/pkg/util/check.go b/pkg/util/check.go new file mode 100644 index 00000000000..0bfe66b1faf --- /dev/null +++ b/pkg/util/check.go @@ -0,0 +1,94 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "strings" + + "github.com/coreos/go-semver/semver" + "github.com/pingcap/errors" + pd "github.com/pingcap/pd/v4/client" +) + +var minPDVersion *semver.Version = semver.New("4.0.0-rc.1") +var minTiKVVersion *semver.Version = semver.New("4.0.0-rc.1") + +func removeV(v string) string { + if v == "" { + return v + } + return strings.TrimPrefix(v, "v") +} + +// CheckClusterVersion check TiKV and PD version. +func CheckClusterVersion(ctx context.Context, client pd.Client, pdHTTP string) error { + stores, err := client.GetAllStores(ctx, pd.WithExcludeTombstone()) + if err != nil { + return err + } + for _, s := range stores { + ver, err := semver.NewVersion(removeV(s.Version)) + if err != nil { + return err + } + ord := ver.Compare(*minTiKVVersion) + if ord < 0 { + return errors.NotSupportedf("TiKV %s is not supported, require minimal version %s", + removeV(s.Version), minTiKVVersion) + } + } + // See more: https://github.com/pingcap/pd/blob/v4.0.0-rc.1/server/api/version.go + pdVer := struct { + Version string `json:"version"` + }{} + req, err := http.NewRequestWithContext( + ctx, http.MethodGet, fmt.Sprintf("%s/pd/api/v1/version", pdHTTP), nil) + if err != nil { + return errors.Annotate(err, "fail to request PD") + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + return errors.Annotate(err, "fail to request PD") + } + if resp.StatusCode < 200 && resp.StatusCode >= 300 { + return errors.BadRequestf("fail to requet PD %s", resp.Status) + } + content, err := ioutil.ReadAll(resp.Body) + if err != nil { + return errors.Annotate(err, "fail to request PD") + } + err = json.Unmarshal(content, &pdVer) + if err != nil { + return errors.Annotate(err, "fail to request PD") + } + err = resp.Body.Close() + if err != nil { + return errors.Annotate(err, "fail to request PD") + } + ver, err := semver.NewVersion(removeV(pdVer.Version)) + if err != nil { + return err + } + ord := ver.Compare(*minPDVersion) + if ord < 0 { + return errors.NotSupportedf("PD %s is not supported, require minimal version %s", + removeV(pdVer.Version), minPDVersion) + } + return nil +} diff --git a/pkg/util/check_test.go b/pkg/util/check_test.go new file mode 100644 index 00000000000..0783383e716 --- /dev/null +++ b/pkg/util/check_test.go @@ -0,0 +1,103 @@ +package util + +import ( + "context" + "fmt" + "net/http" + "net/url" + "time" + + "github.com/coreos/go-semver/semver" + "github.com/pingcap/check" + "github.com/pingcap/kvproto/pkg/metapb" + pd "github.com/pingcap/pd/v4/client" + "github.com/pingcap/pd/v4/pkg/tempurl" +) + +type checkSuite struct{} + +var _ = check.Suite(&checkSuite{}) + +type mockPDClient struct { + pd.Client + getAllStores func() []*metapb.Store + getVersion func() string +} + +func (m *mockPDClient) GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error) { + if m.getAllStores != nil { + return m.getAllStores(), nil + } + return []*metapb.Store{}, nil +} + +func (m *mockPDClient) ServeHTTP(resp http.ResponseWriter, _ *http.Request) { + if m.getVersion != nil { + _, _ = resp.Write([]byte(fmt.Sprintf(`{"version":"%s"}`, m.getVersion()))) + } +} + +func (s *checkSuite) TestCheckClusterVersion(c *check.C) { + mock := mockPDClient{ + Client: nil, + } + pdURL, _ := url.Parse(tempurl.Alloc()) + pdHTTP := fmt.Sprintf("http://%s", pdURL.Host) + svr := http.Server{Addr: pdURL.Host, Handler: &mock} + go func() { + c.Assert(svr.ListenAndServe(), check.IsNil) + }() + defer svr.Close() + for i := 0; i < 20; i++ { + time.Sleep(100 * time.Millisecond) + _, err := http.Get(pdHTTP) + if err == nil { + break + } + c.Error(err) + if i == 199 { + c.Fatal("http server timeout", err) + } + } + + { + mock.getVersion = func() string { + return minPDVersion.String() + } + mock.getAllStores = func() []*metapb.Store { + return []*metapb.Store{{Version: minTiKVVersion.String()}} + } + err := CheckClusterVersion(context.Background(), &mock, pdHTTP) + c.Assert(err, check.IsNil) + } + + { + mock.getVersion = func() string { + return `v1.0.0-alpha-271-g824ae7fd` + } + mock.getAllStores = func() []*metapb.Store { + return []*metapb.Store{{Version: minTiKVVersion.String()}} + } + err := CheckClusterVersion(context.Background(), &mock, pdHTTP) + c.Assert(err, check.ErrorMatches, "PD .* is not supported.*") + } + + { + mock.getVersion = func() string { + return minPDVersion.String() + } + mock.getAllStores = func() []*metapb.Store { + // TiKV does not include 'v'. + return []*metapb.Store{{Version: `1.0.0-alpha-271-g824ae7fd`}} + } + err := CheckClusterVersion(context.Background(), &mock, pdHTTP) + c.Assert(err, check.ErrorMatches, "TiKV .* is not supported.*") + } +} + +func (s *checkSuite) TestCompareVersion(c *check.C) { + c.Assert(semver.New("4.0.0-rc").Compare(*semver.New("4.0.0-rc.2")), check.Equals, -1) + c.Assert(semver.New("4.0.0-rc.1").Compare(*semver.New("4.0.0-rc.2")), check.Equals, -1) + // BUG it should be "<" instead of ">". + // c.Assert(semver.New("4.0.0-rc-35-g31dae220").Compare(*semver.New("4.0.0-rc.2")), check.Equals, -1) +} diff --git a/tests/resolve_lock/main.go b/tests/resolve_lock/main.go index 661a2396606..cdb4be15acb 100644 --- a/tests/resolve_lock/main.go +++ b/tests/resolve_lock/main.go @@ -98,7 +98,8 @@ func addLock(cfg *util.Config) error { return errors.Trace(err) } - pdcli, err := pd.NewClient(strings.Split(cfg.PDAddr, ","), pd.SecurityOption{}) + pdcli, err := pd.NewClientWithContext( + ctx, strings.Split(cfg.PDAddr, ","), pd.SecurityOption{}) if err != nil { return errors.Trace(err) }