Tolerate a non-primary shard being down during startup #2727

Merged · 8 commits · Aug 27, 2021
2 changes: 2 additions & 0 deletions docs/environment-variables.md
@@ -156,3 +156,5 @@ those.
given the other load management configuration settings, but never
actually decline to run a query, instead log about load management
decisions. Set to `true` to turn simulation on, defaults to `false`
- `GRAPH_STORE_CONNECTION_TIMEOUT`: How long, in ms, to wait when connecting
to a database before assuming it is down. Defaults to 5000ms.
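As a minimal sketch (not part of this diff), reading such a millisecond-valued variable with its documented default might look like this; `connection_timeout` is a hypothetical helper name:

use std::env;
use std::time::Duration;

// Read GRAPH_STORE_CONNECTION_TIMEOUT (in ms), falling back to the
// documented default of 5000ms when unset or unparsable.
fn connection_timeout() -> Duration {
    let ms = env::var("GRAPH_STORE_CONNECTION_TIMEOUT")
        .ok()
        .and_then(|s| s.parse::<u64>().ok())
        .unwrap_or(5000);
    Duration::from_millis(ms)
}

fn main() {
    println!("timeout = {:?}", connection_timeout());
}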
10 changes: 9 additions & 1 deletion graph/src/components/store.rs
@@ -785,6 +785,8 @@ pub enum StoreError {
FulltextSearchNonDeterministic,
#[error("operation was canceled")]
Canceled,
#[error("database unavailable")]
DatabaseUnavailable,
}

// Convenience to report a constraint violation
@@ -828,6 +830,12 @@ impl From<QueryExecutionError> for StoreError {
}
}

impl From<std::fmt::Error> for StoreError {
fn from(e: std::fmt::Error) -> Self {
StoreError::Unknown(anyhow!("{}", e.to_string()))
}
}
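With this impl, `write!` errors (which are `std::fmt::Error`) can be propagated with `?` in functions returning `Result<_, StoreError>`. A self-contained sketch of the pattern, using a stand-in error type rather than the PR's actual `StoreError`:

use std::fmt::Write;

// Stand-in error type, just to show the conversion.
#[derive(Debug)]
enum Error {
    Unknown(String),
}

impl From<std::fmt::Error> for Error {
    fn from(e: std::fmt::Error) -> Self {
        Error::Unknown(e.to_string())
    }
}

// With the From impl, a failed `write!` converts via `?`.
fn build(nsp: &str) -> Result<String, Error> {
    let mut query = String::new();
    write!(query, "create schema {};", nsp)?;
    Ok(query)
}

fn main() {
    assert_eq!(build("sgd1").unwrap(), "create schema sgd1;");
}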

pub struct StoredDynamicDataSource {
pub name: String,
pub source: Source,
@@ -1341,7 +1349,7 @@ pub trait QueryStore: Send + Sync {

fn block_number(&self, block_hash: H256) -> Result<Option<BlockNumber>, StoreError>;

fn wait_stats(&self) -> &PoolWaitStats;
fn wait_stats(&self) -> PoolWaitStats;

/// If `block` is `None`, assumes the latest block.
async fn has_non_fatal_errors(&self, block: Option<BlockNumber>) -> Result<bool, StoreError>;
4 changes: 2 additions & 2 deletions graphql/src/runner.rs
@@ -163,7 +163,7 @@ where
)?;
self.load_manager
.decide(
store.wait_stats(),
&store.wait_stats(),
query.shape_hash,
query.query_text.as_ref(),
)
@@ -268,7 +268,7 @@ where
if let Err(err) = self
.load_manager
.decide(
store.wait_stats(),
&store.wait_stats(),
query.shape_hash,
query.query_text.as_ref(),
)
1 change: 1 addition & 0 deletions node/src/bin/manager.rs
@@ -367,6 +367,7 @@ impl Context {
PRIMARY_SHARD.as_str(),
primary,
self.registry,
Arc::new(vec![]),
)
}

47 changes: 34 additions & 13 deletions node/src/store_builder.rs
@@ -7,7 +7,7 @@ use graph::{
prelude::{info, CheapClone, EthereumNetworkIdentifier, Logger},
util::security::SafeDisplay,
};
use graph_store_postgres::connection_pool::ConnectionPool;
use graph_store_postgres::connection_pool::{ConnectionPool, ForeignServer};
use graph_store_postgres::{
BlockStore as DieselBlockStore, ChainHeadUpdateListener as PostgresChainHeadUpdateListener,
Shard as ShardName, Store as DieselStore, SubgraphStore, SubscriptionManager, PRIMARY_SHARD,
@@ -45,15 +45,11 @@ impl StoreBuilder {
let (store, pools) =
Self::make_subgraph_store_and_pools(logger, node, config, registry.cheap_clone());

// Perform setup for all the pools
let details = pools
.values()
.map(|pool| pool.connection_detail())
.collect::<Result<Vec<_>, _>>()
.expect("connection url's contain enough detail");
let details = Arc::new(details);

join_all(pools.iter().map(|(_, pool)| pool.setup(details.clone()))).await;
// Try to perform setup (migrations etc.) for all the pools. If this
// attempt doesn't work for all of them because the database is
// unavailable, they will try again later in the normal course of
// using the pool
join_all(pools.iter().map(|(_, pool)| async move { pool.setup() })).await;

let chains = HashMap::from_iter(config.chains.chains.iter().map(|(name, chain)| {
let shard = ShardName::new(chain.shard.to_string())
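The "try now, retry later" behavior described in the comment above, sketched with hypothetical types (the real logic lives in `ConnectionPool` in store/postgres/src/connection_pool.rs, which this diff does not show):

use std::sync::atomic::{AtomicBool, Ordering};

// Record whether setup (migrations etc.) has succeeded; if the startup
// attempt failed because the database was down, retry on the next use
// of the pool instead of aborting the whole node.
struct Pool {
    setup_done: AtomicBool,
}

impl Pool {
    fn try_setup(&self) -> bool {
        let database_up = true; // pretend this pings the shard
        if database_up {
            self.setup_done.store(true, Ordering::SeqCst);
        }
        database_up
    }

    fn checkout(&self) {
        if !self.setup_done.load(Ordering::SeqCst) {
            self.try_setup(); // lazy retry in the normal course of use
        }
    }
}

fn main() {
    let pool = Pool { setup_done: AtomicBool::new(false) };
    pool.checkout();
    assert!(pool.setup_done.load(Ordering::SeqCst));
}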
@@ -86,15 +82,36 @@
config: &Config,
registry: Arc<dyn MetricsRegistry>,
) -> (Arc<SubgraphStore>, HashMap<ShardName, ConnectionPool>) {
let servers = config
.stores
.iter()
.map(|(name, shard)| ForeignServer::new_from_raw(name.to_string(), &shard.connection))
.collect::<Result<Vec<_>, _>>()
.expect("connection url's contain enough detail");
let servers = Arc::new(servers);

let shards: Vec<_> = config
.stores
.iter()
.map(|(name, shard)| {
let logger = logger.new(o!("shard" => name.to_string()));
let conn_pool = Self::main_pool(&logger, node, name, shard, registry.cheap_clone());
let conn_pool = Self::main_pool(
&logger,
node,
name,
shard,
registry.cheap_clone(),
servers.clone(),
);

let (read_only_conn_pools, weights) =
Self::replica_pools(&logger, node, name, shard, registry.cheap_clone());
let (read_only_conn_pools, weights) = Self::replica_pools(
&logger,
node,
name,
shard,
registry.cheap_clone(),
servers.clone(),
);

let name =
ShardName::new(name.to_string()).expect("shard names have been validated");
@@ -162,6 +179,7 @@ impl StoreBuilder {
name: &str,
shard: &Shard,
registry: Arc<dyn MetricsRegistry>,
servers: Arc<Vec<ForeignServer>>,
) -> ConnectionPool {
let logger = logger.new(o!("pool" => "main"));
let pool_size = shard.pool_size.size_for(node, name).expect(&format!(
@@ -187,6 +205,7 @@
Some(fdw_pool_size),
&logger,
registry.cheap_clone(),
servers,
)
}

@@ -197,6 +216,7 @@
name: &str,
shard: &Shard,
registry: Arc<dyn MetricsRegistry>,
servers: Arc<Vec<ForeignServer>>,
) -> (Vec<ConnectionPool>, Vec<usize>) {
let mut weights: Vec<_> = vec![shard.weight];
(
@@ -226,6 +246,7 @@
None,
&logger,
registry.cheap_clone(),
servers.clone(),
)
})
.collect(),
162 changes: 161 additions & 1 deletion store/postgres/src/catalog.rs
@@ -2,10 +2,16 @@ use diesel::sql_types::Integer;
use diesel::{connection::SimpleConnection, prelude::RunQueryDsl, select};
use diesel::{insert_into, OptionalExtension};
use diesel::{pg::PgConnection, sql_query};
use diesel::{sql_types::Text, ExpressionMethods, QueryDsl};
use diesel::{
sql_types::{Array, Nullable, Text},
ExpressionMethods, QueryDsl,
};
use std::collections::{HashMap, HashSet};
use std::fmt::Write;
use std::iter::FromIterator;
use std::sync::Arc;

use graph::prelude::anyhow::anyhow;
use graph::{data::subgraph::schema::POI_TABLE, prelude::StoreError};

use crate::connection_pool::ForeignServer;
@@ -135,6 +141,32 @@ pub fn current_servers(conn: &PgConnection) -> Result<Vec<String>, StoreError> {
.collect())
}

/// Return the options for the foreign server `name` as a map of option
/// names to values
pub fn server_options(
conn: &PgConnection,
name: &str,
) -> Result<HashMap<String, Option<String>>, StoreError> {
#[derive(QueryableByName)]
struct Srv {
#[sql_type = "Array<Text>"]
srvoptions: Vec<String>,
}
let entries = sql_query("select srvoptions from pg_foreign_server where srvname = $1")
.bind::<Text, _>(name)
.get_result::<Srv>(conn)?
.srvoptions
.into_iter()
.filter_map(|opt| {
let mut parts = opt.splitn(2, "=");
let key = parts.next();
let value = parts.next().map(|value| value.to_string());

key.map(|key| (key.to_string(), value))
});
Ok(HashMap::from_iter(entries))
}
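A standalone restatement of the parsing rule `server_options` applies to each `srvoptions` entry — split at the first `=`, keeping a bare key as a key with no value (the option strings below are hypothetical):

use std::collections::HashMap;
use std::iter::FromIterator;

fn parse_options(srvoptions: Vec<String>) -> HashMap<String, Option<String>> {
    let entries = srvoptions.into_iter().filter_map(|opt| {
        let mut parts = opt.splitn(2, "=");
        let key = parts.next();
        let value = parts.next().map(|value| value.to_string());
        key.map(|key| (key.to_string(), value))
    });
    HashMap::from_iter(entries)
}

fn main() {
    let opts = parse_options(vec![
        "host=db.example.com".to_string(),
        "port=5432".to_string(),
        "updatable".to_string(),
    ]);
    assert_eq!(opts["host"].as_deref(), Some("db.example.com"));
    assert_eq!(opts["updatable"], None);
}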

pub fn has_namespace(conn: &PgConnection, namespace: &Namespace) -> Result<bool, StoreError> {
use pg_namespace as nsp;

@@ -163,6 +195,17 @@ pub fn drop_foreign_schema(conn: &PgConnection, src: &Site) -> Result<(), StoreError> {
Ok(())
}

/// Drop the schema `nsp` and all its contents if it exists, and create it
/// again so that `nsp` is an empty schema
pub fn recreate_schema(conn: &PgConnection, nsp: &str) -> Result<(), StoreError> {
let query = format!(
"drop schema if exists {nsp} cascade;\
create schema {nsp};",
nsp = nsp
);
Ok(conn.batch_execute(&query)?)
}

pub fn account_like(conn: &PgConnection, site: &Site) -> Result<HashSet<String>, StoreError> {
use table_stats as ts;
let names = ts::table
@@ -221,3 +264,120 @@ pub fn copy_account_like(conn: &PgConnection, src: &Site, dst: &Site) -> Result<
.bind::<Integer, _>(dst.id)
.execute(conn)?)
}

pub(crate) mod table_schema {
use super::*;

/// The name and data type for a column in a table. The data type is
/// in a form that can be used in a `create table` statement
pub struct Column {
pub column_name: String,
pub data_type: String,
}

#[derive(QueryableByName)]
struct ColumnInfo {
#[sql_type = "Text"]
column_name: String,
#[sql_type = "Text"]
data_type: String,
#[sql_type = "Text"]
udt_name: String,
#[sql_type = "Text"]
udt_schema: String,
#[sql_type = "Nullable<Text>"]
elem_type: Option<String>,
}

impl From<ColumnInfo> for Column {
fn from(ci: ColumnInfo) -> Self {
// See description of `data_type` in
// https://www.postgresql.org/docs/current/infoschema-columns.html
let data_type = match ci.data_type.as_str() {
"ARRAY" => format!(
"{}[]",
ci.elem_type.expect("array columns have an elem_type")
),
"USER-DEFINED" => format!("{}.{}", ci.udt_schema, ci.udt_name),
_ => ci.data_type.clone(),
};
Self {
column_name: ci.column_name.clone(),
data_type,
}
}
}

pub fn columns(
conn: &PgConnection,
nsp: &str,
table_name: &str,
) -> Result<Vec<Column>, StoreError> {
const QUERY: &str = " \
select c.column_name::text, c.data_type::text,
c.udt_name::text, c.udt_schema::text, e.data_type::text as elem_type
from information_schema.columns c
left join information_schema.element_types e
on ((c.table_catalog, c.table_schema, c.table_name, 'TABLE', c.dtd_identifier)
= (e.object_catalog, e.object_schema, e.object_name, e.object_type, e.collection_type_identifier))
where c.table_schema = $1
and c.table_name = $2
order by c.ordinal_position";

Ok(sql_query(QUERY)
.bind::<Text, _>(nsp)
.bind::<Text, _>(table_name)
.get_results::<ColumnInfo>(conn)?
.into_iter()
.map(|ci| ci.into())
.collect())
}
}
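The three `data_type` cases handled above render as follows; a self-contained restatement of the `From<ColumnInfo>` rules with hypothetical values:

fn render(data_type: &str, udt_schema: &str, udt_name: &str, elem_type: Option<&str>) -> String {
    match data_type {
        "ARRAY" => format!("{}[]", elem_type.expect("array columns have an elem_type")),
        "USER-DEFINED" => format!("{}.{}", udt_schema, udt_name),
        other => other.to_string(),
    }
}

fn main() {
    assert_eq!(render("integer", "pg_catalog", "int4", None), "integer");
    assert_eq!(render("ARRAY", "pg_catalog", "_text", Some("text")), "text[]");
    assert_eq!(render("USER-DEFINED", "public", "citext", None), "public.citext");
}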

/// Return a SQL statement to create the foreign table
/// `{dst_nsp}.{table_name}` for the server `server` which has the same
/// schema as the (local) table `{src_nsp}.{table_name}`
pub fn create_foreign_table(
conn: &PgConnection,
src_nsp: &str,
table_name: &str,
dst_nsp: &str,
server: &str,
) -> Result<String, StoreError> {
fn build_query(
columns: Vec<table_schema::Column>,
src_nsp: &str,
table_name: &str,
dst_nsp: &str,
server: &str,
) -> Result<String, std::fmt::Error> {
let mut query = String::new();
write!(
query,
"create foreign table \"{}\".\"{}\" (",
dst_nsp, table_name
)?;
for (idx, column) in columns.into_iter().enumerate() {
if idx > 0 {
write!(query, ", ")?;
}
write!(query, "\"{}\" {}", column.column_name, column.data_type)?;
}
writeln!(
query,
") server \"{}\" options(schema_name '{}');",
server, src_nsp
)?;
Ok(query)
}

let columns = table_schema::columns(conn, src_nsp, table_name)?;
let query = build_query(columns, src_nsp, table_name, dst_nsp, server).map_err(|_| {
anyhow!(
"failed to generate 'create foreign table' query for {}.{}",
dst_nsp,
table_name
)
})?;
Ok(query)
}
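For concreteness, rerunning `build_query`'s formatting logic standalone shows the statement this produces; the column names, namespaces, and server name below are hypothetical, not taken from a real subgraph:

use std::fmt::Write;

fn main() -> Result<(), std::fmt::Error> {
    let columns = [("id", "bytea"), ("block_range", "int4range")];
    let (src_nsp, table_name, dst_nsp, server) = ("sgd42", "entities", "shard_a", "shard_a");

    let mut query = String::new();
    write!(query, "create foreign table \"{}\".\"{}\" (", dst_nsp, table_name)?;
    for (idx, (name, data_type)) in columns.iter().enumerate() {
        if idx > 0 {
            write!(query, ", ")?;
        }
        write!(query, "\"{}\" {}", name, data_type)?;
    }
    writeln!(query, ") server \"{}\" options(schema_name '{}');", server, src_nsp)?;

    // Prints:
    // create foreign table "shard_a"."entities" ("id" bytea, "block_range" int4range) server "shard_a" options(schema_name 'sgd42');
    print!("{}", query);
    Ok(())
}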