From aa9e3065931bdad51cf83daeb4efcee96b761748 Mon Sep 17 00:00:00 2001 From: massakam Date: Sat, 14 Apr 2018 09:10:05 +0900 Subject: [PATCH] Fix bug that WebSocket proxy returns empty metrics (#1567) --- .../proxy/ProxyPublishConsumeTest.java | 12 ++++-- .../pulsar/websocket/stats/ProxyStats.java | 40 +++++++++++++++---- 2 files changed, 42 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java index 1e5409c790176..23895fcccb101 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java @@ -446,9 +446,15 @@ private void verifyProxyMetrics(Client client, String baseUrl) { Response response = (Response) invocationBuilder.get(); String responseStr = response.readEntity(String.class); final Gson gson = new Gson(); - System.err.println("REQ: " + statUrl); - System.err.println("RESPONSE: " + responseStr); - final List data = gson.fromJson(responseStr, new TypeToken>() { + List data = gson.fromJson(responseStr, new TypeToken>() { + }.getType()); + Assert.assertFalse(data.isEmpty()); + // re-generate metrics + service.getProxyStats().generate(); + invocationBuilder = webTarget.request(MediaType.APPLICATION_JSON); + response = (Response) invocationBuilder.get(); + responseStr = response.readEntity(String.class); + data = gson.fromJson(responseStr, new TypeToken>() { }.getType()); Assert.assertFalse(data.isEmpty()); } diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/ProxyStats.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/ProxyStats.java index 76f1bb2e4850d..b1816c8089895 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/ProxyStats.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/ProxyStats.java @@ -32,6 +32,9 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * It periodically generates stats metrics of proxy service, * @@ -41,7 +44,6 @@ public class ProxyStats { private final WebSocketService service; private final JvmMetrics jvmMetrics; private ConcurrentOpenHashMap topicStats; - private List tempMetricsCollection; private List metricsCollection; public ProxyStats(WebSocketService service) { @@ -49,7 +51,6 @@ public ProxyStats(WebSocketService service) { this.service = service; this.jvmMetrics = new JvmMetrics(service); this.topicStats = new ConcurrentOpenHashMap<>(); - this.tempMetricsCollection = Lists.newArrayList(); this.metricsCollection = Lists.newArrayList(); // schedule stat generation task every 1 minute service.getExecutor().scheduleAtFixedRate(() -> generate(), 120, 60, TimeUnit.SECONDS); @@ -59,25 +60,32 @@ public ProxyStats(WebSocketService service) { * generates stats-metrics of proxy service and updates metricsCollection cache with latest stats. */ public synchronized void generate() { + if (log.isDebugEnabled()) { + log.debug("Start generating proxy metrics"); + } topicStats.clear(); service.getProducers().forEach((topic, handlers) -> { + if (log.isDebugEnabled()) { + log.debug("Collect stats from {} producer handlers for topic {}", handlers.size(), topic); + } + final String namespaceName = TopicName.get(topic).getNamespace(); ProxyNamespaceStats nsStat = topicStats.computeIfAbsent(namespaceName, ns -> new ProxyNamespaceStats()); handlers.forEach(handler -> { nsStat.numberOfMsgPublished += handler.getAndResetNumMsgsSent(); nsStat.numberOfBytesPublished += handler.getAndResetNumBytesSent(); nsStat.numberOfPublishFailure += handler.getAndResetNumMsgsFailed(); - if (nsStat.publishMsgLatency == null) { - nsStat.publishMsgLatency = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC); - } handler.getPublishLatencyStatsUSec().refresh(); nsStat.publishMsgLatency.addAll(handler.getPublishLatencyStatsUSec()); - System.out.println(nsStat.publishMsgLatency); }); }); service.getConsumers().forEach((topic, handlers) -> { + if (log.isDebugEnabled()) { + log.debug("Collect stats from {} consumer handlers for topic {}", handlers.size(), topic); + } + final String namespaceName = TopicName.get(topic).getNamespace(); ProxyNamespaceStats nsStat = topicStats.computeIfAbsent(namespaceName, ns -> new ProxyNamespaceStats()); handlers.forEach(handler -> { @@ -87,16 +95,28 @@ public synchronized void generate() { }); }); - tempMetricsCollection.clear(); + List tempMetricsCollection = Lists.newArrayList(); topicStats.forEach((namespace, stats) -> { + if (log.isDebugEnabled()) { + log.debug("Add ns-stats of namespace {} to metrics", namespace); + } tempMetricsCollection.add(stats.add(namespace)); }); + // add jvm-metrics + if (log.isDebugEnabled()) { + log.debug("Add jvm-stats to metrics"); + } tempMetricsCollection.add(jvmMetrics.generate()); + // swap tempmetrics to stat-metrics List tempRef = metricsCollection; metricsCollection = tempMetricsCollection; tempRef.clear(); + + if (log.isDebugEnabled()) { + log.debug("Complete generating proxy metrics"); + } } public synchronized List getMetrics() { @@ -114,6 +134,10 @@ private static class ProxyNamespaceStats { public long numberOfBytesDelivered; public long numberOfMsgsAcked; + public ProxyNamespaceStats() { + this.publishMsgLatency = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC); + } + public Metrics add(String namespace) { publishMsgLatency.refresh(); @@ -137,4 +161,6 @@ public Metrics add(String namespace) { } } + private static final Logger log = LoggerFactory.getLogger(ProxyStats.class); + }