Skip to content

Commit

Permalink
Merge branch 'master' into feature/bucket
Browse files Browse the repository at this point in the history
  • Loading branch information
bufferflies authored Mar 29, 2022
2 parents 7e922f9 + 86e8d08 commit 5c951c8
Show file tree
Hide file tree
Showing 19 changed files with 268 additions and 51 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ docker-image:

swagger-spec: install-tools
go mod vendor
swag init --parseVendor -generalInfo server/api/router.go --exclude vendor/github.com/pingcap/tidb-dashboard --output docs/swagger
swag init --parseVendor --generalInfo server/api/router.go --exclude vendor/github.com/pingcap/tidb-dashboard --output docs/swagger
go mod tidy
rm -rf vendor

Expand Down
3 changes: 2 additions & 1 deletion cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/tikv/pd/pkg/swaggerserver"
"github.com/tikv/pd/server"
"github.com/tikv/pd/server/api"
"github.com/tikv/pd/server/apiv2"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/join"
"go.uber.org/zap"
Expand Down Expand Up @@ -92,7 +93,7 @@ func main() {

// Creates server.
ctx, cancel := context.WithCancel(context.Background())
serviceBuilders := []server.HandlerBuilder{api.NewHandler, swaggerserver.NewHandler, autoscaling.NewHandler}
serviceBuilders := []server.HandlerBuilder{api.NewHandler, apiv2.NewV2Handler, swaggerserver.NewHandler, autoscaling.NewHandler}
serviceBuilders = append(serviceBuilders, dashboard.GetServiceBuilders()...)
svr, err := server.CreateServer(ctx, cfg, serviceBuilders...)
if err != nil {
Expand Down
20 changes: 20 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,11 @@ error = '''
etcd member list failed
'''

["PD:etcd:ErrEtcdMemberRemove"]
error = '''
etcd remove member failed
'''

["PD:etcd:ErrEtcdMoveLeader"]
error = '''
etcd move leader error
Expand Down Expand Up @@ -321,6 +326,11 @@ error = '''
failed to convert a path to absolute path
'''

["PD:gin:ErrBindJSON"]
error = '''
bind JSON error
'''

["PD:grpc:ErrCloseGRPCConn"]
error = '''
close gRPC connection failed
Expand Down Expand Up @@ -591,11 +601,21 @@ error = '''
leader is nil
'''

["PD:server:ErrServerNotStarted"]
error = '''
server not started
'''

["PD:server:ErrServiceRegistered"]
error = '''
service with path [%s] already registered
'''

["PD:strconv:ErrStrconvParseBool"]
error = '''
parse bool error
'''

["PD:strconv:ErrStrconvParseFloat"]
error = '''
parse float error
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e
github.com/coreos/go-semver v0.3.0
github.com/docker/go-units v0.4.0
github.com/gin-gonic/gin v1.7.4
github.com/go-echarts/go-echarts v1.0.0
github.com/gogo/protobuf v1.3.1
github.com/golang/protobuf v1.3.4
Expand Down
13 changes: 2 additions & 11 deletions pkg/apiutil/serverapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/server"
"github.com/urfave/negroni"
"go.uber.org/zap"
Expand Down Expand Up @@ -166,26 +167,16 @@ func (p *customReverseProxies) ServeHTTP(w http.ResponseWriter, r *http.Request)

return
}

http.Error(w, errRedirectFailed, http.StatusInternalServerError)
}

func copyHeader(dst, src http.Header) {
for k, vv := range src {
values := dst[k]
for _, v := range vv {
if !contains(values, v) {
if !slice.Contains(values, v) {
dst.Add(k, v)
}
}
}
}

func contains(s []string, x string) bool {
for _, n := range s {
if x == n {
return true
}
}
return false
}
8 changes: 8 additions & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ var (
ErrLeaderNil = errors.Normalize("leader is nil", errors.RFCCodeText("PD:server:ErrLeaderNil"))
ErrCancelStartEtcd = errors.Normalize("etcd start canceled", errors.RFCCodeText("PD:server:ErrCancelStartEtcd"))
ErrConfigItem = errors.Normalize("cannot set invalid configuration", errors.RFCCodeText("PD:server:ErrConfiguration"))
ErrServerNotStarted = errors.Normalize("server not started", errors.RFCCodeText("PD:server:ErrServerNotStarted"))
)

// logutil errors
Expand Down Expand Up @@ -198,6 +199,7 @@ var (
ErrEtcdWatcherCancel = errors.Normalize("watcher canceled", errors.RFCCodeText("PD:etcd:ErrEtcdWatcherCancel"))
ErrCloseEtcdClient = errors.Normalize("close etcd client failed", errors.RFCCodeText("PD:etcd:ErrCloseEtcdClient"))
ErrEtcdMemberList = errors.Normalize("etcd member list failed", errors.RFCCodeText("PD:etcd:ErrEtcdMemberList"))
ErrEtcdMemberRemove = errors.Normalize("etcd remove member failed", errors.RFCCodeText("PD:etcd:ErrEtcdMemberRemove"))
)

// dashboard errors
Expand All @@ -208,6 +210,7 @@ var (

// strconv errors
var (
ErrStrconvParseBool = errors.Normalize("parse bool error", errors.RFCCodeText("PD:strconv:ErrStrconvParseBool"))
ErrStrconvParseInt = errors.Normalize("parse int error", errors.RFCCodeText("PD:strconv:ErrStrconvParseInt"))
ErrStrconvParseUint = errors.Normalize("parse uint error", errors.RFCCodeText("PD:strconv:ErrStrconvParseUint"))
ErrStrconvParseFloat = errors.Normalize("parse float error", errors.RFCCodeText("PD:strconv:ErrStrconvParseFloat"))
Expand Down Expand Up @@ -313,3 +316,8 @@ var (
ErrCryptoX509KeyPair = errors.Normalize("x509 keypair error", errors.RFCCodeText("PD:crypto:ErrCryptoX509KeyPair"))
ErrCryptoAppendCertsFromPEM = errors.Normalize("cert pool append certs error", errors.RFCCodeText("PD:crypto:ErrCryptoAppendCertsFromPEM"))
)

// gin errors
var (
ErrBindJSON = errors.Normalize("bind JSON error", errors.RFCCodeText("PD:gin:ErrBindJSON"))
)
5 changes: 4 additions & 1 deletion pkg/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,10 @@ func RemoveEtcdMember(client *clientv3.Client, id uint64) (*clientv3.MemberRemov
ctx, cancel := context.WithTimeout(client.Ctx(), DefaultRequestTimeout)
rmResp, err := client.MemberRemove(ctx, id)
cancel()
return rmResp, errors.WithStack(err)
if err != nil {
return rmResp, errs.ErrEtcdMemberRemove.Wrap(err).GenWithStackByCause()
}
return rmResp, nil
}

// EtcdKVGet returns the etcd GetResponse by given key or key prefix
Expand Down
21 changes: 20 additions & 1 deletion pkg/slice/slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@

package slice

import "reflect"
import (
"reflect"
"strings"
)

// AnyOf returns true if any element in the slice matches the predict func.
func AnyOf(s interface{}, p func(int) bool) bool {
Expand All @@ -39,3 +42,19 @@ func AllOf(s interface{}, p func(int) bool) bool {
}
return NoneOf(s, np)
}

// Contains returns true if the given slice contains the value.
func Contains(slice interface{}, value interface{}) bool {
if reflect.TypeOf(slice).Kind() == reflect.Slice || reflect.TypeOf(slice).Kind() == reflect.Array {
sliceValue := reflect.ValueOf(slice)
for i := 0; i < sliceValue.Len(); i++ {
if value == sliceValue.Index(i).Interface() {
return true
}
}
}
if reflect.TypeOf(slice).Kind() == reflect.String && reflect.TypeOf(value).Kind() == reflect.String {
return strings.Contains(slice.(string), value.(string))
}
return false
}
14 changes: 14 additions & 0 deletions pkg/slice/slice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,17 @@ func (s *testSliceSuite) Test(c *C) {
c.Assert(slice.AllOf(t.a, even), Equals, t.allOf)
}
}

func (s *testSliceSuite) TestSliceContains(c *C) {
ss := []string{"a", "b", "c"}
c.Assert(slice.Contains(ss, "a"), IsTrue)
c.Assert(slice.Contains(ss, "d"), IsFalse)

us := []uint64{1, 2, 3}
c.Assert(slice.Contains(us, uint64(1)), IsTrue)
c.Assert(slice.Contains(us, uint64(4)), IsFalse)

is := []int64{1, 2, 3}
c.Assert(slice.Contains(is, int64(1)), IsTrue)
c.Assert(slice.Contains(is, int64(4)), IsFalse)
}
22 changes: 7 additions & 15 deletions server/api/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/tikv/pd/pkg/apiutil"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/etcdutil"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/server"
"github.com/unrolled/render"
"go.uber.org/zap"
Expand Down Expand Up @@ -71,17 +72,15 @@ func getMembers(svr *server.Server) (*pdpb.GetMembersResponse, error) {
return nil, errors.WithStack(err)
}
for _, m := range members.GetMembers() {
m.DcLocation = ""
binaryVersion, e := svr.GetMember().GetMemberBinaryVersion(m.GetMemberId())
var e error
m.BinaryVersion, e = svr.GetMember().GetMemberBinaryVersion(m.GetMemberId())
if e != nil {
log.Error("failed to load binary version", zap.Uint64("member", m.GetMemberId()), errs.ZapError(e))
}
m.BinaryVersion = binaryVersion
deployPath, e := svr.GetMember().GetMemberDeployPath(m.GetMemberId())
m.DeployPath, e = svr.GetMember().GetMemberDeployPath(m.GetMemberId())
if e != nil {
log.Error("failed to load deploy path", zap.Uint64("member", m.GetMemberId()), errs.ZapError(e))
}
m.DeployPath = deployPath
if svr.GetMember().GetEtcdLeader() == 0 {
log.Warn("no etcd leader, skip get leader priority", zap.Uint64("member", m.GetMemberId()))
continue
Expand All @@ -92,22 +91,15 @@ func getMembers(svr *server.Server) (*pdpb.GetMembersResponse, error) {
continue
}
m.LeaderPriority = int32(leaderPriority)
gitHash, e := svr.GetMember().GetMemberGitHash(m.GetMemberId())
m.GitHash, e = svr.GetMember().GetMemberGitHash(m.GetMemberId())
if e != nil {
log.Error("failed to load git hash", zap.Uint64("member", m.GetMemberId()), errs.ZapError(e))
continue
}
m.GitHash = gitHash
found := false
for dcLocation, serverIDs := range dclocationDistribution {
for _, serverID := range serverIDs {
if serverID == m.MemberId {
m.DcLocation = dcLocation
found = true
break
}
}
found := slice.Contains(serverIDs, m.MemberId)
if found {
m.DcLocation = dcLocation
break
}
}
Expand Down
37 changes: 37 additions & 0 deletions server/apiv2/middlewares/bootstrap_checker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package middlewares

import (
"net/http"

"github.com/gin-gonic/gin"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/server"
)

// BootstrapChecker is a middleware to check if raft cluster is started.
func BootstrapChecker() gin.HandlerFunc {
return func(c *gin.Context) {
svr := c.MustGet("server").(*server.Server)
rc := svr.GetRaftCluster()
if rc == nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrNotBootstrapped.FastGenByArgs().Error())
return
}
c.Set("cluster", rc)
c.Next()
}
}
70 changes: 70 additions & 0 deletions server/apiv2/middlewares/redirector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package middlewares

import (
"net/http"
"net/url"

"github.com/gin-gonic/gin"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/apiutil/serverapi"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/server"
"go.uber.org/zap"
)

// Redirector is a middleware to redirect the request to the right place.
func Redirector() gin.HandlerFunc {
return func(c *gin.Context) {
svr := c.MustGet("server").(*server.Server)
allowFollowerHandle := len(c.Request.Header.Get(serverapi.AllowFollowerHandle)) > 0
isLeader := svr.GetMember().IsLeader()
if !svr.IsClosed() && (allowFollowerHandle || isLeader) {
c.Next()
return
}

// Prevent more than one redirection.
if name := c.Request.Header.Get(serverapi.RedirectorHeader); len(name) != 0 {
log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", svr.Name()), errs.ZapError(errs.ErrRedirect))
c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrRedirect.FastGenByArgs().Error())
return
}

c.Request.Header.Set(serverapi.RedirectorHeader, svr.Name())

leader := svr.GetMember().GetLeader()
if leader == nil {
c.AbortWithStatusJSON(http.StatusServiceUnavailable, errs.ErrLeaderNil.FastGenByArgs().Error())
return
}
clientUrls := leader.GetClientUrls()
urls := make([]url.URL, 0, len(clientUrls))
for _, item := range clientUrls {
u, err := url.Parse(item)
if err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrURLParse.Wrap(err).GenWithStackByCause().Error())
return
}

urls = append(urls, *u)
}

client := svr.GetHTTPClient()
serverapi.NewCustomReverseProxies(client, urls).ServeHTTP(c.Writer, c.Request)
c.Abort()
}
}
Loading

0 comments on commit 5c951c8

Please sign in to comment.