From 18df271ce57fb8f3b9b4d3a31700765af6c0d81e Mon Sep 17 00:00:00 2001 From: Hu# Date: Thu, 2 Feb 2023 17:43:56 +0800 Subject: [PATCH] client: both Support names and config path (#5886) close tikv/pd#5885, ref tikv/pd#5887, ref tikv/pd#5888, ref pingcap/tiflow#8110 The global config needs to support both names and configuration paths, not only for compatibility but also to make more sense. Signed-off-by: husharp Co-authored-by: Ti Chi Robot --- client/client.go | 29 +++++-- client/go.mod | 2 +- client/go.sum | 4 +- client/resourcemanager_client.go | 2 +- go.mod | 2 +- go.sum | 4 +- server/grpc_service.go | 57 +++++++++++-- tests/client/global_config_test.go | 129 +++++++++++++++++++++++++---- tests/client/go.mod | 2 +- tests/client/go.sum | 4 +- tests/msc/go.mod | 2 +- tests/msc/go.sum | 4 +- tools/pd-tso-bench/go.sum | 4 +- 13 files changed, 200 insertions(+), 45 deletions(-) diff --git a/client/client.go b/client/client.go index 466e5620017..4c7e100d564 100644 --- a/client/client.go +++ b/client/client.go @@ -53,6 +53,7 @@ type GlobalConfigItem struct { EventType pdpb.EventType Name string Value string + PayLoad []byte } // Client is a PD (Placement Driver) client. @@ -122,7 +123,7 @@ type Client interface { GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) // LoadGlobalConfig gets the global config from etcd - LoadGlobalConfig(ctx context.Context, configPath string) ([]GlobalConfigItem, int64, error) + LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]GlobalConfigItem, int64, error) // StoreGlobalConfig set the config from etcd StoreGlobalConfig(ctx context.Context, configPath string, items []GlobalConfigItem) error // WatchGlobalConfig returns an stream with all global config and updates @@ -419,7 +420,7 @@ func NewClientWithContext(ctx context.Context, pdAddrs []string, security Securi } // Start the daemons. c.updateTSODispatcher() - c.createTokenispatcher() + c.createTokenDispatcher() c.wg.Add(3) go c.tsLoop() go c.tsCancelLoop() @@ -1820,16 +1821,22 @@ func trimHTTPPrefix(str string) string { return str } -func (c *client) LoadGlobalConfig(ctx context.Context, configPath string) ([]GlobalConfigItem, int64, error) { - resp, err := c.getClient().LoadGlobalConfig(ctx, &pdpb.LoadGlobalConfigRequest{ConfigPath: configPath}) +func (c *client) LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]GlobalConfigItem, int64, error) { + resp, err := c.getClient().LoadGlobalConfig(ctx, &pdpb.LoadGlobalConfigRequest{Names: names, ConfigPath: configPath}) if err != nil { return nil, 0, err } res := make([]GlobalConfigItem, len(resp.GetItems())) for i, item := range resp.GetItems() { - cfg := GlobalConfigItem{Name: item.GetName()} - cfg.Value = item.GetValue() + cfg := GlobalConfigItem{Name: item.GetName(), EventType: item.GetKind(), PayLoad: item.GetPayload()} + if item.GetValue() == "" { + // We need to keep the Value field for CDC compatibility. + // But if you not use `Names`, will only have `Payload` field. + cfg.Value = string(item.GetPayload()) + } else { + cfg.Value = item.GetValue() + } res[i] = cfg } return res, resp.GetRevision(), nil @@ -1838,7 +1845,7 @@ func (c *client) LoadGlobalConfig(ctx context.Context, configPath string) ([]Glo func (c *client) StoreGlobalConfig(ctx context.Context, configPath string, items []GlobalConfigItem) error { resArr := make([]*pdpb.GlobalConfigItem, len(items)) for i, it := range items { - resArr[i] = &pdpb.GlobalConfigItem{Name: it.Name, Value: it.Value, Kind: it.EventType} + resArr[i] = &pdpb.GlobalConfigItem{Name: it.Name, Value: it.Value, Kind: it.EventType, Payload: it.PayLoad} } _, err := c.getClient().StoreGlobalConfig(ctx, &pdpb.StoreGlobalConfigRequest{Changes: resArr, ConfigPath: configPath}) if err != nil { @@ -1874,7 +1881,13 @@ func (c *client) WatchGlobalConfig(ctx context.Context, configPath string, revis } arr := make([]GlobalConfigItem, len(m.Changes)) for j, i := range m.Changes { - arr[j] = GlobalConfigItem{i.GetKind(), i.GetName(), i.GetValue()} + // We need to keep the Value field for CDC compatibility. + // But if you not use `Names`, will only have `Payload` field. + if i.GetValue() == "" { + arr[j] = GlobalConfigItem{i.GetKind(), i.GetName(), string(i.GetPayload()), i.GetPayload()} + } else { + arr[j] = GlobalConfigItem{i.GetKind(), i.GetName(), i.GetValue(), i.GetPayload()} + } } select { case <-ctx.Done(): diff --git a/client/go.mod b/client/go.mod index 3675df6bb92..47c68005f00 100644 --- a/client/go.mod +++ b/client/go.mod @@ -7,7 +7,7 @@ require ( github.com/opentracing/opentracing-go v1.2.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20230131104319-a7c51106dfe7 + github.com/pingcap/kvproto v0.0.0-20230201112839-2b853bed8125 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/prometheus/client_golang v1.11.0 github.com/stretchr/testify v1.8.1 diff --git a/client/go.sum b/client/go.sum index a7355525135..9b1d38d5e9b 100644 --- a/client/go.sum +++ b/client/go.sum @@ -561,8 +561,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= -github.com/pingcap/kvproto v0.0.0-20230131104319-a7c51106dfe7 h1:oYUK4V5PMlyIooU/+pPkKrJ3vELwcuuCNyKKlqSQa5c= -github.com/pingcap/kvproto v0.0.0-20230131104319-a7c51106dfe7/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE= +github.com/pingcap/kvproto v0.0.0-20230201112839-2b853bed8125 h1:ZiCJcEzmmF5xNgt8GIXekd3WQXI/22kzYQnrHi3Fc/4= +github.com/pingcap/kvproto v0.0.0-20230201112839-2b853bed8125/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/client/resourcemanager_client.go b/client/resourcemanager_client.go index 59bf605abaa..1cd15d501c3 100644 --- a/client/resourcemanager_client.go +++ b/client/resourcemanager_client.go @@ -234,7 +234,7 @@ type resourceManagerConnectionContext struct { cancel context.CancelFunc } -func (c *client) createTokenispatcher() { +func (c *client) createTokenDispatcher() { dispatcherCtx, dispatcherCancel := context.WithCancel(c.ctx) dispatcher := &tokenDispatcher{ dispatcherCancel: dispatcherCancel, diff --git a/go.mod b/go.mod index 4088a6693d8..1828bb16205 100644 --- a/go.mod +++ b/go.mod @@ -26,7 +26,7 @@ require ( github.com/pingcap/errcode v0.3.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce - github.com/pingcap/kvproto v0.0.0-20230131104319-a7c51106dfe7 + github.com/pingcap/kvproto v0.0.0-20230201112839-2b853bed8125 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pingcap/sysutil v0.0.0-20211208032423-041a72e5860d github.com/pingcap/tidb-dashboard v0.0.0-20221201151320-ea3ee6971f2e diff --git a/go.sum b/go.sum index b59ff20efc9..cf9d4a16d95 100644 --- a/go.sum +++ b/go.sum @@ -368,8 +368,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce h1:Y1kCxlCtlPTMtVcOkjUcuQKh+YrluSo7+7YMCQSzy30= github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20230131104319-a7c51106dfe7 h1:oYUK4V5PMlyIooU/+pPkKrJ3vELwcuuCNyKKlqSQa5c= -github.com/pingcap/kvproto v0.0.0-20230131104319-a7c51106dfe7/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE= +github.com/pingcap/kvproto v0.0.0-20230201112839-2b853bed8125 h1:ZiCJcEzmmF5xNgt8GIXekd3WQXI/22kzYQnrHi3Fc/4= +github.com/pingcap/kvproto v0.0.0-20230201112839-2b853bed8125/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= diff --git a/server/grpc_service.go b/server/grpc_service.go index 2ee6c107131..3725a19b2b6 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "io" + "path" "strconv" "sync/atomic" "time" @@ -1881,16 +1882,30 @@ func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan <-done } +// for CDC compatibility, we need to initialize config path to `globalConfigPath` +const globalConfigPath = "/global/config/" + // StoreGlobalConfig store global config into etcd by transaction +// Since item value needs to support marshal of different struct types, +// it should be set to `Payload bytes` instead of `Value string` func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlobalConfigRequest) (*pdpb.StoreGlobalConfigResponse, error) { + configPath := request.GetConfigPath() + if configPath == "" { + configPath = globalConfigPath + } ops := make([]clientv3.Op, len(request.Changes)) for i, item := range request.Changes { - name := item.GetName() + name := path.Join(configPath, item.GetName()) switch item.GetKind() { case pdpb.EventType_PUT: - ops[i] = clientv3.OpPut(request.GetConfigPath()+name, item.GetValue()) + // For CDC compatibility, we need to check the Value field firstly. + value := item.GetValue() + if value == "" { + value = string(item.GetPayload()) + } + ops[i] = clientv3.OpPut(name, value) case pdpb.EventType_DELETE: - ops[i] = clientv3.OpDelete(request.GetConfigPath() + name) + ops[i] = clientv3.OpDelete(name) } } res, err := @@ -1904,16 +1919,38 @@ func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlo return &pdpb.StoreGlobalConfigResponse{}, nil } -// LoadGlobalConfig load global config from etcd +// LoadGlobalConfig support 2 ways to load global config from etcd +// - `Names` iteratively get value from `ConfigPath/Name` but not care about revision +// - `ConfigPath` if `Names` is nil can get all values and revision of current path func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlobalConfigRequest) (*pdpb.LoadGlobalConfigResponse, error) { configPath := request.GetConfigPath() + if configPath == "" { + configPath = globalConfigPath + } + // Since item value needs to support marshal of different struct types, + // it should be set to `Payload bytes` instead of `Value string`. + if request.Names != nil { + res := make([]*pdpb.GlobalConfigItem, len(request.Names)) + for i, name := range request.Names { + r, err := s.client.Get(ctx, path.Join(configPath, name)) + if err != nil { + res[i] = &pdpb.GlobalConfigItem{Name: name, Error: &pdpb.Error{Type: pdpb.ErrorType_UNKNOWN, Message: err.Error()}} + } else if len(r.Kvs) == 0 { + msg := "key " + name + " not found" + res[i] = &pdpb.GlobalConfigItem{Name: name, Error: &pdpb.Error{Type: pdpb.ErrorType_GLOBAL_CONFIG_NOT_FOUND, Message: msg}} + } else { + res[i] = &pdpb.GlobalConfigItem{Name: name, Payload: r.Kvs[0].Value, Kind: pdpb.EventType_PUT} + } + } + return &pdpb.LoadGlobalConfigResponse{Items: res}, nil + } r, err := s.client.Get(ctx, configPath, clientv3.WithPrefix()) if err != nil { return &pdpb.LoadGlobalConfigResponse{}, err } res := make([]*pdpb.GlobalConfigItem, len(r.Kvs)) for i, value := range r.Kvs { - res[i] = &pdpb.GlobalConfigItem{Kind: pdpb.EventType_PUT, Name: string(value.Key), Value: string(value.Value)} + res[i] = &pdpb.GlobalConfigItem{Kind: pdpb.EventType_PUT, Name: string(value.Key), Payload: value.Value} } return &pdpb.LoadGlobalConfigResponse{Items: res, Revision: r.Header.GetRevision()}, nil } @@ -1924,11 +1961,15 @@ func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlo func (s *GrpcServer) WatchGlobalConfig(req *pdpb.WatchGlobalConfigRequest, server pdpb.PD_WatchGlobalConfigServer) error { ctx, cancel := context.WithCancel(s.Context()) defer cancel() + configPath := req.GetConfigPath() + if configPath == "" { + configPath = globalConfigPath + } revision := req.GetRevision() // If the revision is compacted, will meet required revision has been compacted error. // - If required revision < CompactRevision, we need to reload all configs to avoid losing data. // - If required revision >= CompactRevision, just keep watching. - watchChan := s.client.Watch(ctx, req.GetConfigPath(), clientv3.WithPrefix(), clientv3.WithRev(revision)) + watchChan := s.client.Watch(ctx, configPath, clientv3.WithPrefix(), clientv3.WithRev(revision)) for { select { case <-ctx.Done(): @@ -1947,7 +1988,9 @@ func (s *GrpcServer) WatchGlobalConfig(req *pdpb.WatchGlobalConfigRequest, serve cfgs := make([]*pdpb.GlobalConfigItem, 0, len(res.Events)) for _, e := range res.Events { - cfgs = append(cfgs, &pdpb.GlobalConfigItem{Name: string(e.Kv.Key), Value: string(e.Kv.Value), Kind: pdpb.EventType(e.Type)}) + // Since item value needs to support marshal of different struct types, + // it should be set to `Payload bytes` instead of `Value string`. + cfgs = append(cfgs, &pdpb.GlobalConfigItem{Name: string(e.Kv.Key), Payload: e.Kv.Value, Kind: pdpb.EventType(e.Type)}) } if len(cfgs) > 0 { if err := server.Send(&pdpb.WatchGlobalConfigResponse{Changes: cfgs, Revision: res.Header.GetRevision()}); err != nil { diff --git a/tests/client/global_config_test.go b/tests/client/global_config_test.go index a08c16b4031..7a88dec9fa9 100644 --- a/tests/client/global_config_test.go +++ b/tests/client/global_config_test.go @@ -15,6 +15,7 @@ package client_test import ( + "path" "strconv" "testing" "time" @@ -31,7 +32,7 @@ import ( "google.golang.org/grpc" ) -const globalConfigPath = "global/config/" +const globalConfigPath = "/global/config/" type testReceiver struct { re *require.Assertions @@ -43,9 +44,9 @@ func (s testReceiver) Send(m *pdpb.WatchGlobalConfigResponse) error { for _, change := range m.GetChanges() { switch change.GetKind() { case pdpb.EventType_PUT: - s.re.Contains(change.Name, globalConfigPath+change.Value) + s.re.Contains(change.Name, globalConfigPath+string(change.Payload)) case pdpb.EventType_DELETE: - s.re.Empty(change.Value) + s.re.Empty(change.Payload) } } return nil @@ -84,7 +85,7 @@ func (suite *globalConfigTestSuite) GetEtcdPath(configPath string) string { return globalConfigPath + configPath } -func (suite *globalConfigTestSuite) TestLoad() { +func (suite *globalConfigTestSuite) TestLoadWithoutNames() { defer func() { // clean up _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath("test")) @@ -98,17 +99,78 @@ func (suite *globalConfigTestSuite) TestLoad() { suite.NoError(err) suite.Len(res.Items, 1) suite.Equal(r.Header.GetRevision(), res.Revision) - suite.Equal("test", res.Items[0].Value) + suite.Equal("test", string(res.Items[0].Payload)) +} + +func (suite *globalConfigTestSuite) TestLoadWithoutConfigPath() { + defer func() { + // clean up + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath("source_id")) + suite.NoError(err) + }() + _, err := suite.server.GetClient().Put(suite.server.Context(), suite.GetEtcdPath("source_id"), "1") + suite.NoError(err) + res, err := suite.server.LoadGlobalConfig(suite.server.Context(), &pdpb.LoadGlobalConfigRequest{ + Names: []string{"source_id"}, + }) + suite.NoError(err) + suite.Len(res.Items, 1) + suite.Equal([]byte("1"), res.Items[0].Payload) +} + +func (suite *globalConfigTestSuite) TestLoadOtherConfigPath() { + defer func() { + for i := 0; i < 3; i++ { + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i))) + suite.NoError(err) + } + }() + for i := 0; i < 3; i++ { + _, err := suite.server.GetClient().Put(suite.server.Context(), path.Join("OtherConfigPath", strconv.Itoa(i)), strconv.Itoa(i)) + suite.NoError(err) + } + res, err := suite.server.LoadGlobalConfig(suite.server.Context(), &pdpb.LoadGlobalConfigRequest{ + Names: []string{"0", "1"}, + ConfigPath: "OtherConfigPath", + }) + suite.NoError(err) + suite.Len(res.Items, 2) + for i, item := range res.Items { + suite.Equal(&pdpb.GlobalConfigItem{Kind: pdpb.EventType_PUT, Name: strconv.Itoa(i), Payload: []byte(strconv.Itoa(i))}, item) + } +} + +func (suite *globalConfigTestSuite) TestLoadAndStore() { + defer func() { + for i := 0; i < 3; i++ { + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath("test")) + suite.NoError(err) + } + }() + changes := []*pdpb.GlobalConfigItem{{Kind: pdpb.EventType_PUT, Name: "0", Payload: []byte("0")}, {Kind: pdpb.EventType_PUT, Name: "1", Payload: []byte("1")}, {Kind: pdpb.EventType_PUT, Name: "2", Payload: []byte("2")}} + _, err := suite.server.StoreGlobalConfig(suite.server.Context(), &pdpb.StoreGlobalConfigRequest{ + ConfigPath: globalConfigPath, + Changes: changes, + }) + suite.NoError(err) + res, err := suite.server.LoadGlobalConfig(suite.server.Context(), &pdpb.LoadGlobalConfigRequest{ + ConfigPath: globalConfigPath, + }) + suite.Len(res.Items, 3) + suite.NoError(err) + for i, item := range res.Items { + suite.Equal(&pdpb.GlobalConfigItem{Kind: pdpb.EventType_PUT, Name: suite.GetEtcdPath(strconv.Itoa(i)), Payload: []byte(strconv.Itoa(i))}, item) + } } func (suite *globalConfigTestSuite) TestStore() { defer func() { for i := 0; i < 3; i++ { - _, err := suite.server.GetClient().Delete(suite.server.Context(), globalConfigPath+"test") + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath("test")) suite.NoError(err) } }() - changes := []*pdpb.GlobalConfigItem{{Kind: pdpb.EventType_PUT, Name: "0", Value: "0"}, {Kind: pdpb.EventType_PUT, Name: "1", Value: "1"}, {Kind: pdpb.EventType_PUT, Name: "2", Value: "2"}} + changes := []*pdpb.GlobalConfigItem{{Kind: pdpb.EventType_PUT, Name: "0", Payload: []byte("0")}, {Kind: pdpb.EventType_PUT, Name: "1", Payload: []byte("1")}, {Kind: pdpb.EventType_PUT, Name: "2", Payload: []byte("2")}} _, err := suite.server.StoreGlobalConfig(suite.server.Context(), &pdpb.StoreGlobalConfigRequest{ ConfigPath: globalConfigPath, Changes: changes, @@ -140,18 +202,55 @@ func (suite *globalConfigTestSuite) TestWatch() { } } -func (suite *globalConfigTestSuite) TestClientLoad() { +func (suite *globalConfigTestSuite) TestClientLoadWithoutNames() { + defer func() { + for i := 0; i < 3; i++ { + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i))) + suite.NoError(err) + } + }() + for i := 0; i < 3; i++ { + _, err := suite.server.GetClient().Put(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i)), strconv.Itoa(i)) + suite.NoError(err) + } + res, _, err := suite.client.LoadGlobalConfig(suite.server.Context(), nil, globalConfigPath) + suite.NoError(err) + suite.Len(res, 3) + for i, item := range res { + suite.Equal(pd.GlobalConfigItem{EventType: pdpb.EventType_PUT, Name: suite.GetEtcdPath(strconv.Itoa(i)), PayLoad: []byte(strconv.Itoa(i)), Value: strconv.Itoa(i)}, item) + } +} + +func (suite *globalConfigTestSuite) TestClientLoadWithoutConfigPath() { defer func() { - _, err := suite.server.GetClient().Delete(suite.server.Context(), globalConfigPath+"test") + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath("source_id")) suite.NoError(err) }() - r, err := suite.server.GetClient().Put(suite.server.Context(), suite.GetEtcdPath("test"), "test") + _, err := suite.server.GetClient().Put(suite.server.Context(), suite.GetEtcdPath("source_id"), "1") suite.NoError(err) - res, revision, err := suite.client.LoadGlobalConfig(suite.server.Context(), globalConfigPath) + res, _, err := suite.client.LoadGlobalConfig(suite.server.Context(), []string{"source_id"}, "") suite.NoError(err) suite.Len(res, 1) - suite.Equal(r.Header.GetRevision(), revision) - suite.Equal(pd.GlobalConfigItem{Name: suite.GetEtcdPath("test"), Value: "test", EventType: pdpb.EventType_PUT}, res[0]) + suite.Equal(pd.GlobalConfigItem{EventType: pdpb.EventType_PUT, Name: "source_id", PayLoad: []byte("1"), Value: "1"}, res[0]) +} + +func (suite *globalConfigTestSuite) TestClientLoadOtherConfigPath() { + defer func() { + for i := 0; i < 3; i++ { + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i))) + suite.NoError(err) + } + }() + for i := 0; i < 3; i++ { + _, err := suite.server.GetClient().Put(suite.server.Context(), path.Join("OtherConfigPath", strconv.Itoa(i)), strconv.Itoa(i)) + suite.NoError(err) + } + res, _, err := suite.client.LoadGlobalConfig(suite.server.Context(), []string{"0", "1"}, "OtherConfigPath") + suite.NoError(err) + suite.Len(res, 2) + for i, item := range res { + suite.Equal(pd.GlobalConfigItem{EventType: pdpb.EventType_PUT, Name: strconv.Itoa(i), PayLoad: []byte(strconv.Itoa(i)), Value: strconv.Itoa(i)}, item) + } } func (suite *globalConfigTestSuite) TestClientStore() { @@ -184,11 +283,11 @@ func (suite *globalConfigTestSuite) TestClientWatchWithRevision() { // Mock get revision by loading r, err := suite.server.GetClient().Put(suite.server.Context(), suite.GetEtcdPath("test"), "test") suite.NoError(err) - res, revision, err := suite.client.LoadGlobalConfig(suite.server.Context(), globalConfigPath) + res, revision, err := suite.client.LoadGlobalConfig(suite.server.Context(), nil, globalConfigPath) suite.NoError(err) suite.Len(res, 1) suite.Equal(r.Header.GetRevision(), revision) - suite.Equal(pd.GlobalConfigItem{Name: suite.GetEtcdPath("test"), Value: "test", EventType: pdpb.EventType_PUT}, res[0]) + suite.Equal(pd.GlobalConfigItem{EventType: pdpb.EventType_PUT, Name: suite.GetEtcdPath("test"), PayLoad: []byte("test"), Value: "test"}, res[0]) // Mock when start watcher there are existed some keys, will load firstly for i := 3; i < 6; i++ { _, err = suite.server.GetClient().Put(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i)), strconv.Itoa(i)) diff --git a/tests/client/go.mod b/tests/client/go.mod index 6e2fbe12404..d2c68656e56 100644 --- a/tests/client/go.mod +++ b/tests/client/go.mod @@ -4,7 +4,7 @@ go 1.19 require ( github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20230131104319-a7c51106dfe7 + github.com/pingcap/kvproto v0.0.0-20230201112839-2b853bed8125 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/stretchr/testify v1.8.1 github.com/tikv/pd v0.0.0-00010101000000-000000000000 diff --git a/tests/client/go.sum b/tests/client/go.sum index fddf2bd353f..9e2ee1c8c79 100644 --- a/tests/client/go.sum +++ b/tests/client/go.sum @@ -789,8 +789,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20230131104319-a7c51106dfe7 h1:oYUK4V5PMlyIooU/+pPkKrJ3vELwcuuCNyKKlqSQa5c= -github.com/pingcap/kvproto v0.0.0-20230131104319-a7c51106dfe7/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE= +github.com/pingcap/kvproto v0.0.0-20230201112839-2b853bed8125 h1:ZiCJcEzmmF5xNgt8GIXekd3WQXI/22kzYQnrHi3Fc/4= +github.com/pingcap/kvproto v0.0.0-20230201112839-2b853bed8125/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= diff --git a/tests/msc/go.mod b/tests/msc/go.mod index 6f482bd1128..9b680b0477b 100644 --- a/tests/msc/go.mod +++ b/tests/msc/go.mod @@ -3,7 +3,7 @@ module github.com/tikv/pd/tests/msc go 1.19 require ( - github.com/pingcap/kvproto v0.0.0-20230131104319-a7c51106dfe7 + github.com/pingcap/kvproto v0.0.0-20230201112839-2b853bed8125 github.com/stretchr/testify v1.8.1 github.com/tikv/pd v0.0.0-00010101000000-000000000000 github.com/tikv/pd/client v0.0.0-00010101000000-000000000000 diff --git a/tests/msc/go.sum b/tests/msc/go.sum index 86240e465bb..da5bcd13f0d 100644 --- a/tests/msc/go.sum +++ b/tests/msc/go.sum @@ -790,8 +790,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20230131104319-a7c51106dfe7 h1:oYUK4V5PMlyIooU/+pPkKrJ3vELwcuuCNyKKlqSQa5c= -github.com/pingcap/kvproto v0.0.0-20230131104319-a7c51106dfe7/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE= +github.com/pingcap/kvproto v0.0.0-20230201112839-2b853bed8125 h1:ZiCJcEzmmF5xNgt8GIXekd3WQXI/22kzYQnrHi3Fc/4= +github.com/pingcap/kvproto v0.0.0-20230201112839-2b853bed8125/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= diff --git a/tools/pd-tso-bench/go.sum b/tools/pd-tso-bench/go.sum index a9c358ad49a..66c58454f59 100644 --- a/tools/pd-tso-bench/go.sum +++ b/tools/pd-tso-bench/go.sum @@ -561,8 +561,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= -github.com/pingcap/kvproto v0.0.0-20230131104319-a7c51106dfe7 h1:oYUK4V5PMlyIooU/+pPkKrJ3vELwcuuCNyKKlqSQa5c= -github.com/pingcap/kvproto v0.0.0-20230131104319-a7c51106dfe7/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE= +github.com/pingcap/kvproto v0.0.0-20230201112839-2b853bed8125 h1:ZiCJcEzmmF5xNgt8GIXekd3WQXI/22kzYQnrHi3Fc/4= +github.com/pingcap/kvproto v0.0.0-20230201112839-2b853bed8125/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=