Skip to content

Commit

Permalink
[Metrics SDK] Add Metrics ExemplarFilter and ExemplarReservoir (open-…
Browse files Browse the repository at this point in the history
  • Loading branch information
esigo authored and yxue committed Dec 5, 2022
1 parent 3667058 commit 70b2b9a
Show file tree
Hide file tree
Showing 26 changed files with 876 additions and 83 deletions.
84 changes: 84 additions & 0 deletions sdk/include/opentelemetry/sdk/metrics/data/exemplar_data.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#pragma once
#ifndef ENABLE_METRICS_PREVIEW
# include <memory>
# include "opentelemetry/common/timestamp.h"
# include "opentelemetry/context/context.h"
# include "opentelemetry/sdk/common/attribute_utils.h"
# include "opentelemetry/sdk/metrics/data/metric_data.h"
# include "opentelemetry/sdk/metrics/export/metric_producer.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace metrics
{
using MetricAttributes = opentelemetry::sdk::common::OrderedAttributeMap;
/**
* A sample input measurement.
*
* Exemplars also hold information about the environment when the measurement was recorded, for
* example the span and trace ID of the active span when the exemplar was recorded.
*/
class ExemplarData
{
public:
static ExemplarData Create(std::shared_ptr<trace::SpanContext> context,
const opentelemetry::common::SystemTimestamp &timestamp,
const PointDataAttributes &point_data_attr)
{
return ExemplarData(context, timestamp, point_data_attr);
}

/**
* The set of key/value pairs that were filtered out by the aggregator, but recorded alongside
* the original measurement. Only key/value pairs that were filtered out by the aggregator
* should be included
*/
MetricAttributes GetFilteredAttributes() { return MetricAttributes{}; }

/** Returns the timestamp in nanos when measurement was collected. */
opentelemetry::common::SystemTimestamp GetEpochNanos() { return timestamp_; }

/**
* Returns the SpanContext associated with this exemplar. If the exemplar was not recorded
* inside a sampled trace, the Context will be invalid.
*/
const trace::SpanContext &GetSpanContext() const noexcept { return context_; }

static PointType CreateSumPointData(ValueType value)
{
SumPointData sum_point_data{};
sum_point_data.value_ = value;
return sum_point_data;
}

static PointType CreateLastValuePointData(ValueType value)
{
LastValuePointData last_value_point_data{};
last_value_point_data.value_ = value;
last_value_point_data.is_lastvalue_valid_ = true;
last_value_point_data.sample_ts_ = opentelemetry::common::SystemTimestamp{};
return last_value_point_data;
}

static PointType CreateDropPointData() { return DropPointData{}; }

private:
ExemplarData(std::shared_ptr<trace::SpanContext> context,
opentelemetry::common::SystemTimestamp timestamp,
const PointDataAttributes &point_data_attr)
: context_(*context.get()), timestamp_(timestamp), point_data_attr_(point_data_attr)
{}

trace::SpanContext context_;
opentelemetry::common::SystemTimestamp timestamp_;
PointDataAttributes point_data_attr_;
};

} // namespace metrics
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
#endif
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,6 @@ namespace metrics
class AlwaysSampleFilter final : public ExemplarFilter
{
public:
static nostd::shared_ptr<ExemplarFilter> GetAlwaysSampleFilter()
{
static nostd::shared_ptr<ExemplarFilter> alwaysSampleFilter{new AlwaysSampleFilter{}};
return alwaysSampleFilter;
}

bool ShouldSampleMeasurement(
long /* value */,
const MetricAttributes & /* attributes */,
Expand All @@ -36,7 +30,6 @@ class AlwaysSampleFilter final : public ExemplarFilter
return true;
}

private:
explicit AlwaysSampleFilter() = default;
};
} // namespace metrics
Expand Down
45 changes: 0 additions & 45 deletions sdk/include/opentelemetry/sdk/metrics/exemplar/data.h

This file was deleted.

4 changes: 4 additions & 0 deletions sdk/include/opentelemetry/sdk/metrics/exemplar/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ class ExemplarFilter
const opentelemetry::context::Context &context) noexcept = 0;

virtual ~ExemplarFilter() = default;

