Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][misc] PIP-320: Add OpenTelemetry scaffolding #22010

Merged
merged 139 commits into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
139 commits
Select commit Hold shift + click to select a range
2749cee
Add OTel dependencies in pom
dragosvictor Jan 9, 2024
ebc5e92
Add PulsarBrokerOpenTelemetry class
dragosvictor Jan 9, 2024
68a5760
Add PulsarBrokerOpenTelemetry instance to PulsarService
dragosvictor Jan 9, 2024
b54628c
Cosmetic fix
dragosvictor Jan 9, 2024
b742550
Add worker and proxy providers
dragosvictor Jan 9, 2024
079e429
Add exporters to pom
dragosvictor Jan 9, 2024
5bd9ed3
Add dummy unit test
dragosvictor Jan 9, 2024
540c6c5
Add test utility
dragosvictor Jan 9, 2024
6268af5
Remove metric exporter customizer
dragosvictor Jan 9, 2024
7f39c05
Add argument matcher for MetricData
dragosvictor Jan 10, 2024
7058849
Add builder for OpenTelemetryService
dragosvictor Jan 11, 2024
96006c4
Add cardinality limit test
dragosvictor Jan 11, 2024
800f31b
Fix test
dragosvictor Jan 11, 2024
89c094e
Add OpenTelemetry SDK testing pom
dragosvictor Jan 12, 2024
a4f20e3
Merge remote-tracking branch 'origin/master' into pip-320-metric-scaf…
dragosvictor Jan 17, 2024
e31e428
Use InMemoryReader for tests
dragosvictor Jan 18, 2024
67d0226
Remove TestMetricProvider
dragosvictor Jan 18, 2024
f7844b6
Add serviceName parameter
dragosvictor Jan 18, 2024
7b485ca
Update tests
dragosvictor Jan 18, 2024
cb6c131
Add VisibleForTesting annotation for extraReaders field
dragosvictor Jan 18, 2024
fd1adaa
Rename class MetricDataMatcher under tests
dragosvictor Jan 18, 2024
bc6e23d
Move MetricDataMatcher to own class file
dragosvictor Jan 18, 2024
a14def0
Draft integration test containers
dragosvictor Jan 18, 2024
4d32e7f
Update container definitions
dragosvictor Jan 19, 2024
a93d9dc
Fix config file name typo for Prometheus
dragosvictor Jan 19, 2024
0148c59
Add MetricsTest draft
dragosvictor Jan 19, 2024
504181e
Add missing licenses
dragosvictor Jan 19, 2024
903677a
Rename otel-collector-config yaml file
dragosvictor Jan 19, 2024
d84e2d0
Configure cluster name for MetricsTest
dragosvictor Jan 19, 2024
c1ed51d
Fix container names and add wait strategies
dragosvictor Jan 19, 2024
50e33a9
Add opentelemetry java agent pom
dragosvictor Jan 19, 2024
6fcbf6a
Move javaagent out of runtime scope
dragosvictor Jan 19, 2024
16c7e1b
Fix missing dependencies issue
dragosvictor Jan 19, 2024
d567fdf
Add pulsar-broker test-jar to integration tests
dragosvictor Jan 19, 2024
70026c8
Cleanup otel collector config
dragosvictor Jan 19, 2024
6d91ced
Refactor tests
dragosvictor Jan 19, 2024
6ec7869
Cosmetic fix
dragosvictor Jan 19, 2024
661a384
Cosmetic fix
dragosvictor Jan 19, 2024
a7d86be
Allow specifying of function worker env for test PulsarCluster
dragosvictor Jan 19, 2024
4606bc1
Test fix
dragosvictor Jan 19, 2024
00ad59b
Disable function worker test
dragosvictor Jan 20, 2024
6e072e6
Disable OTel by default
dragosvictor Jan 23, 2024
d204f99
Add TODO regarding prometheus exporter issue
dragosvictor Jan 23, 2024
f35685f
Fix function worker integration testing
dragosvictor Jan 23, 2024
3efb866
Cosmetic fix
dragosvictor Jan 23, 2024
86dd4fa
Remove extra JAR imports
dragosvictor Jan 23, 2024
3dccf2e
Typo fix
dragosvictor Jan 23, 2024
e93c496
pom cleanup
dragosvictor Jan 23, 2024
a6b73b5
Remove prometheus container
dragosvictor Jan 23, 2024
266f81b
Remove serviceName parameter in OpenTelemetryService builder
dragosvictor Jan 23, 2024
569d134
Split test testIsClusterNameRequired into two
dragosvictor Jan 23, 2024
20e48d3
Add test todo
dragosvictor Jan 23, 2024
a91822a
Add prometheus exporter JARs
dragosvictor Jan 23, 2024
6de2f46
Allow cluster name to be overriden by env variables
dragosvictor Jan 25, 2024
eb10f81
Merge remote-tracking branch 'origin/master' into pip-320-metric-scaf…
dragosvictor Jan 29, 2024
c1401af
Test fixes
dragosvictor Jan 29, 2024
efe5fcc
Cosmetic test fixes
dragosvictor Jan 29, 2024
2dfd6ec
Cosmetic test fixes
dragosvictor Jan 30, 2024
063b641
Cosmetic test fixes
dragosvictor Jan 30, 2024
5bf7fdb
Abstract extra reader test setup details in OpenTelemetryService
dragosvictor Jan 30, 2024
3c7be40
Update LICENSE.bin.txt
dragosvictor Jan 30, 2024
c0aafa7
Fix ProxyConfiguration missing cluster name field in tests
dragosvictor Jan 30, 2024
7356ecc
Fix cluster name discrepancies in some proxy tests
dragosvictor Jan 30, 2024
9f5ad0a
Proxy test fixes
dragosvictor Jan 30, 2024
1ef19a6
Fix pulsar function tests missing cluster name
dragosvictor Jan 30, 2024
cc77905
Temporary assert cluster name not empty
dragosvictor Jan 30, 2024
0d55bbe
Add integration test group METRICS
dragosvictor Jan 30, 2024
bd57f73
Merge remote-tracking branch 'origin/master' into pip-320-metric-scaf…
dragosvictor Jan 30, 2024
ddb3092
Create pulsar-otel-metrics-provider artifact
dragosvictor Jan 31, 2024
65d3cc9
Revert pulsar-broker-common POM changes
dragosvictor Jan 31, 2024
b6be1bc
Update POMs
dragosvictor Jan 31, 2024
2ed1059
Add package-info
dragosvictor Jan 31, 2024
473a94f
Merge remote-tracking branch 'origin/master' into pip-320-metric-scaf…
dragosvictor Jan 31, 2024
19f3a4b
Remove animal-sniffer-annotations from server LICENSE
dragosvictor Jan 31, 2024
f742149
Import correct OTel library in POM
dragosvictor Jan 31, 2024
075c757
Exclude logs and traces dependencies from distribution
dragosvictor Jan 31, 2024
7912180
Minor cleanup
dragosvictor Jan 31, 2024
eae6616
Remove debug assertion
dragosvictor Jan 31, 2024
3816032
Add docs
dragosvictor Jan 31, 2024
bde20f3
Revert "Exclude logs and traces dependencies from distribution"
dragosvictor Jan 31, 2024
7b7ee9a
Revert pulsar-common/pom.xml changes
dragosvictor Jan 31, 2024
d74daaa
Add description to MetricsTest
dragosvictor Jan 31, 2024
f8d4929
Fix cluster name in AdminProxyHandlerKeystoreTLSTest
dragosvictor Jan 31, 2024
d14bbce
Close OpenTelemetry service on PulsarService.close
dragosvictor Jan 31, 2024
b887209
Add OpenTelemetryService description
dragosvictor Feb 1, 2024
3e22db8
Cosmetic fixes
dragosvictor Feb 1, 2024
575446d
Use OpenTelemetry for the Function Worker only, instead of the spawne…
dragosvictor Feb 1, 2024
5a7163a
Fix integration test to not launch worker processes
dragosvictor Feb 1, 2024
ae1205e
Cosmetic fixes
dragosvictor Feb 1, 2024
1c4b4c0
Move dependency to pulsar-functions/worker
dragosvictor Feb 1, 2024
dd80990
Revert pulsar-functions-instance changes
dragosvictor Feb 1, 2024
af61cc1
Add test OpenTelemetryServiceTest#testServiceIsDisabledByDefault
dragosvictor Feb 2, 2024
bba153e
Factor out attribute names for reuse
dragosvictor Feb 2, 2024
82acbe9
Add more attribute definitions
dragosvictor Feb 3, 2024
fa444c7
Use ServiceConfiguration parameter in PulsarBrokerOpenTelemetry const…
dragosvictor Feb 5, 2024
d84655b
Fix instrumentation scope names
dragosvictor Feb 5, 2024
5d7f0ab
Fix description in OpenTelemetryService
dragosvictor Feb 5, 2024
893aae3
Clarify comment regarding OTel cardinality limit
dragosvictor Feb 5, 2024
ce7676e
Remove redundant testDoubleCounter
dragosvictor Feb 5, 2024
e9e9078
Use ProxyConfiguration in PulsarProxyOpenTelemetry constructor
dragosvictor Feb 5, 2024
d6e5bf8
Cleanup creation of OTel Collector container in tests
dragosvictor Feb 5, 2024
b15e31a
Rename integration test to OpenTelemetrySanityTest
dragosvictor Feb 5, 2024
d9a1614
Generalize function worker container specs across all workers
dragosvictor Feb 5, 2024
7657b21
Refactor OpenTelemetrySanityTest
dragosvictor Feb 5, 2024
8bb00dd
Rename artifact to pulsar-otel-integration
dragosvictor Feb 5, 2024
542221c
Checkstyle fix
dragosvictor Feb 6, 2024
cdc34ed
Fix metric cardinality test
dragosvictor Feb 6, 2024
7a99baf
Fix metric cardinality test
dragosvictor Feb 6, 2024
ef6f148
Add service name and version attributes
dragosvictor Feb 6, 2024
fefcc89
Rename OpenTelemetrySanityTest.getOpenTelemetryProps
dragosvictor Feb 6, 2024
5e5551f
Remove extra attribute definitions until needed
dragosvictor Feb 6, 2024
4951a20
Refactor sdkBuilder customization in OpenTelemetryService
dragosvictor Feb 7, 2024
e71cf0e
Refactor OpenTelemetryService constructor
dragosvictor Feb 7, 2024
1af770f
Expose OpenTelemetry object from OpenTelemetryService
dragosvictor Feb 7, 2024
6eb9f16
Fix missing test dependency
dragosvictor Feb 7, 2024
3abb653
Inline fields
dragosvictor Feb 7, 2024
2b126e7
Cleanup MetricDataMatcher
dragosvictor Feb 7, 2024
a41b41d
Close OpenTelemetry in PulsarWorkerService
dragosvictor Feb 7, 2024
e1ecd3b
Cleanup property definitions in OpenTelemetryService
dragosvictor Feb 7, 2024
cd05268
Cleanup OpenTelemetryServiceTest
dragosvictor Feb 7, 2024
8d0a747
Cleanup OpenTelemetryServiceTest
dragosvictor Feb 7, 2024
eaafe92
Checkstyle fixes
dragosvictor Feb 7, 2024
1286960
Use service attributes from semconv
dragosvictor Feb 7, 2024
c6df6ff
Factor out attributes to interface OpenTelemetryAttributes
dragosvictor Feb 7, 2024
d50d0a2
Use OpenTelemetryAssertions test library
dragosvictor Feb 7, 2024
23f5595
Remove redundant MetricDataMatcher class
dragosvictor Feb 7, 2024
122c72b
Cleanup
dragosvictor Feb 7, 2024
fa95f9f
Add opentelemetry-semconv to LICENSE
dragosvictor Feb 7, 2024
ac8a2e9
Revert pulsar-broker minor pom change
dragosvictor Feb 8, 2024
e56d6f1
Merge remote-tracking branch 'origin/master' into pip-320-metric-scaf…
dragosvictor Feb 8, 2024
2c2dd46
Enable coverage for metrics integration tests
dragosvictor Feb 8, 2024
baa4fa9
Rename module to pulsar-opentelemetry
dragosvictor Feb 8, 2024
c5e4ae2
Rename package to org.apache.pulsar.opentelemetry
dragosvictor Feb 8, 2024
17c844d
Move PulsarWorkerOpenTelemetry to org.apache.pulsar.functions.worker
dragosvictor Feb 8, 2024
6d5dd72
Checkstyle fixes
dragosvictor Feb 8, 2024
fc5ba4f
Add Javadoc
dragosvictor Feb 8, 2024
7bfda76
Add automatic resource providers
dragosvictor Feb 9, 2024
7241867
Disable JarServiceNameDetector in unit tests
dragosvictor Feb 9, 2024
e7696fc
Merge branch 'master' into pip-320-metric-scaffolding
merlimat Feb 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/pulsar-ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,9 @@ jobs:
- name: Transaction
group: TRANSACTION

