Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -858,8 +858,8 @@ pub fn report_tx_metricses(
reducer: &str,
db: &RelationalDB,
tx_data: Option<&TxData>,
metrics_mut: Option<TxMetrics>,
metrics_read: TxMetrics,
metrics_mut: Option<&TxMetrics>,
metrics_read: &TxMetrics,
) {
if let Some(metrics_mut) = metrics_mut {
metrics_mut.report_with_db(reducer, db, tx_data);
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,7 @@ impl RelationalDB {
let mut tx = self.begin_tx(workload);
let res = f(&mut tx);
let (tx_metics, reducer) = self.release_tx(tx);
report_tx_metricses(&reducer, self, None, None, tx_metics);
report_tx_metricses(&reducer, self, None, None, &tx_metics);
res
}

Expand Down
12 changes: 10 additions & 2 deletions crates/core/src/sql/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ pub fn execute_sql_tx<'a>(

pub struct SqlResult {
pub rows: Vec<ProductValue>,
/// These metrics will be reported via `report_tx_metrics`.
/// They should not be reported separately to avoid double counting.
pub metrics: ExecutionMetrics,
}

Expand All @@ -199,10 +201,16 @@ pub fn run(
// and hence there are no deltas to process.
let (tx_data, tx_metrics_mut, tx) = tx.commit_downgrade(Workload::Sql);

// Release the tx on drop, so that we record metrics
// Release the tx on drop, so that we record metrics.
let mut tx = scopeguard::guard(tx, |tx| {
let (tx_metrics_downgrade, reducer) = db.release_tx(tx);
report_tx_metricses(&reducer, db, Some(&tx_data), Some(tx_metrics_mut), tx_metrics_downgrade);
report_tx_metricses(
&reducer,
db,
Some(&tx_data),
Some(&tx_metrics_mut),
&tx_metrics_downgrade,
);
});

// Compute the header for the result set
Expand Down
29 changes: 21 additions & 8 deletions crates/core/src/subscription/module_subscription_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,9 @@ impl ModuleSubscriptions {
}

/// Commit a transaction and broadcast its ModuleEvent to all interested subscribers.
///
/// The returned [`ExecutionMetrics`] are reported in this method via `report_tx_metrics`.
/// They are returned for testing purposes but should not be reported separately.
pub fn commit_and_broadcast_event(
&self,
caller: Option<&ClientConnectionSender>,
Expand All @@ -655,7 +658,7 @@ impl ModuleSubscriptions {
let subscriptions = self.subscriptions.read();
let stdb = &self.relational_db;
// Downgrade mutable tx.
// Ensure tx is released/cleaned up once out of scope.
// We'll later ensure tx is released/cleaned up once out of scope.
let (read_tx, tx_data, tx_metrics_mut) = match &mut event.status {
EventStatus::Committed(db_update) => {
let Some((tx_data, tx_metrics, read_tx)) = stdb.commit_tx_downgrade(tx, Workload::Update)? else {
Expand All @@ -670,22 +673,29 @@ impl ModuleSubscriptions {
}
};

let read_tx = scopeguard::guard(read_tx, |tx| {
// When we're done with this method, release the tx and report metrics.
let mut read_tx = scopeguard::guard(read_tx, |tx| {
let (tx_metrics_read, reducer) = self.relational_db.release_tx(tx);
report_tx_metricses(&reducer, stdb, tx_data.as_ref(), Some(tx_metrics_mut), tx_metrics_read);
report_tx_metricses(
&reducer,
stdb,
tx_data.as_ref(),
Some(&tx_metrics_mut),
&tx_metrics_read,
);
});

let read_tx = tx_data
// Create the delta transaction we'll use to eval updates against.
let delta_read_tx = tx_data
.as_ref()
.map(|tx_data| DeltaTx::new(&read_tx, tx_data))
.unwrap_or_else(|| DeltaTx::from(&*read_tx));

let event = Arc::new(event);
let mut metrics = ExecutionMetrics::default();
let mut update_metrics: ExecutionMetrics = ExecutionMetrics::default();

match &event.status {
EventStatus::Committed(_) => {
metrics.merge(subscriptions.eval_updates(&read_tx, event.clone(), caller));
update_metrics = subscriptions.eval_updates(&delta_read_tx, event.clone(), caller);
}
EventStatus::Failed(_) => {
if let Some(client) = caller {
Expand All @@ -701,7 +711,10 @@ impl ModuleSubscriptions {
EventStatus::OutOfEnergy => {} // ?
}

Ok(Ok((event, metrics)))
// Merge in the subscription evaluation metrics.
read_tx.metrics.merge(update_metrics);

Ok(Ok((event, update_metrics)))
}
}

Expand Down
Loading