diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 2e20566de3e8..feb846ea1284 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -539,7 +539,7 @@ func (e *Etcd) servePeers() (err error) { for _, p := range e.Peers { u := p.Listener.Addr().String() - gs := v3rpc.Server(e.Server, peerTLScfg) + gs := v3rpc.Server(e.Server, peerTLScfg, nil) m := cmux.New(p.Listener) go gs.Serve(m.Match(cmux.HTTP2())) srv := &http.Server{ diff --git a/server/embed/serve.go b/server/embed/serve.go index 17b55384ebb6..c3e786321cd0 100644 --- a/server/embed/serve.go +++ b/server/embed/serve.go @@ -110,7 +110,7 @@ func (sctx *serveCtx) serve( }() if sctx.insecure { - gs = v3rpc.Server(s, nil, gopts...) + gs = v3rpc.Server(s, nil, nil, gopts...) v3electionpb.RegisterElectionServer(gs, servElection) v3lockpb.RegisterLockServer(gs, servLock) if sctx.serviceRegister != nil { @@ -148,7 +148,7 @@ func (sctx *serveCtx) serve( if tlsErr != nil { return tlsErr } - gs = v3rpc.Server(s, tlscfg, gopts...) + gs = v3rpc.Server(s, tlscfg, nil, gopts...) v3electionpb.RegisterElectionServer(gs, servElection) v3lockpb.RegisterLockServer(gs, servLock) if sctx.serviceRegister != nil { diff --git a/server/etcdserver/api/v3rpc/grpc.go b/server/etcdserver/api/v3rpc/grpc.go index 26c52b385b43..ea3dd75705fd 100644 --- a/server/etcdserver/api/v3rpc/grpc.go +++ b/server/etcdserver/api/v3rpc/grpc.go @@ -36,19 +36,21 @@ const ( maxSendBytes = math.MaxInt32 ) -func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOption) *grpc.Server { +func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnaryServerInterceptor, gopts ...grpc.ServerOption) *grpc.Server { var opts []grpc.ServerOption opts = append(opts, grpc.CustomCodec(&codec{})) if tls != nil { bundle := credentials.NewBundle(credentials.Config{TLSConfig: tls}) opts = append(opts, grpc.Creds(bundle.TransportCredentials())) } - chainUnaryInterceptors := []grpc.UnaryServerInterceptor{ newLogUnaryInterceptor(s), newUnaryInterceptor(s), grpc_prometheus.UnaryServerInterceptor, } + if interceptor != nil { + chainUnaryInterceptors = append(chainUnaryInterceptors, interceptor) + } chainStreamInterceptors := []grpc.StreamServerInterceptor{ newStreamInterceptor(s), diff --git a/server/etcdserver/api/v3rpc/interceptor.go b/server/etcdserver/api/v3rpc/interceptor.go index 0d4d5c32907f..97d7b511c351 100644 --- a/server/etcdserver/api/v3rpc/interceptor.go +++ b/server/etcdserver/api/v3rpc/interceptor.go @@ -76,13 +76,59 @@ func newLogUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerIntercepto startTime := time.Now() resp, err := handler(ctx, req) lg := s.Logger() - if lg != nil { // acquire stats if debug level is enabled or request is expensive + if lg != nil { // acquire stats if debug level is enabled or RequestInfo is expensive defer logUnaryRequestStats(ctx, lg, s.Cfg.WarningUnaryRequestDuration, info, startTime, req, resp) } return resp, err } } +type GrpcRecorder struct { + mux sync.RWMutex + requests []RequestInfo +} + +type RequestInfo struct { + FullMethod string + Authority string +} + +func (ri *GrpcRecorder) UnaryInterceptor() grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + ri.record(recordedRequest(ctx, info)) + resp, err := handler(ctx, req) + return resp, err + } +} + +func (ri *GrpcRecorder) RecordedRequests() []RequestInfo { + ri.mux.RLock() + defer ri.mux.RUnlock() + reqs := make([]RequestInfo, len(ri.requests)) + copy(reqs, ri.requests) + return reqs +} + +func recordedRequest(ctx context.Context, info *grpc.UnaryServerInfo) RequestInfo { + req := RequestInfo{ + FullMethod: info.FullMethod, + } + md, ok := metadata.FromIncomingContext(ctx) + if ok { + as := md.Get(":authority") + if len(as) != 0 { + req.Authority = as[0] + } + } + return req +} + +func (ri *GrpcRecorder) record(r RequestInfo) { + ri.mux.Lock() + defer ri.mux.Unlock() + ri.requests = append(ri.requests, r) +} + func logUnaryRequestStats(ctx context.Context, lg *zap.Logger, warnLatency time.Duration, info *grpc.UnaryServerInfo, startTime time.Time, req interface{}, resp interface{}) { duration := time.Since(startTime) var enabledDebugLevel, expensiveRequest bool diff --git a/tests/e2e/cluster_proxy_test.go b/tests/e2e/cluster_proxy_test.go index b96a10037fde..bff7d0e82cbe 100644 --- a/tests/e2e/cluster_proxy_test.go +++ b/tests/e2e/cluster_proxy_test.go @@ -122,6 +122,7 @@ type proxyProc struct { ep string murl string donec chan struct{} + env []string proc *expect.ExpectProcess } @@ -132,7 +133,7 @@ func (pp *proxyProc) start() error { if pp.proc != nil { panic("already started") } - proc, err := spawnCmdWithLogger(pp.lg, append([]string{pp.execPath}, pp.args...)) + proc, err := spawnCmdWithLogger(pp.lg, append([]string{pp.execPath}, pp.args...), pp.env) if err != nil { return err } diff --git a/tests/e2e/cluster_test.go b/tests/e2e/cluster_test.go index 4b3993d5c61d..53d5a2bff59a 100644 --- a/tests/e2e/cluster_test.go +++ b/tests/e2e/cluster_test.go @@ -174,6 +174,7 @@ type etcdProcessClusterConfig struct { v2deprecation string rollingStart bool + env []string } // newEtcdProcessCluster launches a new cluster from etcd processes, returning @@ -326,6 +327,7 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []* acurl: curl, murl: murl, initialToken: cfg.initialToken, + env: cfg.env, } } diff --git a/tests/e2e/ctl_v3_grpc_test.go b/tests/e2e/ctl_v3_grpc_test.go new file mode 100644 index 000000000000..afa3e74fa8ed --- /dev/null +++ b/tests/e2e/ctl_v3_grpc_test.go @@ -0,0 +1,114 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + "fmt" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.etcd.io/etcd/client/pkg/v3/testutil" +) + +func TestCtlV3AuthoritySingleEndpoint(t *testing.T) { + BeforeTest(t) + + cfg := newConfigAutoTLS() + cfg.clusterSize = 1 + // Enable debug mode to get logs with http2 headers (including authority) + cfg.env = []string{"GODEBUG=http2debug=2"} + + epc, err := newEtcdProcessCluster(t, cfg) + if err != nil { + t.Fatalf("could not start etcd process cluster (%v)", err) + } + err = put(t, cfg, epc, "foo", "bar") + if err != nil { + t.Error(err) + } + + executeWithTimeout(t, 5*time.Second, func() { + assertAuthority(t, "#initially=[http://localhost:20000]", epc.procs[0].Logs()) + }) + assert.NoError(t, epc.Close()) +} + +func TestCtlV3AuthorityMultipleEndpoints(t *testing.T) { + BeforeTest(t) + + cfg := newConfigAutoTLS() + cfg.clusterSize = 3 + // Enable debug mode to get logs with http2 headers (including authority) + cfg.env = []string{"GODEBUG=http2debug=2"} + + epc, err := newEtcdProcessCluster(t, cfg) + if err != nil { + t.Fatalf("could not start etcd process cluster (%v)", err) + } + err = put(t, cfg, epc, "foo", "bar") + if err != nil { + t.Error(err) + } + + executeWithTimeout(t, 10*time.Second, func() { + assertAuthority(t, "#initially=[http://localhost:20000;http://localhost:20005;http://localhost:20010]", epc.procs[0].Logs(), epc.procs[1].Logs(), epc.procs[2].Logs()) + }) + assert.NoError(t, epc.Close()) +} + +func assertAuthority(t *testing.T, expectAurhority string, logs ...logsExpect) { + line := firstMatch(t, `http2: decoded hpack field header field ":authority"`, logs...) + line = strings.TrimSuffix(line, "\n") + line = strings.TrimSuffix(line, "\r") + expectLine := fmt.Sprintf(`http2: decoded hpack field header field ":authority" = %q`, expectAurhority) + assert.True(t, strings.HasSuffix(line, expectLine), fmt.Sprintf("Got %q expected suffix %q", line, expectLine)) +} + +func firstMatch(t *testing.T, expectLine string, logs ...logsExpect) string { + t.Helper() + match := make(chan string, len(logs)) + for i := range logs { + go func(l logsExpect) { + line, _ := l.Expect(expectLine) + match <- line + }(logs[i]) + } + return <-match +} + +func executeWithTimeout(t *testing.T, timeout time.Duration, f func()) { + donec := make(chan struct{}) + go func() { + defer close(donec) + f() + }() + + select { + case <-time.After(timeout): + testutil.FatalStack(t, fmt.Sprintf("test timed out after %v", timeout)) + case <-donec: + } +} + +func put(t *testing.T, cfg *etcdProcessClusterConfig, epc *etcdProcessCluster, key, value string) error { + ctx := ctlCtx{ + t: t, + cfg: *cfg, + epc: epc, + } + return ctlV3Put(ctx, key, value, "") +} diff --git a/tests/e2e/etcd_process.go b/tests/e2e/etcd_process.go index f744fa81cd9d..db87c8a7c0b7 100644 --- a/tests/e2e/etcd_process.go +++ b/tests/e2e/etcd_process.go @@ -43,6 +43,11 @@ type etcdProcess interface { Close() error WithStopSignal(sig os.Signal) os.Signal Config() *etcdServerProcessConfig + Logs() logsExpect +} + +type logsExpect interface { + Expect(string) (string, error) } type etcdServerProcess struct { @@ -56,6 +61,7 @@ type etcdServerProcessConfig struct { execPath string args []string tlsArgs []string + env []string dataDirPath string keepDataDir bool @@ -92,7 +98,7 @@ func (ep *etcdServerProcess) Start() error { panic("already started") } ep.cfg.lg.Info("starting server...", zap.String("name", ep.cfg.name)) - proc, err := spawnCmdWithLogger(ep.cfg.lg, append([]string{ep.cfg.execPath}, ep.cfg.args...)) + proc, err := spawnCmdWithLogger(ep.cfg.lg, append([]string{ep.cfg.execPath}, ep.cfg.args...), ep.cfg.env) if err != nil { return err } @@ -163,3 +169,10 @@ func (ep *etcdServerProcess) waitReady() error { } func (ep *etcdServerProcess) Config() *etcdServerProcessConfig { return ep.cfg } + +func (ep *etcdServerProcess) Logs() logsExpect { + if ep.proc == nil { + ep.cfg.lg.Panic("Please grap logs before process is stopped") + } + return ep.proc +} diff --git a/tests/e2e/etcd_spawn_nocov.go b/tests/e2e/etcd_spawn_nocov.go index b0e872fb220f..85b457e091a2 100644 --- a/tests/e2e/etcd_spawn_nocov.go +++ b/tests/e2e/etcd_spawn_nocov.go @@ -28,19 +28,20 @@ import ( const noOutputLineCount = 0 // regular binaries emit no extra lines func spawnCmd(args []string) (*expect.ExpectProcess, error) { - return spawnCmdWithLogger(zap.NewNop(), args) + return spawnCmdWithLogger(zap.NewNop(), args, nil) } -func spawnCmdWithLogger(lg *zap.Logger, args []string) (*expect.ExpectProcess, error) { +func spawnCmdWithLogger(lg *zap.Logger, args []string, env []string) (*expect.ExpectProcess, error) { wd, err := os.Getwd() if err != nil { return nil, err } if strings.HasSuffix(args[0], "/etcdctl3") { - env := append(os.Environ(), "ETCDCTL_API=3") - lg.Info("spawning process with ETCDCTL_API=3", zap.Strings("args", args), zap.String("working-dir", wd)) + env = append(env, os.Environ()...) + env = append(env, "ETCDCTL_API=3") + lg.Info("spawning process with ETCDCTL_API=3", zap.Strings("args", args), zap.String("working-dir", wd), zap.Strings("env", env)) return expect.NewExpectWithEnv(ctlBinPath, args[1:], env) } - lg.Info("spawning process", zap.Strings("args", args), zap.String("working-dir", wd)) - return expect.NewExpect(args[0], args[1:]...) + lg.Info("spawning process", zap.Strings("args", args), zap.String("working-dir", wd), zap.Strings("env", env)) + return expect.NewExpectWithEnv(args[0], args[1:], env) } diff --git a/tests/integration/clientv3/grpc_test.go b/tests/integration/clientv3/grpc_test.go new file mode 100644 index 000000000000..735ca99fd06e --- /dev/null +++ b/tests/integration/clientv3/grpc_test.go @@ -0,0 +1,66 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clientv3test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc" + "go.etcd.io/etcd/tests/v3/integration" +) + +func TestAuthoritySingleEndpoint(t *testing.T) { + integration.BeforeTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + kv := clus.Client(0) + + ctx := context.TODO() + _, err := kv.Put(ctx, "foo", "bar") + if err != nil { + t.Fatal(err) + } + + reqs := clus.Members[0].RecordedRequests() + assert.ElementsMatch(t, []v3rpc.RequestInfo{{FullMethod: "/etcdserverpb.KV/Put", Authority: "#initially=[unix://localhost:m00]"}}, reqs) +} + +func TestAuthorityMultipleEndpoints(t *testing.T) { + integration.BeforeTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + kv, err := clus.ClusterClient() + if err != nil { + t.Fatal(err) + } + + ctx := context.TODO() + _, err = kv.Put(ctx, "foo", "bar") + if err != nil { + t.Fatal(err) + } + + reqs := []v3rpc.RequestInfo{} + for _, m := range clus.Members { + reqs = append(reqs, m.RecordedRequests()...) + } + assert.ElementsMatch(t, []v3rpc.RequestInfo{{FullMethod: "/etcdserverpb.KV/Put", Authority: "#initially=[unix://localhost:m00;unix://localhost:m10;unix://localhost:m20]"}}, reqs) +} diff --git a/tests/integration/cluster.go b/tests/integration/cluster.go index cbf8adacf249..2573efdd8968 100644 --- a/tests/integration/cluster.go +++ b/tests/integration/cluster.go @@ -296,27 +296,7 @@ func (c *cluster) HTTPMembers() []client.Member { } func (c *cluster) mustNewMember(t testutil.TB) *member { - m := mustNewMember(t, - memberConfig{ - name: c.generateMemberName(), - authToken: c.cfg.AuthToken, - peerTLS: c.cfg.PeerTLS, - clientTLS: c.cfg.ClientTLS, - quotaBackendBytes: c.cfg.QuotaBackendBytes, - maxTxnOps: c.cfg.MaxTxnOps, - maxRequestBytes: c.cfg.MaxRequestBytes, - snapshotCount: c.cfg.SnapshotCount, - snapshotCatchUpEntries: c.cfg.SnapshotCatchUpEntries, - grpcKeepAliveMinTime: c.cfg.GRPCKeepAliveMinTime, - grpcKeepAliveInterval: c.cfg.GRPCKeepAliveInterval, - grpcKeepAliveTimeout: c.cfg.GRPCKeepAliveTimeout, - clientMaxCallSendMsgSize: c.cfg.ClientMaxCallSendMsgSize, - clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize, - useIP: c.cfg.UseIP, - enableLeaseCheckpoint: c.cfg.EnableLeaseCheckpoint, - leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval, - WatchProgressNotifyInterval: c.cfg.WatchProgressNotifyInterval, - }) + m := mustNewMember(t, c.memberConfig()) m.DiscoveryURL = c.cfg.DiscoveryURL if c.cfg.UseGRPC { if err := m.listenGRPC(); err != nil { @@ -326,6 +306,29 @@ func (c *cluster) mustNewMember(t testutil.TB) *member { return m } +func (c *cluster) memberConfig() memberConfig { + return memberConfig{ + name: c.generateMemberName(), + authToken: c.cfg.AuthToken, + peerTLS: c.cfg.PeerTLS, + clientTLS: c.cfg.ClientTLS, + quotaBackendBytes: c.cfg.QuotaBackendBytes, + maxTxnOps: c.cfg.MaxTxnOps, + maxRequestBytes: c.cfg.MaxRequestBytes, + snapshotCount: c.cfg.SnapshotCount, + snapshotCatchUpEntries: c.cfg.SnapshotCatchUpEntries, + grpcKeepAliveMinTime: c.cfg.GRPCKeepAliveMinTime, + grpcKeepAliveInterval: c.cfg.GRPCKeepAliveInterval, + grpcKeepAliveTimeout: c.cfg.GRPCKeepAliveTimeout, + clientMaxCallSendMsgSize: c.cfg.ClientMaxCallSendMsgSize, + clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize, + useIP: c.cfg.UseIP, + enableLeaseCheckpoint: c.cfg.EnableLeaseCheckpoint, + leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval, + WatchProgressNotifyInterval: c.cfg.WatchProgressNotifyInterval, + } +} + // addMember return PeerURLs of the added member. func (c *cluster) addMember(t testutil.TB) types.URLs { m := c.mustNewMember(t) @@ -585,6 +588,8 @@ type member struct { isLearner bool closed bool + + grpcServerRecorder *v3rpc.GrpcRecorder } func (m *member) GRPCAddr() string { return m.grpcAddr } @@ -708,7 +713,7 @@ func mustNewMember(t testutil.TB, mcfg memberConfig) *member { m.WarningUnaryRequestDuration = embed.DefaultWarningUnaryRequestDuration m.V2Deprecation = config.V2_DEPR_DEFAULT - + m.grpcServerRecorder = &v3rpc.GrpcRecorder{} m.Logger = memberLogger(t, mcfg.name) t.Cleanup(func() { // if we didn't cleanup the logger, the consecutive test @@ -857,8 +862,8 @@ func (m *member) Launch() error { return err } } - m.grpcServer = v3rpc.Server(m.s, tlscfg, m.grpcServerOpts...) - m.grpcServerPeer = v3rpc.Server(m.s, peerTLScfg) + m.grpcServer = v3rpc.Server(m.s, tlscfg, m.grpcServerRecorder.UnaryInterceptor(), m.grpcServerOpts...) + m.grpcServerPeer = v3rpc.Server(m.s, peerTLScfg, m.grpcServerRecorder.UnaryInterceptor()) m.serverClient = v3client.New(m.s) lockpb.RegisterLockServer(m.grpcServer, v3lock.NewLockServer(m.serverClient)) epb.RegisterElectionServer(m.grpcServer, v3election.NewElectionServer(m.serverClient)) @@ -993,6 +998,10 @@ func (m *member) Launch() error { return nil } +func (m *member) RecordedRequests() []v3rpc.RequestInfo { + return m.grpcServerRecorder.RecordedRequests() +} + func (m *member) WaitOK(t testutil.TB) { m.WaitStarted(t) for m.s.Leader() == 0 { @@ -1282,8 +1291,9 @@ func (p SortableMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] type ClusterV3 struct { *cluster - mu sync.Mutex - clients []*clientv3.Client + mu sync.Mutex + clients []*clientv3.Client + clusterClient *clientv3.Client } // NewClusterV3 returns a launched cluster with a grpc client connection @@ -1329,6 +1339,11 @@ func (c *ClusterV3) Terminate(t testutil.TB) { t.Error(err) } } + if c.clusterClient != nil { + if err := c.clusterClient.Close(); err != nil { + t.Error(err) + } + } c.mu.Unlock() c.cluster.Terminate(t) } @@ -1341,6 +1356,28 @@ func (c *ClusterV3) Client(i int) *clientv3.Client { return c.clients[i] } +func (c *ClusterV3) ClusterClient() (client *clientv3.Client, err error) { + if c.clusterClient == nil { + endpoints := []string{} + for _, m := range c.Members { + endpoints = append(endpoints, m.grpcAddr) + } + mcfg := c.memberConfig() + cfg := clientv3.Config{ + Endpoints: endpoints, + DialTimeout: 5 * time.Second, + DialOptions: []grpc.DialOption{grpc.WithBlock()}, + MaxCallSendMsgSize: mcfg.clientMaxCallSendMsgSize, + MaxCallRecvMsgSize: mcfg.clientMaxCallRecvMsgSize, + } + c.clusterClient, err = newClientV3(cfg) + if err != nil { + return nil, err + } + } + return c.clusterClient, nil +} + // NewClientV3 creates a new grpc client connection to the member func (c *ClusterV3) NewClientV3(memberIndex int) (*clientv3.Client, error) { return NewClientV3(c.Members[memberIndex])