Skip to content

Commit

Permalink
Merge branch 'grpc:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
SreeramdasLavanya authored Sep 23, 2024
2 parents fef4c92 + d8f73e0 commit 778cfb4
Show file tree
Hide file tree
Showing 12 changed files with 129 additions and 152 deletions.
18 changes: 16 additions & 2 deletions netty/src/main/java/io/grpc/netty/InternalProtocolNegotiators.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
package io.grpc.netty;

import io.grpc.ChannelLogger;
import io.grpc.internal.ObjectPool;
import io.grpc.netty.ProtocolNegotiators.ClientTlsHandler;
import io.grpc.netty.ProtocolNegotiators.GrpcNegotiationHandler;
import io.grpc.netty.ProtocolNegotiators.WaitUntilActiveHandler;
import io.netty.channel.ChannelHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.util.AsciiString;
import java.util.concurrent.Executor;

/**
* Internal accessor for {@link ProtocolNegotiators}.
Expand All @@ -35,9 +37,12 @@ private InternalProtocolNegotiators() {}
* Returns a {@link ProtocolNegotiator} that ensures the pipeline is set up so that TLS will
* be negotiated, the {@code handler} is added and writes to the {@link io.netty.channel.Channel}
* may happen immediately, even before the TLS Handshake is complete.
* @param executorPool a dedicated {@link Executor} pool for time-consuming TLS tasks
*/
public static InternalProtocolNegotiator.ProtocolNegotiator tls(SslContext sslContext) {
final io.grpc.netty.ProtocolNegotiator negotiator = ProtocolNegotiators.tls(sslContext);
public static InternalProtocolNegotiator.ProtocolNegotiator tls(SslContext sslContext,
ObjectPool<? extends Executor> executorPool) {
final io.grpc.netty.ProtocolNegotiator negotiator = ProtocolNegotiators.tls(sslContext,
executorPool);
final class TlsNegotiator implements InternalProtocolNegotiator.ProtocolNegotiator {

@Override
Expand All @@ -58,6 +63,15 @@ public void close() {

return new TlsNegotiator();
}

/**
* Returns a {@link ProtocolNegotiator} that ensures the pipeline is set up so that TLS will
* be negotiated, the {@code handler} is added and writes to the {@link io.netty.channel.Channel}
* may happen immediately, even before the TLS Handshake is complete.
*/
public static InternalProtocolNegotiator.ProtocolNegotiator tls(SslContext sslContext) {
return tls(sslContext, null);
}

/**
* Returns a {@link ProtocolNegotiator} that ensures the pipeline is set up so that TLS will be
Expand Down
21 changes: 7 additions & 14 deletions s2a/src/main/java/io/grpc/s2a/MtlsToS2AChannelCredentials.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,16 @@
import static com.google.common.base.Strings.isNullOrEmpty;

import io.grpc.ChannelCredentials;
import io.grpc.ExperimentalApi;
import io.grpc.TlsChannelCredentials;
import io.grpc.util.AdvancedTlsX509KeyManager;
import io.grpc.util.AdvancedTlsX509TrustManager;
import java.io.File;
import java.io.IOException;
import java.security.GeneralSecurityException;

/**
* Configures an {@code S2AChannelCredentials.Builder} instance with credentials used to establish a
* connection with the S2A to support talking to the S2A over mTLS.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/11533")
public final class MtlsToS2AChannelCredentials {
/**
* Creates a {@code S2AChannelCredentials.Builder} builder, that talks to the S2A over mTLS.
Expand All @@ -42,7 +41,7 @@ public final class MtlsToS2AChannelCredentials {
* @param trustBundlePath the path to the trust bundle PEM.
* @return a {@code MtlsToS2AChannelCredentials.Builder} instance.
*/
public static Builder createBuilder(
public static Builder newBuilder(
String s2aAddress, String privateKeyPath, String certChainPath, String trustBundlePath) {
checkArgument(!isNullOrEmpty(s2aAddress), "S2A address must not be null or empty.");
checkArgument(!isNullOrEmpty(privateKeyPath), "privateKeyPath must not be null or empty.");
Expand All @@ -66,7 +65,7 @@ public static final class Builder {
this.trustBundlePath = trustBundlePath;
}

public S2AChannelCredentials.Builder build() throws GeneralSecurityException, IOException {
public S2AChannelCredentials.Builder build() throws IOException {
checkState(!isNullOrEmpty(s2aAddress), "S2A address must not be null or empty.");
checkState(!isNullOrEmpty(privateKeyPath), "privateKeyPath must not be null or empty.");
checkState(!isNullOrEmpty(certChainPath), "certChainPath must not be null or empty.");
Expand All @@ -75,19 +74,13 @@ public S2AChannelCredentials.Builder build() throws GeneralSecurityException, IO
File certChainFile = new File(certChainPath);
File trustBundleFile = new File(trustBundlePath);

AdvancedTlsX509KeyManager keyManager = new AdvancedTlsX509KeyManager();
keyManager.updateIdentityCredentials(certChainFile, privateKeyFile);

AdvancedTlsX509TrustManager trustManager = AdvancedTlsX509TrustManager.newBuilder().build();
trustManager.updateTrustCredentials(trustBundleFile);

ChannelCredentials channelToS2ACredentials =
TlsChannelCredentials.newBuilder()
.keyManager(keyManager)
.trustManager(trustManager)
.keyManager(certChainFile, privateKeyFile)
.trustManager(trustBundleFile)
.build();

return S2AChannelCredentials.createBuilder(s2aAddress)
return S2AChannelCredentials.newBuilder(s2aAddress)
.setS2AChannelCredentials(channelToS2ACredentials);
}
}
Expand Down
12 changes: 7 additions & 5 deletions s2a/src/main/java/io/grpc/s2a/S2AChannelCredentials.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,31 @@
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.grpc.Channel;
import io.grpc.ChannelCredentials;
import io.grpc.ExperimentalApi;
import io.grpc.InsecureChannelCredentials;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.SharedResourcePool;
import io.grpc.netty.InternalNettyChannelCredentials;
import io.grpc.netty.InternalProtocolNegotiator;
import io.grpc.s2a.channel.S2AHandshakerServiceChannel;
import io.grpc.s2a.handshaker.S2AIdentity;
import io.grpc.s2a.handshaker.S2AProtocolNegotiatorFactory;
import java.util.Optional;
import javax.annotation.concurrent.NotThreadSafe;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* Configures gRPC to use S2A for transport security when establishing a secure channel. Only for
* use on the client side of a gRPC connection.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/11533")
public final class S2AChannelCredentials {
/**
* Creates a channel credentials builder for establishing an S2A-secured connection.
*
* @param s2aAddress the address of the S2A server used to secure the connection.
* @return a {@code S2AChannelCredentials.Builder} instance.
*/
public static Builder createBuilder(String s2aAddress) {
public static Builder newBuilder(String s2aAddress) {
checkArgument(!isNullOrEmpty(s2aAddress), "S2A address must not be null or empty.");
return new Builder(s2aAddress);
}
Expand All @@ -56,13 +58,13 @@ public static Builder createBuilder(String s2aAddress) {
public static final class Builder {
private final String s2aAddress;
private ObjectPool<Channel> s2aChannelPool;
private Optional<ChannelCredentials> s2aChannelCredentials;
private ChannelCredentials s2aChannelCredentials;
private @Nullable S2AIdentity localIdentity = null;

Builder(String s2aAddress) {
this.s2aAddress = s2aAddress;
this.s2aChannelPool = null;
this.s2aChannelCredentials = Optional.empty();
this.s2aChannelCredentials = InsecureChannelCredentials.create();
}

/**
Expand Down Expand Up @@ -107,7 +109,7 @@ public Builder setLocalUid(String localUid) {
/** Sets the credentials to be used when connecting to the S2A. */
@CanIgnoreReturnValue
public Builder setS2AChannelCredentials(ChannelCredentials s2aChannelCredentials) {
this.s2aChannelCredentials = Optional.of(s2aChannelCredentials);
this.s2aChannelCredentials = s2aChannelCredentials;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,10 @@
import io.grpc.MethodDescriptor;
import io.grpc.internal.SharedResourceHolder.Resource;
import io.grpc.netty.NettyChannelBuilder;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.concurrent.ThreadSafe;

/**
Expand All @@ -61,7 +58,6 @@
public final class S2AHandshakerServiceChannel {
private static final ConcurrentMap<String, Resource<Channel>> SHARED_RESOURCE_CHANNELS =
Maps.newConcurrentMap();
private static final Duration DELEGATE_TERMINATION_TIMEOUT = Duration.ofSeconds(2);
private static final Duration CHANNEL_SHUTDOWN_TIMEOUT = Duration.ofSeconds(10);

/**
Expand All @@ -74,8 +70,9 @@ public final class S2AHandshakerServiceChannel {
* running at {@code s2aAddress}.
*/
public static Resource<Channel> getChannelResource(
String s2aAddress, Optional<ChannelCredentials> s2aChannelCredentials) {
String s2aAddress, ChannelCredentials s2aChannelCredentials) {
checkNotNull(s2aAddress);
checkNotNull(s2aChannelCredentials);
return SHARED_RESOURCE_CHANNELS.computeIfAbsent(
s2aAddress, channelResource -> new ChannelResource(s2aAddress, s2aChannelCredentials));
}
Expand All @@ -87,49 +84,31 @@ public static Resource<Channel> getChannelResource(
*/
private static class ChannelResource implements Resource<Channel> {
private final String targetAddress;
private final Optional<ChannelCredentials> channelCredentials;
private final ChannelCredentials channelCredentials;

public ChannelResource(String targetAddress, Optional<ChannelCredentials> channelCredentials) {
public ChannelResource(String targetAddress, ChannelCredentials channelCredentials) {
this.targetAddress = targetAddress;
this.channelCredentials = channelCredentials;
}

/**
* Creates a {@code EventLoopHoldingChannel} instance to the service running at {@code
* targetAddress}. This channel uses a dedicated thread pool for its {@code EventLoopGroup}
* instance to avoid blocking.
* Creates a {@code HandshakerServiceChannel} instance to the service running at {@code
* targetAddress}.
*/
@Override
public Channel create() {
EventLoopGroup eventLoopGroup =
new NioEventLoopGroup(1, new DefaultThreadFactory("S2A channel pool", true));
ManagedChannel channel = null;
if (channelCredentials.isPresent()) {
// Create a secure channel.
channel =
NettyChannelBuilder.forTarget(targetAddress, channelCredentials.get())
.channelType(NioSocketChannel.class)
.directExecutor()
.eventLoopGroup(eventLoopGroup)
.build();
} else {
// Create a plaintext channel.
channel =
NettyChannelBuilder.forTarget(targetAddress)
.channelType(NioSocketChannel.class)
.directExecutor()
.eventLoopGroup(eventLoopGroup)
.usePlaintext()
.build();
}
return EventLoopHoldingChannel.create(channel, eventLoopGroup);
ManagedChannel channel =
NettyChannelBuilder.forTarget(targetAddress, channelCredentials)
.directExecutor()
.build();
return HandshakerServiceChannel.create(channel);
}

/** Destroys a {@code EventLoopHoldingChannel} instance. */
/** Destroys a {@code HandshakerServiceChannel} instance. */
@Override
public void close(Channel instanceChannel) {
checkNotNull(instanceChannel);
EventLoopHoldingChannel channel = (EventLoopHoldingChannel) instanceChannel;
HandshakerServiceChannel channel = (HandshakerServiceChannel) instanceChannel;
channel.close();
}

Expand All @@ -140,23 +119,21 @@ public String toString() {
}

/**
* Manages a channel using a {@link ManagedChannel} instance that belong to the {@code
* EventLoopGroup} thread pool.
* Manages a channel using a {@link ManagedChannel} instance.
*/
@VisibleForTesting
static class EventLoopHoldingChannel extends Channel {
static class HandshakerServiceChannel extends Channel {
private static final Logger logger =
Logger.getLogger(S2AHandshakerServiceChannel.class.getName());
private final ManagedChannel delegate;
private final EventLoopGroup eventLoopGroup;

static EventLoopHoldingChannel create(ManagedChannel delegate, EventLoopGroup eventLoopGroup) {
static HandshakerServiceChannel create(ManagedChannel delegate) {
checkNotNull(delegate);
checkNotNull(eventLoopGroup);
return new EventLoopHoldingChannel(delegate, eventLoopGroup);
return new HandshakerServiceChannel(delegate);
}

private EventLoopHoldingChannel(ManagedChannel delegate, EventLoopGroup eventLoopGroup) {
private HandshakerServiceChannel(ManagedChannel delegate) {
this.delegate = delegate;
this.eventLoopGroup = eventLoopGroup;
}

/**
Expand All @@ -178,16 +155,12 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
@SuppressWarnings("FutureReturnValueIgnored")
public void close() {
delegate.shutdownNow();
boolean isDelegateTerminated;
try {
isDelegateTerminated =
delegate.awaitTermination(DELEGATE_TERMINATION_TIMEOUT.getSeconds(), SECONDS);
delegate.awaitTermination(CHANNEL_SHUTDOWN_TIMEOUT.getSeconds(), SECONDS);
} catch (InterruptedException e) {
isDelegateTerminated = false;
Thread.currentThread().interrupt();
logger.log(Level.WARNING, "Channel to S2A was not shutdown.");
}
long quietPeriodSeconds = isDelegateTerminated ? 0 : 1;
eventLoopGroup.shutdownGracefully(
quietPeriodSeconds, CHANNEL_SHUTDOWN_TIMEOUT.getSeconds(), SECONDS);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.errorprone.annotations.ThreadSafe;
import io.grpc.Channel;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.SharedResourcePool;
import io.grpc.netty.GrpcHttp2ConnectionHandler;
import io.grpc.netty.InternalProtocolNegotiator;
import io.grpc.netty.InternalProtocolNegotiator.ProtocolNegotiator;
Expand Down Expand Up @@ -227,7 +229,10 @@ protected void handlerAdded0(ChannelHandlerContext ctx) {
@Override
public void onSuccess(SslContext sslContext) {
ChannelHandler handler =
InternalProtocolNegotiators.tls(sslContext).newHandler(grpcHandler);
InternalProtocolNegotiators.tls(
sslContext,
SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR))
.newHandler(grpcHandler);

// Remove the bufferReads handler and delegate the rest of the handshake to the TLS
// handler.
Expand Down
3 changes: 3 additions & 0 deletions s2a/src/main/java/io/grpc/s2a/handshaker/S2ATrustManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ private void checkPeerTrusted(X509Certificate[] chain, boolean isCheckingClientC
try {
resp = stub.send(reqBuilder.build());
} catch (IOException | InterruptedException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw new CertificateException("Failed to send request to S2A.", e);
}
if (resp.hasStatus() && resp.getStatus().getCode() != 0) {
Expand Down
Loading

0 comments on commit 778cfb4

Please sign in to comment.