Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: Update error handling for ADS stream close and failure scenarios #11596

Merged
merged 6 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 35 additions & 16 deletions xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package io.grpc.xds.client;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

Expand Down Expand Up @@ -60,7 +59,8 @@
*/
final class ControlPlaneClient {

public static final String CLOSED_BY_SERVER = "Closed by server";
public static final String CLOSED_BY_SERVER_AFTER_RECEIVING_A_RESPONSE =
"Closed by server after receiving a response";
private final SynchronizationContext syncContext;
private final InternalLogId logId;
private final XdsLogger logger;
Expand Down Expand Up @@ -358,11 +358,7 @@ public void run() {
@Override
public void onStatusReceived(final Status status) {
syncContext.execute(() -> {
if (status.isOk()) {
handleRpcStreamClosed(Status.UNAVAILABLE.withDescription(CLOSED_BY_SERVER));
} else {
handleRpcStreamClosed(status);
}
handleRpcStreamClosed(status);
});
}

Expand All @@ -381,7 +377,7 @@ final void handleRpcResponse(XdsResourceType<?> type, String versionInfo, List<A
processingTracker.onComplete();
}

private void handleRpcStreamClosed(Status error) {
private void handleRpcStreamClosed(Status status) {
if (closed) {
return;
}
Expand All @@ -399,15 +395,38 @@ private void handleRpcStreamClosed(Status error) {
rpcRetryTimer = syncContext.schedule(
new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService);

checkArgument(!error.isOk(), "unexpected OK status");
String errorMsg = error.getDescription() != null
&& error.getDescription().equals(CLOSED_BY_SERVER)
? "ADS stream closed with status {0}: {1}. Cause: {2}"
: "ADS stream failed with status {0}: {1}. Cause: {2}";
logger.log(
XdsLogLevel.ERROR, errorMsg, error.getCode(), error.getDescription(), error.getCause());
Status newStatus = status;
if (responseReceived) {
// A closed ADS stream after a successful response is not considered an error. Servers may
// close streams for various reasons during normal operation, such as load balancing or
// underlying connection hitting its max connection age limit (see gRFC A9).
if (!status.isOk()) {
newStatus = Status.UNAVAILABLE.withDescription(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why isn't newStatus being set to Status.OK? If you did that, then the if statement below and in XdsClientImpl could be simply (!newStatus.isOk()). Particularly for XdsClientImpl it would be much clearer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is what I initially had.
After looking at Go's error handling for same, I changed it to return an error instead.
I think we need to differentiate ADS stream being closed with Status.OK and being considered Status.OK because it received a response as stated in gRFC A57.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could do Status.OK.withDescription(

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated. New status is Status.OK when a ADS stream is closed with an error but a response has been received.

CLOSED_BY_SERVER_AFTER_RECEIVING_A_RESPONSE);
logger.log( XdsLogLevel.DEBUG, "ADS stream closed with error {0}: {1}. However, a "
+ "response was received, so this will not be treated as an error. Cause: {2}.",
status.getCode(), status.getDescription(), status.getCause());
} else {
logger.log(XdsLogLevel.DEBUG,
"ADS stream closed by server after responses received status {0}: {1}. Cause: {2}");
DNVindhya marked this conversation as resolved.
Show resolved Hide resolved
}
larry-safran marked this conversation as resolved.
Show resolved Hide resolved
} else {
// If the ADS stream is closed without ever having received a response from the server, then
// the XdsClient should consider that a connectivity error (see gRFC A57).
if (status.isOk()) {
newStatus = Status.UNAVAILABLE.withDescription(
"ADS stream failed, because connection was closed before receiving a response.");
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
}
}

if (!newStatus.isOk() && !(newStatus.getDescription() != null
larry-safran marked this conversation as resolved.
Show resolved Hide resolved
&& newStatus.getDescription().contains(CLOSED_BY_SERVER_AFTER_RECEIVING_A_RESPONSE))) {
logger.log(
XdsLogLevel.ERROR, "ADS stream failed with status {0}: {1}. Cause: {2}",
newStatus.getCode(), newStatus.getDescription(), newStatus.getCause());
}
closed = true;
xdsResponseHandler.handleStreamClosed(error);
xdsResponseHandler.handleStreamClosed(newStatus);
cleanUp();

logger.log(XdsLogLevel.INFO, "Retry ADS stream in {0} ns", delayNanos);
Expand Down
16 changes: 11 additions & 5 deletions xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.xds.client.Bootstrapper.XDSTP_SCHEME;
import static io.grpc.xds.client.ControlPlaneClient.CLOSED_BY_SERVER_AFTER_RECEIVING_A_RESPONSE;
import static io.grpc.xds.client.XdsResourceType.ParsedResource;
import static io.grpc.xds.client.XdsResourceType.ValidatedResourceUpdate;

Expand Down Expand Up @@ -142,11 +143,16 @@ public void handleResourceResponse(
public void handleStreamClosed(Status error) {
syncContext.throwIfNotInThisSynchronizationContext();
cleanUpResourceTimers();
for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap :
resourceSubscribers.values()) {
for (ResourceSubscriber<? extends ResourceUpdate> subscriber : subscriberMap.values()) {
if (!subscriber.hasResult()) {
subscriber.onError(error, null);
// Resource subscribers are not notified when the server closes the stream after a
// response is received.
if (!error.isOk() && !(error.getDescription() != null
&& error.getDescription().contains(CLOSED_BY_SERVER_AFTER_RECEIVING_A_RESPONSE))) {
for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap :
resourceSubscribers.values()) {
for (ResourceSubscriber<? extends ResourceUpdate> subscriber : subscriberMap.values()) {
if (!subscriber.hasResult()) {
subscriber.onError(error, null);
}
}
}
}
Expand Down
40 changes: 30 additions & 10 deletions xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java
larry-safran marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -3331,6 +3331,28 @@ public void useIndependentRpcContext() {
}
}

@Test
public void streamClosedWithNoResponse() {
xdsClient.watchXdsResource(XdsListenerResource.getInstance(),LDS_RESOURCE, ldsResourceWatcher);
xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),RDS_RESOURCE,
rdsResourceWatcher);
DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
// Management server closes the RPC stream before sending any response.
call.sendCompleted();
verify(ldsResourceWatcher, Mockito.timeout(1000).times(1))
.onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE,
"ADS stream failed due to connectivity error");
verify(rdsResourceWatcher).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE,
"ADS stream failed due to connectivity error");
ScheduledTask retryTask =
Iterables.getOnlyElement(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER));
assertThat(retryTask.getDelay(TimeUnit.NANOSECONDS)).isEqualTo(10L);
DNVindhya marked this conversation as resolved.
Show resolved Hide resolved

verifyNoMoreInteractions(ldsResourceWatcher, rdsResourceWatcher);
larry-safran marked this conversation as resolved.
Show resolved Hide resolved
}

@Test
public void streamClosedAndRetryWithBackoff() {
InOrder inOrder = Mockito.inOrder(backoffPolicyProvider, backoffPolicy1, backoffPolicy2);
Expand Down Expand Up @@ -3408,10 +3430,10 @@ public void streamClosedAndRetryWithBackoff() {
call.sendError(Status.DEADLINE_EXCEEDED.asException());
verify(ldsResourceWatcher, times(2)).onError(errorCaptor.capture());
verify(rdsResourceWatcher, times(2)).onError(errorCaptor.capture());
verify(cdsResourceWatcher, times(3)).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.DEADLINE_EXCEEDED, "");
verify(edsResourceWatcher, times(3)).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.DEADLINE_EXCEEDED, "");
verify(cdsResourceWatcher, times(2)).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg);
verify(edsResourceWatcher, times(2)).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg);

// Reset backoff sequence and retry after backoff.
inOrder.verify(backoffPolicyProvider).get();
Expand All @@ -3430,9 +3452,9 @@ public void streamClosedAndRetryWithBackoff() {
call.sendError(Status.UNAVAILABLE.asException());
verify(ldsResourceWatcher, times(2)).onError(errorCaptor.capture());
verify(rdsResourceWatcher, times(2)).onError(errorCaptor.capture());
verify(cdsResourceWatcher, times(4)).onError(errorCaptor.capture());
verify(cdsResourceWatcher, times(3)).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
verify(edsResourceWatcher, times(4)).onError(errorCaptor.capture());
verify(edsResourceWatcher, times(3)).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");

// Retry after backoff.
Expand Down Expand Up @@ -3516,10 +3538,8 @@ public void streamClosedAndRetryRestartsResourceInitialFetchTimerForUnresolvedRe
assertThat(edsResourceTimeout.isCancelled()).isTrue();
verify(ldsResourceWatcher, never()).onError(errorCaptor.capture());
verify(rdsResourceWatcher, never()).onError(errorCaptor.capture());
verify(cdsResourceWatcher).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
verify(edsResourceWatcher).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
verify(cdsResourceWatcher, never()).onError(errorCaptor.capture());
verify(edsResourceWatcher, never()).onError(errorCaptor.capture());

fakeClock.forwardNanos(10L);
assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(0);
Expand Down
Loading