diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml index daaeae9a9ccee..ff9d2311b0fa1 100644 --- a/conf/functions_worker.yml +++ b/conf/functions_worker.yml @@ -28,12 +28,6 @@ pulsarServiceUrl: pulsar://localhost:6650 pulsarWebServiceUrl: http://localhost:8080 numFunctionPackageReplicas: 1 downloadDirectory: /tmp/pulsar_functions -metricsConfig: - metricsSinkClassName: org.apache.pulsar.functions.metrics.sink.PrometheusSink - metricsCollectionInterval: 30 - metricsSinkConfig: - path: /metrics - port: 9099 #threadContainerFactory: # threadGroupName: "Thread Function Container Group" processContainerFactory: @@ -45,3 +39,4 @@ failureCheckFreqMs: 30000 rescheduleTimeoutMs: 60000 initialBrokerReconnectMaxRetries: 60 assignmentWriteMaxRetries: 60 +instanceLivenessCheckFreqMs: 30000 diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index 18693dae8b068..22115ea607782 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -23,7 +23,7 @@ import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.common.policies.data.ReplicatorStats; -import org.apache.pulsar.utils.SimpleTextOutputStream; +import org.apache.pulsar.common.util.SimpleTextOutputStream; import io.netty.util.concurrent.FastThreadLocal; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java index e7bcda1b04b90..167ec1c619857 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java @@ -24,7 +24,7 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.stats.metrics.JvmMetrics; -import org.apache.pulsar.utils.SimpleTextOutputStream; +import org.apache.pulsar.common.util.SimpleTextOutputStream; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; @@ -36,6 +36,7 @@ import io.prometheus.client.Gauge; import io.prometheus.client.Gauge.Child; import io.prometheus.client.hotspot.DefaultExports; +import org.apache.pulsar.functions.worker.FunctionsStatsGenerator; /** * Generate metrics aggregated at the namespace level and optionally at a topic level and formats them out @@ -71,6 +72,9 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, O NamespaceStatsAggregator.generate(pulsar, includeTopicMetrics, stream); + FunctionsStatsGenerator.generate(pulsar.getWorkerService(), + pulsar.getConfiguration().getClusterName(), stream); + out.write(buf.array(), buf.arrayOffset(), buf.readableBytes()); } finally { buf.release(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java index 0dc8749bc892f..b924177c3affa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java @@ -60,7 +60,6 @@ public void init() throws ServletException { protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { AsyncContext context = request.startAsync(); - executor.execute(safeRun(() -> { HttpServletResponse res = (HttpServletResponse) context.getResponse(); try { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java index a750b75581e01..f5938b613a976 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java @@ -20,7 +20,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl; import org.apache.bookkeeper.mledger.util.StatsBuckets; -import org.apache.pulsar.utils.SimpleTextOutputStream; +import org.apache.pulsar.common.util.SimpleTextOutputStream; import java.util.HashMap; import java.util.Map; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/utils/StatsOutputStream.java b/pulsar-broker/src/main/java/org/apache/pulsar/utils/StatsOutputStream.java index 7bdc66b2cb2fb..90d5c3e16a812 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/utils/StatsOutputStream.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/utils/StatsOutputStream.java @@ -21,6 +21,7 @@ import java.util.Stack; import io.netty.buffer.ByteBuf; +import org.apache.pulsar.common.util.SimpleTextOutputStream; public class StatsOutputStream extends SimpleTextOutputStream { private final Stack separators = new Stack<>(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/utils/SimpleTextOutputStreamTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/utils/SimpleTextOutputStreamTest.java index 57a5b643dfea7..030ff3037e05d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/utils/SimpleTextOutputStreamTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/utils/SimpleTextOutputStreamTest.java @@ -22,8 +22,7 @@ import java.nio.charset.Charset; -import org.apache.pulsar.utils.SimpleTextOutputStream; -import org.apache.pulsar.utils.StatsOutputStream; +import org.apache.pulsar.common.util.SimpleTextOutputStream; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index ae348ae5fc073..b38aed2dd936a 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -477,8 +477,7 @@ void runCmd() throws Exception { instanceConfig, userCodeFile, containerFactory, - null, - 0); + null); spawners.add(runtimeSpawner); runtimeSpawner.start(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/utils/NumberFormat.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/NumberFormat.java similarity index 98% rename from pulsar-broker/src/main/java/org/apache/pulsar/utils/NumberFormat.java rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/NumberFormat.java index 0db4adbba8099..eaf3664f65835 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/utils/NumberFormat.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/NumberFormat.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.utils; +package org.apache.pulsar.common.util; import io.netty.buffer.ByteBuf; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleTextOutputStream.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java similarity index 94% rename from pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleTextOutputStream.java rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java index 091c529c64987..3f75fde1184bf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleTextOutputStream.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.utils; +package org.apache.pulsar.common.util; import io.netty.buffer.ByteBuf; @@ -99,17 +99,17 @@ public SimpleTextOutputStream writeEncoded(String s) { return this; } - SimpleTextOutputStream write(boolean b) { + public SimpleTextOutputStream write(boolean b) { write(b ? "true" : "false"); return this; } - SimpleTextOutputStream write(long n) { + public SimpleTextOutputStream write(long n) { NumberFormat.format(this.buffer, n); return this; } - SimpleTextOutputStream write(double d) { + public SimpleTextOutputStream write(double d) { long i = (long) d; write(i); @@ -131,5 +131,4 @@ SimpleTextOutputStream write(double d) { write(r); return this; } - } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java index 08bfbe4d4642b..594ea10d92772 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java @@ -36,6 +36,7 @@ import java.lang.reflect.Type; import java.util.Map; +import java.util.concurrent.ExecutionException; /** * A function container implemented using java thread. @@ -166,8 +167,7 @@ public void start() throws Exception { instanceConfig, jarFile, containerFactory, - null, - 0); + null); server = ServerBuilder.forPort(port) .addService(new InstanceControlImpl(runtimeSpawner)) @@ -221,5 +221,22 @@ public void getFunctionStatus(Empty request, StreamObserver responseObserver) { + Runtime runtime = runtimeSpawner.getRuntime(); + if (runtime != null) { + try { + InstanceCommunication.MetricsData metrics = runtime.getAndResetMetrics().get(); + responseObserver.onNext(metrics); + responseObserver.onCompleted(); + } catch (InterruptedException | ExecutionException e) { + log.error("Exception in JavaInstance doing getAndResetMetrics", e); + throw new RuntimeException(e); + } + } + } + } } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java index d6c203a353951..6bcb988e1f14e 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java @@ -31,10 +31,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; -import org.apache.pulsar.functions.runtime.RuntimeFactory; -import org.apache.pulsar.functions.runtime.Runtime; -import org.apache.pulsar.functions.metrics.MetricsSink; -import org.apache.pulsar.functions.utils.FunctionConfigUtils; @Slf4j public class RuntimeSpawner implements AutoCloseable { @@ -45,22 +41,19 @@ public class RuntimeSpawner implements AutoCloseable { @Getter private Runtime runtime; - private MetricsSink metricsSink; - private int metricsCollectionInterval; - private Timer metricsCollectionTimer; + private Timer processLivenessCheckTimer; private int numRestarts; + private Long instanceLivenessCheckFreqMs; + public RuntimeSpawner(InstanceConfig instanceConfig, String codeFile, - RuntimeFactory containerFactory, - MetricsSink metricsSink, - int metricsCollectionInterval) { + RuntimeFactory containerFactory, Long instanceLivenessCheckFreqMs) { this.instanceConfig = instanceConfig; this.runtimeFactory = containerFactory; this.codeFile = codeFile; - this.metricsSink = metricsSink; - this.metricsCollectionInterval = metricsCollectionInterval; this.numRestarts = 0; + this.instanceLivenessCheckFreqMs = instanceLivenessCheckFreqMs; } public void start() throws Exception { @@ -68,34 +61,21 @@ public void start() throws Exception { this.instanceConfig.getInstanceId()); runtime = runtimeFactory.createContainer(this.instanceConfig, codeFile); runtime.start(); - if (metricsSink != null) { - log.info("Scheduling Metrics Collection every {} secs for {} - {}", - metricsCollectionInterval, - FunctionConfigUtils.getFullyQualifiedName(instanceConfig.getFunctionConfig()), - instanceConfig.getInstanceId()); - metricsCollectionTimer = new Timer(); - metricsCollectionTimer.scheduleAtFixedRate(new TimerTask() { + + // monitor function runtime to make sure it is running. If not, restart the function runtime + if (instanceLivenessCheckFreqMs != null) { + processLivenessCheckTimer = new Timer(); + processLivenessCheckTimer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { - if (runtime.isAlive()) { - - log.info("Collecting metrics for function {} - {}", - FunctionConfigUtils.getFullyQualifiedName(instanceConfig.getFunctionConfig()), - instanceConfig.getInstanceId()); - runtime.getAndResetMetrics().thenAccept(t -> { - if (t != null) { - log.debug("Collected metrics {}", t); - metricsSink.processRecord(t, instanceConfig.getFunctionConfig()); - } - }); - } else { + if (!runtime.isAlive()) { log.error("Function Container is dead with exception", runtime.getDeathException()); log.error("Restarting..."); runtime.start(); numRestarts++; } } - }, metricsCollectionInterval * 1000, metricsCollectionInterval * 1000); + }, instanceLivenessCheckFreqMs, instanceLivenessCheckFreqMs); } } @@ -122,9 +102,9 @@ public void close() { runtime.stop(); runtime = null; } - if (metricsCollectionTimer != null) { - metricsCollectionTimer.cancel(); - metricsCollectionTimer = null; + if (processLivenessCheckTimer != null) { + processLivenessCheckTimer.cancel(); + processLivenessCheckTimer = null; } } } diff --git a/pulsar-functions/worker/pom.xml b/pulsar-functions/worker/pom.xml index 3dab594f6bf37..a4e9f034ccae7 100644 --- a/pulsar-functions/worker/pom.xml +++ b/pulsar-functions/worker/pom.xml @@ -118,6 +118,12 @@ ${project.version} + + ${project.groupId} + pulsar-common + ${project.version} + + org.glassfish.jersey.media jersey-media-json-jackson diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java index 296e7d0f6a7fa..1eb3d288ae02c 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java @@ -32,7 +32,6 @@ import org.apache.pulsar.functions.proto.Function.FunctionMetaData; import org.apache.pulsar.functions.runtime.RuntimeFactory; import org.apache.pulsar.functions.instance.InstanceConfig; -import org.apache.pulsar.functions.metrics.MetricsSink; import org.apache.pulsar.functions.runtime.RuntimeSpawner; import org.apache.pulsar.functions.utils.FunctionConfigUtils; @@ -52,8 +51,6 @@ public class FunctionActioner implements AutoCloseable { private final WorkerConfig workerConfig; private final RuntimeFactory runtimeFactory; - private final MetricsSink metricsSink; - private final int metricsCollectionInterval; private final Namespace dlogNamespace; private LinkedBlockingQueue actionQueue; private volatile boolean running; @@ -61,14 +58,10 @@ public class FunctionActioner implements AutoCloseable { public FunctionActioner(WorkerConfig workerConfig, RuntimeFactory runtimeFactory, - MetricsSink metricsSink, - int metricCollectionInterval, Namespace dlogNamespace, LinkedBlockingQueue actionQueue) { this.workerConfig = workerConfig; this.runtimeFactory = runtimeFactory; - this.metricsSink = metricsSink; - this.metricsCollectionInterval = metricCollectionInterval; this.dlogNamespace = dlogNamespace; this.actionQueue = actionQueue; actioner = new Thread(() -> { @@ -168,8 +161,8 @@ private void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Excep instanceConfig.setFunctionVersion(UUID.randomUUID().toString()); instanceConfig.setInstanceId(String.valueOf(instanceId)); instanceConfig.setMaxBufferedTuples(1024); - RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, pkgFile.getAbsolutePath(), runtimeFactory, - metricsSink, metricsCollectionInterval); + RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, pkgFile.getAbsolutePath(), + runtimeFactory, workerConfig.getInstanceLivenessCheckFreqMs()); functionRuntimeInfo.setRuntimeSpawner(runtimeSpawner); runtimeSpawner.start(); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index 1d65078a1029f..a20c782e062d1 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -31,7 +31,6 @@ import org.apache.pulsar.functions.runtime.RuntimeFactory; import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory; import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory; -import org.apache.pulsar.functions.metrics.MetricsSink; import org.apache.pulsar.functions.runtime.RuntimeSpawner; import javax.ws.rs.client.Client; @@ -73,8 +72,6 @@ public class FunctionRuntimeManager implements AutoCloseable{ private final FunctionAssignmentTailer functionAssignmentTailer; - private MetricsSink metricsSink; - private FunctionActioner functionActioner; private RuntimeFactory runtimeFactory; @@ -110,12 +107,9 @@ public FunctionRuntimeManager(WorkerConfig workerConfig, throw new RuntimeException("Either Thread or Process Container Factory need to be set"); } - this.metricsSink = createMetricsSink(); - this.actionQueue = new LinkedBlockingQueue<>(); this.functionActioner = new FunctionActioner(this.workerConfig, runtimeFactory, - this.metricsSink, this.workerConfig.getMetricsConfig().getMetricsCollectionInterval(), dlogNamespace, actionQueue); this.membershipManager = membershipManager; @@ -127,7 +121,6 @@ public FunctionRuntimeManager(WorkerConfig workerConfig, public void start() { log.info("/** Starting Function Runtime Manager **/"); log.info("Initialize metrics sink..."); - this.metricsSink.init(this.workerConfig.getMetricsConfig().getMetricsSinkConfig()); log.info("Starting function actioner..."); this.functionActioner.start(); log.info("Starting function assignment tailer..."); @@ -428,6 +421,9 @@ public synchronized void processAssignmentUpdate(MessageId messageId, Assignment } } + public Map getFunctionRuntimeInfos() { + return this.functionRuntimeInfoMap; + } /** * Private methods for internal use. Should not be used outside of this class */ @@ -517,20 +513,6 @@ public void close() throws Exception { this.functionAssignmentTailer.close(); } - private MetricsSink createMetricsSink() { - String className = workerConfig.getMetricsConfig().getMetricsSinkClassName(); - try { - MetricsSink sink = (MetricsSink) Class.forName(className).newInstance(); - return sink; - } catch (InstantiationException e) { - throw new RuntimeException(e + " IMetricsSink class must have a no-arg constructor."); - } catch (IllegalAccessException e) { - throw new RuntimeException(e + " IMetricsSink class must be concrete."); - } catch (ClassNotFoundException e) { - throw new RuntimeException(e + " IMetricsSink class must be a class path."); - } - } - private Map diff(Map assignmentMap1, Map assignmentMap2) { Map result = new HashMap<>(); for (Map.Entry entry : assignmentMap1.entrySet()) { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java new file mode 100644 index 0000000000000..c9fd53987a352 --- /dev/null +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.worker; + +import org.apache.pulsar.functions.proto.InstanceCommunication; +import org.apache.pulsar.functions.runtime.Runtime; +import org.apache.pulsar.functions.runtime.RuntimeSpawner; +import org.apache.pulsar.common.util.SimpleTextOutputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.ExecutionException; + +/** + * A class to generate stats for pulsar functions running on this broker + */ +public class FunctionsStatsGenerator { + + private static final Logger log = LoggerFactory.getLogger(FunctionsStatsGenerator.class); + + public static void generate(WorkerService workerService, String cluster, SimpleTextOutputStream out) { + if (workerService != null) { + Map functionRuntimes + = workerService.getFunctionRuntimeManager().getFunctionRuntimeInfos(); + + for (Map.Entry entry : functionRuntimes.entrySet()) { + String fullyQualifiedInstanceName = entry.getKey(); + FunctionRuntimeInfo functionRuntimeInfo = entry.getValue(); + RuntimeSpawner functionRuntimeSpawner = functionRuntimeInfo.getRuntimeSpawner(); + + if (functionRuntimeSpawner != null) { + Runtime functionRuntime = functionRuntimeSpawner.getRuntime(); + if (functionRuntime != null) { + try { + InstanceCommunication.MetricsData metrics = functionRuntime.getAndResetMetrics().get(); + for (Map.Entry metricsEntry + : metrics.getMetricsMap().entrySet()) { + String metricName = metricsEntry.getKey(); + InstanceCommunication.MetricsData.DataDigest dataDigest = metricsEntry.getValue(); + + String tenant = functionRuntimeInfo.getFunctionInstance() + .getFunctionMetaData().getFunctionConfig().getTenant(); + String namespace = functionRuntimeInfo.getFunctionInstance() + .getFunctionMetaData().getFunctionConfig().getNamespace(); + String name = functionRuntimeInfo.getFunctionInstance() + .getFunctionMetaData().getFunctionConfig().getName(); + int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId(); + String qualifiedNamespace = String.format("%s/%s", tenant, namespace); + + metric(out, cluster, qualifiedNamespace, name, String.format("pulsar_function%scount", metricName), + instanceId, dataDigest.getCount()); + metric(out, cluster, qualifiedNamespace, name, String.format("pulsar_function%smax", metricName), + instanceId, dataDigest.getMax()); + metric(out, cluster, qualifiedNamespace,name, String.format("pulsar_function%smin", metricName), + instanceId, dataDigest.getMin()); + metric(out, cluster, qualifiedNamespace, name, String.format("pulsar_function%ssum", metricName), + instanceId, dataDigest.getSum()); + + } + } catch (InterruptedException | ExecutionException e) { + log.warn("Failed to collect metrics for function instance {}", + fullyQualifiedInstanceName, e); + } + } + } + } + } + } + + private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, + String functionName, String metricName, int instanceId, double value) { + stream.write(metricName).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace) + .write("\", name=\"").write(functionName).write("\", instanceId=\"").write(instanceId).write("\"} "); + stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n'); + } +} diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index 579572a5875fa..fb875b5ee73e1 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -49,7 +49,6 @@ public class WorkerConfig implements Serializable { private String pulsarFunctionsNamespace; private int numFunctionPackageReplicas; private String downloadDirectory; - private MetricsConfig metricsConfig; private long snapshotFreqMs; private String stateStorageServiceUrl; private String functionAssignmentTopicName; @@ -58,6 +57,7 @@ public class WorkerConfig implements Serializable { private long rescheduleTimeoutMs; private int initialBrokerReconnectMaxRetries; private int assignmentWriteMaxRetries; + private long instanceLivenessCheckFreqMs; @Data @Setter @@ -97,30 +97,4 @@ public static WorkerConfig load(String yamlFile) throws IOException { ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); return mapper.readValue(new File(yamlFile), WorkerConfig.class); } - - @Data - @Setter - @Getter - @EqualsAndHashCode - @ToString - @AllArgsConstructor - @NoArgsConstructor - @Accessors(chain = true) - /** - * This represents the config related to the resource limits of function calls - */ - public static class MetricsConfig implements Serializable { - - private static final long serialVersionUID = 1L; - - private String metricsSinkClassName; - private int metricsCollectionInterval; - private Map metricsSinkConfig; - - public static MetricsConfig load(String yamlFile) throws IOException { - ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); - return mapper.readValue(new File(yamlFile), MetricsConfig.class); - } - - } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java index 732ab07c97715..20ffd8d9452e9 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java @@ -58,21 +58,6 @@ public WorkerService(WorkerConfig workerConfig) { this.workerConfig = workerConfig; } - public static ServletContextHandler newServletContextHandler(String contextPath, WorkerService workerService) { - final ResourceConfig config = new ResourceConfig(Resources.get()); - final ServletContextHandler contextHandler = - new ServletContextHandler(ServletContextHandler.NO_SESSIONS); - - contextHandler.setAttribute(FunctionApiResource.ATTRIBUTE_FUNCTION_WORKER, workerService); - contextHandler.setContextPath(contextPath); - - final ServletHolder apiServlet = - new ServletHolder(new ServletContainer(config)); - contextHandler.addServlet(apiServlet, "/*"); - - return contextHandler; - } - public void start() throws InterruptedException { try { start(FunctionMetadataSetup.setupFunctionMetadata(workerConfig)); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java index 702ab4cc08720..ff43e2501b00b 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.functions.worker.rest; +import org.apache.pulsar.functions.worker.rest.api.FunctionsMetricsResource; import org.apache.pulsar.functions.worker.rest.api.v2.FunctionApiV2Resource; import org.glassfish.jersey.media.multipart.MultiPartFeature; @@ -31,15 +32,19 @@ public final class Resources { private Resources() { } - public static Set> get() { - return new HashSet<>(getClasses()); + public static Set> getApiResources() { + return new HashSet<>( + Arrays.asList( + FunctionApiV2Resource.class, + MultiPartFeature.class + )); } - private static List> getClasses() { - return Arrays.asList( - ConfigurationResource.class, - FunctionApiV2Resource.class, - MultiPartFeature.class - ); + public static Set> getRootResources() { + return new HashSet<>( + Arrays.asList( + ConfigurationResource.class, + FunctionsMetricsResource.class + )); } } \ No newline at end of file diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java index 0e897da189cc6..524a6ad295315 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java @@ -31,6 +31,10 @@ import org.eclipse.jetty.server.handler.ContextHandlerCollection; import org.eclipse.jetty.server.handler.DefaultHandler; import org.eclipse.jetty.server.handler.HandlerCollection; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.glassfish.jersey.server.ResourceConfig; +import org.glassfish.jersey.servlet.ServletContainer; @Slf4j public class WorkerServer implements Runnable { @@ -57,8 +61,13 @@ public void run() { final Server server = new Server(this.workerConfig.getWorkerPort()); List handlers = new ArrayList<>(2); - handlers.add(WorkerService.newServletContextHandler("/admin", workerService)); - handlers.add(WorkerService.newServletContextHandler("/admin/v2", workerService)); + handlers.add(newServletContextHandler("/admin", + new ResourceConfig(Resources.getApiResources()), workerService)); + handlers.add(newServletContextHandler("/admin/v2", + new ResourceConfig(Resources.getApiResources()), workerService)); + handlers.add(newServletContextHandler("/", + new ResourceConfig(Resources.getRootResources()), workerService)); + ContextHandlerCollection contexts = new ContextHandlerCollection(); contexts.setHandlers(handlers.toArray(new Handler[handlers.size()])); HandlerCollection handlerCollection = new HandlerCollection(); @@ -86,4 +95,18 @@ contexts, new DefaultHandler() public String getThreadName() { return "worker-server-thread-" + this.workerConfig.getWorkerId(); } + + public static ServletContextHandler newServletContextHandler(String contextPath, ResourceConfig config, WorkerService workerService) { + final ServletContextHandler contextHandler = + new ServletContextHandler(ServletContextHandler.NO_SESSIONS); + + contextHandler.setAttribute(FunctionApiResource.ATTRIBUTE_FUNCTION_WORKER, workerService); + contextHandler.setContextPath(contextPath); + + final ServletHolder apiServlet = + new ServletHolder(new ServletContainer(config)); + contextHandler.addServlet(apiServlet, "/*"); + + return contextHandler; + } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java new file mode 100644 index 0000000000000..0b8706041a2b4 --- /dev/null +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.worker.rest.api; + +import com.fasterxml.jackson.core.JsonProcessingException; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import org.apache.pulsar.common.util.SimpleTextOutputStream; +import org.apache.pulsar.functions.worker.FunctionsStatsGenerator; +import org.apache.pulsar.functions.worker.WorkerService; +import org.apache.pulsar.functions.worker.rest.FunctionApiResource; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.StreamingOutput; + +@Path("/") +public class FunctionsMetricsResource extends FunctionApiResource { + @Path("metrics") + @GET + @Produces(MediaType.TEXT_PLAIN) + public Response getMetrics() throws JsonProcessingException { + + WorkerService workerService = get(); + + ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer(); + try { + SimpleTextOutputStream stream = new SimpleTextOutputStream(buf); + FunctionsStatsGenerator.generate(workerService,"default", stream); + byte[] payload = buf.array(); + int arrayOffset = buf.arrayOffset(); + int readableBytes = buf.readableBytes(); + StreamingOutput streamOut = out -> { + out.write(payload, arrayOffset, readableBytes); + out.flush(); + }; + return Response.ok(streamOut).build(); + } finally { + buf.release(); + } + } +} diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java index 25ecf0a9329a1..7effcd8d84eaf 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java @@ -128,5 +128,4 @@ public Response getCluster() { public Response getAssignments() { return functions.getAssignments(); } - } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java index 327b4f6ddff09..3bca55c319885 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java @@ -79,7 +79,6 @@ public void testProcessAssignmentUpdateAddFunctions() throws Exception { workerConfig.setPulsarServiceUrl("pulsar://localhost:6650"); workerConfig.setStateStorageServiceUrl("foo"); workerConfig.setFunctionAssignmentTopicName("assignments"); - workerConfig.setMetricsConfig(new WorkerConfig.MetricsConfig().setMetricsSinkClassName(TestSink.class.getName())); PulsarClient pulsarClient = mock(PulsarClient.class); ReaderBuilder readerBuilder = mock(ReaderBuilder.class); @@ -173,7 +172,6 @@ public void testProcessAssignmentUpdateDeleteFunctions() throws Exception { workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test")); workerConfig.setPulsarServiceUrl("pulsar://localhost:6650"); workerConfig.setStateStorageServiceUrl("foo"); - workerConfig.setMetricsConfig(new WorkerConfig.MetricsConfig().setMetricsSinkClassName(TestSink.class.getName())); PulsarClient pulsarClient = mock(PulsarClient.class); ReaderBuilder readerBuilder = mock(ReaderBuilder.class); @@ -271,7 +269,6 @@ public void testProcessAssignmentUpdateModifyFunctions() throws Exception { workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test")); workerConfig.setPulsarServiceUrl("pulsar://localhost:6650"); workerConfig.setStateStorageServiceUrl("foo"); - workerConfig.setMetricsConfig(new WorkerConfig.MetricsConfig().setMetricsSinkClassName(TestSink.class.getName())); PulsarClient pulsarClient = mock(PulsarClient.class); ReaderBuilder readerBuilder = mock(ReaderBuilder.class); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java new file mode 100644 index 0000000000000..5bdb81275afd1 --- /dev/null +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.worker; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import lombok.ToString; +import org.apache.pulsar.common.util.SimpleTextOutputStream; +import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.proto.InstanceCommunication; +import org.apache.pulsar.functions.runtime.Runtime; +import org.apache.pulsar.functions.runtime.RuntimeSpawner; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static com.google.common.base.Preconditions.checkArgument; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; + +public class FunctionStatsGeneratorTest { + + @Test + public void testFunctionsStatsGenerate() { + FunctionRuntimeManager functionRuntimeManager = mock(FunctionRuntimeManager.class); + Map functionRuntimeInfoMap = new HashMap<>(); + + WorkerService workerService = mock(WorkerService.class); + doReturn(functionRuntimeManager).when(workerService).getFunctionRuntimeManager(); + + CompletableFuture metricsDataCompletableFuture = new CompletableFuture<>(); + InstanceCommunication.MetricsData metricsData = InstanceCommunication.MetricsData.newBuilder() + .putMetrics( + "__total_processed__", + InstanceCommunication.MetricsData.DataDigest.newBuilder() + .setCount(100.0).setMax(200.0).setSum(300.0).setMin(0.0).build()) + .putMetrics("__avg_latency_ms__", + InstanceCommunication.MetricsData.DataDigest.newBuilder() + .setCount(10.0).setMax(20.0).setSum(30.0).setMin(0.0).build()) + .build(); + + metricsDataCompletableFuture.complete(metricsData); + Runtime runtime = mock(Runtime.class); + doReturn(metricsDataCompletableFuture).when(runtime).getAndResetMetrics(); + + RuntimeSpawner runtimeSpawner = mock(RuntimeSpawner.class); + doReturn(runtime).when(runtimeSpawner).getRuntime(); + + Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionConfig( + Function.FunctionConfig.newBuilder() + .setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build(); + + Function.Instance instance = Function.Instance.newBuilder() + .setFunctionMetaData(function1).setInstanceId(0).build(); + + FunctionRuntimeInfo functionRuntimeInfo = mock(FunctionRuntimeInfo.class); + doReturn(runtimeSpawner).when(functionRuntimeInfo).getRuntimeSpawner(); + doReturn(instance).when(functionRuntimeInfo).getFunctionInstance(); + + functionRuntimeInfoMap.put(Utils.getFullyQualifiedInstanceId(instance), functionRuntimeInfo); + doReturn(functionRuntimeInfoMap).when(functionRuntimeManager).getFunctionRuntimeInfos(); + + ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer(); + SimpleTextOutputStream statsOut = new SimpleTextOutputStream(buf); + FunctionsStatsGenerator.generate(workerService, "default", statsOut); + + String str = buf.toString(Charset.defaultCharset()); + buf.release(); + Map metrics = parseMetrics(str); + + Assert.assertEquals(metrics.size(), 8); + + Metric m = metrics.get("pulsar_function__total_processed__count"); + assertEquals(m.tags.get("cluster"), "default"); + assertEquals(m.tags.get("instanceId"), "0"); + assertEquals(m.tags.get("name"), "func-1"); + assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace"); + assertEquals(m.value, 100.0); + + m = metrics.get("pulsar_function__total_processed__max"); + assertEquals(m.tags.get("cluster"), "default"); + assertEquals(m.tags.get("instanceId"), "0"); + assertEquals(m.tags.get("name"), "func-1"); + assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace"); + assertEquals(m.value, 200.0); + + m = metrics.get("pulsar_function__total_processed__sum"); + assertEquals(m.tags.get("cluster"), "default"); + assertEquals(m.tags.get("instanceId"), "0"); + assertEquals(m.tags.get("name"), "func-1"); + assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace"); + assertEquals(m.value, 300.0); + + m = metrics.get("pulsar_function__total_processed__min"); + assertEquals(m.tags.get("cluster"), "default"); + assertEquals(m.tags.get("instanceId"), "0"); + assertEquals(m.tags.get("name"), "func-1"); + assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace"); + assertEquals(m.value, 0.0); + + m = metrics.get("pulsar_function__avg_latency_ms__count"); + assertEquals(m.tags.get("cluster"), "default"); + assertEquals(m.tags.get("instanceId"), "0"); + assertEquals(m.tags.get("name"), "func-1"); + assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace"); + assertEquals(m.value, 10.0); + + m = metrics.get("pulsar_function__avg_latency_ms__max"); + assertEquals(m.tags.get("cluster"), "default"); + assertEquals(m.tags.get("instanceId"), "0"); + assertEquals(m.tags.get("name"), "func-1"); + assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace"); + assertEquals(m.value, 20.0); + + m = metrics.get("pulsar_function__avg_latency_ms__sum"); + assertEquals(m.tags.get("cluster"), "default"); + assertEquals(m.tags.get("instanceId"), "0"); + assertEquals(m.tags.get("name"), "func-1"); + assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace"); + assertEquals(m.value, 30.0); + + m = metrics.get("pulsar_function__avg_latency_ms__min"); + assertEquals(m.tags.get("cluster"), "default"); + assertEquals(m.tags.get("instanceId"), "0"); + assertEquals(m.tags.get("name"), "func-1"); + assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace"); + assertEquals(m.value, 0.0); + } + + /** + * Hacky parsing of Prometheus text format. Sould be good enough for unit tests + */ + private static Map parseMetrics(String metrics) { + Map parsed = new HashMap<>(); + + // Example of lines are + // jvm_threads_current{cluster="standalone",} 203.0 + // or + // pulsar_subscriptions_count{cluster="standalone", namespace="sample/standalone/ns1", + // topic="persistent://sample/standalone/ns1/test-2"} 0.0 1517945780897 + Pattern pattern = Pattern.compile("^(\\w+)\\{([^\\}]+)\\}\\s(-?[\\d\\w\\.]+)(\\s(\\d+))?$"); + Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?"); + + Arrays.asList(metrics.split("\n")).forEach(line -> { + if (line.isEmpty()) { + return; + } + Matcher matcher = pattern.matcher(line); + + checkArgument(matcher.matches()); + String name = matcher.group(1); + + Metric m = new Metric(); + m.value = Double.valueOf(matcher.group(3)); + + String tags = matcher.group(2); + Matcher tagsMatcher = tagsPattern.matcher(tags); + while (tagsMatcher.find()) { + String tag = tagsMatcher.group(1); + String value = tagsMatcher.group(2); + m.tags.put(tag, value); + } + + parsed.put(name, m); + }); + + return parsed; + } + + @ToString + static class Metric { + Map tags = new TreeMap<>(); + double value; + } + +} diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java index 43b08a17c9fd9..4fd72b992683b 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java @@ -60,7 +60,6 @@ public MembershipManagerTest() { workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test")); workerConfig.setPulsarServiceUrl("pulsar://localhost:6650"); workerConfig.setStateStorageServiceUrl("foo"); - workerConfig.setMetricsConfig(new WorkerConfig.MetricsConfig().setMetricsSinkClassName(FunctionRuntimeManagerTest.TestSink.class.getName())); } @Test @@ -250,7 +249,6 @@ public void testCheckFailuresSomeUnassigned() throws Exception { workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test")); workerConfig.setPulsarServiceUrl("pulsar://localhost:6650"); workerConfig.setStateStorageServiceUrl("foo"); - workerConfig.setMetricsConfig(new WorkerConfig.MetricsConfig().setMetricsSinkClassName(FunctionRuntimeManagerTest.TestSink.class.getName())); workerConfig.setRescheduleTimeoutMs(30000); SchedulerManager schedulerManager = mock(SchedulerManager.class); PulsarClient pulsarClient = mock(PulsarClient.class); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java index a5ada20e87979..b115b1767a34a 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java @@ -69,8 +69,6 @@ public void setup() throws PulsarClientException { workerConfig.setPulsarServiceUrl("pulsar://localhost:6650"); workerConfig.setStateStorageServiceUrl("foo"); workerConfig.setFunctionAssignmentTopicName("assignments"); - workerConfig.setMetricsConfig(new WorkerConfig.MetricsConfig() - .setMetricsSinkClassName(FunctionRuntimeManagerTest.TestSink.class.getName())); workerConfig.setSchedulerClassName(RoundRobinScheduler.class.getName()); workerConfig.setAssignmentWriteMaxRetries(0);