Skip to content

Commit fba553d

Browse files
easwars authored and printchard committed
internal/grpcsync: support two ways to schedule a callback with the serializer (grpc#7408)
1 parent d3a0261 commit fba553d

File tree

12 files changed

+82
-64
lines changed

12 files changed

+82
-64
lines changed

balancer_wrapper.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func newCCBalancerWrapper(cc *ClientConn) *ccBalancerWrapper {
9595
// it is safe to call into the balancer here.
9696
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
9797
errCh := make(chan error)
98-
ok := ccb.serializer.Schedule(func(ctx context.Context) {
98+
uccs := func(ctx context.Context) {
9999
defer close(errCh)
100100
if ctx.Err() != nil || ccb.balancer == nil {
101101
return
@@ -110,17 +110,23 @@ func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnStat
110110
logger.Infof("error from balancer.UpdateClientConnState: %v", err)
111111
}
112112
errCh <- err
113-
})
114-
if !ok {
115-
return nil
116113
}
114+
onFailure := func() { close(errCh) }
115+
116+
// UpdateClientConnState can race with Close, and when the latter wins, the
117+
// serializer is closed, and the attempt to schedule the callback will fail.
118+
// It is acceptable to ignore this failure. But since we want to handle the
119+
// state update in a blocking fashion (when we successfully schedule the
120+
callback), we have to use the ScheduleOr method and not the TrySchedule
121+
// method on the serializer.
122+
ccb.serializer.ScheduleOr(uccs, onFailure)
117123
return <-errCh
118124
}
119125

120126
// resolverError is invoked by grpc to push a resolver error to the underlying
121127
// balancer. The call to the balancer is executed from the serializer.
122128
func (ccb *ccBalancerWrapper) resolverError(err error) {
123-
ccb.serializer.Schedule(func(ctx context.Context) {
129+
ccb.serializer.TrySchedule(func(ctx context.Context) {
124130
if ctx.Err() != nil || ccb.balancer == nil {
125131
return
126132
}
@@ -136,7 +142,7 @@ func (ccb *ccBalancerWrapper) close() {
136142
ccb.closed = true
137143
ccb.mu.Unlock()
138144
channelz.Info(logger, ccb.cc.channelz, "ccBalancerWrapper: closing")
139-
ccb.serializer.Schedule(func(context.Context) {
145+
ccb.serializer.TrySchedule(func(context.Context) {
140146
if ccb.balancer == nil {
141147
return
142148
}
@@ -148,7 +154,7 @@ func (ccb *ccBalancerWrapper) close() {
148154

149155
// exitIdle invokes the balancer's exitIdle method in the serializer.
150156
func (ccb *ccBalancerWrapper) exitIdle() {
151-
ccb.serializer.Schedule(func(ctx context.Context) {
157+
ccb.serializer.TrySchedule(func(ctx context.Context) {
152158
if ctx.Err() != nil || ccb.balancer == nil {
153159
return
154160
}
@@ -256,7 +262,7 @@ type acBalancerWrapper struct {
256262
// updateState is invoked by grpc to push a subConn state update to the
257263
// underlying balancer.
258264
func (acbw *acBalancerWrapper) updateState(s connectivity.State, curAddr resolver.Address, err error) {
259-
acbw.ccb.serializer.Schedule(func(ctx context.Context) {
265+
acbw.ccb.serializer.TrySchedule(func(ctx context.Context) {
260266
if ctx.Err() != nil || acbw.ccb.balancer == nil {
261267
return
262268
}

internal/grpcsync/callback_serializer.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,16 +53,28 @@ func NewCallbackSerializer(ctx context.Context) *CallbackSerializer {
5353
return cs
5454
}
5555

56-
// Schedule adds a callback to be scheduled after existing callbacks are run.
56+
// TrySchedule tries to schedule the provided callback function f to be
57+
// executed in the order it was added. This is a best-effort operation. If the
58+
// context passed to NewCallbackSerializer was canceled before this method is
59+
// called, the callback will not be scheduled.
5760
//
5861
// Callbacks are expected to honor the context when performing any blocking
5962
// operations, and should return early when the context is canceled.
63+
func (cs *CallbackSerializer) TrySchedule(f func(ctx context.Context)) {
64+
cs.callbacks.Put(f)
65+
}
66+
67+
// ScheduleOr schedules the provided callback function f to be executed in the
68+
// order it was added. If the context passed to NewCallbackSerializer has been
69+
// canceled before this method is called, the onFailure callback will be
70+
// executed inline instead.
6071
//
61-
// Return value indicates if the callback was successfully added to the list of
62-
// callbacks to be executed by the serializer. It is not possible to add
63-
// callbacks once the context passed to NewCallbackSerializer is cancelled.
64-
func (cs *CallbackSerializer) Schedule(f func(ctx context.Context)) bool {
65-
return cs.callbacks.Put(f) == nil
72+
// Callbacks are expected to honor the context when performing any blocking
73+
// operations, and should return early when the context is canceled.
74+
func (cs *CallbackSerializer) ScheduleOr(f func(ctx context.Context), onFailure func()) {
75+
if cs.callbacks.Put(f) != nil {
76+
onFailure()
77+
}
6678
}
6779

6880
func (cs *CallbackSerializer) run(ctx context.Context) {

internal/grpcsync/callback_serializer_test.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func (s) TestCallbackSerializer_Schedule_FIFO(t *testing.T) {
5555
mu.Lock()
5656
defer mu.Unlock()
5757
scheduleOrderCh <- id
58-
cs.Schedule(func(ctx context.Context) {
58+
cs.TrySchedule(func(ctx context.Context) {
5959
select {
6060
case <-ctx.Done():
6161
return
@@ -115,7 +115,7 @@ func (s) TestCallbackSerializer_Schedule_Concurrent(t *testing.T) {
115115
wg.Add(numCallbacks)
116116
for i := 0; i < numCallbacks; i++ {
117117
go func() {
118-
cs.Schedule(func(context.Context) {
118+
cs.TrySchedule(func(context.Context) {
119119
wg.Done()
120120
})
121121
}()
@@ -148,7 +148,7 @@ func (s) TestCallbackSerializer_Schedule_Close(t *testing.T) {
148148
// Schedule a callback which blocks until the context passed to it is
149149
// canceled. It also closes a channel to signal that it has started.
150150
firstCallbackStartedCh := make(chan struct{})
151-
cs.Schedule(func(ctx context.Context) {
151+
cs.TrySchedule(func(ctx context.Context) {
152152
close(firstCallbackStartedCh)
153153
<-ctx.Done()
154154
})
@@ -159,9 +159,9 @@ func (s) TestCallbackSerializer_Schedule_Close(t *testing.T) {
159159
callbackCh := make(chan int, numCallbacks)
160160
for i := 0; i < numCallbacks; i++ {
161161
num := i
162-
if !cs.Schedule(func(context.Context) { callbackCh <- num }) {
163-
t.Fatal("Schedule failed to accept a callback when the serializer is yet to be closed")
164-
}
162+
callback := func(context.Context) { callbackCh <- num }
163+
onFailure := func() { t.Fatal("Schedule failed to accept a callback when the serializer is yet to be closed") }
164+
cs.ScheduleOr(callback, onFailure)
165165
}
166166

167167
// Ensure that none of the newer callbacks are executed at this point.
@@ -192,15 +192,15 @@ func (s) TestCallbackSerializer_Schedule_Close(t *testing.T) {
192192
}
193193
<-cs.Done()
194194

195+
// Ensure that a callback cannot be scheduled after the serializer is
196+
// closed.
195197
done := make(chan struct{})
196-
if cs.Schedule(func(context.Context) { close(done) }) {
197-
t.Fatal("Scheduled a callback after closing the serializer")
198-
}
199-
200-
// Ensure that the latest callback is executed at this point.
198+
callback := func(context.Context) { t.Fatal("Scheduled a callback after closing the serializer") }
199+
onFailure := func() { close(done) }
200+
cs.ScheduleOr(callback, onFailure)
201201
select {
202-
case <-time.After(defaultTestShortTimeout):
202+
case <-time.After(defaultTestTimeout):
203+
t.Fatal("Successfully scheduled callback after serializer is closed")
203204
case <-done:
204-
t.Fatal("Newer callback executed when scheduled after closing serializer")
205205
}
206206
}

internal/grpcsync/pubsub.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func (ps *PubSub) Subscribe(sub Subscriber) (cancel func()) {
7777

7878
if ps.msg != nil {
7979
msg := ps.msg
80-
ps.cs.Schedule(func(context.Context) {
80+
ps.cs.TrySchedule(func(context.Context) {
8181
ps.mu.Lock()
8282
defer ps.mu.Unlock()
8383
if !ps.subscribers[sub] {
@@ -103,7 +103,7 @@ func (ps *PubSub) Publish(msg any) {
103103
ps.msg = msg
104104
for sub := range ps.subscribers {
105105
s := sub
106-
ps.cs.Schedule(func(context.Context) {
106+
ps.cs.TrySchedule(func(context.Context) {
107107
ps.mu.Lock()
108108
defer ps.mu.Unlock()
109109
if !ps.subscribers[s] {

resolver_wrapper.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func newCCResolverWrapper(cc *ClientConn) *ccResolverWrapper {
6666
// any newly created ccResolverWrapper, except that close may be called instead.
6767
func (ccr *ccResolverWrapper) start() error {
6868
errCh := make(chan error)
69-
ccr.serializer.Schedule(func(ctx context.Context) {
69+
ccr.serializer.TrySchedule(func(ctx context.Context) {
7070
if ctx.Err() != nil {
7171
return
7272
}
@@ -85,7 +85,7 @@ func (ccr *ccResolverWrapper) start() error {
8585
}
8686

8787
func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) {
88-
ccr.serializer.Schedule(func(ctx context.Context) {
88+
ccr.serializer.TrySchedule(func(ctx context.Context) {
8989
if ctx.Err() != nil || ccr.resolver == nil {
9090
return
9191
}
@@ -102,7 +102,7 @@ func (ccr *ccResolverWrapper) close() {
102102
ccr.closed = true
103103
ccr.mu.Unlock()
104104

105-
ccr.serializer.Schedule(func(context.Context) {
105+
ccr.serializer.TrySchedule(func(context.Context) {
106106
if ccr.resolver == nil {
107107
return
108108
}

xds/internal/balancer/cdsbalancer/cdsbalancer.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -309,8 +309,8 @@ func (b *cdsBalancer) UpdateClientConnState(state balancer.ClientConnState) erro
309309
b.lbCfg = lbCfg
310310

311311
// Handle the update in a blocking fashion.
312-
done := make(chan struct{})
313-
ok = b.serializer.Schedule(func(context.Context) {
312+
errCh := make(chan error, 1)
313+
callback := func(context.Context) {
314314
// A config update with a changed top-level cluster name means that none
315315
// of our old watchers make any sense any more.
316316
b.closeAllWatchers()
@@ -319,20 +319,20 @@ func (b *cdsBalancer) UpdateClientConnState(state balancer.ClientConnState) erro
319319
// could end up creating more watchers if turns out to be an aggregate
320320
// cluster.
321321
b.createAndAddWatcherForCluster(lbCfg.ClusterName)
322-
close(done)
323-
})
324-
if !ok {
322+
errCh <- nil
323+
}
324+
onFailure := func() {
325325
// ScheduleOr invokes onFailure *only* if the serializer has been
326326
// closed, which happens only when we receive an update after close.
327-
return errBalancerClosed
327+
errCh <- errBalancerClosed
328328
}
329-
<-done
330-
return nil
329+
b.serializer.ScheduleOr(callback, onFailure)
330+
return <-errCh
331331
}
332332

333333
// ResolverError handles errors reported by the xdsResolver.
334334
func (b *cdsBalancer) ResolverError(err error) {
335-
b.serializer.Schedule(func(context.Context) {
335+
b.serializer.TrySchedule(func(context.Context) {
336336
// Resource not found error is reported by the resolver when the
337337
// top-level cluster resource is removed by the management server.
338338
if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound {
@@ -364,7 +364,7 @@ func (b *cdsBalancer) closeAllWatchers() {
364364
// Close cancels the CDS watch, closes the child policy and closes the
365365
// cdsBalancer.
366366
func (b *cdsBalancer) Close() {
367-
b.serializer.Schedule(func(ctx context.Context) {
367+
b.serializer.TrySchedule(func(ctx context.Context) {
368368
b.closeAllWatchers()
369369

370370
if b.childLB != nil {
@@ -384,7 +384,7 @@ func (b *cdsBalancer) Close() {
384384
}
385385

386386
func (b *cdsBalancer) ExitIdle() {
387-
b.serializer.Schedule(func(context.Context) {
387+
b.serializer.TrySchedule(func(context.Context) {
388388
if b.childLB == nil {
389389
b.logger.Warningf("Received ExitIdle with no child policy")
390390
return

xds/internal/balancer/cdsbalancer/cluster_watcher.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,19 +33,19 @@ type clusterWatcher struct {
3333
}
3434

3535
func (cw *clusterWatcher) OnUpdate(u *xdsresource.ClusterResourceData) {
36-
cw.parent.serializer.Schedule(func(context.Context) {
36+
cw.parent.serializer.TrySchedule(func(context.Context) {
3737
cw.parent.onClusterUpdate(cw.name, u.Resource)
3838
})
3939
}
4040

4141
func (cw *clusterWatcher) OnError(err error) {
42-
cw.parent.serializer.Schedule(func(context.Context) {
42+
cw.parent.serializer.TrySchedule(func(context.Context) {
4343
cw.parent.onClusterError(cw.name, err)
4444
})
4545
}
4646

4747
func (cw *clusterWatcher) OnResourceDoesNotExist() {
48-
cw.parent.serializer.Schedule(func(context.Context) {
48+
cw.parent.serializer.TrySchedule(func(context.Context) {
4949
cw.parent.onClusterResourceNotFound(cw.name)
5050
})
5151
}

xds/internal/balancer/clusterresolver/resource_resolver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ func (rr *resourceResolver) generateLocked() {
287287
}
288288

289289
func (rr *resourceResolver) onUpdate() {
290-
rr.serializer.Schedule(func(context.Context) {
290+
rr.serializer.TrySchedule(func(context.Context) {
291291
rr.mu.Lock()
292292
rr.generateLocked()
293293
rr.mu.Unlock()

xds/internal/resolver/serviceconfig.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP
182182
if v := atomic.AddInt32(ref, -1); v == 0 {
183183
// This entry will be removed from activeClusters when
184184
// producing the service config for the empty update.
185-
cs.r.serializer.Schedule(func(context.Context) {
185+
cs.r.serializer.TrySchedule(func(context.Context) {
186186
cs.r.onClusterRefDownToZero()
187187
})
188188
}
@@ -326,7 +326,7 @@ func (cs *configSelector) stop() {
326326
// selector; we need another update to delete clusters from the config (if
327327
// we don't have another update pending already).
328328
if needUpdate {
329-
cs.r.serializer.Schedule(func(context.Context) {
329+
cs.r.serializer.TrySchedule(func(context.Context) {
330330
cs.r.onClusterRefDownToZero()
331331
})
332332
}

xds/internal/resolver/watch_service.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,19 +37,19 @@ func newListenerWatcher(resourceName string, parent *xdsResolver) *listenerWatch
3737
}
3838

3939
func (l *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData) {
40-
l.parent.serializer.Schedule(func(context.Context) {
40+
l.parent.serializer.TrySchedule(func(context.Context) {
4141
l.parent.onListenerResourceUpdate(update.Resource)
4242
})
4343
}
4444

4545
func (l *listenerWatcher) OnError(err error) {
46-
l.parent.serializer.Schedule(func(context.Context) {
46+
l.parent.serializer.TrySchedule(func(context.Context) {
4747
l.parent.onListenerResourceError(err)
4848
})
4949
}
5050

5151
func (l *listenerWatcher) OnResourceDoesNotExist() {
52-
l.parent.serializer.Schedule(func(context.Context) {
52+
l.parent.serializer.TrySchedule(func(context.Context) {
5353
l.parent.onListenerResourceNotFound()
5454
})
5555
}
@@ -72,19 +72,19 @@ func newRouteConfigWatcher(resourceName string, parent *xdsResolver) *routeConfi
7272
}
7373

7474
func (r *routeConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) {
75-
r.parent.serializer.Schedule(func(context.Context) {
75+
r.parent.serializer.TrySchedule(func(context.Context) {
7676
r.parent.onRouteConfigResourceUpdate(r.resourceName, update.Resource)
7777
})
7878
}
7979

8080
func (r *routeConfigWatcher) OnError(err error) {
81-
r.parent.serializer.Schedule(func(context.Context) {
81+
r.parent.serializer.TrySchedule(func(context.Context) {
8282
r.parent.onRouteConfigResourceError(r.resourceName, err)
8383
})
8484
}
8585

8686
func (r *routeConfigWatcher) OnResourceDoesNotExist() {
87-
r.parent.serializer.Schedule(func(context.Context) {
87+
r.parent.serializer.TrySchedule(func(context.Context) {
8888
r.parent.onRouteConfigResourceNotFound(r.resourceName)
8989
})
9090
}

0 commit comments

Comments
 (0)