// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package router

import (
	"context"
	"encoding/json"
	"strings"
	"time"

	"github.com/pingcap/TiProxy/lib/config"
	"github.com/pingcap/TiProxy/lib/util/errors"
	"github.com/pingcap/TiProxy/pkg/manager/cert"
	"github.com/pingcap/tidb/domain/infosync"
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.uber.org/zap"
	"golang.org/x/exp/slices"
	"google.golang.org/grpc"
	"google.golang.org/grpc/backoff"
	"google.golang.org/grpc/keepalive"
)

// BackendFetcher is an interface to fetch the backend list.
type BackendFetcher interface {
	GetBackendList(context.Context) map[string]*BackendInfo
}

// InitEtcdClient initializes an etcd client that fetches TiDB instance topology from PD.
func InitEtcdClient(logger *zap.Logger, cfg *config.Config, certMgr *cert.CertManager) (*clientv3.Client, error) {
	pdAddr := cfg.Proxy.PDAddrs
	if len(pdAddr) == 0 {
		// No PD address is configured, so use the configured TiDB server addresses directly.
		return nil, nil
	}
	pdEndpoints := strings.Split(pdAddr, ",")
	logger.Info("connect PD servers", zap.Strings("addrs", pdEndpoints))
	etcdClient, err := clientv3.New(clientv3.Config{
		Endpoints:        pdEndpoints,
		TLS:              certMgr.ClusterTLS(),
		Logger:           logger.Named("etcdcli"),
		AutoSyncInterval: 30 * time.Second,
		DialTimeout:      5 * time.Second,
		DialOptions: []grpc.DialOption{
			grpc.WithKeepaliveParams(keepalive.ClientParameters{
				Time:    10 * time.Second,
				Timeout: 3 * time.Second,
			}),
			grpc.WithConnectParams(grpc.ConnectParams{
				Backoff: backoff.Config{
					BaseDelay:  time.Second,
					Multiplier: 1.1,
					Jitter:     0.1,
					MaxDelay:   3 * time.Second,
				},
				MinConnectTimeout: 3 * time.Second,
			}),
		},
	})
	return etcdClient, errors.Wrapf(err, "init etcd client failed")
}

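// pdBackendInfo is the information of one TiDB backend read from the PD etcd:
// its topology info plus the TTL used to detect tombstones.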
type pdBackendInfo struct {
	// The TopologyInfo received from the /info path.
	*infosync.TopologyInfo
	// The TTL value of the backend read from etcd.
	ttl []byte
	// The last time the TTL was refreshed.
	// If the TTL stays unchanged for a long time, the backend might be a tombstone.
	lastUpdate time.Time
}

// PDFetcher fetches the backend list from PD.
type PDFetcher struct {
	// All the backend info in the topology, including tombstones.
	backendInfo map[string]*pdBackendInfo
	client      *clientv3.Client
	logger      *zap.Logger
	config      *HealthCheckConfig
}

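// NewPDFetcher creates a PDFetcher that reads the backend list from the given etcd client.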
func NewPDFetcher(client *clientv3.Client, logger *zap.Logger, config *HealthCheckConfig) *PDFetcher {
	return &PDFetcher{
		backendInfo: make(map[string]*pdBackendInfo),
		client:      client,
		logger:      logger,
		config:      config,
	}
}

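// GetBackendList implements BackendFetcher. It refreshes the backend list from PD
// and returns the backends that are not tombstones.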
func (pf *PDFetcher) GetBackendList(ctx context.Context) map[string]*BackendInfo {
	pf.fetchBackendList(ctx)
	backendInfo := pf.filterTombstoneBackends()
	return backendInfo
}

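// fetchBackendList refreshes the cached backend info with the latest TiDB topology read from etcd.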
func (pf *PDFetcher) fetchBackendList(ctx context.Context) {
	// We query etcd periodically instead of watching etcd events because:
	// - When a new backend starts and registers itself in etcd, its HTTP status port is not ready yet.
	// - When a backend shuts down, it doesn't delete itself from etcd.
	var response *clientv3.GetResponse
	var err error
	// It's a critical problem if the proxy cannot connect to PD, so we always retry.
	for ctx.Err() == nil {
		// The query may be slow when there are many tombstone backends, so we don't set a timeout here.
		if response, err = pf.client.Get(ctx, infosync.TopologyInformationPath, clientv3.WithPrefix()); err == nil {
			break
		}
		pf.logger.Error("fetch backend list failed, will retry later", zap.Error(err))
		time.Sleep(pf.config.healthCheckRetryInterval)
	}

	if ctx.Err() != nil {
		return
	}

	allBackendInfo := make(map[string]*pdBackendInfo, len(response.Kvs))
	now := time.Now()
	for _, kv := range response.Kvs {
		key := string(kv.Key)
		if strings.HasSuffix(key, ttlPathSuffix) {
			addr := key[len(infosync.TopologyInformationPath)+1 : len(key)-len(ttlPathSuffix)]
			info, ok := allBackendInfo[addr]
			if !ok {
				info, ok = pf.backendInfo[addr]
			}
			if ok {
				if slices.Compare(info.ttl, kv.Value) != 0 {
					// The TTL is updated this time.
					info.lastUpdate = now
					info.ttl = kv.Value
				}
			} else {
				// A new backend.
				info = &pdBackendInfo{
					lastUpdate: now,
					ttl:        kv.Value,
				}
			}
			allBackendInfo[addr] = info
		} else if strings.HasSuffix(key, infoPathSuffix) {
			addr := key[len(infosync.TopologyInformationPath)+1 : len(key)-len(infoPathSuffix)]
			// A backend may restart with the same address but a different status port in a short time, so
			// we still need to unmarshal and update the topology even if the address already exists in the map.
			var topo *infosync.TopologyInfo
			if err = json.Unmarshal(kv.Value, &topo); err != nil {
				pf.logger.Error("unmarshal topology info failed", zap.String("key", string(kv.Key)),
					zap.ByteString("value", kv.Value), zap.Error(err))
				continue
			}
			info, ok := allBackendInfo[addr]
			if !ok {
				info, ok = pf.backendInfo[addr]
			}
			if ok {
				info.TopologyInfo = topo
			} else {
				info = &pdBackendInfo{
					TopologyInfo: topo,
				}
			}
			allBackendInfo[addr] = info
		}
	}
	pf.backendInfo = allBackendInfo
}

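// filterTombstoneBackends returns the backends that have both a topology and a recently
// refreshed TTL, skipping the ones that look like tombstones.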
func (pf *PDFetcher) filterTombstoneBackends() map[string]*BackendInfo {
	now := time.Now()
	aliveBackends := make(map[string]*BackendInfo, len(pf.backendInfo))
	for addr, info := range pf.backendInfo {
		if info.TopologyInfo == nil || info.ttl == nil {
			continue
		}
		// After running for a long time, there may be many tombstones because failed TiDB instances
		// don't delete themselves from etcd. Checking their health is a waste of time and makes each
		// health check round longer and longer, so tombstones are not added to aliveBackends.
		if info.lastUpdate.Add(pf.config.tombstoneThreshold).Before(now) {
			continue
		}
		aliveBackends[addr] = &BackendInfo{
			IP:         info.IP,
			StatusPort: info.StatusPort,
		}
	}
	return aliveBackends
}

// StaticFetcher uses a statically configured list of backend addresses. It is only used for testing.
type StaticFetcher struct {
	backends map[string]*BackendInfo
}

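// NewStaticFetcher creates a StaticFetcher from a fixed list of backend addresses.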
func NewStaticFetcher(staticAddrs []string) *StaticFetcher {
	return &StaticFetcher{
		backends: backendListToMap(staticAddrs),
	}
}

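// GetBackendList implements BackendFetcher. It always returns the static backend list.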
func (sf *StaticFetcher) GetBackendList(context.Context) map[string]*BackendInfo {
	return sf.backends
}

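// backendListToMap converts a list of backend addresses into a map keyed by address with empty BackendInfo values.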
func backendListToMap(addrs []string) map[string]*BackendInfo {
	backends := make(map[string]*BackendInfo, len(addrs))
	for _, addr := range addrs {
		backends[addr] = &BackendInfo{}
	}
	return backends
}