Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cdc rolling upgrade / scale-in ultilize two-phase-scheduling #1972

Merged
merged 64 commits into from
Jul 26, 2022
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
099e673
add cdc api .
3AceShowHand Jun 30, 2022
9d5e395
http client add put method
3AceShowHand Jul 1, 2022
a28115d
add a lot to support 2 phase scheduling
3AceShowHand Jul 1, 2022
182b4eb
add 2 files
3AceShowHand Jul 1, 2022
d7814af
add more http method
3AceShowHand Jul 1, 2022
6c19916
add more code.
3AceShowHand Jul 2, 2022
7dfaff9
add first version prerestart for cdc
3AceShowHand Jul 5, 2022
8db8be3
add first version prerestart for cdc
3AceShowHand Jul 5, 2022
077609f
refine cdc rolling upgrade .
3AceShowHand Jul 5, 2022
527940a
add changes.
3AceShowHand Jul 6, 2022
c72c276
fix go.sum
3AceShowHand Jul 6, 2022
92df189
upgrade is ready for test.
3AceShowHand Jul 6, 2022
119a271
finish scale-in, ready for test.
3AceShowHand Jul 6, 2022
aaaf4dc
fix
3AceShowHand Jul 6, 2022
1d3875c
tiny fix.
3AceShowHand Jul 6, 2022
f14756c
update go.mod
3AceShowHand Jul 6, 2022
8e59bd2
check status code when drain the capture.
3AceShowHand Jul 6, 2022
868557f
enlarge the timeout for get all captures.
3AceShowHand Jul 6, 2022
6d22b15
Merge branch 'cdc-scale-in-drain-capture' of https://github.com/3AceS…
3AceShowHand Jul 7, 2022
d54af39
refine by the first code review.
3AceShowHand Jul 7, 2022
411ebf8
fix log.
3AceShowHand Jul 7, 2022
4fbd54e
ignore 404 when all captures closed.
3AceShowHand Jul 7, 2022
a43363d
check status code for all http request.
3AceShowHand Jul 7, 2022
535b4cd
also checks body.
3AceShowHand Jul 7, 2022
653fc4d
check status before scale-in
3AceShowHand Jul 7, 2022
5f77b6f
do not force stop cdc when restart, if not all cdc nodes selected.
3AceShowHand Jul 7, 2022
4abfe3f
fix make check
3AceShowHand Jul 7, 2022
321a0c8
fix make check
3AceShowHand Jul 7, 2022
750dc82
fix stop and api
3AceShowHand Jul 8, 2022
4f7587f
refact the drain capture.
3AceShowHand Jul 8, 2022
6c19b95
refact the drain capture.
3AceShowHand Jul 8, 2022
99f6129
refact the drain capture.
3AceShowHand Jul 8, 2022
77ad85b
remove the log
3AceShowHand Jul 8, 2022
e3b5ab4
redact the log
3AceShowHand Jul 8, 2022
7caf37f
refact drain capture.
3AceShowHand Jul 8, 2022
0427d3a
remove unncessary change, and drop changes to restart command.
3AceShowHand Jul 8, 2022
dd3c143
refact one more time.
3AceShowHand Jul 8, 2022
42497ab
upgrade check instance status.
3AceShowHand Jul 8, 2022
b5165f2
add a log for debug
3AceShowHand Jul 8, 2022
4f31e63
refine cdc api url
3AceShowHand Jul 9, 2022
569aa82
for debug
3AceShowHand Jul 9, 2022
ed6d598
pass upgrade 6.0.0 to 6.1.0
3AceShowHand Jul 9, 2022
e9a5823
stop cdc cluster can be upgrade now.
3AceShowHand Jul 9, 2022
efa1826
fix stop cluster tlscfg not nil.
3AceShowHand Jul 9, 2022
9a46feb
fix inst count, and add timing to pre-restart and post-restart
3AceShowHand Jul 9, 2022
29382b3
change log level to debug, api layer return all errors, and handle by…
3AceShowHand Jul 9, 2022
6c0eb54
fix cdc api.
3AceShowHand Jul 9, 2022
4cf51ab
tiny fix on log.
3AceShowHand Jul 9, 2022
84f8b19
change level to debug
3AceShowHand Jul 9, 2022
40aeffb
fix old version does not support api, cause capture not found.
3AceShowHand Jul 9, 2022
1ea178a
tiny fix.
3AceShowHand Jul 10, 2022
8954563
refine drain capture error handling.
3AceShowHand Jul 10, 2022
fc5160b
revert change in go.mod
3AceShowHand Jul 10, 2022
c3fc898
fix cdc api.
3AceShowHand Jul 11, 2022
a47f234
revert change
3AceShowHand Jul 11, 2022
525824d
update api-timeout flag description
3AceShowHand Jul 11, 2022
642386f
tiny fix
3AceShowHand Jul 11, 2022
59bae37
sleep 2 seconds after new owner found.
3AceShowHand Jul 11, 2022
b005eac
add log for debug
3AceShowHand Jul 12, 2022
0240507
fix some typo.
3AceShowHand Jul 20, 2022
885d39b
tiny fix.
3AceShowHand Jul 20, 2022
5533f3f
fix scale-in, pass option directly.
3AceShowHand Jul 21, 2022
f7fa27f
refine the code, can be reviewed now.
3AceShowHand Jul 21, 2022
05ccc02
Merge branch 'master' into cdc-scale-in-drain-capture
3AceShowHand Jul 21, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions components/cluster/command/scale_in.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ func newScaleInCmd() *cobra.Command {
}

