Skip to content

Commit

Permalink
Revert "Revert "make storage egress and ingress usage component aware…
Browse files Browse the repository at this point in the history
… (#29846)" (#29883)" (#29917)

GitOrigin-RevId: dab054fa2ecc8da07854bab1234d479c17ea9f04
  • Loading branch information
sujayakar authored and Convex, Inc. committed Sep 17, 2024
1 parent 8192a98 commit 583ec19
Show file tree
Hide file tree
Showing 9 changed files with 235 additions and 43 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 11 additions & 2 deletions crates/application/src/export_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,10 @@ impl<RT: Runtime> ExportWorker<RT> {
content_type,
file_storage_entry.sha256.clone(),
)
.track_storage_egress_size(file_stream.content_length as u64);
.track_storage_egress_size(
component_path.clone(),
file_stream.content_length as u64,
);
zip_snapshot_upload
.stream_full_file(path, file_stream.stream)
.await?;
Expand Down Expand Up @@ -1218,7 +1221,13 @@ mod tests {

let usage = usage.gather_user_stats();
assert!(usage.database_egress_size.is_empty());
assert_eq!(usage.storage_egress_size, 3);
assert_eq!(
*usage
.storage_egress_size
.get(&ComponentPath::test_user())
.unwrap(),
3
);

Ok(())
}
Expand Down
9 changes: 6 additions & 3 deletions crates/application/src/snapshot_import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2096,7 +2096,7 @@ async fn import_storage_table<RT: Runtime>(
content_type,
entry.sha256,
)
.track_storage_ingress_size(file_size);
.track_storage_ingress_size(component_path.clone(), file_size);
num_files += 1;
if let Some(import_id) = import_id {
best_effort_update_progress_message(
Expand Down Expand Up @@ -3517,8 +3517,11 @@ a
.await?;

let stats = usage.gather_user_stats();
assert!(stats.database_ingress_size[&(component_path, table_name.to_string())] > 0);
assert_eq!(stats.storage_ingress_size, 9);
assert!(stats.database_ingress_size[&(component_path.clone(), table_name.to_string())] > 0);
assert_eq!(
*stats.storage_ingress_size.get(&component_path).unwrap(),
9u64
);

Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions crates/events/src/usage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ pub enum UsageEvent {
/// import/export).
StorageBandwidth {
id: String,
component_path: Option<String>,
ingress: u64,
egress: u64,
},
Expand Down
19 changes: 14 additions & 5 deletions crates/file_storage/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,9 @@ impl<RT: Runtime> TransactionalFileStorage<RT> {
let stream = storage_get_stream.stream;
let content_length = ContentLength(storage_get_stream.content_length as u64);

let component_path = ComponentPath::TODO();
let call_tracker = usage_tracker.track_storage_call(
ComponentPath::TODO(),
component_path.clone(),
"get range",
storage_id,
content_type.clone(),
Expand All @@ -255,11 +256,12 @@ impl<RT: Runtime> TransactionalFileStorage<RT> {
content_length,
content_range,
content_type,
stream: Self::track_stream_usage(stream, get_file_type, call_tracker),
stream: Self::track_stream_usage(component_path, stream, get_file_type, call_tracker),
})
}

fn track_stream_usage(
component_path: ComponentPath,
stream: BoxStream<'static, futures::io::Result<bytes::Bytes>>,
get_file_type: GetFileType,
storage_call_tracker: Box<dyn StorageCallTracker>,
Expand Down Expand Up @@ -291,7 +293,8 @@ impl<RT: Runtime> TransactionalFileStorage<RT> {
if let Ok(ref bytes) = bytes {
let bytes_size = bytes.len() as u64;
log_get_file_chunk_size(bytes_size, get_file_type);
storage_call_tracker.track_storage_egress_size(bytes_size);
storage_call_tracker
.track_storage_egress_size(component_path.clone(), bytes_size);
}
bytes
}),
Expand Down Expand Up @@ -429,8 +432,14 @@ impl<RT: Runtime> FileStorage<RT> {
.await?;

usage_tracker
.track_storage_call(component_path, "store", storage_id, content_type, sha256)
.track_storage_ingress_size(size as u64);
.track_storage_call(
component_path.clone(),
"store",
storage_id,
content_type,
sha256,
)
.track_storage_ingress_size(component_path, size as u64);
Ok(virtual_id)
}
}
5 changes: 3 additions & 2 deletions crates/isolate/src/environment/action/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,16 @@ impl<RT: Runtime> TaskExecutor<RT> {
.storage_store_file_entry(self.identity.clone(), self.component_id(), entry)
.await?;

let component_path = ComponentPath::TODO();
self.usage_tracker
.track_storage_call(
ComponentPath::TODO(),
component_path.clone(),
"store",
storage_id,
content_type,
sha256,
)
.track_storage_ingress_size(size as u64);
.track_storage_ingress_size(component_path, size as u64);

Ok(storage_doc_id)
}
Expand Down
9 changes: 9 additions & 0 deletions crates/pb/protos/usage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@ package usage;

message FunctionUsageStats {
repeated CounterWithTag storage_calls = 1;
// TODO(ENG-7342): Remove after services are pushed with by_component version
optional uint64 storage_ingress_size = 2;
repeated CounterWithComponent storage_ingress_size_by_component = 8;
// TODO(ENG-7342): Remove after services are pushed with by_component version
optional uint64 storage_egress_size = 3;
repeated CounterWithComponent storage_egress_size_by_component = 9;
repeated CounterWithTag database_ingress_size = 4;
repeated CounterWithTag database_egress_size = 5;
repeated CounterWithTag vector_ingress_size = 6;
Expand All @@ -17,3 +21,8 @@ message CounterWithTag {
optional string table_name = 1;
optional uint64 count = 2;
}

message CounterWithComponent {
optional string component_path = 1;
optional uint64 count = 2;
}
1 change: 1 addition & 0 deletions crates/usage_tracking/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ anyhow = { workspace = true }
common = { path = "../common" }
events = { path = "../events" }
headers = { workspace = true }
maplit = { workspace = true }
metrics = { path = "../metrics" }
parking_lot = { workspace = true, features = ["hardware-lock-elision"] }
pb = { path = "../pb" }
Expand Down
Loading

0 comments on commit 583ec19

Please sign in to comment.