Skip to content

Commit

Permalink
storage/pg: set statement_timeout when snapshotting
Browse files Browse the repository at this point in the history
Closes #16123
  • Loading branch information
Sean Loiselle committed Aug 14, 2023
1 parent d9b4416 commit 7b4e48d
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 0 deletions.
26 changes: 26 additions & 0 deletions src/storage/src/source/postgres/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ use std::any::Any;
use std::collections::{BTreeMap, BTreeSet};
use std::pin::pin;
use std::rc::Rc;
use std::str::FromStr;

use differential_dataflow::{AsCollection, Collection};
use futures::TryStreamExt;
Expand Down Expand Up @@ -269,6 +270,31 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
connection_config.connect(&task_name).await?
};

// Configure statement_timeout based on param. We want to be able to
// override the server value here in case it's set too low,
// respective to the size of the data we need to copy.
//
// Value is known to accept milliseconds w/o units.
// https://www.postgresql.org/docs/current/runtime-config-client.html
client.simple_query(
&format!("SET statement_timeout = {}", config.params.pg_source_snapshot_statement_timeout.as_millis())
).await?;

mz_ore::soft_assert!{{
let row = simple_query_opt(&client, "SHOW statement_timeout;")
.await?
.unwrap();
let timeout = row.get("statement_timeout").unwrap().to_owned();

// This only needs to be compatible for values we test; doesn't
// need to generalize all possible interval/duration mappings.
mz_repr::adt::interval::Interval::from_str(&timeout)
.map(|i| i.duration())
.unwrap()
.unwrap()
== config.params.pg_source_snapshot_statement_timeout
}, "SET statement_timeout in PG snapshot did not take effect"};

let (snapshot, snapshot_lsn) = loop {
match snapshot_input.next_mut().await {
Some(AsyncEvent::Data(_, data)) => break data.pop().expect("snapshot sent above"),
Expand Down
8 changes: 8 additions & 0 deletions test/pg-cdc/pg-cdc.td
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ CREATE PUBLICATION another_publication FOR TABLE another_schema.another_table;
#
$ postgres-verify-slot connection=postgres://postgres:postgres@postgres slot=materialize_% active=false

# Sneak in a test for pg_source_snapshot_statement_timeout
$ postgres-connect name=mz_system url=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
$ postgres-execute connection=mz_system
ALTER SYSTEM SET pg_source_snapshot_statement_timeout = 1000

> CREATE SOURCE "test_slot_source"
FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source')
FOR TABLES ("pk_table");
Expand All @@ -152,6 +157,9 @@ test_slot_source_progress progress

$ postgres-verify-slot connection=postgres://postgres:postgres@postgres slot=materialize_% active=false

$ postgres-execute connection=mz_system
ALTER SYSTEM SET pg_source_snapshot_statement_timeout = 0

#
# Error checking
#
Expand Down

0 comments on commit 7b4e48d

Please sign in to comment.