Skip to content

Add memberlist TLS configuration options #4046

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Apr 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@
* `-alertmanager.cluster.advertise-address` instead of `-cluster.advertise-address`
* `-alertmanager.cluster.peers` instead of `-cluster.peer`
* `-alertmanager.cluster.peer-timeout` instead of `-cluster.peer-timeout`
* [FEATURE] Memberlist: add TLS configuration options for the memberlist transport layer used by the gossip KV store. #4046
* New flags added for memberlist communication:
* `-memberlist.tls-enabled`
* `-memberlist.tls-cert-path`
* `-memberlist.tls-key-path`
* `-memberlist.tls-ca-path`
* `-memberlist.tls-server-name`
* `-memberlist.tls-insecure-skip-verify`
* [FEATURE] Ruler: added `local` backend support to the ruler storage configuration under the `-ruler-storage.` flag prefix. #3932
* [ENHANCEMENT] Upgraded Docker base images to `alpine:3.13`. #4042
* [ENHANCEMENT] Blocks storage: reduce ingester memory by eliminating series reference cache. #3951
Expand Down
27 changes: 27 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3794,6 +3794,33 @@ The `memberlist_config` configures the Gossip memberlist.
# Timeout for writing 'packet' data.
# CLI flag: -memberlist.packet-write-timeout
[packet_write_timeout: <duration> | default = 5s]

# Enable TLS on the memberlist transport layer.
# CLI flag: -memberlist.tls-enabled
[tls_enabled: <boolean> | default = false]

# Path to the client certificate file, which will be used for authenticating
# with the server. Also requires the key path to be configured.
# CLI flag: -memberlist.tls-cert-path
[tls_cert_path: <string> | default = ""]

# Path to the key file for the client certificate. Also requires the client
# certificate to be configured.
# CLI flag: -memberlist.tls-key-path
[tls_key_path: <string> | default = ""]

# Path to the CA certificates file to validate server certificate against. If
# not set, the host's root CA certificates are used.
# CLI flag: -memberlist.tls-ca-path
[tls_ca_path: <string> | default = ""]

# Override the expected name on the server certificate.
# CLI flag: -memberlist.tls-server-name
[tls_server_name: <string> | default = ""]

