Skip to content

Commit

Permalink
Merge branch 'master' into FixIssue10245
Browse files Browse the repository at this point in the history
  • Loading branch information
SreeramdasLavanya committed Oct 9, 2024
2 parents 4d0aa82 + 62a88ec commit b905dfa
Show file tree
Hide file tree
Showing 12 changed files with 113 additions and 62 deletions.
3 changes: 1 addition & 2 deletions contextstorage/build.gradle
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
plugins {
id "java-library"
// until we are confident we like the name
//id "maven-publish"
id "maven-publish"

id "ru.vyarus.animalsniffer"
}
Expand Down
4 changes: 1 addition & 3 deletions core/src/test/java/io/grpc/internal/RetriableStreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2592,9 +2592,7 @@ public void run() {
.closed(Status.fromCode(NON_FATAL_STATUS_CODE_1), REFUSED, new Metadata());
} finally {
transport2Lock.unlock();
if (transport1Lock.tryLock()) {
transport1Lock.unlock();
}
transport1Lock.unlock();
}
}
}, "Thread-transport2");
Expand Down
11 changes: 8 additions & 3 deletions s2a/src/main/java/io/grpc/s2a/internal/handshaker/ProtoUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ final class ProtoUtil {
*
* @param tlsVersion the {@link TLSVersion} object to be converted.
* @return a {@link String} representation of the TLS version.
* @throws AssertionError if the {@code tlsVersion} is not one of the supported TLS versions.
* @throws IllegalArgumentException if the {@code tlsVersion} is not one of
* the supported TLS versions.
*/
@VisibleForTesting
static String convertTlsProtocolVersion(TLSVersion tlsVersion) {
Expand All @@ -41,7 +42,7 @@ static String convertTlsProtocolVersion(TLSVersion tlsVersion) {
case TLS_VERSION_1_0:
return "TLSv1";
default:
throw new AssertionError(
throw new IllegalArgumentException(
String.format("TLS version %d is not supported.", tlsVersion.getNumber()));
}
}
Expand All @@ -62,7 +63,11 @@ static ImmutableSet<String> buildTlsProtocolVersionSet(
}
if (versionNumber >= minTlsVersion.getNumber()
&& versionNumber <= maxTlsVersion.getNumber()) {
tlsVersions.add(convertTlsProtocolVersion(tlsVersion));
try {
tlsVersions.add(convertTlsProtocolVersion(tlsVersion));
} catch (IllegalArgumentException e) {
continue;
}
}
}
return tlsVersions.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,10 @@ private void checkPeerTrusted(X509Certificate[] chain, boolean isCheckingClientC
SessionResp resp;
try {
resp = stub.send(reqBuilder.build());
} catch (IOException | InterruptedException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
} catch (IOException e) {
throw new CertificateException("Failed to send request to S2A.", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new CertificateException("Failed to send request to S2A.", e);
}
if (resp.hasStatus() && resp.getStatus().getCode() != 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,13 @@ private static void configureSslContextWithClientTlsConfiguration(
NoSuchAlgorithmException,
UnrecoverableKeyException {
sslContextBuilder.keyManager(createKeylessManager(clientTlsConfiguration));
ImmutableSet<String> tlsVersions =
ImmutableSet<String> tlsVersions;
tlsVersions =
ProtoUtil.buildTlsProtocolVersionSet(
clientTlsConfiguration.getMinTlsVersion(), clientTlsConfiguration.getMaxTlsVersion());
if (tlsVersions.isEmpty()) {
throw new S2AConnectionException("Set of TLS versions received from S2A server is empty.");
throw new S2AConnectionException("Set of TLS versions received from S2A server is"
+ " empty or not supported.");
}
sslContextBuilder.protocols(tlsVersions);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import io.grpc.StatusRuntimeException;
import io.grpc.TlsChannelCredentials;
import io.grpc.TlsServerCredentials;
import io.grpc.benchmarks.Utils;
import io.grpc.internal.SharedResourceHolder.Resource;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.StreamObserver;
Expand Down Expand Up @@ -123,7 +122,7 @@ public void getChannelResource_twoDistinctChannels() {
InsecureChannelCredentials.create());
Resource<Channel> resourceTwo =
S2AHandshakerServiceChannel.getChannelResource(
"localhost:" + Utils.pickUnusedPort(), InsecureChannelCredentials.create());
"localhost:" + plaintextServer.getPort() + 1, InsecureChannelCredentials.create());
assertThat(resourceTwo).isNotEqualTo(resource);
}

Expand All @@ -135,7 +134,7 @@ public void getChannelResource_mtlsTwoDistinctChannels() throws Exception {
"localhost:" + mtlsServer.getPort(), getTlsChannelCredentials());
Resource<Channel> resourceTwo =
S2AHandshakerServiceChannel.getChannelResource(
"localhost:" + Utils.pickUnusedPort(), getTlsChannelCredentials());
"localhost:" + mtlsServer.getPort() + 1, getTlsChannelCredentials());
assertThat(resourceTwo).isNotEqualTo(resource);
}

Expand Down Expand Up @@ -229,13 +228,13 @@ private static Server createMtlsServer() throws Exception {
.clientAuth(TlsServerCredentials.ClientAuth.REQUIRE)
.build();
return grpcCleanup.register(
NettyServerBuilder.forPort(Utils.pickUnusedPort(), creds).addService(service).build());
NettyServerBuilder.forPort(0, creds).addService(service).build());
}

private static Server createPlaintextServer() {
SimpleServiceImpl service = new SimpleServiceImpl();
return grpcCleanup.register(
ServerBuilder.forPort(Utils.pickUnusedPort()).addService(service).build());
ServerBuilder.forPort(0).addService(service).build());
}

