Skip to content

Commit

Permalink
Add DB transactions when syncing
Browse files Browse the repository at this point in the history
  • Loading branch information
jeff-hughes committed Sep 30, 2021
1 parent 12b036b commit 8daa1b5
Showing 1 changed file with 54 additions and 37 deletions.
91 changes: 54 additions & 37 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub struct SyncResult {
/// with this connection.
#[derive(Debug)]
pub struct Database {
path: PathBuf,
conn: Option<Connection>,
}

Expand All @@ -37,8 +38,9 @@ impl Database {
std::fs::create_dir_all(&db_path)
.with_context(|| "Unable to create subdirectory for database.")?;
db_path.push("data.db");
let conn = Connection::open(db_path)?;
let conn = Connection::open(&db_path)?;
let db_conn = Database {
path: db_path,
conn: Some(conn),
};
db_conn.create()?;
Expand Down Expand Up @@ -69,7 +71,7 @@ impl Database {
// go here first, before we update the version

// adding a column to capture episode guids
if db_version == Version::parse("1.2.1")? {
if db_version <= Version::parse("1.2.1")? {
conn.execute("ALTER TABLE episodes ADD COLUMN guid TEXT;", params![])
.expect("Could not run database migrations.");
}
Expand Down Expand Up @@ -172,26 +174,33 @@ impl Database {
/// Inserts a new podcast and list of podcast episodes into the
/// database.
pub fn insert_podcast(&self, podcast: PodcastNoId) -> Result<SyncResult> {
let conn = self.conn.as_ref().expect("Error connecting to database.");
let mut stmt = conn.prepare_cached(
"INSERT INTO podcasts (title, url, description, author,
let mut conn = Connection::open(&self.path).expect("Error connecting to database.");
let tx = conn.transaction()?;
// let conn = self.conn.as_ref().expect("Error connecting to database.");
{
let mut stmt = tx.prepare_cached(
"INSERT INTO podcasts (title, url, description, author,
explicit, last_checked)
VALUES (?, ?, ?, ?, ?, ?);",
)?;
stmt.execute(params![
podcast.title,
podcast.url,
podcast.description,
podcast.author,
podcast.explicit,
podcast.last_checked.timestamp()
])?;
)?;
stmt.execute(params![
podcast.title,
podcast.url,
podcast.description,
podcast.author,
podcast.explicit,
podcast.last_checked.timestamp()
])?;
}

let mut stmt = conn.prepare_cached("SELECT id FROM podcasts WHERE url = ?")?;
let pod_id = stmt.query_row::<i64, _, _>(params![podcast.url], |row| row.get(0))?;
let pod_id;
{
let mut stmt = tx.prepare_cached("SELECT id FROM podcasts WHERE url = ?")?;
pod_id = stmt.query_row::<i64, _, _>(params![podcast.url], |row| row.get(0))?;
}
let mut ep_ids = Vec::new();
for ep in podcast.episodes.iter().rev() {
let id = self.insert_episode(pod_id, ep)?;
let id = self.insert_episode(&tx, pod_id, ep)?;
let new_ep = NewEpisode {
id: id,
pod_id: pod_id,
Expand All @@ -201,6 +210,7 @@ impl Database {
};
ep_ids.push(new_ep);
}
tx.commit()?;

return Ok(SyncResult {
added: ep_ids,
Expand All @@ -209,9 +219,12 @@ impl Database {
}

/// Inserts a podcast episode into the database.
pub fn insert_episode(&self, podcast_id: i64, episode: &EpisodeNoId) -> Result<i64> {
let conn = self.conn.as_ref().expect("Error connecting to database.");

pub fn insert_episode(
&self,
conn: &Connection,
podcast_id: i64,
episode: &EpisodeNoId,
) -> Result<i64> {
let pubdate = episode.pubdate.map(|dt| dt.timestamp());

let mut stmt = conn.prepare_cached(
Expand Down Expand Up @@ -283,21 +296,23 @@ impl Database {
/// changed if necessary, and episodes are updated (modified episodes
/// are updated, new episodes are inserted).
pub fn update_podcast(&self, pod_id: i64, podcast: PodcastNoId) -> Result<SyncResult> {
let conn = self.conn.as_ref().expect("Error connecting to database.");
let mut stmt = conn.prepare_cached(
"UPDATE podcasts SET title = ?, url = ?, description = ?,
{
let conn = self.conn.as_ref().expect("Error connecting to database.");
let mut stmt = conn.prepare_cached(
"UPDATE podcasts SET title = ?, url = ?, description = ?,
author = ?, explicit = ?, last_checked = ?
WHERE id = ?;",
)?;
stmt.execute(params![
podcast.title,
podcast.url,
podcast.description,
podcast.author,
podcast.explicit,
podcast.last_checked.timestamp(),
pod_id,
])?;
)?;
stmt.execute(params![
podcast.title,
podcast.url,
podcast.description,
podcast.author,
podcast.explicit,
podcast.last_checked.timestamp(),
pod_id,
])?;
}

let result = self.update_episodes(pod_id, podcast.title, podcast.episodes)?;
return Ok(result);
Expand All @@ -317,8 +332,6 @@ impl Database {
podcast_title: String,
episodes: Vec<EpisodeNoId>,
) -> Result<SyncResult> {
let conn = self.conn.as_ref().expect("Error connecting to database.");

let old_episodes = self.get_episodes(podcast_id, true)?;
let mut old_ep_map = AHashMap::new();
for ep in old_episodes.iter() {
Expand All @@ -327,6 +340,9 @@ impl Database {
}
}

let mut conn = Connection::open(&self.path).expect("Error connecting to database.");
let tx = conn.transaction()?;

let mut insert_ep = Vec::new();
let mut update_ep = Vec::new();
for new_ep in episodes.iter().rev() {
Expand Down Expand Up @@ -371,7 +387,7 @@ impl Database {
match existing_id {
Some(id) => {
if update {
let mut stmt = conn.prepare_cached(
let mut stmt = tx.prepare_cached(
"UPDATE episodes SET title = ?, url = ?,
guid = ?, description = ?, pubdate = ?,
duration = ? WHERE id = ?;",
Expand All @@ -389,7 +405,7 @@ impl Database {
}
}
None => {
let id = self.insert_episode(podcast_id, new_ep)?;
let id = self.insert_episode(&tx, podcast_id, new_ep)?;
let new_ep = NewEpisode {
id: id,
pod_id: podcast_id,
Expand All @@ -401,6 +417,7 @@ impl Database {
}
}
}
tx.commit()?;
return Ok(SyncResult {
added: insert_ep,
updated: update_ep,
Expand Down

0 comments on commit 8daa1b5

Please sign in to comment.