Skip to content

Commit 48ea63d

Browse files
authored
grpclb: use a standalone Context for gRPCLB control plane RPCs (v1.38.x backport) (#8154) (#8159)
Inject a standalone Context that is independent of application RPCs to GrpclbLoadBalancer for control plane RPCs. The control plane RPC should be independent and not impacted by the lifetime of Context used for application RPCs.
1 parent d2160ea commit 48ea63d

File tree

5 files changed

+54
-2
lines changed

5 files changed

+54
-2
lines changed

grpclb/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ java_library(
99
deps = [
1010
":load_balancer_java_grpc",
1111
"//api",
12+
"//context",
1213
"//core:internal",
1314
"//core:util",
1415
"//stub",

grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.google.common.base.Stopwatch;
2424
import io.grpc.Attributes;
2525
import io.grpc.ChannelLogger.ChannelLogLevel;
26+
import io.grpc.Context;
2627
import io.grpc.EquivalentAddressGroup;
2728
import io.grpc.LoadBalancer;
2829
import io.grpc.Status;
@@ -45,6 +46,7 @@ class GrpclbLoadBalancer extends LoadBalancer {
4546
private static final GrpclbConfig DEFAULT_CONFIG = GrpclbConfig.create(Mode.ROUND_ROBIN);
4647

4748
private final Helper helper;
49+
private final Context context;
4850
private final TimeProvider time;
4951
private final Stopwatch stopwatch;
5052
private final SubchannelPool subchannelPool;
@@ -58,11 +60,13 @@ class GrpclbLoadBalancer extends LoadBalancer {
5860

5961
GrpclbLoadBalancer(
6062
Helper helper,
63+
Context context,
6164
SubchannelPool subchannelPool,
6265
TimeProvider time,
6366
Stopwatch stopwatch,
6467
BackoffPolicy.Provider backoffPolicyProvider) {
6568
this.helper = checkNotNull(helper, "helper");
69+
this.context = checkNotNull(context, "context");
6670
this.time = checkNotNull(time, "time provider");
6771
this.stopwatch = checkNotNull(stopwatch, "stopwatch");
6872
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
@@ -131,7 +135,7 @@ private void recreateStates() {
131135
checkState(grpclbState == null, "Should've been cleared");
132136
grpclbState =
133137
new GrpclbState(
134-
config, helper, subchannelPool, time, stopwatch, backoffPolicyProvider);
138+
config, helper, context, subchannelPool, time, stopwatch, backoffPolicyProvider);
135139
}
136140

137141
@Override

grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancerProvider.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.grpc.grpclb;
1818

1919
import com.google.common.base.Stopwatch;
20+
import io.grpc.Context;
2021
import io.grpc.Internal;
2122
import io.grpc.LoadBalancer;
2223
import io.grpc.LoadBalancerProvider;
@@ -62,6 +63,7 @@ public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) {
6263
return
6364
new GrpclbLoadBalancer(
6465
helper,
66+
Context.ROOT,
6567
new CachedSubchannelPool(helper),
6668
TimeProvider.SYSTEM_TIME_PROVIDER,
6769
Stopwatch.createUnstarted(),

grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import io.grpc.ChannelLogger.ChannelLogLevel;
3636
import io.grpc.ConnectivityState;
3737
import io.grpc.ConnectivityStateInfo;
38+
import io.grpc.Context;
3839
import io.grpc.EquivalentAddressGroup;
3940
import io.grpc.LoadBalancer.CreateSubchannelArgs;
4041
import io.grpc.LoadBalancer.Helper;
@@ -132,6 +133,7 @@ enum Mode {
132133

133134
private final String serviceName;
134135
private final Helper helper;
136+
private final Context context;
135137
private final SynchronizationContext syncContext;
136138
@Nullable
137139
private final SubchannelPool subchannelPool;
@@ -182,12 +184,14 @@ enum Mode {
182184
GrpclbState(
183185
GrpclbConfig config,
184186
Helper helper,
187+
Context context,
185188
SubchannelPool subchannelPool,
186189
TimeProvider time,
187190
Stopwatch stopwatch,
188191
BackoffPolicy.Provider backoffPolicyProvider) {
189192
this.config = checkNotNull(config, "config");
190193
this.helper = checkNotNull(helper, "helper");
194+
this.context = checkNotNull(context, "context");
191195
this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
192196
if (config.getMode() == Mode.ROUND_ROBIN) {
193197
this.subchannelPool = checkNotNull(subchannelPool, "subchannelPool");
@@ -368,7 +372,12 @@ private void startLbRpc() {
368372
checkState(lbStream == null, "previous lbStream has not been cleared yet");
369373
LoadBalancerGrpc.LoadBalancerStub stub = LoadBalancerGrpc.newStub(lbCommChannel);
370374
lbStream = new LbStream(stub);
371-
lbStream.start();
375+
Context prevContext = context.attach();
376+
try {
377+
lbStream.start();
378+
} finally {
379+
context.detach(prevContext);
380+
}
372381
stopwatch.reset().start();
373382

374383
LoadBalanceRequest initRequest = LoadBalanceRequest.newBuilder()

grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@
5555
import io.grpc.ClientStreamTracer;
5656
import io.grpc.ConnectivityState;
5757
import io.grpc.ConnectivityStateInfo;
58+
import io.grpc.Context;
59+
import io.grpc.Context.CancellableContext;
5860
import io.grpc.EquivalentAddressGroup;
5961
import io.grpc.LoadBalancer.CreateSubchannelArgs;
6062
import io.grpc.LoadBalancer.Helper;
@@ -229,6 +231,7 @@ public Void answer(InvocationOnMock invocation) {
229231
when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2);
230232
balancer = new GrpclbLoadBalancer(
231233
helper,
234+
Context.ROOT,
232235
subchannelPool,
233236
fakeClock.getTimeProvider(),
234237
fakeClock.getStopwatchSupplier().get(),
@@ -2683,6 +2686,39 @@ public void grpclbWorking_lbSendsFallbackMessage() {
26832686
.inOrder();
26842687
}
26852688

2689+
@Test
2690+
public void useIndependentRpcContext() {
2691+
// Simulates making RPCs within the context of an inbound RPC.
2692+
CancellableContext cancellableContext = Context.current().withCancellation();
2693+
Context prevContext = cancellableContext.attach();
2694+
try {
2695+
List<EquivalentAddressGroup> backendList = createResolvedBackendAddresses(2);
2696+
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(2);
2697+
deliverResolvedAddresses(backendList, grpclbBalancerList);
2698+
2699+
verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)),
2700+
eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX));
2701+
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
2702+
StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
2703+
assertEquals(1, lbRequestObservers.size());
2704+
StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
2705+
verify(lbRequestObserver).onNext(
2706+
eq(LoadBalanceRequest.newBuilder()
2707+
.setInitialRequest(
2708+
InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
2709+
.build()));
2710+
lbResponseObserver.onNext(buildInitialResponse());
2711+
2712+
// The inbound RPC finishes and closes its context. The outbound RPC's control plane RPC
2713+
// should not be impacted (no retry).
2714+
cancellableContext.close();
2715+
assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
2716+
verifyNoMoreInteractions(mockLbService);
2717+
} finally {
2718+
cancellableContext.detach(prevContext);
2719+
}
2720+
}
2721+
26862722
private void deliverSubchannelState(
26872723
final Subchannel subchannel, final ConnectivityStateInfo newState) {
26882724
((FakeSubchannel) subchannel).updateState(newState);

0 commit comments

Comments
 (0)