Skip to content

Commit

Permalink
Consoldate configuration validation to the cfg module
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Lyne committed Jul 27, 2022
1 parent 55f4569 commit 1461f18
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 37 deletions.
102 changes: 99 additions & 3 deletions server/svix-server/src/cfg.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// SPDX-FileCopyrightText: © 2022 Svix Authors
// SPDX-License-Identifier: MIT

use std::sync::Arc;
use std::{borrow::Cow, collections::HashMap, net::SocketAddr, sync::Arc};

use figment::{
providers::{Env, Format, Toml},
Expand All @@ -12,7 +12,7 @@ use std::time::Duration;
use crate::{core::cryptography::Encryption, core::security::Keys, error::Result};
use serde::{Deserialize, Deserializer};
use tracing::Level;
use validator::Validate;
use validator::{Validate, ValidationError};

fn deserialize_jwt_secret<'de, D>(deserializer: D) -> std::result::Result<Keys, D::Error>
where
Expand Down Expand Up @@ -78,10 +78,16 @@ const DEFAULTS: &str = include_str!("../config.default.toml");

pub type Configuration = Arc<ConfigurationInner>;

// TODO: Validate given DSNs?

#[derive(Clone, Debug, Deserialize, Validate)]
#[validate(
schema(function = "validate_config_complete"),
skip_on_field_errors = false
)]
pub struct ConfigurationInner {
/// The address to listen on
pub listen_address: String,
pub listen_address: SocketAddr,

/// The address to send operational webhooks to. When None, operational webhooks will not be
/// sent. When Some, the API server with the given URL will be used to send operational webhooks.
Expand Down Expand Up @@ -162,6 +168,83 @@ pub struct ConfigurationInner {
pub internal: InternalConfig,
}

fn validate_config_complete(
config: &ConfigurationInner,
) -> std::result::Result<(), ValidationError> {
match config.cache_type {
CacheType::None | CacheType::Memory => {}
CacheType::Redis | CacheType::RedisCluster => {
if config.redis_dsn.is_none() {
return Err(ValidationError {
code: Cow::from("unset redis_dsn"),
message: Some(Cow::from(
"The redis_dsn field must be set if the cache_type is `redis` or `rediscluster`"
)),
params: HashMap::new(),
});
}
}
}

match config.queue_type {
QueueType::Memory => {}
QueueType::Redis | QueueType::RedisCluster => {
if config.redis_dsn.is_none() {
return Err(ValidationError {
code: Cow::from("unset redis_dsn"),
message: Some(Cow::from(
"The redis_dsn field must be set if the queue_type is `redis` or `rediscluster`"
)),
params: HashMap::new(),
});
}
}
}

// TODO: Throw an error if one is `RedisCluster` and one is `Redis`?

Ok(())
}

impl ConfigurationInner {
/// Fetches the configured backend information for the queue. May panic is the configuration has
/// not been validated
pub fn queue_backend(&self) -> QueueBackend<'_> {
match self.queue_type {
QueueType::Memory => QueueBackend::Memory,
QueueType::Redis => QueueBackend::Redis(
self.redis_dsn
.as_ref()
.expect("Called [`queue_backend`] before validating configuration"),
),
QueueType::RedisCluster => QueueBackend::RedisCluster(
self.redis_dsn
.as_ref()
.expect("Called [`queue_backend`] before validating configuration"),
),
}
}

/// Fetches the configured backend information for the cache, or `None` if the [`CacheType`] is
/// `None`. May panic is the configuration has not been validated
pub fn cache_backend(&self) -> CacheBackend<'_> {
match self.cache_type {
CacheType::None => CacheBackend::None,
CacheType::Memory => CacheBackend::Memory,
CacheType::Redis => CacheBackend::Redis(
self.redis_dsn
.as_ref()
.expect("Called [`cache_backend`] before validating configuration"),
),
CacheType::RedisCluster => CacheBackend::RedisCluster(
self.redis_dsn
.as_ref()
.expect("Called [`cache_backend`] before validating configuration"),
),
}
}
}

