From 82d5c1a2611491fe33939219f67b7fcf4f2db870 Mon Sep 17 00:00:00 2001 From: Derek Menteer Date: Wed, 14 Sep 2022 14:46:07 -0500 Subject: [PATCH] Add envoy connection balancing to upstreams. --- agent/structs/config_entry.go | 9 +++++++++ agent/structs/config_entry_test.go | 11 +++++++---- agent/xds/listeners.go | 11 +++++++++++ api/config_entry.go | 6 ++++++ 4 files changed, 33 insertions(+), 4 deletions(-) diff --git a/agent/structs/config_entry.go b/agent/structs/config_entry.go index 72efbffce3d45..08d36e67d723c 100644 --- a/agent/structs/config_entry.go +++ b/agent/structs/config_entry.go @@ -765,6 +765,12 @@ type UpstreamConfig struct { // EnterpriseMeta is only accepted within a service-defaults config entry. acl.EnterpriseMeta `hcl:",squash" mapstructure:",squash"` + // EnvoyConnectionBalanceType specifies how envoy connections should + // be distributed across worker threads. Currently, only the "exact_balance" + // type is accepted. + // https://cloudnative.to/envoy/api-v3/config/listener/v3/listener.proto.html#config-listener-v3-listener-connectionbalanceconfig + EnvoyConnectionBalanceType string `json:",omitempty" alias:"envoy_connection_balance_type"` + // EnvoyListenerJSON is a complete override ("escape hatch") for the upstream's // listener. // @@ -827,6 +833,9 @@ func (cfg *UpstreamConfig) ServiceName() ServiceName { func (cfg UpstreamConfig) MergeInto(dst map[string]interface{}) { // Avoid storing empty values in the map, since these can act as overrides + if cfg.EnvoyConnectionBalanceType != "" { + dst["envoy_connection_balance_type"] = cfg.EnvoyConnectionBalanceType + } if cfg.EnvoyListenerJSON != "" { dst["envoy_listener_json"] = cfg.EnvoyListenerJSON } diff --git a/agent/structs/config_entry_test.go b/agent/structs/config_entry_test.go index 7a699417f06bd..93e096976d221 100644 --- a/agent/structs/config_entry_test.go +++ b/agent/structs/config_entry_test.go @@ -415,6 +415,7 @@ func TestDecodeConfigEntry(t *testing.T) { defaults { connect_timeout_ms = 5 protocol = "http" + envoy_connection_balance_type = "something" envoy_listener_json = "foo" envoy_cluster_json = "bar" limits { @@ -454,6 +455,7 @@ func TestDecodeConfigEntry(t *testing.T) { }, ] Defaults { + EnvoyConnectionBalanceType = "something" EnvoyListenerJSON = "foo" EnvoyClusterJSON = "bar" ConnectTimeoutMs = 5 @@ -493,10 +495,11 @@ func TestDecodeConfigEntry(t *testing.T) { }, }, Defaults: &UpstreamConfig{ - EnvoyListenerJSON: "foo", - EnvoyClusterJSON: "bar", - ConnectTimeoutMs: 5, - Protocol: "http", + EnvoyConnectionBalanceType: "something", + EnvoyListenerJSON: "foo", + EnvoyClusterJSON: "bar", + ConnectTimeoutMs: 5, + Protocol: "http", Limits: &UpstreamLimits{ MaxConnections: intPointer(3), MaxPendingRequests: intPointer(4), diff --git a/agent/xds/listeners.go b/agent/xds/listeners.go index cfea25cbc1e60..da6f61dc234f6 100644 --- a/agent/xds/listeners.go +++ b/agent/xds/listeners.go @@ -193,6 +193,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg. upstreamListener.FilterChains = []*envoy_listener_v3.FilterChain{ filterChain, } + injectConnectionBalanceConfig(&cfg, upstreamListener) resources = append(resources, upstreamListener) // Avoid creating filter chains below for upstreams that have dedicated listeners @@ -388,6 +389,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg. upstreamListener.FilterChains = []*envoy_listener_v3.FilterChain{ filterChain, } + injectConnectionBalanceConfig(&cfg, upstreamListener) resources = append(resources, upstreamListener) // Avoid creating filter chains below for upstreams that have dedicated listeners @@ -574,6 +576,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg. upstreamListener.FilterChains = []*envoy_listener_v3.FilterChain{ filterChain, } + injectConnectionBalanceConfig(&cfg, upstreamListener) resources = append(resources, upstreamListener) } @@ -905,6 +908,14 @@ func makeListenerFromUserConfig(configJSON string) (*envoy_listener_v3.Listener, return &l, nil } +func injectConnectionBalanceConfig(cfg *structs.UpstreamConfig, listener *envoy_listener_v3.Listener) { + if cfg.EnvoyConnectionBalanceType == "exact_balance" { + listener.ConnectionBalanceConfig = &envoy_listener_v3.Listener_ConnectionBalanceConfig{ + BalanceType: &envoy_listener_v3.Listener_ConnectionBalanceConfig_ExactBalance_{}, + } + } +} + // Ensure that the first filter in each filter chain of a public listener is // the authz filter to prevent unauthorized access. func (s *ResourceGenerator) injectConnectFilters(cfgSnap *proxycfg.ConfigSnapshot, listener *envoy_listener_v3.Listener) error { diff --git a/api/config_entry.go b/api/config_entry.go index acdb5bfa86a70..c6104eb15232d 100644 --- a/api/config_entry.go +++ b/api/config_entry.go @@ -142,6 +142,12 @@ type UpstreamConfig struct { // Namespace is only accepted within a service-defaults config entry. Namespace string `json:",omitempty"` + // EnvoyConnectionBalanceType specifies how envoy connections should + // be distributed across worker threads. Currently, only the "exact_balance" + // type is accepted. + // https://cloudnative.to/envoy/api-v3/config/listener/v3/listener.proto.html#config-listener-v3-listener-connectionbalanceconfig + EnvoyConnectionBalanceType string `json:",omitempty" alias:"envoy_connection_balance_type"` + // EnvoyListenerJSON is a complete override ("escape hatch") for the upstream's // listener. //