20 changes: 13 additions & 7 deletions crates/core/src/host/host_controller.rs
@@ -1063,13 +1063,19 @@ async fn metric_reporter(replica_ctx: Arc<ReplicaContext>) {
         .with_label_values(&replica_ctx.database_identity);
 
     loop {
-        let disk_usage = tokio::task::block_in_place(|| replica_ctx.total_disk_usage());
-        replica_ctx.update_gauges();
-        if let Some(num_bytes) = disk_usage.durability {
-            message_log_size.set(num_bytes as i64);
-        }
-        if let Some(num_bytes) = disk_usage.logs {
-            module_log_file_size.set(num_bytes as i64);
+        let ctx = replica_ctx.clone();
+        // We spawn a blocking task here because this grabs blocking locks.
+        let disk_usage_future = tokio::task::spawn_blocking(move || {
+            ctx.update_gauges();
+            ctx.total_disk_usage()
+        });
+        if let Ok(disk_usage) = disk_usage_future.await {
+            if let Some(num_bytes) = disk_usage.durability {
+                message_log_size.set(num_bytes as i64);
+            }
+            if let Some(num_bytes) = disk_usage.logs {
+                module_log_file_size.set(num_bytes as i64);
+            }
         }
         tokio::time::sleep(STORAGE_METERING_INTERVAL).await;
     }
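For context on the hunk above: it replaces `tokio::task::block_in_place` with `tokio::task::spawn_blocking`, so the blocking gauge updates and disk-usage walk run on Tokio's blocking pool instead of tying up the async worker thread. Below is a minimal, self-contained sketch of that pattern; the `Metrics` type, `disk_usage` stub, and 10-second interval are illustrative stand-ins, not the actual SpacetimeDB types.

```rust
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
use std::time::Duration;

// Stand-in for the replica context: a gauge plus a blocking disk-usage probe.
struct Metrics {
    message_log_size: AtomicI64,
}

impl Metrics {
    // Stand-in for `total_disk_usage`: imagine this takes blocking locks or walks the filesystem.
    fn disk_usage(&self) -> Option<u64> {
        Some(42)
    }
}

async fn metric_reporter(metrics: Arc<Metrics>) {
    loop {
        let m = metrics.clone();
        // Run the blocking work on the blocking pool; the async task only awaits the handle.
        let usage = tokio::task::spawn_blocking(move || m.disk_usage()).await;
        // `spawn_blocking` returns Err only if the closure panicked; skip this tick in that case.
        if let Ok(Some(num_bytes)) = usage {
            metrics
                .message_log_size
                .store(num_bytes as i64, Ordering::Relaxed);
        }
        tokio::time::sleep(Duration::from_secs(10)).await;
    }
}

#[tokio::main]
async fn main() {
    let metrics = Arc::new(Metrics {
        message_log_size: AtomicI64::new(0),
    });
    tokio::spawn(metric_reporter(metrics.clone()));
    // Let the reporter run one tick, then exit.
    tokio::time::sleep(Duration::from_millis(100)).await;
    println!("gauge = {}", metrics.message_log_size.load(Ordering::Relaxed));
}
```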
94 changes: 47 additions & 47 deletions crates/core/src/host/scheduler.rs
@@ -15,17 +15,17 @@ use tokio::sync::mpsc;
 use tokio::time::Instant;
 use tokio_util::time::delay_queue::{self, DelayQueue, Expired};
 
-use crate::db::datastore::locking_tx_datastore::MutTxId;
-use crate::db::datastore::system_tables::{StFields, StScheduledFields, ST_SCHEDULED_ID};
-use crate::db::datastore::traits::IsolationLevel;
-use crate::db::relational_db::RelationalDB;
-use crate::execution_context::Workload;
-
 use super::module_host::ModuleEvent;
 use super::module_host::ModuleFunctionCall;
 use super::module_host::{CallReducerParams, WeakModuleHost};
 use super::module_host::{DatabaseUpdate, EventStatus};
 use super::{ModuleHost, ReducerArgs, ReducerCallError};
+use crate::db::datastore::locking_tx_datastore::MutTxId;
+use crate::db::datastore::system_tables::{StFields, StScheduledFields, ST_SCHEDULED_ID};
+use crate::db::datastore::traits::IsolationLevel;
+use crate::db::relational_db::RelationalDB;
+use crate::execution_context::Workload;
+use crate::util::asyncify;
 
 #[derive(Copy, Clone, Eq, PartialEq, Hash)]
 pub struct ScheduledReducerId {
@@ -398,16 +398,50 @@ impl SchedulerActor {
         };
     }
 
-    /// Handle repeated schedule by adding it back to queue
-    /// return true if it is repeated schedule
-    fn handle_repeated_schedule(
+    async fn delete_scheduled_reducer_row(
         &mut self,
+        db: &RelationalDB,
         id: ScheduledReducerId,
-        schedule_row: &RowRef<'_>,
-    ) -> Result<bool, anyhow::Error> {
-        let schedule_at = read_schedule_at(schedule_row, id.at_column)?;
+        module_host: ModuleHost,
+    ) {
+        let host_clone = module_host.clone();
+        let db = db.clone();
+        let schedule_at = asyncify(move || {
+            let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal);
+
+            match get_schedule_row_mut(&tx, &db, id) {
+                Ok(schedule_row) => {
+                    if let Ok(schedule_at) = read_schedule_at(&schedule_row, id.at_column) {
+                        // If the schedule is an interval, we handle it as a repeated schedule
+                        if let ScheduleAt::Interval(_) = schedule_at {
+                            return Some(schedule_at);
+                        }
+                        let row_ptr = schedule_row.pointer();
+                        db.delete(&mut tx, id.table_id, [row_ptr]);
+
+                        commit_and_broadcast_deletion_event(tx, host_clone);
+                    } else {
+                        log::debug!(
+                            "Failed to read 'scheduled_at' from row: table_id {}, schedule_id {}",
+                            id.table_id,
+                            id.schedule_id
+                        );
+                    }
+                }
+                Err(_) => {
+                    log::debug!(
+                        "Table row corresponding to yield scheduler ID not found: table_id {}, scheduler_id {}",
+                        id.table_id,
+                        id.schedule_id
+                    );
+                }
+            }
+            None
+        })
+        .await;
 
-        if let ScheduleAt::Interval(dur) = schedule_at {
+        // If this was repeated, we need to add it back to the queue.
+        if let Some(ScheduleAt::Interval(dur)) = schedule_at {
             let key = self.queue.insert(
                 QueueItem::Id {
                     id,
@@ -416,40 +416,6 @@
                 dur.to_duration().unwrap_or(Duration::ZERO),
             );
             self.key_map.insert(id, key);
-            Ok(true)
-        } else {
-            Ok(false)
-        }
-    }
-
-    async fn delete_scheduled_reducer_row(
-        &mut self,
-        db: &RelationalDB,
-        id: ScheduledReducerId,
-        module_host: ModuleHost,
-    ) {
-        let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal);
-
-        match get_schedule_row_mut(&tx, db, id) {
-            Ok(schedule_row) => {
-                if let Ok(is_repeated) = self.handle_repeated_schedule(id, &schedule_row) {
-                    if is_repeated {
-                        return; // Do not delete entry for repeated reducer
-                    }
-
-                    let row_ptr = schedule_row.pointer();
-                    db.delete(&mut tx, id.table_id, [row_ptr]);
-
-                    commit_and_broadcast_deletion_event(tx, module_host);
-                }
-            }
-            Err(_) => {
-                log::debug!(
-                    "Table row corresponding to yield scheduler ID not found: table_id {}, scheduler_id {}",
-                    id.table_id,
-                    id.schedule_id
-                );
-            }
-        }
-    }
     }
 }
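The restructured `delete_scheduled_reducer_row` above moves the whole transaction into a blocking closure (run through the crate's `asyncify` helper) and only hands the `ScheduleAt` back out when the schedule is repeating, so the re-enqueue happens on the async side where the `DelayQueue` lives. Below is a rough, self-contained sketch of that shape; `lookup_schedule`, `delete_row`, and the local `asyncify` wrapper are hypothetical stand-ins for the real datastore calls and helper, not the crate's actual API.

```rust
use std::time::Duration;
use tokio_util::time::DelayQueue;

#[derive(Clone, Copy)]
enum ScheduleAt {
    Time,               // one-shot schedule: the row should be deleted
    Interval(Duration), // repeating schedule: keep the row, re-enqueue it
}

// Hypothetical stand-in for `crate::util::asyncify`: run a blocking closure on
// Tokio's blocking pool and hand the result back to the async caller.
async fn asyncify<F, T>(f: F) -> T
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    tokio::task::spawn_blocking(f)
        .await
        .expect("blocking task panicked")
}

// Stand-in for `get_schedule_row_mut` + `read_schedule_at`.
fn lookup_schedule(id: u64) -> Option<ScheduleAt> {
    if id % 2 == 0 {
        Some(ScheduleAt::Time)
    } else {
        Some(ScheduleAt::Interval(Duration::from_secs(5)))
    }
}

// Stand-in for deleting the row and committing the transaction.
fn delete_row(id: u64) {
    println!("deleted schedule row {id}");
}

async fn delete_scheduled_reducer_row(queue: &mut DelayQueue<u64>, id: u64) {
    // All blocking datastore work happens inside the closure.
    let schedule_at = asyncify(move || match lookup_schedule(id) {
        // Repeating schedule: leave the row alone and report the interval back.
        Some(at @ ScheduleAt::Interval(_)) => Some(at),
        // One-shot schedule: delete the row inside the blocking section.
        Some(ScheduleAt::Time) => {
            delete_row(id);
            None
        }
        // Row already gone: nothing to do.
        None => None,
    })
    .await;

    // Repeating schedules are put back into the delay queue on the async side.
    if let Some(ScheduleAt::Interval(dur)) = schedule_at {
        queue.insert(id, dur);
    }
}

#[tokio::main]
async fn main() {
    let mut queue = DelayQueue::new();
    delete_scheduled_reducer_row(&mut queue, 1).await;
    println!("queue length: {}", queue.len());
}
```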