Skip to content

Commit

Permalink
Bound registry work in progress
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Cristian Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Feb 14, 2020
1 parent 7e34851 commit d93ffa3
Show file tree
Hide file tree
Showing 15 changed files with 613 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ abstract class AbstractBoundInstrument implements BoundInstrument {
private final Aggregator aggregator;

AbstractBoundInstrument(Aggregator aggregator) {
this.aggregator = aggregator;
this.refCountMapped = new AtomicLong(0);
this.aggregator = aggregator;
}

/**
Expand Down Expand Up @@ -78,4 +78,8 @@ final void recordLong(long value) {
final void recordDouble(double value) {
aggregator.recordDouble(value);
}

final Aggregator getAggregator() {
return aggregator;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.common.InstrumentValueType;
import io.opentelemetry.sdk.metrics.view.Aggregations;
import io.opentelemetry.sdk.metrics.view.View;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;

abstract class AbstractInstrument implements Instrument {

Expand All @@ -32,6 +35,7 @@ abstract class AbstractInstrument implements Instrument {
private final List<String> labelKeys;
private final MeterSharedState meterSharedState;
private final InstrumentationLibraryInfo instrumentationLibraryInfo;
private final ViewManager viewManager;

// All arguments cannot be null because they are checked in the abstract builder classes.
AbstractInstrument(
Expand All @@ -51,6 +55,16 @@ abstract class AbstractInstrument implements Instrument {
this.labelKeys = labelKeys;
this.meterSharedState = meterSharedState;
this.instrumentationLibraryInfo = instrumentationLibraryInfo;
// TODO: Allow to install views from config instead of always installing the default View.
viewManager =
new ViewManager(
getDefaultView(name, description, constantLabels, instrumentType),
unit,
constantLabels,
instrumentType,
instrumentValueType,
meterSharedState,
instrumentationLibraryInfo);
}

final String getName() {
Expand Down Expand Up @@ -81,6 +95,10 @@ final InstrumentationLibraryInfo getInstrumentationLibraryInfo() {
return instrumentationLibraryInfo;
}

final ViewManager getViewManager() {
return viewManager;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down Expand Up @@ -120,4 +138,23 @@ static InstrumentType getMeasureInstrumentType(boolean absolute) {
static InstrumentType getObserverInstrumentType(boolean monotonic) {
return monotonic ? InstrumentType.OBSERVER_MONOTONIC : InstrumentType.OBSERVER_NON_MONOTONIC;
}

@Nullable
private static View getDefaultView(
String name,
String description,
Map<String, String> constantLabels,
InstrumentType instrumentType) {
switch (instrumentType) {
case COUNTER_MONOTONIC:
case COUNTER_NON_MONOTONIC:
return View.create(name, description, Aggregations.sum(), constantLabels.keySet());
case MEASURE_ABSOLUTE:
case MEASURE_NON_ABSOLUTE:
case OBSERVER_MONOTONIC:
case OBSERVER_NON_MONOTONIC:
// TODO: Add default views for every instrument type.
}
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright 2020, OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.opentelemetry.sdk.metrics;

import io.opentelemetry.metrics.LabelSet;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.common.InstrumentValueType;
import io.opentelemetry.sdk.metrics.data.MetricData;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

abstract class AbstractInstrumentWithBinding<B extends AbstractBoundInstrument>
extends AbstractInstrument {
private final ConcurrentHashMap<LabelSet, B> boundLabels;
private final ReentrantLock collectLock;
private long collectionStartTime;

AbstractInstrumentWithBinding(
String name,
String description,
String unit,
Map<String, String> constantLabels,
List<String> labelKeys,
InstrumentType instrumentType,
InstrumentValueType instrumentValueType,
MeterSharedState meterSharedState,
InstrumentationLibraryInfo instrumentationLibraryInfo) {
super(
name,
description,
unit,
constantLabels,
labelKeys,
instrumentType,
instrumentValueType,
meterSharedState,
instrumentationLibraryInfo);
boundLabels = new ConcurrentHashMap<>();
collectLock = new ReentrantLock();
collectionStartTime = getMeterSharedState().getClock().now();
}

// Cannot make this "bind" because of a Java problem if we make this class also implement the
// InstrumentWithBinding then the subclass will fail to compile because of different "bind"
// signature. This is a good trade-off.
final B bindInternal(LabelSet labelSet) {
B binding = boundLabels.get(labelSet);
if (binding != null && binding.bind()) {
// At this moment it is guaranteed that the Bound is in the map and will not be removed.
return binding;
}

// Missing entry or no longer mapped, try to add a new entry.
binding = newBinding();
while (true) {
B oldBound = boundLabels.putIfAbsent(labelSet, binding);
if (oldBound != null) {
if (oldBound.bind()) {
// At this moment it is guaranteed that the Bound is in the map and will not be removed.
return oldBound;
}
// Try to remove the oldBound. This will race with the collect method, but only one will
// succeed.
boundLabels.remove(labelSet, oldBound);
continue;
}
return binding;
}
}

/**
* Collects records from all the entries (labelSet, Bound) that changed since the last collect()
* call.
*/
final List<MetricData> collect() {
AggregatorMap aggregatorMap = getViewManager().startCollection(collectionStartTime);
collectionStartTime = getMeterSharedState().getClock().now();
collectLock.lock();
try {
for (Map.Entry<LabelSet, B> entry : boundLabels.entrySet()) {
if (entry.getValue().tryUnmap()) {
// If able to unmap then remove the record from the current Map. This can race with the
// acquire but because we requested a specific value only one will succeed.
boundLabels.remove(entry.getKey(), entry.getValue());
}
aggregatorMap.collect(entry.getKey(), entry.getValue().getAggregator());
}
} finally {
collectLock.unlock();
}
return aggregatorMap.stopCollection(collectionStartTime);
}

abstract B newBinding();
}
110 changes: 110 additions & 0 deletions sdk/src/main/java/io/opentelemetry/sdk/metrics/AggregatorMap.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright 2020, OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.opentelemetry.sdk.metrics;

