Trace transport request/response sizes in APM #94845

Open

Description

With #94543 we expose some stats about transport request/response sizes. We also expose information about tasks to the APM tracer, and every transport request/response pair creates a task, so we should be able to expose transport request/response sizes to APM too. This would be very useful when investigating network traffic in depth.

It looks like we can add arbitrary attributes to a span via Tracer#setAttribute. Getting the request size there isn't too bad (see the patch below), but the response size looks harder because we close the task before serializing the response (edit: see below). Also, I don't know whether there are standard attribute names we should use here, or whether APM accepts invented attribute names like es.transport.request_bytes.

Patch that adds a request size attribute
diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java
index 4d43bfdfa86..90b5096ef9b 100644
--- a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java
+++ b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java
@@ -492,6 +492,10 @@ public class TaskManager implements ClusterStateApplier {
         return cancellableTasks.assertConsistent();
     }
 
+    public void traceRequestSize(Task task, int requestSizeInBytes) {
+        tracer.setAttribute(getSpanId(task), Tracer.AttributeKeys.TRANSPORT_REQUEST_BYTES, requestSizeInBytes);
+    }
+
     private class Ban {
         final String reason;
         final Set<ChannelPendingTaskTracker> channels;
diff --git a/server/src/main/java/org/elasticsearch/tracing/Tracer.java b/server/src/main/java/org/elasticsearch/tracing/Tracer.java
index 5c61e897222..4e61161d267 100644
--- a/server/src/main/java/org/elasticsearch/tracing/Tracer.java
+++ b/server/src/main/java/org/elasticsearch/tracing/Tracer.java
@@ -190,5 +190,6 @@ public interface Tracer {
         String PARENT_TASK_ID = "es.task.parent.id";
         String CLUSTER_NAME = "es.cluster.name";
         String NODE_NAME = "es.node.name";
+        String TRANSPORT_REQUEST_BYTES = "es.transport.request_bytes";
     }
 }
diff --git a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java
index 503d2c3be81..f8e67c10469 100644
--- a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java
+++ b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java
@@ -222,7 +222,9 @@ public class InboundHandler {
         } else {
             final TransportChannel transportChannel;
             final RequestHandlerRegistry<T> reg;
+            final int requestSizeInBytes;
             try {
+                requestSizeInBytes = header.getNetworkMessageSize() + TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE;
                 reg = requestHandlers.getHandler(action);
                 assert message.isShortCircuit() || reg != null : action;
                 transportChannel = new TcpTransportChannel(
@@ -297,7 +299,7 @@ public class InboundHandler {
                         if (ThreadPool.Names.SAME.equals(executor)) {
                             try (var ignored = threadPool.getThreadContext().newTraceContext()) {
                                 try {
-                                    reg.processMessageReceived(request, transportChannel);
+                                    reg.processMessageReceived(request, transportChannel, requestSizeInBytes);
                                 } catch (Exception e) {
                                     sendErrorResponse(reg.getAction(), transportChannel, e);
                                 }
@@ -310,7 +312,7 @@ public class InboundHandler {
                                     .execute(threadPool.getThreadContext().preserveContextWithTracing(new AbstractRunnable() {
                                         @Override
                                         protected void doRun() throws Exception {
-                                            reg.processMessageReceived(request, transportChannel);
+                                            reg.processMessageReceived(request, transportChannel, requestSizeInBytes);
                                         }
 
                                         @Override
diff --git a/server/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java b/server/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java
index 8dc67fb3896..1234b53fae3 100644
--- a/server/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java
+++ b/server/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java
@@ -61,8 +61,9 @@ public class RequestHandlerRegistry<Request extends TransportRequest> implements
         return requestReader.read(in);
     }
 
-    public void processMessageReceived(Request request, TransportChannel channel) throws Exception {
+    public void processMessageReceived(Request request, TransportChannel channel, int requestSizeInBytes) throws Exception {
         final Task task = taskManager.register(channel.getChannelType(), action, request);
+        taskManager.traceRequestSize(task, requestSizeInBytes);
         Releasable unregisterTask = () -> taskManager.unregister(task);
         try {
             if (channel instanceof TcpTransportChannel tcpTransportChannel && task instanceof CancellableTask cancellableTask) {
diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java
index b22f4436e93..13928c10aee 100644
--- a/server/src/main/java/org/elasticsearch/transport/TransportService.java
+++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java
@@ -976,7 +976,7 @@ public class TransportService extends AbstractLifecycleComponent
             if (ThreadPool.Names.SAME.equals(executor)) {
                 try (var ignored = threadPool.getThreadContext().newTraceContext()) {
                     try {
-                        reg.processMessageReceived(request, channel);
+                        reg.processMessageReceived(request, channel, 0);
                     } catch (Exception e) {
                         handleSendToLocalException(channel, e, action);
                     }
@@ -988,7 +988,7 @@ public class TransportService extends AbstractLifecycleComponent
                     threadPool.executor(executor).execute(threadPool.getThreadContext().preserveContextWithTracing(new AbstractRunnable() {
                         @Override
                         protected void doRun() throws Exception {
-                            reg.processMessageReceived(request, channel);
+                            reg.processMessageReceived(request, channel, 0);
                         }
 
                         @Override

Edit: #94865 keeps the task alive until after serializing the response, so maybe this is easier to achieve now.
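
If the task (and so its span) now stays open until the response has been serialized, the response size could be recorded the same way as the request size. The following is only a rough, self-contained sketch of that ordering, not Elasticsearch code: `SketchTracer` is a hypothetical stand-in for the real `Tracer`, the attribute name `es.transport.response_bytes` is invented here by analogy with the one in the patch, and the byte counts are made-up inputs.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class TransportSizeTracingSketch {

    // The first name comes from the patch above; the second is hypothetical.
    static final String TRANSPORT_REQUEST_BYTES = "es.transport.request_bytes";
    static final String TRANSPORT_RESPONSE_BYTES = "es.transport.response_bytes";

    /** Hypothetical stand-in for a tracer: it only stores attributes per open span. */
    static final class SketchTracer {
        private final Map<String, Map<String, Object>> openSpans = new ConcurrentHashMap<>();

        void startTrace(String spanId) {
            openSpans.put(spanId, new ConcurrentHashMap<>());
        }

        /** Returns false if the span is not open, in which case the attribute is dropped. */
        boolean setAttribute(String spanId, String key, long value) {
            Map<String, Object> attributes = openSpans.get(spanId);
            if (attributes == null) {
                return false;
            }
            attributes.put(key, value);
            return true;
        }

        /** Closes the span and returns a snapshot of the attributes it collected. */
        Map<String, Object> stopTrace(String spanId) {
            Map<String, Object> attributes = openSpans.remove(spanId);
            return attributes == null ? Map.of() : Map.copyOf(attributes);
        }
    }

    /** Mimics one request/response pair: both sizes are set while the span is still open. */
    static Map<String, Object> handle(SketchTracer tracer, String spanId, long requestBytes, long responseBytes) {
        tracer.startTrace(spanId);                                           // task registered
        tracer.setAttribute(spanId, TRANSPORT_REQUEST_BYTES, requestBytes);  // known on receipt
        // ... the handler runs and the response is serialized here ...
        tracer.setAttribute(spanId, TRANSPORT_RESPONSE_BYTES, responseBytes);
        return tracer.stopTrace(spanId);                                     // task unregistered
    }

    public static void main(String[] args) {
        SketchTracer tracer = new SketchTracer();
        Map<String, Object> attributes = handle(tracer, "task-1", 128, 4096);
        System.out.println(attributes);
        // Setting an attribute after the span has closed is silently dropped in this sketch.
        System.out.println(tracer.setAttribute("task-1", TRANSPORT_RESPONSE_BYTES, 1));
    }
}
```

The point of the sketch is the ordering: an attribute set after the span is closed has nowhere to go, which is why the response size was hard to record while the task was closed before serialization.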

Metadata

    Labels

    :Distributed/Network (Http and internode communication implementations), >enhancement, Supportability (Improve our (devs, SREs, support eng, users) ability to troubleshoot/self-service product better.), Team:Distributed (Meta label for distributed team)
