Description
With #94543 we expose some stats about transport request/response sizes. We also expose information about tasks to the APM tracer. And every transport request/response pair begets a task. So we should be able to expose information about transport request/response sizes to APM too. This would be awfully useful when digging deeply into network traffic investigations.
It looks like we can add arbitrary attributes to a span via `Tracer#setAttribute`. Getting the request size there isn't too bad (see below), but the response size looks harder because we close the task before serialising the response (edit: see below). Also, I don't know if there are any standard attribute names we should use here, or whether it works in APM if we invent attribute names like `es.transport.request_bytes`.
Patch that adds a request size attribute
diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java
index 4d43bfdfa86..90b5096ef9b 100644
--- a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java
+++ b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java
@@ -492,6 +492,10 @@ public class TaskManager implements ClusterStateApplier {
return cancellableTasks.assertConsistent();
}
+ public void traceRequestSize(Task task, int requestSizeInBytes) {
+ tracer.setAttribute(getSpanId(task), Tracer.AttributeKeys.TRANSPORT_REQUEST_BYTES, requestSizeInBytes);
+ }
+
private class Ban {
final String reason;
final Set<ChannelPendingTaskTracker> channels;
diff --git a/server/src/main/java/org/elasticsearch/tracing/Tracer.java b/server/src/main/java/org/elasticsearch/tracing/Tracer.java
index 5c61e897222..4e61161d267 100644
--- a/server/src/main/java/org/elasticsearch/tracing/Tracer.java
+++ b/server/src/main/java/org/elasticsearch/tracing/Tracer.java
@@ -190,5 +190,6 @@ public interface Tracer {
String PARENT_TASK_ID = "es.task.parent.id";
String CLUSTER_NAME = "es.cluster.name";
String NODE_NAME = "es.node.name";
+ String TRANSPORT_REQUEST_BYTES = "es.transport.request_bytes";
}
}
diff --git a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java
index 503d2c3be81..f8e67c10469 100644
--- a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java
+++ b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java
@@ -222,7 +222,9 @@ public class InboundHandler {
} else {
final TransportChannel transportChannel;
final RequestHandlerRegistry<T> reg;
+ final int requestSizeInBytes;
try {
+ requestSizeInBytes = header.getNetworkMessageSize() + TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE;
reg = requestHandlers.getHandler(action);
assert message.isShortCircuit() || reg != null : action;
transportChannel = new TcpTransportChannel(
@@ -297,7 +299,7 @@ public class InboundHandler {
if (ThreadPool.Names.SAME.equals(executor)) {
try (var ignored = threadPool.getThreadContext().newTraceContext()) {
try {
- reg.processMessageReceived(request, transportChannel);
+ reg.processMessageReceived(request, transportChannel, requestSizeInBytes);
} catch (Exception e) {
sendErrorResponse(reg.getAction(), transportChannel, e);
}
@@ -310,7 +312,7 @@ public class InboundHandler {
.execute(threadPool.getThreadContext().preserveContextWithTracing(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
- reg.processMessageReceived(request, transportChannel);
+ reg.processMessageReceived(request, transportChannel, requestSizeInBytes);
}
@Override
diff --git a/server/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java b/server/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java
index 8dc67fb3896..1234b53fae3 100644
--- a/server/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java
+++ b/server/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java
@@ -61,8 +61,9 @@ public class RequestHandlerRegistry<Request extends TransportRequest> implements
return requestReader.read(in);
}
- public void processMessageReceived(Request request, TransportChannel channel) throws Exception {
+ public void processMessageReceived(Request request, TransportChannel channel, int requestSizeInBytes) throws Exception {
final Task task = taskManager.register(channel.getChannelType(), action, request);
+ taskManager.traceRequestSize(task, requestSizeInBytes);
Releasable unregisterTask = () -> taskManager.unregister(task);
try {
if (channel instanceof TcpTransportChannel tcpTransportChannel && task instanceof CancellableTask cancellableTask) {
diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java
index b22f4436e93..13928c10aee 100644
--- a/server/src/main/java/org/elasticsearch/transport/TransportService.java
+++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java
@@ -976,7 +976,7 @@ public class TransportService extends AbstractLifecycleComponent
if (ThreadPool.Names.SAME.equals(executor)) {
try (var ignored = threadPool.getThreadContext().newTraceContext()) {
try {
- reg.processMessageReceived(request, channel);
+ reg.processMessageReceived(request, channel, 0);
} catch (Exception e) {
handleSendToLocalException(channel, e, action);
}
@@ -988,7 +988,7 @@ public class TransportService extends AbstractLifecycleComponent
threadPool.executor(executor).execute(threadPool.getThreadContext().preserveContextWithTracing(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
- reg.processMessageReceived(request, channel);
+ reg.processMessageReceived(request, channel, 0);
}
@Override
Edit: #94865 keeps the task alive until after serializing the response, so maybe this is easier to achieve now.