Skip to content

core: add Grpc.TRANSPORT_ATTR_LOCAL_ADDR #4906

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 3, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
.set(TSI_PEER_KEY, altsEvt.peer())
.set(ALTS_CONTEXT_KEY, altsContext)
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress())
.set(CallCredentials.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY)
.build(),
new Security(new OtherSecurity("alts", Any.pack(altsContext.context))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,8 @@ public void peerPropagated() throws Exception {
.isEqualTo(mockedAltsContext);
assertThat(grpcHandler.attrs.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR).toString())
.isEqualTo("embedded");
assertThat(grpcHandler.attrs.get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR).toString())
.isEqualTo("embedded");
assertThat(grpcHandler.attrs.get(CallCredentials.ATTR_SECURITY_LEVEL))
.isEqualTo(SecurityLevel.PRIVACY_AND_INTEGRITY);
}
Expand Down
12 changes: 10 additions & 2 deletions core/src/main/java/io/grpc/Grpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,23 @@ private Grpc() {
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1710")
@TransportAttr
public static final Attributes.Key<SocketAddress> TRANSPORT_ATTR_REMOTE_ADDR =
Attributes.Key.create("remote-addr");
Attributes.Key.create("remote-addr");

/**
* Attribute key for the local address of a transport.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1710")
@TransportAttr
public static final Attributes.Key<SocketAddress> TRANSPORT_ATTR_LOCAL_ADDR =
Attributes.Key.create("local-addr");

/**
* Attribute key for SSL session of a transport.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1710")
@TransportAttr
public static final Attributes.Key<SSLSession> TRANSPORT_ATTR_SSL_SESSION =
Attributes.Key.create("ssl-session");
Attributes.Key.create("ssl-session");

/**
* Annotation for transport attributes. It follows the annotation semantics defined
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ public void run() {
synchronized (InProcessTransport.this) {
Attributes serverTransportAttrs = Attributes.newBuilder()
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, new InProcessSocketAddress(name))
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, new InProcessSocketAddress(name))
.build();
serverStreamAttributes = serverTransportListener.transportReady(serverTransportAttrs);
clientTransportListener.transportReady();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1759,7 +1759,7 @@ protected static void assertSuccess(StreamRecorder<?> recorder) {
}
}

/** Helper for getting remote address {@link io.grpc.ServerCall#getAttributes()} */
/** Helper for getting remote address from {@link io.grpc.ServerCall#getAttributes()} */
protected SocketAddress obtainRemoteClientAddr() {
TestServiceGrpc.TestServiceBlockingStub stub =
blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS);
Expand All @@ -1769,6 +1769,16 @@ protected SocketAddress obtainRemoteClientAddr() {
return serverCallCapture.get().getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
}

/** Helper for getting local address from {@link io.grpc.ServerCall#getAttributes()} */
protected SocketAddress obtainLocalClientAddr() {
TestServiceGrpc.TestServiceBlockingStub stub =
blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS);

stub.unaryCall(SimpleRequest.getDefaultInstance());

return serverCallCapture.get().getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
}

