Skip to content

Commit

Permalink
*: check cluster version (#570)
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Shen <overvenus@gmail.com>
  • Loading branch information
overvenus authored May 15, 2020
1 parent 183bfc4 commit 64df3f2
Show file tree
Hide file tree
Showing 10 changed files with 244 additions and 13 deletions.
6 changes: 1 addition & 5 deletions cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,9 @@ type Owner struct {
const cdcServiceSafePointID = "ticdc"

// NewOwner creates a new Owner instance
func NewOwner(sess *concurrency.Session, gcTTL int64) (*Owner, error) {
func NewOwner(pdClient pd.Client, sess *concurrency.Session, gcTTL int64) (*Owner, error) {
cli := kv.NewCDCEtcdClient(sess.Client())
endpoints := sess.Client().Endpoints()
pdClient, err := pd.NewClient(endpoints, pd.SecurityOption{})
if err != nil {
return nil, errors.Trace(err)
}

owner := &Owner{
done: make(chan struct{}),
Expand Down
4 changes: 2 additions & 2 deletions cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ const (
)

var (
fNewPDCli = pd.NewClient
fNewPDCli = pd.NewClientWithContext
fNewTsRWriter = createTsRWriter
)

Expand Down Expand Up @@ -128,7 +128,7 @@ func newProcessor(
checkpointTs uint64) (*processor, error) {
etcdCli := session.Client()
endpoints := session.Client().Endpoints()
pdCli, err := fNewPDCli(endpoints, pd.SecurityOption{})
pdCli, err := fNewPDCli(ctx, endpoints, pd.SecurityOption{})
if err != nil {
return nil, errors.Annotatef(err, "create pd client failed, addr: %v", endpoints)
}
Expand Down
36 changes: 32 additions & 4 deletions cdc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
pd "github.com/pingcap/pd/v4/client"
"github.com/pingcap/ticdc/pkg/util"
"go.etcd.io/etcd/mvcc"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
)

const (
Expand Down Expand Up @@ -102,6 +105,8 @@ type Server struct {
capture *Capture
owner *Owner
statusServer *http.Server
pdClient pd.Client
pdEndpoints []string
}

// NewServer creates a Server instance.
Expand All @@ -128,7 +133,31 @@ func NewServer(opt ...ServerOption) (*Server, error) {

// Run runs the server.
func (s *Server) Run(ctx context.Context) error {
err := s.startStatusHTTP()
s.pdEndpoints = strings.Split(s.opts.pdEndpoints, ",")
pdClient, err := pd.NewClientWithContext(
ctx, s.pdEndpoints, pd.SecurityOption{},
pd.WithGRPCDialOptions(
grpc.WithBlock(),
grpc.WithConnectParams(grpc.ConnectParams{
Backoff: backoff.Config{
BaseDelay: time.Second,
Multiplier: 1.1,
Jitter: 0.1,
MaxDelay: 3 * time.Second,
},
MinConnectTimeout: 3 * time.Second,
}),
))
if err != nil {
return errors.Trace(err)
}
s.pdClient = pdClient

err = util.CheckClusterVersion(ctx, s.pdClient, s.pdEndpoints[0])
if err != nil {
return err
}
err = s.startStatusHTTP()
if err != nil {
return err
}
Expand Down Expand Up @@ -157,7 +186,7 @@ func (s *Server) campaignOwnerLoop(ctx context.Context) error {
continue
}
log.Info("campaign owner successfully", zap.String("capture", s.capture.info.ID))
owner, err := NewOwner(s.capture.session, s.opts.gcTTL)
owner, err := NewOwner(s.pdClient, s.capture.session, s.opts.gcTTL)
if err != nil {
log.Warn("create new owner failed", zap.Error(err))
continue
Expand All @@ -182,8 +211,7 @@ func (s *Server) campaignOwnerLoop(ctx context.Context) error {
}

func (s *Server) run(ctx context.Context) (err error) {
capture, err := NewCapture(
ctx, strings.Split(s.opts.pdEndpoints, ","), s.opts.advertiseAddr)
capture, err := NewCapture(ctx, s.pdEndpoints, s.opts.advertiseAddr)
if err != nil {
return err
}
Expand Down
7 changes: 7 additions & 0 deletions cmd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
pd "github.com/pingcap/pd/v4/client"
"github.com/pingcap/ticdc/cdc/kv"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/util"
"github.com/spf13/cobra"
"go.etcd.io/etcd/clientv3"
"google.golang.org/grpc"
Expand Down Expand Up @@ -132,6 +133,12 @@ func newCliCommand() *cobra.Command {
if err != nil {
return errors.Annotate(err, "fail to open PD client")
}
ctx, cancel := contextTimeout()
defer cancel()
err = util.CheckClusterVersion(ctx, pdCli, cliPdAddr)
if err != nil {
return err
}

return nil
},
Expand Down
2 changes: 1 addition & 1 deletion cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func init() {
rootCmd.AddCommand(serverCmd)

serverCmd.Flags().StringVar(&serverPdAddr, "pd", "http://127.0.0.1:2379", "Set the PD endpoints to use. Use `,` to separate multiple PDs")
serverCmd.Flags().StringVar(&address, "addr", "0.0.0.0:8300", "Set the listening address")
serverCmd.Flags().StringVar(&address, "addr", "127.0.0.1:8300", "Set the listening address")
serverCmd.Flags().StringVar(&advertiseAddr, "advertise-addr", "", "Set the advertise listening address for client communication")
serverCmd.Flags().StringVar(&timezone, "tz", "System", "Specify time zone of TiCDC cluster")
serverCmd.Flags().Int64Var(&gcTTL, "gc-ttl", cdc.DefaultCDCGCSafePointTTL, "CDC GC safepoint TTL duration, specified in seconds")
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/biogo/store v0.0.0-20190426020002-884f370e325d
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/chzyer/readline v0.0.0-20171208011716-f6d7a1f6fbf3
github.com/coreos/go-semver v0.2.0
github.com/davecgh/go-spew v1.1.1
github.com/edwingeng/deque v0.0.0-20191220032131-8596380dee17
github.com/go-sql-driver/mysql v1.4.1
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237 h1:HQagqIiBm
github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0 h1:RR9dF3JtopPvtkroDZuVD7qquD0bnHlKSqaQhgwt8yk=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
Expand Down
94 changes: 94 additions & 0 deletions pkg/util/check.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package util

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strings"

"github.com/coreos/go-semver/semver"
"github.com/pingcap/errors"
pd "github.com/pingcap/pd/v4/client"
)

var minPDVersion *semver.Version = semver.New("4.0.0-rc.1")
var minTiKVVersion *semver.Version = semver.New("4.0.0-rc.1")

func removeV(v string) string {
if v == "" {
return v
}
return strings.TrimPrefix(v, "v")
}

// CheckClusterVersion check TiKV and PD version.
func CheckClusterVersion(ctx context.Context, client pd.Client, pdHTTP string) error {
stores, err := client.GetAllStores(ctx, pd.WithExcludeTombstone())
if err != nil {
return err
}
for _, s := range stores {
ver, err := semver.NewVersion(removeV(s.Version))
if err != nil {
return err
}
ord := ver.Compare(*minTiKVVersion)
if ord < 0 {
return errors.NotSupportedf("TiKV %s is not supported, require minimal version %s",
removeV(s.Version), minTiKVVersion)
}
}
// See more: https://github.com/pingcap/pd/blob/v4.0.0-rc.1/server/api/version.go
pdVer := struct {
Version string `json:"version"`
}{}
req, err := http.NewRequestWithContext(
ctx, http.MethodGet, fmt.Sprintf("%s/pd/api/v1/version", pdHTTP), nil)
if err != nil {
return errors.Annotate(err, "fail to request PD")
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return errors.Annotate(err, "fail to request PD")
}
if resp.StatusCode < 200 && resp.StatusCode >= 300 {
return errors.BadRequestf("fail to requet PD %s", resp.Status)
}
content, err := ioutil.ReadAll(resp.Body)
if err != nil {
return errors.Annotate(err, "fail to request PD")
}
err = json.Unmarshal(content, &pdVer)
if err != nil {
return errors.Annotate(err, "fail to request PD")
}
err = resp.Body.Close()
if err != nil {
return errors.Annotate(err, "fail to request PD")
}
ver, err := semver.NewVersion(removeV(pdVer.Version))
if err != nil {
return err
}
ord := ver.Compare(*minPDVersion)
if ord < 0 {
return errors.NotSupportedf("PD %s is not supported, require minimal version %s",
removeV(pdVer.Version), minPDVersion)
}
return nil
}
103 changes: 103 additions & 0 deletions pkg/util/check_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package util

import (
"context"
"fmt"
"net/http"
"net/url"
"time"

"github.com/coreos/go-semver/semver"
"github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
pd "github.com/pingcap/pd/v4/client"
"github.com/pingcap/pd/v4/pkg/tempurl"
)

type checkSuite struct{}

var _ = check.Suite(&checkSuite{})

type mockPDClient struct {
pd.Client
getAllStores func() []*metapb.Store
getVersion func() string
}

func (m *mockPDClient) GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error) {
if m.getAllStores != nil {
return m.getAllStores(), nil
}
return []*metapb.Store{}, nil
}

func (m *mockPDClient) ServeHTTP(resp http.ResponseWriter, _ *http.Request) {
if m.getVersion != nil {
_, _ = resp.Write([]byte(fmt.Sprintf(`{"version":"%s"}`, m.getVersion())))
}
}

func (s *checkSuite) TestCheckClusterVersion(c *check.C) {
mock := mockPDClient{
Client: nil,
}
pdURL, _ := url.Parse(tempurl.Alloc())
pdHTTP := fmt.Sprintf("http://%s", pdURL.Host)
svr := http.Server{Addr: pdURL.Host, Handler: &mock}
go func() {
c.Assert(svr.ListenAndServe(), check.IsNil)
}()
defer svr.Close()
for i := 0; i < 20; i++ {
time.Sleep(100 * time.Millisecond)
_, err := http.Get(pdHTTP)
if err == nil {
break
}
c.Error(err)
if i == 199 {
c.Fatal("http server timeout", err)
}
}

{
mock.getVersion = func() string {
return minPDVersion.String()
}
mock.getAllStores = func() []*metapb.Store {
return []*metapb.Store{{Version: minTiKVVersion.String()}}
}
err := CheckClusterVersion(context.Background(), &mock, pdHTTP)
c.Assert(err, check.IsNil)
}

{
mock.getVersion = func() string {
return `v1.0.0-alpha-271-g824ae7fd`
}
mock.getAllStores = func() []*metapb.Store {
return []*metapb.Store{{Version: minTiKVVersion.String()}}
}
err := CheckClusterVersion(context.Background(), &mock, pdHTTP)
c.Assert(err, check.ErrorMatches, "PD .* is not supported.*")
}

{
mock.getVersion = func() string {
return minPDVersion.String()
}
mock.getAllStores = func() []*metapb.Store {
// TiKV does not include 'v'.
return []*metapb.Store{{Version: `1.0.0-alpha-271-g824ae7fd`}}
}
err := CheckClusterVersion(context.Background(), &mock, pdHTTP)
c.Assert(err, check.ErrorMatches, "TiKV .* is not supported.*")
}
}

func (s *checkSuite) TestCompareVersion(c *check.C) {
c.Assert(semver.New("4.0.0-rc").Compare(*semver.New("4.0.0-rc.2")), check.Equals, -1)
c.Assert(semver.New("4.0.0-rc.1").Compare(*semver.New("4.0.0-rc.2")), check.Equals, -1)
// BUG it should be "<" instead of ">".
// c.Assert(semver.New("4.0.0-rc-35-g31dae220").Compare(*semver.New("4.0.0-rc.2")), check.Equals, -1)
}
3 changes: 2 additions & 1 deletion tests/resolve_lock/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ func addLock(cfg *util.Config) error {
return errors.Trace(err)
}

pdcli, err := pd.NewClient(strings.Split(cfg.PDAddr, ","), pd.SecurityOption{})
pdcli, err := pd.NewClientWithContext(
ctx, strings.Split(cfg.PDAddr, ","), pd.SecurityOption{})
if err != nil {
return errors.Trace(err)
}
Expand Down

0 comments on commit 64df3f2

Please sign in to comment.