private static ChannelCredentials getTlsChannelCredentials() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ public void convertTlsProtocolVersion_success() {

@Test
public void convertTlsProtocolVersion_withUnknownTlsVersion_fails() {
AssertionError expected =
IllegalArgumentException expected =
assertThrows(
AssertionError.class,
IllegalArgumentException.class,
() -> ProtoUtil.convertTlsProtocolVersion(TLSVersion.TLS_VERSION_UNSPECIFIED));
expect.that(expected).hasMessageThat().isEqualTo("TLS version 0 is not supported.");
}
Expand Down Expand Up @@ -79,12 +79,10 @@ public void buildTlsProtocolVersionSet_success() {

@Test
public void buildTlsProtocolVersionSet_failure() {
AssertionError expected =
assertThrows(
AssertionError.class,
() ->
ProtoUtil.buildTlsProtocolVersionSet(
TLSVersion.TLS_VERSION_UNSPECIFIED, TLSVersion.TLS_VERSION_1_3));
expect.that(expected).hasMessageThat().isEqualTo("TLS version 0 is not supported.");
expect
.that(
ProtoUtil.buildTlsProtocolVersionSet(
TLSVersion.TLS_VERSION_UNSPECIFIED, TLSVersion.TLS_VERSION_1_3))
.isEqualTo(ImmutableSet.of("TLSv1", "TLSv1.1", "TLSv1.2", "TLSv1.3"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public void createForClient_getsBadTlsVersionsFromServer_throwsError() throws Ex

assertThat(expected)
.hasMessageThat()
.contains("Set of TLS versions received from S2A server is empty.");
.contains("Set of TLS versions received from S2A server is empty or not supported.");
}

@Test
Expand Down
4 changes: 2 additions & 2 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ include ":grpc-istio-interop-testing"
include ":grpc-inprocess"
include ":grpc-util"
include ":grpc-opentelemetry"
include ":grpc-opentelemetry-context-storage-override"
include ":grpc-context-override-opentelemetry"

project(':grpc-api').projectDir = "$rootDir/api" as File
project(':grpc-core').projectDir = "$rootDir/core" as File
Expand Down Expand Up @@ -114,7 +114,7 @@ project(':grpc-istio-interop-testing').projectDir = "$rootDir/istio-interop-test
project(':grpc-inprocess').projectDir = "$rootDir/inprocess" as File
project(':grpc-util').projectDir = "$rootDir/util" as File
project(':grpc-opentelemetry').projectDir = "$rootDir/opentelemetry" as File
project(':grpc-opentelemetry-context-storage-override').projectDir = "$rootDir/contextstorage" as File
project(':grpc-context-override-opentelemetry').projectDir = "$rootDir/contextstorage" as File

if (settings.hasProperty('skipCodegen') && skipCodegen.toBoolean()) {
println '*** Skipping the build of codegen and compilation of proto files because skipCodegen=true'
Expand Down
45 changes: 29 additions & 16 deletions xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package io.grpc.xds.client;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

Expand Down Expand Up @@ -60,7 +59,6 @@
*/
final class ControlPlaneClient {

public static final String CLOSED_BY_SERVER = "Closed by server";
private final SynchronizationContext syncContext;
private final InternalLogId logId;
private final XdsLogger logger;
Expand Down Expand Up @@ -358,11 +356,7 @@ public void run() {
@Override
public void onStatusReceived(final Status status) {
syncContext.execute(() -> {
if (status.isOk()) {
handleRpcStreamClosed(Status.UNAVAILABLE.withDescription(CLOSED_BY_SERVER));
} else {
handleRpcStreamClosed(status);
}
handleRpcStreamClosed(status);
});
}

Expand All @@ -381,7 +375,7 @@ final void handleRpcResponse(XdsResourceType<?> type, String versionInfo, List<A
processingTracker.onComplete();
}

private void handleRpcStreamClosed(Status error) {
private void handleRpcStreamClosed(Status status) {
if (closed) {
return;
}
Expand All @@ -399,15 +393,34 @@ private void handleRpcStreamClosed(Status error) {
rpcRetryTimer = syncContext.schedule(
new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService);

checkArgument(!error.isOk(), "unexpected OK status");
String errorMsg = error.getDescription() != null
&& error.getDescription().equals(CLOSED_BY_SERVER)
? "ADS stream closed with status {0}: {1}. Cause: {2}"
: "ADS stream failed with status {0}: {1}. Cause: {2}";
logger.log(
XdsLogLevel.ERROR, errorMsg, error.getCode(), error.getDescription(), error.getCause());
Status newStatus = status;
if (responseReceived) {
// A closed ADS stream after a successful response is not considered an error. Servers may
// close streams for various reasons during normal operation, such as load balancing or
// underlying connection hitting its max connection age limit (see gRFC A9).
if (!status.isOk()) {
newStatus = Status.OK;
logger.log( XdsLogLevel.DEBUG, "ADS stream closed with error {0}: {1}. However, a "
+ "response was received, so this will not be treated as an error. Cause: {2}",
status.getCode(), status.getDescription(), status.getCause());
} else {
logger.log(XdsLogLevel.DEBUG,
"ADS stream closed by server after a response was received");
}
} else {
// If the ADS stream is closed without ever having received a response from the server, then
// the XdsClient should consider that a connectivity error (see gRFC A57).
if (status.isOk()) {
newStatus = Status.UNAVAILABLE.withDescription(
"ADS stream closed with OK before receiving a response");
}
logger.log(
XdsLogLevel.ERROR, "ADS stream failed with status {0}: {1}. Cause: {2}",
newStatus.getCode(), newStatus.getDescription(), newStatus.getCause());
}

closed = true;
xdsResponseHandler.handleStreamClosed(error);
xdsResponseHandler.handleStreamClosed(newStatus);
cleanUp();

logger.log(XdsLogLevel.INFO, "Retry ADS stream in {0} ns", delayNanos);
Expand Down
12 changes: 7 additions & 5 deletions xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,13 @@ public void handleResourceResponse(
public void handleStreamClosed(Status error) {
syncContext.throwIfNotInThisSynchronizationContext();
cleanUpResourceTimers();
for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap :
resourceSubscribers.values()) {
for (ResourceSubscriber<? extends ResourceUpdate> subscriber : subscriberMap.values()) {
if (!subscriber.hasResult()) {
subscriber.onError(error, null);
if (!error.isOk()) {
for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap :
resourceSubscribers.values()) {
for (ResourceSubscriber<? extends ResourceUpdate> subscriber : subscriberMap.values()) {
if (!subscriber.hasResult()) {
subscriber.onError(error, null);
}
}
}
}
Expand Down
55 changes: 45 additions & 10 deletions xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -3331,6 +3331,43 @@ public void useIndependentRpcContext() {
}
}

@Test
public void streamClosedWithNoResponse() {
xdsClient.watchXdsResource(XdsListenerResource.getInstance(),LDS_RESOURCE, ldsResourceWatcher);
xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),RDS_RESOURCE,
rdsResourceWatcher);
DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
// Management server closes the RPC stream before sending any response.
call.sendCompleted();
verify(ldsResourceWatcher, Mockito.timeout(1000).times(1))
.onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE,
"ADS stream closed with OK before receiving a response");
verify(rdsResourceWatcher).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE,
"ADS stream closed with OK before receiving a response");
}

@Test
public void streamClosedAfterSendingResponses() {
xdsClient.watchXdsResource(XdsListenerResource.getInstance(),LDS_RESOURCE, ldsResourceWatcher);
xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),RDS_RESOURCE,
rdsResourceWatcher);
DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
ScheduledTask ldsResourceTimeout =
Iterables.getOnlyElement(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER));
ScheduledTask rdsResourceTimeout =
Iterables.getOnlyElement(fakeClock.getPendingTasks(RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER));
call.sendResponse(LDS, testListenerRds, VERSION_1, "0000");
assertThat(ldsResourceTimeout.isCancelled()).isTrue();
call.sendResponse(RDS, testRouteConfig, VERSION_1, "0000");
assertThat(rdsResourceTimeout.isCancelled()).isTrue();
// Management server closes the RPC stream after sending responses.
call.sendCompleted();
verify(ldsResourceWatcher, never()).onError(errorCaptor.capture());
verify(rdsResourceWatcher, never()).onError(errorCaptor.capture());
}

@Test
public void streamClosedAndRetryWithBackoff() {
InOrder inOrder = Mockito.inOrder(backoffPolicyProvider, backoffPolicy1, backoffPolicy2);
Expand Down Expand Up @@ -3408,10 +3445,10 @@ public void streamClosedAndRetryWithBackoff() {
call.sendError(Status.DEADLINE_EXCEEDED.asException());
verify(ldsResourceWatcher, times(2)).onError(errorCaptor.capture());
verify(rdsResourceWatcher, times(2)).onError(errorCaptor.capture());
verify(cdsResourceWatcher, times(3)).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.DEADLINE_EXCEEDED, "");
verify(edsResourceWatcher, times(3)).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.DEADLINE_EXCEEDED, "");
verify(cdsResourceWatcher, times(2)).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg);
verify(edsResourceWatcher, times(2)).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg);

// Reset backoff sequence and retry after backoff.
inOrder.verify(backoffPolicyProvider).get();
Expand All @@ -3430,9 +3467,9 @@ public void streamClosedAndRetryWithBackoff() {
call.sendError(Status.UNAVAILABLE.asException());
verify(ldsResourceWatcher, times(2)).onError(errorCaptor.capture());
verify(rdsResourceWatcher, times(2)).onError(errorCaptor.capture());
verify(cdsResourceWatcher, times(4)).onError(errorCaptor.capture());
verify(cdsResourceWatcher, times(3)).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
verify(edsResourceWatcher, times(4)).onError(errorCaptor.capture());
verify(edsResourceWatcher, times(3)).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");

// Retry after backoff.
Expand Down Expand Up @@ -3516,10 +3553,8 @@ public void streamClosedAndRetryRestartsResourceInitialFetchTimerForUnresolvedRe
assertThat(edsResourceTimeout.isCancelled()).isTrue();
verify(ldsResourceWatcher, never()).onError(errorCaptor.capture());
verify(rdsResourceWatcher, never()).onError(errorCaptor.capture());
verify(cdsResourceWatcher).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
verify(edsResourceWatcher).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
verify(cdsResourceWatcher, never()).onError(errorCaptor.capture());
verify(edsResourceWatcher, never()).onError(errorCaptor.capture());

fakeClock.forwardNanos(10L);
assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(0);
Expand Down

0 comments on commit b905dfa

Please sign in to comment.