/** Helper for asserting TLS info in SSLSession {@link io.grpc.ServerCall#getAttributes()} */
protected void assertX500SubjectDn(String tlsInfo) {
TestServiceGrpc.TestServiceBlockingStub stub =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ public void remoteAddr() throws Exception {
assertNotEquals(getPort(), isa.getPort());
}

@Test
public void localAddr() throws Exception {
InetSocketAddress isa = (InetSocketAddress) obtainLocalClientAddr();
assertEquals(InetAddress.getLoopbackAddress(), isa.getAddress());
assertEquals(getPort(), isa.getPort());
}

@Test
public void tlsInfo() {
assertX500SubjectDn("CN=testclient, O=Internet Widgits Pty Ltd, ST=Some-State, C=AU");
Expand Down
5 changes: 5 additions & 0 deletions netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// Set sttributes before replace to be sure we pass it before accepting any requests.
handler.handleProtocolNegotiationCompleted(Attributes.newBuilder()
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress())
.build(),
/*securityInfo=*/ null);
// Just replace this handler with the gRPC handler.
Expand Down Expand Up @@ -163,6 +164,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
Attributes.newBuilder()
.set(Grpc.TRANSPORT_ATTR_SSL_SESSION, session)
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress())
.build(),
new InternalChannelz.Security(new InternalChannelz.Tls(session)));
// Replace this handler with the GRPC handler.
Expand Down Expand Up @@ -673,6 +675,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
Attributes.newBuilder()
.set(Grpc.TRANSPORT_ATTR_SSL_SESSION, session)
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress())
.set(CallCredentials.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY)
.build(),
new InternalChannelz.Security(new InternalChannelz.Tls(session)));
Expand Down Expand Up @@ -721,6 +724,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
Attributes
.newBuilder()
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress())
.set(CallCredentials.ATTR_SECURITY_LEVEL, SecurityLevel.NONE)
.build(),
/*securityInfo=*/ null);
Expand Down Expand Up @@ -764,6 +768,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
Attributes
.newBuilder()
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress())
.set(CallCredentials.ATTR_SECURITY_LEVEL, SecurityLevel.NONE)
.build(),
/*securityInfo=*/ null);
Expand Down
12 changes: 11 additions & 1 deletion netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,14 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.net.ssl.SSLHandshakeException;
Expand All @@ -105,6 +107,8 @@ public class NettyClientTransportTest {
private ManagedClientTransport.Listener clientTransportListener;

private final List<NettyClientTransport> transports = new ArrayList<>();
private final LinkedBlockingQueue<Attributes> serverTransportAttributesList =
new LinkedBlockingQueue<>();
private final NioEventLoopGroup group = new NioEventLoopGroup(1);
private final EchoServerListener serverListener = new EchoServerListener();
private final InternalChannelz channelz = new InternalChannelz();
Expand Down Expand Up @@ -536,6 +540,11 @@ public void clientStreamGetsAttributes() throws Exception {

assertNotNull(rpc.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_SSL_SESSION));
assertEquals(address, rpc.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
Attributes serverTransportAttrs = serverTransportAttributesList.poll(1, TimeUnit.SECONDS);
assertNotNull(serverTransportAttrs);
SocketAddress clientAddr = serverTransportAttrs.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
assertNotNull(clientAddr);
assertEquals(clientAddr, rpc.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR));
}

@Test
Expand Down Expand Up @@ -749,7 +758,7 @@ public void closed(Status status) {
}
}

private static final class EchoServerListener implements ServerListener {
private final class EchoServerListener implements ServerListener {
final List<NettyServerTransport> transports = new ArrayList<>();
final List<EchoServerStreamListener> streamListeners =
Collections.synchronizedList(new ArrayList<EchoServerStreamListener>());
Expand All @@ -769,6 +778,7 @@ public void streamCreated(ServerStream stream, String method, Metadata headers)

@Override
public Attributes transportReady(Attributes transportAttrs) {
serverTransportAttributesList.add(transportAttrs);
return transportAttrs;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort()
attributes = Attributes
.newBuilder()
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, sock.getRemoteSocketAddress())
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, sock.getLocalSocketAddress())
.set(Grpc.TRANSPORT_ATTR_SSL_SESSION, sslSession)
.set(CallCredentials.ATTR_SECURITY_LEVEL,
sslSession == null ? SecurityLevel.NONE : SecurityLevel.PRIVACY_AND_INTEGRITY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,7 @@ public void basicStream() throws Exception {
assertEquals("additional attribute value",
serverStream.getAttributes().get(ADDITIONAL_TRANSPORT_ATTR_KEY));
assertNotNull(serverStream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
assertNotNull(serverStream.getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR));

serverStream.request(1);
assertTrue(clientStreamListener.awaitOnReadyAndDrain(TIMEOUT_MS, TimeUnit.MILLISECONDS));
Expand Down