import io.opentelemetry.metrics.LabelSet;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.aggregator.Aggregator;
import io.opentelemetry.sdk.metrics.aggregator.AggregatorFactory;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.data.MetricData.Descriptor;
import io.opentelemetry.sdk.metrics.data.MetricData.Point;
import io.opentelemetry.sdk.resources.Resource;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

abstract class AggregatorMap {
static AggregatorMap getNoop() {
return Noop.INSTANCE;
}

static AggregatorMap getAllLabels(
Descriptor descriptor,
Resource resource,
InstrumentationLibraryInfo instrumentationLibraryInfo,
AggregatorFactory aggregatorFactory,
long startEpochNanos) {
return new AllLabels(
descriptor, resource, instrumentationLibraryInfo, aggregatorFactory, startEpochNanos);
}

abstract void collect(LabelSet labelSet, Aggregator aggregator);

abstract List<MetricData> stopCollection(long epochNanos);

private static final class Noop extends AggregatorMap {
private static final AggregatorMap INSTANCE = new Noop();

@Override
void collect(LabelSet labelSet, Aggregator aggregator) {
// Noop.
}

@Override
List<MetricData> stopCollection(long epochNanos) {
return Collections.emptyList();
}
}

private static final class AllLabels extends AggregatorMap {
private final Descriptor descriptor;
private final Resource resource;
private final InstrumentationLibraryInfo instrumentationLibraryInfo;
private final AggregatorFactory aggregatorFactory;
private final Map<Map<String, String>, Aggregator> aggregatorMap;
private final long startEpochNanos;

private AllLabels(
Descriptor descriptor,
Resource resource,
InstrumentationLibraryInfo instrumentationLibraryInfo,
AggregatorFactory aggregatorFactory,
long startEpochNanos) {
this.descriptor = descriptor;
this.resource = resource;
this.instrumentationLibraryInfo = instrumentationLibraryInfo;
this.aggregatorFactory = aggregatorFactory;
this.startEpochNanos = startEpochNanos;
this.aggregatorMap = new HashMap<>();
}

@Override
final void collect(LabelSet labelSet, Aggregator aggregator) {
// TODO: Add support to reduce labels.
Map<String, String> labels = ((LabelSetSdk) labelSet).getLabels();
Aggregator currentAggregator = aggregatorMap.get(labels);
if (currentAggregator == null) {
currentAggregator = aggregatorFactory.getAggregator();
aggregatorMap.put(labels, currentAggregator);
}
aggregator.mergeToAndReset(currentAggregator);
}

@Override
final List<MetricData> stopCollection(long epochNanos) {
List<Point> points = new ArrayList<>(aggregatorMap.size());
for (Map.Entry<Map<String, String>, Aggregator> entry : aggregatorMap.entrySet()) {
points.add(entry.getValue().toPoint(startEpochNanos, epochNanos, entry.getKey()));
}
return Collections.singletonList(
MetricData.create(descriptor, resource, instrumentationLibraryInfo, points));
}
}
}
Loading

0 comments on commit d93ffa3

Please sign in to comment.