Skip to content

Commit

Permalink
Adds telemetry service
Browse files Browse the repository at this point in the history
Signed-off-by: Gagan Juneja <gjjuneja@amazon.com>
  • Loading branch information
Gagan Juneja committed May 13, 2024
1 parent cd55bca commit d30fca6
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 39 deletions.
49 changes: 10 additions & 39 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.MetricsRegistryFactory;
import org.opensearch.telemetry.metrics.NoopMetricsRegistryFactory;
import org.opensearch.telemetry.service.TelemetryService;
import org.opensearch.telemetry.tracing.NoopTracerFactory;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.TracerFactory;
Expand Down Expand Up @@ -409,9 +410,6 @@ public static class DiscoverySettings {
private final Collection<LifecycleComponent> pluginLifecycleComponents;
private final LocalNodeFactory localNodeFactory;
private final NodeService nodeService;
private final Tracer tracer;

private final MetricsRegistry metricsRegistry;
final NamedWriteableRegistry namedWriteableRegistry;
private final AtomicReference<RunnableTaskExecutionListener> runnableTaskListener;
private FileCache fileCache;
Expand Down Expand Up @@ -614,36 +612,9 @@ protected Node(
);
}

TracerFactory tracerFactory;
MetricsRegistryFactory metricsRegistryFactory;
if (FeatureFlags.isEnabled(TELEMETRY)) {
final TelemetrySettings telemetrySettings = new TelemetrySettings(settings, clusterService.getClusterSettings());
if (telemetrySettings.isTracingFeatureEnabled() || telemetrySettings.isMetricsFeatureEnabled()) {
List<TelemetryPlugin> telemetryPlugins = pluginsService.filterPlugins(TelemetryPlugin.class);
TelemetryModule telemetryModule = new TelemetryModule(telemetryPlugins, telemetrySettings);
if (telemetrySettings.isTracingFeatureEnabled()) {
tracerFactory = new TracerFactory(telemetrySettings, telemetryModule.getTelemetry(), threadPool.getThreadContext());
} else {
tracerFactory = new NoopTracerFactory();
}
if (telemetrySettings.isMetricsFeatureEnabled()) {
metricsRegistryFactory = new MetricsRegistryFactory(telemetrySettings, telemetryModule.getTelemetry());
} else {
metricsRegistryFactory = new NoopMetricsRegistryFactory();
}
} else {
tracerFactory = new NoopTracerFactory();
metricsRegistryFactory = new NoopMetricsRegistryFactory();
}
} else {
tracerFactory = new NoopTracerFactory();
metricsRegistryFactory = new NoopMetricsRegistryFactory();
}

tracer = tracerFactory.getTracer();
metricsRegistry = metricsRegistryFactory.getMetricsRegistry();
resourcesToClose.add(tracer::close);
resourcesToClose.add(metricsRegistry::close);
TelemetryService telemetryService = new TelemetryService(settings, clusterService.getClusterSettings(), pluginsService, threadPool);
pluginsService.setTelemetryService(telemetryService);
resourcesToClose.add(telemetryService);

