bug: handle duplicate keys in batch_upload_items for mysql (#873)
Closes #827
jrconlin authored Oct 29, 2020
1 parent c6ea474 commit 2d6039f
Showing 2 changed files with 76 additions and 50 deletions.
85 changes: 48 additions & 37 deletions src/db/mysql/batch.rs
```diff
@@ -172,79 +172,90 @@ pub fn do_append(
     db: &MysqlDb,
     batch_id: i64,
     user_id: HawkIdentifier,
-    collection_id: i32,
+    _collection_id: i32,
     bsos: Vec<params::PostCollectionBso>,
 ) -> Result<()> {
-    fn exist_idx(collection_id: i32, batch_id: i64, bso_id: &str) -> String {
+    fn exist_idx(user_id: u64, batch_id: i64, bso_id: &str) -> String {
         // Construct something that matches the key for batch_upload_items
         format!(
-            "{collection_id}::{batch_id}::{bso_id}",
-            collection_id = collection_id,
+            "{batch_id}-{user_id}-{bso_id}",
             batch_id = batch_id,
+            user_id = user_id,
             bso_id = bso_id,
         )
     };
 
+    // It's possible for the list of items to contain a duplicate key entry.
+    // This means that we can't really call `ON DUPLICATE KEY UPDATE` here,
+    // because that's more about inserting one item at a time (e.g. it works
+    // great if the values contain a key that's already in the database, less
+    // so if the duplicate is in the value set we're inserting).
     #[derive(Debug, QueryableByName)]
     #[table_name = "batch_upload_items"]
     struct ExistsResult {
-        #[sql_type = "Integer"]
-        batch: i32,
-        #[sql_type = "Integer"]
-        id: i32,
+        user_id: i64,
+        batch_id: i64,
+        id: String,
     };
 
+    /* This is not the most efficient way to do this in pure MySQL.
+    It would be more efficient to issue an "INSERT ... ON DUPLICATE KEY UPDATE"
+    query. This approach *does* match how Spanner works, though, and may be
+    easier to keep in mind.
+    */
+    #[derive(AsChangeset)]
+    #[table_name = "batch_upload_items"]
+    struct UpdateBatches {
+        payload: Option<String>,
+        payload_size: Option<i64>,
+        ttl_offset: Option<i32>,
+    }
 
     let mut existing = HashSet::new();
-    let mut inserts: Vec<_> = Vec::new();
 
-    for item in sql_query("SELECT batch, id FROM batch_upload_items where userid=@userid;")
-        .bind::<BigInt, _>(user_id.legacy_id as i64)
-        .get_results::<ExistsResult>(&db.conn)?
+    // Pre-load the "existing" hashset with any batched uploads that are already in the table.
+    for item in sql_query(
+        "SELECT userid as user_id, batch as batch_id, id FROM batch_upload_items WHERE userid=? AND batch=?;",
+    )
+    .bind::<BigInt, _>(user_id.legacy_id as i64)
+    .bind::<BigInt, _>(batch_id)
+    .get_results::<ExistsResult>(&db.conn)?
     {
         existing.insert(exist_idx(
-            collection_id,
-            item.batch as i64,
+            user_id.legacy_id,
+            item.batch_id,
             &item.id.to_string(),
         ));
     }
 
     for bso in bsos {
         let payload_size = bso.payload.as_ref().map(|p| p.len() as i64);
-        let exist_idx = exist_idx(collection_id, batch_id, &bso.id);
+        let exist_idx = exist_idx(user_id.legacy_id, batch_id, &bso.id);
 
         if existing.contains(&exist_idx) {
             diesel::update(
                 batch_upload_items::table
                     .filter(batch_upload_items::user_id.eq(user_id.legacy_id as i64))
                     .filter(batch_upload_items::batch_id.eq(batch_id)),
             )
-            .set((
-                batch_upload_items::payload.eq(bso.payload),
-                batch_upload_items::payload_size.eq(payload_size),
-                batch_upload_items::ttl_offset.eq(bso.ttl.map(|ttl| ttl as i32)),
-            ))
+            .set(&UpdateBatches {
+                payload: bso.payload,
+                payload_size,
+                ttl_offset: bso.ttl.map(|ttl| ttl as i32),
+            })
             .execute(&db.conn)?;
         } else {
-            inserts.push((
-                batch_upload_items::batch_id.eq(&batch_id),
-                batch_upload_items::user_id.eq(user_id.legacy_id as i64),
-                batch_upload_items::id.eq(bso.id.clone()),
-                batch_upload_items::sortindex.eq(bso.sortindex),
-                batch_upload_items::payload.eq(bso.payload),
-                batch_upload_items::payload_size.eq(payload_size),
-                batch_upload_items::ttl_offset.eq(bso.ttl.map(|ttl| ttl as i32)),
-            ));
+            diesel::insert_into(batch_upload_items::table)
+                .values((
+                    batch_upload_items::batch_id.eq(&batch_id),
+                    batch_upload_items::user_id.eq(user_id.legacy_id as i64),
+                    batch_upload_items::id.eq(bso.id.clone()),
+                    batch_upload_items::sortindex.eq(bso.sortindex),
+                    batch_upload_items::payload.eq(bso.payload),
+                    batch_upload_items::payload_size.eq(payload_size),
+                    batch_upload_items::ttl_offset.eq(bso.ttl.map(|ttl| ttl as i32)),
+                ))
+                .execute(&db.conn)?;
+            // Make sure to include the key in our table check.
+            existing.insert(exist_idx);
        }
     }
 
-    insert_into(batch_upload_items::table)
-        .values(inserts)
-        .execute(&db.conn)?;
     Ok(())
 }
```
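The core of the fix is the `existing` set keyed by `exist_idx`: a single upload can name the same BSO id twice, so the second occurrence has to become an UPDATE rather than a second INSERT that would violate the table's unique key. Below is a minimal, self-contained sketch of that dedupe-and-route pattern; the ids, user, and batch values are illustrative, not taken from the commit.

```rust
use std::collections::HashSet;

// Mirrors the exist_idx key format from the diff above.
fn exist_idx(user_id: u64, batch_id: i64, bso_id: &str) -> String {
    format!("{}-{}-{}", batch_id, user_id, bso_id)
}

fn main() {
    let mut existing: HashSet<String> = HashSet::new();
    // "b0" appears twice in the same upload, the case the comment describes.
    let incoming = ["b0", "b1", "b0"];

    for bso_id in &incoming {
        let key = exist_idx(1, 42, bso_id);
        if existing.contains(&key) {
            // Already present (in the table, or earlier in this payload):
            // route to UPDATE.
            println!("{} -> UPDATE", key);
        } else {
            // First sighting: route to INSERT, then remember the key so a
            // later duplicate in this same payload takes the UPDATE path.
            println!("{} -> INSERT", key);
            existing.insert(key);
        }
    }
}
```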

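For comparison, the "INSERT ... ON DUPLICATE KEY UPDATE" statement that the in-code comment calls more efficient would look roughly like the sketch below. This is not the commit's code: the column list and bind order are assumptions based on the SELECT in the diff, and `VALUES(payload)` is MySQL's reference to the value the row would have been inserted with. Note that this form would overwrite an existing payload with NULL when the incoming payload is absent, so a real query would need something like `COALESCE(VALUES(payload), payload)` to match the changeset semantics.

```rust
use diesel::mysql::MysqlConnection;
use diesel::sql_types::{BigInt, Nullable, Text};
use diesel::{sql_query, QueryResult, RunQueryDsl};

// Hypothetical single-statement upsert; not what the commit ships.
fn upsert_batch_item(
    conn: &MysqlConnection,
    user_id: i64,
    batch_id: i64,
    bso_id: &str,
    payload: Option<String>,
) -> QueryResult<usize> {
    sql_query(
        "INSERT INTO batch_upload_items (userid, batch, id, payload) \
         VALUES (?, ?, ?, ?) \
         ON DUPLICATE KEY UPDATE payload = VALUES(payload);",
    )
    .bind::<BigInt, _>(user_id)
    .bind::<BigInt, _>(batch_id)
    .bind::<Text, _>(bso_id)
    .bind::<Nullable<Text>, _>(payload)
    .execute(conn)
}
```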
41 changes: 28 additions & 13 deletions src/db/tests/batch.rs
```diff
@@ -245,35 +245,46 @@ async fn quota_test_append_batch() -> Result<()> {
 #[tokio::test]
 async fn test_append_async_w_null() -> Result<()> {
     let settings = crate::settings::test_settings();
-    if !settings.uses_spanner() {
-        dbg!("### Skipping test for mysql");
-        return Ok(());
-    }
     let pool = db_pool(Some(settings)).await?;
     let db = test_db(pool.as_ref()).await?;
-    let ttl = crate::db::util::ms_since_epoch() as u32;
+    let ttl_0 = crate::db::util::ms_since_epoch() as u32 + 10_000;
+    let ttl_1 = ttl_0 + (86_400 * 2);
+    let bid_0 = "b0";
+    let bid_1 = "b1";
 
     let uid = 1;
     let coll = "clients";
-    let payload = "payload 0";
-    let first_bso = pbso(uid, coll, "b0", Some(payload), Some(10), Some(ttl + 10_000));
+    let payload_0 = "payload 0";
+    let payload_1 = "payload 1";
+    let first_bso = pbso(uid, coll, bid_0, Some(payload_0), Some(10), Some(ttl_0));
     db.put_bso(first_bso).await?;
+    let second_bso = pbso(uid, coll, bid_1, Some(payload_1), Some(10), Some(ttl_1));
+    db.put_bso(second_bso).await?;
 
-    let tomorrow = ttl + 20_000;
+    let tomorrow = ttl_0 + 86_400;
     let new_batch = db.create_batch(cb(uid, coll, vec![])).await?;
+    // update the single bso twice, leaving payload the same.
     db.append_to_batch(ab(
         uid,
         coll,
         new_batch.clone(),
-        vec![postbso("b0", None, Some(15), None)],
+        vec![postbso(bid_0, None, Some(15), None)],
     ))
     .await?;
     db.append_to_batch(ab(
         uid,
         coll,
         new_batch.clone(),
-        vec![postbso("b0", None, None, Some(tomorrow))],
+        vec![postbso(bid_0, None, None, Some(tomorrow))],
+    ))
+    .await?;
+
+    // update the second bso to ensure that the first is unaltered
+    db.append_to_batch(ab(
+        uid,
+        coll,
+        new_batch.clone(),
+        vec![postbso(bid_1, None, Some(20), None)],
     ))
     .await?;
 
@@ -287,10 +298,14 @@ async fn test_append_async_w_null() -> Result<()> {
         batch,
     })
     .await?;
-    let bso = db.get_bso(gbso(uid, coll, "b0")).await?.unwrap();
+    let bso_0 = db.get_bso(gbso(uid, coll, bid_0)).await?.unwrap();
+    let bso_1 = db.get_bso(gbso(uid, coll, bid_1)).await?.unwrap();
 
-    assert!(bso.payload == payload);
-    assert!(bso.sortindex == Some(15));
+    assert!(bso_0.payload == payload_0);
+    assert!(bso_0.sortindex == Some(15));
+
+    assert!(bso_1.payload == payload_1);
+    assert!(bso_1.sortindex == Some(20));
 
     Ok(())
 }
```

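The switch from a `.set((column.eq(...), ...))` tuple to the `UpdateBatches` changeset is what makes this test pass: with Diesel's `#[derive(AsChangeset)]`, fields that are `None` are omitted from the generated UPDATE by default, whereas `column.eq(None)` writes a literal NULL. Appending a BSO with no payload therefore updates only the fields that were supplied. A compile-only sketch of that behavior, with a guessed `table!` definition for illustration (the real schema lives in the crate):

```rust
#[macro_use]
extern crate diesel;

// Guessed schema, for illustration only.
table! {
    batch_upload_items (user_id, batch_id, id) {
        user_id -> BigInt,
        batch_id -> BigInt,
        id -> Text,
        sortindex -> Nullable<Integer>,
        payload -> Nullable<Text>,
        payload_size -> Nullable<BigInt>,
        ttl_offset -> Nullable<Integer>,
    }
}

#[derive(AsChangeset)]
#[table_name = "batch_upload_items"]
struct UpdateBatches {
    payload: Option<String>,
    payload_size: Option<i64>,
    ttl_offset: Option<i32>,
}

fn main() {
    // With payload and payload_size set to None, the generated SQL is just
    //   UPDATE batch_upload_items SET ttl_offset = ?
    // so a null-payload append no longer erases a previously stored payload.
    let _changes = UpdateBatches {
        payload: None,
        payload_size: None,
        ttl_offset: Some(86_400),
    };
}
```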