Skip to content

Commit

Permalink
Batch observer (#1137)
Browse files Browse the repository at this point in the history
* chore: adding batch observer, some metrics refactoring

* chore: undo changes after testing

* chore: undo changes after testing

* chore: addressing comments

* chore: renaming observer into value observer, fixing few spotted issues

* chore: missing renamed for ValueObserver

* chore: removing unused class

* chore: cleanup

* chore: refactoring, renaming aggregators

* chore: refactoring observer to have base class that can be extended

* chore: changing aggregator for ValueObserver, exposing batcher so it can be used to override a default one

* chore: addressing comments

* chore: addressing comments

* chore: preventing user from updating observer after timeout or update

* chore: aligning aggregators for value observer and recorder with regards to last spec changes

* chore: fixing test

* chore: fixes after merge

* chore: changes after review

* chore: changes after review with some additional fixes around typing

* chore: changes after review

* chore: lint

* chore: reviews

* chore: typo
  • Loading branch information
obecny authored Jun 30, 2020
1 parent 5aa851a commit 9845b4f
Show file tree
Hide file tree
Showing 40 changed files with 1,274 additions and 693 deletions.
6 changes: 4 additions & 2 deletions examples/metrics/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ npm run start:observer

### Prometheus

1. In prometheus search for "metric_observer"
1. In prometheus search for "cpu_core_usage", "cpu_temp_per_app", "cpu_usage_per_app"

### Links

Expand All @@ -35,7 +35,9 @@ npm run start:observer

### Example

<p align="center"><img src="metrics/observer.png"/></p>
![Screenshot of the running example](metrics/observer.png)
![Screenshot of the running example](metrics/observer_batch.png)
![Screenshot of the running example](metrics/observer_batch2.png)

## Useful links

Expand Down
80 changes: 62 additions & 18 deletions examples/metrics/metrics/observer.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict';

const { MeterProvider, MetricObservable } = require('@opentelemetry/metrics');
const { MeterProvider } = require('@opentelemetry/metrics');
const { ConsoleLogger, LogLevel } = require('@opentelemetry/core');
const { PrometheusExporter } = require('@opentelemetry/exporter-prometheus');

const exporter = new PrometheusExporter(
Expand All @@ -19,25 +20,68 @@ const meter = new MeterProvider({
interval: 2000,
}).getMeter('example-observer');

const otelCpuUsage = meter.createObserver('metric_observer', {
monotonic: false,
description: 'Example of a observer',
meter.createValueObserver('cpu_core_usage', {
description: 'Example of a sync value observer with callback',
}, (observerResult) => { // this callback is called once per each interval
observerResult.observe(getRandomValue(), { core: '1' });
observerResult.observe(getRandomValue(), { core: '2' });
});

function getCpuUsage() {
return Math.random();
}
// no callback as they will be updated in batch observer
const tempMetric = meter.createValueObserver('cpu_temp_per_app', {
description: 'Example of sync value observer used with async batch observer',
});

const observable = new MetricObservable();
// no callback as they will be updated in batch observer
const cpuUsageMetric = meter.createValueObserver('cpu_usage_per_app', {
description: 'Example of sync value observer used with async batch observer',
});

setInterval(() => {
observable.next(getCpuUsage());
}, 5000);
meter.createBatchObserver('metric_batch_observer', (observerBatchResult) => {
Promise.all([
someAsyncMetrics(),
// simulate waiting
new Promise((resolve, reject) => {
setTimeout(resolve, 300);
}),
]).then(([apps, waiting]) => {
apps.forEach(app => {
observerBatchResult.observe({ app: app.name, core: '1' }, [
tempMetric.observation(app.core1.temp),
cpuUsageMetric.observation(app.core1.usage),
]);
observerBatchResult.observe({ app: app.name, core: '2' }, [
tempMetric.observation(app.core2.temp),
cpuUsageMetric.observation(app.core2.usage),
]);
});
});
}, {
maxTimeoutUpdateMS: 500,
logger: new ConsoleLogger(LogLevel.DEBUG)
},
);

otelCpuUsage.setCallback((observerResult) => {
observerResult.observe(getCpuUsage, { pid: process.pid, core: '1' });
observerResult.observe(getCpuUsage, { pid: process.pid, core: '2' });
observerResult.observe(getCpuUsage, { pid: process.pid, core: '3' });
observerResult.observe(getCpuUsage, { pid: process.pid, core: '4' });
observerResult.observe(observable, { pid: process.pid, core: '5' });
});
function someAsyncMetrics() {
return new Promise((resolve) => {
setTimeout(() => {
const stats = [
{
name: 'app1',
core1: { usage: getRandomValue(), temp: getRandomValue() * 100 },
core2: { usage: getRandomValue(), temp: getRandomValue() * 100 },
},
{
name: 'app2',
core1: { usage: getRandomValue(), temp: getRandomValue() * 100 },
core2: { usage: getRandomValue(), temp: getRandomValue() * 100 },
},
];
resolve(stats);
}, 200);
});
}

function getRandomValue() {
return Math.random();
}
Binary file modified examples/metrics/metrics/observer.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added examples/metrics/metrics/observer_batch.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added examples/metrics/metrics/observer_batch2.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 1 addition & 0 deletions examples/metrics/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"url": "https://github.com/open-telemetry/opentelemetry-js/issues"
},
"dependencies": {
"@opentelemetry/core": "^0.9.0",
"@opentelemetry/exporter-prometheus": "^0.9.0",
"@opentelemetry/metrics": "^0.9.0"
},
Expand Down
10 changes: 8 additions & 2 deletions examples/tracer-web/examples/xml-http-request/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,19 @@ providerWithZone.register({

const webTracerWithZone = providerWithZone.getTracer('example-tracer-web');

const getData = (url) => new Promise((resolve, _reject) => {
const getData = (url) => new Promise((resolve, reject) => {
// eslint-disable-next-line no-undef
const req = new XMLHttpRequest();
req.open('GET', url, true);
req.setRequestHeader('Content-Type', 'application/json');
req.setRequestHeader('Accept', 'application/json');
req.send();
req.onload = () => {
resolve();
};
req.onerror = () => {
reject();
};
req.send();
});

// example of keeping track of context between async operations
Expand All @@ -53,6 +56,9 @@ const prepareClickEvent = () => {
getData(url1).then((_data) => {
webTracerWithZone.getCurrentSpan().addEvent('fetching-span1-completed');
span1.end();
}, ()=> {
webTracerWithZone.getCurrentSpan().addEvent('fetching-error');
span1.end();
});
});
}
Expand Down
3 changes: 2 additions & 1 deletion packages/opentelemetry-api/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ export * from './context/propagation/NoopHttpTextPropagator';
export * from './context/propagation/setter';
export * from './correlation_context/CorrelationContext';
export * from './correlation_context/EntryValue';
export * from './metrics/BatchObserverResult';
export * from './metrics/BoundInstrument';
export * from './metrics/Meter';
export * from './metrics/MeterProvider';
export * from './metrics/Metric';
export * from './metrics/MetricObservable';
export * from './metrics/NoopMeter';
export * from './metrics/NoopMeterProvider';
export * from './metrics/Observation';
export * from './metrics/ObserverResult';
export * from './trace/attributes';
export * from './trace/Event';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
export interface MetricObservable {
/**
* Sets the next value for observable metric
* @param value
*/
next: (value: number) => void;
/**
* Subscribes for every value change
* @param callback
*/
subscribe: (callback: (value: number) => void) => void;

import { Labels } from './Metric';
import { Observation } from './Observation';

/**
* Interface that is being used in callback function for Observer Metric
* for batch
*/
export interface BatchObserverResult {
/**
* Removes the subscriber
* @param [callback]
* Used to observe (update) observations for certain labels
* @param labels
* @param observations
*/
unsubscribe: (callback?: (value: number) => void) => void;
observe(labels: Labels, observations: Observation[]): void;
}
5 changes: 5 additions & 0 deletions packages/opentelemetry-api/src/metrics/BoundInstrument.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,8 @@ export interface BoundValueRecorder {
spanContext: SpanContext
): void;
}

/** An Instrument for Base Observer */
export interface BoundBaseObserver {
update(value: number): void;
}
28 changes: 25 additions & 3 deletions packages/opentelemetry-api/src/metrics/Meter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@
* limitations under the License.
*/

import { BatchObserverResult } from './BatchObserverResult';
import {
MetricOptions,
Counter,
ValueRecorder,
Observer,
ValueObserver,
BatchObserver,
BatchMetricOptions,
UpDownCounter,
} from './Metric';
import { ObserverResult } from './ObserverResult';

/**
* An interface to allow the recording metrics.
Expand Down Expand Up @@ -66,9 +70,27 @@ export interface Meter {
createUpDownCounter(name: string, options?: MetricOptions): UpDownCounter;

/**
* Creates a new `Observer` metric.
* Creates a new `ValueObserver` metric.
* @param name the name of the metric.
* @param [options] the metric options.
* @param [callback] the observer callback
*/
createObserver(name: string, options?: MetricOptions): Observer;
createValueObserver(
name: string,
options?: MetricOptions,
callback?: (observerResult: ObserverResult) => void
): ValueObserver;

/**
* Creates a new `BatchObserver` metric, can be used to update many metrics
* at the same time and when operations needs to be async
* @param name the name of the metric.
* @param callback the batch observer callback
* @param [options] the metric batch options.
*/
createBatchObserver(
name: string,
callback: (batchObserverResult: BatchObserverResult) => void,
options?: BatchMetricOptions
): BatchObserver;
}
41 changes: 31 additions & 10 deletions packages/opentelemetry-api/src/metrics/Metric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@

import { CorrelationContext } from '../correlation_context/CorrelationContext';
import { SpanContext } from '../trace/span_context';
import { ObserverResult } from './ObserverResult';
import { BoundCounter, BoundValueRecorder } from './BoundInstrument';
import {
BoundBaseObserver,
BoundCounter,
BoundValueRecorder,
} from './BoundInstrument';
import { Logger } from '../common/Logger';

/**
* Options needed for metric creation
Expand Down Expand Up @@ -58,6 +62,18 @@ export interface MetricOptions {
* @default {@link ValueType.DOUBLE}
*/
valueType?: ValueType;

/**
* User provided logger.
*/
logger?: Logger;
}

export interface BatchMetricOptions extends MetricOptions {
/**
* Indicates how long the batch metric should wait to update before cancel
*/
maxTimeoutUpdateMS?: number;
}

/** The Type of value. It describes how the data is reported. */
Expand Down Expand Up @@ -148,16 +164,21 @@ export interface ValueRecorder extends UnboundMetric<BoundValueRecorder> {
}

/** Base interface for the Observer metrics. */
export interface Observer extends Metric {
/**
* Sets a callback where user can observe value for certain labels. The
* observers are called periodically to retrieve the value.
* @param callback a function that will be called once to set observers
* for values
*/
setCallback(callback: (observerResult: ObserverResult) => void): void;
export interface BaseObserver extends UnboundMetric<BoundBaseObserver> {
observation: (
value: number
) => {
value: number;
observer: BaseObserver;
};
}

/** Base interface for the Value Observer metrics. */
export type ValueObserver = BaseObserver;

/** Base interface for the Batch Observer metrics. */
export type BatchObserver = Metric;

/**
* key-value pairs passed by the user.
*/
Expand Down
Loading

0 comments on commit 9845b4f

Please sign in to comment.