Open
Description
Description
Found while trying to add RabbitMQ body capture when working on #1328.
While not strictly required to implement RabbitMQ message capture, it would greatly improve implementation and avoid any bug-sensitive code duplication.
- RabbitMQ message is provided as a
byte[]
and we currently only allow forString
messages (used in Kafka and JMS instrumentations) - Building a
String
from thebyte[]
would incur extra memory allocation, thus making theMessage
class to use a pooledCharBuffer
as it's currently done with theRequest
class seems the right option. - Dealing with
CharBuffer
requires explicit cast toBuffer
for some operations to provide compatibility with Java 7,8 and 9+ JDKs (see ElasticApmTracer crashes on JDK8 #400). For exampleCharBuffer#flip
would returnBuffer
in Java7 and 8 (as it's directly inherithed fromBuffer
), and is overloaded in Java9+ and returnsCharBuffer
which breaks binary compatibility unless there is an explicit cast toBuffer
.
Proposal
Creating a new class named PooledCharBuffer
that would be used to hold the body
field of Message
and Request
classes.
- a simple
String
value when the buffer is not required (would be used for JMS/Kafka) - a
CharBuffer
when buffering is required (would be used for RabbitMQ/HTTP request capture) - write operations that would be delegated to the internal
CharBuffer
without exposing it directly - read operation
asCharSequence()
that would just return the internalString
orCharBuffer
depending on which one is used
Also, this could also be used for SQL statement capture as used in the JDBC instrumentation plugin.
Here is a draft implementation of such class
package co.elastic.apm.agent.impl.context;
import co.elastic.apm.agent.objectpool.Allocator;
import co.elastic.apm.agent.objectpool.ObjectPool;
import co.elastic.apm.agent.objectpool.Recyclable;
import co.elastic.apm.agent.objectpool.Resetter;
import co.elastic.apm.agent.objectpool.impl.QueueBasedObjectPool;
import co.elastic.apm.agent.report.serialize.DslJsonSerializer;
import org.jctools.queues.atomic.MpmcAtomicArrayQueue;
import javax.annotation.Nullable;
import java.nio.Buffer;
import java.nio.CharBuffer;
public class PooledBuffer implements Recyclable {
// TODO the pool itself should not be static and should have configurable capacity and allocation sizes
private static final ObjectPool<CharBuffer> charBufferPool = QueueBasedObjectPool.of(new MpmcAtomicArrayQueue<CharBuffer>(128), false,
new Allocator<CharBuffer>() {
@Override
public CharBuffer createInstance() {
return CharBuffer.allocate(DslJsonSerializer.MAX_LONG_STRING_VALUE_LENGTH);
}
},
new Resetter<CharBuffer>() {
@Override
public void recycle(CharBuffer object) {
((Buffer) object).clear();
}
});
@Nullable
private CharBuffer buffer;
private boolean bufferWriteFinished;
@Nullable
private String stringContent;
public PooledBuffer() {
}
public PooledBuffer write(String content) {
this.stringContent = content;
return this;
}
public PooledBuffer startWriteBuffer() {
checkWriteState(true);
this.bufferWriteFinished = false;
return this;
}
public PooledBuffer appendBuffer(CharSequence s) {
checkWriteState(false);
buffer.append(s);
return this;
}
public PooledBuffer endWriteBuffer() {
checkWriteState(false);
((Buffer) buffer).flip();
bufferWriteFinished = true;
return this;
}
@Nullable
public CharSequence asCharSequence() {
if (stringContent != null) {
return stringContent;
}
return bufferWriteFinished ? buffer : null;
}
public void copyOf(PooledBuffer other) {
// TODO
}
@Override
public void resetState() {
this.stringContent = null;
if (buffer != null) {
charBufferPool.recycle(buffer);
buffer.clear();
}
this.bufferWriteFinished = false;
}
private void checkWriteState(boolean expectFinished) {
if (bufferWriteFinished != expectFinished) {
throw new IllegalStateException("unexpected buffer write state");
}
}
}