Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions crates/core/src/db/datastore/locking_tx_datastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2080,13 +2080,13 @@ impl traits::MutTx for Locking {
.with_label_values(&ctx.txn_type(), &ctx.database(), ctx.reducer_name().unwrap_or(""))
.inc();
DB_METRICS
.rdb_txn_cpu_time_ns
.rdb_txn_cpu_time_sec
.with_label_values(&ctx.txn_type(), &ctx.database(), ctx.reducer_name().unwrap_or(""))
.observe(cpu_time.as_nanos() as f64);
.observe(cpu_time.as_secs_f64());
DB_METRICS
.rdb_txn_elapsed_time_ns
.rdb_txn_elapsed_time_sec
.with_label_values(&ctx.txn_type(), &ctx.database(), ctx.reducer_name().unwrap_or(""))
.observe(elapsed_time.as_nanos() as f64);
.observe(elapsed_time.as_secs_f64());
tx.lock.rollback();
}

Expand All @@ -2100,13 +2100,13 @@ impl traits::MutTx for Locking {
.with_label_values(&ctx.txn_type(), &ctx.database(), ctx.reducer_name().unwrap_or(""))
.inc();
DB_METRICS
.rdb_txn_cpu_time_ns
.rdb_txn_cpu_time_sec
.with_label_values(&ctx.txn_type(), &ctx.database(), ctx.reducer_name().unwrap_or(""))
.observe(cpu_time.as_nanos() as f64);
.observe(cpu_time.as_secs_f64());
DB_METRICS
.rdb_txn_elapsed_time_ns
.rdb_txn_elapsed_time_sec
.with_label_values(&ctx.txn_type(), &ctx.database(), ctx.reducer_name().unwrap_or(""))
.observe(elapsed_time.as_nanos() as f64);
.observe(elapsed_time.as_secs_f64());
tx.lock.commit()
}

