Skip to content

Commit

Permalink
chore(data model): Wrap metric tag mapping with Arc
Browse files Browse the repository at this point in the history
The tag key:value mappings in a metric is the most expensive data structure in
that type. This change wraps that data in a reference counted allocation, under
the assumption that it will change little once it is initially created though
the surrounding metric may be cloned repeatedly.
  • Loading branch information
bruceg committed Dec 22, 2022
1 parent b9626dd commit 5b2ab6a
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 30 deletions.
23 changes: 23 additions & 0 deletions lib/vector-config/src/stdlib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{
NonZeroU8, NonZeroUsize,
},
path::PathBuf,
sync::Arc,
time::Duration,
};

Expand Down Expand Up @@ -302,3 +303,25 @@ impl Configurable for Duration {
))
}
}

impl<T: Configurable> Configurable for Arc<T> {
fn is_optional() -> bool {
T::is_optional()
}

fn referenceable_name() -> Option<&'static str> {
T::referenceable_name()
}

fn metadata() -> Metadata<Self> {
todo!("FIXME T::metadata().into()")
}

fn generate_schema(gen: &mut SchemaGenerator) -> Result<SchemaObject, GenerateError> {
T::generate_schema(gen)
}

fn validate_metadata(_metadata: &Metadata<Self>) -> Result<(), GenerateError> {
todo!("FIXME T::validate_metadata(metadata)")
}
}
14 changes: 7 additions & 7 deletions lib/vector-core/src/event/lua/metric.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::BTreeMap;
use std::{collections::BTreeMap, sync::Arc};

use mlua::prelude::*;