- name: Metrics
group: METRICS

steps:
- name: checkout
uses: actions/checkout@v4
Expand Down
4 changes: 4 additions & 0 deletions build/run_integration_group.sh
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ test_group_transaction() {
mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-transaction.xml -DintegrationTests
}

test_group_metrics() {
mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-metrics.xml -DintegrationTests
}

test_group_tiered_filesystem() {
mvn_run_integration_test "$@" -DintegrationTestSuiteFile=tiered-filesystem-storage.xml -DintegrationTests
}
Expand Down
28 changes: 28 additions & 0 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,13 @@ The Apache Software License, Version 2.0
- io.prometheus-simpleclient_tracer_common-0.16.0.jar
- io.prometheus-simpleclient_tracer_otel-0.16.0.jar
- io.prometheus-simpleclient_tracer_otel_agent-0.16.0.jar
* Prometheus exporter
- io.prometheus-prometheus-metrics-config-1.1.0.jar
- io.prometheus-prometheus-metrics-exporter-common-1.1.0.jar
- io.prometheus-prometheus-metrics-exporter-httpserver-1.1.0.jar
- io.prometheus-prometheus-metrics-exposition-formats-1.1.0.jar
- io.prometheus-prometheus-metrics-model-1.1.0.jar
- io.prometheus-prometheus-metrics-shaded-protobuf-1.1.0.jar
* Jakarta Bean Validation API
- jakarta.validation-jakarta.validation-api-2.0.2.jar
- javax.validation-validation-api-1.1.0.Final.jar
Expand Down Expand Up @@ -503,6 +510,27 @@ The Apache Software License, Version 2.0
* RoaringBitmap
- org.roaringbitmap-RoaringBitmap-0.9.44.jar
- org.roaringbitmap-shims-0.9.44.jar
* OpenTelemetry
- io.opentelemetry-opentelemetry-api-1.34.1.jar
- io.opentelemetry-opentelemetry-api-events-1.34.1-alpha.jar
- io.opentelemetry-opentelemetry-context-1.34.1.jar
- io.opentelemetry-opentelemetry-exporter-common-1.34.1.jar
- io.opentelemetry-opentelemetry-exporter-otlp-1.34.1.jar
- io.opentelemetry-opentelemetry-exporter-otlp-common-1.34.1.jar
- io.opentelemetry-opentelemetry-exporter-prometheus-1.34.1-alpha.jar
- io.opentelemetry-opentelemetry-exporter-sender-okhttp-1.34.1.jar
- io.opentelemetry-opentelemetry-extension-incubator-1.34.1-alpha.jar
- io.opentelemetry-opentelemetry-sdk-1.34.1.jar
- io.opentelemetry-opentelemetry-sdk-common-1.34.1.jar
- io.opentelemetry-opentelemetry-sdk-extension-autoconfigure-1.34.1.jar
- io.opentelemetry-opentelemetry-sdk-extension-autoconfigure-spi-1.34.1.jar
- io.opentelemetry-opentelemetry-sdk-logs-1.34.1.jar
- io.opentelemetry-opentelemetry-sdk-metrics-1.34.1.jar
- io.opentelemetry-opentelemetry-sdk-trace-1.34.1.jar
- io.opentelemetry.instrumentation-opentelemetry-instrumentation-api-1.32.1.jar
- io.opentelemetry.instrumentation-opentelemetry-instrumentation-api-semconv-1.32.1-alpha.jar
- io.opentelemetry.instrumentation-opentelemetry-resources-1.32.1-alpha.jar
- io.opentelemetry.semconv-opentelemetry-semconv-1.23.1-alpha.jar

