Refactor pooled CharBuffer usages within the agent (RabbitMQ body capture) #1540

Open
@SylvainJuge

Description

Found while trying to add RabbitMQ body capture when working on #1328.
While not strictly required to implement RabbitMQ message capture, it would greatly simplify the implementation and avoid duplicating bug-prone code.

  • The RabbitMQ message body is provided as a byte[], and we currently only support String message bodies (used in the Kafka and JMS instrumentations).
  • Building a String from the byte[] would incur extra memory allocation, so making the Message class use a pooled CharBuffer, as is currently done in the Request class, seems the right option.
  • Dealing with CharBuffer requires an explicit cast to Buffer for some operations to stay compatible with Java 7, 8 and 9+ JDKs (see ElasticApmTracer crashes on JDK8 #400). For example, CharBuffer#flip returns Buffer in Java 7 and 8 (it is inherited directly from Buffer), whereas Java 9+ overrides it with a covariant return type of CharBuffer. Code compiled against Java 9+ without the explicit cast to Buffer links against the new signature, which breaks binary compatibility with older JDKs.
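
To illustrate the last two points together, here is a minimal sketch of decoding a byte[] body into an existing CharBuffer without building an intermediate String. The class and method names (BodyDecodeSketch, decodeInto) are hypothetical and not part of the agent. UTF-8 is assumed as the body encoding, and content that does not fit in the buffer is silently truncated.

```java
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

public class BodyDecodeSketch {

    // Decodes a UTF-8 body into an existing (for example pooled) CharBuffer.
    // Assumption: truncating when the buffer is too small is acceptable.
    public static void decodeInto(byte[] body, CharBuffer target) {
        // A real implementation would likely reuse the decoder as well;
        // it is created here only to keep the sketch self-contained.
        CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
            .onMalformedInput(CodingErrorAction.REPLACE)
            .onUnmappableCharacter(CodingErrorAction.REPLACE);

        // Explicit casts to Buffer keep the bytecode linkable on Java 7 and 8
        // even when compiled against Java 9+.
        ((Buffer) target).clear();
        decoder.decode(ByteBuffer.wrap(body), target, true);
        decoder.flush(target);
        ((Buffer) target).flip();
    }

    public static void main(String[] args) {
        CharBuffer reusable = CharBuffer.allocate(16);
        decodeInto("hello".getBytes(StandardCharsets.UTF_8), reusable);
        System.out.println(reusable); // prints "hello"
    }
}
```

The same CharBuffer instance can be passed to decodeInto repeatedly, since it is cleared before each decode.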

Proposal

Create a new class named PooledCharBuffer (PooledBuffer in the draft below) to hold the body field of the Message and Request classes. It would provide:

  • a simple String value when the buffer is not required (would be used for JMS/Kafka)
  • a CharBuffer when buffering is required (would be used for RabbitMQ/HTTP request capture)
  • write operations that would be delegated to the internal CharBuffer without exposing it directly
  • read operation asCharSequence() that would just return the internal String or CharBuffer depending on which one is used
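
The read side works because both String and CharBuffer implement CharSequence, so a consumer does not need to know which one it received. Below is a small sketch using only JDK classes. The describe method is a hypothetical stand-in for whatever consumes the body, for example a serializer.

```java
import java.nio.Buffer;
import java.nio.CharBuffer;

public class CharSequenceViewSketch {

    // Hypothetical consumer that only depends on CharSequence
    public static String describe(CharSequence body) {
        return body.length() + ":" + body;
    }

    public static void main(String[] args) {
        CharSequence fromString = "abc";

        CharBuffer buffer = CharBuffer.allocate(8);
        buffer.append("abc");
        // flip() limits the readable view to the three chars written so far
        ((Buffer) buffer).flip();
        CharSequence fromBuffer = buffer;

        System.out.println(describe(fromString)); // prints "3:abc"
        System.out.println(describe(fromBuffer)); // prints "3:abc"
    }
}
```

Without the flip, the CharBuffer view would cover the five unwritten chars between position and limit instead of the written content, which is why the write must be finished before the buffer is exposed for reading.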

This could also be used for SQL statement capture in the JDBC instrumentation plugin.

Here is a draft implementation of such a class:

package co.elastic.apm.agent.impl.context;

import co.elastic.apm.agent.objectpool.Allocator;
import co.elastic.apm.agent.objectpool.ObjectPool;
import co.elastic.apm.agent.objectpool.Recyclable;
import co.elastic.apm.agent.objectpool.Resetter;
import co.elastic.apm.agent.objectpool.impl.QueueBasedObjectPool;
import co.elastic.apm.agent.report.serialize.DslJsonSerializer;
import org.jctools.queues.atomic.MpmcAtomicArrayQueue;

import javax.annotation.Nullable;
import java.nio.Buffer;
import java.nio.CharBuffer;

public class PooledBuffer implements Recyclable {

    // TODO the pool itself should not be static and should have configurable capacity and allocation sizes
    private static final ObjectPool<CharBuffer> charBufferPool = QueueBasedObjectPool.of(new MpmcAtomicArrayQueue<CharBuffer>(128), false,
        new Allocator<CharBuffer>() {
            @Override
            public CharBuffer createInstance() {
                return CharBuffer.allocate(DslJsonSerializer.MAX_LONG_STRING_VALUE_LENGTH);
            }
        },
        new Resetter<CharBuffer>() {
            @Override
            public void recycle(CharBuffer object) {
                ((Buffer) object).clear();
            }
        });

    @Nullable
    private CharBuffer buffer;

    private boolean bufferWriteFinished;

    @Nullable
    private String stringContent;

    public PooledBuffer() {
    }

    public PooledBuffer write(String content) {
        this.stringContent = content;
        return this;
    }

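    public PooledBuffer startWriteBuffer() {
        if (buffer != null) {
            throw new IllegalStateException("buffer write already started");
        }
        // borrow a buffer from the pool, it is returned in resetState()
        this.buffer = charBufferPool.createInstance();
        this.bufferWriteFinished = false;
        return this;
    }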

    public PooledBuffer appendBuffer(CharSequence s) {
        checkWriteState(false);
        buffer.append(s);
        return this;
    }

    public PooledBuffer endWriteBuffer() {
        checkWriteState(false);
        ((Buffer) buffer).flip();
        bufferWriteFinished = true;
        return this;
    }

    @Nullable
    public CharSequence asCharSequence() {
        if (stringContent != null) {
            return stringContent;
        }
        return bufferWriteFinished ? buffer : null;
    }

    public void copyOf(PooledBuffer other) {
        // TODO 
    }

    @Override
    public void resetState() {
        this.stringContent = null;
        if (buffer != null) {
            // the pool's Resetter clears the buffer; drop the reference so it is not used after recycling
            charBufferPool.recycle(buffer);
            buffer = null;
        }
        this.bufferWriteFinished = false;
    }

    private void checkWriteState(boolean expectFinished) {
        // a missing buffer means startWriteBuffer() has not been called yet
        if (buffer == null || bufferWriteFinished != expectFinished) {
            throw new IllegalStateException("unexpected buffer write state");
        }
    }
}
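
The borrow and recycle lifecycle that the draft relies on can be illustrated without the agent's ObjectPool classes. The sketch below is a simplified stand-in built on java.util.concurrent.ArrayBlockingQueue. CharBufferPoolSketch and its methods are hypothetical names and do not mirror the agent's pool API.

```java
import java.nio.Buffer;
import java.nio.CharBuffer;
import java.util.concurrent.ArrayBlockingQueue;

public class CharBufferPoolSketch {

    private final ArrayBlockingQueue<CharBuffer> queue;
    private final int bufferSize;

    public CharBufferPoolSketch(int poolCapacity, int bufferSize) {
        this.queue = new ArrayBlockingQueue<CharBuffer>(poolCapacity);
        this.bufferSize = bufferSize;
    }

    // Reuses a pooled buffer when one is available, allocates otherwise
    public CharBuffer borrow() {
        CharBuffer pooled = queue.poll();
        return pooled != null ? pooled : CharBuffer.allocate(bufferSize);
    }

    // Clears the buffer and returns it to the pool; it is dropped if the pool is full
    public void recycle(CharBuffer buffer) {
        ((Buffer) buffer).clear();
        queue.offer(buffer);
    }

    public static void main(String[] args) {
        CharBufferPoolSketch pool = new CharBufferPoolSketch(2, 16);

        CharBuffer first = pool.borrow();
        first.append("body");
        pool.recycle(first);
        first = null; // the holder must not touch the buffer after recycling

        CharBuffer second = pool.borrow();
        System.out.println(second.position() + " " + second.remaining()); // prints "0 16"
    }
}
```

The point to take from this is ownership: once a buffer is recycled, another holder may borrow it at any time, so the previous holder should drop its reference immediately.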
