Skip to content

Commit

Permalink
Functions metrics prometheus (#1373)
Browse files Browse the repository at this point in the history
* adding Pulsar Functions stats to the broker's Prometheus metrics output

* refactoring class name

* adding missing license header

* refactoring code and removing function's metrics sink

* fixing header

* adding instance liveness check

* adding null check

* adding unit test
  • Loading branch information
jerrypeng authored and merlimat committed Mar 15, 2018
1 parent 672047c commit 6fbd8c3
Show file tree
Hide file tree
Showing 26 changed files with 455 additions and 147 deletions.
7 changes: 1 addition & 6 deletions conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,6 @@ pulsarServiceUrl: pulsar://localhost:6650
pulsarWebServiceUrl: http://localhost:8080
numFunctionPackageReplicas: 1
downloadDirectory: /tmp/pulsar_functions
metricsConfig:
metricsSinkClassName: org.apache.pulsar.functions.metrics.sink.PrometheusSink
metricsCollectionInterval: 30
metricsSinkConfig:
path: /metrics
port: 9099
#threadContainerFactory:
# threadGroupName: "Thread Function Container Group"
processContainerFactory:
Expand All @@ -45,3 +39,4 @@ failureCheckFreqMs: 30000
rescheduleTimeoutMs: 60000
initialBrokerReconnectMaxRetries: 60
assignmentWriteMaxRetries: 60
instanceLivenessCheckFreqMs: 30000
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.utils.SimpleTextOutputStream;
import org.apache.pulsar.common.util.SimpleTextOutputStream;

import io.netty.util.concurrent.FastThreadLocal;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.stats.metrics.JvmMetrics;
import org.apache.pulsar.utils.SimpleTextOutputStream;
import org.apache.pulsar.common.util.SimpleTextOutputStream;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
Expand All @@ -36,6 +36,7 @@
import io.prometheus.client.Gauge;
import io.prometheus.client.Gauge.Child;
import io.prometheus.client.hotspot.DefaultExports;
import org.apache.pulsar.functions.worker.FunctionsStatsGenerator;

/**
* Generates metrics aggregated at the namespace level, and optionally at the topic level, and formats them out
Expand Down Expand Up @@ -71,6 +72,9 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, O

NamespaceStatsAggregator.generate(pulsar, includeTopicMetrics, stream);

FunctionsStatsGenerator.generate(pulsar.getWorkerService(),
pulsar.getConfiguration().getClusterName(), stream);

out.write(buf.array(), buf.arrayOffset(), buf.readableBytes());
} finally {
buf.release();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ public void init() throws ServletException {
protected void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
AsyncContext context = request.startAsync();

executor.execute(safeRun(() -> {
HttpServletResponse res = (HttpServletResponse) context.getResponse();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.pulsar.utils.SimpleTextOutputStream;
import org.apache.pulsar.common.util.SimpleTextOutputStream;

import java.util.HashMap;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Stack;

import io.netty.buffer.ByteBuf;
import org.apache.pulsar.common.util.SimpleTextOutputStream;

public class StatsOutputStream extends SimpleTextOutputStream {
private final Stack<Boolean> separators = new Stack<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@

import java.nio.charset.Charset;

import org.apache.pulsar.utils.SimpleTextOutputStream;
import org.apache.pulsar.utils.StatsOutputStream;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,8 +477,7 @@ void runCmd() throws Exception {
instanceConfig,
userCodeFile,
containerFactory,
null,
0);
null);
spawners.add(runtimeSpawner);
runtimeSpawner.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.utils;
package org.apache.pulsar.common.util;

import io.netty.buffer.ByteBuf;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.utils;
package org.apache.pulsar.common.util;

import io.netty.buffer.ByteBuf;

Expand Down Expand Up @@ -99,17 +99,17 @@ public SimpleTextOutputStream writeEncoded(String s) {
return this;
}

SimpleTextOutputStream write(boolean b) {
public SimpleTextOutputStream write(boolean b) {
write(b ? "true" : "false");
return this;
}

SimpleTextOutputStream write(long n) {
public SimpleTextOutputStream write(long n) {
NumberFormat.format(this.buffer, n);
return this;
}

SimpleTextOutputStream write(double d) {
public SimpleTextOutputStream write(double d) {
long i = (long) d;
write(i);

Expand All @@ -131,5 +131,4 @@ SimpleTextOutputStream write(double d) {
write(r);
return this;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import java.lang.reflect.Type;
import java.util.Map;
import java.util.concurrent.ExecutionException;

/**
* A function container implemented using a Java thread.
Expand Down Expand Up @@ -166,8 +167,7 @@ public void start() throws Exception {
instanceConfig,
jarFile,
containerFactory,
null,
0);
null);

server = ServerBuilder.forPort(port)
.addService(new InstanceControlImpl(runtimeSpawner))
Expand Down Expand Up @@ -221,5 +221,22 @@ public void getFunctionStatus(Empty request, StreamObserver<InstanceCommunicatio
throw new RuntimeException(e);
}
}

/**
 * Serves the {@code getAndResetMetrics} RPC by forwarding the result of the current
 * runtime's {@code getAndResetMetrics()} call to the caller.
 *
 * <p>When the spawner has no runtime, the call is terminated with an error rather than
 * being left open: a unary gRPC call that never receives {@code onCompleted} or
 * {@code onError} keeps the client waiting until its deadline (if any) expires.
 *
 * @param request empty request message; not read
 * @param responseObserver observer that receives either the metrics followed by completion,
 *        or an error when no runtime is available
 * @throws RuntimeException if waiting for the runtime's metrics is interrupted or the
 *         metrics future completes exceptionally (same style as {@code getFunctionStatus})
 */
@Override
public void getAndResetMetrics(com.google.protobuf.Empty request,
        io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData> responseObserver) {
    Runtime runtime = runtimeSpawner.getRuntime();
    if (runtime == null) {
        // Always terminate the call; otherwise the client would hang on a missing runtime.
        responseObserver.onError(new IllegalStateException("Function runtime is not available"));
        return;
    }
    try {
        InstanceCommunication.MetricsData metrics = runtime.getAndResetMetrics().get();
        responseObserver.onNext(metrics);
        responseObserver.onCompleted();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the thread's owner can still observe the interruption.
        Thread.currentThread().interrupt();
        log.error("Exception in JavaInstance doing getAndResetMetrics", e);
        throw new RuntimeException(e);
    } catch (ExecutionException e) {
        log.error("Exception in JavaInstance doing getAndResetMetrics", e);
        throw new RuntimeException(e);
    }
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.Runtime;
import org.apache.pulsar.functions.metrics.MetricsSink;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;

@Slf4j
public class RuntimeSpawner implements AutoCloseable {
Expand All @@ -45,57 +41,41 @@ public class RuntimeSpawner implements AutoCloseable {

@Getter
private Runtime runtime;
private MetricsSink metricsSink;
private int metricsCollectionInterval;
private Timer metricsCollectionTimer;
private Timer processLivenessCheckTimer;
private int numRestarts;
private Long instanceLivenessCheckFreqMs;


public RuntimeSpawner(InstanceConfig instanceConfig,
String codeFile,
RuntimeFactory containerFactory,
MetricsSink metricsSink,
int metricsCollectionInterval) {
RuntimeFactory containerFactory, Long instanceLivenessCheckFreqMs) {
this.instanceConfig = instanceConfig;
this.runtimeFactory = containerFactory;
this.codeFile = codeFile;
this.metricsSink = metricsSink;
this.metricsCollectionInterval = metricsCollectionInterval;
this.numRestarts = 0;
this.instanceLivenessCheckFreqMs = instanceLivenessCheckFreqMs;
}

public void start() throws Exception {
log.info("RuntimeSpawner starting function {} - {}", this.instanceConfig.getFunctionConfig().getName(),
this.instanceConfig.getInstanceId());
runtime = runtimeFactory.createContainer(this.instanceConfig, codeFile);
runtime.start();
if (metricsSink != null) {
log.info("Scheduling Metrics Collection every {} secs for {} - {}",
metricsCollectionInterval,
FunctionConfigUtils.getFullyQualifiedName(instanceConfig.getFunctionConfig()),
instanceConfig.getInstanceId());
metricsCollectionTimer = new Timer();
metricsCollectionTimer.scheduleAtFixedRate(new TimerTask() {

// monitor function runtime to make sure it is running. If not, restart the function runtime
if (instanceLivenessCheckFreqMs != null) {
processLivenessCheckTimer = new Timer();
processLivenessCheckTimer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
if (runtime.isAlive()) {

log.info("Collecting metrics for function {} - {}",
FunctionConfigUtils.getFullyQualifiedName(instanceConfig.getFunctionConfig()),
instanceConfig.getInstanceId());
runtime.getAndResetMetrics().thenAccept(t -> {
if (t != null) {
log.debug("Collected metrics {}", t);
metricsSink.processRecord(t, instanceConfig.getFunctionConfig());
}
});
} else {
if (!runtime.isAlive()) {
log.error("Function Container is dead with exception", runtime.getDeathException());
log.error("Restarting...");
runtime.start();
numRestarts++;
}
}
}, metricsCollectionInterval * 1000, metricsCollectionInterval * 1000);
}, instanceLivenessCheckFreqMs, instanceLivenessCheckFreqMs);
}
}

Expand All @@ -122,9 +102,9 @@ public void close() {
runtime.stop();
runtime = null;
}
if (metricsCollectionTimer != null) {
metricsCollectionTimer.cancel();
metricsCollectionTimer = null;
if (processLivenessCheckTimer != null) {
processLivenessCheckTimer.cancel();
processLivenessCheckTimer = null;
}
}
}
6 changes: 6 additions & 0 deletions pulsar-functions/worker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-common</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-json-jackson</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.metrics.MetricsSink;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;

Expand All @@ -52,23 +51,17 @@ public class FunctionActioner implements AutoCloseable {

private final WorkerConfig workerConfig;
private final RuntimeFactory runtimeFactory;
private final MetricsSink metricsSink;
private final int metricsCollectionInterval;
private final Namespace dlogNamespace;
private LinkedBlockingQueue<FunctionAction> actionQueue;
private volatile boolean running;
private Thread actioner;

public FunctionActioner(WorkerConfig workerConfig,
RuntimeFactory runtimeFactory,
MetricsSink metricsSink,
int metricCollectionInterval,
Namespace dlogNamespace,
LinkedBlockingQueue<FunctionAction> actionQueue) {
this.workerConfig = workerConfig;
this.runtimeFactory = runtimeFactory;
this.metricsSink = metricsSink;
this.metricsCollectionInterval = metricCollectionInterval;
this.dlogNamespace = dlogNamespace;
this.actionQueue = actionQueue;
actioner = new Thread(() -> {
Expand Down Expand Up @@ -168,8 +161,8 @@ private void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Excep
instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
instanceConfig.setInstanceId(String.valueOf(instanceId));
instanceConfig.setMaxBufferedTuples(1024);
RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, pkgFile.getAbsolutePath(), runtimeFactory,
metricsSink, metricsCollectionInterval);
RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, pkgFile.getAbsolutePath(),
runtimeFactory, workerConfig.getInstanceLivenessCheckFreqMs());

functionRuntimeInfo.setRuntimeSpawner(runtimeSpawner);
runtimeSpawner.start();
Expand Down
Loading

0 comments on commit 6fbd8c3

Please sign in to comment.