Skip to content

Commit

Permalink
pass ServerConfig to new worker
Browse files Browse the repository at this point in the history
previously, a whole Config was passed to the new worker,
which is overkill, and will not be translatable to
protobuf. ServerConfig contains only the necessary infos.
  • Loading branch information
Keksoj committed Feb 2, 2024
1 parent 62f3db3 commit 24a941b
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 30 deletions.
30 changes: 21 additions & 9 deletions bin/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{
fs::File,
io::Error as IoError,
io::Seek,
net::SocketAddr,
os::unix::io::{AsRawFd, FromRawFd, IntoRawFd},
os::unix::process::CommandExt,
process::Command,
Expand All @@ -24,8 +25,8 @@ use tempfile::tempfile;

use sozu_command_lib::{
channel::{Channel, ChannelError},
config::Config,
logging::setup_logging_with_config,
config::{Config, ServerConfig},
logging::{setup_logging, setup_logging_with_config},
proto::command::{WorkerRequest, WorkerResponse},
ready::Ready,
request::{read_requests_from_file, RequestError},
Expand Down Expand Up @@ -88,7 +89,7 @@ pub fn begin_worker_process(
command_buffer_size: usize,
max_command_buffer_size: usize,
) -> Result<(), WorkerError> {
let mut worker_to_main_channel: Channel<WorkerResponse, Config> = Channel::new(
let mut worker_to_main_channel: Channel<WorkerResponse, ServerConfig> = Channel::new(
unsafe { UnixStream::from_raw_fd(worker_to_main_channel_fd) },
command_buffer_size,
max_command_buffer_size,
Expand All @@ -110,7 +111,12 @@ pub fn begin_worker_process(
let worker_id = format!("{}-{:02}", "WRK", id);

// do not try to log anything before this, or the logger will panic
setup_logging_with_config(&worker_config, &worker_id);
setup_logging(
&worker_config.log_target,
worker_config.log_access_target.as_deref(),
&worker_config.log_level,
&worker_id,
);

trace!(
"Creating worker {} with config: {:#?}",
Expand All @@ -133,8 +139,12 @@ pub fn begin_worker_process(
worker_to_main_channel.readiness.insert(Ready::READABLE);

if let Some(metrics) = worker_config.metrics.as_ref() {
let address = metrics
.address
.parse::<SocketAddr>()
.expect("Could not parse metrics address");
metrics::setup(
&metrics.address,
&address,
worker_id,
metrics.tagged_metrics,
metrics.prefix.clone(),
Expand Down Expand Up @@ -218,10 +228,12 @@ pub fn fork_main_into_worker(
}
})?;

let mut main_to_worker_channel: Channel<Config, WorkerResponse> = Channel::new(
let worker_config = ServerConfig::from_config(config);

let mut main_to_worker_channel: Channel<ServerConfig, WorkerResponse> = Channel::new(
main_to_worker,
config.command_buffer_size,
config.max_command_buffer_size,
worker_config.command_buffer_size,
worker_config.max_command_buffer_size,
);

// DISCUSS: should we really block the channel just to write on it?
Expand All @@ -236,7 +248,7 @@ pub fn fork_main_into_worker(
ForkResult::Parent { child: worker_pid } => {
info!("launching worker {} with pid {}", worker_id, worker_pid);
main_to_worker_channel
.write_message(config)
.write_message(&worker_config)
.map_err(WorkerError::SendConfig)?;

main_to_worker_channel
Expand Down
34 changes: 34 additions & 0 deletions command/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1746,7 +1746,15 @@ fn display_toml_error(file: &str, error: &toml::de::Error) {
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerMetricsConfig {
pub address: String,
pub tagged_metrics: bool,
pub prefix: Option<String>,
}

/// Used by a worker to start its server loop
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
pub max_connections: usize,
pub front_timeout: u32,
Expand All @@ -1757,10 +1765,24 @@ pub struct ServerConfig {
pub min_buffers: usize,
pub max_buffers: usize,
pub buffer_size: usize,
pub log_level: String,
pub log_target: String,
pub log_access_target: Option<String>,
pub command_buffer_size: usize,
pub max_command_buffer_size: usize,
pub metrics: Option<ServerMetricsConfig>,
}

impl ServerConfig {
/// reduce the config to the bare minimum needed by a worker
pub fn from_config(config: &Config) -> ServerConfig {
let metrics = config.metrics.clone().and_then(|m| {
Some(ServerMetricsConfig {
address: m.address.to_string(),
tagged_metrics: m.tagged_metrics,
prefix: m.prefix,
})
});
ServerConfig {
max_connections: config.max_connections,
front_timeout: config.front_timeout,
Expand All @@ -1771,6 +1793,12 @@ impl ServerConfig {
min_buffers: config.min_buffers,
max_buffers: config.max_buffers,
buffer_size: config.buffer_size,
log_level: config.log_level.clone(),
log_target: config.log_target.clone(),
log_access_target: config.log_access_target.clone(),
command_buffer_size: config.command_buffer_size,
max_command_buffer_size: config.max_command_buffer_size,
metrics,
}
}

Expand All @@ -1792,6 +1820,12 @@ impl Default for ServerConfig {
min_buffers: DEFAULT_MIN_BUFFERS,
max_buffers: DEFAULT_MAX_BUFFERS,
buffer_size: DEFAULT_BUFFER_SIZE,
log_level: "info".to_string(),
log_target: "stdout".to_string(),
log_access_target: None,
command_buffer_size: DEFAULT_COMMAND_BUFFER_SIZE,
max_command_buffer_size: DEFAULT_MAX_COMMAND_BUFFER_SIZE,
metrics: None,
}
}
}
Expand Down
18 changes: 10 additions & 8 deletions e2e/src/sozu/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use sozu_lib as sozu;
use sozu::server::Server;
use sozu_command::{
channel::Channel,
config::{Config, ConfigBuilder, FileConfig},
config::{ConfigBuilder, FileConfig, ServerConfig},
logging::setup_logging,
proto::command::{
request::RequestType, AddBackend, Cluster, HardStop, LoadBalancingParams, PathRule,
Expand All @@ -28,7 +28,7 @@ use crate::sozu::command_id::CommandID;
/// Handle to a detached thread where a Sozu worker runs
pub struct Worker {
pub name: String,
pub config: Config,
pub config: ServerConfig,
pub state: ConfigState,
pub scm_main_to_worker: ScmSocket,
pub scm_worker_to_main: ScmSocket,
Expand All @@ -49,22 +49,24 @@ pub fn set_no_close_exec(fd: i32) {
}

impl Worker {
pub fn into_config(file_config: FileConfig) -> Config {
ConfigBuilder::new(file_config, "")
pub fn into_config(file_config: FileConfig) -> ServerConfig {
let config = ConfigBuilder::new(file_config, "")
.into_config()
.expect("could not create Config")
.expect("could not create Config");
ServerConfig::from_config(&config)
}

pub fn empty_config() -> (Config, Listeners, ConfigState) {
pub fn empty_config() -> (ServerConfig, Listeners, ConfigState) {
let listeners = Listeners::default();
let config = FileConfig::default();
let config = Worker::into_config(config);
let state = ConfigState::new();
(config, listeners, state)
}

// TODO: this seems to be used nowhere. We may want to delete it.
pub fn create_server(
config: Config,
config: ServerConfig,
listeners: Listeners,
state: ConfigState,
) -> (ScmSocket, Channel<WorkerRequest, WorkerResponse>, Server) {
Expand Down Expand Up @@ -108,7 +110,7 @@ impl Worker {

pub fn start_new_worker<S: Into<String>>(
name: S,
config: Config,
config: ServerConfig,
listeners: &Listeners,
state: ConfigState,
) -> Self {
Expand Down
8 changes: 4 additions & 4 deletions e2e/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
};

use sozu_command_lib::{
config::{Config, ListenerBuilder},
config::{ListenerBuilder, ServerConfig},
proto::command::{request::RequestType, ActivateListener, ListenerType, Request},
scm_socket::Listeners,
state::ConfigState,
Expand Down Expand Up @@ -46,7 +46,7 @@ fn provide_port() -> u16 {
/// - n backends ("cluster_0-{0..n}")
pub fn setup_test<S: Into<String>>(
name: S,
config: Config,
config: ServerConfig,
listeners: Listeners,
state: ConfigState,
front_address: SocketAddr,
Expand Down Expand Up @@ -108,7 +108,7 @@ pub fn setup_test<S: Into<String>>(

pub fn setup_async_test<S: Into<String>>(
name: S,
config: Config,
config: ServerConfig,
listeners: Listeners,
state: ConfigState,
front_address: SocketAddr,
Expand Down Expand Up @@ -145,7 +145,7 @@ pub fn setup_async_test<S: Into<String>>(

pub fn setup_sync_test<S: Into<String>>(
name: S,
config: Config,
config: ServerConfig,
listeners: Listeners,
state: ConfigState,
front_address: SocketAddr,
Expand Down
17 changes: 8 additions & 9 deletions lib/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use time::{Duration, Instant};

use sozu_command::{
channel::Channel,
config::{Config, ServerConfig},
config::ServerConfig,
proto::command::{
request::RequestType, response_content::ContentType, ActivateListener, AddBackend,
CertificatesWithFingerprints, Cluster, ClusterHashes, ClusterInformations,
Expand Down Expand Up @@ -242,24 +242,23 @@ impl Server {
pub fn try_new_from_config(
worker_to_main_channel: ProxyChannel,
worker_to_main_scm: ScmSocket,
config: Config,
config: ServerConfig,
initial_state: Vec<WorkerRequest>,
expects_initial_status: bool,
) -> Result<Self, ServerError> {
let event_loop = Poll::new().map_err(ServerError::CreatePoll)?;
let server_config = ServerConfig::from_config(&config);
let pool = Rc::new(RefCell::new(Pool::with_capacity(
server_config.min_buffers,
server_config.max_buffers,
server_config.buffer_size,
config.min_buffers,
config.max_buffers,
config.buffer_size,
)));
let backends = Rc::new(RefCell::new(BackendMap::new()));

//FIXME: we will use a few entries for the channel, metrics socket and the listeners
//FIXME: for HTTP/2, we will have more than 2 entries per session
let sessions: Rc<RefCell<SessionManager>> = SessionManager::new(
Slab::with_capacity(server_config.slab_capacity()),
server_config.max_connections,
Slab::with_capacity(config.slab_capacity()),
config.max_connections,
);
{
let mut s = sessions.borrow_mut();
Expand Down Expand Up @@ -304,7 +303,7 @@ impl Server {
None,
Some(https),
None,
server_config,
config,
Some(initial_state),
expects_initial_status,
)
Expand Down

0 comments on commit 24a941b

Please sign in to comment.