Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add envoy connection balancing. #14616

Merged
merged 13 commits into from
Sep 26, 2022
3 changes: 3 additions & 0 deletions .changelog/14616.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:feature
connect: Add Envoy connection balancing configuration fields.
```
39 changes: 19 additions & 20 deletions agent/configentry/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func ComputeResolvedServiceConfig(
structs.NewServiceID(args.Name, &args.EnterpriseMeta),
)
if serviceConf != nil {

if serviceConf.Expose.Checks {
thisReply.Expose.Checks = true
}
Expand All @@ -62,12 +63,6 @@ func ComputeResolvedServiceConfig(
if serviceConf.MeshGateway.Mode != structs.MeshGatewayModeDefault {
thisReply.MeshGateway.Mode = serviceConf.MeshGateway.Mode
}
if serviceConf.Protocol != "" {
if thisReply.ProxyConfig == nil {
thisReply.ProxyConfig = make(map[string]interface{})
}
thisReply.ProxyConfig["protocol"] = serviceConf.Protocol
}
if serviceConf.TransparentProxy.OutboundListenerPort != 0 {
thisReply.TransparentProxy.OutboundListenerPort = serviceConf.TransparentProxy.OutboundListenerPort
}
Expand All @@ -81,25 +76,29 @@ func ComputeResolvedServiceConfig(
thisReply.Destination = *serviceConf.Destination
}

// Populate values for the proxy config map
proxyConf := thisReply.ProxyConfig
if proxyConf == nil {
proxyConf = make(map[string]interface{})
}
if serviceConf.Protocol != "" {
proxyConf["protocol"] = serviceConf.Protocol
}
if serviceConf.BalanceInboundConnections != "" {
proxyConf["balance_inbound_connections"] = serviceConf.BalanceInboundConnections
}
if serviceConf.MaxInboundConnections > 0 {
if thisReply.ProxyConfig == nil {
thisReply.ProxyConfig = map[string]interface{}{}
}
thisReply.ProxyConfig["max_inbound_connections"] = serviceConf.MaxInboundConnections
proxyConf["max_inbound_connections"] = serviceConf.MaxInboundConnections
}

if serviceConf.LocalConnectTimeoutMs > 0 {
if thisReply.ProxyConfig == nil {
thisReply.ProxyConfig = map[string]interface{}{}
}
thisReply.ProxyConfig["local_connect_timeout_ms"] = serviceConf.LocalConnectTimeoutMs
proxyConf["local_connect_timeout_ms"] = serviceConf.LocalConnectTimeoutMs
}

if serviceConf.LocalRequestTimeoutMs > 0 {
if thisReply.ProxyConfig == nil {
thisReply.ProxyConfig = map[string]interface{}{}
}
thisReply.ProxyConfig["local_request_timeout_ms"] = serviceConf.LocalRequestTimeoutMs
proxyConf["local_request_timeout_ms"] = serviceConf.LocalRequestTimeoutMs
}
// Add the proxy conf to the response if any fields were populated
if len(proxyConf) > 0 {
thisReply.ProxyConfig = proxyConf
}

thisReply.Meta = serviceConf.Meta
Expand Down
20 changes: 20 additions & 0 deletions agent/configentry/resolve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,26 @@ func Test_ComputeResolvedServiceConfig(t *testing.T) {
args args
want *structs.ServiceConfigResponse
}{
{
name: "proxy with balanceinboundconnections",
args: args{
scReq: &structs.ServiceConfigRequest{
Name: "sid",
},
entries: &ResolvedServiceConfigSet{
ServiceDefaults: map[structs.ServiceID]*structs.ServiceConfigEntry{
sid: {
BalanceInboundConnections: "exact_balance",
},
},
},
},
want: &structs.ServiceConfigResponse{
ProxyConfig: map[string]interface{}{
"balance_inbound_connections": "exact_balance",
},
},
},
hashi-derek marked this conversation as resolved.
Show resolved Hide resolved
{
name: "proxy with maxinboundsconnections",
args: args{
Expand Down
34 changes: 21 additions & 13 deletions agent/structs/config_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,19 +98,20 @@ type WarningConfigEntry interface {
// ServiceConfiguration is the top-level struct for the configuration of a service
// across the entire cluster.
type ServiceConfigEntry struct {
Kind string
Name string
Protocol string
Mode ProxyMode `json:",omitempty"`
TransparentProxy TransparentProxyConfig `json:",omitempty" alias:"transparent_proxy"`
MeshGateway MeshGatewayConfig `json:",omitempty" alias:"mesh_gateway"`
Expose ExposeConfig `json:",omitempty"`
ExternalSNI string `json:",omitempty" alias:"external_sni"`
UpstreamConfig *UpstreamConfiguration `json:",omitempty" alias:"upstream_config"`
Destination *DestinationConfig `json:",omitempty"`
MaxInboundConnections int `json:",omitempty" alias:"max_inbound_connections"`
LocalConnectTimeoutMs int `json:",omitempty" alias:"local_connect_timeout_ms"`
LocalRequestTimeoutMs int `json:",omitempty" alias:"local_request_timeout_ms"`
Kind string
Name string
Protocol string
Mode ProxyMode `json:",omitempty"`
TransparentProxy TransparentProxyConfig `json:",omitempty" alias:"transparent_proxy"`
MeshGateway MeshGatewayConfig `json:",omitempty" alias:"mesh_gateway"`
Expose ExposeConfig `json:",omitempty"`
ExternalSNI string `json:",omitempty" alias:"external_sni"`
UpstreamConfig *UpstreamConfiguration `json:",omitempty" alias:"upstream_config"`
Destination *DestinationConfig `json:",omitempty"`
MaxInboundConnections int `json:",omitempty" alias:"max_inbound_connections"`
LocalConnectTimeoutMs int `json:",omitempty" alias:"local_connect_timeout_ms"`
LocalRequestTimeoutMs int `json:",omitempty" alias:"local_request_timeout_ms"`
BalanceInboundConnections string `json:",omitempty" alias:"balance_inbound_connections"`

Meta map[string]string `json:",omitempty"`
acl.EnterpriseMeta `hcl:",squash" mapstructure:",squash"`
Expand Down Expand Up @@ -800,6 +801,10 @@ type UpstreamConfig struct {

// MeshGatewayConfig controls how Mesh Gateways are configured and used
MeshGateway MeshGatewayConfig `json:",omitempty" alias:"mesh_gateway" `

// BalanceOutboundConnections indicates how the proxy should attempt to distribute
// connections across worker threads. Only used by envoy proxies.
BalanceOutboundConnections string `json:",omitempty" alias:"balance_outbound_connections"`
}

func (cfg UpstreamConfig) Clone() UpstreamConfig {
Expand Down Expand Up @@ -848,6 +853,9 @@ func (cfg UpstreamConfig) MergeInto(dst map[string]interface{}) {
if cfg.PassiveHealthCheck != nil {
dst["passive_health_check"] = cfg.PassiveHealthCheck
}
if cfg.BalanceOutboundConnections != "" {
dst["balance_outbound_connections"] = cfg.BalanceOutboundConnections
}
}

func (cfg *UpstreamConfig) NormalizeWithoutName() error {
Expand Down
69 changes: 41 additions & 28 deletions agent/structs/config_entry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ func TestDecodeConfigEntry(t *testing.T) {
"moreconfig" {
"moar" = "config"
}
"balance_inbound_connections" = "exact_balance"
}
mesh_gateway {
mode = "remote"
Expand All @@ -358,6 +359,7 @@ func TestDecodeConfigEntry(t *testing.T) {
"moreconfig" {
"moar" = "config"
}
"balance_inbound_connections" = "exact_balance"
}
MeshGateway {
Mode = "remote"
Expand All @@ -376,6 +378,7 @@ func TestDecodeConfigEntry(t *testing.T) {
"moreconfig": map[string]interface{}{
"moar": "config",
},
"balance_inbound_connections": "exact_balance",
},
MeshGateway: MeshGatewayConfig{
Mode: MeshGatewayModeRemote,
Expand Down Expand Up @@ -415,6 +418,7 @@ func TestDecodeConfigEntry(t *testing.T) {
defaults {
connect_timeout_ms = 5
protocol = "http"
balance_outbound_connections = "exact_balance"
envoy_listener_json = "foo"
envoy_cluster_json = "bar"
limits {
Expand Down Expand Up @@ -463,6 +467,7 @@ func TestDecodeConfigEntry(t *testing.T) {
MaxPendingRequests = 4
MaxConcurrentRequests = 5
}
BalanceOutboundConnections = "exact_balance"
}
}
`,
Expand Down Expand Up @@ -502,6 +507,7 @@ func TestDecodeConfigEntry(t *testing.T) {
MaxPendingRequests: intPointer(4),
MaxConcurrentRequests: intPointer(5),
},
BalanceOutboundConnections: "exact_balance",
},
},
},
Expand Down Expand Up @@ -2665,10 +2671,11 @@ func TestUpstreamConfig_MergeInto(t *testing.T) {
{
name: "kitchen sink",
source: UpstreamConfig{
EnvoyListenerJSON: "foo",
EnvoyClusterJSON: "bar",
ConnectTimeoutMs: 5,
Protocol: "http",
BalanceOutboundConnections: "exact_balance",
EnvoyListenerJSON: "foo",
EnvoyClusterJSON: "bar",
ConnectTimeoutMs: 5,
Protocol: "http",
Limits: &UpstreamLimits{
MaxConnections: intPointer(3),
MaxPendingRequests: intPointer(4),
Expand All @@ -2682,10 +2689,11 @@ func TestUpstreamConfig_MergeInto(t *testing.T) {
},
destination: make(map[string]interface{}),
want: map[string]interface{}{
"envoy_listener_json": "foo",
"envoy_cluster_json": "bar",
"connect_timeout_ms": 5,
"protocol": "http",
"balance_outbound_connections": "exact_balance",
"envoy_listener_json": "foo",
"envoy_cluster_json": "bar",
"connect_timeout_ms": 5,
"protocol": "http",
"limits": &UpstreamLimits{
MaxConnections: intPointer(3),
MaxPendingRequests: intPointer(4),
Expand All @@ -2701,10 +2709,11 @@ func TestUpstreamConfig_MergeInto(t *testing.T) {
{
name: "kitchen sink override of destination",
source: UpstreamConfig{
EnvoyListenerJSON: "foo",
EnvoyClusterJSON: "bar",
ConnectTimeoutMs: 5,
Protocol: "http",
BalanceOutboundConnections: "exact_balance",
EnvoyListenerJSON: "foo",
EnvoyClusterJSON: "bar",
ConnectTimeoutMs: 5,
Protocol: "http",
Limits: &UpstreamLimits{
MaxConnections: intPointer(3),
MaxPendingRequests: intPointer(4),
Expand All @@ -2717,10 +2726,11 @@ func TestUpstreamConfig_MergeInto(t *testing.T) {
MeshGateway: MeshGatewayConfig{Mode: MeshGatewayModeRemote},
},
destination: map[string]interface{}{
"envoy_listener_json": "zip",
"envoy_cluster_json": "zap",
"connect_timeout_ms": 10,
"protocol": "grpc",
"balance_outbound_connections": "",
"envoy_listener_json": "zip",
"envoy_cluster_json": "zap",
"connect_timeout_ms": 10,
"protocol": "grpc",
"limits": &UpstreamLimits{
MaxConnections: intPointer(10),
MaxPendingRequests: intPointer(11),
Expand All @@ -2733,10 +2743,11 @@ func TestUpstreamConfig_MergeInto(t *testing.T) {
"mesh_gateway": MeshGatewayConfig{Mode: MeshGatewayModeLocal},
},
want: map[string]interface{}{
"envoy_listener_json": "foo",
"envoy_cluster_json": "bar",
"connect_timeout_ms": 5,
"protocol": "http",
"balance_outbound_connections": "exact_balance",
"envoy_listener_json": "foo",
"envoy_cluster_json": "bar",
"connect_timeout_ms": 5,
"protocol": "http",
"limits": &UpstreamLimits{
MaxConnections: intPointer(3),
MaxPendingRequests: intPointer(4),
Expand All @@ -2753,10 +2764,11 @@ func TestUpstreamConfig_MergeInto(t *testing.T) {
name: "empty source leaves destination intact",
source: UpstreamConfig{},
destination: map[string]interface{}{
"envoy_listener_json": "zip",
"envoy_cluster_json": "zap",
"connect_timeout_ms": 10,
"protocol": "grpc",
"balance_outbound_connections": "exact_balance",
"envoy_listener_json": "zip",
"envoy_cluster_json": "zap",
"connect_timeout_ms": 10,
"protocol": "grpc",
"limits": &UpstreamLimits{
MaxConnections: intPointer(10),
MaxPendingRequests: intPointer(11),
Expand All @@ -2770,10 +2782,11 @@ func TestUpstreamConfig_MergeInto(t *testing.T) {
"mesh_gateway": MeshGatewayConfig{Mode: MeshGatewayModeLocal},
},
want: map[string]interface{}{
"envoy_listener_json": "zip",
"envoy_cluster_json": "zap",
"connect_timeout_ms": 10,
"protocol": "grpc",
"balance_outbound_connections": "exact_balance",
"envoy_listener_json": "zip",
"envoy_cluster_json": "zap",
"connect_timeout_ms": 10,
"protocol": "grpc",
"limits": &UpstreamLimits{
MaxConnections: intPointer(10),
MaxPendingRequests: intPointer(11),
Expand Down
4 changes: 4 additions & 0 deletions agent/xds/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ type ProxyConfig struct {
// MaxInboundConnections is the maximum number of inbound connections to
// the proxy. If not set, the default is 0 (no limit).
MaxInboundConnections int `mapstructure:"max_inbound_connections"`

// BalanceInboundConnections indicates how the proxy should attempt to distribute
// connections across worker threads. Only used by envoy proxies.
BalanceInboundConnections string `json:",omitempty" alias:"balance_inbound_connections"`
}

// ParseProxyConfig returns the ProxyConfig parsed from the an opaque map. If an
Expand Down
11 changes: 11 additions & 0 deletions agent/xds/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,17 @@ func TestParseProxyConfig(t *testing.T) {
Protocol: "tcp",
},
},
{
name: "balance inbound connections override, string",
input: map[string]interface{}{
"balance_inbound_connections": "exact_balance",
},
want: ProxyConfig{
LocalConnectTimeoutMs: 5000,
Protocol: "tcp",
BalanceInboundConnections: "exact_balance",
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
13 changes: 13 additions & 0 deletions agent/xds/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.
}

upstreamListener := makeListener(uid.EnvoyID(), upstreamCfg, envoy_core_v3.TrafficDirection_OUTBOUND)
injectConnectionBalanceConfig(cfg.BalanceOutboundConnections, upstreamListener)
upstreamListener.FilterChains = []*envoy_listener_v3.FilterChain{
filterChain,
}
Expand Down Expand Up @@ -385,6 +386,8 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.
}

upstreamListener := makeListener(uid.EnvoyID(), upstreamCfg, envoy_core_v3.TrafficDirection_OUTBOUND)
injectConnectionBalanceConfig(cfg.BalanceOutboundConnections, upstreamListener)

upstreamListener.FilterChains = []*envoy_listener_v3.FilterChain{
filterChain,
}
Expand Down Expand Up @@ -559,6 +562,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.
}

upstreamListener := makeListener(uid.EnvoyID(), u, envoy_core_v3.TrafficDirection_OUTBOUND)
injectConnectionBalanceConfig(cfg.BalanceOutboundConnections, upstreamListener)

filterChain, err := s.makeUpstreamFilterChain(filterChainOpts{
// TODO (SNI partition) add partition for upstream SNI
Expand Down Expand Up @@ -905,6 +909,14 @@ func makeListenerFromUserConfig(configJSON string) (*envoy_listener_v3.Listener,
return &l, nil
}

func injectConnectionBalanceConfig(balanceType string, listener *envoy_listener_v3.Listener) {
if balanceType == "exact_balance" {
hashi-derek marked this conversation as resolved.
Show resolved Hide resolved
listener.ConnectionBalanceConfig = &envoy_listener_v3.Listener_ConnectionBalanceConfig{
BalanceType: &envoy_listener_v3.Listener_ConnectionBalanceConfig_ExactBalance_{},
}
}
}

// Ensure that the first filter in each filter chain of a public listener is
// the authz filter to prevent unauthorized access.
func (s *ResourceGenerator) injectConnectFilters(cfgSnap *proxycfg.ConfigSnapshot, listener *envoy_listener_v3.Listener) error {
Expand Down Expand Up @@ -1221,6 +1233,7 @@ func (s *ResourceGenerator) makeInboundListener(cfgSnap *proxycfg.ConfigSnapshot
}

l = makePortListener(name, addr, port, envoy_core_v3.TrafficDirection_INBOUND)
injectConnectionBalanceConfig(cfg.BalanceInboundConnections, l)

var tracing *envoy_http_v3.HttpConnectionManager_Tracing
if cfg.ListenerTracingJSON != "" {
Expand Down
Loading