Skip to content

Commit

Permalink
[feat] [broker] Add broker health check status into prometheus metrics (
Browse files Browse the repository at this point in the history
  • Loading branch information
vineeth1995 authored Oct 7, 2024
1 parent b9ededc commit 6c7ec4c
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 20 deletions.
2 changes: 2 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1689,6 +1689,8 @@ exposePublisherStats=true
statsUpdateFrequencyInSecs=60
statsUpdateInitialDelayInSecs=60

healthCheckMetricsUpdateTimeInSeconds=-1

# Enable expose the precise backlog stats.
# Set false to use published counter and consumed counter to calculate, this would be more efficient but may be inaccurate.
# Default is false.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3280,6 +3280,12 @@ public double getLoadBalancerBandwidthOutResourceWeight() {
doc = "Stats update initial delay in seconds"
)
private int statsUpdateInitialDelayInSecs = 60;
@FieldContext(
category = CATEGORY_METRICS,
minValue = -1,
doc = "HealthCheck update frequency in seconds. Disable health check with value -1 (Default value -1)"
)
private int healthCheckMetricsUpdateTimeInSeconds = -1;
@FieldContext(
category = CATEGORY_METRICS,
doc = "If true, aggregate publisher stats of PartitionedTopicStats by producerName"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.PulsarService.State;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
Expand Down Expand Up @@ -422,26 +423,35 @@ public static String getHeartbeatTopicName(String brokerId, ServiceConfiguration
}

private CompletableFuture<Void> internalRunHealthCheck(TopicVersion topicVersion) {
String brokerId = pulsar().getBrokerId();
return internalRunHealthCheck(topicVersion, pulsar(), clientAppId());
}


public static CompletableFuture<Void> internalRunHealthCheck(TopicVersion topicVersion, PulsarService pulsar,
String clientAppId) {
NamespaceName namespaceName = (topicVersion == TopicVersion.V2)
? NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(), pulsar.getConfiguration())
: NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), pulsar.getConfiguration());
String brokerId = pulsar.getBrokerId();
final String topicName =
getHeartbeatTopicName(brokerId, pulsar().getConfiguration(), (topicVersion == TopicVersion.V2));
LOG.info("[{}] Running healthCheck with topic={}", clientAppId(), topicName);
getHeartbeatTopicName(brokerId, pulsar.getConfiguration(), (topicVersion == TopicVersion.V2));
LOG.info("[{}] Running healthCheck with topic={}", clientAppId, topicName);
final String messageStr = UUID.randomUUID().toString();
final String subscriptionName = "healthCheck-" + messageStr;
// create non-partitioned topic manually and close the previous reader if present.
return pulsar().getBrokerService().getTopic(topicName, true)
return pulsar.getBrokerService().getTopic(topicName, true)
.thenCompose(topicOptional -> {
if (!topicOptional.isPresent()) {
LOG.error("[{}] Fail to run health check while get topic {}. because get null value.",
clientAppId(), topicName);
clientAppId, topicName);
throw new RestException(Status.NOT_FOUND,
String.format("Topic [%s] not found after create.", topicName));
}
PulsarClient client;
try {
client = pulsar().getClient();
client = pulsar.getClient();
} catch (PulsarServerException e) {
LOG.error("[{}] Fail to run health check while get client.", clientAppId());
LOG.error("[{}] Fail to run health check while get client.", clientAppId);
throw new RestException(e);
}
CompletableFuture<Void> resultFuture = new CompletableFuture<>();
Expand All @@ -451,17 +461,18 @@ private CompletableFuture<Void> internalRunHealthCheck(TopicVersion topicVersion
.startMessageId(MessageId.latest)
.createAsync().exceptionally(createException -> {
producer.closeAsync().exceptionally(ex -> {
LOG.error("[{}] Close producer fail while heath check.", clientAppId());
LOG.error("[{}] Close producer fail while heath check.", clientAppId);
return null;
});
throw FutureUtil.wrapToCompletionException(createException);
}).thenCompose(reader -> producer.sendAsync(messageStr)
.thenCompose(__ -> FutureUtil.addTimeoutHandling(
healthCheckRecursiveReadNext(reader, messageStr),
HEALTH_CHECK_READ_TIMEOUT, pulsar().getBrokerService().executor(),
HEALTH_CHECK_READ_TIMEOUT, pulsar.getBrokerService().executor(),
() -> HEALTH_CHECK_TIMEOUT_EXCEPTION))
.whenComplete((__, ex) -> {
closeAndReCheck(producer, reader, topicOptional.get(), subscriptionName)
closeAndReCheck(producer, reader, topicOptional.get(), subscriptionName,
clientAppId)
.whenComplete((unused, innerEx) -> {
if (ex != null) {
resultFuture.completeExceptionally(ex);
Expand All @@ -479,6 +490,11 @@ HEALTH_CHECK_READ_TIMEOUT, pulsar().getBrokerService().executor(),
});
}

private CompletableFuture<Void> closeAndReCheck(Producer<String> producer, Reader<String> reader,
Topic topic, String subscriptionName) {
return closeAndReCheck(producer, reader, topic, subscriptionName, clientAppId());
}

/**
* Close producer and reader and then to re-check if this operation is success.
*
Expand All @@ -491,8 +507,8 @@ HEALTH_CHECK_READ_TIMEOUT, pulsar().getBrokerService().executor(),
* @param topic Topic
* @param subscriptionName Subscription name
*/
private CompletableFuture<Void> closeAndReCheck(Producer<String> producer, Reader<String> reader,
Topic topic, String subscriptionName) {
private static CompletableFuture<Void> closeAndReCheck(Producer<String> producer, Reader<String> reader,
Topic topic, String subscriptionName, String clientAppId) {
// no matter exception or success, we still need to
// close producer/reader
CompletableFuture<Void> producerFuture = producer.closeAsync();
Expand All @@ -503,32 +519,32 @@ private CompletableFuture<Void> closeAndReCheck(Producer<String> producer, Reade
return FutureUtil.waitForAll(Collections.unmodifiableList(futures))
.exceptionally(closeException -> {
if (readerFuture.isCompletedExceptionally()) {
LOG.error("[{}] Close reader fail while heath check.", clientAppId());
LOG.error("[{}] Close reader fail while heath check.", clientAppId);
Subscription subscription =
topic.getSubscription(subscriptionName);
// re-check subscription after reader close
if (subscription != null) {
LOG.warn("[{}] Force delete subscription {} "
+ "when it still exists after the"
+ " reader is closed.",
clientAppId(), subscription);
clientAppId, subscription);
subscription.deleteForcefully()
.exceptionally(ex -> {
LOG.error("[{}] Force delete subscription fail"
+ " while health check",
clientAppId(), ex);
clientAppId, ex);
return null;
});
}
} else {
// producer future fail.
LOG.error("[{}] Close producer fail while heath check.", clientAppId());
LOG.error("[{}] Close producer fail while heath check.", clientAppId);
}
return null;
});
}

private CompletableFuture<Void> healthCheckRecursiveReadNext(Reader<String> reader, String content) {
private static CompletableFuture<Void> healthCheckRecursiveReadNext(Reader<String> reader, String content) {
return reader.readNextAsync()
.thenCompose(msg -> {
if (!Objects.equals(content, msg.getValue())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.bookkeeper.mledger.ManagedLedgerConfig.PROPERTY_SOURCE_TOPIC_KEY;
import static org.apache.commons.collections4.CollectionUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.admin.impl.BrokersBase.internalRunHealthCheck;
import static org.apache.pulsar.client.util.RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
import static org.apache.pulsar.client.util.RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName;
Expand Down Expand Up @@ -157,6 +158,7 @@
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
Expand Down Expand Up @@ -241,6 +243,7 @@ public class BrokerService implements Closeable {

private AuthorizationService authorizationService;
private final ScheduledExecutorService statsUpdater;

@Getter
private final ScheduledExecutorService backlogQuotaChecker;

Expand Down Expand Up @@ -346,6 +349,7 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
this.acceptorGroup = EventLoopUtil.newEventLoopGroup(
pulsar.getConfiguration().getNumAcceptorThreads(), false, acceptorThreadFactory);
this.workerGroup = eventLoopGroup;

this.statsUpdater = OrderedScheduler.newSchedulerBuilder()
.name("pulsar-stats-updater")
.numThreads(1)
Expand Down Expand Up @@ -611,6 +615,7 @@ public void start() throws Exception {
this.startStatsUpdater(
serviceConfig.getStatsUpdateInitialDelayInSecs(),
serviceConfig.getStatsUpdateFrequencyInSecs());
this.initializeHealthChecker();
this.startInactivityMonitor();
this.startMessageExpiryMonitor();
this.startCompactionMonitor();
Expand Down Expand Up @@ -640,6 +645,24 @@ protected void startStatsUpdater(int statsUpdateInitialDelayInSecs, int statsUpd
updateRates();
}

protected void initializeHealthChecker() {
ServiceConfiguration config = pulsar().getConfiguration();
if (config.getHealthCheckMetricsUpdateTimeInSeconds() > 0) {
int interval = config.getHealthCheckMetricsUpdateTimeInSeconds();
statsUpdater.scheduleAtFixedRate(this::checkHealth,
interval, interval, TimeUnit.SECONDS);
}
}

public CompletableFuture<Void> checkHealth() {
return internalRunHealthCheck(TopicVersion.V2, pulsar(), null).thenAccept(__ -> {
this.pulsarStats.getBrokerOperabilityMetrics().recordHealthCheckStatusSuccess();
}).exceptionally(ex -> {
this.pulsarStats.getBrokerOperabilityMetrics().recordHealthCheckStatusFail();
return null;
});
}

protected void startDeduplicationSnapshotMonitor() {
// We do not know whether users will enable deduplication on namespace level/topic level or not, so keep this
// scheduled task runs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class BrokerOperabilityMetrics implements AutoCloseable {
private final LongAdder connectionTotalCreatedCount;
private final LongAdder connectionTotalClosedCount;
private final LongAdder connectionActive;
private volatile int healthCheckStatus; // 1=success, 0=failure, -1=unknown

private final LongAdder connectionCreateSuccessCount;
private final LongAdder connectionCreateFailCount;
Expand All @@ -61,7 +62,7 @@ public BrokerOperabilityMetrics(PulsarService pulsar) {
this.connectionTotalCreatedCount = new LongAdder();
this.connectionTotalClosedCount = new LongAdder();
this.connectionActive = new LongAdder();

this.healthCheckStatus = -1;
this.connectionCreateSuccessCount = new LongAdder();
this.connectionCreateFailCount = new LongAdder();

Expand Down Expand Up @@ -103,6 +104,7 @@ private void generate() {
reset();
metricsList.add(getTopicLoadMetrics());
metricsList.add(getConnectionMetrics());
metricsList.add(getHealthMetrics());
}

public Metrics generateConnectionMetrics() {
Expand All @@ -119,6 +121,12 @@ Metrics getConnectionMetrics() {
return rMetrics;
}

Metrics getHealthMetrics() {
Metrics rMetrics = Metrics.create(getDimensionMap("broker_health"));
rMetrics.put("brk_health", healthCheckStatus);
return rMetrics;
}

Map<String, String> getDimensionMap(String metricsName) {
Map<String, String> dimensionMap = new HashMap<>();
dimensionMap.put("broker", brokerName);
Expand Down Expand Up @@ -179,4 +187,12 @@ public void recordConnectionCreateSuccess() {
public void recordConnectionCreateFail() {
this.connectionCreateFailCount.increment();
}

public void recordHealthCheckStatusSuccess() {
this.healthCheckStatus = 1;
}

public void recordHealthCheckStatusFail() {
this.healthCheckStatus = 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1607,6 +1607,38 @@ public void testBrokerConnectionStats() throws Exception {
assertEquals((long) map.get("brk_connection_create_fail_count"), 1);
}

/**
* There is detailed info about this test.
* see: https://github.com/apache/pulsar/issues/10150#issuecomment-1112380074
*/
@Test
public void testBrokerHealthCheckStatus() throws Exception {

cleanup();
conf.setSystemTopicEnabled(false);
conf.setTopicLevelPoliciesEnabled(false);
conf.setHealthCheckMetricsUpdateTimeInSeconds(60);
setup();
BrokerService brokerService = this.pulsar.getBrokerService();

Map<String, Object> map = null;

brokerService.checkHealth().get();
brokerService.updateRates();
Awaitility.await().until(() -> this.activeCount.get() == 1);
List<Metrics> metrics = brokerService.getTopicMetrics();
System.out.println(metrics);

for (int i = 0; i < metrics.size(); i++) {
if (metrics.get(i).getDimensions().containsValue("broker_health")) {
map = metrics.get(i).getMetrics();
break;
}
}
assertNotNull(map);
assertEquals(map.get("brk_health"), 1);
}

@Test
public void testPayloadCorruptionDetection() throws Exception {
final String topicName = "persistent://prop/ns-abc/topic1";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.apache.pulsar.broker.loadbalance.extensions.manager.UnloadManager;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
Expand Down Expand Up @@ -1789,6 +1790,20 @@ public void testBrokerConnection() throws Exception {
compareBrokerConnectionStateCount(cm, 2.0);
}

@Test
public void testBrokerHealthCheckMetric() throws Exception {
conf.setHealthCheckMetricsUpdateTimeInSeconds(60);
BrokerService brokerService = pulsar.getBrokerService();
brokerService.checkHealth().get();
brokerService.updateRates();
ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
PrometheusMetricsTestUtil.generate(pulsar, true, false, false, statsOut);
String metricsStr = statsOut.toString();
Multimap<String, Metric> metrics = parseMetrics(metricsStr);
List<Metric> cm = (List<Metric>) metrics.get("pulsar_health");
compareBrokerConnectionStateCount(cm, 1);
}

private void compareBrokerConnectionStateCount(List<Metric> cm, double count) {
assertEquals(cm.size(), 1);
assertEquals(cm.get(0).tags.get("cluster"), "test");
Expand Down Expand Up @@ -1894,7 +1909,6 @@ public void testMetricsWithCache() throws Throwable {
PrometheusMetricsGenerator prometheusMetricsGenerator =
new PrometheusMetricsGenerator(pulsar, true, false, false,
false, clock);

String previousMetrics = null;
for (int a = 0; a < 4; a++) {
ByteArrayOutputStream statsOut1 = new ByteArrayOutputStream();
Expand All @@ -1908,7 +1922,6 @@ public void testMetricsWithCache() throws Throwable {
assertEquals(metricsStr1, metricsStr2);
assertNotEquals(metricsStr1, previousMetrics);
previousMetrics = metricsStr1;

// move time forward
currentTimeMillis.addAndGet(TimeUnit.SECONDS.toMillis(2));
}
Expand Down

0 comments on commit 6c7ec4c

Please sign in to comment.