diff --git a/agent/structs/config_entry.go b/agent/structs/config_entry.go index 72efbffce3d4..08d36e67d723 100644 --- a/agent/structs/config_entry.go +++ b/agent/structs/config_entry.go @@ -765,6 +765,12 @@ type UpstreamConfig struct { // EnterpriseMeta is only accepted within a service-defaults config entry. acl.EnterpriseMeta `hcl:",squash" mapstructure:",squash"` + // EnvoyConnectionBalanceType specifies how envoy connections should + // be distributed across worker threads. Currently, only the "exact_balance" + // type is accepted. + // https://cloudnative.to/envoy/api-v3/config/listener/v3/listener.proto.html#config-listener-v3-listener-connectionbalanceconfig + EnvoyConnectionBalanceType string `json:",omitempty" alias:"envoy_connection_balance_type"` + // EnvoyListenerJSON is a complete override ("escape hatch") for the upstream's // listener. // @@ -827,6 +833,9 @@ func (cfg *UpstreamConfig) ServiceName() ServiceName { func (cfg UpstreamConfig) MergeInto(dst map[string]interface{}) { // Avoid storing empty values in the map, since these can act as overrides + if cfg.EnvoyConnectionBalanceType != "" { + dst["envoy_connection_balance_type"] = cfg.EnvoyConnectionBalanceType + } if cfg.EnvoyListenerJSON != "" { dst["envoy_listener_json"] = cfg.EnvoyListenerJSON } diff --git a/agent/structs/config_entry_test.go b/agent/structs/config_entry_test.go index 7a699417f06b..30bcfd8d71e9 100644 --- a/agent/structs/config_entry_test.go +++ b/agent/structs/config_entry_test.go @@ -415,6 +415,7 @@ func TestDecodeConfigEntry(t *testing.T) { defaults { connect_timeout_ms = 5 protocol = "http" + envoy_connection_balance_type = "something" envoy_listener_json = "foo" envoy_cluster_json = "bar" limits { @@ -454,6 +455,7 @@ func TestDecodeConfigEntry(t *testing.T) { }, ] Defaults { + EnvoyConnectionBalanceType = "something" EnvoyListenerJSON = "foo" EnvoyClusterJSON = "bar" ConnectTimeoutMs = 5 @@ -493,10 +495,11 @@ func TestDecodeConfigEntry(t *testing.T) { }, }, Defaults: &UpstreamConfig{ - EnvoyListenerJSON: "foo", - EnvoyClusterJSON: "bar", - ConnectTimeoutMs: 5, - Protocol: "http", + EnvoyConnectionBalanceType: "something", + EnvoyListenerJSON: "foo", + EnvoyClusterJSON: "bar", + ConnectTimeoutMs: 5, + Protocol: "http", Limits: &UpstreamLimits{ MaxConnections: intPointer(3), MaxPendingRequests: intPointer(4), @@ -2665,10 +2668,11 @@ func TestUpstreamConfig_MergeInto(t *testing.T) { { name: "kitchen sink", source: UpstreamConfig{ - EnvoyListenerJSON: "foo", - EnvoyClusterJSON: "bar", - ConnectTimeoutMs: 5, - Protocol: "http", + EnvoyConnectionBalanceType: "something", + EnvoyListenerJSON: "foo", + EnvoyClusterJSON: "bar", + ConnectTimeoutMs: 5, + Protocol: "http", Limits: &UpstreamLimits{ MaxConnections: intPointer(3), MaxPendingRequests: intPointer(4), @@ -2682,10 +2686,11 @@ func TestUpstreamConfig_MergeInto(t *testing.T) { }, destination: make(map[string]interface{}), want: map[string]interface{}{ - "envoy_listener_json": "foo", - "envoy_cluster_json": "bar", - "connect_timeout_ms": 5, - "protocol": "http", + "envoy_connection_balance_type": "something", + "envoy_listener_json": "foo", + "envoy_cluster_json": "bar", + "connect_timeout_ms": 5, + "protocol": "http", "limits": &UpstreamLimits{ MaxConnections: intPointer(3), MaxPendingRequests: intPointer(4), @@ -2701,10 +2706,11 @@ func TestUpstreamConfig_MergeInto(t *testing.T) { { name: "kitchen sink override of destination", source: UpstreamConfig{ - EnvoyListenerJSON: "foo", - EnvoyClusterJSON: "bar", - ConnectTimeoutMs: 5, - Protocol: "http", + EnvoyConnectionBalanceType: "something", + EnvoyListenerJSON: "foo", + EnvoyClusterJSON: "bar", + ConnectTimeoutMs: 5, + Protocol: "http", Limits: &UpstreamLimits{ MaxConnections: intPointer(3), MaxPendingRequests: intPointer(4), @@ -2717,10 +2723,11 @@ func TestUpstreamConfig_MergeInto(t *testing.T) { MeshGateway: MeshGatewayConfig{Mode: MeshGatewayModeRemote}, }, destination: map[string]interface{}{ - "envoy_listener_json": "zip", - "envoy_cluster_json": "zap", - "connect_timeout_ms": 10, - "protocol": "grpc", + "envoy_connection_balance_type": "zup", + "envoy_listener_json": "zip", + "envoy_cluster_json": "zap", + "connect_timeout_ms": 10, + "protocol": "grpc", "limits": &UpstreamLimits{ MaxConnections: intPointer(10), MaxPendingRequests: intPointer(11), @@ -2733,10 +2740,11 @@ func TestUpstreamConfig_MergeInto(t *testing.T) { "mesh_gateway": MeshGatewayConfig{Mode: MeshGatewayModeLocal}, }, want: map[string]interface{}{ - "envoy_listener_json": "foo", - "envoy_cluster_json": "bar", - "connect_timeout_ms": 5, - "protocol": "http", + "envoy_connection_balance_type": "something", + "envoy_listener_json": "foo", + "envoy_cluster_json": "bar", + "connect_timeout_ms": 5, + "protocol": "http", "limits": &UpstreamLimits{ MaxConnections: intPointer(3), MaxPendingRequests: intPointer(4), @@ -2753,10 +2761,11 @@ func TestUpstreamConfig_MergeInto(t *testing.T) { name: "empty source leaves destination intact", source: UpstreamConfig{}, destination: map[string]interface{}{ - "envoy_listener_json": "zip", - "envoy_cluster_json": "zap", - "connect_timeout_ms": 10, - "protocol": "grpc", + "envoy_connection_balance_type": "something", + "envoy_listener_json": "zip", + "envoy_cluster_json": "zap", + "connect_timeout_ms": 10, + "protocol": "grpc", "limits": &UpstreamLimits{ MaxConnections: intPointer(10), MaxPendingRequests: intPointer(11), @@ -2770,10 +2779,11 @@ func TestUpstreamConfig_MergeInto(t *testing.T) { "mesh_gateway": MeshGatewayConfig{Mode: MeshGatewayModeLocal}, }, want: map[string]interface{}{ - "envoy_listener_json": "zip", - "envoy_cluster_json": "zap", - "connect_timeout_ms": 10, - "protocol": "grpc", + "envoy_connection_balance_type": "something", + "envoy_listener_json": "zip", + "envoy_cluster_json": "zap", + "connect_timeout_ms": 10, + "protocol": "grpc", "limits": &UpstreamLimits{ MaxConnections: intPointer(10), MaxPendingRequests: intPointer(11), diff --git a/agent/xds/listeners.go b/agent/xds/listeners.go index cfea25cbc1e6..da6f61dc234f 100644 --- a/agent/xds/listeners.go +++ b/agent/xds/listeners.go @@ -193,6 +193,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg. upstreamListener.FilterChains = []*envoy_listener_v3.FilterChain{ filterChain, } + injectConnectionBalanceConfig(&cfg, upstreamListener) resources = append(resources, upstreamListener) // Avoid creating filter chains below for upstreams that have dedicated listeners @@ -388,6 +389,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg. upstreamListener.FilterChains = []*envoy_listener_v3.FilterChain{ filterChain, } + injectConnectionBalanceConfig(&cfg, upstreamListener) resources = append(resources, upstreamListener) // Avoid creating filter chains below for upstreams that have dedicated listeners @@ -574,6 +576,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg. upstreamListener.FilterChains = []*envoy_listener_v3.FilterChain{ filterChain, } + injectConnectionBalanceConfig(&cfg, upstreamListener) resources = append(resources, upstreamListener) } @@ -905,6 +908,14 @@ func makeListenerFromUserConfig(configJSON string) (*envoy_listener_v3.Listener, return &l, nil } +func injectConnectionBalanceConfig(cfg *structs.UpstreamConfig, listener *envoy_listener_v3.Listener) { + if cfg.EnvoyConnectionBalanceType == "exact_balance" { + listener.ConnectionBalanceConfig = &envoy_listener_v3.Listener_ConnectionBalanceConfig{ + BalanceType: &envoy_listener_v3.Listener_ConnectionBalanceConfig_ExactBalance_{}, + } + } +} + // Ensure that the first filter in each filter chain of a public listener is // the authz filter to prevent unauthorized access. func (s *ResourceGenerator) injectConnectFilters(cfgSnap *proxycfg.ConfigSnapshot, listener *envoy_listener_v3.Listener) error { diff --git a/api/config_entry.go b/api/config_entry.go index acdb5bfa86a7..c6104eb15232 100644 --- a/api/config_entry.go +++ b/api/config_entry.go @@ -142,6 +142,12 @@ type UpstreamConfig struct { // Namespace is only accepted within a service-defaults config entry. Namespace string `json:",omitempty"` + // EnvoyConnectionBalanceType specifies how envoy connections should + // be distributed across worker threads. Currently, only the "exact_balance" + // type is accepted. + // https://cloudnative.to/envoy/api-v3/config/listener/v3/listener.proto.html#config-listener-v3-listener-connectionbalanceconfig + EnvoyConnectionBalanceType string `json:",omitempty" alias:"envoy_connection_balance_type"` + // EnvoyListenerJSON is a complete override ("escape hatch") for the upstream's // listener. //