Skip to content

Commit

Permalink
added a flag server-count-source to control the behavior when two sou…
Browse files Browse the repository at this point in the history
…rces are present
  • Loading branch information
konryd committed Nov 15, 2024
1 parent 1eec50b commit 1b41692
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 23 deletions.
11 changes: 11 additions & 0 deletions cmd/agent/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ type GrpcProxyAgentOptions struct {
LeaseNamespace string
// Labels on which lease objects are managed.
LeaseLabel string
// ServerCountSource describes how server counts should be combined.
ServerCountSource string
// Path to kubeconfig (used by kubernetes client for lease listing)
KubeconfigPath string
// Content type of requests sent to apiserver.
Expand All @@ -108,6 +110,7 @@ func (o *GrpcProxyAgentOptions) ClientSetConfig(dialOptions ...grpc.DialOption)
WarnOnChannelLimit: o.WarnOnChannelLimit,
SyncForever: o.SyncForever,
XfrChannelSize: o.XfrChannelSize,
ServerCountSource: o.ServerCountSource,
}
}

Expand Down Expand Up @@ -138,6 +141,7 @@ func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet {
flags.BoolVar(&o.CountServerLeases, "count-server-leases", o.CountServerLeases, "Enables lease counting system to determine the number of proxy servers to connect to.")
flags.StringVar(&o.LeaseNamespace, "lease-namespace", o.LeaseNamespace, "Namespace where lease objects are managed.")
flags.StringVar(&o.LeaseLabel, "lease-label", o.LeaseLabel, "The labels on which the lease objects are managed.")
flags.StringVar(&o.ServerCountSource, "server-count-source", o.ServerCountSource, "Defines how the server counts from lease and from server responses are combined. Possible values: 'default' to use only one source (server or leases depending on other flags), 'max' to take the larger value.")
flags.StringVar(&o.KubeconfigPath, "kubeconfig", o.KubeconfigPath, "Path to the kubeconfig file")
flags.StringVar(&o.APIContentType, "kube-api-content-type", o.APIContentType, "Content type of requests sent to apiserver.")
return flags
Expand Down Expand Up @@ -168,6 +172,7 @@ func (o *GrpcProxyAgentOptions) Print() {
klog.V(1).Infof("CountServerLeases set to %v.\n", o.CountServerLeases)
klog.V(1).Infof("LeaseNamespace set to %s.\n", o.LeaseNamespace)
klog.V(1).Infof("LeaseLabel set to %s.\n", o.LeaseLabel)
klog.V(1).Infof("ServerCountSource set to %s.\n", o.ServerCountSource)
klog.V(1).Infof("ChannelSize set to %d.\n", o.XfrChannelSize)
klog.V(1).Infof("APIContentType set to %v.\n", o.APIContentType)
}
Expand Down Expand Up @@ -232,6 +237,11 @@ func (o *GrpcProxyAgentOptions) Validate() error {
return err
}
}
if o.ServerCountSource != "" {
if o.ServerCountSource != "default" && o.ServerCountSource != "max" {
return fmt.Errorf("--server-count-source must be one of '', 'default', 'max', got %v", o.ServerCountSource)
}
}

