Skip to content
This repository has been archived by the owner on Oct 18, 2023. It is now read-only.

primary idle shutdown #242

Merged
merged 5 commits into from
Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 2 additions & 6 deletions sqld/src/http/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
pub mod auth;
mod services;
mod types;

use std::future::poll_fn;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use base64::prelude::BASE64_STANDARD_NO_PAD;
Expand All @@ -25,11 +23,10 @@ use tower_http::{compression::CompressionLayer, cors};
use tracing::{Level, Span};

use crate::error::Error;
use crate::http::services::idle_shutdown::IdleShutdownLayer;
use crate::http::types::HttpQuery;
use crate::query::{self, Params, Queries, Query, QueryResult, ResultSet};
use crate::query_analysis::{final_state, State, Statement};
use crate::SHUTDOWN;
use crate::utils::services::idle_shutdown::IdleShutdownLayer;

use self::auth::Authorizer;
use self::types::QueryObject;
Expand Down Expand Up @@ -271,7 +268,7 @@ pub async fn run_http<F>(
authorizer: Arc<dyn Authorizer + Send + Sync>,
db_factory: F,
enable_console: bool,
idle_shutdown: Option<Duration>,
idle_shutdown_layer: Option<IdleShutdownLayer>,
) -> anyhow::Result<()>
where
F: MakeService<(), Queries> + Send + 'static,
Expand All @@ -286,7 +283,6 @@ where
tracing::info!("listening for HTTP requests on {addr}");

let (sender, mut receiver) = mpsc::channel(1024);
let idle_shutdown_layer = idle_shutdown.map(|d| IdleShutdownLayer::new(d, SHUTDOWN.clone()));
fn trace_request<B>(req: &Request<B>, _span: &Span) {
tracing::info!("got request: {} {}", req.method(), req.uri());
}
Expand Down
28 changes: 19 additions & 9 deletions sqld/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use tokio::sync::Notify;
use tokio::task::JoinSet;
use tower::load::Constant;
use tower::ServiceExt;
use utils::services::idle_shutdown::IdleShutdownLayer;

use crate::error::Error;
use crate::postgres::service::PgConnectionFactory;
Expand All @@ -34,6 +35,7 @@ mod query_analysis;
mod replication;
pub mod rpc;
mod server;
mod utils;

#[derive(clap::ValueEnum, Clone, Debug, PartialEq)]
pub enum Backend {
Expand All @@ -50,8 +52,6 @@ type Result<T> = std::result::Result<T, Error>;
///
/// /!\ use with caution.
pub(crate) static HARD_RESET: Lazy<Arc<Notify>> = Lazy::new(|| Arc::new(Notify::new()));
/// Clean shutdown of the server.
pub(crate) static SHUTDOWN: Lazy<Arc<Notify>> = Lazy::new(|| Arc::new(Notify::new()));

#[cfg(feature = "mwal_backend")]
pub(crate) static VWAL_METHODS: OnceCell<
Expand Down Expand Up @@ -87,6 +87,7 @@ async fn run_service(
service: DbFactoryService,
config: &Config,
join_set: &mut JoinSet<anyhow::Result<()>>,
idle_shutdown_layer: Option<IdleShutdownLayer>,
) -> anyhow::Result<()> {
let mut server = Server::new();

Expand All @@ -109,7 +110,7 @@ async fn run_service(
authorizer,
service.map_response(|s| Constant::new(s, 1)),
config.enable_http_console,
config.idle_shutdown_timeout,
idle_shutdown_layer,
));
}