#[derive(Clone, Debug, Deserialize)]
pub struct InternalConfig {
/// The region to use in the Svix URL given in th dashboard access endpoint
Expand All @@ -181,6 +264,19 @@ fn default_app_portal_url() -> String {
"https://app.svix.com".to_owned()
}

pub enum QueueBackend<'a> {
Memory,
Redis(&'a str),
RedisCluster(&'a str),
}

pub enum CacheBackend<'a> {
None,
Memory,
Redis(&'a str),
RedisCluster(&'a str),
}

#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum LogLevel {
Expand Down
30 changes: 10 additions & 20 deletions server/svix-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@

use axum::{extract::Extension, Router};

use cfg::CacheType;
use lazy_static::lazy_static;
use std::{
net::{SocketAddr, TcpListener},
str::FromStr,
net::TcpListener,
sync::atomic::{AtomicBool, Ordering},
time::Duration,
};
Expand All @@ -19,7 +17,7 @@ use tower_http::cors::{AllowHeaders, Any, CorsLayer};
use tower_http::trace::TraceLayer;

use crate::{
cfg::Configuration,
cfg::{CacheBackend, Configuration},
core::{
cache,
idempotency::IdempotencyService,
Expand Down Expand Up @@ -86,25 +84,18 @@ pub async fn run_with_prefix(
) {
let pool = init_db(&cfg).await;

let redis_dsn = || {
cfg.redis_dsn
.as_ref()
.expect("Redis DSN not found")
.as_str()
};

tracing::debug!("Cache type: {:?}", cfg.cache_type);
let cache = match cfg.cache_type {
CacheType::Redis => {
let mgr = crate::redis::new_redis_pool(redis_dsn(), &cfg).await;
let cache = match cfg.cache_backend() {
CacheBackend::None => cache::none::new(),
CacheBackend::Memory => cache::memory::new(),
CacheBackend::Redis(dsn) => {
let mgr = crate::redis::new_redis_pool(dsn, &cfg).await;
cache::redis::new(mgr)
}
CacheType::RedisCluster => {
let mgr = crate::redis::new_redis_pool_clustered(redis_dsn(), &cfg).await;
CacheBackend::RedisCluster(dsn) => {
let mgr = crate::redis::new_redis_pool_clustered(dsn, &cfg).await;
cache::redis::new(mgr)
}
CacheType::Memory => cache::memory::new(),
CacheType::None => cache::none::new(),
};

tracing::debug!("Queue type: {:?}", cfg.queue_type);
Expand Down Expand Up @@ -147,8 +138,7 @@ pub async fn run_with_prefix(
let with_api = cfg.api_enabled;
let with_worker = cfg.worker_enabled;

let listen_address =
SocketAddr::from_str(&cfg.listen_address).expect("Error parsing server listen address");
let listen_address = cfg.listen_address;

let (server, worker_loop, expired_message_cleaner_loop) = tokio::join!(
async {
Expand Down
21 changes: 7 additions & 14 deletions server/svix-server/src/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize};
use svix_ksuid::*;

use crate::{
cfg::{Configuration, QueueType},
cfg::{Configuration, QueueBackend},
core::{
run_with_retries::run_with_retries,
types::{ApplicationId, EndpointId, MessageAttemptTriggerType, MessageId},
Expand All @@ -31,23 +31,16 @@ pub async fn new_pair(
cfg: &Configuration,
prefix: Option<&str>,
) -> (TaskQueueProducer, TaskQueueConsumer) {
let redis_dsn = || {
cfg.redis_dsn
.as_ref()
.expect("Redis DSN not found")
.as_str()
};

match cfg.queue_type {
QueueType::Redis => {
let pool = crate::redis::new_redis_pool(redis_dsn(), cfg).await;
match cfg.queue_backend() {
QueueBackend::Redis(dsn) => {
let pool = crate::redis::new_redis_pool(dsn, cfg).await;
redis::new_pair(pool, prefix).await
}
QueueType::RedisCluster => {
let pool = crate::redis::new_redis_pool_clustered(redis_dsn(), cfg).await;
QueueBackend::RedisCluster(dsn) => {
let pool = crate::redis::new_redis_pool_clustered(dsn, cfg).await;
redis::new_pair(pool, prefix).await
}
QueueType::Memory => memory::new_pair().await,
QueueBackend::Memory => memory::new_pair().await,
}
}

Expand Down

0 comments on commit 1461f18

Please sign in to comment.