static std::shared_ptr<ExemplarFilter> GetNeverSampleFilter() noexcept;
static std::shared_ptr<ExemplarFilter> GetAlwaysSampleFilter() noexcept;
static std::shared_ptr<ExemplarFilter> GetWithTraceSampleFilter() noexcept;
};

} // namespace metrics
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#pragma once
#ifndef ENABLE_METRICS_PREVIEW
# include <memory>
# include <vector>
# include "opentelemetry/context/context.h"
# include "opentelemetry/nostd/shared_ptr.h"
# include "opentelemetry/sdk/common/attribute_utils.h"
# include "opentelemetry/sdk/metrics/exemplar/filter.h"
# include "opentelemetry/sdk/metrics/exemplar/reservoir.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace metrics
{
class FilteredExemplarReservoir final : public ExemplarReservoir
{

public:
FilteredExemplarReservoir(std::shared_ptr<ExemplarFilter> filter,
std::shared_ptr<ExemplarReservoir> reservoir)
: filter_(filter), reservoir_(reservoir)
{}

void OfferMeasurement(long value,
const MetricAttributes &attributes,
const opentelemetry::context::Context &context,
const opentelemetry::common::SystemTimestamp &timestamp) noexcept override
{
if (filter_->ShouldSampleMeasurement(value, attributes, context))
{
reservoir_->OfferMeasurement(value, attributes, context, timestamp);
}
}

void OfferMeasurement(double value,
const MetricAttributes &attributes,
const opentelemetry::context::Context &context,
const opentelemetry::common::SystemTimestamp &timestamp) noexcept override
{
if (filter_->ShouldSampleMeasurement(value, attributes, context))
{
reservoir_->OfferMeasurement(value, attributes, context, timestamp);
}
}

std::vector<std::shared_ptr<ExemplarData>> CollectAndReset(
const MetricAttributes &pointAttributes) noexcept override
{
return reservoir_->CollectAndReset(pointAttributes);
}

private:
explicit FilteredExemplarReservoir() = default;
std::shared_ptr<ExemplarFilter> filter_;
std::shared_ptr<ExemplarReservoir> reservoir_;
};

} // namespace metrics
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#pragma once
#ifndef ENABLE_METRICS_PREVIEW
# include <memory>
# include <vector>
# include "opentelemetry/context/context.h"
# include "opentelemetry/nostd/function_ref.h"
# include "opentelemetry/nostd/shared_ptr.h"
# include "opentelemetry/sdk/common/attribute_utils.h"
# include "opentelemetry/sdk/metrics/exemplar/reservoir.h"
# include "opentelemetry/sdk/metrics/exemplar/reservoir_cell.h"
# include "opentelemetry/sdk/metrics/exemplar/reservoir_cell_selector.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace metrics
{

class FixedSizeExemplarReservoir : public ExemplarReservoir
{

public:
FixedSizeExemplarReservoir(size_t size,
std::shared_ptr<ReservoirCellSelector> reservoir_cell_selector,
std::shared_ptr<ExemplarData> (ReservoirCell::*map_and_reset_cell)(
const common::OrderedAttributeMap &attributes))
: storage_(size),
reservoir_cell_selector_(reservoir_cell_selector),
map_and_reset_cell_(map_and_reset_cell)
{}

void OfferMeasurement(long value,
const MetricAttributes &attributes,
const opentelemetry::context::Context &context,
const opentelemetry::common::SystemTimestamp &timestamp) noexcept override
{
if (!reservoir_cell_selector_)
{
return;
}
auto idx =
reservoir_cell_selector_->ReservoirCellIndexFor(storage_, value, attributes, context);
if (idx != -1)
{
storage_[idx].RecordDoubleMeasurement(value, attributes, context);
}
}

void OfferMeasurement(double value,
const MetricAttributes &attributes,
const opentelemetry::context::Context &context,
const opentelemetry::common::SystemTimestamp &timestamp) noexcept override
{
if (!reservoir_cell_selector_)
{
return;
}
auto idx =
reservoir_cell_selector_->ReservoirCellIndexFor(storage_, value, attributes, context);
if (idx != -1)
{
storage_[idx].RecordDoubleMeasurement(value, attributes, context);
}
}

std::vector<std::shared_ptr<ExemplarData>> CollectAndReset(
const MetricAttributes &pointAttributes) noexcept override
{
std::vector<std::shared_ptr<ExemplarData>> results;
if (!reservoir_cell_selector_)
{
return results;
}
if (!map_and_reset_cell_)
{
reservoir_cell_selector_.reset();
return results;
}
for (auto reservoirCell : storage_)
{
auto result = (reservoirCell.*(map_and_reset_cell_))(pointAttributes);
results.push_back(result);
}
reservoir_cell_selector_.reset();
return results;
}

private:
explicit FixedSizeExemplarReservoir() = default;
std::vector<ReservoirCell> storage_;
std::shared_ptr<ReservoirCellSelector> reservoir_cell_selector_;
std::shared_ptr<ExemplarData> (ReservoirCell::*map_and_reset_cell_)(
const common::OrderedAttributeMap &attributes);
};

} // namespace metrics
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
#endif
Loading

0 comments on commit 70b2b9a

Please sign in to comment.