Skip to content

Commit 75c6e67

Browse files
authored
Views: Host interface for WASM modules (#3548)
# Description of Changes Host implementation to invoke `call_view` method. I also covers: 1. API `MutTxId::is_materialized`to check if existing view exisits and updated. 2. Update in readsets logic to remove stale views. 3. sql caller implmentation. # API and ABI breaking changes NA How complicated do you think these changes are? Grade on a scale from 1 to 5, where 1 is a trivial change, and 5 is a deep-reaching and complex change. 3
1 parent 004c5bb commit 75c6e67

38 files changed

+1562
-386
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/client-api-messages/src/energy.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -121,21 +121,21 @@ impl fmt::Debug for EnergyBalance {
121121
}
122122
}
123123

124-
/// A measure of energy representing the energy budget for a reducer.
124+
/// A measure of energy representing the energy budget for a reducer or any callable function.
125125
///
126126
/// In contrast to [`EnergyQuanta`], this is represented by a 64-bit integer. This makes energy handling
127127
/// for reducers easier, while still providing a unlikely-to-ever-be-reached maximum value (e.g. for wasmtime:
128128
/// `(u64::MAX eV / 1000 eV/instruction) * 3 ns/instruction = 640 days`)
129129
#[derive(Copy, Clone, From, Add, Sub)]
130-
pub struct ReducerBudget(u64);
130+
pub struct FunctionBudget(u64);
131131

132-
impl ReducerBudget {
132+
impl FunctionBudget {
133133
// 1 second of wasm runtime is roughly 2 TeV, so this is
134134
// roughly 1 minute of wasm runtime
135-
pub const DEFAULT_BUDGET: Self = ReducerBudget(120_000_000_000_000);
135+
pub const DEFAULT_BUDGET: Self = FunctionBudget(120_000_000_000_000);
136136

137-
pub const ZERO: Self = ReducerBudget(0);
138-
pub const MAX: Self = ReducerBudget(u64::MAX);
137+
pub const ZERO: Self = FunctionBudget(0);
138+
pub const MAX: Self = FunctionBudget(u64::MAX);
139139

140140
pub fn new(v: u64) -> Self {
141141
Self(v)
@@ -151,13 +151,13 @@ impl ReducerBudget {
151151
}
152152
}
153153

154-
impl From<ReducerBudget> for EnergyQuanta {
155-
fn from(value: ReducerBudget) -> Self {
154+
impl From<FunctionBudget> for EnergyQuanta {
155+
fn from(value: FunctionBudget) -> Self {
156156
EnergyQuanta::new(value.0.into())
157157
}
158158
}
159159

160-
impl fmt::Debug for ReducerBudget {
160+
impl fmt::Debug for FunctionBudget {
161161
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
162162
f.debug_tuple("ReducerBudget")
163163
.field(&EnergyQuanta::from(*self))

crates/client-api/src/lib.rs

Lines changed: 46 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -78,57 +78,52 @@ impl Host {
7878

7979
let (tx_offset, durable_offset, json) = self
8080
.host_controller
81-
.using_database(
82-
database,
83-
self.replica_id,
84-
move |db| -> axum::response::Result<_, (StatusCode, String)> {
85-
tracing::info!(sql = body);
86-
87-
// We need a header for query results
88-
let mut header = vec![];
89-
90-
let sql_start = std::time::Instant::now();
91-
let sql_span =
92-
tracing::trace_span!("execute_sql", total_duration = tracing::field::Empty,).entered();
93-
94-
let result = sql::execute::run(
95-
// Returns an empty result set for mutations
96-
db,
97-
&body,
98-
auth,
99-
Some(&module_host.info().subscriptions),
100-
&mut header,
101-
)
102-
.map_err(|e| {
103-
log::warn!("{e}");
104-
if let Some(auth_err) = e.get_auth_error() {
105-
(StatusCode::UNAUTHORIZED, auth_err.to_string())
106-
} else {
107-
(StatusCode::BAD_REQUEST, e.to_string())
108-
}
109-
})?;
110-
111-
let total_duration = sql_start.elapsed();
112-
sql_span.record("total_duration", tracing::field::debug(total_duration));
113-
114-
// Turn the header into a `ProductType`
115-
let schema = header
116-
.into_iter()
117-
.map(|(col_name, col_type)| ProductTypeElement::new(col_type, Some(col_name)))
118-
.collect();
119-
120-
Ok((
121-
result.tx_offset,
122-
db.durable_tx_offset(),
123-
vec![SqlStmtResult {
124-
schema,
125-
rows: result.rows,
126-
total_duration_micros: total_duration.as_micros() as u64,
127-
stats: SqlStmtStats::from_metrics(&result.metrics),
128-
}],
129-
))
130-
},
131-
)
81+
.using_database(database, self.replica_id, move |db| async move {
82+
tracing::info!(sql = body);
83+
let mut header = vec![];
84+
let sql_start = std::time::Instant::now();
85+
let sql_span = tracing::trace_span!("execute_sql", total_duration = tracing::field::Empty,);
86+
let _guard = sql_span.enter();
87+
88+
let result = sql::execute::run(
89+
&db,
90+
&body,
91+
auth,
92+
Some(&module_host.info.subscriptions),
93+
Some(&module_host),
94+
auth.caller,
95+
&mut header,
96+
)
97+
.await
98+
.map_err(|e| {
99+
log::warn!("{e}");
100+
if let Some(auth_err) = e.get_auth_error() {
101+
(StatusCode::UNAUTHORIZED, auth_err.to_string())
102+
} else {
103+
(StatusCode::BAD_REQUEST, e.to_string())
104+
}
105+
})?;
106+
107+
let total_duration = sql_start.elapsed();
108+
drop(_guard);
109+
sql_span.record("total_duration", tracing::field::debug(total_duration));
110+
111+
let schema = header
112+
.into_iter()
113+
.map(|(col_name, col_type)| ProductTypeElement::new(col_type, Some(col_name)))
114+
.collect();
115+
116+
Ok::<_, (StatusCode, String)>((
117+
result.tx_offset,
118+
db.durable_tx_offset(),
119+
vec![SqlStmtResult {
120+
schema,
121+
rows: result.rows,
122+
total_duration_micros: total_duration.as_micros() as u64,
123+
stats: SqlStmtStats::from_metrics(&result.metrics),
124+
}],
125+
))
126+
})
132127
.await
133128
.map_err(log_and_500)??;
134129

@@ -154,7 +149,6 @@ impl Host {
154149
.await
155150
}
156151
}
157-
158152
/// Parameters for publishing a database.
159153
///
160154
/// See [`ControlStateDelegate::publish_database`].

0 commit comments

Comments
 (0)