clusterName := args[0]
clusterReport.ID = scrubClusterName(clusterName)
teleCommand = append(teleCommand, scrubClusterName(clusterName))
scrubbedClusterName := scrubClusterName(clusterName)
clusterReport.ID = scrubbedClusterName
teleCommand = append(teleCommand, scrubbedClusterName)

scale := func(b *task.Builder, imetadata spec.Metadata, tlsCfg *tls.Config) {
metadata := imetadata.(*spec.ClusterMeta)
Expand Down
5 changes: 3 additions & 2 deletions components/cluster/command/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ func newUpgradeCmd() *cobra.Command {
if err != nil {
return err
}
clusterReport.ID = scrubClusterName(clusterName)
teleCommand = append(teleCommand, scrubClusterName(clusterName))
scrubbedClusterName := scrubClusterName(clusterName)
clusterReport.ID = scrubbedClusterName
teleCommand = append(teleCommand, scrubbedClusterName)
teleCommand = append(teleCommand, version)

return cm.Upgrade(clusterName, version, gOpt, skipConfirm, offlineMode)
Expand Down
3 changes: 2 additions & 1 deletion components/dm/command/scale_in.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func ScaleInDMCluster(
continue
}
instCount[instance.GetHost()]--
if err := operator.StopAndDestroyInstance(ctx, topo, instance, options, instCount[instance.GetHost()] == 0); err != nil {
if err := operator.StopAndDestroyInstance(ctx, topo, instance, options, false, instCount[instance.GetHost()] == 0); err != nil {
log.Warnf("failed to stop/destroy %s: %v", component.Name(), err)
}
}
Expand Down Expand Up @@ -156,6 +156,7 @@ func ScaleInDMCluster(
[]dm.Instance{instance},
noAgentHosts,
options.OptTimeout,
false,
false, /* evictLeader */
&tls.Config{}, /* not used as evictLeader is false */
); err != nil {
Expand Down
14 changes: 3 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,21 @@ require (
github.com/gofrs/flock v0.8.1
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.2
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
github.com/grpc-ecosystem/grpc-gateway v1.16.0
github.com/jeremywohl/flatten v1.0.1
github.com/joomcode/errorx v1.1.0
github.com/juju/ansiterm v0.0.0-20210929141451-8b71cc96ebdc
github.com/kr/text v0.2.0 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-runewidth v0.0.13
github.com/otiai10/copy v1.7.0
github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0
github.com/pingcap/errors v0.11.5-0.20201126102027-b0a155152ca3
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3
github.com/pingcap/fn v1.0.0
github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305
github.com/pingcap/log v1.1.0 // indirect
github.com/pingcap/tidb-insight/collector v0.0.0-20220111101533-227008e9835b
github.com/pingcap/tiflow v0.0.0-20220701111639-6d1341c93c68
3AceShowHand marked this conversation as resolved.
Show resolved Hide resolved
github.com/pkg/errors v0.9.1
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.34.0
Expand All @@ -54,16 +51,12 @@ require (
github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966
github.com/spf13/cobra v1.4.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.1
github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965 // indirect
github.com/stretchr/testify v1.7.2-0.20220504104629-106ec21d14df
github.com/tj/go-termd v0.0.1
github.com/tklauser/go-sysconf v0.3.10 // indirect
github.com/tklauser/numcpus v0.5.0 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.4
go.etcd.io/etcd/client/v3 v3.5.4
go.uber.org/atomic v1.9.0
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.21.0
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4
Expand All @@ -73,7 +66,6 @@ require (
golang.org/x/term v0.0.0-20220526004731-065cf7ba2467
golang.org/x/text v0.3.7
golang.org/x/tools v0.1.11 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220525015930-6ca3db687a9d
google.golang.org/grpc v1.46.2
gopkg.in/ini.v1 v1.66.4
Expand Down
1,192 changes: 1,177 additions & 15 deletions go.sum

Large diffs are not rendered by default.

225 changes: 225 additions & 0 deletions pkg/cluster/api/cdcapi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
"time"

"github.com/pingcap/tiflow/cdc/model"
logprinter "github.com/pingcap/tiup/pkg/logger/printer"
"github.com/pingcap/tiup/pkg/utils"
)

// CDCOpenAPIClient is client for access TiCDC Open API
type CDCOpenAPIClient struct {
addrs []string
tlsEnabled bool
client *utils.HTTPClient
ctx context.Context
}

// NewCDCOpenAPIClient return a `CDCOpenAPIClient`
func NewCDCOpenAPIClient(ctx context.Context, addrs []string, timeout time.Duration, tlsConfig *tls.Config) *CDCOpenAPIClient {
enableTLS := false
if tlsConfig != nil {
enableTLS = true
}

return &CDCOpenAPIClient{
addrs: addrs,
tlsEnabled: enableTLS,
client: utils.NewHTTPClient(timeout, tlsConfig),
ctx: ctx,
}
}

func drainCapture(client *CDCOpenAPIClient, target string) (int, error) {
api := "/api/v1/captures/drain"
endpoints := client.getEndpoints(api)

request := model.DrainCaptureRequest{
CaptureID: target,
}
body, err := json.Marshal(request)
if err != nil {
return 0, err
}

var data []byte
_, err = tryURLs(endpoints, func(endpoint string) ([]byte, error) {
body, statusCode, err := client.client.PUT(client.ctx, endpoint, bytes.NewReader(body))
if err != nil {
if statusCode == http.StatusNotFound {
// old version cdc does not support `DrainCapture`, return nil to trigger hard restart.
3AceShowHand marked this conversation as resolved.
Show resolved Hide resolved
return data, nil
}

if bytes.Contains(body, []byte("scheduler request failed")) {
3AceShowHand marked this conversation as resolved.
Show resolved Hide resolved
client.l().Debugf("cdc drain capture failed: %s", body)
return data, nil
}
if bytes.Contains(body, []byte("capture not exists")) {
client.l().Debugf("cdc drain capture failed: %s", body)
return data, nil
}
return data, err
}
3AceShowHand marked this conversation as resolved.
Show resolved Hide resolved
return data, nil
3AceShowHand marked this conversation as resolved.
Show resolved Hide resolved
})
if err != nil {
return 0, err
}

var resp model.DrainCaptureResp
err = json.Unmarshal(data, &resp)
if err != nil {
return resp.CurrentTableCount, err
}

return resp.CurrentTableCount, nil
}

// DrainCapture request cdc owner move all tables on the target capture to other captures.
func (c *CDCOpenAPIClient) DrainCapture(target string) (result int, err error) {
err = utils.Retry(func() error {
result, err = drainCapture(c, target)
if err != nil {
return err
}
return nil
}, utils.RetryOption{
Delay: 100 * time.Millisecond,
Timeout: 20 * time.Second,
3AceShowHand marked this conversation as resolved.
Show resolved Hide resolved
})

c.l().Infof("cdc drain capture finished, target=%+v, current_table_count=%+v, err=%+v", target, result, err)
3AceShowHand marked this conversation as resolved.
Show resolved Hide resolved

return result, err
}

// ResignOwner resign the cdc owner, to make owner switch
func (c *CDCOpenAPIClient) ResignOwner() error {
3AceShowHand marked this conversation as resolved.
Show resolved Hide resolved
api := "api/v1/owner/resign"
endpoints := c.getEndpoints(api)
_, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) {
body, err := c.client.Post(c.ctx, endpoint, nil)
if err != nil {
c.l().Warnf("cdc resign owner failed: %v", err)
return body, err
}
c.l().Infof("cdc resign owner successfully")
return body, nil
})

return err
}

func (c *CDCOpenAPIClient) getURL(addr string) string {
httpPrefix := "http"
if c.tlsEnabled {
httpPrefix = "https"
}
return fmt.Sprintf("%s://%s", httpPrefix, addr)
}

func (c *CDCOpenAPIClient) getEndpoints(cmd string) (endpoints []string) {
for _, addr := range c.addrs {
endpoint := fmt.Sprintf("%s/%s", c.getURL(addr), cmd)
3AceShowHand marked this conversation as resolved.
Show resolved Hide resolved
endpoints = append(endpoints, endpoint)
}

return endpoints
}

// GetAllCaptures return all captures instantaneously
func (c *CDCOpenAPIClient) GetAllCaptures() (result []*model.Capture, err error) {
err = utils.Retry(func() error {
result, err = getAllCaptures(c)
if err != nil {
return err
}
return nil
}, utils.RetryOption{
Delay: 100 * time.Millisecond,
Timeout: 30 * time.Second,
})

if err != nil {
c.l().Warnf("cdc get all captures failed: %v", err)
}

return result, err
}

// GetStatus return the status of the TiCDC server.
func (c *CDCOpenAPIClient) GetStatus() error {
api := "/api/v1/status"
endpoints := c.getEndpoints(api)

// todo: only send request to the target capture.

var response model.ServerStatus
_, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) {
data, err := c.client.Get(c.ctx, endpoint)
if err != nil {
return data, err
3AceShowHand marked this conversation as resolved.
Show resolved Hide resolved
}

err = json.Unmarshal(data, &response)
if err != nil {
return data, err
}

if response.Liveness != model.LivenessCaptureAlive {
return data, fmt.Errorf("cdc is not alive")
}
return data, nil
})
if err != nil {
return err
}

return nil
}

func getAllCaptures(client *CDCOpenAPIClient) ([]*model.Capture, error) {
api := "/api/v1/captures"
endpoints := client.getEndpoints(api)

var response []*model.Capture

_, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) {
data, err := client.client.Get(client.ctx, endpoint)
if err != nil {
return data, err
3AceShowHand marked this conversation as resolved.
Show resolved Hide resolved
}

return data, json.Unmarshal(data, &response)
})