# Skip validating server certificate.
# CLI flag: -memberlist.tls-insecure-skip-verify
[tls_insecure_skip_verify: <boolean> | default = false]
```

### `limits_config`
Expand Down
58 changes: 52 additions & 6 deletions integration/integration_memberlist_single_binary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,33 @@
package integration

import (
"crypto/x509"
"crypto/x509/pkix"
"os"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/integration/ca"
"github.com/cortexproject/cortex/integration/e2e"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/cortexproject/cortex/integration/e2ecortex"
"github.com/cortexproject/cortex/pkg/util"
)

func TestSingleBinaryWithMemberlist(t *testing.T) {
t.Run("default", func(t *testing.T) {
testSingleBinaryEnv(t, false)
})

t.Run("tls", func(t *testing.T) {
testSingleBinaryEnv(t, true)
})
}

func testSingleBinaryEnv(t *testing.T, tlsEnabled bool) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()
Expand All @@ -25,10 +40,37 @@ func TestSingleBinaryWithMemberlist(t *testing.T) {
require.NoError(t, s.StartAndWaitReady(dynamo))

require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml)))

cortex1 := newSingleBinary("cortex-1", "")
cortex2 := newSingleBinary("cortex-2", networkName+"-cortex-1:8000")
cortex3 := newSingleBinary("cortex-3", networkName+"-cortex-1:8000")
var cortex1, cortex2, cortex3 *e2ecortex.CortexService
if tlsEnabled {
var (
memberlistDNS = "cortex-memberlist"
)
// set the ca
cert := ca.New("single-binary-memberlist")

// Ensure the entire path of directories exist.
require.NoError(t, os.MkdirAll(filepath.Join(s.SharedDir(), "certs"), os.ModePerm))
require.NoError(t, cert.WriteCACertificate(filepath.Join(s.SharedDir(), caCertFile)))
require.NoError(t, cert.WriteCertificate(
&x509.Certificate{
Subject: pkix.Name{CommonName: "memberlist"},
DNSNames: []string{
memberlistDNS,
},
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageAny},
},
filepath.Join(s.SharedDir(), clientCertFile),
filepath.Join(s.SharedDir(), clientKeyFile),
))

cortex1 = newSingleBinary("cortex-1", memberlistDNS, "")
cortex2 = newSingleBinary("cortex-2", memberlistDNS, networkName+"-cortex-1:8000")
cortex3 = newSingleBinary("cortex-3", memberlistDNS, networkName+"-cortex-1:8000")
} else {
cortex1 = newSingleBinary("cortex-1", "", "")
cortex2 = newSingleBinary("cortex-2", "", networkName+"-cortex-1:8000")
cortex3 = newSingleBinary("cortex-3", "", networkName+"-cortex-1:8000")
}

// start cortex-1 first, as cortex-2 and cortex-3 both connect to cortex-1
require.NoError(t, s.StartAndWaitReady(cortex1))
Expand Down Expand Up @@ -57,7 +99,7 @@ func TestSingleBinaryWithMemberlist(t *testing.T) {
require.NoError(t, s.Stop(cortex3))
}

func newSingleBinary(name string, join string) *e2ecortex.CortexService {
func newSingleBinary(name string, servername string, join string) *e2ecortex.CortexService {
flags := map[string]string{
"-ingester.final-sleep": "0s",
"-ingester.join-after": "0s", // join quickly
Expand All @@ -77,7 +119,11 @@ func newSingleBinary(name string, join string) *e2ecortex.CortexService {

serv := e2ecortex.NewSingleBinary(
name,
mergeFlags(ChunksStorageFlags(), flags),
mergeFlags(
ChunksStorageFlags(),
flags,
getTLSFlagsWithPrefix("memberlist", servername, servername == ""),
),
"",
8000,
)
Expand Down
55 changes: 45 additions & 10 deletions pkg/ring/kv/memberlist/tcp_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package memberlist
import (
"bytes"
"crypto/md5"
"crypto/tls"
"flag"
"fmt"
"io"
Expand All @@ -15,10 +16,12 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/hashicorp/go-sockaddr"
"github.com/hashicorp/memberlist"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"

"github.com/cortexproject/cortex/pkg/util/flagext"
tlsutil "github.com/cortexproject/cortex/pkg/util/tls"
)

type messageType uint8
Expand Down Expand Up @@ -56,6 +59,9 @@ type TCPTransportConfig struct {
// Where to put custom metrics. nil = don't register.
MetricsRegisterer prometheus.Registerer `yaml:"-"`
MetricsNamespace string `yaml:"-"`

TLSEnabled bool `yaml:"tls_enabled"`
TLS tlsutil.ClientConfig `yaml:",inline"`
}

// RegisterFlags registers flags.
Expand All @@ -66,6 +72,9 @@ func (cfg *TCPTransportConfig) RegisterFlags(f *flag.FlagSet, prefix string) {
f.DurationVar(&cfg.PacketDialTimeout, prefix+"memberlist.packet-dial-timeout", 5*time.Second, "Timeout used when connecting to other nodes to send packet.")
f.DurationVar(&cfg.PacketWriteTimeout, prefix+"memberlist.packet-write-timeout", 5*time.Second, "Timeout for writing 'packet' data.")
f.BoolVar(&cfg.TransportDebug, prefix+"memberlist.transport-debug", false, "Log debug transport messages. Note: global log.level must be at debug level as well.")

f.BoolVar(&cfg.TLSEnabled, prefix+"memberlist.tls-enabled", false, "Enable TLS on the memberlist transport layer.")
cfg.TLS.RegisterFlagsWithPrefix(prefix+"memberlist", f)
}

// TCPTransport is a memberlist.Transport implementation that uses TCP for both packet and stream
Expand All @@ -77,7 +86,8 @@ type TCPTransport struct {
packetCh chan *memberlist.Packet
connCh chan net.Conn
wg sync.WaitGroup
tcpListeners []*net.TCPListener
tcpListeners []net.Listener
tlsConfig *tls.Config

shutdown atomic.Int32

Expand Down Expand Up @@ -114,6 +124,14 @@ func NewTCPTransport(config TCPTransportConfig, logger log.Logger) (*TCPTranspor
connCh: make(chan net.Conn),
}

var err error
if config.TLSEnabled {
t.tlsConfig, err = config.TLS.GetTLSConfig()
if err != nil {
return nil, errors.Wrap(err, "unable to create TLS config")
}
}

t.registerMetrics()

// Clean up listeners if there's an error.
Expand All @@ -129,10 +147,20 @@ func NewTCPTransport(config TCPTransportConfig, logger log.Logger) (*TCPTranspor
ip := net.ParseIP(addr)

tcpAddr := &net.TCPAddr{IP: ip, Port: port}
tcpLn, err := net.ListenTCP("tcp", tcpAddr)
if err != nil {
return nil, fmt.Errorf("failed to start TCP listener on %q port %d: %v", addr, port, err)

var tcpLn net.Listener
if config.TLSEnabled {
tcpLn, err = tls.Listen("tcp", tcpAddr.String(), t.tlsConfig)
if err != nil {
return nil, errors.Wrapf(err, "failed to start TLS TCP listener on %q port %d", addr, port)
}
} else {
tcpLn, err = net.ListenTCP("tcp", tcpAddr)
if err != nil {
return nil, errors.Wrapf(err, "failed to start TCP listener on %q port %d", addr, port)
}
}

t.tcpListeners = append(t.tcpListeners, tcpLn)

// If the config port given was zero, use the first TCP listener
Expand All @@ -157,7 +185,7 @@ func NewTCPTransport(config TCPTransportConfig, logger log.Logger) (*TCPTranspor
// and spawns new go routine to handle each connection. This transport uses TCP connections
// for both packet sending and streams.
// (copied from Memberlist net_transport.go)
func (t *TCPTransport) tcpListen(tcpLn *net.TCPListener) {
func (t *TCPTransport) tcpListen(tcpLn net.Listener) {
defer t.wg.Done()

// baseDelay is the initial delay after an AcceptTCP() error before attempting again
Expand All @@ -170,7 +198,7 @@ func (t *TCPTransport) tcpListen(tcpLn *net.TCPListener) {

var loopDelay time.Duration
for {
conn, err := tcpLn.AcceptTCP()
conn, err := tcpLn.Accept()
if err != nil {
if s := t.shutdown.Load(); s == 1 {
break
Expand Down Expand Up @@ -206,7 +234,7 @@ func (t *TCPTransport) debugLog() log.Logger {
return noopLogger
}

func (t *TCPTransport) handleConnection(conn *net.TCPConn) {
func (t *TCPTransport) handleConnection(conn net.Conn) {
t.debugLog().Log("msg", "TCPTransport: New connection", "addr", conn.RemoteAddr())

closeConn := true
Expand Down Expand Up @@ -300,6 +328,13 @@ func (a addr) String() string {
return string(a)
}

func (t *TCPTransport) getConnection(addr string, timeout time.Duration) (net.Conn, error) {
if t.cfg.TLSEnabled {
return tls.DialWithDialer(&net.Dialer{Timeout: timeout}, "tcp", addr, t.tlsConfig)
}
return net.DialTimeout("tcp", addr, timeout)
}

// GetAutoBindPort returns the bind port that was automatically given by the
// kernel, if a bind port of 0 was given.
func (t *TCPTransport) GetAutoBindPort() int {
Expand Down Expand Up @@ -396,9 +431,9 @@ func (t *TCPTransport) WriteTo(b []byte, addr string) (time.Time, error) {

func (t *TCPTransport) writeTo(b []byte, addr string) error {
// Open connection, write packet header and data, data hash, close. Simple.
c, err := net.DialTimeout("tcp", addr, t.cfg.PacketDialTimeout)
c, err := t.getConnection(addr, t.cfg.PacketDialTimeout)
if err != nil {
return nil
return err
}

closed := false
Expand Down Expand Up @@ -476,8 +511,8 @@ func (t *TCPTransport) PacketCh() <-chan *memberlist.Packet {
// two-way communication with a peer.
func (t *TCPTransport) DialTimeout(addr string, timeout time.Duration) (net.Conn, error) {
t.outgoingStreams.Inc()
c, err := t.getConnection(addr, timeout)

c, err := net.DialTimeout("tcp", addr, timeout)
if err != nil {
t.outgoingStreamErrors.Inc()
return nil, err
Expand Down