Skip to content

Commit

Permalink
[Metrics SDK] Performance improvement in measurement processing (#1993)
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb authored Mar 4, 2023
1 parent 649829f commit da333f8
Show file tree
Hide file tree
Showing 12 changed files with 300 additions and 65 deletions.
42 changes: 33 additions & 9 deletions sdk/include/opentelemetry/sdk/common/attributemap_hash.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

#pragma once

#include <iostream>
#include <string>
#include "opentelemetry/sdk/common/attribute_utils.h"

Expand All @@ -14,7 +13,7 @@ namespace common
{

template <class T>
inline void GetHashForAttributeValue(size_t &seed, const T arg)
inline void GetHash(size_t &seed, const T &arg)
{
std::hash<T> hasher;
// reference -
Expand All @@ -23,11 +22,11 @@ inline void GetHashForAttributeValue(size_t &seed, const T arg)
}

template <class T>
inline void GetHashForAttributeValue(size_t &seed, const std::vector<T> &arg)
inline void GetHash(size_t &seed, const std::vector<T> &arg)
{
for (auto v : arg)
{
GetHashForAttributeValue<T>(seed, v);
GetHash<T>(seed, v);
}
}

Expand All @@ -37,7 +36,7 @@ struct GetHashForAttributeValueVisitor
template <class T>
void operator()(T &v)
{
GetHashForAttributeValue(seed_, v);
GetHash(seed_, v);
}
size_t &seed_;
};
Expand All @@ -48,15 +47,40 @@ inline size_t GetHashForAttributeMap(const OrderedAttributeMap &attribute_map)
size_t seed = 0UL;
for (auto &kv : attribute_map)
{
std::hash<std::string> hasher;
// reference -
// https://www.boost.org/doc/libs/1_37_0/doc/html/hash/reference.html#boost.hash_combine
seed ^= hasher(kv.first) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
GetHash(seed, kv.first);
nostd::visit(GetHashForAttributeValueVisitor(seed), kv.second);
}
return seed;
}

// Calculate hash of keys and values of KeyValueIterable, filtered using callback.
inline size_t GetHashForAttributeMap(
const opentelemetry::common::KeyValueIterable &attributes,
nostd::function_ref<bool(nostd::string_view)> is_key_present_callback)
{
AttributeConverter converter;
size_t seed = 0UL;
attributes.ForEachKeyValue(
[&](nostd::string_view key, opentelemetry::common::AttributeValue value) noexcept {
if (!is_key_present_callback(key))
{
return true;
}
GetHash(seed, key.data());
auto attr_val = nostd::visit(converter, value);
nostd::visit(GetHashForAttributeValueVisitor(seed), attr_val);
return true;
});
return seed;
}

template <class T>
inline size_t GetHash(T value)
{
std::hash<T> hasher;
return hasher(value);
}

} // namespace common
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,24 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora
{
auto aggr = DefaultAggregation::CreateAggregation(aggregation_type_, instrument_descriptor_);
aggr->Aggregate(measurement.second);
auto prev = cumulative_hash_map_->Get(measurement.first);
auto hash = opentelemetry::sdk::common::GetHashForAttributeMap(measurement.first);
auto prev = cumulative_hash_map_->Get(hash);
if (prev)
{
auto delta = prev->Diff(*aggr);
// store received value in cumulative map, and the diff in delta map (to pass it to temporal
// storage)
cumulative_hash_map_->Set(measurement.first, std::move(aggr));
delta_hash_map_->Set(measurement.first, std::move(delta));
cumulative_hash_map_->Set(measurement.first, std::move(aggr), hash);
delta_hash_map_->Set(measurement.first, std::move(delta), hash);
}
else
{
// store received value in cumulative and delta map.
cumulative_hash_map_->Set(
measurement.first,
DefaultAggregation::CloneAggregation(aggregation_type_, instrument_descriptor_, *aggr));
delta_hash_map_->Set(measurement.first, std::move(aggr));
DefaultAggregation::CloneAggregation(aggregation_type_, instrument_descriptor_, *aggr),
hash);
delta_hash_map_->Set(measurement.first, std::move(aggr), hash);
}
}
}
Expand Down
89 changes: 72 additions & 17 deletions sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "opentelemetry/sdk/common/attributemap_hash.h"
#include "opentelemetry/sdk/metrics/aggregation/aggregation.h"
#include "opentelemetry/sdk/metrics/instruments.h"
#include "opentelemetry/sdk/metrics/view/attributes_processor.h"
#include "opentelemetry/version.h"

