Skip to content

Commit

Permalink
Completely remove slog and replace with tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
XAMPPRocky committed Dec 23, 2021
1 parent a31ec4a commit 9f58980
Show file tree
Hide file tree
Showing 24 changed files with 246 additions and 499 deletions.
5 changes: 1 addition & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,6 @@ rand = "0.8.4"
serde = { version = "1.0.130", features = ["derive", "rc"] }
serde_json = "1.0.68"
serde_yaml = "0.8.21"
slog = "2.7.0"
slog-async = "2.7.0"
slog-json = "2.4.0"
slog-term = "2.8.0"
snap = "1.0.5"
tokio = { version = "1.13.1", features = ["rt-multi-thread", "signal", "test-util", "parking_lot"] }
tokio-stream = "0.1.8"
Expand All @@ -73,6 +69,7 @@ tryhard = "0.4.0"
eyre = "0.6.5"
stable-eyre = "0.2.2"
ipnetwork = "0.18.0"
futures = "0.3.17"

[target.'cfg(target_os = "linux")'.dependencies]
sys-info = "0.9.0"
Expand Down
19 changes: 5 additions & 14 deletions src/cluster/cluster_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use std::sync::Arc;
// and we will need to acquire a read lock with every packet that is processed
// to be able to capture the current endpoint state and pass it to Filters.
use parking_lot::RwLock;
use slog::{debug, o, warn, Logger};

