Skip to content

Commit

Permalink
[fix][broker] Avoid OOM not trigger PulsarByteBufAllocator outOfMemor…
Browse files Browse the repository at this point in the history
…yListener when use ByteBufAllocator.DEFAULT.heapBuffer in PrometheusMetricsGeneratorUtils (#18747)

Co-authored-by: nicklixinyang <nicklixinyang@didiglobal.com>
  • Loading branch information
Nicklee007 and nicklixinyang authored Dec 8, 2022
1 parent 68ca60c commit b268847
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@
package org.apache.pulsar.broker.stats.prometheus;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.prometheus.client.Collector;
import io.prometheus.client.CollectorRegistry;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Enumeration;
import java.util.List;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.util.SimpleTextOutputStream;

/**
Expand All @@ -38,7 +38,7 @@ public class PrometheusMetricsGeneratorUtils {
public static void generate(String cluster, OutputStream out,
List<PrometheusRawMetricsProvider> metricsProviders)
throws IOException {
ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
ByteBuf buf = PulsarByteBufAllocator.DEFAULT.heapBuffer();
try {
SimpleTextOutputStream stream = new SimpleTextOutputStream(buf);
generateSystemMetrics(stream, cluster);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGeneratorUtils.getTypeStr;
import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.prometheus.client.Collector;
import io.prometheus.client.CollectorRegistry;
Expand Down Expand Up @@ -51,6 +50,7 @@
import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
import org.apache.pulsar.broker.stats.metrics.ManagedLedgerCacheMetrics;
import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.DirectMemoryUtils;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
Expand Down Expand Up @@ -138,7 +138,7 @@ public static synchronized void generate(PulsarService pulsar, boolean includeTo
} catch (IOException e) {
log.error("Generate metrics failed", e);
//return empty buffer if exception happens
return ByteBufAllocator.DEFAULT.heapBuffer(0);
return PulsarByteBufAllocator.DEFAULT.heapBuffer(0);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import java.io.IOException;
import java.lang.reflect.Type;
Expand All @@ -44,6 +43,7 @@
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaInfoWithVersion;
Expand Down Expand Up @@ -367,7 +367,7 @@ public static byte[] convertKeyValueDataStringToSchemaInfoSchema(
int dataLength = 4 + keyBytes.length + 4 + valueBytes.length;
byte[] schema = new byte[dataLength];
//record the key value schema respective length
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.heapBuffer(dataLength);
ByteBuf byteBuf = PulsarByteBufAllocator.DEFAULT.heapBuffer(dataLength);
byteBuf.writeInt(keyBytes.length).writeBytes(keyBytes).writeInt(valueBytes.length).writeBytes(valueBytes);
byteBuf.readBytes(schema);
return schema;
Expand Down

0 comments on commit b268847

Please sign in to comment.