Completely remove slog and replace with tracing #457

Merged · 2 commits · Jan 11, 2022
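The call-site changes in this PR all follow one mechanical pattern: slog macros that thread an explicit `Logger` and take `;`-separated key-value pairs become `tracing` macros with inline structured fields. A minimal before/after sketch (the function name and the `addr` parameter are illustrative, not taken from the diff):

```rust
use std::net::SocketAddr;

fn log_startup(addr: SocketAddr) {
    // slog (removed): a Logger handle is passed to every call site and
    // key-value pairs follow a `;`:
    //     info!(log, "Starting admin endpoint"; "address" => addr.to_string());

    // tracing (new): no handle to thread through; fields are recorded inline.
    // `%` records a field via its Display impl, `?` via Debug.
    tracing::info!(address = %addr, "Starting admin endpoint");
    tracing::debug!("Received a cluster update.");
}
```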
5 changes: 1 addition & 4 deletions Cargo.toml
@@ -57,10 +57,6 @@ rand = "0.8.4"
serde = { version = "1.0.130", features = ["derive", "rc"] }
serde_json = "1.0.68"
serde_yaml = "0.8.21"
slog = "2.7.0"
slog-async = "2.7.0"
slog-json = "2.4.0"
slog-term = "2.8.0"
snap = "1.0.5"
tokio = { version = "1.13.1", features = ["rt-multi-thread", "signal", "test-util", "parking_lot"] }
tokio-stream = "0.1.8"
@@ -73,6 +69,7 @@ tryhard = "0.4.0"
eyre = "0.6.5"
stable-eyre = "0.2.2"
ipnetwork = "0.18.0"
futures = "0.3.17"

[target.'cfg(target_os = "linux")'.dependencies]
sys-info = "0.9.0"
19 changes: 5 additions & 14 deletions src/cluster/cluster_manager.rs
@@ -20,7 +20,6 @@ use std::sync::Arc;
// and we will need to acquire a read lock with every packet that is processed
// to be able to capture the current endpoint state and pass it to Filters.
use parking_lot::RwLock;
use slog::{debug, o, warn, Logger};

use prometheus::{Registry, Result as MetricsResult};
use tokio::sync::{mpsc, watch};
@@ -81,21 +80,17 @@ impl ClusterManager {
/// Returns a ClusterManager where the set of clusters is continuously
/// updated based on responses from the provided updates channel.
pub fn dynamic(
base_logger: Logger,
metrics_registry: &Registry,
cluster_updates_rx: mpsc::Receiver<ClusterUpdate>,
shutdown_rx: watch::Receiver<()>,
) -> MetricsResult<SharedClusterManager> {
let log = base_logger.new(o!("source" => "cluster::ClusterManager"));

let cluster_manager = Self::new(metrics_registry, None)?;
let metrics = cluster_manager.metrics.clone();
let cluster_manager = Arc::new(RwLock::new(cluster_manager));

// Start a task in the background to receive cluster updates
// and update the cluster manager's cluster set in turn.
Self::spawn_updater(
log.clone(),
metrics,
cluster_manager.clone(),
cluster_updates_rx,
@@ -144,7 +139,6 @@ impl ClusterManager {
/// Spawns a task to run a loop that receives cluster updates
/// and updates the ClusterManager's state in turn.
fn spawn_updater(
log: Logger,
metrics: Metrics,
cluster_manager: Arc<RwLock<ClusterManager>>,
mut cluster_updates_rx: mpsc::Receiver<ClusterUpdate>,
@@ -156,17 +150,17 @@ impl ClusterManager {
update = cluster_updates_rx.recv() => {
match update {
Some(update) => {
debug!(log, "Received a cluster update.");
tracing::debug!("Received a cluster update.");
cluster_manager.write().update(Self::process_cluster_update(&metrics, update));
}
None => {
warn!(log, "Exiting cluster update receive loop because the sender dropped the channel.");
tracing::warn!("Exiting cluster update receive loop because the sender dropped the channel.");
return;
}
}
}
_ = shutdown_rx.changed() => {
debug!(log, "Exiting cluster update receive loop because a shutdown signal was received.");
tracing::debug!("Exiting cluster update receive loop because a shutdown signal was received.");
return;
},
}
@@ -185,15 +179,13 @@ mod tests {
cluster::{Cluster, LocalityEndpoints},
endpoint::{Endpoint, Endpoints, Metadata},
metadata::MetadataView,
test_utils::logger,
};

#[tokio::test]
async fn dynamic_cluster_manager_process_cluster_update() {
let (update_tx, update_rx) = mpsc::channel(3);
let (_shutdown_tx, shutdown_rx) = watch::channel(());
let cm = ClusterManager::dynamic(logger(), &Registry::default(), update_rx, shutdown_rx)
.unwrap();
let cm = ClusterManager::dynamic(&Registry::default(), update_rx, shutdown_rx).unwrap();

fn mapping(entries: &[(&str, &str)]) -> serde_yaml::Mapping {
entries
@@ -301,8 +293,7 @@ mod tests {
async fn dynamic_cluster_manager_metrics() {
let (update_tx, update_rx) = mpsc::channel(3);
let (_shutdown_tx, shutdown_rx) = watch::channel(());
let cm = ClusterManager::dynamic(logger(), &Registry::default(), update_rx, shutdown_rx)
.unwrap();
let cm = ClusterManager::dynamic(&Registry::default(), update_rx, shutdown_rx).unwrap();

// Initialization metrics
{
11 changes: 0 additions & 11 deletions src/endpoint/address.rs
@@ -227,17 +227,6 @@ impl<'de> Deserialize<'de> for EndpointAddress {
}
}

impl slog::Value for EndpointAddress {
fn serialize(
&self,
_: &slog::Record,
key: slog::Key,
serializer: &mut dyn slog::Serializer,
) -> slog::Result {
serializer.emit_arguments(key, &format_args!("{}", self))
}
}
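The `slog::Value` impl above exists only because slog needs a trait impl to emit a custom type; `tracing` records any `Display` or `Debug` type through its `%`/`?` field sigils, so the deletion needs no replacement. A hedged sketch (field name and message are illustrative):

```rust
// EndpointAddress already implements Display (the removed impl delegated to it),
// which is all tracing needs: `%` formats the field with Display, `?` with Debug.
fn log_endpoint(address: &EndpointAddress) {
    tracing::debug!(endpoint = %address, "selected endpoint");
}
```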

/// The kind of address, such as Domain Name or IP address. **Note** that
/// the `FromStr` implementation doesn't actually validate that the name is
/// resolvable. Use [`EndpointAddress`] for complete address validation.
37 changes: 6 additions & 31 deletions src/filters/manager.rs
@@ -20,7 +20,6 @@ use std::sync::Arc;

use parking_lot::RwLock;
use prometheus::Registry;
use slog::{debug, o, warn, Logger};
use tokio::sync::mpsc;
use tokio::sync::watch;

@@ -72,34 +71,25 @@ impl FilterManager {
/// Returns a new instance backed by a stream of filter chain updates.
/// Updates from the provided stream will be reflected in the current filter chain.
pub fn dynamic(
base_logger: Logger,
metrics_registry: &Registry,
filter_chain_updates_rx: mpsc::Receiver<Arc<FilterChain>>,
shutdown_rx: watch::Receiver<()>,
) -> Result<SharedFilterManager, FilterChainError> {
let log = Self::create_logger(base_logger);

let filter_manager = Arc::new(RwLock::new(FilterManager {
// Start out with an empty filter chain.
filter_chain: Arc::new(FilterChain::new(vec![], metrics_registry)?),
}));

// Start a task in the background to receive LDS updates
// and update the FilterManager's filter chain in turn.
Self::spawn_updater(
log,
filter_manager.clone(),
filter_chain_updates_rx,
shutdown_rx,
);
Self::spawn_updater(filter_manager.clone(), filter_chain_updates_rx, shutdown_rx);

Ok(filter_manager)
}

/// Spawns a task in the background that listens for filter chain updates and
/// updates the filter manager's current filter in turn.
fn spawn_updater(
log: Logger,
filter_manager: SharedFilterManager,
mut filter_chain_updates_rx: mpsc::Receiver<Arc<FilterChain>>,
mut shutdown_rx: watch::Receiver<()>,
@@ -110,34 +100,29 @@ impl FilterManager {
update = filter_chain_updates_rx.recv() => {
match update {
Some(filter_chain) => {
debug!(log, "Received a filter chain update.");
tracing::debug!("Received a filter chain update.");
filter_manager.write().update(filter_chain);
}
None => {
warn!(log, "Exiting filter chain update receive loop because the sender dropped the channel.");
tracing::warn!("Exiting filter chain update receive loop because the sender dropped the channel.");
return;
}
}
}
_ = shutdown_rx.changed() => {
debug!(log, "Exiting filter chain update receive loop because a shutdown signal was received.");
tracing::debug!("Exiting filter chain update receive loop because a shutdown signal was received.");
return;
},
}
}
});
}

fn create_logger(base_logger: Logger) -> Logger {
base_logger.new(o!("source" => "FilterManager"))
}
}
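Removing `create_logger` also removes the `source` key that the slog child logger stamped on every event. `tracing` records the emitting module path as each event's `target` automatically; where an explicit label is still wanted, a span can carry it, roughly like this (span name and field are illustrative):

```rust
// Events emitted while the span guard is live inherit its fields, covering
// the role of the old `o!("source" => "FilterManager")` child logger.
let span = tracing::debug_span!("filter_manager", source = "FilterManager");
let _guard = span.enter();
tracing::debug!("Received a filter chain update.");
```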

#[cfg(test)]
mod tests {
use super::FilterManager;
use crate::filters::{Filter, FilterChain, FilterInstance, ReadContext, ReadResponse};
use crate::test_utils::logger;

use std::sync::Arc;
use std::time::Duration;
@@ -155,12 +140,7 @@ mod tests {
let (filter_chain_updates_tx, filter_chain_updates_rx) = mpsc::channel(10);
let (_shutdown_tx, shutdown_rx) = watch::channel(());

FilterManager::spawn_updater(
logger(),
filter_manager.clone(),
filter_chain_updates_rx,
shutdown_rx,
);
FilterManager::spawn_updater(filter_manager.clone(), filter_chain_updates_rx, shutdown_rx);

let filter_chain = {
let manager_guard = filter_manager.read();
@@ -233,12 +213,7 @@ mod tests {
let (filter_chain_updates_tx, filter_chain_updates_rx) = mpsc::channel(10);
let (shutdown_tx, shutdown_rx) = watch::channel(());

FilterManager::spawn_updater(
logger(),
filter_manager.clone(),
filter_chain_updates_rx,
shutdown_rx,
);
FilterManager::spawn_updater(filter_manager.clone(), filter_chain_updates_rx, shutdown_rx);

// Send a shutdown signal.
shutdown_tx.send(()).unwrap();
2 changes: 1 addition & 1 deletion src/lib.rs
@@ -36,7 +36,7 @@ pub type Result<T, E = eyre::Error> = std::result::Result<T, E>;
#[doc(inline)]
pub use self::{
config::Config,
proxy::{logger, Builder, PendingValidation, Server, Validated},
proxy::{Builder, PendingValidation, Server, Validated},
runner::{run, run_with_config},
};

2 changes: 1 addition & 1 deletion src/proxy.rs
@@ -15,7 +15,7 @@
*/

pub(crate) use admin::Admin;
pub use builder::{logger, Builder, PendingValidation, Validated};
pub use builder::{Builder, PendingValidation, Validated};
pub(crate) use health::Health;
pub(crate) use metrics::Metrics;
pub use server::Server;
12 changes: 4 additions & 8 deletions src/proxy/admin.rs
@@ -20,15 +20,13 @@ use std::sync::Arc;

use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server as HyperServer, StatusCode};
use slog::{error, info, o, Logger};
use tokio::sync::watch;

use crate::cluster::cluster_manager::SharedClusterManager;
use crate::filters::manager::SharedFilterManager;
use crate::proxy::{config_dump, Health, Metrics};

pub struct Admin {
log: Logger,
/// The address that the Admin server starts on
addr: SocketAddr,
metrics: Arc<Metrics>,
@@ -44,9 +42,8 @@ struct HandleRequestArgs {
}

impl Admin {
pub fn new(base: &Logger, addr: SocketAddr, metrics: Arc<Metrics>, heath: Health) -> Self {
pub fn new(addr: SocketAddr, metrics: Arc<Metrics>, heath: Health) -> Self {
Admin {
log: base.new(o!("source" => "proxy::Admin")),
addr,
metrics,
health: Arc::new(heath),
@@ -59,7 +56,7 @@ impl Admin {
filter_manager: SharedFilterManager,
mut shutdown_rx: watch::Receiver<()>,
) {
info!(self.log, "Starting admin endpoint"; "address" => self.addr.to_string());
tracing::info!(address = %self.addr, "Starting admin endpoint");

let args = HandleRequestArgs {
metrics: self.metrics.clone(),
@@ -84,10 +81,9 @@ impl Admin {
shutdown_rx.changed().await.ok();
});

let log = self.log.clone();
tokio::spawn(async move {
if let Err(err) = server.await {
error!(log, "Admin server exited with an error"; "error" => %err);
if let Err(error) = server.await {
tracing::error!(%error, "Admin server exited with an error");
}
});
}
28 changes: 3 additions & 25 deletions src/proxy/builder.rs
@@ -17,7 +17,6 @@
use std::{collections::HashSet, convert::TryInto, marker::PhantomData, sync::Arc};

use prometheus::Registry;
use slog::{o, Drain, Logger};
use tonic::transport::Endpoint as TonicEndpoint;

use crate::config::{Config, ManagementServer, Proxy, Source, ValidationError, ValueInvalidArgs};
@@ -87,7 +86,6 @@ impl ValidationStatus for PendingValidation {

/// Represents the components needed to create a Server.
pub struct Builder<V> {
log: Logger,
config: Arc<Config>,
filter_registry: FilterRegistry,
admin: Option<ProxyAdmin>,
@@ -97,16 +95,14 @@ pub struct Builder<V> {

impl From<Arc<Config>> for Builder<PendingValidation> {
fn from(config: Arc<Config>) -> Self {
let log = logger();
let metrics = Arc::new(Metrics::new(&log, Registry::default()));
let health = Health::new(&log);
let admin = ProxyAdmin::new(&log, config.admin.address, metrics.clone(), health);
let metrics = Arc::new(Metrics::new(Registry::default()));
let health = Health::new();
let admin = ProxyAdmin::new(config.admin.address, metrics.clone(), health);
Builder {
config,
filter_registry: FilterRegistry::new(FilterSet::default()),
admin: Some(admin),
metrics,
log,
validation_status: PendingValidation,
}
}
@@ -199,10 +195,6 @@ impl ValidatedConfig {
}

impl Builder<PendingValidation> {
pub fn with_log(self, log: Logger) -> Self {
Self { log, ..self }
}

pub fn with_filter_registry(self, filter_registry: FilterRegistry) -> Self {
Self {
filter_registry,
@@ -224,7 +216,6 @@ impl Builder<PendingValidation> {
ValidatedConfig::validate(self.config.clone(), &self.filter_registry, &self.metrics)?;

Ok(Builder {
log: self.log,
config: self.config,
admin: self.admin,
metrics: self.metrics,
@@ -237,7 +228,6 @@ impl Builder<PendingValidation> {
impl Builder<Validated> {
pub fn build(self) -> Server {
Server {
log: self.log.new(o!("source" => "server::Server")),
config: Arc::new(self.validation_status.0),
proxy_metrics: ProxyMetrics::new(&self.metrics.registry)
.expect("proxy metrics should be setup properly"),
@@ -250,18 +240,6 @@ impl Builder<Validated> {
}
}

/// Create a new `slog::Logger` instance using the default
/// quilkin configuration.
pub fn logger() -> Logger {
let drain = slog_json::Json::new(std::io::stdout())
.set_pretty(false)
.add_default_keys()
.build()
.fuse();
let drain = slog_async::Async::new(drain).build().fuse();
slog::Logger::root(drain, o!())
}
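The removed `logger()` built a JSON drain writing to stdout via `slog-json` and `slog-async`. The diff doesn't show what takes over that role; assuming the `tracing-subscriber` crate with its `json` feature is used, a minimal equivalent would look roughly like:

```rust
// Install a global subscriber that writes JSON-formatted events to stdout,
// roughly mirroring the slog_json + slog_async setup that was removed.
fn init_tracing() {
    tracing_subscriber::fmt()
        .json()
        .with_writer(std::io::stdout)
        .init();
}
```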

#[cfg(test)]
mod tests {
use std::convert::TryFrom;