Skip to content

Commit 1dbda04

Browse files
rrwang7Convex, Inc.
authored andcommitted
make export worker use independent storage tracking (#30797)
GitOrigin-RevId: 52a773a5a2c971f91c7cb718ab3c5fdd91ba7f00
1 parent 3e68485 commit 1dbda04

File tree

3 files changed

+90
-24
lines changed

3 files changed

+90
-24
lines changed

crates/application/src/export_worker.rs

Lines changed: 44 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ use model::{
7070
types::{
7171
Export,
7272
ExportFormat,
73+
ExportRequestor,
7374
},
7475
ExportsModel,
7576
},
@@ -288,6 +289,7 @@ impl<RT: Runtime> ExportWorker<RT> {
288289
&mut self,
289290
format: ExportFormat,
290291
component: ComponentId,
292+
requestor: ExportRequestor,
291293
) -> anyhow::Result<(Timestamp, ObjectKey, FunctionUsageTracker)> {
292294
tracing::info!("Beginning snapshot export...");
293295
let storage = &self.storage;
@@ -346,6 +348,7 @@ impl<RT: Runtime> ExportWorker<RT> {
346348
system_tables,
347349
include_storage,
348350
usage.clone(),
351+
requestor,
349352
);
350353
let (_, ()) = try_join!(uploader, zipper)?;
351354
let zip_object_key = upload.complete().await?;
@@ -367,6 +370,7 @@ impl<RT: Runtime> ExportWorker<RT> {
367370
system_tables: &BTreeMap<(TableNamespace, TableName), TabletId>,
368371
include_storage: bool,
369372
usage: FunctionUsageTracker,
373+
requestor: ExportRequestor,
370374
) -> anyhow::Result<()> {
371375
let namespace: TableNamespace = component_tree.id.into();
372376
let component_path = component_ids_to_paths
@@ -477,19 +481,18 @@ impl<RT: Runtime> ExportWorker<RT> {
477481
.as_ref()
478482
.map(|ct| ct.parse())
479483
.transpose()?;
480-
usage
481-
.track_storage_call(
482-
component_path.clone(),
483-
"snapshot_export",
484-
file_storage_entry.storage_id.clone(),
485-
content_type,
486-
file_storage_entry.sha256.clone(),
487-
)
488-
.track_storage_egress_size(
489-
component_path.clone(),
490-
"snapshot_export".to_string(),
491-
file_stream.content_length as u64,
492-
);
484+
usage.track_storage_call(
485+
component_path.clone(),
486+
"snapshot_export",
487+
file_storage_entry.storage_id.clone(),
488+
content_type,
489+
file_storage_entry.sha256.clone(),
490+
);
491+
self.usage_tracking.track_independent_storage_egress_size(
492+
component_path.clone(),
493+
requestor.usage_tag(),
494+
file_stream.content_length as u64,
495+
);
493496
zip_snapshot_upload
494497
.stream_full_file(path, file_stream.stream)
495498
.await?;
@@ -552,6 +555,7 @@ impl<RT: Runtime> ExportWorker<RT> {
552555
system_tables,
553556
include_storage,
554557
usage.clone(),
558+
requestor,
555559
)
556560
.await?;
557561
}
@@ -570,6 +574,7 @@ impl<RT: Runtime> ExportWorker<RT> {
570574
system_tables: BTreeMap<(TableNamespace, TableName), TabletId>,
571575
include_storage: bool,
572576
usage: FunctionUsageTracker,
577+
requestor: ExportRequestor,
573578
) -> anyhow::Result<()> {
574579
let mut zip_snapshot_upload = ZipSnapshotUpload::new(&mut writer).await?;
575580

@@ -584,6 +589,7 @@ impl<RT: Runtime> ExportWorker<RT> {
584589
&system_tables,
585590
include_storage,
586591
usage,
592+
requestor,
587593
)
588594
.await?;
589595

@@ -597,21 +603,34 @@ impl<RT: Runtime> ExportWorker<RT> {
597603
&mut self,
598604
export: ParsedDocument<Export>,
599605
) -> anyhow::Result<()> {
600-
let (ts, object_keys, usage) = self
601-
.export_inner(export.format(), export.component())
606+
let (ts, object_key, usage) = self
607+
.export_inner(export.format(), export.component(), export.requestor())
602608
.await?;
603609

604610
let mut tx = self.database.begin(Identity::system()).await?;
605611
let completed_export =
606612
(*export)
607613
.clone()
608-
.completed(ts, *tx.begin_timestamp(), object_keys)?;
614+
.completed(ts, *tx.begin_timestamp(), object_key.clone())?;
609615
SystemMetadataModel::new_global(&mut tx)
610616
.replace(export.id(), completed_export.try_into()?)
611617
.await?;
612618
self.database
613619
.commit_with_write_source(tx, "export_worker_mark_complete")
614620
.await?;
621+
let object_attributes = self
622+
.storage
623+
.get_object_attributes(&object_key)
624+
.await?
625+
.context("error getting export object attributes from S3")?;
626+
627+
// Charge file bandwidth for the upload of the snapshot to exports storage
628+
self.usage_tracking.track_independent_storage_ingress_size(
629+
ComponentPath::root(),
630+
export.requestor().usage_tag(),
631+
object_attributes.size,
632+
);
633+
// Charge database bandwidth accumulated during the export
615634
self.usage_tracking.track_call(
616635
UdfIdentifier::Cli("export".to_string()),
617636
ExecutionId::new(),
@@ -809,7 +828,10 @@ mod tests {
809828
use headers::ContentType;
810829
use keybroker::Identity;
811830
use model::{
812-
exports::types::ExportFormat,
831+
exports::types::{
832+
ExportFormat,
833+
ExportRequestor,
834+
},
813835
file_storage::types::FileStorageEntry,
814836
test_helpers::DbFixturesWithModel,
815837
};
@@ -932,6 +954,7 @@ mod tests {
932954
include_storage: true,
933955
},
934956
ComponentId::Root,
957+
ExportRequestor::SnapshotExport,
935958
)
936959
.await?;
937960

@@ -1046,6 +1069,7 @@ mod tests {
10461069
include_storage: false,
10471070
},
10481071
ComponentId::Root,
1072+
ExportRequestor::SnapshotExport,
10491073
)
10501074
.await?;
10511075

@@ -1106,6 +1130,7 @@ mod tests {
11061130
include_storage: false,
11071131
},
11081132
child_component,
1133+
ExportRequestor::SnapshotExport,
11091134
)
11101135
.await?;
11111136

@@ -1199,6 +1224,7 @@ mod tests {
11991224
include_storage: true,
12001225
},
12011226
ComponentId::Root,
1227+
ExportRequestor::SnapshotExport,
12021228
)
12031229
.await?;
12041230

@@ -1224,13 +1250,6 @@ mod tests {
12241250

12251251
let usage = usage.gather_user_stats();
12261252
assert!(usage.database_egress_size.is_empty());
1227-
assert_eq!(
1228-
*usage
1229-
.storage_egress_size
1230-
.get(&ComponentPath::test_user())
1231-
.unwrap(),
1232-
3
1233-
);
12341253

12351254
Ok(())
12361255
}
@@ -1268,6 +1287,7 @@ mod tests {
12681287
include_storage: false,
12691288
},
12701289
ComponentId::test_user(),
1290+
ExportRequestor::SnapshotExport,
12711291
)
12721292
.await?;
12731293
Ok(())

crates/model/src/exports/types.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,15 @@ impl Export {
240240
| Export::Failed { component, .. } => *component,
241241
}
242242
}
243+
244+
pub fn requestor(&self) -> ExportRequestor {
245+
match self {
246+
Export::Requested { requestor, .. }
247+
| Export::InProgress { requestor, .. }
248+
| Export::Completed { requestor, .. }
249+
| Export::Failed { requestor, .. } => *requestor,
250+
}
251+
}
243252
}
244253

