Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: check cluster version #570

Merged
merged 3 commits into from
May 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,9 @@ type Owner struct {
const cdcServiceSafePointID = "ticdc"

// NewOwner creates a new Owner instance
func NewOwner(sess *concurrency.Session, gcTTL int64) (*Owner, error) {
func NewOwner(pdClient pd.Client, sess *concurrency.Session, gcTTL int64) (*Owner, error) {
cli := kv.NewCDCEtcdClient(sess.Client())
endpoints := sess.Client().Endpoints()
pdClient, err := pd.NewClient(endpoints, pd.SecurityOption{})
if err != nil {
return nil, errors.Trace(err)
}

owner := &Owner{
done: make(chan struct{}),
Expand Down
4 changes: 2 additions & 2 deletions cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ const (
)

var (
fNewPDCli = pd.NewClient
fNewPDCli = pd.NewClientWithContext
fNewTsRWriter = createTsRWriter
)

Expand Down Expand Up @@ -128,7 +128,7 @@ func newProcessor(
checkpointTs uint64) (*processor, error) {
etcdCli := session.Client()
endpoints := session.Client().Endpoints()
pdCli, err := fNewPDCli(endpoints, pd.SecurityOption{})
pdCli, err := fNewPDCli(ctx, endpoints, pd.SecurityOption{})
if err != nil {
return nil, errors.Annotatef(err, "create pd client failed, addr: %v", endpoints)
}
Expand Down
36 changes: 32 additions & 4 deletions cdc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
pd "github.com/pingcap/pd/v4/client"
"github.com/pingcap/ticdc/pkg/util"
"go.etcd.io/etcd/mvcc"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
)

const (
Expand Down Expand Up @@ -102,6 +105,8 @@ type Server struct {
capture *Capture
owner *Owner
statusServer *http.Server
pdClient pd.Client
pdEndpoints []string
}

// NewServer creates a Server instance.
Expand All @@ -128,7 +133,31 @@ func NewServer(opt ...ServerOption) (*Server, error) {

// Run runs the server.
func (s *Server) Run(ctx context.Context) error {
err := s.startStatusHTTP()
s.pdEndpoints = strings.Split(s.opts.pdEndpoints, ",")
pdClient, err := pd.NewClientWithContext(
ctx, s.pdEndpoints, pd.SecurityOption{},
pd.WithGRPCDialOptions(
grpc.WithBlock(),
grpc.WithConnectParams(grpc.ConnectParams{
Backoff: backoff.Config{
BaseDelay: time.Second,
Multiplier: 1.1,
Jitter: 0.1,
MaxDelay: 3 * time.Second,
},
MinConnectTimeout: 3 * time.Second,
}),
))
if err != nil {
return errors.Trace(err)
}
s.pdClient = pdClient

err = util.CheckClusterVersion(ctx, s.pdClient, s.pdEndpoints[0])
if err != nil {
return err
}
err = s.startStatusHTTP()
if err != nil {
return err
}
Expand Down Expand Up @@ -157,7 +186,7 @@ func (s *Server) campaignOwnerLoop(ctx context.Context) error {
continue
}
log.Info("campaign owner successfully", zap.String("capture", s.capture.info.ID))
owner, err := NewOwner(s.capture.session, s.opts.gcTTL)
owner, err := NewOwner(s.pdClient, s.capture.session, s.opts.gcTTL)
if err != nil {
log.Warn("create new owner failed", zap.Error(err))
continue
Expand All @@ -182,8 +211,7 @@ func (s *Server) campaignOwnerLoop(ctx context.Context) error {
}

func (s *Server) run(ctx context.Context) (err error) {
capture, err := NewCapture(
ctx, strings.Split(s.opts.pdEndpoints, ","), s.opts.advertiseAddr)
capture, err := NewCapture(ctx, s.pdEndpoints, s.opts.advertiseAddr)
if err != nil {
return err
}
Expand Down
7 changes: 7 additions & 0 deletions cmd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
pd "github.com/pingcap/pd/v4/client"
"github.com/pingcap/ticdc/cdc/kv"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/util"
"github.com/spf13/cobra"
"go.etcd.io/etcd/clientv3"
"google.golang.org/grpc"
Expand Down Expand Up @@ -132,6 +133,12 @@ func newCliCommand() *cobra.Command {
if err != nil {
return errors.Annotate(err, "fail to open PD client")
}
ctx, cancel := contextTimeout()
defer cancel()
err = util.CheckClusterVersion(ctx, pdCli, cliPdAddr)
if err != nil {
return err
}

return nil
},
Expand Down
2 changes: 1 addition & 1 deletion cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func init() {
rootCmd.AddCommand(serverCmd)

serverCmd.Flags().StringVar(&serverPdAddr, "pd", "http://127.0.0.1:2379", "Set the PD endpoints to use. Use `,` to separate multiple PDs")
serverCmd.Flags().StringVar(&address, "addr", "0.0.0.0:8300", "Set the listening address")
serverCmd.Flags().StringVar(&address, "addr", "127.0.0.1:8300", "Set the listening address")
serverCmd.Flags().StringVar(&advertiseAddr, "advertise-addr", "", "Set the advertise listening address for client communication")
serverCmd.Flags().StringVar(&timezone, "tz", "System", "Specify time zone of TiCDC cluster")
serverCmd.Flags().Int64Var(&gcTTL, "gc-ttl", cdc.DefaultCDCGCSafePointTTL, "CDC GC safepoint TTL duration, specified in seconds")
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/biogo/store v0.0.0-20190426020002-884f370e325d
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/chzyer/readline v0.0.0-20171208011716-f6d7a1f6fbf3
github.com/coreos/go-semver v0.2.0
github.com/davecgh/go-spew v1.1.1
github.com/edwingeng/deque v0.0.0-20191220032131-8596380dee17
github.com/go-sql-driver/mysql v1.4.1
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237 h1:HQagqIiBm
github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0 h1:RR9dF3JtopPvtkroDZuVD7qquD0bnHlKSqaQhgwt8yk=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
Expand Down
94 changes: 94 additions & 0 deletions pkg/util/check.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package util

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strings"

"github.com/coreos/go-semver/semver"
"github.com/pingcap/errors"
pd "github.com/pingcap/pd/v4/client"
)

var minPDVersion *semver.Version = semver.New("4.0.0-rc.1")
var minTiKVVersion *semver.Version = semver.New("4.0.0-rc.1")

func removeV(v string) string {
if v == "" {
return v
}
return strings.TrimPrefix(v, "v")
}

// CheckClusterVersion check TiKV and PD version.
func CheckClusterVersion(ctx context.Context, client pd.Client, pdHTTP string) error {
stores, err := client.GetAllStores(ctx, pd.WithExcludeTombstone())
if err != nil {
return err
}
for _, s := range stores {
ver, err := semver.NewVersion(removeV(s.Version))
if err != nil {
return err
}
ord := ver.Compare(*minTiKVVersion)
if ord < 0 {
return errors.NotSupportedf("TiKV %s is not supported, require minimal version %s",
removeV(s.Version), minTiKVVersion)
}
}
// See more: https://github.com/pingcap/pd/blob/v4.0.0-rc.1/server/api/version.go
pdVer := struct {
Version string `json:"version"`
}{}
req, err := http.NewRequestWithContext(
ctx, http.MethodGet, fmt.Sprintf("%s/pd/api/v1/version", pdHTTP), nil)
if err != nil {
return errors.Annotate(err, "fail to request PD")
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return errors.Annotate(err, "fail to request PD")
}
if resp.StatusCode < 200 && resp.StatusCode >= 300 {
return errors.BadRequestf("fail to requet PD %s", resp.Status)
}
content, err := ioutil.ReadAll(resp.Body)
if err != nil {
return errors.Annotate(err, "fail to request PD")
}
err = json.Unmarshal(content, &pdVer)
if err != nil {
return errors.Annotate(err, "fail to request PD")
}
err = resp.Body.Close()
if err != nil {
return errors.Annotate(err, "fail to request PD")
}
ver, err := semver.NewVersion(removeV(pdVer.Version))
if err != nil {
return err
}
ord := ver.Compare(*minPDVersion)
if ord < 0 {
return errors.NotSupportedf("PD %s is not supported, require minimal version %s",
removeV(pdVer.Version), minPDVersion)
}
return nil
}
103 changes: 103 additions & 0 deletions pkg/util/check_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package util

import (
"context"
"fmt"
"net/http"
"net/url"
"time"

"github.com/coreos/go-semver/semver"
"github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
pd "github.com/pingcap/pd/v4/client"
"github.com/pingcap/pd/v4/pkg/tempurl"
)

type checkSuite struct{}

var _ = check.Suite(&checkSuite{})

type mockPDClient struct {
pd.Client
getAllStores func() []*metapb.Store
getVersion func() string
}

func (m *mockPDClient) GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error) {
if m.getAllStores != nil {
return m.getAllStores(), nil
}
return []*metapb.Store{}, nil
}

func (m *mockPDClient) ServeHTTP(resp http.ResponseWriter, _ *http.Request) {
if m.getVersion != nil {
_, _ = resp.Write([]byte(fmt.Sprintf(`{"version":"%s"}`, m.getVersion())))
}
}

func (s *checkSuite) TestCheckClusterVersion(c *check.C) {
mock := mockPDClient{
Client: nil,
}
pdURL, _ := url.Parse(tempurl.Alloc())
pdHTTP := fmt.Sprintf("http://%s", pdURL.Host)
svr := http.Server{Addr: pdURL.Host, Handler: &mock}
go func() {
c.Assert(svr.ListenAndServe(), check.IsNil)
}()
defer svr.Close()
for i := 0; i < 20; i++ {
time.Sleep(100 * time.Millisecond)
_, err := http.Get(pdHTTP)
if err == nil {
break
}
c.Error(err)
if i == 199 {
c.Fatal("http server timeout", err)
}
}

{
mock.getVersion = func() string {
return minPDVersion.String()
}
mock.getAllStores = func() []*metapb.Store {
return []*metapb.Store{{Version: minTiKVVersion.String()}}
}
err := CheckClusterVersion(context.Background(), &mock, pdHTTP)
c.Assert(err, check.IsNil)
}

{
mock.getVersion = func() string {
return `v1.0.0-alpha-271-g824ae7fd`
}
mock.getAllStores = func() []*metapb.Store {
return []*metapb.Store{{Version: minTiKVVersion.String()}}
}
err := CheckClusterVersion(context.Background(), &mock, pdHTTP)
c.Assert(err, check.ErrorMatches, "PD .* is not supported.*")
}

{
mock.getVersion = func() string {
return minPDVersion.String()
}
mock.getAllStores = func() []*metapb.Store {
// TiKV does not include 'v'.
return []*metapb.Store{{Version: `1.0.0-alpha-271-g824ae7fd`}}
}
err := CheckClusterVersion(context.Background(), &mock, pdHTTP)
c.Assert(err, check.ErrorMatches, "TiKV .* is not supported.*")
}
}

func (s *checkSuite) TestCompareVersion(c *check.C) {
c.Assert(semver.New("4.0.0-rc").Compare(*semver.New("4.0.0-rc.2")), check.Equals, -1)
c.Assert(semver.New("4.0.0-rc.1").Compare(*semver.New("4.0.0-rc.2")), check.Equals, -1)
// BUG it should be "<" instead of ">".
// c.Assert(semver.New("4.0.0-rc-35-g31dae220").Compare(*semver.New("4.0.0-rc.2")), check.Equals, -1)
}
3 changes: 2 additions & 1 deletion tests/resolve_lock/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ func addLock(cfg *util.Config) error {
return errors.Trace(err)
}

pdcli, err := pd.NewClient(strings.Split(cfg.PDAddr, ","), pd.SecurityOption{})
pdcli, err := pd.NewClientWithContext(
ctx, strings.Split(cfg.PDAddr, ","), pd.SecurityOption{})
if err != nil {
return errors.Trace(err)
}
Expand Down