From 5c95ef3e3121ce1f31680b2c4d7e828f2b91bc24 Mon Sep 17 00:00:00 2001 From: Chao Chen Date: Tue, 21 Nov 2023 21:44:04 -0800 Subject: [PATCH] [3.4] backport etcd-io#13359 Fix http2 authority header in single endpoint scenario Signed-off-by: Chao Chen --- clientv3/client.go | 17 ++- tests/e2e/cluster_proxy_test.go | 4 + tests/e2e/cluster_test.go | 2 + tests/e2e/ctl_v3_grpc_test.go | 214 ++++++++++++++++++++++++++++++++ tests/e2e/etcd_process.go | 16 ++- tests/e2e/etcd_spawn_nocov.go | 27 +++- 6 files changed, 276 insertions(+), 4 deletions(-) create mode 100644 tests/e2e/ctl_v3_grpc_test.go diff --git a/clientv3/client.go b/clientv3/client.go index e16d04be524..3928a197512 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -281,8 +281,7 @@ func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc. defer cancel() // TODO: Is this right for cases where grpc.WithBlock() is not set on the dial options? } - initialEndpoints := strings.Join(c.Endpoints(), ";") - target := fmt.Sprintf("%s://%p/#initially=[%s]", resolver.Schema, c, initialEndpoints) + target := fmt.Sprintf("%s://%p/%s", resolver.Schema, c, authority(c.Endpoints()[0])) conn, err := grpc.DialContext(dctx, target, opts...) if err != nil { return nil, err @@ -290,6 +289,20 @@ func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc. return conn, nil } +func authority(endpoint string) string { + spl := strings.SplitN(endpoint, "://", 2) + if len(spl) < 2 { + if strings.HasPrefix(endpoint, "unix:") { + return endpoint[len("unix:"):] + } + if strings.HasPrefix(endpoint, "unixs:") { + return endpoint[len("unixs:"):] + } + return endpoint + } + return spl[1] +} + func (c *Client) credentialsForEndpoint(ep string) grpccredentials.TransportCredentials { r := endpoint.RequiresCredentials(ep) switch r { diff --git a/tests/e2e/cluster_proxy_test.go b/tests/e2e/cluster_proxy_test.go index efb05f71e97..4387983484b 100644 --- a/tests/e2e/cluster_proxy_test.go +++ b/tests/e2e/cluster_proxy_test.go @@ -116,6 +116,10 @@ func (p *proxyEtcdProcess) WithStopSignal(sig os.Signal) os.Signal { return p.etcdProc.WithStopSignal(sig) } +func (p *proxyEtcdProcess) Logs() logsExpect { + return p.etcdProc.Logs() +} + type proxyProc struct { execPath string args []string diff --git a/tests/e2e/cluster_test.go b/tests/e2e/cluster_test.go index f13fc88c590..deac4ed1e5e 100644 --- a/tests/e2e/cluster_test.go +++ b/tests/e2e/cluster_test.go @@ -100,6 +100,7 @@ type etcdProcessClusterConfig struct { execPath string dataDirPath string keepDataDir bool + envVars map[string]string clusterSize int @@ -291,6 +292,7 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro etcdCfgs[i] = &etcdServerProcessConfig{ execPath: cfg.execPath, args: args, + envVars: cfg.envVars, tlsArgs: cfg.tlsArgs(), dataDirPath: dataDirPath, keepDataDir: cfg.keepDataDir, diff --git a/tests/e2e/ctl_v3_grpc_test.go b/tests/e2e/ctl_v3_grpc_test.go new file mode 100644 index 00000000000..e84c9b6bd8e --- /dev/null +++ b/tests/e2e/ctl_v3_grpc_test.go @@ -0,0 +1,214 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !cluster_proxy +// +build !cluster_proxy + +package e2e + +import ( + "fmt" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "go.etcd.io/etcd/pkg/testutil" +) + +func TestAuthority(t *testing.T) { + tcs := []struct { + name string + useTLS bool + useInsecureTLS bool + // Pattern used to generate endpoints for client. Fields filled + // %d - will be filled with member grpc port + clientURLPattern string + + // Pattern used to validate authority received by server. Fields filled: + // %d - will be filled with first member grpc port + expectAuthorityPattern string + }{ + { + name: "http://domain[:port]", + clientURLPattern: "http://localhost:%d", + expectAuthorityPattern: "localhost:%d", + }, + { + name: "http://address[:port]", + clientURLPattern: "http://127.0.0.1:%d", + expectAuthorityPattern: "127.0.0.1:%d", + }, + { + name: "https://domain[:port] insecure", + useTLS: true, + useInsecureTLS: true, + clientURLPattern: "https://localhost:%d", + expectAuthorityPattern: "localhost:%d", + }, + { + name: "https://address[:port] insecure", + useTLS: true, + useInsecureTLS: true, + clientURLPattern: "https://127.0.0.1:%d", + expectAuthorityPattern: "127.0.0.1:%d", + }, + { + name: "https://domain[:port]", + useTLS: true, + clientURLPattern: "https://localhost:%d", + expectAuthorityPattern: "localhost:%d", + }, + { + name: "https://address[:port]", + useTLS: true, + clientURLPattern: "https://127.0.0.1:%d", + expectAuthorityPattern: "127.0.0.1:%d", + }, + } + for _, tc := range tcs { + for _, clusterSize := range []int{1, 3} { + t.Run(fmt.Sprintf("Size: %d, Scenario: %q", clusterSize, tc.name), func(t *testing.T) { + defer testutil.AfterTest(t) + + cfg := configNoTLS + cfg.clusterSize = clusterSize + if tc.useTLS { + cfg.clientTLS = clientTLS + } + cfg.isClientAutoTLS = tc.useInsecureTLS + // Enable debug mode to get logs with http2 headers (including authority) + cfg.envVars = map[string]string{"GODEBUG": "http2debug=2"} + + epc, err := newEtcdProcessCluster(&cfg) + if err != nil { + t.Fatalf("could not start etcd process cluster (%v)", err) + } + defer epc.Close() + endpoints := templateEndpoints(t, tc.clientURLPattern, epc) + + client := clusterEtcdctlV3(&cfg, endpoints) + err = client.Put("foo", "bar") + if err != nil { + t.Fatal(err) + } + + executeWithTimeout(t, 5*time.Second, func() { + assertAuthority(t, fmt.Sprintf(tc.expectAuthorityPattern, 20000), epc) + }) + }) + + } + } +} + +func templateEndpoints(t *testing.T, pattern string, clus *etcdProcessCluster) []string { + t.Helper() + endpoints := []string{} + for i := 0; i < clus.cfg.clusterSize; i++ { + ent := pattern + if strings.Contains(ent, "%d") { + ent = fmt.Sprintf(ent, etcdProcessBasePort+i*5) + } + if strings.Contains(ent, "%") { + t.Fatalf("Failed to template pattern, %% symbol left %q", ent) + } + endpoints = append(endpoints, ent) + } + return endpoints +} + +func assertAuthority(t *testing.T, expectAurhority string, clus *etcdProcessCluster) { + logs := []logsExpect{} + for _, proc := range clus.procs { + logs = append(logs, proc.Logs()) + } + line := firstMatch(t, `http2: decoded hpack field header field ":authority"`, logs...) + line = strings.TrimSuffix(line, "\n") + line = strings.TrimSuffix(line, "\r") + expectLine := fmt.Sprintf(`http2: decoded hpack field header field ":authority" = %q`, expectAurhority) + assert.True(t, strings.HasSuffix(line, expectLine), fmt.Sprintf("Got %q expected suffix %q", line, expectLine)) +} + +func firstMatch(t *testing.T, expectLine string, logs ...logsExpect) string { + t.Helper() + match := make(chan string, len(logs)) + for i := range logs { + go func(l logsExpect) { + line, _ := l.Expect(expectLine) + match <- line + }(logs[i]) + } + return <-match +} + +func executeWithTimeout(t *testing.T, timeout time.Duration, f func()) { + donec := make(chan struct{}) + go func() { + defer close(donec) + f() + }() + + select { + case <-time.After(timeout): + testutil.FatalStack(t, fmt.Sprintf("test timed out after %v", timeout)) + case <-donec: + } +} + +type etcdctlV3 struct { + cfg *etcdProcessClusterConfig + endpoints []string +} + +func clusterEtcdctlV3(cfg *etcdProcessClusterConfig, endpoints []string) *etcdctlV3 { + return &etcdctlV3{ + cfg: cfg, + endpoints: endpoints, + } +} + +func (ctl *etcdctlV3) Put(key, value string) error { + return ctl.runCmd("put", key, value) +} + +func (ctl *etcdctlV3) runCmd(args ...string) error { + cmdArgs := []string{ctlBinPath + "3"} + for k, v := range ctl.flags() { + cmdArgs = append(cmdArgs, fmt.Sprintf("--%s=%s", k, v)) + } + cmdArgs = append(cmdArgs, args...) + return spawnWithExpect(cmdArgs, "OK") +} + +func (ctl *etcdctlV3) flags() map[string]string { + fmap := make(map[string]string) + if ctl.cfg.clientTLS == clientTLS { + if ctl.cfg.isClientAutoTLS { + fmap["insecure-transport"] = "false" + fmap["insecure-skip-tls-verify"] = "true" + } else if ctl.cfg.isClientCRL { + fmap["cacert"] = caPath + fmap["cert"] = revokedCertPath + fmap["key"] = revokedPrivateKeyPath + } else { + fmap["cacert"] = caPath + fmap["cert"] = certPath + fmap["key"] = privateKeyPath + } + } + fmap["endpoints"] = strings.Join(ctl.endpoints, ",") + return fmap +} diff --git a/tests/e2e/etcd_process.go b/tests/e2e/etcd_process.go index bdbcc209246..ef46ff97265 100644 --- a/tests/e2e/etcd_process.go +++ b/tests/e2e/etcd_process.go @@ -43,6 +43,12 @@ type etcdProcess interface { Close() error WithStopSignal(sig os.Signal) os.Signal Config() *etcdServerProcessConfig + + Logs() logsExpect +} + +type logsExpect interface { + Expect(string) (string, error) } type etcdServerProcess struct { @@ -54,6 +60,7 @@ type etcdServerProcess struct { type etcdServerProcessConfig struct { execPath string args []string + envVars map[string]string tlsArgs []string dataDirPath string @@ -98,7 +105,7 @@ func (ep *etcdServerProcess) Start() error { if ep.proc != nil { panic("already started") } - proc, err := spawnCmd(append([]string{ep.cfg.execPath}, ep.cfg.args...)) + proc, err := spawnCmdWithEnv(append([]string{ep.cfg.execPath}, ep.cfg.args...), ep.cfg.envVars) if err != nil { return err } @@ -153,3 +160,10 @@ func (ep *etcdServerProcess) waitReady() error { } func (ep *etcdServerProcess) Config() *etcdServerProcessConfig { return ep.cfg } + +func (ep *etcdServerProcess) Logs() logsExpect { + if ep.proc == nil { + panic("please grab logs before process is stopped") + } + return ep.proc +} diff --git a/tests/e2e/etcd_spawn_nocov.go b/tests/e2e/etcd_spawn_nocov.go index a5760078af0..b21636edfc9 100644 --- a/tests/e2e/etcd_spawn_nocov.go +++ b/tests/e2e/etcd_spawn_nocov.go @@ -18,6 +18,7 @@ package e2e import ( + "fmt" "os" "strings" @@ -27,7 +28,11 @@ import ( const noOutputLineCount = 0 // regular binaries emit no extra lines func spawnCmd(args []string) (*expect.ExpectProcess, error) { - env := os.Environ() + return spawnCmdWithEnv(args, nil) +} + +func spawnCmdWithEnv(args []string, envVars map[string]string) (*expect.ExpectProcess, error) { + env := mergeEnvVariables(envVars) switch { case strings.HasSuffix(args[0], ctlBinPath+"2"): env = append(env, "ETCDCTL_API=2") @@ -38,3 +43,23 @@ func spawnCmd(args []string) (*expect.ExpectProcess, error) { } return expect.NewExpectWithEnv(args[0], args[1:], env) } + +func mergeEnvVariables(envVars map[string]string) []string { + var env []string + // Environment variables are passed as parameter have higher priority + // than os environment variables. + for k, v := range envVars { + env = append(env, fmt.Sprintf("%s=%s", k, v)) + } + + // Now, we can set os environment variables not passed as parameter. + currVars := os.Environ() + for _, v := range currVars { + p := strings.Split(v, "=") + if _, ok := envVars[p[0]]; !ok { + env = append(env, fmt.Sprintf("%s=%s", p[0], p[1])) + } + } + + return env +}