Expand Down Expand Up @@ -101,15 +101,15 @@ impl<'a> FromLua<'a> for TagValueSet {

impl<'a> FromLua<'a> for MetricTags {
fn from_lua(value: LuaValue<'a>, lua: &'a Lua) -> LuaResult<Self> {
Ok(Self(BTreeMap::from_lua(value, lua)?))
Ok(Self(Arc::new(BTreeMap::from_lua(value, lua)?)))
}
}

impl<'a> ToLua<'a> for LuaMetricTags {
fn to_lua(self, lua: &'a Lua) -> LuaResult<LuaValue> {
if self.multi_value_tags {
Ok(LuaValue::Table(lua.create_table_from(
self.tags.0.into_iter().map(|(key, value)| {
self.tags.into_inner().into_iter().map(|(key, value)| {
let value: Vec<_> = value
.into_iter()
.filter_map(|tag_value| tag_value.into_option().to_lua(lua).ok())
Expand Down Expand Up @@ -434,13 +434,13 @@ mod test {
MetricKind::Incremental,
MetricValue::Counter { value: 1.0 },
)
.with_tags(Some(MetricTags(BTreeMap::from([(
.with_tags(Some(MetricTags(Arc::new(BTreeMap::from([(
"example tag".to_string(),
TagValueSet::from(vec![
TagValue::from("a".to_string()),
TagValue::from("b".to_string()),
]),
)]))));
)])))));

assert_metric(
metric,
Expand Down Expand Up @@ -674,13 +674,13 @@ mod test {
MetricValue::Counter { value: 1.0 },
)
.with_namespace(Some("example_namespace"))
.with_tags(Some(MetricTags(BTreeMap::from([(
.with_tags(Some(MetricTags(Arc::new(BTreeMap::from([(
"example tag".to_string(),
TagValueSet::from(vec![
TagValue::from("a".to_string()),
TagValue::from("b".to_string()),
]),
)]))))
)])))))
.with_timestamp(Some(Utc.ymd(2018, 11, 14).and_hms(8, 9, 10)));
assert_event_data_eq!(Lua::new().load(value).eval::<Metric>().unwrap(), expected);
}
Expand Down
55 changes: 35 additions & 20 deletions lib/vector-core/src/event/metric/tags.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
#[cfg(test)]
use std::borrow::Borrow;

use std::borrow::Cow;
use std::collections::{hash_map::DefaultHasher, BTreeMap};
use std::fmt::Display;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::{cmp::Ordering, mem};

use indexmap::IndexSet;
Expand Down Expand Up @@ -436,7 +436,7 @@ impl Serialize for TagValueSet {
#[configurable_component]
#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct MetricTags(
#[configurable(transparent)] pub(in crate::event) BTreeMap<String, TagValueSet>,
#[configurable(transparent)] pub(in crate::event) Arc<BTreeMap<String, TagValueSet>>,
);

impl MetricTags {
Expand Down Expand Up @@ -469,18 +469,31 @@ impl MetricTags {

/// Iterate over all values of each tag.
pub fn into_iter_all(self) -> impl Iterator<Item = (String, TagValue)> {
self.0
self.into_inner()
.into_iter()
.flat_map(|(name, tags)| tags.into_iter().map(move |tag| (name.clone(), tag)))
}

/// Iterate over a single value of each tag.
pub fn into_iter_single(self) -> impl Iterator<Item = (String, String)> {
self.0
self.into_inner()
.into_iter()
.filter_map(|(name, tags)| tags.into_single().map(|tag| (name, tag)))
}

/// Extract an owned copy of the tag value map out of the Arc.
pub(in crate::event) fn into_inner(self) -> BTreeMap<String, TagValueSet> {
// First, try to unwrap the inner value, assuming the reference count is likely to be 1.
Arc::try_unwrap(self.0)
// If the reference count is greater than one, make a mutable copy and take ownership
// out of that.
.unwrap_or_else(|mut arc| std::mem::take(Arc::make_mut(&mut arc)))
}

fn inner_mut(&mut self) -> &mut BTreeMap<String, TagValueSet> {
Arc::make_mut(&mut self.0)
}

pub fn contains_key(&self, name: &str) -> bool {
self.0.contains_key(name)
}
Expand All @@ -492,40 +505,43 @@ impl MetricTags {
/// Add a value to a tag. This does not replace any existing tags unless the value is a
/// duplicate.
pub fn insert(&mut self, name: String, value: impl Into<TagValue>) {
self.0.entry(name).or_default().insert(value.into());
self.inner_mut()
.entry(name)
.or_default()
.insert(value.into());
}

/// Replace all the values of a tag with a single value.
pub fn replace(&mut self, name: String, value: impl Into<TagValue>) -> Option<String> {
self.0
self.inner_mut()
.insert(name, TagValueSet::from([value.into()]))
.and_then(TagValueSet::into_single)
}

pub fn set_multi_value(&mut self, name: String, values: impl IntoIterator<Item = TagValue>) {
let x = TagValueSet::from_iter(values);
self.0.insert(name, x);
self.inner_mut().insert(name, x);
}

pub fn remove(&mut self, name: &str) -> Option<String> {
self.0.remove(name).and_then(TagValueSet::into_single)
self.inner_mut()
.remove(name)
.and_then(TagValueSet::into_single)
}

pub fn keys(&self) -> impl Iterator<Item = &str> {
self.0.keys().map(String::as_str)
}

pub fn extend(&mut self, tags: impl IntoIterator<Item = (String, String)>) {
let this = self.inner_mut();
for (key, value) in tags {
self.0
.entry(key)
.or_default()
.insert(TagValue::Value(value));
this.entry(key).or_default().insert(TagValue::Value(value));
}
}

pub fn retain(&mut self, mut f: impl FnMut(&str, &mut TagValueSet) -> bool) {
self.0.retain(|key, tags| f(key.as_str(), tags));
self.inner_mut().retain(|key, tags| f(key.as_str(), tags));
}
}

Expand All @@ -549,25 +565,24 @@ impl<const N: usize> From<[(String, String); N]> for MetricTags {

impl FromIterator<(String, String)> for MetricTags {
fn from_iter<T: IntoIterator<Item = (String, String)>>(tags: T) -> Self {
let mut result = Self::default();
let mut result = BTreeMap::<String, TagValueSet>::default();
for (key, value) in tags {
result
.0
.entry(key)
.or_default()
.insert(TagValue::Value(value));
}
result
Self(Arc::new(result))
}
}

impl FromIterator<(String, TagValue)> for MetricTags {
fn from_iter<T: IntoIterator<Item = (String, TagValue)>>(tags: T) -> Self {
let mut result = Self::default();
let mut result = BTreeMap::<String, TagValueSet>::default();
for (key, value) in tags {
result.0.entry(key).or_default().insert(value);
result.entry(key).or_default().insert(value);
}
result
Self(Arc::new(result))
}
}

Expand Down Expand Up @@ -599,7 +614,7 @@ mod test_support {

impl Arbitrary for MetricTags {
fn arbitrary(g: &mut Gen) -> Self {
Self(BTreeMap::arbitrary(g))
Self(Arc::new(BTreeMap::arbitrary(g)))
}

fn shrink(&self) -> Box<dyn Iterator<Item = Self>> {
Expand Down
8 changes: 5 additions & 3 deletions lib/vector-core/src/event/proto.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use chrono::TimeZone;
use ordered_float::NotNan;

Expand Down Expand Up @@ -207,7 +209,7 @@ impl From<Metric> for event::Metric {
.timestamp
.map(|ts| chrono::Utc.timestamp(ts.seconds, ts.nanos as u32));

let mut tags = MetricTags(
let mut tags = MetricTags(Arc::new(
metric
.tags_v2
.into_iter()
Expand All @@ -222,7 +224,7 @@ impl From<Metric> for event::Metric {
)
})
.collect(),
);
));
// The current Vector encoding includes copies of the "single" values of tags in `tags_v2`
// above. This `extend` will re-add those values, forcing them to become the last added in
// the value set.
Expand Down Expand Up @@ -426,7 +428,7 @@ impl From<event::Metric> for WithMetadata<Metric> {
.collect();
// These are the full tag values.
let tags_v2 = tags
.0
.into_inner()
.into_iter()
.map(|(tag, values)| {
let values = values
Expand Down

0 comments on commit 5b2ab6a

Please sign in to comment.