BSD 3-clause "New" or "Revised" License
* Google auth library
Expand Down
31 changes: 31 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,10 @@ flexible messaging model and an intuitive client API.</description>
<disruptor.version>3.4.3</disruptor.version>
<zstd-jni.version>1.5.2-3</zstd-jni.version>
<netty-reactive-streams.version>2.0.6</netty-reactive-streams.version>
<opentelemetry.version>1.34.1</opentelemetry.version>
<opentelemetry.alpha.version>1.34.1-alpha</opentelemetry.alpha.version>
<opentelemetry.instrumentation.version>1.32.1-alpha</opentelemetry.instrumentation.version>
<opentelemetry.semconv.version>1.23.1-alpha</opentelemetry.semconv.version>

<!-- test dependencies -->
<testcontainers.version>1.18.3</testcontainers.version>
Expand Down Expand Up @@ -1446,6 +1450,31 @@ flexible messaging model and an intuitive client API.</description>
<version>${restassured.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-bom</artifactId>
<version>${opentelemetry.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-bom-alpha</artifactId>
<version>${opentelemetry.alpha.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-resources</artifactId>
<version>${opentelemetry.instrumentation.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry.semconv</groupId>
<artifactId>opentelemetry-semconv</artifactId>
<version>${opentelemetry.semconv.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down Expand Up @@ -2266,6 +2295,7 @@ flexible messaging model and an intuitive client API.</description>
<module>pulsar-broker-auth-sasl</module>
<module>pulsar-client-auth-sasl</module>
<module>pulsar-config-validation</module>
<module>pulsar-opentelemetry</module>

<module>structured-event-log</module>

Expand Down Expand Up @@ -2330,6 +2360,7 @@ flexible messaging model and an intuitive client API.</description>
<module>pulsar-broker-auth-sasl</module>
<module>pulsar-client-auth-sasl</module>
<module>pulsar-config-validation</module>
<module>pulsar-opentelemetry</module>

<!-- transaction related modules -->
<module>pulsar-transaction</module>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ void testAuthentication() throws Exception {
proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
proxyConfig.setSaslJaasClientAllowedIds(".*" + localHostname + ".*");
proxyConfig.setSaslJaasServerSectionName("PulsarProxy");
proxyConfig.setClusterName(configClusterName);

// proxy connect to broker
proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
Expand Down
18 changes: 18 additions & 0 deletions pulsar-broker-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,28 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.gaul</groupId>
<artifactId>modernizer-maven-plugin</artifactId>
Expand Down
14 changes: 14 additions & 0 deletions pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-opentelemetry</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-batch-discovery-triggerers</artifactId>
Expand Down Expand Up @@ -209,6 +215,14 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-broker-common</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<!-- functions related dependencies (end) -->

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.SchemaStorageFactory;
import org.apache.pulsar.broker.stats.MetricsGenerator;
import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry;
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
import org.apache.pulsar.broker.stats.prometheus.PulsarPrometheusMetricsServlet;
import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
Expand Down Expand Up @@ -248,6 +249,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private final Timer brokerClientSharedTimer;

private MetricsGenerator metricsGenerator;
private PulsarBrokerOpenTelemetry openTelemetry;

private TransactionMetadataStoreService transactionMetadataStoreService;
private TransactionBufferProvider transactionBufferProvider;
Expand Down Expand Up @@ -461,6 +463,9 @@ public CompletableFuture<Void> closeAsync() {
}

resetMetricsServlet();
if (openTelemetry != null) {
openTelemetry.close();
}

if (this.compactionServiceFactory != null) {
try {
Expand Down Expand Up @@ -897,6 +902,7 @@ public void start() throws PulsarServerException {
}

this.metricsGenerator = new MetricsGenerator(this);
this.openTelemetry = new PulsarBrokerOpenTelemetry(config);
lhotari marked this conversation as resolved.
Show resolved Hide resolved

// Initialize the message protocol handlers.
// start the protocol handlers only after the broker is ready,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.stats;

import io.opentelemetry.api.metrics.Meter;
import java.io.Closeable;
import lombok.Getter;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.opentelemetry.OpenTelemetryService;

public class PulsarBrokerOpenTelemetry implements Closeable {

public static final String SERVICE_NAME = "pulsar-broker";
private final OpenTelemetryService openTelemetryService;

@Getter
private final Meter meter;

public PulsarBrokerOpenTelemetry(ServiceConfiguration config) {
openTelemetryService = OpenTelemetryService.builder()
.clusterName(config.getClusterName())
.serviceName(SERVICE_NAME)
.serviceVersion(PulsarVersion.getVersion())
.build();
meter = openTelemetryService.getOpenTelemetry().getMeter("org.apache.pulsar.broker");
}

@Override
public void close() {
openTelemetryService.close();
}
}
6 changes: 6 additions & 0 deletions pulsar-functions/worker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-opentelemetry</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-client-original</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.worker;

import io.opentelemetry.api.metrics.Meter;
import java.io.Closeable;
import lombok.Getter;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.opentelemetry.OpenTelemetryService;

public class PulsarWorkerOpenTelemetry implements Closeable {

public static final String SERVICE_NAME = "pulsar-function-worker";
private final OpenTelemetryService openTelemetryService;

@Getter
private final Meter meter;

public PulsarWorkerOpenTelemetry(WorkerConfig workerConfig) {
openTelemetryService = OpenTelemetryService.builder()
.clusterName(workerConfig.getPulsarFunctionsCluster())
.serviceName(SERVICE_NAME)
.serviceVersion(PulsarVersion.getVersion())
.build();
meter = openTelemetryService.getOpenTelemetry().getMeter("org.apache.pulsar.function_worker");
}

@Override
public void close() {
openTelemetryService.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public interface PulsarClientCreator {
private PulsarAdmin brokerAdmin;
private PulsarAdmin functionAdmin;
private MetricsGenerator metricsGenerator;
private PulsarWorkerOpenTelemetry openTelemetry;
@VisibleForTesting
private URI dlogUri;
private LeaderService leaderService;
Expand Down Expand Up @@ -188,6 +189,7 @@ public void init(WorkerConfig workerConfig,
this.statsUpdater = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("worker-stats-updater"));
this.metricsGenerator = new MetricsGenerator(this.statsUpdater, workerConfig);
this.openTelemetry = new PulsarWorkerOpenTelemetry(workerConfig);
dragosvictor marked this conversation as resolved.
Show resolved Hide resolved
dragosvictor marked this conversation as resolved.
Show resolved Hide resolved
this.workerConfig = workerConfig;
this.dlogUri = dlogUri;
this.workerStatsManager = new WorkerStatsManager(workerConfig, runAsStandalone);
Expand Down Expand Up @@ -659,6 +661,10 @@ public void stop() {
if (null != stateStoreProvider) {
stateStoreProvider.close();
}

if (null != openTelemetry) {
openTelemetry.close();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@
@Slf4j
public class FunctionAssignmentTailerTest {

private static final String CLUSTER_NAME = "test-cluster";

@Test(timeOut = 10000)
public void testErrorNotifier() throws Exception {
WorkerConfig workerConfig = new WorkerConfig();
Expand All @@ -71,6 +73,7 @@ public void testErrorNotifier() throws Exception {
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
workerConfig.setPulsarFunctionsCluster(CLUSTER_NAME);
dragosvictor marked this conversation as resolved.
Show resolved Hide resolved

Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
Expand Down Expand Up @@ -183,6 +186,7 @@ public void testProcessingAssignments() throws Exception {
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
workerConfig.setPulsarFunctionsCluster(CLUSTER_NAME);

Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
Expand Down Expand Up @@ -307,6 +311,7 @@ public void testTriggerReadToTheEndAndExit() throws Exception {
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
workerConfig.setPulsarFunctionsCluster(CLUSTER_NAME);

Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
Expand Down
Loading
Loading