Expand Down
24 changes: 12 additions & 12 deletions crates/core/src/db/db_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ metrics_group!(
#[labels(table_id: u32)]
pub rdb_delete_by_rel_time: HistogramVec,

#[name = spacetime_scheduled_reducer_delay_ns]
#[help = "The amount of time (nanoseconds) a reducer has been delayed past its scheduled execution time"]
#[name = spacetime_scheduled_reducer_delay_sec]
#[help = "The amount of time (in seconds) a reducer has been delayed past its scheduled execution time"]
#[labels(db: Address, reducer: str)]
pub scheduled_reducer_delay_ns: HistogramVec,
pub scheduled_reducer_delay_sec: HistogramVec,

#[name = spacetime_num_rows_inserted_cumulative]
#[help = "The cumulative number of rows inserted into a table"]
Expand Down Expand Up @@ -91,20 +91,20 @@ metrics_group!(
#[labels(txn_type: TransactionType, db: Address, reducer: str)]
pub rdb_num_txns_rolledback: IntCounterVec,

#[name = spacetime_txn_elapsed_time_ns]
#[help = "The total elapsed (wall) time of a transaction (nanoseconds)"]
#[name = spacetime_txn_elapsed_time_sec]
#[help = "The total elapsed (wall) time of a transaction (in seconds)"]
#[labels(txn_type: TransactionType, db: Address, reducer: str)]
pub rdb_txn_elapsed_time_ns: HistogramVec,
pub rdb_txn_elapsed_time_sec: HistogramVec,

#[name = spacetime_txn_cpu_time_ns]
#[help = "The time spent executing a transaction (nanoseconds), excluding time spent waiting to acquire database locks"]
#[name = spacetime_txn_cpu_time_sec]
#[help = "The time spent executing a transaction (in seconds), excluding time spent waiting to acquire database locks"]
#[labels(txn_type: TransactionType, db: Address, reducer: str)]
pub rdb_txn_cpu_time_ns: HistogramVec,
pub rdb_txn_cpu_time_sec: HistogramVec,

#[name = spacetime_wasm_abi_call_duration_ns]
#[help = "The total duration of a spacetime wasm abi call (nanoseconds); includes row serialization and copying into wasm memory"]
#[name = spacetime_wasm_abi_call_duration_sec]
#[help = "The total duration of a spacetime wasm abi call (in seconds); includes row serialization and copying into wasm memory"]
#[labels(txn_type: TransactionType, db: Address, reducer: str, call: AbiCall)]
pub wasm_abi_call_duration_ns: HistogramVec,
pub wasm_abi_call_duration_sec: HistogramVec,
}
);

Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/host/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,9 @@ impl SchedulerActor {
// Note, we are only tracking the time a reducer spends delayed in the queue.
// This does not account for any time the executing thread spends blocked by the os.
DB_METRICS
.scheduled_reducer_delay_ns
.scheduled_reducer_delay_sec
.with_label_values(&module_host.info().address, &scheduled.reducer)
.observe(delay.as_nanos() as f64);
.observe(delay.as_secs_f64());
let db = self.db.clone();
tokio::spawn(async move {
let info = module_host.info();
Expand Down
89 changes: 37 additions & 52 deletions crates/core/src/host/wasmer/wasm_instance_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,9 +402,15 @@ impl WasmInstanceEnv {
let db = &ctx.database();
let reducer = &ctx.reducer_name().unwrap_or_default().to_owned();
let syscall = &AbiCall::Insert;
let start = Instant::now();

let result = Self::cvt(caller, "insert", AbiCall::Insert, |caller, mem| {
// TODO: Instead of writing this metric on every insert call,
// we should aggregate and write at the end of the transaction.
let _guard = DB_METRICS
.wasm_abi_call_duration_sec
.with_label_values(txn_type, db, reducer, syscall)
.start_timer();

Self::cvt(caller, "insert", AbiCall::Insert, |caller, mem| {
// Read the row from WASM memory into a buffer.
let mut row_buffer = mem.read_bytes(&caller, row, row_len)?;

Expand All @@ -422,16 +428,7 @@ impl WasmInstanceEnv {
);
mem.set_bytes(&caller, row, row_len, &row_buffer)?;
Ok(())
});

// TODO: Instead of writing this metric on every insert call,
// we should aggregate and write at the end of the transaction.
DB_METRICS
.wasm_abi_call_duration_ns
.with_label_values(txn_type, db, reducer, syscall)
.observe(start.elapsed().as_nanos() as f64);

result
})
}

/// Deletes all rows in the table identified by `table_id`
Expand Down Expand Up @@ -465,9 +462,13 @@ impl WasmInstanceEnv {
let db = &ctx.database();
let reducer = &ctx.reducer_name().unwrap_or_default().to_owned();
let syscall = &AbiCall::DeleteByColEq;
let start = Instant::now();

let result = Self::cvt_ret(
let _guard = DB_METRICS
.wasm_abi_call_duration_sec
.with_label_values(txn_type, db, reducer, syscall)
.start_timer();

Self::cvt_ret(
caller,
"delete_by_col_eq",
AbiCall::DeleteByColEq,
Expand All @@ -482,14 +483,7 @@ impl WasmInstanceEnv {
.delete_by_col_eq(&ctx, table_id.into(), col_id.into(), &value)?;
Ok(count.get())
},
);

DB_METRICS
.wasm_abi_call_duration_ns
.with_label_values(txn_type, db, reducer, syscall)
.observe(start.elapsed().as_nanos() as f64);

result
)
}

/// Deletes those rows, in the table identified by `table_id`,
Expand Down Expand Up @@ -622,9 +616,13 @@ impl WasmInstanceEnv {
let db = &ctx.database();
let reducer = &ctx.reducer_name().unwrap_or_default().to_owned();
let syscall = &AbiCall::IterByColEq;
let start = Instant::now();

let result = Self::cvt_ret(
let _guard = DB_METRICS
.wasm_abi_call_duration_sec
.with_label_values(txn_type, db, reducer, syscall)
.start_timer();

Self::cvt_ret(
caller,
"iter_by_col_eq",
AbiCall::IterByColEq,
Expand All @@ -645,14 +643,7 @@ impl WasmInstanceEnv {
// Insert the encoded + concatenated rows into a new buffer and return its id.
Ok(caller.data_mut().buffers.insert(data.into()))
},
);

DB_METRICS
.wasm_abi_call_duration_ns
.with_label_values(txn_type, db, reducer, syscall)
.observe(start.elapsed().as_nanos() as f64);

result
)
}

/// Start iteration on each row, as bytes, of a table identified by `table_id`.
Expand All @@ -669,9 +660,13 @@ impl WasmInstanceEnv {
let db = &ctx.database();
let reducer = &ctx.reducer_name().unwrap_or_default().to_owned();
let syscall = &AbiCall::IterStart;
let start = Instant::now();

let result = Self::cvt_ret(caller, "iter_start", AbiCall::IterStart, out, |mut caller, _mem| {
let _guard = DB_METRICS
.wasm_abi_call_duration_sec
.with_label_values(txn_type, db, reducer, syscall)
.start_timer();

Self::cvt_ret(caller, "iter_start", AbiCall::IterStart, out, |mut caller, _mem| {
// Retrieve the execution context for the current reducer.
let ctx = caller.data().reducer_context();

Expand All @@ -681,14 +676,7 @@ impl WasmInstanceEnv {
// Register the iterator and get back the index to write to `out`.
// Calls to the iterator are done through dynamic dispatch.
Ok(caller.data_mut().iters.insert(chunks.into_iter()))
});

DB_METRICS
.wasm_abi_call_duration_ns
.with_label_values(txn_type, db, reducer, syscall)
.observe(start.elapsed().as_nanos() as f64);

result
})
}

/// Like [`WasmInstanceEnv::iter_start`], start iteration on each row,
Expand Down Expand Up @@ -717,9 +705,13 @@ impl WasmInstanceEnv {
let db = &ctx.database();
let reducer = &ctx.reducer_name().unwrap_or_default().to_owned();
let syscall = &AbiCall::IterStartFiltered;
let start = Instant::now();

let result = Self::cvt_ret(
let _guard = DB_METRICS
.wasm_abi_call_duration_sec
.with_label_values(txn_type, db, reducer, syscall)
.start_timer();

Self::cvt_ret(
caller,
"iter_start_filtered",
AbiCall::IterStartFiltered,
Expand All @@ -741,14 +733,7 @@ impl WasmInstanceEnv {
// Calls to the iterator are done through dynamic dispatch.
Ok(caller.data_mut().iters.insert(chunks.into_iter()))
},
);

DB_METRICS
.wasm_abi_call_duration_ns
.with_label_values(txn_type, db, reducer, syscall)
.observe(start.elapsed().as_nanos() as f64);

result
)
}

/// Advances the registered iterator with the index given by `iter_key`.
Expand Down