From 583ec1924963f67532884232a23f16a9ad002e08 Mon Sep 17 00:00:00 2001
From: Sujay Jayakar
Date: Tue, 17 Sep 2024 18:00:41 -0400
Subject: [PATCH] Revert "Revert "make storage egress and ingress usage component aware (#29846)" (#29883)" (#29917)

GitOrigin-RevId: dab054fa2ecc8da07854bab1234d479c17ea9f04
---
 Cargo.lock                                    |   1 +
 crates/application/src/export_worker.rs       |  13 +-
 crates/application/src/snapshot_import.rs     |   9 +-
 crates/events/src/usage.rs                    |   1 +
 crates/file_storage/src/core.rs               |  19 +-
 .../isolate/src/environment/action/storage.rs |   5 +-
 crates/pb/protos/usage.proto                  |   9 +
 crates/usage_tracking/Cargo.toml              |   1 +
 crates/usage_tracking/src/lib.rs              | 220 +++++++++++++++---
 9 files changed, 235 insertions(+), 43 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index aef4931a..736faaec 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8495,6 +8495,7 @@ dependencies = [
  "common",
  "events",
  "headers",
+ "maplit",
  "metrics",
  "parking_lot",
  "pb",
diff --git a/crates/application/src/export_worker.rs b/crates/application/src/export_worker.rs
index 46dd8f4c..8d419287 100644
--- a/crates/application/src/export_worker.rs
+++ b/crates/application/src/export_worker.rs
@@ -483,7 +483,10 @@ impl ExportWorker {
                         content_type,
                         file_storage_entry.sha256.clone(),
                     )
-                    .track_storage_egress_size(file_stream.content_length as u64);
+                    .track_storage_egress_size(
+                        component_path.clone(),
+                        file_stream.content_length as u64,
+                    );
                 zip_snapshot_upload
                     .stream_full_file(path, file_stream.stream)
                     .await?;
@@ -1218,7 +1221,13 @@ mod tests {
 
         let usage = usage.gather_user_stats();
         assert!(usage.database_egress_size.is_empty());
-        assert_eq!(usage.storage_egress_size, 3);
+        assert_eq!(
+            *usage
+                .storage_egress_size
+                .get(&ComponentPath::test_user())
+                .unwrap(),
+            3
+        );
 
         Ok(())
     }
diff --git a/crates/application/src/snapshot_import.rs b/crates/application/src/snapshot_import.rs
index 402e7445..d61f18ac 100644
--- a/crates/application/src/snapshot_import.rs
+++ b/crates/application/src/snapshot_import.rs
@@ -2096,7 +2096,7 @@ async fn import_storage_table(
             content_type,
             entry.sha256,
         )
-        .track_storage_ingress_size(file_size);
+        .track_storage_ingress_size(component_path.clone(), file_size);
     num_files += 1;
     if let Some(import_id) = import_id {
         best_effort_update_progress_message(
@@ -3517,8 +3517,11 @@ a
         .await?;
 
     let stats = usage.gather_user_stats();
-    assert!(stats.database_ingress_size[&(component_path, table_name.to_string())] > 0);
-    assert_eq!(stats.storage_ingress_size, 9);
+    assert!(stats.database_ingress_size[&(component_path.clone(), table_name.to_string())] > 0);
+    assert_eq!(
+        *stats.storage_ingress_size.get(&component_path).unwrap(),
+        9u64
+    );
 
     Ok(())
 }
diff --git a/crates/events/src/usage.rs b/crates/events/src/usage.rs
index 3f092845..0e7a8bbc 100644
--- a/crates/events/src/usage.rs
+++ b/crates/events/src/usage.rs
@@ -77,6 +77,7 @@ pub enum UsageEvent {
     /// import/export).
     StorageBandwidth {
         id: String,
+        component_path: Option<String>,
         ingress: u64,
         egress: u64,
     },
diff --git a/crates/file_storage/src/core.rs b/crates/file_storage/src/core.rs
index bc480471..da9bd897 100644
--- a/crates/file_storage/src/core.rs
+++ b/crates/file_storage/src/core.rs
@@ -243,8 +243,9 @@ impl TransactionalFileStorage {
         let stream = storage_get_stream.stream;
         let content_length = ContentLength(storage_get_stream.content_length as u64);
 
+        let component_path = ComponentPath::TODO();
         let call_tracker = usage_tracker.track_storage_call(
-            ComponentPath::TODO(),
+            component_path.clone(),
             "get range",
             storage_id,
             content_type.clone(),
@@ -255,11 +256,12 @@ impl TransactionalFileStorage {
             content_length,
             content_range,
             content_type,
-            stream: Self::track_stream_usage(stream, get_file_type, call_tracker),
+            stream: Self::track_stream_usage(component_path, stream, get_file_type, call_tracker),
         })
     }
 
     fn track_stream_usage(
+        component_path: ComponentPath,
         stream: BoxStream<'static, futures::io::Result<Bytes>>,
         get_file_type: GetFileType,
         storage_call_tracker: Box<dyn StorageCallTracker>,
@@ -291,7 +293,8 @@ impl TransactionalFileStorage {
             if let Ok(ref bytes) = bytes {
                 let bytes_size = bytes.len() as u64;
                 log_get_file_chunk_size(bytes_size, get_file_type);
-                storage_call_tracker.track_storage_egress_size(bytes_size);
+                storage_call_tracker
+                    .track_storage_egress_size(component_path.clone(), bytes_size);
             }
             bytes
         }),
@@ -429,8 +432,14 @@ impl FileStorage {
             .await?;
 
         usage_tracker
-            .track_storage_call(component_path, "store", storage_id, content_type, sha256)
-            .track_storage_ingress_size(size as u64);
+            .track_storage_call(
+                component_path.clone(),
+                "store",
+                storage_id,
+                content_type,
+                sha256,
+            )
+            .track_storage_ingress_size(component_path, size as u64);
         Ok(virtual_id)
     }
 }
diff --git a/crates/isolate/src/environment/action/storage.rs b/crates/isolate/src/environment/action/storage.rs
index 8508906a..60862113 100644
--- a/crates/isolate/src/environment/action/storage.rs
+++ b/crates/isolate/src/environment/action/storage.rs
@@ -80,15 +80,16 @@ impl TaskExecutor {
             .storage_store_file_entry(self.identity.clone(), self.component_id(), entry)
             .await?;
 
+        let component_path = ComponentPath::TODO();
         self.usage_tracker
             .track_storage_call(
-                ComponentPath::TODO(),
+                component_path.clone(),
                 "store",
                 storage_id,
                 content_type,
                 sha256,
             )
-            .track_storage_ingress_size(size as u64);
+            .track_storage_ingress_size(component_path, size as u64);
         Ok(storage_doc_id)
     }
 
diff --git a/crates/pb/protos/usage.proto b/crates/pb/protos/usage.proto
index 5762d1ea..cbd27391 100644
--- a/crates/pb/protos/usage.proto
+++ b/crates/pb/protos/usage.proto
@@ -4,8 +4,12 @@ package usage;
 
 message FunctionUsageStats {
   repeated CounterWithTag storage_calls = 1;
+  // TODO(ENG-7342): Remove after services are pushed with by_component version
   optional uint64 storage_ingress_size = 2;
+  repeated CounterWithComponent storage_ingress_size_by_component = 8;
+  // TODO(ENG-7342): Remove after services are pushed with by_component version
   optional uint64 storage_egress_size = 3;
+  repeated CounterWithComponent storage_egress_size_by_component = 9;
   repeated CounterWithTag database_ingress_size = 4;
   repeated CounterWithTag database_egress_size = 5;
   repeated CounterWithTag vector_ingress_size = 6;
@@ -17,3 +21,8 @@ message CounterWithTag {
   optional string table_name = 1;
   optional uint64 count = 2;
 }
+
+message CounterWithComponent {
+  optional string component_path = 1;
+  optional uint64 count = 2;
+}
diff --git a/crates/usage_tracking/Cargo.toml b/crates/usage_tracking/Cargo.toml
index 7f8879e4..1f0661d4 100644
--- a/crates/usage_tracking/Cargo.toml
+++ b/crates/usage_tracking/Cargo.toml
@@ -13,6 +13,7 @@ anyhow = { workspace = true }
 common = { path = "../common" }
 events = { path = "../events" }
 headers = { workspace = true }
+maplit = { workspace = true }
 metrics = { path = "../metrics" }
 parking_lot = { workspace = true, features = ["hardware-lock-elision"] }
 pb = { path = "../pb" }
diff --git a/crates/usage_tracking/src/lib.rs b/crates/usage_tracking/src/lib.rs
index 614ad6bc..482dad40 100644
--- a/crates/usage_tracking/src/lib.rs
+++ b/crates/usage_tracking/src/lib.rs
@@ -1,5 +1,6 @@
 #![feature(iterator_try_collect)]
 #![feature(lazy_cell)]
+#![feature(let_chains)]
 
 use std::{
     collections::BTreeMap,
@@ -23,8 +24,10 @@ use events::usage::{
     UsageEventLogger,
 };
 use headers::ContentType;
+use maplit::btreemap;
 use parking_lot::Mutex;
 use pb::usage::{
+    CounterWithComponent as CounterWithComponentProto,
     CounterWithTag as CounterWithTagProto,
     FunctionUsageStats as FunctionUsageStatsProto,
 };
@@ -179,7 +182,7 @@ impl UsageCounter {
         usage_metrics: &mut Vec<UsageEvent>,
     ) {
         // Merge the storage stats.
-        let (component_path, udf_id) = udf_path.clone().into_component_and_udf_path();
+        let (_, udf_id) = udf_path.clone().into_component_and_udf_path();
         for ((component_path, storage_api), function_count) in stats.storage_calls {
             usage_metrics.push(UsageEvent::FunctionStorageCalls {
                 id: execution_id.to_string(),
@@ -189,13 +192,25 @@ impl UsageCounter {
                 count: function_count,
             });
         }
-        usage_metrics.push(UsageEvent::FunctionStorageBandwidth {
-            id: execution_id.to_string(),
-            component_path: component_path.clone(),
-            udf_id: udf_id.clone(),
-            ingress: stats.storage_ingress_size,
-            egress: stats.storage_egress_size,
-        });
+
+        for (component_path, ingress_size) in stats.storage_ingress_size {
+            usage_metrics.push(UsageEvent::FunctionStorageBandwidth {
+                id: execution_id.to_string(),
+                component_path: component_path.serialize(),
+                udf_id: udf_id.clone(),
+                ingress: ingress_size,
+                egress: 0,
+            });
+        }
+        for (component_path, egress_size) in stats.storage_egress_size {
+            usage_metrics.push(UsageEvent::FunctionStorageBandwidth {
+                id: execution_id.to_string(),
+                component_path: component_path.serialize(),
+                udf_id: udf_id.clone(),
+                ingress: 0,
+                egress: egress_size,
+            });
+        }
         // Merge "by table" bandwidth stats.
         for ((component_path, table_name), ingress_size) in stats.database_ingress_size {
             usage_metrics.push(UsageEvent::DatabaseBandwidth {
@@ -255,8 +270,8 @@ pub trait StorageUsageTracker: Send + Sync {
 }
 
 pub trait StorageCallTracker: Send + Sync {
-    fn track_storage_ingress_size(&self, ingress_size: u64);
-    fn track_storage_egress_size(&self, egress_size: u64);
+    fn track_storage_ingress_size(&self, component_path: ComponentPath, ingress_size: u64);
+    fn track_storage_egress_size(&self, component_path: ComponentPath, egress_size: u64);
 }
 
 struct IndependentStorageCallTracker {
@@ -274,19 +289,21 @@ impl IndependentStorageCallTracker {
 }
 
 impl StorageCallTracker for IndependentStorageCallTracker {
-    fn track_storage_ingress_size(&self, ingress_size: u64) {
+    fn track_storage_ingress_size(&self, component_path: ComponentPath, ingress_size: u64) {
         metrics::storage::log_storage_ingress_size(ingress_size);
         self.usage_logger.record(vec![UsageEvent::StorageBandwidth {
             id: self.execution_id.to_string(),
+            component_path: component_path.serialize(),
             ingress: ingress_size,
             egress: 0,
         }]);
     }
 
-    fn track_storage_egress_size(&self, egress_size: u64) {
+    fn track_storage_egress_size(&self, component_path: ComponentPath, egress_size: u64) {
         metrics::storage::log_storage_egress_size(egress_size);
         self.usage_logger.record(vec![UsageEvent::StorageBandwidth {
             id: self.execution_id.to_string(),
+            component_path: component_path.serialize(),
             ingress: 0,
             egress: egress_size,
         }]);
     }
@@ -477,16 +494,20 @@ impl FunctionUsageTracker {
 // aggregate over the entire UDF and not worry about sending usage events or
 // creating unique execution ids.
 impl StorageCallTracker for FunctionUsageTracker {
-    fn track_storage_ingress_size(&self, ingress_size: u64) {
+    fn track_storage_ingress_size(&self, component_path: ComponentPath, ingress_size: u64) {
         let mut state = self.state.lock();
         metrics::storage::log_storage_ingress_size(ingress_size);
-        state.storage_ingress_size += ingress_size;
+        state
+            .storage_ingress_size
+            .mutate_entry_or_default(component_path, |count| *count += ingress_size);
     }
 
-    fn track_storage_egress_size(&self, egress_size: u64) {
+    fn track_storage_egress_size(&self, component_path: ComponentPath, egress_size: u64) {
         let mut state = self.state.lock();
         metrics::storage::log_storage_egress_size(egress_size);
-        state.storage_egress_size += egress_size;
+        state
+            .storage_egress_size
+            .mutate_entry_or_default(component_path, |count| *count += egress_size);
     }
 }
 
@@ -515,11 +536,10 @@ type StorageAPI = String;
 
 /// User-facing UDF stats, built
 #[derive(Debug, Clone, PartialEq, Eq, Default)]
-#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
 pub struct FunctionUsageStats {
     pub storage_calls: WithHeapSize<BTreeMap<(ComponentPath, StorageAPI), u64>>,
-    pub storage_ingress_size: u64,
-    pub storage_egress_size: u64,
+    pub storage_ingress_size: WithHeapSize<BTreeMap<ComponentPath, u64>>,
+    pub storage_egress_size: WithHeapSize<BTreeMap<ComponentPath, u64>>,
     pub database_ingress_size: WithHeapSize<BTreeMap<(ComponentPath, TableName), u64>>,
     pub database_egress_size: WithHeapSize<BTreeMap<(ComponentPath, TableName), u64>>,
     pub vector_ingress_size: WithHeapSize<BTreeMap<(ComponentPath, TableName), u64>>,
@@ -531,8 +551,8 @@ impl FunctionUsageStats {
         AggregatedFunctionUsageStats {
             database_read_bytes: self.database_egress_size.values().sum(),
             database_write_bytes: self.database_ingress_size.values().sum(),
-            storage_read_bytes: self.storage_egress_size,
-            storage_write_bytes: self.storage_ingress_size,
+            storage_read_bytes: self.storage_egress_size.values().sum(),
+            storage_write_bytes: self.storage_ingress_size.values().sum(),
             vector_index_read_bytes: self.vector_egress_size.values().sum(),
             vector_index_write_bytes: self.vector_ingress_size.values().sum(),
         }
     }
@@ -544,8 +564,14 @@ impl FunctionUsageStats {
             self.storage_calls
                 .mutate_entry_or_default(key, |count| *count += function_count);
         }
-        self.storage_ingress_size += other.storage_ingress_size;
-        self.storage_egress_size += other.storage_egress_size;
+        for (key, ingress_size) in other.storage_ingress_size {
+            self.storage_ingress_size
+                .mutate_entry_or_default(key, |count| *count += ingress_size);
+        }
+        for (key, egress_size) in other.storage_egress_size {
+            self.storage_egress_size
+                .mutate_entry_or_default(key, |count| *count += egress_size);
+        }
 
         // Merge "by table" bandwidth other.
         for (key, ingress_size) in other.database_ingress_size {
@@ -567,6 +593,84 @@ impl FunctionUsageStats {
     }
 }
 
+#[cfg(any(test, feature = "testing"))]
+mod usage_arbitrary {
+    use proptest::prelude::*;
+
+    use crate::{
+        ComponentPath,
+        FunctionUsageStats,
+        StorageAPI,
+        TableName,
+        WithHeapSize,
+    };
+
+    impl Arbitrary for FunctionUsageStats {
+        type Parameters = ();
+        type Strategy = BoxedStrategy<Self>;
+
+        fn arbitrary_with((): Self::Parameters) -> Self::Strategy {
+            let strategies = (
+                proptest::collection::btree_map(
+                    any::<(ComponentPath, StorageAPI)>(),
+                    0..=1024u64,
+                    0..=4,
+                )
+                .prop_map(WithHeapSize::from),
+                proptest::collection::btree_map(any::<ComponentPath>(), 0..=1024u64, 0..=4)
+                    .prop_map(WithHeapSize::from),
+                proptest::collection::btree_map(any::<ComponentPath>(), 0..=1024u64, 0..=4)
+                    .prop_map(WithHeapSize::from),
+                proptest::collection::btree_map(
+                    any::<(ComponentPath, TableName)>(),
+                    0..=1024u64,
+                    0..=4,
+                )
+                .prop_map(WithHeapSize::from),
+                proptest::collection::btree_map(
+                    any::<(ComponentPath, TableName)>(),
+                    0..=1024u64,
+                    0..=4,
+                )
+                .prop_map(WithHeapSize::from),
+                proptest::collection::btree_map(
+                    any::<(ComponentPath, TableName)>(),
+                    0..=1024u64,
+                    0..=4,
+                )
+                .prop_map(WithHeapSize::from),
+                proptest::collection::btree_map(
+                    any::<(ComponentPath, TableName)>(),
+                    0..=1024u64,
+                    0..=4,
+                )
+                .prop_map(WithHeapSize::from),
+            );
+            strategies
+                .prop_map(
+                    |(
+                        storage_calls,
+                        storage_ingress_size,
+                        storage_egress_size,
+                        database_ingress_size,
+                        database_egress_size,
+                        vector_ingress_size,
+                        vector_egress_size,
+                    )| FunctionUsageStats {
+                        storage_calls,
+                        storage_ingress_size,
+                        storage_egress_size,
+                        database_ingress_size,
+                        database_egress_size,
+                        vector_ingress_size,
+                        vector_egress_size,
+                    },
+                )
+                .boxed()
+        }
+    }
+}
+
 fn to_by_tag_count(
     counts: impl Iterator<Item = ((ComponentPath, TableName), u64)>,
 ) -> Vec<CounterWithTagProto> {
@@ -581,6 +685,17 @@ fn to_by_tag_count(
         .collect()
 }
 
+fn to_by_component_count(
+    counts: impl Iterator<Item = (ComponentPath, u64)>,
+) -> Vec<CounterWithComponentProto> {
+    counts
+        .map(|(component_path, count)| CounterWithComponentProto {
+            component_path: component_path.serialize(),
+            count: Some(count),
+        })
+        .collect()
+}
+
 fn from_by_tag_count(
     counts: Vec<CounterWithTagProto>,
 ) -> anyhow::Result<impl Iterator<Item = ((ComponentPath, TableName), u64)>> {
@@ -596,12 +711,32 @@ fn from_by_tag_count(
     Ok(counts.into_iter())
 }
 
+fn from_by_component_tag_count(
+    counts: Vec<CounterWithComponentProto>,
+) -> anyhow::Result<impl Iterator<Item = (ComponentPath, u64)>> {
+    let counts: Vec<_> = counts
+        .into_iter()
+        .map(|c| -> anyhow::Result<_> {
+            let component_path = ComponentPath::deserialize(c.component_path.as_deref())?;
+            let count = c.count.context("Missing `count` field")?;
+            Ok((component_path, count))
+        })
+        .try_collect()?;
+    Ok(counts.into_iter())
+}
+
 impl From<FunctionUsageStats> for FunctionUsageStatsProto {
     fn from(stats: FunctionUsageStats) -> Self {
         FunctionUsageStatsProto {
             storage_calls: to_by_tag_count(stats.storage_calls.into_iter()),
-            storage_ingress_size: Some(stats.storage_ingress_size),
-            storage_egress_size: Some(stats.storage_egress_size),
+            storage_ingress_size: Some(stats.storage_ingress_size.values().sum()),
+            storage_ingress_size_by_component: to_by_component_count(
+                stats.storage_ingress_size.into_iter(),
+            ),
+            storage_egress_size: Some(stats.storage_egress_size.values().sum()),
+            storage_egress_size_by_component: to_by_component_count(
+                stats.storage_egress_size.into_iter(),
+            ),
             database_ingress_size: to_by_tag_count(stats.database_ingress_size.into_iter()),
             database_egress_size: to_by_tag_count(stats.database_egress_size.into_iter()),
             vector_ingress_size: to_by_tag_count(stats.vector_ingress_size.into_iter()),
@@ -615,12 +750,35 @@ impl TryFrom<FunctionUsageStatsProto> for FunctionUsageStats {
 
     fn try_from(stats: FunctionUsageStatsProto) -> anyhow::Result<Self> {
         let storage_calls = from_by_tag_count(stats.storage_calls)?.collect();
-        let storage_ingress_size = stats
-            .storage_ingress_size
-            .context("Missing `storage_ingress_size` field")?;
-        let storage_egress_size = stats
-            .storage_egress_size
-            .context("Missing `storage_egress_size` field")?;
+        // TODO(ENG-7342) Remove support for old protos
+        let storage_ingress_size = if let Some(storage_ingress_size) = stats.storage_ingress_size
+            && stats.storage_ingress_size_by_component.is_empty()
+        {
+            if storage_ingress_size > 0 {
+                btreemap! {
+                    ComponentPath::root() => storage_ingress_size,
+                }
+                .into_iter()
+                .collect()
+            } else {
+                btreemap! {}.into_iter().collect()
+            }
+        } else {
+            from_by_component_tag_count(stats.storage_ingress_size_by_component)?.collect()
+        };
+        // TODO(ENG-7342) Remove support for old protos
+        let storage_egress_size = if let Some(storage_egress_size) = stats.storage_egress_size
+            && stats.storage_egress_size_by_component.is_empty()
+            && storage_egress_size > 0
+        {
+            btreemap! {
+                ComponentPath::root() => storage_egress_size,
+            }
+            .into_iter()
+            .collect()
+        } else {
+            from_by_component_tag_count(stats.storage_egress_size_by_component)?.collect()
+        };
         let database_ingress_size = from_by_tag_count(stats.database_ingress_size)?.collect();
         let database_egress_size = from_by_tag_count(stats.database_egress_size)?.collect();
        let vector_ingress_size = from_by_tag_count(stats.vector_ingress_size)?.collect();
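// A minimal, self-contained sketch of the backward-compatibility rule in the `TryFrom`
// hunk above: a legacy proto that only carries the scalar `storage_ingress_size` total
// is folded into a per-component map under the root component. `LegacyProto` and the
// `Option<String>` component key are hypothetical stand-ins for the real
// `FunctionUsageStatsProto` / `ComponentPath` types; the actual conversion lives in
// crates/usage_tracking/src/lib.rs.
use std::collections::BTreeMap;

// Hypothetical stand-in: only the two storage-ingress fields are modeled here.
struct LegacyProto {
    storage_ingress_size: Option<u64>,                             // legacy scalar total
    storage_ingress_size_by_component: Vec<(Option<String>, u64)>, // new per-component rows
}

// Mirrors the patched logic: prefer the per-component rows; otherwise attribute a
// non-zero legacy total to the root component (`None` here) and drop a zero total.
fn ingress_by_component(proto: &LegacyProto) -> BTreeMap<Option<String>, u64> {
    if let Some(total) = proto.storage_ingress_size {
        if proto.storage_ingress_size_by_component.is_empty() {
            return if total > 0 {
                BTreeMap::from([(None, total)])
            } else {
                BTreeMap::new()
            };
        }
    }
    proto.storage_ingress_size_by_component.iter().cloned().collect()
}

fn main() {
    // Old-format proto: only the scalar total is populated.
    let old = LegacyProto {
        storage_ingress_size: Some(42),
        storage_ingress_size_by_component: vec![],
    };
    // The 42 ingress bytes land on the root component.
    assert_eq!(ingress_by_component(&old), BTreeMap::from([(None, 42)]));
}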