final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client);
final UsageService usageService = new UsageService();
Expand Down Expand Up @@ -982,7 +953,7 @@ protected Node(
networkService,
restController,
clusterService.getClusterSettings(),
tracer,
telemetryService.getTracer(),
transportInterceptors,
secureSettingsFactories
);
Expand Down Expand Up @@ -1016,7 +987,7 @@ protected Node(
localNodeFactory,
settingsModule.getClusterSettings(),
taskHeaders,
tracer
telemetryService.getTracer()
);
TopNSearchTasksLogger taskConsumer = new TopNSearchTasksLogger(settings, settingsModule.getClusterSettings());
transportService.getTaskManager().registerTaskResourceConsumer(taskConsumer);
Expand Down Expand Up @@ -1320,10 +1291,11 @@ protected Node(
b.bind(ResourceUsageCollectorService.class).toInstance(resourceUsageCollectorService);
b.bind(SystemIndices.class).toInstance(systemIndices);
b.bind(IdentityService.class).toInstance(identityService);
b.bind(Tracer.class).toInstance(tracer);
b.bind(Tracer.class).toInstance(telemetryService.getTracer());
b.bind(SearchRequestStats.class).toInstance(searchRequestStats);
b.bind(SearchRequestSlowLog.class).toInstance(searchRequestSlowLog);
b.bind(MetricsRegistry.class).toInstance(metricsRegistry);
b.bind(TelemetryService.class).toInstance(telemetryService);
b.bind(MetricsRegistry.class).toInstance(telemetryService.getMetricsRegistry());
b.bind(RemoteClusterStateService.class).toProvider(() -> remoteClusterStateService);
b.bind(RemoteIndexPathUploader.class).toProvider(() -> remoteIndexPathUploader);
b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry);
Expand Down Expand Up @@ -1710,8 +1682,7 @@ public synchronized void close() throws IOException {
toClose.add(injector.getInstance(NodeEnvironment.class));
toClose.add(stopWatch::stop);
if (FeatureFlags.isEnabled(TELEMETRY)) {
toClose.add(injector.getInstance(Tracer.class));
toClose.add(injector.getInstance(MetricsRegistry.class));
toClose.add(injector.getInstance(TelemetryService.class));
}

if (logger.isTraceEnabled()) {
Expand Down
7 changes: 7 additions & 0 deletions server/src/main/java/org/opensearch/plugins/Plugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.opensearch.index.shard.IndexSettingProvider;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.telemetry.service.TelemetryService;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;
Expand Down Expand Up @@ -278,4 +279,10 @@ public Collection<IndexSettingProvider> getAdditionalIndexSettingProviders() {
public Optional<SecureSettingsFactory> getSecureSettingFactory(Settings settings) {
return Optional.empty();
}

/**
* Called to set the {@link TelemetryService} and plugins can use it for generating the instrumentations.
* @param telemetryService telemetry service
*/
public void setTelemetryService(TelemetryService telemetryService){}
}
11 changes: 11 additions & 0 deletions server/src/main/java/org/opensearch/plugins/PluginsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.core.service.ReportingService;
import org.opensearch.index.IndexModule;
import org.opensearch.semver.SemverRange;
import org.opensearch.telemetry.service.TelemetryService;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.transport.TransportSettings;

Expand Down Expand Up @@ -312,6 +313,16 @@ public void onIndexModule(IndexModule indexModule) {
}
}

/**
* Sets TelemetryService to all the plugins. Which can be used to instrument the plugin code.
* @param telemetryService telemetry service
*/
public void setTelemetryService(TelemetryService telemetryService) {
for (Tuple<PluginInfo, Plugin> plugin : plugins) {
plugin.v2().setTelemetryService(telemetryService);
}
}

/**
* Get information about plugins and modules
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry.service;

import java.io.IOException;
import java.util.List;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.plugins.PluginsService;
import org.opensearch.plugins.TelemetryPlugin;
import org.opensearch.telemetry.TelemetryModule;
import org.opensearch.telemetry.TelemetrySettings;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.MetricsRegistryFactory;
import org.opensearch.telemetry.metrics.NoopMetricsRegistryFactory;
import org.opensearch.telemetry.tracing.NoopTracerFactory;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.TracerFactory;
import org.opensearch.threadpool.ThreadPool;

import static org.opensearch.common.util.FeatureFlags.TELEMETRY;

/**
* It initializes the telemetry plugin and creates the {@link TracerFactory} and {@link MetricsRegistryFactory} factories.
* It provides the access to {@link Tracer} and {@link MetricsRegistry} to other code paths for instrumentations.
* @opensearch.experimental
*/
@ExperimentalApi
public class TelemetryService extends AbstractLifecycleComponent {

private TracerFactory tracerFactory = new NoopTracerFactory();
private MetricsRegistryFactory metricsRegistryFactory = new NoopMetricsRegistryFactory();

/**
* Construcs the {@link TelemetryService}
* @param settings settings
* @param clusterSettings cluster settings.
* @param pluginsService plugin service.
* @param threadPool thread pool.
*/
public TelemetryService(Settings settings, ClusterSettings clusterSettings, PluginsService pluginsService, ThreadPool threadPool) {
init(settings, clusterSettings, pluginsService, threadPool);
}

private void init(Settings settings, ClusterSettings clusterSettings, PluginsService pluginsService, ThreadPool threadPool) {
if (FeatureFlags.isEnabled(TELEMETRY)) {
final TelemetrySettings telemetrySettings = new TelemetrySettings(settings, clusterSettings);
if (telemetrySettings.isTracingFeatureEnabled() || telemetrySettings.isMetricsFeatureEnabled()) {
List<TelemetryPlugin> telemetryPlugins = pluginsService.filterPlugins(TelemetryPlugin.class);
TelemetryModule telemetryModule = new TelemetryModule(telemetryPlugins, telemetrySettings);
if (telemetrySettings.isTracingFeatureEnabled()) {
tracerFactory = new TracerFactory(telemetrySettings, telemetryModule.getTelemetry(), threadPool.getThreadContext());
}
if (telemetrySettings.isMetricsFeatureEnabled()) {
metricsRegistryFactory = new MetricsRegistryFactory(telemetrySettings, telemetryModule.getTelemetry());
}
}
}
}

/**
* Returns the tracer object. If tracer or telemetry plugin is not configured then returns {@link org.opensearch.telemetry.tracing.noop.NoopTracer}
* will be returned.
* @return tracer.
*/
public Tracer getTracer(){
return tracerFactory.getTracer();
}

/**
* Returns the {@link MetricsRegistry} object. If metrics registry or telemetry plugin is not configured then returns {@link MetricsRegistry}
* @return
*/
public MetricsRegistry getMetricsRegistry(){
return metricsRegistryFactory.getMetricsRegistry();
}

@Override
protected void doStart() {
}

@Override
protected void doStop() {
}

@Override
protected void doClose() throws IOException {
metricsRegistryFactory.getMetricsRegistry().close();
tracerFactory.getTracer().close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* This package contains classes needed for telemetry.
*/
package org.opensearch.telemetry.service;

0 comments on commit d30fca6

Please sign in to comment.