Skip to content

Commit 97542ea

Browse files
committed
fixes server count logic
Signed-off-by: Imran Pochi <imranpochi@microsoft.com>
1 parent dac431f commit 97542ea

File tree

5 files changed

+201
-114
lines changed

5 files changed

+201
-114
lines changed

cmd/agent/app/server.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, drainCh, st
143143
}
144144
cc := o.ClientSetConfig(dialOptions...)
145145

146+
var leaseCounter agent.ServerCounter
146147
if o.CountServerLeases {
147148
var config *rest.Config
148149
if o.KubeconfigPath != "" {
@@ -167,15 +168,20 @@ func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, drainCh, st
167168
cache.WaitForCacheSync(stopCh, leaseInformer.HasSynced)
168169
leaseLister := coordinationv1lister.NewLeaseLister(leaseInformer.GetIndexer())
169170
serverLeaseSelector, _ := labels.Parse(o.LeaseLabel)
170-
serverLeaseCounter := agent.NewServerLeaseCounter(
171+
leaseCounter = agent.NewServerLeaseCounter(
171172
clock.RealClock{},
172173
leaseLister,
173174
serverLeaseSelector,
174175
)
175-
cc.ServerLeaseCounter = serverLeaseCounter
176176
}
177177

178178
cs := cc.NewAgentClientSet(drainCh, stopCh)
179+
// Always create the response-based counter.
180+
responseCounter := agent.NewResponseBasedCounter(cs)
181+
// Create the aggregate counter which acts as the master strategy.
182+
aggregateCounter := agent.NewAggregateServerCounter(leaseCounter, responseCounter, o.ServerCountSource)
183+
cs.SetServerCounter(aggregateCounter)
184+
179185
cs.Serve()
180186

181187
return cs, nil

pkg/agent/clientset.go

Lines changed: 41 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ type ClientSet struct {
5151
// Address is the proxy server address. Assuming HA proxy server
5252
address string
5353

54-
// leaseCounter counts number of proxy server leases
55-
leaseCounter ServerCounter
54+
// serverCounter counts number of proxy server leases
55+
serverCounter ServerCounter
5656

5757
// lastReceivedServerCount is the last serverCount value received when connecting to a proxy server
5858
lastReceivedServerCount int
@@ -108,6 +108,11 @@ func (cs *ClientSet) ClientsCount() int {
108108
return len(cs.clients)
109109
}
110110

111+
// SetServerCounter sets the strategy for determining the server count.
112+
func (cs *ClientSet) SetServerCounter(counter ServerCounter) {
113+
cs.serverCounter = counter
114+
}
115+
111116
func (cs *ClientSet) HealthyClientsCount() int {
112117
cs.mu.Lock()
113118
defer cs.mu.Unlock()
@@ -175,7 +180,6 @@ type ClientSetConfig struct {
175180
WarnOnChannelLimit bool
176181
SyncForever bool
177182
XfrChannelSize int
178-
ServerLeaseCounter ServerCounter
179183
ServerCountSource string
180184
}
181185

@@ -195,7 +199,6 @@ func (cc *ClientSetConfig) NewAgentClientSet(drainCh, stopCh <-chan struct{}) *C
195199
drainCh: drainCh,
196200
xfrChannelSize: cc.XfrChannelSize,
197201
stopCh: stopCh,
198-
leaseCounter: cc.ServerLeaseCounter,
199202
serverCountSource: cc.ServerCountSource,
200203
}
201204
}
@@ -214,30 +217,40 @@ func (cs *ClientSet) resetBackoff() *wait.Backoff {
214217
}
215218
}
216219

217-
// sync makes sure that #clients >= #proxy servers
220+
// determineServerCount determines the number of proxy servers by delegating to its configured counter strategy.
221+
func (cs *ClientSet) determineServerCount() int {
222+
serverCount := cs.serverCounter.Count()
223+
metrics.Metrics.SetServerCount(serverCount)
224+
return serverCount
225+
}
226+
227+
// sync manages the backoff and the connection attempts to the proxy server.
228+
// sync runs until stopCh is closed
218229
func (cs *ClientSet) sync() {
219230
defer cs.shutdown()
220231
backoff := cs.resetBackoff()
221232
var duration time.Duration
222233
for {
223-
if serverCount, err := cs.connectOnce(); err != nil {
234+
if err := cs.connectOnce(); err != nil {
224235
if dse, ok := err.(*DuplicateServerError); ok {
225-
clientsCount := cs.ClientsCount()
226-
klog.V(4).InfoS("duplicate server", "serverID", dse.ServerID, "serverCount", serverCount, "clientsCount", clientsCount)
227-
if serverCount != 0 && clientsCount >= serverCount {
228-
duration = backoff.Step()
229-
} else {
230-
backoff = cs.resetBackoff()
231-
duration = wait.Jitter(backoff.Duration, backoff.Jitter)
232-
}
236+
klog.V(4).InfoS("duplicate server connection attempt", "serverID", dse.ServerID)
237+
// We connected to a server we already have a connection to.
238+
// This is expected in syncForever mode. We just wait for the
239+
// next sync period to try again. No need for backoff.
240+
backoff = cs.resetBackoff()
241+
duration = wait.Jitter(backoff.Duration, backoff.Jitter)
233242
} else {
243+
// A 'real' error, so we backoff.
234244
klog.ErrorS(err, "cannot connect once")
235245
duration = backoff.Step()
236246
}
237247
} else {
248+
// A successful connection was made, or no new connection was needed.
249+
// Reset the backoff and wait for the next sync period.
238250
backoff = cs.resetBackoff()
239251
duration = wait.Jitter(backoff.Duration, backoff.Jitter)
240252
}
253+
241254
time.Sleep(duration)
242255
select {
243256
case <-cs.stopCh:
@@ -247,76 +260,37 @@ func (cs *ClientSet) sync() {
247260
}
248261
}
249262

250-
func (cs *ClientSet) ServerCount() int {
251-
252-
var serverCount int
253-
var countSourceLabel string
254-
255-
switch cs.serverCountSource {
256-
case "", "default":
257-
if cs.leaseCounter != nil {
258-
serverCount = cs.leaseCounter.Count()
259-
countSourceLabel = fromLeases
260-
} else {
261-
serverCount = cs.lastReceivedServerCount
262-
countSourceLabel = fromResponses
263-
}
264-
case "max":
265-
countFromLeases := 0
266-
if cs.leaseCounter != nil {
267-
countFromLeases = cs.leaseCounter.Count()
268-
}
269-
countFromResponses := cs.lastReceivedServerCount
270-
271-
serverCount = countFromLeases
272-
countSourceLabel = fromLeases
273-
if countFromResponses > serverCount {
274-
serverCount = countFromResponses
275-
countSourceLabel = fromResponses
276-
}
277-
if serverCount == 0 {
278-
serverCount = 1
279-
countSourceLabel = fromFallback
280-
}
281-
282-
}
263+
func (cs *ClientSet) connectOnce() error {
264+
serverCount := cs.determineServerCount()
283265

284-
if serverCount != cs.lastServerCount {
285-
klog.Warningf("change detected in proxy server count (was: %d, now: %d, source: %q)", cs.lastServerCount, serverCount, countSourceLabel)
286-
cs.lastServerCount = serverCount
266+
// If not in syncForever mode, we only connect if we have fewer connections than the server count.
267+
if !cs.syncForever && cs.ClientsCount() >= serverCount && serverCount > 0 {
268+
return nil // Nothing to do.
287269
}
288270

289-
metrics.Metrics.SetServerCount(serverCount)
290-
return serverCount
291-
}
292-
293-
func (cs *ClientSet) connectOnce() (int, error) {
294-
serverCount := cs.ServerCount()
295-
296-
if !cs.syncForever && serverCount != 0 && cs.ClientsCount() >= serverCount {
297-
return serverCount, nil
298-
}
271+
// In syncForever mode, we always try to connect, to discover new servers.
299272
c, receivedServerCount, err := cs.newAgentClient()
300273
if err != nil {
301-
return serverCount, err
274+
return err
302275
}
276+
303277
if err := cs.AddClient(c.serverID, c); err != nil {
304278
c.Close()
305-
return serverCount, err
279+
return err // likely *DuplicateServerError
306280
}
307-
// By moving the update to here, we only accept the server count from a server
308-
// that we have successfully added to our active client set, implicitly ignoring
309-
// stale data from duplicate connection attempts.
281+
// SUCCESS: We connected to a new, unique server.
282+
// Only now do we update our view of the server count.
310283
cs.lastReceivedServerCount = receivedServerCount
311-
klog.V(2).InfoS("sync added client connecting to proxy server", "serverID", c.serverID)
284+
klog.V(2).InfoS("successfully connected to new proxy server", "serverID", c.serverID, "newServerCount", receivedServerCount)
312285

313286
labels := runpprof.Labels(
314287
"agentIdentifiers", cs.agentIdentifiers,
315288
"serverAddress", cs.address,
316289
"serverID", c.serverID,
317290
)
318291
go runpprof.Do(context.Background(), labels, func(context.Context) { c.Serve() })
319-
return serverCount, nil
292+
293+
return nil
320294
}
321295

322296
func (cs *ClientSet) Serve() {

pkg/agent/clientset_test.go

Lines changed: 69 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -28,65 +28,93 @@ func (f *FakeServerCounter) Count() int {
2828
return f.count
2929
}
3030

31-
func TestServerCount(t *testing.T) {
31+
func TestAggregateServerCounter(t *testing.T) {
3232
testCases := []struct {
33-
name string
34-
serverCountSource string
35-
leaseCounter ServerCounter
36-
responseCount int
37-
want int
33+
name string
34+
source string
35+
leaseCounter ServerCounter
36+
responseCounter ServerCounter
37+
want int
3838
}{
3939
{
40-
name: "higher from response",
41-
serverCountSource: "max",
42-
responseCount: 42,
43-
leaseCounter: &FakeServerCounter{24},
44-
want: 42,
40+
name: "max: higher from response",
41+
source: "max",
42+
leaseCounter: &FakeServerCounter{count: 24},
43+
responseCounter: &FakeServerCounter{count: 42},
44+
want: 42,
4545
},
4646
{
47-
name: "higher from leases",
48-
serverCountSource: "max",
49-
responseCount: 3,
50-
leaseCounter: &FakeServerCounter{6},
51-
want: 6,
47+
name: "max: higher from leases",
48+
source: "max",
49+
leaseCounter: &FakeServerCounter{count: 6},
50+
responseCounter: &FakeServerCounter{count: 3},
51+
want: 6,
5252
},
5353
{
54-
name: "both zero",
55-
serverCountSource: "max",
56-
responseCount: 0,
57-
leaseCounter: &FakeServerCounter{0},
58-
want: 1,
54+
name: "max: both zero",
55+
source: "max",
56+
leaseCounter: &FakeServerCounter{count: 0},
57+
responseCounter: &FakeServerCounter{count: 0},
58+
want: 1, // fallback
59+
},
60+
{
61+
name: "default: lease counter is nil",
62+
source: "default",
63+
leaseCounter: nil,
64+
responseCounter: &FakeServerCounter{count: 3},
65+
want: 3,
5966
},
60-
6167
{
62-
name: "response picked by default when no lease counter",
63-
serverCountSource: "default",
64-
responseCount: 3,
65-
leaseCounter: nil,
66-
want: 3,
68+
name: "default: lease counter is present",
69+
source: "default",
70+
leaseCounter: &FakeServerCounter{count: 3},
71+
responseCounter: &FakeServerCounter{count: 6},
72+
want: 3, // lease count is preferred
6773
},
6874
{
69-
name: "lease counter always picked when present",
70-
serverCountSource: "default",
71-
responseCount: 6,
72-
leaseCounter: &FakeServerCounter{3},
73-
want: 3,
75+
name: "default: lease count is zero",
76+
source: "default",
77+
leaseCounter: &FakeServerCounter{count: 0},
78+
responseCounter: &FakeServerCounter{count: 6},
79+
want: 1, // fallback
7480
},
7581
}
7682

7783
for _, tc := range testCases {
7884
t.Run(tc.name, func(t *testing.T) {
79-
80-
cs := &ClientSet{
81-
clients: make(map[string]*Client),
82-
leaseCounter: tc.leaseCounter,
83-
serverCountSource: tc.serverCountSource,
84-
}
85-
cs.lastReceivedServerCount = tc.responseCount
86-
if got := cs.ServerCount(); got != tc.want {
87-
t.Errorf("cs.ServerCount() = %v, want: %v", got, tc.want)
85+
agg := NewAggregateServerCounter(tc.leaseCounter, tc.responseCounter, tc.source)
86+
if got := agg.Count(); got != tc.want {
87+
t.Errorf("agg.Count() = %v, want: %v", got, tc.want)
8888
}
8989
})
9090
}
91+
}
9192

93+
func TestResponseBasedCounter(t *testing.T) {
94+
testCases := []struct {
95+
name string
96+
responseCount int
97+
want int
98+
}{
99+
{
100+
name: "non-zero count",
101+
responseCount: 5,
102+
want: 5,
103+
},
104+
{
105+
name: "zero count",
106+
responseCount: 0,
107+
want: 1, // fallback
108+
},
109+
}
110+
111+
for _, tc := range testCases {
112+
t.Run(tc.name, func(t *testing.T) {
113+
cs := &ClientSet{lastReceivedServerCount: tc.responseCount}
114+
rbc := NewResponseBasedCounter(cs)
115+
if got := rbc.Count(); got != tc.want {
116+
t.Errorf("rbc.Count() = %v, want: %v", got, tc.want)
117+
}
118+
})
119+
}
92120
}

pkg/agent/lease_counter.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,6 @@ import (
3636
coordinationv1lister "k8s.io/client-go/listers/coordination/v1"
3737
)
3838

39-
type ServerCounter interface {
40-
Count() int
41-
}
42-
4339
// A ServerLeaseCounter counts leases in the k8s apiserver to determine the
4440
// current proxy server count.
4541
type ServerLeaseCounter struct {

0 commit comments

Comments
 (0)