Skip to content

xds: Update error handling for ADS stream close and failure scenarios #11596

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 29 additions & 16 deletions xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package io.grpc.xds.client;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

Expand Down Expand Up @@ -60,7 +59,6 @@
*/
final class ControlPlaneClient {

public static final String CLOSED_BY_SERVER = "Closed by server";
private final SynchronizationContext syncContext;
private final InternalLogId logId;
private final XdsLogger logger;
Expand Down Expand Up @@ -358,11 +356,7 @@ public void run() {
@Override
public void onStatusReceived(final Status status) {
syncContext.execute(() -> {
if (status.isOk()) {
handleRpcStreamClosed(Status.UNAVAILABLE.withDescription(CLOSED_BY_SERVER));
} else {
handleRpcStreamClosed(status);
}
handleRpcStreamClosed(status);
});
}

Expand All @@ -381,7 +375,7 @@ final void handleRpcResponse(XdsResourceType<?> type, String versionInfo, List<A
processingTracker.onComplete();
}

private void handleRpcStreamClosed(Status error) {
private void handleRpcStreamClosed(Status status) {
if (closed) {
return;
}
Expand All @@ -399,15 +393,34 @@ private void handleRpcStreamClosed(Status error) {
rpcRetryTimer = syncContext.schedule(
new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService);

checkArgument(!error.isOk(), "unexpected OK status");
String errorMsg = error.getDescription() != null
&& error.getDescription().equals(CLOSED_BY_SERVER)
? "ADS stream closed with status {0}: {1}. Cause: {2}"
: "ADS stream failed with status {0}: {1}. Cause: {2}";
logger.log(
XdsLogLevel.ERROR, errorMsg, error.getCode(), error.getDescription(), error.getCause());
Status newStatus = status;
if (responseReceived) {
// A closed ADS stream after a successful response is not considered an error. Servers may
// close streams for various reasons during normal operation, such as load balancing or
// underlying connection hitting its max connection age limit (see gRFC A9).
if (!status.isOk()) {
newStatus = Status.OK;
logger.log( XdsLogLevel.DEBUG, "ADS stream closed with error {0}: {1}. However, a "
+ "response was received, so this will not be treated as an error. Cause: {2}",
status.getCode(), status.getDescription(), status.getCause());
} else {
logger.log(XdsLogLevel.DEBUG,
"ADS stream closed by server after a response was received");
}
} else {
// If the ADS stream is closed without ever having received a response from the server, then
// the XdsClient should consider that a connectivity error (see gRFC A57).
if (status.isOk()) {
newStatus = Status.UNAVAILABLE.withDescription(
"ADS stream closed with OK before receiving a response");
}
logger.log(
XdsLogLevel.ERROR, "ADS stream failed with status {0}: {1}. Cause: {2}",
newStatus.getCode(), newStatus.getDescription(), newStatus.getCause());
}

closed = true;
xdsResponseHandler.handleStreamClosed(error);
xdsResponseHandler.handleStreamClosed(newStatus);
cleanUp();

logger.log(XdsLogLevel.INFO, "Retry ADS stream in {0} ns", delayNanos);
Expand Down
12 changes: 7 additions & 5 deletions xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,13 @@ public void handleResourceResponse(
public void handleStreamClosed(Status error) {
syncContext.throwIfNotInThisSynchronizationContext();
cleanUpResourceTimers();
for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap :
resourceSubscribers.values()) {
for (ResourceSubscriber<? extends ResourceUpdate> subscriber : subscriberMap.values()) {
if (!subscriber.hasResult()) {
subscriber.onError(error, null);
if (!error.isOk()) {
for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap :
resourceSubscribers.values()) {
for (ResourceSubscriber<? extends ResourceUpdate> subscriber : subscriberMap.values()) {
if (!subscriber.hasResult()) {
subscriber.onError(error, null);
}
}
}
}
Expand Down
55 changes: 45 additions & 10 deletions xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -3331,6 +3331,43 @@ public void useIndependentRpcContext() {
}
}

@Test
public void streamClosedWithNoResponse() {
xdsClient.watchXdsResource(XdsListenerResource.getInstance(),LDS_RESOURCE, ldsResourceWatcher);
xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),RDS_RESOURCE,
rdsResourceWatcher);
DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
// Management server closes the RPC stream before sending any response.
call.sendCompleted();
verify(ldsResourceWatcher, Mockito.timeout(1000).times(1))
.onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE,
"ADS stream closed with OK before receiving a response");
verify(rdsResourceWatcher).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE,
"ADS stream closed with OK before receiving a response");
}

@Test
public void streamClosedAfterSendingResponses() {
xdsClient.watchXdsResource(XdsListenerResource.getInstance(),LDS_RESOURCE, ldsResourceWatcher);
xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),RDS_RESOURCE,
rdsResourceWatcher);
DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
ScheduledTask ldsResourceTimeout =
Iterables.getOnlyElement(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER));
ScheduledTask rdsResourceTimeout =
Iterables.getOnlyElement(fakeClock.getPendingTasks(RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER));
call.sendResponse(LDS, testListenerRds, VERSION_1, "0000");
assertThat(ldsResourceTimeout.isCancelled()).isTrue();
call.sendResponse(RDS, testRouteConfig, VERSION_1, "0000");
assertThat(rdsResourceTimeout.isCancelled()).isTrue();
// Management server closes the RPC stream after sending responses.
call.sendCompleted();
verify(ldsResourceWatcher, never()).onError(errorCaptor.capture());
verify(rdsResourceWatcher, never()).onError(errorCaptor.capture());
}

