Skip to content

Commit 164cf7c

Browse files
committed
grpclb: use a standalone Context for gRPCLB control plane RPCs (grpc#8154)
Inject a standalone Context that is independent of application RPCs to GrpclbLoadBalancer for control plane RPCs. The control plane RPC should be independent and not impacted by the lifetime of Context used for application RPCs.
1 parent 332a2d8 commit 164cf7c

File tree

5 files changed

+58
-2
lines changed

5 files changed

+58
-2
lines changed

grpclb/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ java_library(
99
deps = [
1010
":load_balancer_java_grpc",
1111
"//api",
12+
"//context",
1213
"//core:internal",
1314
"//core:util",
1415
"//stub",

grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.google.common.base.Stopwatch;
2424
import io.grpc.Attributes;
2525
import io.grpc.ChannelLogger.ChannelLogLevel;
26+
import io.grpc.Context;
2627
import io.grpc.EquivalentAddressGroup;
2728
import io.grpc.LoadBalancer;
2829
import io.grpc.Status;
@@ -45,6 +46,7 @@ class GrpclbLoadBalancer extends LoadBalancer {
4546
private static final GrpclbConfig DEFAULT_CONFIG = GrpclbConfig.create(Mode.ROUND_ROBIN);
4647

4748
private final Helper helper;
49+
private final Context context;
4850
private final TimeProvider time;
4951
private final Stopwatch stopwatch;
5052
private final SubchannelPool subchannelPool;
@@ -58,11 +60,13 @@ class GrpclbLoadBalancer extends LoadBalancer {
5860

5961
GrpclbLoadBalancer(
6062
Helper helper,
63+
Context context,
6164
SubchannelPool subchannelPool,
6265
TimeProvider time,
6366
Stopwatch stopwatch,
6467
BackoffPolicy.Provider backoffPolicyProvider) {
6568
this.helper = checkNotNull(helper, "helper");
69+
this.context = checkNotNull(context, "context");
6670
this.time = checkNotNull(time, "time provider");
6771
this.stopwatch = checkNotNull(stopwatch, "stopwatch");
6872
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
@@ -128,7 +132,7 @@ private void recreateStates() {
128132
checkState(grpclbState == null, "Should've been cleared");
129133
grpclbState =
130134
new GrpclbState(
131-
config, helper, subchannelPool, time, stopwatch, backoffPolicyProvider);
135+
config, helper, context, subchannelPool, time, stopwatch, backoffPolicyProvider);
132136
}
133137

134138
@Override

grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancerProvider.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.grpc.grpclb;
1818

1919
import com.google.common.base.Stopwatch;
20+
import io.grpc.Context;
2021
import io.grpc.Internal;
2122
import io.grpc.LoadBalancer;
2223
import io.grpc.LoadBalancerProvider;
@@ -62,6 +63,7 @@ public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) {
6263
return
6364
new GrpclbLoadBalancer(
6465
helper,
66+
Context.ROOT,
6567
new CachedSubchannelPool(helper),
6668
TimeProvider.SYSTEM_TIME_PROVIDER,
6769
Stopwatch.createUnstarted(),

grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import io.grpc.ChannelLogger.ChannelLogLevel;
3636
import io.grpc.ConnectivityState;
3737
import io.grpc.ConnectivityStateInfo;
38+
import io.grpc.Context;
3839
import io.grpc.EquivalentAddressGroup;
3940
import io.grpc.LoadBalancer.CreateSubchannelArgs;
4041
import io.grpc.LoadBalancer.Helper;
@@ -116,6 +117,7 @@ enum Mode {
116117

117118
private final String serviceName;
118119
private final Helper helper;
120+
private final Context context;
119121
private final SynchronizationContext syncContext;
120122
@Nullable
121123
private final SubchannelPool subchannelPool;
@@ -163,12 +165,14 @@ enum Mode {
163165
GrpclbState(
164166
GrpclbConfig config,
165167
Helper helper,
168+
Context context,
166169
SubchannelPool subchannelPool,
167170
TimeProvider time,
168171
Stopwatch stopwatch,
169172
BackoffPolicy.Provider backoffPolicyProvider) {
170173
this.config = checkNotNull(config, "config");
171174
this.helper = checkNotNull(helper, "helper");
175+
this.context = checkNotNull(context, "context");
172176
this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
173177
if (config.getMode() == Mode.ROUND_ROBIN) {
174178
this.subchannelPool = checkNotNull(subchannelPool, "subchannelPool");
@@ -342,7 +346,12 @@ private void startLbRpc() {
342346
checkState(lbStream == null, "previous lbStream has not been cleared yet");
343347
LoadBalancerGrpc.LoadBalancerStub stub = LoadBalancerGrpc.newStub(lbCommChannel);
344348
lbStream = new LbStream(stub);
345-
lbStream.start();
349+
Context prevContext = context.attach();
350+
try {
351+
lbStream.start();
352+
} finally {
353+
context.detach(prevContext);
354+
}
346355
stopwatch.reset().start();
347356

348357
LoadBalanceRequest initRequest = LoadBalanceRequest.newBuilder()

grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@
5454
import io.grpc.ClientStreamTracer;
5555
import io.grpc.ConnectivityState;
5656
import io.grpc.ConnectivityStateInfo;
57+
import io.grpc.Context;
58+
import io.grpc.Context.CancellableContext;
5759
import io.grpc.EquivalentAddressGroup;
5860
import io.grpc.LoadBalancer.CreateSubchannelArgs;
5961
import io.grpc.LoadBalancer.Helper;
@@ -227,6 +229,7 @@ public Void answer(InvocationOnMock invocation) {
227229
when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2);
228230
balancer = new GrpclbLoadBalancer(
229231
helper,
232+
Context.ROOT,
230233
subchannelPool,
231234
fakeClock.getTimeProvider(),
232235
fakeClock.getStopwatchSupplier().get(),
@@ -2474,6 +2477,43 @@ public void grpclbWorking_lbSendsFallbackMessage() {
24742477
.inOrder();
24752478
}
24762479

2480+
@Test
2481+
public void useIndependentRpcContext() {
2482+
// Simulates making RPCs within the context of an inbound RPC.
2483+
CancellableContext cancellableContext = Context.current().withCancellation();
2484+
Context prevContext = cancellableContext.attach();
2485+
try {
2486+
List<EquivalentAddressGroup> backendList = createResolvedBackendAddresses(2);
2487+
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(2);
2488+
deliverResolvedAddresses(backendList, grpclbBalancerList);
2489+
2490+
List<SocketAddress> addrs = new ArrayList<>();
2491+
addrs.addAll(grpclbBalancerList.get(0).getAddresses());
2492+
addrs.addAll(grpclbBalancerList.get(1).getAddresses());
2493+
Attributes attr = grpclbBalancerList.get(0).getAttributes();
2494+
EquivalentAddressGroup oobChannelEag = new EquivalentAddressGroup(addrs, attr);
2495+
verify(helper).createOobChannel(eq(oobChannelEag), eq(lbAuthority(0)));
2496+
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
2497+
StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
2498+
assertEquals(1, lbRequestObservers.size());
2499+
StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
2500+
verify(lbRequestObserver).onNext(
2501+
eq(LoadBalanceRequest.newBuilder()
2502+
.setInitialRequest(
2503+
InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
2504+
.build()));
2505+
lbResponseObserver.onNext(buildInitialResponse());
2506+
2507+
// The inbound RPC finishes and closes its context. The outbound RPC's control plane RPC
2508+
// should not be impacted (no retry).
2509+
cancellableContext.close();
2510+
assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
2511+
verifyNoMoreInteractions(mockLbService);
2512+
} finally {
2513+
cancellableContext.detach(prevContext);
2514+
}
2515+
}
2516+
24772517
private void deliverSubchannelState(
24782518
final Subchannel subchannel, final ConnectivityStateInfo newState) {
24792519
((FakeSubchannel) subchannel).updateState(newState);

0 commit comments

Comments
 (0)