245254
#[derive(Copy, Clone, Debug, PartialEq)]
@@ -283,6 +292,15 @@ pub enum ExportRequestor {
283292
CloudBackup,
284293
}
285294

295+
impl ExportRequestor {
296+
pub fn usage_tag(&self) -> String {
297+
match self {
298+
Self::SnapshotExport => "snapshot_export".to_string(),
299+
Self::CloudBackup => "cloud_backup".to_string(),
300+
}
301+
}
302+
}
303+
286304
impl Export {
287305
pub fn requested(
288306
format: ExportFormat,

crates/usage_tracking/src/lib.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,34 @@ impl UsageCounter {
4747
pub fn new(usage_logger: Arc<dyn UsageEventLogger>) -> Self {
4848
Self { usage_logger }
4949
}
50+
51+
// Used for tracking storage ingress outside of a user function (e.g. snapshot
52+
// import/export).
53+
pub fn track_independent_storage_ingress_size(
54+
&self,
55+
component_path: ComponentPath,
56+
tag: String,
57+
ingress_size: u64,
58+
) {
59+
let independent_tracker =
60+
IndependentStorageCallTracker::new(ExecutionId::new(), self.usage_logger.clone());
61+
62+
independent_tracker.track_storage_ingress_size(component_path, tag, ingress_size);
63+
}
64+
65+
// Used for tracking storage egress outside of a user function (e.g. snapshot
66+
// import/export).
67+
pub fn track_independent_storage_egress_size(
68+
&self,
69+
component_path: ComponentPath,
70+
tag: String,
71+
egress_size: u64,
72+
) {
73+
let independent_tracker =
74+
IndependentStorageCallTracker::new(ExecutionId::new(), self.usage_logger.clone());
75+
76+
independent_tracker.track_storage_egress_size(component_path, tag, egress_size);
77+
}
5078
}
5179

5280
pub enum CallType {

0 commit comments

Comments
 (0)