Skip to content

Commit

Permalink
fix(sdk-metrics): prevent per-reader storages from keeping unreported…
Browse files Browse the repository at this point in the history
… accumulations in memory (#4163)
  • Loading branch information
pichlermarc authored Oct 10, 2023
1 parent c320c98 commit 4eb10f7
Show file tree
Hide file tree
Showing 10 changed files with 90 additions and 147 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ For experimental package changes, see the [experimental CHANGELOG](experimental/

* fix(sdk-trace-base): BatchSpanProcessor flushes when `maxExportBatchSize` is reached [#3958](https://github.com/open-telemetry/opentelemetry-js/pull/3958) @nordfjord
* fix(sdk-metrics): allow instrument names to contain '/' [#4155](https://github.com/open-telemetry/opentelemetry-js/pull/4155)
* fix(sdk-metrics): prevent per-reader storages from keeping unreported accumulations in memory [#4163](https://github.com/open-telemetry/opentelemetry-js/pull/4163) @pichlermarc
* fixes a memory leak which occurred when two or more `MetricReader` instances are registered to a `MeterProvider`
* fix(sdk-metrics): do not report empty scopes and metrics [#4135](https://github.com/open-telemetry/opentelemetry-js/pull/4135) @pichlermarc
* Instruments that were created, but did not have measurements will not be exported anymore
* Meters (Scopes) that were created, but did not have any instruments with measurements under them will not be exported anymore.
Expand Down
10 changes: 6 additions & 4 deletions packages/sdk-metrics/src/state/AsyncMetricStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,15 @@ export class AsyncMetricStorage<T extends Maybe<Accumulation>>
constructor(
_instrumentDescriptor: InstrumentDescriptor,
aggregator: Aggregator<T>,
private _attributesProcessor: AttributesProcessor
private _attributesProcessor: AttributesProcessor,
collectorHandles: MetricCollectorHandle[]
) {
super(_instrumentDescriptor);
this._deltaMetricStorage = new DeltaMetricProcessor(aggregator);
this._temporalMetricStorage = new TemporalMetricProcessor(aggregator);
this._temporalMetricStorage = new TemporalMetricProcessor(
aggregator,
collectorHandles
);
}

record(measurements: AttributeHashMap<number>, observationTime: HrTime) {
Expand All @@ -66,14 +70,12 @@ export class AsyncMetricStorage<T extends Maybe<Accumulation>>
*/
collect(
collector: MetricCollectorHandle,
collectors: MetricCollectorHandle[],
collectionTime: HrTime
): Maybe<MetricData> {
const accumulations = this._deltaMetricStorage.collect();

return this._temporalMetricStorage.buildMetrics(
collector,
collectors,
this._instrumentDescriptor,
accumulations,
collectionTime
Expand Down
15 changes: 7 additions & 8 deletions packages/sdk-metrics/src/state/MeterSharedState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,7 @@ export class MeterSharedState {

const metricDataList = storages
.map(metricStorage => {
return metricStorage.collect(
collector,
this._meterProviderSharedState.metricCollectors,
collectionTime
);
return metricStorage.collect(collector, collectionTime);
})
.filter(isNotNullish);

Expand Down Expand Up @@ -145,7 +141,8 @@ export class MeterSharedState {
const viewStorage = new MetricStorageType(
viewDescriptor,
aggregator,
view.attributesProcessor
view.attributesProcessor,
this._meterProviderSharedState.metricCollectors
) as R;
this.metricStorageRegistry.register(viewStorage);
return viewStorage;
Expand All @@ -169,7 +166,8 @@ export class MeterSharedState {
const storage = new MetricStorageType(
descriptor,
aggregator,
AttributesProcessor.Noop()
AttributesProcessor.Noop(),
[collector]
) as R;
this.metricStorageRegistry.registerForCollector(collector, storage);
return storage;
Expand All @@ -191,6 +189,7 @@ interface MetricStorageConstructor {
new (
instrumentDescriptor: InstrumentDescriptor,
aggregator: Aggregator<Maybe<Accumulation>>,
attributesProcessor: AttributesProcessor
attributesProcessor: AttributesProcessor,
collectors: MetricCollectorHandle[]
): MetricStorage;
}
1 change: 0 additions & 1 deletion packages/sdk-metrics/src/state/MetricStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ export abstract class MetricStorage {
*/
abstract collect(
collector: MetricCollectorHandle,
collectors: MetricCollectorHandle[],
collectionTime: HrTime
): Maybe<MetricData>;

Expand Down
10 changes: 6 additions & 4 deletions packages/sdk-metrics/src/state/SyncMetricStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,15 @@ export class SyncMetricStorage<T extends Maybe<Accumulation>>
constructor(
instrumentDescriptor: InstrumentDescriptor,
aggregator: Aggregator<T>,
private _attributesProcessor: AttributesProcessor
private _attributesProcessor: AttributesProcessor,
collectorHandles: MetricCollectorHandle[]
) {
super(instrumentDescriptor);
this._deltaMetricStorage = new DeltaMetricProcessor(aggregator);
this._temporalMetricStorage = new TemporalMetricProcessor(aggregator);
this._temporalMetricStorage = new TemporalMetricProcessor(
aggregator,
collectorHandles
);
}

record(
Expand All @@ -66,14 +70,12 @@ export class SyncMetricStorage<T extends Maybe<Accumulation>>
*/
collect(
collector: MetricCollectorHandle,
collectors: MetricCollectorHandle[],
collectionTime: HrTime
): Maybe<MetricData> {
const accumulations = this._deltaMetricStorage.collect();

return this._temporalMetricStorage.buildMetrics(
collector,
collectors,
this._instrumentDescriptor,
accumulations,
collectionTime
Expand Down
26 changes: 15 additions & 11 deletions packages/sdk-metrics/src/state/TemporalMetricProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,14 @@ export class TemporalMetricProcessor<T extends Maybe<Accumulation>> {
LastReportedHistory<T>
>();

constructor(private _aggregator: Aggregator<T>) {}
constructor(
private _aggregator: Aggregator<T>,
collectorHandles: MetricCollectorHandle[]
) {
collectorHandles.forEach(handle => {
this._unreportedAccumulations.set(handle, []);
});
}

/**
* Builds the {@link MetricData} streams to report against a specific MetricCollector.
Expand All @@ -74,12 +81,11 @@ export class TemporalMetricProcessor<T extends Maybe<Accumulation>> {
*/
buildMetrics(
collector: MetricCollectorHandle,
collectors: MetricCollectorHandle[],
instrumentDescriptor: InstrumentDescriptor,
currentAccumulations: AttributeHashMap<T>,
collectionTime: HrTime
): Maybe<MetricData> {
this._stashAccumulations(collectors, currentAccumulations);
this._stashAccumulations(currentAccumulations);
const unreportedAccumulations =
this._getMergedUnreportedAccumulations(collector);

Expand Down Expand Up @@ -148,18 +154,16 @@ export class TemporalMetricProcessor<T extends Maybe<Accumulation>> {
);
}

private _stashAccumulations(
collectors: MetricCollectorHandle[],
currentAccumulation: AttributeHashMap<T>
) {
collectors.forEach(it => {
let stash = this._unreportedAccumulations.get(it);
private _stashAccumulations(currentAccumulation: AttributeHashMap<T>) {
const registeredCollectors = this._unreportedAccumulations.keys();
for (const collector of registeredCollectors) {
let stash = this._unreportedAccumulations.get(collector);
if (stash === undefined) {
stash = [];
this._unreportedAccumulations.set(it, stash);
this._unreportedAccumulations.set(collector, stash);
}
stash.push(currentAccumulation);
});
}
}

private _getMergedUnreportedAccumulations(collector: MetricCollectorHandle) {
Expand Down
Loading

0 comments on commit 4eb10f7

Please sign in to comment.