#include <functional>
Expand All @@ -19,6 +20,7 @@ namespace sdk
{
namespace metrics
{

using opentelemetry::sdk::common::OrderedAttributeMap;

class AttributeHashGenerator
Expand All @@ -33,12 +35,12 @@ class AttributeHashGenerator
class AttributesHashMap
{
public:
Aggregation *Get(const MetricAttributes &attributes) const
Aggregation *Get(size_t hash) const
{
auto it = hash_map_.find(attributes);
auto it = hash_map_.find(hash);
if (it != hash_map_.end())
{
return it->second.get();
return it->second.second.get();
}
return nullptr;
}
Expand All @@ -47,35 +49,89 @@ class AttributesHashMap
* @return check if key is present in hash
*
*/
bool Has(const MetricAttributes &attributes) const
{
return (hash_map_.find(attributes) == hash_map_.end()) ? false : true;
}
bool Has(size_t hash) const { return hash_map_.find(hash) != hash_map_.end(); }

/**
* @return the pointer to value for given key if present.
* If not present, it uses the provided callback to generate
* value and store in the hash
*/
Aggregation *GetOrSetDefault(const opentelemetry::common::KeyValueIterable &attributes,
std::function<std::unique_ptr<Aggregation>()> aggregation_callback,
size_t hash)
{
auto it = hash_map_.find(hash);
if (it != hash_map_.end())
{
return it->second.second.get();
}

MetricAttributes attr{attributes};

hash_map_[hash] = {attr, aggregation_callback()};
return hash_map_[hash].second.get();
}

Aggregation *GetOrSetDefault(std::function<std::unique_ptr<Aggregation>()> aggregation_callback,
size_t hash)
{
auto it = hash_map_.find(hash);
if (it != hash_map_.end())
{
return it->second.second.get();
}
MetricAttributes attr{};
hash_map_[hash] = {attr, aggregation_callback()};
return hash_map_[hash].second.get();
}

Aggregation *GetOrSetDefault(const MetricAttributes &attributes,
std::function<std::unique_ptr<Aggregation>()> aggregation_callback)
std::function<std::unique_ptr<Aggregation>()> aggregation_callback,
size_t hash)
{
auto it = hash_map_.find(attributes);
auto it = hash_map_.find(hash);
if (it != hash_map_.end())
{
return it->second.get();
return it->second.second.get();
}

hash_map_[attributes] = aggregation_callback();
return hash_map_[attributes].get();
MetricAttributes attr{attributes};

hash_map_[hash] = {attr, aggregation_callback()};
return hash_map_[hash].second.get();
}

/**
* Set the value for given key, overwriting the value if already present
*/
void Set(const MetricAttributes &attributes, std::unique_ptr<Aggregation> value)
void Set(const opentelemetry::common::KeyValueIterable &attributes,
std::unique_ptr<Aggregation> aggr,
size_t hash)
{
hash_map_[attributes] = std::move(value);
auto it = hash_map_.find(hash);
if (it != hash_map_.end())
{
it->second.second = std::move(aggr);
}
else
{
MetricAttributes attr{attributes};
hash_map_[hash] = {attr, std::move(aggr)};
}
}

void Set(const MetricAttributes &attributes, std::unique_ptr<Aggregation> aggr, size_t hash)
{
auto it = hash_map_.find(hash);
if (it != hash_map_.end())
{
it->second.second = std::move(aggr);
}
else
{
MetricAttributes attr{attributes};
hash_map_[hash] = {attr, std::move(aggr)};
}
}

/**
Expand All @@ -86,7 +142,7 @@ class AttributesHashMap
{
for (auto &kv : hash_map_)
{
if (!callback(kv.first, *(kv.second.get())))
if (!callback(kv.second.first, *(kv.second.second.get())))
{
return false; // callback is not prepared to consume data
}
Expand All @@ -100,8 +156,7 @@ class AttributesHashMap
size_t Size() { return hash_map_.size(); }

private:
std::unordered_map<MetricAttributes, std::unique_ptr<Aggregation>, AttributeHashGenerator>
hash_map_;
std::unordered_map<size_t, std::pair<MetricAttributes, std::unique_ptr<Aggregation>>> hash_map_;
};
} // namespace metrics

Expand Down
43 changes: 33 additions & 10 deletions sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,11 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage
const AggregationConfig *aggregation_config)
: instrument_descriptor_(instrument_descriptor),
attributes_hashmap_(new AttributesHashMap()),
attributes_processor_{attributes_processor},
attributes_processor_(attributes_processor),
#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
exemplar_reservoir_(exemplar_reservoir),
#endif
temporal_metric_storage_(instrument_descriptor, aggregation_type, aggregation_config)

{
create_default_aggregation_ = [&, aggregation_type,
aggregation_config]() -> std::unique_ptr<Aggregation> {
Expand All @@ -60,8 +59,9 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage
#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
exemplar_reservoir_->OfferMeasurement(value, {}, context, std::chrono::system_clock::now());
#endif
static size_t hash = opentelemetry::sdk::common::GetHash("");
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(attribute_hashmap_lock_);
attributes_hashmap_->GetOrSetDefault({}, create_default_aggregation_)->Aggregate(value);
attributes_hashmap_->GetOrSetDefault(create_default_aggregation_, hash)->Aggregate(value);
}

void RecordLong(int64_t value,
Expand All @@ -77,9 +77,21 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage
exemplar_reservoir_->OfferMeasurement(value, attributes, context,
std::chrono::system_clock::now());
#endif
auto attr = attributes_processor_->process(attributes);
auto hash = opentelemetry::sdk::common::GetHashForAttributeMap(
attributes, [this](nostd::string_view key) {
if (attributes_processor_)
{
return attributes_processor_->isPresent(key);
}
else
{
return true;
}
});

std::lock_guard<opentelemetry::common::SpinLockMutex> guard(attribute_hashmap_lock_);
attributes_hashmap_->GetOrSetDefault(attr, create_default_aggregation_)->Aggregate(value);
attributes_hashmap_->GetOrSetDefault(attributes, create_default_aggregation_, hash)
->Aggregate(value);
}

void RecordDouble(double value,
Expand All @@ -93,8 +105,9 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage
#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
exemplar_reservoir_->OfferMeasurement(value, {}, context, std::chrono::system_clock::now());
#endif
static size_t hash = opentelemetry::sdk::common::GetHash("");
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(attribute_hashmap_lock_);
attributes_hashmap_->GetOrSetDefault({}, create_default_aggregation_)->Aggregate(value);
attributes_hashmap_->GetOrSetDefault(create_default_aggregation_, hash)->Aggregate(value);
}

void RecordDouble(double value,
Expand All @@ -114,9 +127,20 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage
exemplar_reservoir_->OfferMeasurement(value, attributes, context,
std::chrono::system_clock::now());
#endif
auto attr = attributes_processor_->process(attributes);
auto hash = opentelemetry::sdk::common::GetHashForAttributeMap(
attributes, [this](nostd::string_view key) {
if (attributes_processor_)
{
return attributes_processor_->isPresent(key);
}
else
{
return true;
}
});
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(attribute_hashmap_lock_);
attributes_hashmap_->GetOrSetDefault(attr, create_default_aggregation_)->Aggregate(value);
attributes_hashmap_->GetOrSetDefault(attributes, create_default_aggregation_, hash)
->Aggregate(value);
}

bool Collect(CollectorHandle *collector,
Expand All @@ -127,16 +151,15 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage

private:
InstrumentDescriptor instrument_descriptor_;

// hashmap to maintain the metrics for delta collection (i.e, collection since last Collect call)
std::unique_ptr<AttributesHashMap> attributes_hashmap_;
// unreported metrics stash for all the collectors
std::unordered_map<CollectorHandle *, std::list<std::shared_ptr<AttributesHashMap>>>
unreported_metrics_;
// last reported metrics stash for all the collectors.
std::unordered_map<CollectorHandle *, LastReportedMetrics> last_reported_metrics_;
const AttributesProcessor *attributes_processor_;
std::function<std::unique_ptr<Aggregation>()> create_default_aggregation_;
const AttributesProcessor *attributes_processor_;
#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
nostd::shared_ptr<ExemplarReservoir> exemplar_reservoir_;
#endif
Expand Down
Loading

0 comments on commit da333f8

Please sign in to comment.