Skip to content

Commit

Permalink
[feat][misc] PIP-320: Add OpenTelemetry scaffolding (#22010)
Browse files Browse the repository at this point in the history
Co-authored-by: Matteo Merli <mmerli@apache.org>
  • Loading branch information
dragosvictor and merlimat authored Feb 9, 2024
1 parent 2b75ca0 commit 8957e35
Show file tree
Hide file tree
Showing 63 changed files with 1,196 additions and 27 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/pulsar-ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,9 @@ jobs:
- name: Transaction
group: TRANSACTION

- name: Metrics
group: METRICS

steps:
- name: checkout
uses: actions/checkout@v4
Expand Down
4 changes: 4 additions & 0 deletions build/run_integration_group.sh
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ test_group_transaction() {
mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-transaction.xml -DintegrationTests
}

test_group_metrics() {
mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-metrics.xml -DintegrationTests
}

test_group_tiered_filesystem() {
mvn_run_integration_test "$@" -DintegrationTestSuiteFile=tiered-filesystem-storage.xml -DintegrationTests
}
Expand Down
28 changes: 28 additions & 0 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,13 @@ The Apache Software License, Version 2.0
- io.prometheus-simpleclient_tracer_common-0.16.0.jar
- io.prometheus-simpleclient_tracer_otel-0.16.0.jar
- io.prometheus-simpleclient_tracer_otel_agent-0.16.0.jar
* Prometheus exporter
- io.prometheus-prometheus-metrics-config-1.1.0.jar
- io.prometheus-prometheus-metrics-exporter-common-1.1.0.jar
- io.prometheus-prometheus-metrics-exporter-httpserver-1.1.0.jar
- io.prometheus-prometheus-metrics-exposition-formats-1.1.0.jar
- io.prometheus-prometheus-metrics-model-1.1.0.jar
- io.prometheus-prometheus-metrics-shaded-protobuf-1.1.0.jar
* Jakarta Bean Validation API
- jakarta.validation-jakarta.validation-api-2.0.2.jar
- javax.validation-validation-api-1.1.0.Final.jar
Expand Down Expand Up @@ -503,6 +510,27 @@ The Apache Software License, Version 2.0
* RoaringBitmap
- org.roaringbitmap-RoaringBitmap-0.9.44.jar
- org.roaringbitmap-shims-0.9.44.jar
* OpenTelemetry
- io.opentelemetry-opentelemetry-api-1.34.1.jar
- io.opentelemetry-opentelemetry-api-events-1.34.1-alpha.jar
- io.opentelemetry-opentelemetry-context-1.34.1.jar
- io.opentelemetry-opentelemetry-exporter-common-1.34.1.jar
- io.opentelemetry-opentelemetry-exporter-otlp-1.34.1.jar
- io.opentelemetry-opentelemetry-exporter-otlp-common-1.34.1.jar
- io.opentelemetry-opentelemetry-exporter-prometheus-1.34.1-alpha.jar
- io.opentelemetry-opentelemetry-exporter-sender-okhttp-1.34.1.jar
- io.opentelemetry-opentelemetry-extension-incubator-1.34.1-alpha.jar
- io.opentelemetry-opentelemetry-sdk-1.34.1.jar
- io.opentelemetry-opentelemetry-sdk-common-1.34.1.jar
- io.opentelemetry-opentelemetry-sdk-extension-autoconfigure-1.34.1.jar
- io.opentelemetry-opentelemetry-sdk-extension-autoconfigure-spi-1.34.1.jar
- io.opentelemetry-opentelemetry-sdk-logs-1.34.1.jar
- io.opentelemetry-opentelemetry-sdk-metrics-1.34.1.jar
- io.opentelemetry-opentelemetry-sdk-trace-1.34.1.jar
- io.opentelemetry.instrumentation-opentelemetry-instrumentation-api-1.32.1.jar
- io.opentelemetry.instrumentation-opentelemetry-instrumentation-api-semconv-1.32.1-alpha.jar
- io.opentelemetry.instrumentation-opentelemetry-resources-1.32.1-alpha.jar
- io.opentelemetry.semconv-opentelemetry-semconv-1.23.1-alpha.jar

BSD 3-clause "New" or "Revised" License
* Google auth library
Expand Down
31 changes: 31 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,10 @@ flexible messaging model and an intuitive client API.</description>
<disruptor.version>3.4.3</disruptor.version>
<zstd-jni.version>1.5.2-3</zstd-jni.version>
<netty-reactive-streams.version>2.0.6</netty-reactive-streams.version>
<opentelemetry.version>1.34.1</opentelemetry.version>
<opentelemetry.alpha.version>1.34.1-alpha</opentelemetry.alpha.version>
<opentelemetry.instrumentation.version>1.32.1-alpha</opentelemetry.instrumentation.version>
<opentelemetry.semconv.version>1.23.1-alpha</opentelemetry.semconv.version>

<!-- test dependencies -->
<testcontainers.version>1.18.3</testcontainers.version>
Expand Down Expand Up @@ -1446,6 +1450,31 @@ flexible messaging model and an intuitive client API.</description>
<version>${restassured.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-bom</artifactId>
<version>${opentelemetry.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-bom-alpha</artifactId>
<version>${opentelemetry.alpha.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-resources</artifactId>
<version>${opentelemetry.instrumentation.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry.semconv</groupId>
<artifactId>opentelemetry-semconv</artifactId>
<version>${opentelemetry.semconv.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down Expand Up @@ -2266,6 +2295,7 @@ flexible messaging model and an intuitive client API.</description>
<module>pulsar-broker-auth-sasl</module>
<module>pulsar-client-auth-sasl</module>
<module>pulsar-config-validation</module>
<module>pulsar-opentelemetry</module>

<module>structured-event-log</module>

Expand Down Expand Up @@ -2330,6 +2360,7 @@ flexible messaging model and an intuitive client API.</description>
<module>pulsar-broker-auth-sasl</module>
<module>pulsar-client-auth-sasl</module>
<module>pulsar-config-validation</module>
<module>pulsar-opentelemetry</module>

<!-- transaction related modules -->
<module>pulsar-transaction</module>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ void testAuthentication() throws Exception {
proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
proxyConfig.setSaslJaasClientAllowedIds(".*" + localHostname + ".*");
proxyConfig.setSaslJaasServerSectionName("PulsarProxy");
proxyConfig.setClusterName(configClusterName);

// proxy connect to broker
proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
Expand Down
18 changes: 18 additions & 0 deletions pulsar-broker-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,28 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.gaul</groupId>
<artifactId>modernizer-maven-plugin</artifactId>
Expand Down
14 changes: 14 additions & 0 deletions pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-opentelemetry</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-batch-discovery-triggerers</artifactId>
Expand Down Expand Up @@ -209,6 +215,14 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-broker-common</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<!-- functions related dependencies (end) -->

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.SchemaStorageFactory;
import org.apache.pulsar.broker.stats.MetricsGenerator;
import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry;
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
import org.apache.pulsar.broker.stats.prometheus.PulsarPrometheusMetricsServlet;
import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
Expand Down Expand Up @@ -248,6 +249,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private final Timer brokerClientSharedTimer;

private MetricsGenerator metricsGenerator;
private PulsarBrokerOpenTelemetry openTelemetry;

private TransactionMetadataStoreService transactionMetadataStoreService;
private TransactionBufferProvider transactionBufferProvider;
Expand Down Expand Up @@ -461,6 +463,9 @@ public CompletableFuture<Void> closeAsync() {
}

resetMetricsServlet();
if (openTelemetry != null) {
openTelemetry.close();
}

if (this.compactionServiceFactory != null) {
try {
Expand Down Expand Up @@ -897,6 +902,7 @@ public void start() throws PulsarServerException {
}

this.metricsGenerator = new MetricsGenerator(this);
this.openTelemetry = new PulsarBrokerOpenTelemetry(config);

// Initialize the message protocol handlers.
// start the protocol handlers only after the broker is ready,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.stats;

import io.opentelemetry.api.metrics.Meter;
import java.io.Closeable;
import lombok.Getter;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.opentelemetry.OpenTelemetryService;

public class PulsarBrokerOpenTelemetry implements Closeable {

public static final String SERVICE_NAME = "pulsar-broker";
private final OpenTelemetryService openTelemetryService;

@Getter
private final Meter meter;

public PulsarBrokerOpenTelemetry(ServiceConfiguration config) {
openTelemetryService = OpenTelemetryService.builder()
.clusterName(config.getClusterName())
.serviceName(SERVICE_NAME)
.serviceVersion(PulsarVersion.getVersion())
.build();
meter = openTelemetryService.getOpenTelemetry().getMeter("org.apache.pulsar.broker");
}

@Override
public void close() {
openTelemetryService.close();
}
}
6 changes: 6 additions & 0 deletions pulsar-functions/worker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-opentelemetry</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-client-original</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.worker;

import io.opentelemetry.api.metrics.Meter;
import java.io.Closeable;
import lombok.Getter;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.opentelemetry.OpenTelemetryService;

public class PulsarWorkerOpenTelemetry implements Closeable {

public static final String SERVICE_NAME = "pulsar-function-worker";
private final OpenTelemetryService openTelemetryService;

@Getter
private final Meter meter;

public PulsarWorkerOpenTelemetry(WorkerConfig workerConfig) {
openTelemetryService = OpenTelemetryService.builder()
.clusterName(workerConfig.getPulsarFunctionsCluster())
.serviceName(SERVICE_NAME)
.serviceVersion(PulsarVersion.getVersion())
.build();
meter = openTelemetryService.getOpenTelemetry().getMeter("org.apache.pulsar.function_worker");
}

@Override
public void close() {
openTelemetryService.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public interface PulsarClientCreator {
private PulsarAdmin brokerAdmin;
private PulsarAdmin functionAdmin;
private MetricsGenerator metricsGenerator;
private PulsarWorkerOpenTelemetry openTelemetry;
@VisibleForTesting
private URI dlogUri;
private LeaderService leaderService;
Expand Down Expand Up @@ -188,6 +189,7 @@ public void init(WorkerConfig workerConfig,
this.statsUpdater = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("worker-stats-updater"));
this.metricsGenerator = new MetricsGenerator(this.statsUpdater, workerConfig);
this.openTelemetry = new PulsarWorkerOpenTelemetry(workerConfig);
this.workerConfig = workerConfig;
this.dlogUri = dlogUri;
this.workerStatsManager = new WorkerStatsManager(workerConfig, runAsStandalone);
Expand Down Expand Up @@ -659,6 +661,10 @@ public void stop() {
if (null != stateStoreProvider) {
stateStoreProvider.close();
}

if (null != openTelemetry) {
openTelemetry.close();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@
@Slf4j
public class FunctionAssignmentTailerTest {

private static final String CLUSTER_NAME = "test-cluster";

@Test(timeOut = 10000)
public void testErrorNotifier() throws Exception {
WorkerConfig workerConfig = new WorkerConfig();
Expand All @@ -71,6 +73,7 @@ public void testErrorNotifier() throws Exception {
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
workerConfig.setPulsarFunctionsCluster(CLUSTER_NAME);

Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
Expand Down Expand Up @@ -183,6 +186,7 @@ public void testProcessingAssignments() throws Exception {
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
workerConfig.setPulsarFunctionsCluster(CLUSTER_NAME);

Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
Expand Down Expand Up @@ -307,6 +311,7 @@ public void testTriggerReadToTheEndAndExit() throws Exception {
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
workerConfig.setPulsarFunctionsCluster(CLUSTER_NAME);

Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
Expand Down
Loading

0 comments on commit 8957e35

Please sign in to comment.