Skip to content

Commit 8f6b0dc

Browse files
committed
store: Use REPEATABLE READ transaction for consistent dump
Wrap the dump's data-reading operations in a REPEATABLE READ READ ONLY transaction to get a consistent MVCC snapshot. This prevents head block vs entity data mismatches, cross-table inconsistency, and missing or phantom rows caused by concurrent indexing or pruning. Add a TransactionBuilder for PermittedConnection since diesel-async's TransactionBuilder requires TransactionManager = AnsiTransactionManager, which pool-wrapped connection types don't satisfy.
1 parent 2350b7a commit 8f6b0dc

File tree

2 files changed

+112
-5
lines changed

2 files changed

+112
-5
lines changed

store/postgres/src/deployment_store.rs

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -900,13 +900,26 @@ impl DeploymentStore {
900900

901901
pub(crate) async fn dump(&self, site: Arc<Site>, dir: PathBuf) -> Result<(), StoreError> {
902902
let mut conn = self.pool.get_permitted().await?;
903+
// Layout and index list are schema metadata — safe to load outside
904+
// the snapshot transaction
903905
let layout = self.layout(&mut conn, site.cheap_clone()).await?;
904-
let entity_count = crate::detail::entity_count(&mut conn, &site).await?;
905-
// Loading the IndexList should happen inside dump, but the
906-
// interface does not allow it; should be changed
907906
let index_list = IndexList::load(&mut conn, site.cheap_clone(), self.clone()).await?;
908-
layout
909-
.dump(&mut conn, index_list, dir, &site.network, entity_count)
907+
908+
// Use REPEATABLE READ to get a consistent MVCC snapshot for the
909+
// entire dump. All queries inside see the same database state,
910+
// regardless of concurrent indexing or pruning.
911+
conn.build_transaction()
912+
.repeatable_read()
913+
.read_only()
914+
.run(|conn| {
915+
async move {
916+
let entity_count = crate::detail::entity_count(conn, &site).await?;
917+
layout
918+
.dump(conn, index_list, dir, &site.network, entity_count)
919+
.await
920+
}
921+
.scope_boxed()
922+
})
910923
.await
911924
}
912925

store/postgres/src/pool/mod.rs

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use graph::slog::warn;
1919
use graph::util::timed_rw_lock::TimedMutex;
2020
use tokio::sync::OwnedSemaphorePermit;
2121

22+
use diesel_async::scoped_futures::ScopedBoxFuture;
2223
use std::collections::HashMap;
2324
use std::fmt::{self};
2425
use std::ops::{Deref, DerefMut};
@@ -65,6 +66,99 @@ impl DerefMut for PermittedConnection {
6566
}
6667
}
6768

69+
impl PermittedConnection {
70+
/// Build a transaction with custom isolation level and read mode.
71+
///
72+
/// This is analogous to `diesel_async::pg::TransactionBuilder` but
73+
/// works with the pool-wrapped connection type. The closure receives
74+
/// `&mut PermittedConnection`, keeping the full wrapper type available
75+
/// so callers can pass it to functions that expect `&mut AsyncPgConnection`
76+
/// (the pool alias, not the raw diesel type).
77+
pub fn build_transaction(&mut self) -> TransactionBuilder<'_> {
78+
TransactionBuilder::new(self)
79+
}
80+
}
81+
82+
/// Builder for a PostgreSQL transaction with configurable isolation level
83+
/// and read mode. Created via [`PermittedConnection::build_transaction`].
84+
///
85+
/// We can't use diesel-async's `TransactionBuilder` because it requires
86+
/// `C: AsyncConnection<TransactionManager = AnsiTransactionManager>`. Our
87+
/// connection types don't satisfy that: the blanket deref impl in
88+
/// diesel-async wraps the transaction manager at each deref level, so
89+
/// `Object<ConnectionManager>` gets `PoolTransactionManager<AnsiTransactionManager>`
90+
/// and `PermittedConnection` gets
91+
/// `PoolTransactionManager<PoolTransactionManager<AnsiTransactionManager>>`.
92+
/// Neither matches `AnsiTransactionManager`.
93+
#[must_use = "Transaction builder does nothing unless you call `run` on it"]
94+
pub struct TransactionBuilder<'a> {
95+
conn: &'a mut PermittedConnection,
96+
isolation_level: Option<&'static str>,
97+
read_only: bool,
98+
}
99+
100+
impl<'a> TransactionBuilder<'a> {
101+
fn new(conn: &'a mut PermittedConnection) -> Self {
102+
Self {
103+
conn,
104+
isolation_level: None,
105+
read_only: false,
106+
}
107+
}
108+
109+
/// Set the transaction isolation level to `REPEATABLE READ`.
110+
pub fn repeatable_read(mut self) -> Self {
111+
self.isolation_level = Some("REPEATABLE READ");
112+
self
113+
}
114+
115+
/// Set the transaction isolation level to `SERIALIZABLE`.
116+
pub fn serializable(mut self) -> Self {
117+
self.isolation_level = Some("SERIALIZABLE");
118+
self
119+
}
120+
121+
/// Make the transaction `READ ONLY`.
122+
pub fn read_only(mut self) -> Self {
123+
self.read_only = true;
124+
self
125+
}
126+
127+
/// Execute `f` inside the configured transaction. Commits on `Ok`,
128+
/// rolls back on `Err`.
129+
///
130+
/// The closure must return a `ScopedBoxFuture` (use `.scope_boxed()`
131+
/// from `ScopedFutureExt`).
132+
pub async fn run<'b, T, E, F>(self, f: F) -> Result<T, E>
133+
where
134+
F: for<'r> FnOnce(&'r mut PermittedConnection) -> ScopedBoxFuture<'b, 'r, Result<T, E>>
135+
+ Send
136+
+ 'a,
137+
T: 'b,
138+
E: From<diesel::result::Error> + 'b,
139+
{
140+
let mut sql = String::from("BEGIN TRANSACTION");
141+
if let Some(level) = self.isolation_level {
142+
sql.push_str(" ISOLATION LEVEL ");
143+
sql.push_str(level);
144+
}
145+
if self.read_only {
146+
sql.push_str(" READ ONLY");
147+
}
148+
self.conn.batch_execute(&sql).await?;
149+
match f(self.conn).await {
150+
Ok(value) => {
151+
self.conn.batch_execute("COMMIT").await?;
152+
Ok(value)
153+
}
154+
Err(e) => {
155+
self.conn.batch_execute("ROLLBACK").await.ok();
156+
Err(e)
157+
}
158+
}
159+
}
160+
}
161+
68162
/// The namespace under which the `PRIMARY_TABLES` are mapped into each
69163
/// shard
70164
pub(crate) const PRIMARY_PUBLIC: &str = "primary_public";

0 commit comments

Comments
 (0)