Skip to content

Commit

Permalink
server: store the updated labels into etcd (#51451)
Browse files Browse the repository at this point in the history
close #51427
  • Loading branch information
you06 authored and pull[bot] committed May 8, 2024
1 parent 4d39f82 commit 1308099
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 1 deletion.
33 changes: 33 additions & 0 deletions pkg/domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,39 @@ func GetAllServerInfo(ctx context.Context) (map[string]*ServerInfo, error) {
return is.getAllServerInfo(ctx)
}

// UpdateServerLabel updates the server label for global info syncer.
func UpdateServerLabel(ctx context.Context, labels map[string]string) error {
is, err := getGlobalInfoSyncer()
if err != nil {
return err
}
// when etcdCli is nil, the server infos are generated from the latest config, no need to update.
if is.etcdCli == nil {
return nil
}
selfInfo, err := is.getServerInfoByID(ctx, is.info.ID)
if err != nil {
return err
}
changed := false
for k, v := range labels {
if selfInfo.Labels[k] != v {
changed = true
selfInfo.Labels[k] = v
}
}
if !changed {
return nil
}
infoBuf, err := selfInfo.Marshal()
if err != nil {
return errors.Trace(err)
}
str := string(hack.String(infoBuf))
err = util.PutKVToEtcd(ctx, is.etcdCli, keyOpDefaultRetryCnt, is.serverInfoPath, str, clientv3.WithLease(is.session.Lease()))
return err
}

// DeleteTiFlashTableSyncProgress is used to delete the tiflash table replica sync progress.
func DeleteTiFlashTableSyncProgress(tableInfo *model.TableInfo) error {
is, err := getGlobalInfoSyncer()
Expand Down
3 changes: 2 additions & 1 deletion pkg/server/handler/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ go_test(
"main_test.go",
],
flaky = True,
shard_count = 36,
shard_count = 37,
deps = [
"//pkg/config",
"//pkg/ddl",
Expand Down Expand Up @@ -54,6 +54,7 @@ go_test(
"@com_github_pingcap_log//:log",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//tikv",
"@io_etcd_go_etcd_tests_v3//integration",
"@org_uber_go_goleak//:goleak",
"@org_uber_go_zap//:zap",
],
Expand Down
60 changes: 60 additions & 0 deletions pkg/server/handler/tests/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package tests

import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
Expand Down Expand Up @@ -66,6 +67,7 @@ import (
"github.com/pingcap/tidb/pkg/util/rowcodec"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/tikv"
"go.etcd.io/etcd/tests/v3/integration"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -1202,6 +1204,64 @@ func TestSetLabels(t *testing.T) {
})
}

func TestSetLabelsWithEtcd(t *testing.T) {
ts := createBasicHTTPHandlerTestSuite()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ts.startServer(t)
defer ts.stopServer(t)

integration.BeforeTestExternal(t)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
client := cluster.RandClient()
infosync.SetEtcdClient(client)
ts.domain.InfoSyncer().Restart(ctx)

testUpdateLabels := func(labels, expected map[string]string) {
buffer := bytes.NewBuffer([]byte{})
require.Nil(t, json.NewEncoder(buffer).Encode(labels))
resp, err := ts.PostStatus("/labels", "application/json", buffer)
require.NoError(t, err)
require.NotNil(t, resp)
defer func() {
require.NoError(t, resp.Body.Close())
}()
require.Equal(t, http.StatusOK, resp.StatusCode)
newLabels := config.GetGlobalConfig().Labels
require.Equal(t, newLabels, expected)
servers, err := infosync.GetAllServerInfo(ctx)
require.NoError(t, err)
for _, server := range servers {
for k, expectV := range expected {
v, ok := server.Labels[k]
require.True(t, ok)
require.Equal(t, expectV, v)
}
return
}
require.Fail(t, "no server found")
}

labels := map[string]string{
"zone": "us-west-1",
"test": "123",
}
testUpdateLabels(labels, labels)

updated := map[string]string{
"zone": "bj-1",
}
labels["zone"] = "bj-1"
testUpdateLabels(updated, labels)

// reset the global variable
config.UpdateGlobal(func(conf *config.Config) {
conf.Labels = map[string]string{}
})
}

func TestSetLabelsConcurrentWithGetLabel(t *testing.T) {
ts := createBasicHTTPHandlerTestSuite()

Expand Down
5 changes: 5 additions & 0 deletions pkg/server/handler/tikvhandler/tikv_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1981,6 +1981,11 @@ func (LabelHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
}
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
if err := infosync.UpdateServerLabel(ctx, labels); err != nil {
logutil.BgLogger().Error("update etcd labels failed", zap.Any("labels", cfg.Labels), zap.Error(err))
}
cancel()
cfg.Labels = labels
config.StoreGlobalConfig(&cfg)
logutil.BgLogger().Info("update server labels", zap.Any("labels", cfg.Labels))
Expand Down

0 comments on commit 1308099

Please sign in to comment.