if err != nil {
return nil, err
}
return response, nil
}

func (c *CDCOpenAPIClient) l() *logprinter.Logger {
return c.ctx.Value(logprinter.ContextKeyLogger).(*logprinter.Logger)
}
2 changes: 1 addition & 1 deletion pkg/cluster/api/pdapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (pc *PDClient) tryIdentifyVersion() {
}
}

// GetURL builds the the client URL of PDClient
// GetURL builds the client URL of PDClient
func (pc *PDClient) GetURL(addr string) string {
httpPrefix := "http"
if pc.tlsEnabled {
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/manager/scale_in.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (m *Manager) ScaleIn(
name string,
skipConfirm bool,
gOpt operator.Options,
scale func(builer *task.Builder, metadata spec.Metadata, tlsCfg *tls.Config),
scale func(builder *task.Builder, metadata spec.Metadata, tlsCfg *tls.Config),
) error {
if err := clusterutil.ValidateClusterNameOrError(name); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/manager/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (m *Manager) Upgrade(name string, clusterVersion string, opt operator.Optio
deployDir := spec.Abs(base.User, inst.DeployDir())
// data dir would be empty for components which don't need it
dataDirs := spec.MultiDirAbs(base.User, inst.DataDir())
// log dir will always be with values, but might not used by the component
// log dir will always be with values, but the component might not use it
logDir := spec.Abs(base.User, inst.LogDir())

// Deploy component
Expand Down
20 changes: 20 additions & 0 deletions pkg/cluster/operation/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ func Stop(
insts,
noAgentHosts,
options.OptTimeout,
true,
evictLeader,
tlsCfg,
)
Expand Down Expand Up @@ -569,6 +570,7 @@ func StopComponent(ctx context.Context,
instances []spec.Instance,
noAgentHosts set.StringSet,
timeout uint64,
forceStop bool,
evictLeader bool,
tlsCfg *tls.Config,
) error {
Expand All @@ -591,6 +593,24 @@ func StopComponent(ctx context.Context,
logger.Debugf("Ignored stopping %s for %s:%d", name, ins.GetHost(), ins.GetPort())
continue
}
case spec.ComponentCDC:
nctx := checkpoint.NewContext(ctx)
if !forceStop {
// when scale-in cdc node, each node should be stopped one by one.
cdc, ok := ins.(spec.RollingUpdateInstance)
if !ok {
panic("cdc should support rolling upgrade, but not")
}
err := cdc.PreRestart(nctx, topo, int(timeout), tlsCfg)
3AceShowHand marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
}
if err := stopInstance(nctx, ins, timeout); err != nil {
return err
}
// continue here, to skip the logic below.
continue
}

// the checkpoint part of context can't be shared between goroutines
Expand Down
Loading