Skip to content

Commit

Permalink
refactor: Convert the rest of the spanner module to async await
Browse files Browse the repository at this point in the history
This removes usage of block() from the spanner backend implementation, even in test code.

Fix #462
  • Loading branch information
fzzzy committed Mar 11, 2020
1 parent fed42e2 commit e2017bb
Showing 1 changed file with 26 additions and 27 deletions.
53 changes: 26 additions & 27 deletions src/db/spanner/models.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#[cfg(test)]
use actix_web::web::block;
use futures::compat::Future01CompatExt;
use futures::future::TryFutureExt;

Expand Down Expand Up @@ -1524,20 +1522,20 @@ impl SpannerDb {
Ok(result)
}

// NOTE: Currently this put_bso_sync impl. is only used during db tests,
// NOTE: Currently this put_bso_async_test impl. is only used during db tests,
// see above for the non-tests version
#[cfg(test)]
pub fn put_bso_sync(&self, bso: params::PutBso) -> Result<results::PutBso> {
pub async fn put_bso_async_test(&self, bso: params::PutBso) -> Result<results::PutBso> {
use crate::db::util::to_rfc3339;
let collection_id = self.get_or_create_collection_id(&bso.collection)?;
let collection_id = self.get_or_create_collection_id_async(&bso.collection).await?;
let mut sqlparams = params! {
"fxa_uid" => bso.user_id.fxa_uid.clone(),
"fxa_kid" => bso.user_id.fxa_kid.clone(),
"collection_id" => collection_id.to_string(),
"bso_id" => bso.id.to_string(),
};
let mut sqltypes = HashMap::new();
let touch = self.touch_collection(&bso.user_id, collection_id)?;
let touch = self.touch_collection_async(&bso.user_id, collection_id).await?;
let timestamp = self.timestamp()?;

let result = self
Expand All @@ -1550,8 +1548,8 @@ impl SpannerDb {
AND bso_id = @bso_id",
)?
.params(sqlparams.clone())
.execute(&self.conn)?
.one_or_none()?;
.execute_async(&self.conn)?
.one_or_none().await?;
let exists = result.is_some();

let sql = if exists {
Expand Down Expand Up @@ -1656,7 +1654,7 @@ impl SpannerDb {
let now_millis = timestamp.as_i64();
let ttl = bso.ttl.map_or(i64::from(DEFAULT_BSO_TTL), |ttl| {
ttl.try_into()
.expect("Could not get ttl in put_bso_sync (test)")
.expect("Could not get ttl in put_bso_async_test")
}) * 1000;
let expirystring = to_rfc3339(now_millis + ttl)?;
debug!(
Expand All @@ -1674,16 +1672,16 @@ impl SpannerDb {
self.sql(&sql)?
.params(sqlparams)
.param_types(sqltypes)
.execute_dml(&self.conn)?;
.execute_dml_async(&self.conn).await?;

Ok(touch)
}

// NOTE: Currently this post_bso_sync impl. is only used during db tests,
// NOTE: Currently this post_bso_async_test impl. is only used during db tests,
// see above for the non-tests version
#[cfg(test)]
pub fn post_bsos_sync(&self, input: params::PostBsos) -> Result<results::PostBsos> {
let collection_id = self.get_or_create_collection_id(&input.collection)?;
pub async fn post_bsos_async_test(&self, input: params::PostBsos) -> Result<results::PostBsos> {
let collection_id = self.get_or_create_collection_id_async(&input.collection).await?;
let mut result = results::PostBsos {
modified: self.timestamp()?,
success: Default::default(),
Expand All @@ -1692,17 +1690,17 @@ impl SpannerDb {

for pbso in input.bsos {
let id = pbso.id;
self.put_bso_sync(params::PutBso {
self.put_bso_async(params::PutBso {
user_id: input.user_id.clone(),
collection: input.collection.clone(),
id: id.clone(),
payload: pbso.payload,
sortindex: pbso.sortindex,
ttl: pbso.ttl,
})?;
}).await?;
result.success.push(id);
}
self.touch_collection(&input.user_id, collection_id)?;
self.touch_collection_async(&input.user_id, collection_id).await?;
Ok(result)
}

Expand Down Expand Up @@ -1867,7 +1865,7 @@ impl Db for SpannerDb {
#[cfg(test)]
fn put_bso(&self, param: params::PutBso) -> DbFuture<results::PutBso> {
let db = self.clone();
Box::pin(block(move || db.put_bso_sync(param).map_err(Into::into)).map_err(Into::into))
Box::pin(async move { db.put_bso_async_test(param).map_err(Into::into).await })
}

#[cfg(not(test))]
Expand All @@ -1879,7 +1877,7 @@ impl Db for SpannerDb {
#[cfg(test)]
fn post_bsos(&self, param: params::PostBsos) -> DbFuture<results::PostBsos> {
let db = self.clone();
Box::pin(block(move || db.post_bsos_sync(param).map_err(Into::into)).map_err(Into::into))
Box::pin(async move { db.post_bsos_async_test(param).map_err(Into::into).await })
}

fn validate_batch_id(&self, id: String) -> Result<()> {
Expand Down Expand Up @@ -1914,25 +1912,26 @@ impl Db for SpannerDb {
#[cfg(test)]
fn get_collection_id(&self, name: String) -> DbFuture<i32> {
let db = self.clone();
Box::pin(block(move || db.get_collection_id(&name).map_err(Into::into)).map_err(Into::into))
Box::pin(async move {
db.get_collection_id(&name).map_err(Into::into).await
})
}

#[cfg(test)]
fn create_collection(&self, name: String) -> DbFuture<i32> {
let db = self.clone();
Box::pin(block(move || db.create_collection(&name).map_err(Into::into)).map_err(Into::into))
Box::pin(async move {
db.create_collection(&name).map_err(Into::into).await
})
}

#[cfg(test)]
fn touch_collection(&self, param: params::TouchCollection) -> DbFuture<SyncTimestamp> {
let db = self.clone();
Box::pin(
block(move || {
db.touch_collection(&param.user_id, param.collection_id)
.map_err(Into::into)
})
.map_err(Into::into),
)
Box::pin(async move {
db.touch_collection(&param.user_id, param.collection_id)
.map_err(Into::into)
})
}

#[cfg(test)]
Expand Down

0 comments on commit e2017bb

Please sign in to comment.