Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.base.Stopwatch;
import io.grpc.Attributes;
import io.grpc.ChannelLogger.ChannelLogLevel;
import io.grpc.Context;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.Status;
Expand All @@ -45,6 +46,7 @@ class GrpclbLoadBalancer extends LoadBalancer {
private static final GrpclbConfig DEFAULT_CONFIG = GrpclbConfig.create(Mode.ROUND_ROBIN);

private final Helper helper;
private final Context context;
private final TimeProvider time;
private final Stopwatch stopwatch;
private final SubchannelPool subchannelPool;
Expand All @@ -58,11 +60,13 @@ class GrpclbLoadBalancer extends LoadBalancer {

GrpclbLoadBalancer(
Helper helper,
Context context,
SubchannelPool subchannelPool,
TimeProvider time,
Stopwatch stopwatch,
BackoffPolicy.Provider backoffPolicyProvider) {
this.helper = checkNotNull(helper, "helper");
this.context = checkNotNull(context, "context");
this.time = checkNotNull(time, "time provider");
this.stopwatch = checkNotNull(stopwatch, "stopwatch");
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
Expand Down Expand Up @@ -131,7 +135,7 @@ private void recreateStates() {
checkState(grpclbState == null, "Should've been cleared");
grpclbState =
new GrpclbState(
config, helper, subchannelPool, time, stopwatch, backoffPolicyProvider);
config, helper, context, subchannelPool, time, stopwatch, backoffPolicyProvider);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.grpc.grpclb;

import com.google.common.base.Stopwatch;
import io.grpc.Context;
import io.grpc.Internal;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancerProvider;
Expand Down Expand Up @@ -62,6 +63,7 @@ public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) {
return
new GrpclbLoadBalancer(
helper,
Context.ROOT,
new CachedSubchannelPool(helper),
TimeProvider.SYSTEM_TIME_PROVIDER,
Stopwatch.createUnstarted(),
Expand Down
11 changes: 10 additions & 1 deletion grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.grpc.ChannelLogger.ChannelLogLevel;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.Context;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer.CreateSubchannelArgs;
import io.grpc.LoadBalancer.Helper;
Expand Down Expand Up @@ -132,6 +133,7 @@ enum Mode {

private final String serviceName;
private final Helper helper;
private final Context context;
private final SynchronizationContext syncContext;
@Nullable
private final SubchannelPool subchannelPool;
Expand Down Expand Up @@ -182,12 +184,14 @@ enum Mode {
GrpclbState(
GrpclbConfig config,
Helper helper,
Context context,
SubchannelPool subchannelPool,
TimeProvider time,
Stopwatch stopwatch,
BackoffPolicy.Provider backoffPolicyProvider) {
this.config = checkNotNull(config, "config");
this.helper = checkNotNull(helper, "helper");
this.context = checkNotNull(context, "context");
this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
if (config.getMode() == Mode.ROUND_ROBIN) {
this.subchannelPool = checkNotNull(subchannelPool, "subchannelPool");
Expand Down Expand Up @@ -368,7 +372,12 @@ private void startLbRpc() {
checkState(lbStream == null, "previous lbStream has not been cleared yet");
LoadBalancerGrpc.LoadBalancerStub stub = LoadBalancerGrpc.newStub(lbCommChannel);
lbStream = new LbStream(stub);
lbStream.start();
Context baseContext = context.attach();
try {
lbStream.start();
} finally {
context.detach(baseContext);
}
stopwatch.reset().start();

LoadBalanceRequest initRequest = LoadBalanceRequest.newBuilder()
Expand Down
34 changes: 34 additions & 0 deletions grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
import io.grpc.ClientStreamTracer;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.Context;
import io.grpc.Context.CancellableContext;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer.CreateSubchannelArgs;
import io.grpc.LoadBalancer.Helper;
Expand Down Expand Up @@ -229,6 +231,7 @@ public Void answer(InvocationOnMock invocation) {
when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2);
balancer = new GrpclbLoadBalancer(
helper,
Context.ROOT,
subchannelPool,
fakeClock.getTimeProvider(),
fakeClock.getStopwatchSupplier().get(),
Expand Down Expand Up @@ -2683,6 +2686,37 @@ public void grpclbWorking_lbSendsFallbackMessage() {
.inOrder();
}

@Test
public void useIndependentRpcContext() {
// Simulates making RPCs within the context of an inbound RPC.
CancellableContext cancellableContext = Context.current().withCancellation();
Context baseContext = cancellableContext.attach();
try {
List<EquivalentAddressGroup> backendList = createResolvedBackendAddresses(2);
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(2);
deliverResolvedAddresses(backendList, grpclbBalancerList);

verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)),
eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX));
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
verify(lbRequestObserver).onNext(
eq(LoadBalanceRequest.newBuilder()
.setInitialRequest(
InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
.build()));

// The inbound RPC finishes and closes its context. The outbound RPC's control plane RPC
// should not be impacted.
cancellableContext.close();
verify(lbRequestObserver, never()).onError(any(Throwable.class));
} finally {
cancellableContext.detach(baseContext);
}
}

private void deliverSubchannelState(
final Subchannel subchannel, final ConnectivityStateInfo newState) {
((FakeSubchannel) subchannel).updateState(newState);
Expand Down