use prometheus::{Registry, Result as MetricsResult};
use tokio::sync::{mpsc, watch};
Expand Down Expand Up @@ -81,21 +80,17 @@ impl ClusterManager {
/// Returns a ClusterManager where the set of clusters is continuously
/// updated based on responses from the provided updates channel.
pub fn dynamic(
base_logger: Logger,
metrics_registry: &Registry,
cluster_updates_rx: mpsc::Receiver<ClusterUpdate>,
shutdown_rx: watch::Receiver<()>,
) -> MetricsResult<SharedClusterManager> {
let log = base_logger.new(o!("source" => "cluster::ClusterManager"));

let cluster_manager = Self::new(metrics_registry, None)?;
let metrics = cluster_manager.metrics.clone();
let cluster_manager = Arc::new(RwLock::new(cluster_manager));

// Start a task in the background to receive cluster updates
// and update the cluster manager's cluster set in turn.
Self::spawn_updater(
log.clone(),
metrics,
cluster_manager.clone(),
cluster_updates_rx,
Expand Down Expand Up @@ -144,7 +139,6 @@ impl ClusterManager {
/// Spawns a task to run a loop that receives cluster updates
/// and updates the ClusterManager's state in turn.
fn spawn_updater(
log: Logger,
metrics: Metrics,
cluster_manager: Arc<RwLock<ClusterManager>>,
mut cluster_updates_rx: mpsc::Receiver<ClusterUpdate>,
Expand All @@ -156,17 +150,17 @@ impl ClusterManager {
update = cluster_updates_rx.recv() => {
match update {
Some(update) => {
debug!(log, "Received a cluster update.");
tracing::debug!("Received a cluster update.");
cluster_manager.write().update(Self::process_cluster_update(&metrics, update));
}
None => {
warn!(log, "Exiting cluster update receive loop because the sender dropped the channel.");
tracing::warn!("Exiting cluster update receive loop because the sender dropped the channel.");
return;
}
}
}
_ = shutdown_rx.changed() => {
debug!(log, "Exiting cluster update receive loop because a shutdown signal was received.");
tracing::debug!("Exiting cluster update receive loop because a shutdown signal was received.");
return;
},
}
Expand All @@ -185,15 +179,13 @@ mod tests {
cluster::{Cluster, LocalityEndpoints},
endpoint::{Endpoint, Endpoints, Metadata},
metadata::MetadataView,
test_utils::logger,
};

#[tokio::test]
async fn dynamic_cluster_manager_process_cluster_update() {
let (update_tx, update_rx) = mpsc::channel(3);
let (_shutdown_tx, shutdown_rx) = watch::channel(());
let cm = ClusterManager::dynamic(logger(), &Registry::default(), update_rx, shutdown_rx)
.unwrap();
let cm = ClusterManager::dynamic(&Registry::default(), update_rx, shutdown_rx).unwrap();

fn mapping(entries: &[(&str, &str)]) -> serde_yaml::Mapping {
entries
Expand Down Expand Up @@ -301,8 +293,7 @@ mod tests {
async fn dynamic_cluster_manager_metrics() {
let (update_tx, update_rx) = mpsc::channel(3);
let (_shutdown_tx, shutdown_rx) = watch::channel(());
let cm = ClusterManager::dynamic(logger(), &Registry::default(), update_rx, shutdown_rx)
.unwrap();
let cm = ClusterManager::dynamic(&Registry::default(), update_rx, shutdown_rx).unwrap();

// Initialization metrics
{
Expand Down
11 changes: 0 additions & 11 deletions src/endpoint/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,17 +227,6 @@ impl<'de> Deserialize<'de> for EndpointAddress {
}
}

impl slog::Value for EndpointAddress {
fn serialize(
&self,
_: &slog::Record,
key: slog::Key,
serializer: &mut dyn slog::Serializer,
) -> slog::Result {
serializer.emit_arguments(key, &format_args!("{}", self))
}
}

/// The kind of address, such as Domain Name or IP address. **Note** that
/// the `FromStr` implementation doesn't actually validate that the name is
/// resolvable. Use [`EndpointAddress`] for complete address validation.
Expand Down
37 changes: 6 additions & 31 deletions src/filters/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use std::sync::Arc;

use parking_lot::RwLock;
use prometheus::Registry;
use slog::{debug, o, warn, Logger};
use tokio::sync::mpsc;
use tokio::sync::watch;

Expand Down Expand Up @@ -72,34 +71,25 @@ impl FilterManager {
/// Returns a new instance backed by a stream of filter chain updates.
/// Updates from the provided stream will be reflected in the current filter chain.
pub fn dynamic(
base_logger: Logger,
metrics_registry: &Registry,
filter_chain_updates_rx: mpsc::Receiver<Arc<FilterChain>>,
shutdown_rx: watch::Receiver<()>,
) -> Result<SharedFilterManager, FilterChainError> {
let log = Self::create_logger(base_logger);

let filter_manager = Arc::new(RwLock::new(FilterManager {
// Start out with an empty filter chain.
filter_chain: Arc::new(FilterChain::new(vec![], metrics_registry)?),
}));

// Start a task in the background to receive LDS updates
// and update the FilterManager's filter chain in turn.
Self::spawn_updater(
log,
filter_manager.clone(),
filter_chain_updates_rx,
shutdown_rx,
);
Self::spawn_updater(filter_manager.clone(), filter_chain_updates_rx, shutdown_rx);

Ok(filter_manager)
}

/// Spawns a task in the background that listens for filter chain updates and
/// updates the filter manager's current filter in turn.
fn spawn_updater(
log: Logger,
filter_manager: SharedFilterManager,
mut filter_chain_updates_rx: mpsc::Receiver<Arc<FilterChain>>,
mut shutdown_rx: watch::Receiver<()>,
Expand All @@ -110,34 +100,29 @@ impl FilterManager {
update = filter_chain_updates_rx.recv() => {
match update {
Some(filter_chain) => {
debug!(log, "Received a filter chain update.");
tracing::debug!("Received a filter chain update.");
filter_manager.write().update(filter_chain);
}
None => {
warn!(log, "Exiting filter chain update receive loop because the sender dropped the channel.");
tracing::warn!("Exiting filter chain update receive loop because the sender dropped the channel.");
return;
}
}
}
_ = shutdown_rx.changed() => {
debug!(log, "Exiting filter chain update receive loop because a shutdown signal was received.");
tracing::debug!("Exiting filter chain update receive loop because a shutdown signal was received.");
return;
},
}
}
});
}

fn create_logger(base_logger: Logger) -> Logger {
base_logger.new(o!("source" => "FilterManager"))
}
}

#[cfg(test)]
mod tests {
use super::FilterManager;
use crate::filters::{Filter, FilterChain, FilterInstance, ReadContext, ReadResponse};
use crate::test_utils::logger;

use std::sync::Arc;
use std::time::Duration;
Expand All @@ -155,12 +140,7 @@ mod tests {
let (filter_chain_updates_tx, filter_chain_updates_rx) = mpsc::channel(10);
let (_shutdown_tx, shutdown_rx) = watch::channel(());

FilterManager::spawn_updater(
logger(),
filter_manager.clone(),
filter_chain_updates_rx,
shutdown_rx,
);
FilterManager::spawn_updater(filter_manager.clone(), filter_chain_updates_rx, shutdown_rx);

let filter_chain = {
let manager_guard = filter_manager.read();
Expand Down Expand Up @@ -233,12 +213,7 @@ mod tests {
let (filter_chain_updates_tx, filter_chain_updates_rx) = mpsc::channel(10);
let (shutdown_tx, shutdown_rx) = watch::channel(());

FilterManager::spawn_updater(
logger(),
filter_manager.clone(),
filter_chain_updates_rx,
shutdown_rx,
);
FilterManager::spawn_updater(filter_manager.clone(), filter_chain_updates_rx, shutdown_rx);

// Send a shutdown signal.
shutdown_tx.send(()).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub type Result<T, E = eyre::Error> = std::result::Result<T, E>;
#[doc(inline)]
pub use self::{
config::Config,
proxy::{logger, Builder, PendingValidation, Server, Validated},
proxy::{Builder, PendingValidation, Server, Validated},
runner::{run, run_with_config},
};

Expand Down
2 changes: 1 addition & 1 deletion src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

pub(crate) use admin::Admin;
pub use builder::{logger, Builder, PendingValidation, Validated};
pub use builder::{Builder, PendingValidation, Validated};
pub(crate) use health::Health;
pub(crate) use metrics::Metrics;
pub use server::Server;
Expand Down
12 changes: 4 additions & 8 deletions src/proxy/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@ use std::sync::Arc;

use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server as HyperServer, StatusCode};
use slog::{error, info, o, Logger};
use tokio::sync::watch;

use crate::cluster::cluster_manager::SharedClusterManager;
use crate::filters::manager::SharedFilterManager;
use crate::proxy::{config_dump, Health, Metrics};

pub struct Admin {
log: Logger,
/// The address that the Admin server starts on
addr: SocketAddr,
metrics: Arc<Metrics>,
Expand All @@ -44,9 +42,8 @@ struct HandleRequestArgs {
}

impl Admin {
pub fn new(base: &Logger, addr: SocketAddr, metrics: Arc<Metrics>, heath: Health) -> Self {
pub fn new(addr: SocketAddr, metrics: Arc<Metrics>, heath: Health) -> Self {
Admin {
log: base.new(o!("source" => "proxy::Admin")),
addr,
metrics,
health: Arc::new(heath),
Expand All @@ -59,7 +56,7 @@ impl Admin {
filter_manager: SharedFilterManager,
mut shutdown_rx: watch::Receiver<()>,
) {
info!(self.log, "Starting admin endpoint"; "address" => self.addr.to_string());
tracing::info!(address = %self.addr, "Starting admin endpoint");

let args = HandleRequestArgs {
metrics: self.metrics.clone(),
Expand All @@ -84,10 +81,9 @@ impl Admin {
shutdown_rx.changed().await.ok();
});

let log = self.log.clone();
tokio::spawn(async move {
if let Err(err) = server.await {
error!(log, "Admin server exited with an error"; "error" => %err);
if let Err(error) = server.await {
tracing::error!(%error, "Admin server exited with an error");
}
});
}
Expand Down
28 changes: 3 additions & 25 deletions src/proxy/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
use std::{collections::HashSet, convert::TryInto, marker::PhantomData, sync::Arc};

use prometheus::Registry;
use slog::{o, Drain, Logger};
use tonic::transport::Endpoint as TonicEndpoint;

use crate::config::{Config, ManagementServer, Proxy, Source, ValidationError, ValueInvalidArgs};
Expand Down Expand Up @@ -87,7 +86,6 @@ impl ValidationStatus for PendingValidation {

/// Represents the components needed to create a Server.
pub struct Builder<V> {
log: Logger,
config: Arc<Config>,
filter_registry: FilterRegistry,
admin: Option<ProxyAdmin>,
Expand All @@ -97,16 +95,14 @@ pub struct Builder<V> {

impl From<Arc<Config>> for Builder<PendingValidation> {
fn from(config: Arc<Config>) -> Self {
let log = logger();
let metrics = Arc::new(Metrics::new(&log, Registry::default()));
let health = Health::new(&log);
let admin = ProxyAdmin::new(&log, config.admin.address, metrics.clone(), health);
let metrics = Arc::new(Metrics::new(Registry::default()));
let health = Health::new();
let admin = ProxyAdmin::new(config.admin.address, metrics.clone(), health);
Builder {
config,
filter_registry: FilterRegistry::new(FilterSet::default()),
admin: Some(admin),
metrics,
log,
validation_status: PendingValidation,
}
}
Expand Down Expand Up @@ -199,10 +195,6 @@ impl ValidatedConfig {
}

impl Builder<PendingValidation> {
pub fn with_log(self, log: Logger) -> Self {
Self { log, ..self }
}

pub fn with_filter_registry(self, filter_registry: FilterRegistry) -> Self {
Self {
filter_registry,
Expand All @@ -224,7 +216,6 @@ impl Builder<PendingValidation> {
ValidatedConfig::validate(self.config.clone(), &self.filter_registry, &self.metrics)?;

Ok(Builder {
log: self.log,
config: self.config,
admin: self.admin,
metrics: self.metrics,
Expand All @@ -237,7 +228,6 @@ impl Builder<PendingValidation> {
impl Builder<Validated> {
pub fn build(self) -> Server {
Server {
log: self.log.new(o!("source" => "server::Server")),
config: Arc::new(self.validation_status.0),
proxy_metrics: ProxyMetrics::new(&self.metrics.registry)
.expect("proxy metrics should be setup properly"),
Expand All @@ -250,18 +240,6 @@ impl Builder<Validated> {
}
}

/// Create a new `slog::Logger` instance using the default
/// quilkin configuration.
pub fn logger() -> Logger {
let drain = slog_json::Json::new(std::io::stdout())
.set_pretty(false)
.add_default_keys()
.build()
.fuse();
let drain = slog_async::Async::new(drain).build().fuse();
slog::Logger::root(drain, o!())
}

#[cfg(test)]
mod tests {
use std::convert::TryFrom;
Expand Down
Loading

0 comments on commit 9f58980

Please sign in to comment.