@Test
public void streamClosedAndRetryWithBackoff() {
InOrder inOrder = Mockito.inOrder(backoffPolicyProvider, backoffPolicy1, backoffPolicy2);
Expand Down Expand Up @@ -3408,10 +3445,10 @@ public void streamClosedAndRetryWithBackoff() {
call.sendError(Status.DEADLINE_EXCEEDED.asException());
verify(ldsResourceWatcher, times(2)).onError(errorCaptor.capture());
verify(rdsResourceWatcher, times(2)).onError(errorCaptor.capture());
verify(cdsResourceWatcher, times(3)).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.DEADLINE_EXCEEDED, "");
verify(edsResourceWatcher, times(3)).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.DEADLINE_EXCEEDED, "");
verify(cdsResourceWatcher, times(2)).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg);
verify(edsResourceWatcher, times(2)).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg);

// Reset backoff sequence and retry after backoff.
inOrder.verify(backoffPolicyProvider).get();
Expand All @@ -3430,9 +3467,9 @@ public void streamClosedAndRetryWithBackoff() {
call.sendError(Status.UNAVAILABLE.asException());
verify(ldsResourceWatcher, times(2)).onError(errorCaptor.capture());
verify(rdsResourceWatcher, times(2)).onError(errorCaptor.capture());
verify(cdsResourceWatcher, times(4)).onError(errorCaptor.capture());
verify(cdsResourceWatcher, times(3)).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
verify(edsResourceWatcher, times(4)).onError(errorCaptor.capture());
verify(edsResourceWatcher, times(3)).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");

// Retry after backoff.
Expand Down Expand Up @@ -3516,10 +3553,8 @@ public void streamClosedAndRetryRestartsResourceInitialFetchTimerForUnresolvedRe
assertThat(edsResourceTimeout.isCancelled()).isTrue();
verify(ldsResourceWatcher, never()).onError(errorCaptor.capture());
verify(rdsResourceWatcher, never()).onError(errorCaptor.capture());
verify(cdsResourceWatcher).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
verify(edsResourceWatcher).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
verify(cdsResourceWatcher, never()).onError(errorCaptor.capture());
verify(edsResourceWatcher, never()).onError(errorCaptor.capture());

fakeClock.forwardNanos(10L);
assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(0);
Expand Down