Expand Down Expand Up @@ -137,6 +138,7 @@ async fn start_primary(
config: &Config,
join_set: &mut JoinSet<anyhow::Result<()>>,
addr: &str,
idle_shutdown_layer: Option<IdleShutdownLayer>,
) -> anyhow::Result<()> {
let (factory, handle) = WriteProxyDbFactory::new(
addr,
Expand All @@ -155,14 +157,15 @@ async fn start_primary(
join_set.spawn(async move { handle.await.expect("WriteProxy DB task failed") });

let service = DbFactoryService::new(Arc::new(factory));
run_service(service, config, join_set).await?;
run_service(service, config, join_set, idle_shutdown_layer).await?;

Ok(())
}

async fn start_replica(
config: &Config,
join_set: &mut JoinSet<anyhow::Result<()>>,
idle_shutdown_layer: Option<IdleShutdownLayer>,
) -> anyhow::Result<()> {
let logger = Arc::new(ReplicationLogger::open(&config.db_path)?);
let logger_clone = logger.clone();
Expand All @@ -183,10 +186,11 @@ async fn start_replica(
config.rpc_server_ca_cert.clone(),
db_factory,
logger_clone,
idle_shutdown_layer.clone(),
));
}

run_service(service, config, join_set).await?;
run_service(service, config, join_set, idle_shutdown_layer).await?;

Ok(())
}
Expand Down Expand Up @@ -233,20 +237,26 @@ pub async fn run_server(config: Config) -> anyhow::Result<()> {
}
let mut join_set = JoinSet::new();

let shutdown_notify: Arc<Notify> = Arc::new(Notify::new());
let idle_shutdown_layer = config
.idle_shutdown_timeout
.map(|d| IdleShutdownLayer::new(d, shutdown_notify.clone()));

match config.writer_rpc_addr {
Some(ref addr) => start_primary(&config, &mut join_set, addr).await?,
None => start_replica(&config, &mut join_set).await?,
Some(ref addr) => {
start_primary(&config, &mut join_set, addr, idle_shutdown_layer).await?
}
None => start_replica(&config, &mut join_set, idle_shutdown_layer).await?,
}

let reset = HARD_RESET.clone();
let shutdown = SHUTDOWN.clone();
loop {
tokio::select! {
_ = reset.notified() => {
hard_reset(&config, join_set).await?;
break;
},
_ = shutdown.notified() => {
_ = shutdown_notify.notified() => {
return Ok(())
}
Some(res) = join_set.join_next() => {
Expand Down
5 changes: 5 additions & 0 deletions sqld/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@ use anyhow::Context;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use tower::util::option_layer;

use crate::database::service::DbFactory;
use crate::replication::logger::ReplicationLogger;
use crate::rpc::proxy::rpc::proxy_server::ProxyServer;
use crate::rpc::proxy::ProxyService;
use crate::rpc::replication_log::rpc::replication_log_server::ReplicationLogServer;
use crate::rpc::replication_log::ReplicationLogService;
use crate::utils::services::idle_shutdown::IdleShutdownLayer;

pub mod proxy;
pub mod replication_log;

#[allow(clippy::too_many_arguments)]
pub async fn run_rpc_server(
addr: SocketAddr,
tls: bool,
Expand All @@ -21,6 +24,7 @@ pub async fn run_rpc_server(
ca_cert_path: Option<PathBuf>,
factory: Arc<dyn DbFactory>,
logger: Arc<ReplicationLogger>,
idle_shutdown_layer: Option<IdleShutdownLayer>,
) -> anyhow::Result<()> {
let proxy_service = ProxyService::new(factory);
let logger_service = ReplicationLogService::new(logger);
Expand All @@ -44,6 +48,7 @@ pub async fn run_rpc_server(
.context("Failed to read the TSL config of RPC server")?;
}
builder
.layer(&option_layer(idle_shutdown_layer))
.add_service(ProxyServer::new(proxy_service))
.add_service(ReplicationLogServer::new(logger_service))
.serve(addr)
Expand Down
1 change: 1 addition & 0 deletions sqld/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod services;
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use tokio::sync::{watch, Notify};
use tokio::time::timeout;
use tower::{Layer, Service};

#[derive(Clone)]
pub struct IdleShutdownLayer {
watcher: Arc<watch::Sender<()>>,
}
Expand Down