Skip to content

Commit

Permalink
inprocess: Support tracing message sizes guarded by flag (#11629)
Browse files Browse the repository at this point in the history
  • Loading branch information
shivaspeaks authored Oct 23, 2024
1 parent 62f4098 commit b65cbf5
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ public abstract class AbstractTransportTest {

private static final int TIMEOUT_MS = 5000;

protected static final String GRPC_EXPERIMENTAL_SUPPORT_TRACING_MESSAGE_SIZES =
"GRPC_EXPERIMENTAL_SUPPORT_TRACING_MESSAGE_SIZES";

private static final Attributes.Key<String> ADDITIONAL_TRANSPORT_ATTR_KEY =
Attributes.Key.create("additional-attr");

Expand Down Expand Up @@ -238,6 +241,13 @@ protected void advanceClock(long offset, TimeUnit unit) {
throw new UnsupportedOperationException();
}

/**
* Returns true if env var is set.
*/
protected static boolean isEnabledSupportTracingMessageSizes() {
return GrpcUtil.getFlag(GRPC_EXPERIMENTAL_SUPPORT_TRACING_MESSAGE_SIZES, false);
}

/**
* Returns the current time, for tests that rely on the clock.
*/
Expand Down Expand Up @@ -850,16 +860,21 @@ public void basicStream() throws Exception {
message.close();
assertThat(clientStreamTracer1.nextOutboundEvent())
.matches("outboundMessageSent\\(0, -?[0-9]+, -?[0-9]+\\)");
assertThat(clientStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
assertThat(clientStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
if (isEnabledSupportTracingMessageSizes()) {
assertThat(clientStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
assertThat(clientStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
}

assertThat(serverStreamTracer1.nextInboundEvent()).isEqualTo("inboundMessage(0)");
assertNull("no additional message expected", serverStreamListener.messageQueue.poll());

clientStream.halfClose();
assertTrue(serverStreamListener.awaitHalfClosed(TIMEOUT_MS, TimeUnit.MILLISECONDS));

assertThat(serverStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
if (isEnabledSupportTracingMessageSizes()) {
assertThat(serverStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
}
assertThat(serverStreamTracer1.nextInboundEvent())
.matches("inboundMessageRead\\(0, -?[0-9]+, -?[0-9]+\\)");

Expand Down Expand Up @@ -890,15 +905,19 @@ public void basicStream() throws Exception {
assertNotNull("message expected", message);
assertThat(serverStreamTracer1.nextOutboundEvent())
.matches("outboundMessageSent\\(0, -?[0-9]+, -?[0-9]+\\)");
assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
if (isEnabledSupportTracingMessageSizes()) {
assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
}
assertTrue(clientStreamTracer1.getInboundHeaders());
assertThat(clientStreamTracer1.nextInboundEvent()).isEqualTo("inboundMessage(0)");
assertEquals("Hi. Who are you?", methodDescriptor.parseResponse(message));
assertThat(clientStreamTracer1.nextInboundEvent())
.matches("inboundMessageRead\\(0, -?[0-9]+, -?[0-9]+\\)");
assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
if (isEnabledSupportTracingMessageSizes()) {
assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
}

message.close();
assertNull("no additional message expected", clientStreamListener.messageQueue.poll());
Expand Down Expand Up @@ -1258,10 +1277,12 @@ public void onReady() {
serverStream.close(Status.OK, new Metadata());
assertTrue(clientStreamTracer1.getOutboundHeaders());
assertTrue(clientStreamTracer1.getInboundHeaders());
assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
if (isEnabledSupportTracingMessageSizes()) {
assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
}
assertNull(clientStreamTracer1.getInboundTrailers());
assertSame(status, clientStreamTracer1.getStatus());
// There is a race between client cancelling and server closing. The final status seen by the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.inprocess.InProcessTransport.isEnabledSupportTracingMessageSizes;

import com.google.errorprone.annotations.DoNotCall;
import io.grpc.ChannelCredentials;
Expand Down Expand Up @@ -118,6 +119,9 @@ public ClientTransportFactory buildClientTransportFactory() {
managedChannelImplBuilder.setStatsRecordStartedRpcs(false);
managedChannelImplBuilder.setStatsRecordFinishedRpcs(false);
managedChannelImplBuilder.setStatsRecordRetryMetrics(false);
if (!isEnabledSupportTracingMessageSizes) {
managedChannelImplBuilder.disableRetry();
}
}

@Internal
Expand Down
88 changes: 49 additions & 39 deletions inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@
@ThreadSafe
final class InProcessTransport implements ServerTransport, ConnectionClientTransport {
private static final Logger log = Logger.getLogger(InProcessTransport.class.getName());
static boolean isEnabledSupportTracingMessageSizes =
GrpcUtil.getFlag("GRPC_EXPERIMENTAL_SUPPORT_TRACING_MESSAGE_SIZES", false);

private final InternalLogId logId;
private final SocketAddress address;
Expand Down Expand Up @@ -485,22 +487,25 @@ private void clientCancelled(Status status) {

@Override
public void writeMessage(InputStream message) {
long messageLength;
try {
if (assumedMessageSize != -1) {
messageLength = assumedMessageSize;
} else if (message instanceof KnownLength || message instanceof ByteArrayInputStream) {
messageLength = message.available();
} else {
InputStream oldMessage = message;
byte[] payload = ByteStreams.toByteArray(message);
messageLength = payload.length;
message = new ByteArrayInputStream(payload);
oldMessage.close();
long messageLength = 0;
if (isEnabledSupportTracingMessageSizes) {
try {
if (assumedMessageSize != -1) {
messageLength = assumedMessageSize;
} else if (message instanceof KnownLength || message instanceof ByteArrayInputStream) {
messageLength = message.available();
} else {
InputStream oldMessage = message;
byte[] payload = ByteStreams.toByteArray(message);
messageLength = payload.length;
message = new ByteArrayInputStream(payload);
oldMessage.close();
}
} catch (Exception e) {
throw new RuntimeException("Error processing the message length", e);
}
} catch (Exception e) {
throw new RuntimeException("Error processing the message length", e);
}

synchronized (this) {
if (closed) {
return;
Expand All @@ -509,11 +514,13 @@ public void writeMessage(InputStream message) {
statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1);
clientStream.statsTraceCtx.inboundMessage(outboundSeqNo);
clientStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1);
statsTraceCtx.outboundUncompressedSize(messageLength);
statsTraceCtx.outboundWireSize(messageLength);
// messageLength should be same at receiver's end as no actual wire is involved.
clientStream.statsTraceCtx.inboundUncompressedSize(messageLength);
clientStream.statsTraceCtx.inboundWireSize(messageLength);
if (isEnabledSupportTracingMessageSizes) {
statsTraceCtx.outboundUncompressedSize(messageLength);
statsTraceCtx.outboundWireSize(messageLength);
// messageLength should be same at receiver's end as no actual wire is involved.
clientStream.statsTraceCtx.inboundUncompressedSize(messageLength);
clientStream.statsTraceCtx.inboundWireSize(messageLength);
}
outboundSeqNo++;
StreamListener.MessageProducer producer = new SingleMessageProducer(message);
if (clientRequested > 0) {
Expand All @@ -523,7 +530,6 @@ public void writeMessage(InputStream message) {
clientReceiveQueue.add(producer);
}
}

syncContext.drain();
}

Expand Down Expand Up @@ -777,21 +783,23 @@ private void serverClosed(Status serverListenerStatus, Status serverTracerStatus

@Override
public void writeMessage(InputStream message) {
long messageLength;
try {
if (assumedMessageSize != -1) {
messageLength = assumedMessageSize;
} else if (message instanceof KnownLength || message instanceof ByteArrayInputStream) {
messageLength = message.available();
} else {
InputStream oldMessage = message;
byte[] payload = ByteStreams.toByteArray(message);
messageLength = payload.length;
message = new ByteArrayInputStream(payload);
oldMessage.close();
long messageLength = 0;
if (isEnabledSupportTracingMessageSizes) {
try {
if (assumedMessageSize != -1) {
messageLength = assumedMessageSize;
} else if (message instanceof KnownLength || message instanceof ByteArrayInputStream) {
messageLength = message.available();
} else {
InputStream oldMessage = message;
byte[] payload = ByteStreams.toByteArray(message);
messageLength = payload.length;
message = new ByteArrayInputStream(payload);
oldMessage.close();
}
} catch (Exception e) {
throw new RuntimeException("Error processing the message length", e);
}
} catch (Exception e) {
throw new RuntimeException("Error processing the message length", e);
}
synchronized (this) {
if (closed) {
Expand All @@ -801,11 +809,13 @@ public void writeMessage(InputStream message) {
statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1);
serverStream.statsTraceCtx.inboundMessage(outboundSeqNo);
serverStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1);
statsTraceCtx.outboundUncompressedSize(messageLength);
statsTraceCtx.outboundWireSize(messageLength);
// messageLength should be same at receiver's end as no actual wire is involved.
serverStream.statsTraceCtx.inboundUncompressedSize(messageLength);
serverStream.statsTraceCtx.inboundWireSize(messageLength);
if (isEnabledSupportTracingMessageSizes) {
statsTraceCtx.outboundUncompressedSize(messageLength);
statsTraceCtx.outboundWireSize(messageLength);
// messageLength should be same at receiver's end as no actual wire is involved.
serverStream.statsTraceCtx.inboundUncompressedSize(messageLength);
serverStream.statsTraceCtx.inboundWireSize(messageLength);
}
outboundSeqNo++;
StreamListener.MessageProducer producer = new SingleMessageProducer(message);
if (serverRequested > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,11 @@ public void basicStreamInProcess() throws Exception {

private void assertAssumedMessageSize(
TestStreamTracer streamTracerSender, TestStreamTracer streamTracerReceiver) {
Assert.assertEquals(TEST_MESSAGE_LENGTH, streamTracerSender.getOutboundWireSize());
Assert.assertEquals(TEST_MESSAGE_LENGTH, streamTracerSender.getOutboundUncompressedSize());
Assert.assertEquals(TEST_MESSAGE_LENGTH, streamTracerReceiver.getInboundWireSize());
Assert.assertEquals(TEST_MESSAGE_LENGTH, streamTracerReceiver.getInboundUncompressedSize());
if (isEnabledSupportTracingMessageSizes()) {
Assert.assertEquals(TEST_MESSAGE_LENGTH, streamTracerSender.getOutboundWireSize());
Assert.assertEquals(TEST_MESSAGE_LENGTH, streamTracerSender.getOutboundUncompressedSize());
Assert.assertEquals(TEST_MESSAGE_LENGTH, streamTracerReceiver.getInboundWireSize());
Assert.assertEquals(TEST_MESSAGE_LENGTH, streamTracerReceiver.getInboundUncompressedSize());
}
}
}

0 comments on commit b65cbf5

Please sign in to comment.