Skip to content

Commit

Permalink
client: Add grpc authority tests
Browse files Browse the repository at this point in the history
  • Loading branch information
serathius committed Sep 17, 2021
1 parent c2937d7 commit 8a74d7d
Show file tree
Hide file tree
Showing 11 changed files with 322 additions and 40 deletions.
2 changes: 1 addition & 1 deletion server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ func (e *Etcd) servePeers() (err error) {

for _, p := range e.Peers {
u := p.Listener.Addr().String()
gs := v3rpc.Server(e.Server, peerTLScfg)
gs := v3rpc.Server(e.Server, peerTLScfg, nil)
m := cmux.New(p.Listener)
go gs.Serve(m.Match(cmux.HTTP2()))
srv := &http.Server{
Expand Down
4 changes: 2 additions & 2 deletions server/embed/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (sctx *serveCtx) serve(
}()

if sctx.insecure {
gs = v3rpc.Server(s, nil, gopts...)
gs = v3rpc.Server(s, nil, nil, gopts...)
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {
Expand Down Expand Up @@ -148,7 +148,7 @@ func (sctx *serveCtx) serve(
if tlsErr != nil {
return tlsErr
}
gs = v3rpc.Server(s, tlscfg, gopts...)
gs = v3rpc.Server(s, tlscfg, nil, gopts...)
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {
Expand Down
6 changes: 4 additions & 2 deletions server/etcdserver/api/v3rpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,21 @@ const (
maxSendBytes = math.MaxInt32
)

func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOption) *grpc.Server {
func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnaryServerInterceptor, gopts ...grpc.ServerOption) *grpc.Server {
var opts []grpc.ServerOption
opts = append(opts, grpc.CustomCodec(&codec{}))
if tls != nil {
bundle := credentials.NewBundle(credentials.Config{TLSConfig: tls})
opts = append(opts, grpc.Creds(bundle.TransportCredentials()))
}

chainUnaryInterceptors := []grpc.UnaryServerInterceptor{
newLogUnaryInterceptor(s),
newUnaryInterceptor(s),
grpc_prometheus.UnaryServerInterceptor,
}
if interceptor != nil {
chainUnaryInterceptors = append(chainUnaryInterceptors, interceptor)
}

chainStreamInterceptors := []grpc.StreamServerInterceptor{
newStreamInterceptor(s),
Expand Down
48 changes: 47 additions & 1 deletion server/etcdserver/api/v3rpc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,59 @@ func newLogUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerIntercepto
startTime := time.Now()
resp, err := handler(ctx, req)
lg := s.Logger()
if lg != nil { // acquire stats if debug level is enabled or request is expensive
if lg != nil { // acquire stats if debug level is enabled or RequestInfo is expensive
defer logUnaryRequestStats(ctx, lg, s.Cfg.WarningUnaryRequestDuration, info, startTime, req, resp)
}
return resp, err
}
}

type GrpcRecorder struct {
mux sync.RWMutex
requests []RequestInfo
}

type RequestInfo struct {
FullMethod string
Authority string
}

func (ri *GrpcRecorder) UnaryInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
ri.record(recordedRequest(ctx, info))
resp, err := handler(ctx, req)
return resp, err
}
}

func (ri *GrpcRecorder) RecordedRequests() []RequestInfo {
ri.mux.RLock()
defer ri.mux.RUnlock()
reqs := make([]RequestInfo, len(ri.requests))
copy(reqs, ri.requests)
return reqs
}

func recordedRequest(ctx context.Context, info *grpc.UnaryServerInfo) RequestInfo {
req := RequestInfo{
FullMethod: info.FullMethod,
}
md, ok := metadata.FromIncomingContext(ctx)
if ok {
as := md.Get(":authority")
if len(as) != 0 {
req.Authority = as[0]
}
}
return req
}

func (ri *GrpcRecorder) record(r RequestInfo) {
ri.mux.Lock()
defer ri.mux.Unlock()
ri.requests = append(ri.requests, r)
}

func logUnaryRequestStats(ctx context.Context, lg *zap.Logger, warnLatency time.Duration, info *grpc.UnaryServerInfo, startTime time.Time, req interface{}, resp interface{}) {
duration := time.Since(startTime)
var enabledDebugLevel, expensiveRequest bool
Expand Down
3 changes: 2 additions & 1 deletion tests/e2e/cluster_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ type proxyProc struct {
ep string
murl string
donec chan struct{}
env []string

proc *expect.ExpectProcess
}
Expand All @@ -132,7 +133,7 @@ func (pp *proxyProc) start() error {
if pp.proc != nil {
panic("already started")
}
proc, err := spawnCmdWithLogger(pp.lg, append([]string{pp.execPath}, pp.args...))
proc, err := spawnCmdWithLogger(pp.lg, append([]string{pp.execPath}, pp.args...), pp.env)
if err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions tests/e2e/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ type etcdProcessClusterConfig struct {
v2deprecation string

rollingStart bool
env []string
}

// newEtcdProcessCluster launches a new cluster from etcd processes, returning
Expand Down Expand Up @@ -326,6 +327,7 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []*
acurl: curl,
murl: murl,
initialToken: cfg.initialToken,
env: cfg.env,
}
}

Expand Down
114 changes: 114 additions & 0 deletions tests/e2e/ctl_v3_grpc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package e2e

import (
"fmt"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/client/pkg/v3/testutil"
)

func TestCtlV3AuthoritySingleEndpoint(t *testing.T) {
BeforeTest(t)

cfg := newConfigAutoTLS()
cfg.clusterSize = 1
// Enable debug mode to get logs with http2 headers (including authority)
cfg.env = []string{"GODEBUG=http2debug=2"}

epc, err := newEtcdProcessCluster(t, cfg)
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}
err = put(t, cfg, epc, "foo", "bar")
if err != nil {
t.Error(err)
}

executeWithTimeout(t, 5*time.Second, func() {
assertAuthority(t, "#initially=[http://localhost:20000]", epc.procs[0].Logs())
})
assert.NoError(t, epc.Close())
}

func TestCtlV3AuthorityMultipleEndpoints(t *testing.T) {
BeforeTest(t)

cfg := newConfigAutoTLS()
cfg.clusterSize = 3
// Enable debug mode to get logs with http2 headers (including authority)
cfg.env = []string{"GODEBUG=http2debug=2"}

epc, err := newEtcdProcessCluster(t, cfg)
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}
err = put(t, cfg, epc, "foo", "bar")
if err != nil {
t.Error(err)
}

executeWithTimeout(t, 10*time.Second, func() {
assertAuthority(t, "#initially=[http://localhost:20000;http://localhost:20005;http://localhost:20010]", epc.procs[0].Logs(), epc.procs[1].Logs(), epc.procs[2].Logs())
})
assert.NoError(t, epc.Close())
}

func assertAuthority(t *testing.T, expectAurhority string, logs ...logsExpect) {
line := firstMatch(t, `http2: decoded hpack field header field ":authority"`, logs...)
line = strings.TrimSuffix(line, "\n")
line = strings.TrimSuffix(line, "\r")
expectLine := fmt.Sprintf(`http2: decoded hpack field header field ":authority" = %q`, expectAurhority)
assert.True(t, strings.HasSuffix(line, expectLine), fmt.Sprintf("Got %q expected suffix %q", line, expectLine))
}

func firstMatch(t *testing.T, expectLine string, logs ...logsExpect) string {
t.Helper()
match := make(chan string, len(logs))
for i := range logs {
go func(l logsExpect) {
line, _ := l.Expect(expectLine)
match <- line
}(logs[i])
}
return <-match
}

func executeWithTimeout(t *testing.T, timeout time.Duration, f func()) {
donec := make(chan struct{})
go func() {
defer close(donec)
f()
}()

select {
case <-time.After(timeout):
testutil.FatalStack(t, fmt.Sprintf("test timed out after %v", timeout))
case <-donec:
}
}

func put(t *testing.T, cfg *etcdProcessClusterConfig, epc *etcdProcessCluster, key, value string) error {
ctx := ctlCtx{
t: t,
cfg: *cfg,
epc: epc,
}
return ctlV3Put(ctx, key, value, "")
}
15 changes: 14 additions & 1 deletion tests/e2e/etcd_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ type etcdProcess interface {
Close() error
WithStopSignal(sig os.Signal) os.Signal
Config() *etcdServerProcessConfig
Logs() logsExpect
}

type logsExpect interface {
Expect(string) (string, error)
}

type etcdServerProcess struct {
Expand All @@ -56,6 +61,7 @@ type etcdServerProcessConfig struct {
execPath string
args []string
tlsArgs []string
env []string

dataDirPath string
keepDataDir bool
Expand Down Expand Up @@ -92,7 +98,7 @@ func (ep *etcdServerProcess) Start() error {
panic("already started")
}
ep.cfg.lg.Info("starting server...", zap.String("name", ep.cfg.name))
proc, err := spawnCmdWithLogger(ep.cfg.lg, append([]string{ep.cfg.execPath}, ep.cfg.args...))
proc, err := spawnCmdWithLogger(ep.cfg.lg, append([]string{ep.cfg.execPath}, ep.cfg.args...), ep.cfg.env)
if err != nil {
return err
}
Expand Down Expand Up @@ -163,3 +169,10 @@ func (ep *etcdServerProcess) waitReady() error {
}

func (ep *etcdServerProcess) Config() *etcdServerProcessConfig { return ep.cfg }

func (ep *etcdServerProcess) Logs() logsExpect {
if ep.proc == nil {
ep.cfg.lg.Panic("Please grap logs before process is stopped")
}
return ep.proc
}
13 changes: 7 additions & 6 deletions tests/e2e/etcd_spawn_nocov.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,20 @@ import (
const noOutputLineCount = 0 // regular binaries emit no extra lines

func spawnCmd(args []string) (*expect.ExpectProcess, error) {
return spawnCmdWithLogger(zap.NewNop(), args)
return spawnCmdWithLogger(zap.NewNop(), args, nil)
}

func spawnCmdWithLogger(lg *zap.Logger, args []string) (*expect.ExpectProcess, error) {
func spawnCmdWithLogger(lg *zap.Logger, args []string, env []string) (*expect.ExpectProcess, error) {
wd, err := os.Getwd()
if err != nil {
return nil, err
}
if strings.HasSuffix(args[0], "/etcdctl3") {
env := append(os.Environ(), "ETCDCTL_API=3")
lg.Info("spawning process with ETCDCTL_API=3", zap.Strings("args", args), zap.String("working-dir", wd))
env = append(env, os.Environ()...)
env = append(env, "ETCDCTL_API=3")
lg.Info("spawning process with ETCDCTL_API=3", zap.Strings("args", args), zap.String("working-dir", wd), zap.Strings("env", env))
return expect.NewExpectWithEnv(ctlBinPath, args[1:], env)
}
lg.Info("spawning process", zap.Strings("args", args), zap.String("working-dir", wd))
return expect.NewExpect(args[0], args[1:]...)
lg.Info("spawning process", zap.Strings("args", args), zap.String("working-dir", wd), zap.Strings("env", env))
return expect.NewExpectWithEnv(args[0], args[1:], env)
}
66 changes: 66 additions & 0 deletions tests/integration/clientv3/grpc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clientv3test

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc"
"go.etcd.io/etcd/tests/v3/integration"
)

func TestAuthoritySingleEndpoint(t *testing.T) {
integration.BeforeTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)

kv := clus.Client(0)

ctx := context.TODO()
_, err := kv.Put(ctx, "foo", "bar")
if err != nil {
t.Fatal(err)
}

reqs := clus.Members[0].RecordedRequests()
assert.ElementsMatch(t, []v3rpc.RequestInfo{{FullMethod: "/etcdserverpb.KV/Put", Authority: "#initially=[unix://localhost:m00]"}}, reqs)
}

func TestAuthorityMultipleEndpoints(t *testing.T) {
integration.BeforeTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)

kv, err := clus.ClusterClient()
if err != nil {
t.Fatal(err)
}

ctx := context.TODO()
_, err = kv.Put(ctx, "foo", "bar")
if err != nil {
t.Fatal(err)
}

reqs := []v3rpc.RequestInfo{}
for _, m := range clus.Members {
reqs = append(reqs, m.RecordedRequests()...)
}
assert.ElementsMatch(t, []v3rpc.RequestInfo{{FullMethod: "/etcdserverpb.KV/Put", Authority: "#initially=[unix://localhost:m00;unix://localhost:m10;unix://localhost:m20]"}}, reqs)
}
Loading

0 comments on commit 8a74d7d

Please sign in to comment.