return nil
}
Expand Down Expand Up @@ -281,6 +291,7 @@ func NewGrpcProxyAgentOptions() *GrpcProxyAgentOptions {
CountServerLeases: false,
LeaseNamespace: "kube-system",
LeaseLabel: "k8s-app=konnectivity-server",
ServerCountSource: "default",
KubeconfigPath: "",
APIContentType: runtime.ContentTypeProtobuf,
}
Expand Down
4 changes: 4 additions & 0 deletions cmd/agent/app/options/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ func TestValidate(t *testing.T) {
fieldMap: map[string]interface{}{"XfrChannelSize": -10},
expected: fmt.Errorf("channel size -10 must be greater than 0"),
},
"ServerCountSource": {
fieldMap: map[string]interface{}{"ServerCountSource": "foobar"},
expected: fmt.Errorf("--server-count-source must be one of '', 'default', 'max', got foobar"),
},
} {
t.Run(desc, func(t *testing.T) {
testAgentOptions := NewGrpcProxyAgentOptions()
Expand Down
55 changes: 40 additions & 15 deletions pkg/agent/clientset.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ import (
"sigs.k8s.io/apiserver-network-proxy/pkg/agent/metrics"
)

const (
fromResponses = "KNP server response headers"
fromLeases = "KNP lease count"
fromFallback = "fallback to 1"
)

// ClientSet consists of clients connected to each instance of an HA proxy server.
type ClientSet struct {
mu sync.Mutex //protects the clients.
Expand Down Expand Up @@ -68,6 +74,7 @@ type ClientSet struct {
xfrChannelSize int

syncForever bool // Continue syncing (support dynamic server count).
serverCountSource string
}

func (cs *ClientSet) ClientsCount() int {
Expand Down Expand Up @@ -148,6 +155,7 @@ type ClientSetConfig struct {
SyncForever bool
XfrChannelSize int
ServerLeaseCounter ServerCounter
ServerCountSource string
}

func (cc *ClientSetConfig) NewAgentClientSet(drainCh, stopCh <-chan struct{}) *ClientSet {
Expand All @@ -167,6 +175,7 @@ func (cc *ClientSetConfig) NewAgentClientSet(drainCh, stopCh <-chan struct{}) *C
xfrChannelSize: cc.XfrChannelSize,
stopCh: stopCh,
leaseCounter: cc.ServerLeaseCounter,
serverCountSource: cc.ServerCountSource,
}
}

Expand Down Expand Up @@ -218,25 +227,41 @@ func (cs *ClientSet) sync() {
}

func (cs *ClientSet) ServerCount() int {
countFromLeases := 0
if cs.leaseCounter != nil {
countFromLeases = cs.leaseCounter.Count()
}
countFromResponses := cs.lastReceivedServerCount

serverCount := countFromLeases
countSource := "KNP server lease count"
if countFromResponses > serverCount {
serverCount = countFromResponses
countSource = "KNP server response headers"
}
if serverCount == 0 {
serverCount = 1
countSource = "fallback to 1"
var serverCount int
var countSourceLabel string

switch cs.serverCountSource {
case "", "default":
if cs.leaseCounter != nil {
serverCount = cs.leaseCounter.Count()
countSourceLabel = fromLeases
} else {
serverCount = cs.lastReceivedServerCount
countSourceLabel = fromResponses
}
case "max":
countFromLeases := 0
if cs.leaseCounter != nil {
countFromLeases = cs.leaseCounter.Count()
}
countFromResponses := cs.lastReceivedServerCount

serverCount = countFromLeases
countSourceLabel = fromLeases
if countFromResponses > serverCount {
serverCount = countFromResponses
countSourceLabel = fromResponses
}
if serverCount == 0 {
serverCount = 1
countSourceLabel = fromFallback
}

}

if serverCount != cs.lastServerCount {
klog.Warningf("change detected in proxy server count (was: %d, now: %d, source: %q)", cs.lastServerCount, serverCount, countSource)
klog.Warningf("change detected in proxy server count (was: %d, now: %d, source: %q)", cs.lastServerCount, serverCount, countSourceLabel)
cs.lastServerCount = serverCount
}

Expand Down
34 changes: 26 additions & 8 deletions pkg/agent/clientset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,39 +31,57 @@ func (f *FakeServerCounter) Count() int {
func TestServerCount(t *testing.T) {
testCases := []struct{
name string
serverCountSource string
leaseCounter ServerCounter
responseCount int
leaseCount int
want int
} {
{
name: "higher from response",
serverCountSource: "max",
responseCount: 42,
leaseCount: 24,
leaseCounter: &FakeServerCounter{24},
want: 42,
},
{
name: "higher from leases",
serverCountSource: "max",
responseCount: 3,
leaseCount: 6,
leaseCounter: &FakeServerCounter{6},
want: 6,
},
{
name: "both zero",
serverCountSource: "max",
responseCount: 0,
leaseCount: 0,
leaseCounter: &FakeServerCounter{0},
want: 1,
},

{
name: "response picked by default when no lease counter",
serverCountSource: "default",
responseCount: 3,
leaseCounter: nil,
want: 3,
},
{
name: "lease counter always picked when present",
serverCountSource: "default",
responseCount: 6,
leaseCounter: &FakeServerCounter{3},
want: 3,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
lc := &FakeServerCounter{
count: tc.leaseCount,
}

cs := &ClientSet{
clients: make(map[string]*Client),
leaseCounter: lc,
leaseCounter: tc.leaseCounter,
serverCountSource: tc.serverCountSource,

}
cs.lastReceivedServerCount = tc.responseCount
if got := cs.ServerCount(); got != tc.want {
Expand Down

0 comments on commit 1b41692

Please sign in to comment.