Skip to content

Commit f049530

Browse files
authored
xds: use a standalone Context for xDS control plane RPCs (v1.38.x backport) (#8153) (#8157)
Control plane RPCs are independent of application RPCs, they can stand for completely different lifetime. So the context for making application RPCs should not be propagated to control plane RPCs. This change makes control plane RPCs use the ROOT Context.
1 parent 48ea63d commit f049530

File tree

6 files changed

+55
-10
lines changed

6 files changed

+55
-10
lines changed

xds/src/main/java/io/grpc/xds/AbstractXdsClient.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc;
2929
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
3030
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
31+
import io.grpc.Context;
3132
import io.grpc.InternalLogId;
3233
import io.grpc.ManagedChannel;
3334
import io.grpc.Status;
@@ -81,6 +82,7 @@ public void uncaughtException(Thread t, Throwable e) {
8182
private final InternalLogId logId;
8283
private final XdsLogger logger;
8384
private final ManagedChannel channel;
85+
private final Context context;
8486
private final ScheduledExecutorService timeService;
8587
private final BackoffPolicy.Provider backoffPolicyProvider;
8688
private final Stopwatch stopwatch;
@@ -103,10 +105,11 @@ public void uncaughtException(Thread t, Throwable e) {
103105
private ScheduledHandle rpcRetryTimer;
104106

105107
AbstractXdsClient(ManagedChannel channel, Bootstrapper.BootstrapInfo bootstrapInfo,
106-
ScheduledExecutorService timeService, BackoffPolicy.Provider backoffPolicyProvider,
107-
Supplier<Stopwatch> stopwatchSupplier) {
108+
Context context, ScheduledExecutorService timeService,
109+
BackoffPolicy.Provider backoffPolicyProvider, Supplier<Stopwatch> stopwatchSupplier) {
108110
this.channel = checkNotNull(channel, "channel");
109111
this.bootstrapInfo = checkNotNull(bootstrapInfo, "bootstrapInfo");
112+
this.context = checkNotNull(context, "context");
110113
this.timeService = checkNotNull(timeService, "timeService");
111114
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
112115
stopwatch = checkNotNull(stopwatchSupplier, "stopwatchSupplier").get();
@@ -305,7 +308,12 @@ private void startRpcStream() {
305308
} else {
306309
adsStream = new AdsStreamV2();
307310
}
308-
adsStream.start();
311+
Context prevContext = context.attach();
312+
try {
313+
adsStream.start();
314+
} finally {
315+
context.detach(prevContext);
316+
}
309317
logger.log(XdsLogLevel.INFO, "ADS stream started");
310318
stopwatch.reset().start();
311319
}

xds/src/main/java/io/grpc/xds/ClientXdsClient.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.Rds;
4949
import io.envoyproxy.envoy.type.v3.FractionalPercent;
5050
import io.envoyproxy.envoy.type.v3.FractionalPercent.DenominatorType;
51+
import io.grpc.Context;
5152
import io.grpc.EquivalentAddressGroup;
5253
import io.grpc.ManagedChannel;
5354
import io.grpc.Status;
@@ -132,13 +133,13 @@ final class ClientXdsClient extends AbstractXdsClient {
132133
private boolean reportingLoad;
133134

134135
ClientXdsClient(
135-
ManagedChannel channel, Bootstrapper.BootstrapInfo bootstrapInfo,
136+
ManagedChannel channel, Bootstrapper.BootstrapInfo bootstrapInfo, Context context,
136137
ScheduledExecutorService timeService, BackoffPolicy.Provider backoffPolicyProvider,
137138
Supplier<Stopwatch> stopwatchSupplier, TimeProvider timeProvider) {
138-
super(channel, bootstrapInfo, timeService, backoffPolicyProvider, stopwatchSupplier);
139+
super(channel, bootstrapInfo, context, timeService, backoffPolicyProvider, stopwatchSupplier);
139140
loadStatsManager = new LoadStatsManager2(stopwatchSupplier);
140141
this.timeProvider = timeProvider;
141-
lrsClient = new LoadReportClient(loadStatsManager, channel,
142+
lrsClient = new LoadReportClient(loadStatsManager, channel, context,
142143
bootstrapInfo.getServers().get(0).isUseProtocolV3(), bootstrapInfo.getNode(),
143144
getSyncContext(), timeService, backoffPolicyProvider, stopwatchSupplier);
144145
}

xds/src/main/java/io/grpc/xds/LoadReportClient.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc.LoadReportingServiceStub;
2929
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest;
3030
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse;
31+
import io.grpc.Context;
3132
import io.grpc.InternalLogId;
3233
import io.grpc.ManagedChannel;
3334
import io.grpc.Status;
@@ -55,6 +56,7 @@ final class LoadReportClient {
5556
private final InternalLogId logId;
5657
private final XdsLogger logger;
5758
private final ManagedChannel channel;
59+
private final Context context;
5860
private final boolean useProtocolV3;
5961
private final Node node;
6062
private final SynchronizationContext syncContext;
@@ -74,6 +76,7 @@ final class LoadReportClient {
7476
LoadReportClient(
7577
LoadStatsManager2 loadStatsManager,
7678
ManagedChannel channel,
79+
Context context,
7780
boolean useProtocolV3,
7881
Node node,
7982
SynchronizationContext syncContext,
@@ -82,6 +85,7 @@ final class LoadReportClient {
8285
Supplier<Stopwatch> stopwatchSupplier) {
8386
this.loadStatsManager = checkNotNull(loadStatsManager, "loadStatsManager");
8487
this.channel = checkNotNull(channel, "xdsChannel");
88+
this.context = checkNotNull(context, "context");
8589
this.useProtocolV3 = useProtocolV3;
8690
this.syncContext = checkNotNull(syncContext, "syncContext");
8791
this.timerService = checkNotNull(scheduledExecutorService, "timeService");
@@ -163,7 +167,12 @@ private void startLrsRpc() {
163167
lrsStream = new LrsStreamV2();
164168
}
165169
retryStopwatch.reset().start();
166-
lrsStream.start();
170+
Context prevContext = context.attach();
171+
try {
172+
lrsStream.start();
173+
} finally {
174+
context.detach(prevContext);
175+
}
167176
}
168177

169178
private abstract class LrsStream {

xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import com.google.common.annotations.VisibleForTesting;
2222
import io.grpc.ChannelCredentials;
23+
import io.grpc.Context;
2324
import io.grpc.Grpc;
2425
import io.grpc.ManagedChannel;
2526
import io.grpc.internal.ExponentialBackoffPolicy;
@@ -105,6 +106,7 @@ private static class SharedXdsClientPoolProviderHolder {
105106
@ThreadSafe
106107
@VisibleForTesting
107108
static class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {
109+
private final Context context = Context.ROOT;
108110
private final BootstrapInfo bootstrapInfo;
109111
private final Object lock = new Object();
110112
@GuardedBy("lock")
@@ -132,7 +134,7 @@ public XdsClient getObject() {
132134
.keepAliveTime(5, TimeUnit.MINUTES)
133135
.build();
134136
scheduler = SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE);
135-
xdsClient = new ClientXdsClient(channel, bootstrapInfo, scheduler,
137+
xdsClient = new ClientXdsClient(channel, bootstrapInfo, context, scheduler,
136138
new ExponentialBackoffPolicy.Provider(), GrpcUtil.STOPWATCH_SUPPLIER,
137139
TimeProvider.SYSTEM_TIME_PROVIDER);
138140
}

xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
import static io.grpc.xds.AbstractXdsClient.ResourceType.EDS;
2323
import static io.grpc.xds.AbstractXdsClient.ResourceType.LDS;
2424
import static io.grpc.xds.AbstractXdsClient.ResourceType.RDS;
25+
import static org.mockito.ArgumentMatchers.any;
2526
import static org.mockito.Mockito.mock;
27+
import static org.mockito.Mockito.never;
2628
import static org.mockito.Mockito.times;
2729
import static org.mockito.Mockito.verify;
2830
import static org.mockito.Mockito.verifyNoInteractions;
@@ -40,6 +42,8 @@
4042
import io.envoyproxy.envoy.config.route.v3.FilterConfig;
4143
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.SdsSecretConfig;
4244
import io.grpc.BindableService;
45+
import io.grpc.Context;
46+
import io.grpc.Context.CancellableContext;
4347
import io.grpc.InsecureChannelCredentials;
4448
import io.grpc.ManagedChannel;
4549
import io.grpc.Status;
@@ -270,6 +274,7 @@ public void setUp() throws IOException {
270274
new ClientXdsClient(
271275
channel,
272276
bootstrapInfo,
277+
Context.ROOT,
273278
fakeClock.getScheduledExecutorService(),
274279
backoffPolicyProvider,
275280
fakeClock.getStopwatchSupplier(),
@@ -1776,6 +1781,26 @@ public void multipleEdsWatchers() {
17761781
verifySubscribedResourcesMetadataSizes(0, 0, 0, 2);
17771782
}
17781783

1784+
@Test
1785+
public void useIndependentRpcContext() {
1786+
// Simulates making RPCs within the context of an inbound RPC.
1787+
CancellableContext cancellableContext = Context.current().withCancellation();
1788+
Context prevContext = cancellableContext.attach();
1789+
try {
1790+
DiscoveryRpcCall call = startResourceWatcher(LDS, LDS_RESOURCE, ldsResourceWatcher);
1791+
1792+
// The inbound RPC finishes and closes its context. The outbound RPC's control plane RPC
1793+
// should not be impacted.
1794+
cancellableContext.close();
1795+
verify(ldsResourceWatcher, never()).onError(any(Status.class));
1796+
1797+
call.sendResponse(LDS, testListenerRds, VERSION_1, "0000");
1798+
verify(ldsResourceWatcher).onChanged(any(LdsUpdate.class));
1799+
} finally {
1800+
cancellableContext.detach(prevContext);
1801+
}
1802+
}
1803+
17791804
@Test
17801805
public void streamClosedAndRetryWithBackoff() {
17811806
InOrder inOrder = Mockito.inOrder(backoffPolicyProvider, backoffPolicy1, backoffPolicy2);

xds/src/test/java/io/grpc/xds/LoadReportClientTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,8 @@ public void cancelled(Context context) {
172172
when(backoffPolicy2.nextBackoffNanos())
173173
.thenReturn(TimeUnit.SECONDS.toNanos(2L), TimeUnit.SECONDS.toNanos(20L));
174174
addFakeStatsData();
175-
lrsClient = new LoadReportClient(loadStatsManager, channel, false, NODE, syncContext,
176-
fakeClock.getScheduledExecutorService(), backoffPolicyProvider,
175+
lrsClient = new LoadReportClient(loadStatsManager, channel, Context.ROOT, false, NODE,
176+
syncContext, fakeClock.getScheduledExecutorService(), backoffPolicyProvider,
177177
fakeClock.getStopwatchSupplier());
178178
syncContext.execute(new Runnable() {
179179
@Override

0 commit comments

Comments
 (0)