Skip to content

Commit

Permalink
fix(sdk-metrics-base): combine concurrent observable callback invocat…
Browse files Browse the repository at this point in the history
…ions
  • Loading branch information
legendecas committed Feb 14, 2022
1 parent fd62fa4 commit 2a2d3ff
Show file tree
Hide file tree
Showing 16 changed files with 489 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ export interface Meter {

/**
* Creates a new `ObservableGauge` metric.
*
* The callback SHOULD not be called concurrently.
*
* @param name the name of the metric.
* @param callback the observable callback
* @param [options] the metric options.
Expand All @@ -92,6 +95,9 @@ export interface Meter {

/**
* Creates a new `ObservableCounter` metric.
*
* The callback SHOULD not be called concurrently.
*
* @param name the name of the metric.
* @param callback the observable callback
* @param [options] the metric options.
Expand All @@ -104,6 +110,9 @@ export interface Meter {

/**
* Creates a new `ObservableUpDownCounter` metric.
*
* The callback SHOULD not be called concurrently.
*
* @param name the name of the metric.
* @param callback the observable callback
* @param [options] the metric options.
Expand Down
71 changes: 11 additions & 60 deletions experimental/packages/opentelemetry-sdk-metrics-base/src/Meter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,27 @@

import * as metrics from '@opentelemetry/api-metrics-wip';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { createInstrumentDescriptor, InstrumentDescriptor, InstrumentType } from './InstrumentDescriptor';
import { createInstrumentDescriptor, InstrumentType } from './InstrumentDescriptor';
import { Counter, Histogram, UpDownCounter } from './Instruments';
import { MeterProviderSharedState } from './state/MeterProviderSharedState';
import { MultiMetricStorage } from './state/MultiWritableMetricStorage';
import { SyncMetricStorage } from './state/SyncMetricStorage';
import { MetricStorage } from './state/MetricStorage';
import { MetricData } from './export/MetricData';
import { isNotNullish } from './utils';
import { MetricCollectorHandle } from './state/MetricCollector';
import { HrTime } from '@opentelemetry/api';
import { AsyncMetricStorage } from './state/AsyncMetricStorage';
import { MeterSharedState } from './state/MeterSharedState';

/**
* This class implements the {@link metrics.Meter} interface.
*/
export class Meter implements metrics.Meter {
private _metricStorageRegistry = new Map<string, MetricStorage>();
private _meterSharedState: MeterSharedState;

constructor(private _meterProviderSharedState: MeterProviderSharedState, private _instrumentationLibrary: InstrumentationLibrary) {
this._meterProviderSharedState.meters.push(this);
constructor(meterProviderSharedState: MeterProviderSharedState, instrumentationLibrary: InstrumentationLibrary) {
this._meterSharedState = meterProviderSharedState.getMeterSharedState(instrumentationLibrary);
}

/**
* Create a {@link metrics.Histogram} instrument.
*/
createHistogram(name: string, options?: metrics.HistogramOptions): metrics.Histogram {
const descriptor = createInstrumentDescriptor(name, InstrumentType.HISTOGRAM, options);
const storage = this._registerMetricStorage(descriptor);
const storage = this._meterSharedState.registerMetricStorage(descriptor);
return new Histogram(storage, descriptor);
}

Expand All @@ -52,7 +45,7 @@ export class Meter implements metrics.Meter {
*/
createCounter(name: string, options?: metrics.CounterOptions): metrics.Counter {
const descriptor = createInstrumentDescriptor(name, InstrumentType.COUNTER, options);
const storage = this._registerMetricStorage(descriptor);
const storage = this._meterSharedState.registerMetricStorage(descriptor);
return new Counter(storage, descriptor);
}

Expand All @@ -61,7 +54,7 @@ export class Meter implements metrics.Meter {
*/
createUpDownCounter(name: string, options?: metrics.UpDownCounterOptions): metrics.UpDownCounter {
const descriptor = createInstrumentDescriptor(name, InstrumentType.UP_DOWN_COUNTER, options);
const storage = this._registerMetricStorage(descriptor);
const storage = this._meterSharedState.registerMetricStorage(descriptor);
return new UpDownCounter(storage, descriptor);
}

Expand All @@ -74,7 +67,7 @@ export class Meter implements metrics.Meter {
options?: metrics.ObservableGaugeOptions,
): void {
const descriptor = createInstrumentDescriptor(name, InstrumentType.OBSERVABLE_GAUGE, options);
this._registerAsyncMetricStorage(descriptor, callback);
this._meterSharedState.registerAsyncMetricStorage(descriptor, callback);
}

/**
Expand All @@ -86,7 +79,7 @@ export class Meter implements metrics.Meter {
options?: metrics.ObservableCounterOptions,
): void {
const descriptor = createInstrumentDescriptor(name, InstrumentType.OBSERVABLE_COUNTER, options);
this._registerAsyncMetricStorage(descriptor, callback);
this._meterSharedState.registerAsyncMetricStorage(descriptor, callback);
}

/**
Expand All @@ -98,48 +91,6 @@ export class Meter implements metrics.Meter {
options?: metrics.ObservableUpDownCounterOptions,
): void {
const descriptor = createInstrumentDescriptor(name, InstrumentType.OBSERVABLE_UP_DOWN_COUNTER, options);
this._registerAsyncMetricStorage(descriptor, callback);
}

private _registerMetricStorage(descriptor: InstrumentDescriptor) {
const views = this._meterProviderSharedState.viewRegistry.findViews(descriptor, this._instrumentationLibrary);
const storages = views.map(view => {
const storage = SyncMetricStorage.create(view, descriptor);
// TODO: handle conflicts
this._metricStorageRegistry.set(descriptor.name, storage);
return storage;
});
if (storages.length === 1) {
return storages[0];
}
return new MultiMetricStorage(storages);
}

private _registerAsyncMetricStorage(descriptor: InstrumentDescriptor, callback: metrics.ObservableCallback) {
const views = this._meterProviderSharedState.viewRegistry.findViews(descriptor, this._instrumentationLibrary);
views.forEach(view => {
const storage = AsyncMetricStorage.create(view, descriptor, callback);
// TODO: handle conflicts
this._metricStorageRegistry.set(descriptor.name, storage);
});
}

/**
* @internal
* @param collector opaque handle of {@link MetricCollector} which initiated the collection.
* @param collectionTime the HrTime at which the collection was initiated.
* @returns the list of {@link MetricData} collected.
*/
async collect(collector: MetricCollectorHandle, collectionTime: HrTime): Promise<MetricData[]> {
const result = await Promise.all(Array.from(this._metricStorageRegistry.values()).map(metricStorage => {
return metricStorage.collect(
collector,
this._meterProviderSharedState.metricCollectors,
this._meterProviderSharedState.resource,
this._instrumentationLibrary,
this._meterProviderSharedState.sdkStartTime,
collectionTime);
}));
return result.filter(isNotNullish);
this._meterSharedState.registerAsyncMetricStorage(descriptor, callback);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
import { HrTime } from '@opentelemetry/api';
import { ObservableCallback } from '@opentelemetry/api-metrics-wip';
import { Accumulation, Aggregator } from '../aggregator/types';
import { View } from '../view/View';
import { createInstrumentDescriptorWithView, InstrumentDescriptor } from '../InstrumentDescriptor';
import { InstrumentDescriptor } from '../InstrumentDescriptor';
import { AttributesProcessor } from '../view/AttributesProcessor';
import { MetricStorage } from './MetricStorage';
import { InstrumentationLibrary } from '@opentelemetry/core';
Expand All @@ -28,8 +27,8 @@ import { DeltaMetricProcessor } from './DeltaMetricProcessor';
import { TemporalMetricProcessor } from './TemporalMetricProcessor';
import { Maybe } from '../utils';
import { MetricCollectorHandle } from './MetricCollector';
import { ObservableResult } from '../ObservableResult';
import { AttributeHashMap } from './HashMap';
import { ObservableRegistry } from './ObservableRegistry';

/**
* Internal interface.
Expand All @@ -44,7 +43,8 @@ export class AsyncMetricStorage<T extends Maybe<Accumulation>> implements Metric
private _instrumentDescriptor: InstrumentDescriptor,
aggregator: Aggregator<T>,
private _attributesProcessor: AttributesProcessor,
private _callback: ObservableCallback
private _callback: ObservableCallback,
private _observableRegistry: ObservableRegistry,
) {
this._deltaMetricStorage = new DeltaMetricProcessor(aggregator);
this._temporalMetricStorage = new TemporalMetricProcessor(aggregator);
Expand All @@ -65,17 +65,18 @@ export class AsyncMetricStorage<T extends Maybe<Accumulation>> implements Metric
* Note: This is a stateful operation and may reset any interval-related
* state for the MetricCollector.
*/
async collect(
collect(
collector: MetricCollectorHandle,
collectors: MetricCollectorHandle[],
resource: Resource,
instrumentationLibrary: InstrumentationLibrary,
sdkStartTime: HrTime,
collectionTime: HrTime,
): Promise<Maybe<MetricData>> {
const observableResult = new ObservableResult();
// TODO: timeout with callback
await this._callback(observableResult);
): Maybe<MetricData> {
const observableResult = this._observableRegistry.getObservableResult(this._callback);
if (observableResult == null) {
return;
}
this._record(observableResult.buffer);

const accumulations = this._deltaMetricStorage.collect();
Expand All @@ -91,10 +92,4 @@ export class AsyncMetricStorage<T extends Maybe<Accumulation>> implements Metric
collectionTime
);
}

static create(view: View, instrument: InstrumentDescriptor, callback: ObservableCallback): AsyncMetricStorage<Maybe<Accumulation>> {
instrument = createInstrumentDescriptorWithView(view, instrument);
const aggregator = view.aggregation.createAggregator(instrument);
return new AsyncMetricStorage(instrument, aggregator, view.attributesProcessor, callback);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
*/

import { HrTime } from '@opentelemetry/api';
import { hrTime } from '@opentelemetry/core';
import { hrTime, InstrumentationLibrary } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import { Meter } from '../Meter';
import { ViewRegistry } from '../view/ViewRegistry';
import { MeterSharedState } from './MeterSharedState';
import { MetricCollector } from './MetricCollector';

/**
Expand All @@ -30,7 +30,15 @@ export class MeterProviderSharedState {

metricCollectors: MetricCollector[] = [];

meters: Meter[] = [];
meterSharedStates: MeterSharedState[] = [];

constructor(public resource: Resource) {}

getMeterSharedState(instrumentationLibrary: InstrumentationLibrary) {
// TODO: meter identity
// https://github.com/open-telemetry/opentelemetry-specification/pull/2317
const meterSharedState = new MeterSharedState(this, instrumentationLibrary);
this.meterSharedStates.push(meterSharedState);
return meterSharedState;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { HrTime } from '@opentelemetry/api';
import * as metrics from '@opentelemetry/api-metrics-wip';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { MetricData } from '..';
import { createInstrumentDescriptorWithView, InstrumentDescriptor } from '../InstrumentDescriptor';
import { isNotNullish, promiseFinally } from '../utils';
import { AsyncMetricStorage } from './AsyncMetricStorage';
import { MeterProviderSharedState } from './MeterProviderSharedState';
import { MetricCollectorHandle } from './MetricCollector';
import { MetricStorage } from './MetricStorage';
import { MultiMetricStorage } from './MultiWritableMetricStorage';
import { ObservableRegistry } from './ObservableRegistry';
import { SyncMetricStorage } from './SyncMetricStorage';

/**
* An internal record for shared meter provider states.
*/
export class MeterSharedState {
private _metricStorageRegistry = new Map<string, MetricStorage>();
private _pendingCollectPromise: Promise<MetricData[]> | null = null;
private _observableRegistry = new ObservableRegistry();

constructor(private _meterProviderSharedState: MeterProviderSharedState, private _instrumentationLibrary: InstrumentationLibrary) {}

registerMetricStorage(descriptor: InstrumentDescriptor) {
const views = this._meterProviderSharedState.viewRegistry.findViews(descriptor, this._instrumentationLibrary);
const storages = views.map(view => {
const viewDescriptor = createInstrumentDescriptorWithView(view, descriptor);
const aggregator = view.aggregation.createAggregator(viewDescriptor);
const storage = new SyncMetricStorage(viewDescriptor, aggregator, view.attributesProcessor);
// TODO: handle conflicts
this._metricStorageRegistry.set(viewDescriptor.name, storage);
return storage;
});
if (storages.length === 1) {
return storages[0];
}
return new MultiMetricStorage(storages);
}

registerAsyncMetricStorage(descriptor: InstrumentDescriptor, callback: metrics.ObservableCallback) {
const views = this._meterProviderSharedState.viewRegistry.findViews(descriptor, this._instrumentationLibrary);
views.forEach(view => {
const viewDescriptor = createInstrumentDescriptorWithView(view, descriptor);
const aggregator = view.aggregation.createAggregator(viewDescriptor);
const storage = new AsyncMetricStorage(viewDescriptor, aggregator, view.attributesProcessor, callback, this._observableRegistry);
this._observableRegistry.addCallback(callback, storage);
// TODO: handle conflicts
this._metricStorageRegistry.set(viewDescriptor.name, storage);
});
}

/**
* @param collector opaque handle of {@link MetricCollector} which initiated the collection.
* @param collectionTime the HrTime at which the collection was initiated.
* @returns the list of {@link MetricData} collected.
*/
collect(collector: MetricCollectorHandle, collectionTime: HrTime): Promise<MetricData[]> {
if (this._pendingCollectPromise != null) {
return this._pendingCollectPromise;
}

/**
* 1. Call all observable callbacks first.
* 2. Collect metric result for the collector.
*/
const promise = this._observableRegistry.observe()
.then(() => {
return Array.from(this._metricStorageRegistry.values()).map(metricStorage => {
return metricStorage.collect(
collector,
this._meterProviderSharedState.metricCollectors,
this._meterProviderSharedState.resource,
this._instrumentationLibrary,
this._meterProviderSharedState.sdkStartTime,
collectionTime);
})
.filter(isNotNullish);
});

this._pendingCollectPromise = promiseFinally(promise, () => {
this._pendingCollectPromise = null;
});
return this._pendingCollectPromise;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ export class MetricCollector implements MetricProducer {

async collect(): Promise<MetricData[]> {
const collectionTime = hrTime();
const results = await Promise.all(this._sharedState.meters
.map(meter => meter.collect(this, collectionTime)));
const results = await Promise.all(this._sharedState.meterSharedStates
.map(meterSharedState => meterSharedState.collect(this, collectionTime)));

return results.reduce((cumulation, current) => cumulation.concat(current), []);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,5 @@ export interface MetricStorage {
instrumentationLibrary: InstrumentationLibrary,
sdkStartTime: HrTime,
collectionTime: HrTime,
): Promise<Maybe<MetricData>>;
): Maybe<MetricData>;
}
Loading

0 comments on commit 2a2d3ff

Please sign in to comment.