Skip to content

Commit

Permalink
feat(sdk-metrics): implement MetricProducer specification
Browse files Browse the repository at this point in the history
  • Loading branch information
aabmass committed Jul 19, 2023
1 parent 87f21ef commit 0271b33
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 16 deletions.
48 changes: 41 additions & 7 deletions packages/sdk-metrics/src/export/MetricReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ export interface MetricReaderOptions {
* not configured, cumulative is used for all instruments.
*/
aggregationTemporalitySelector?: AggregationTemporalitySelector;
/**
* Additional MetricProducers to use as a source of aggregated metric data in addition to the
* SDK's metric data.
*/
metricProducers?: MetricProducer[];
}

/**
Expand All @@ -55,8 +60,10 @@ export abstract class MetricReader {
// Tracks the shutdown state.
// TODO: use BindOncePromise here once a new version of @opentelemetry/core is available.
private _shutdown = false;
// MetricProducer used by this instance.
private _metricProducer?: MetricProducer;
// Additional MetricProducers which will be combined with the SDK's output
private _metricProducers: MetricProducer[];
// MetricProducer used by this instance which produces metrics from the SDK
private _sdkMetricProducer?: MetricProducer;
private readonly _aggregationTemporalitySelector: AggregationTemporalitySelector;
private readonly _aggregationSelector: AggregationSelector;

Expand All @@ -66,6 +73,7 @@ export abstract class MetricReader {
this._aggregationTemporalitySelector =
options?.aggregationTemporalitySelector ??
DEFAULT_AGGREGATION_TEMPORALITY_SELECTOR;
this._metricProducers = options?.metricProducers ?? [];
}

/**
Expand All @@ -74,12 +82,12 @@ export abstract class MetricReader {
* @param metricProducer
*/
setMetricProducer(metricProducer: MetricProducer) {
if (this._metricProducer) {
if (this._sdkMetricProducer) {
throw new Error(
'MetricReader can not be bound to a MeterProvider again.'
);
}
this._metricProducer = metricProducer;
this._sdkMetricProducer = metricProducer;
this.onInitialized();
}

Expand Down Expand Up @@ -130,7 +138,7 @@ export abstract class MetricReader {
* Collect all metrics from the associated {@link MetricProducer}
*/
async collect(options?: CollectionOptions): Promise<CollectionResult> {
if (this._metricProducer === undefined) {
if (this._sdkMetricProducer === undefined) {
throw new Error('MetricReader is not bound to a MetricProducer');
}

Expand All @@ -139,9 +147,35 @@ export abstract class MetricReader {
throw new Error('MetricReader is shutdown');
}

return this._metricProducer.collect({
const collectionOptions = {
timeoutMillis: options?.timeoutMillis,
});
};
const [sdkCollectionResults, ...additionalCollectionResults] =
await Promise.all([
this._sdkMetricProducer.collect(collectionOptions),
...this._metricProducers.map(producer =>
producer.collect(collectionOptions)
),
]);

// Merge the results, keeping the SDK's Resource
const errors = sdkCollectionResults.errors.concat(
additionalCollectionResults.flatMap(result => result.errors)
);
const resource = sdkCollectionResults.resourceMetrics.resource;
const scopeMetrics =
sdkCollectionResults.resourceMetrics.scopeMetrics.concat(
additionalCollectionResults.flatMap(
result => result.resourceMetrics.scopeMetrics
)
);
return {
resourceMetrics: {
resource,
scopeMetrics,
},
errors,
};
}

/**
Expand Down
84 changes: 79 additions & 5 deletions packages/sdk-metrics/test/export/MetricReader.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ import { MeterProvider } from '../../src/MeterProvider';
import { assertRejects } from '../test-utils';
import { emptyResourceMetrics, TestMetricProducer } from './TestMetricProducer';
import { TestMetricReader } from './TestMetricReader';
import { Aggregation, AggregationTemporality } from '../../src';
import {
Aggregation,
AggregationTemporality,
DataPointType,
InstrumentType,
} from '../../src';
import {
DEFAULT_AGGREGATION_SELECTOR,
DEFAULT_AGGREGATION_TEMPORALITY_SELECTOR,
Expand All @@ -29,6 +34,8 @@ import {
assertAggregationSelector,
assertAggregationTemporalitySelector,
} from './utils';
import { defaultResource } from '../util';
import { ValueType } from '@opentelemetry/api';

describe('MetricReader', () => {
describe('setMetricProducer', () => {
Expand Down Expand Up @@ -83,20 +90,87 @@ describe('MetricReader', () => {
assertRejects(reader.collect(), /MetricReader is shutdown/);
});

it('should call MetricProduce.collect with timeout', async () => {
it('should call MetricProducer.collect with timeout', async () => {
const reader = new TestMetricReader();
const producer = new TestMetricProducer();
reader.setMetricProducer(producer);

const collectStub = sinon.stub(producer, 'collect');
const collectSpy = sinon.spy(producer, 'collect');

await reader.collect({ timeoutMillis: 20 });
assert(collectStub.calledOnce);
const args = collectStub.args[0];
assert(collectSpy.calledOnce);
const args = collectSpy.args[0];
assert.deepStrictEqual(args, [{ timeoutMillis: 20 }]);

await reader.shutdown();
});

it('should collect metrics from the SDK and the additional metricProducers from config', async () => {
const meterProvider = new MeterProvider();
const additionalProducer = new TestMetricProducer({
resource: defaultResource,
scopeMetrics: [
{
scope: {
name: 'additionalMetricProducerMetrics',
},
metrics: [
{
aggregationTemporality: AggregationTemporality.CUMULATIVE,
dataPointType: DataPointType.SUM,
dataPoints: [
{
attributes: {},
value: 1,
startTime: [0, 0],
endTime: [1, 0],
},
],
descriptor: {
name: 'additionalCounter',
unit: '',
type: InstrumentType.COUNTER,
description: '',
valueType: ValueType.INT,
},
isMonotonic: true,
},
],
},
],
});
const reader = new TestMetricReader({
metricProducers: [additionalProducer],
});
meterProvider.addMetricReader(reader);

// Make a measurement
meterProvider
.getMeter('someSdkMetrics')
.createCounter('sdkCounter')
.add(5, { hello: 'world' });
const collectionResult = await reader.collect();

assert.strictEqual(collectionResult.errors.length, 0);
assert.strictEqual(
collectionResult.resourceMetrics.scopeMetrics.length,
2
);
const [sdkScopeMetrics, additionalScopeMetrics] =
collectionResult.resourceMetrics.scopeMetrics;
assert.strictEqual(sdkScopeMetrics.scope.name, 'someSdkMetrics');
assert.strictEqual(sdkScopeMetrics.metrics.length, 1);
assert.strictEqual(
sdkScopeMetrics.metrics[0].descriptor.name,
'sdkCounter'
);
assert.strictEqual(
additionalScopeMetrics.scope.name,
'additionalMetricProducerMetrics'
);

await reader.shutdown();
});
});

describe('selectAggregation', () => {
Expand Down
11 changes: 8 additions & 3 deletions packages/sdk-metrics/test/export/TestMetricProducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

import { CollectionResult } from '../../src/export/MetricData';
import { CollectionResult, ResourceMetrics } from '../../src/export/MetricData';
import { MetricProducer } from '../../src/export/MetricProducer';
import { defaultResource } from '../util';

Expand All @@ -24,10 +24,15 @@ export const emptyResourceMetrics = {
};

export class TestMetricProducer implements MetricProducer {
constructor(
private resourceMetrics: ResourceMetrics = emptyResourceMetrics,
private errors: unknown[] = []
) {}

async collect(): Promise<CollectionResult> {
return {
resourceMetrics: { resource: defaultResource, scopeMetrics: [] },
errors: [],
resourceMetrics: this.resourceMetrics,
errors: this.errors,
};
}
}
2 changes: 1 addition & 1 deletion packages/sdk-metrics/test/export/TestMetricReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export class TestMetricReader extends MetricReader {
}

getMetricCollector(): MetricCollector {
return this['_metricProducer'] as MetricCollector;
return this['_sdkMetricProducer'] as MetricCollector;
}
}

Expand Down

0 comments on commit 0271b33

Please sign in to comment.