Skip to content

Commit

Permalink
Refactor drop into a single metric, change filters to return a result
Browse files Browse the repository at this point in the history
  • Loading branch information
XAMPPRocky committed Mar 28, 2023
1 parent df9ffe1 commit b4cb71e
Show file tree
Hide file tree
Showing 43 changed files with 429 additions and 521 deletions.
10 changes: 5 additions & 5 deletions docs/src/services/proxy/filters/writing_custom_filters.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ sent to a downstream client.
use quilkin::filters::prelude::*;
impl Filter for Greet {
fn read(&self, ctx: &mut ReadContext) -> Option<()> {
fn read(&self, ctx: &mut ReadContext) -> Result<(), FilterError> {
ctx.contents.extend(b"Hello");
Some(())
Ok(())
}
fn write(&self, ctx: &mut WriteContext) -> Option<()> {
fn write(&self, ctx: &mut WriteContext) -> Result<(), FilterError> {
ctx.contents.extend(b"Goodbye");
Some(())
Ok(())
}
}
```
Expand All @@ -65,7 +65,7 @@ impl StaticFilter for Greet {
type Configuration = ();
type BinaryConfiguration = ();
fn try_from_config(config: Option<Self::Configuration>) -> Result<Self, Error> {
fn try_from_config(config: Option<Self::Configuration>) -> Result<Self, CreationError> {
Ok(Self)
}
}
Expand Down
11 changes: 6 additions & 5 deletions examples/quilkin-filter-example/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,15 @@ struct Greet {
}

impl Filter for Greet {
fn read(&self, ctx: &mut ReadContext) -> Option<()> {
fn read(&self, ctx: &mut ReadContext) -> Result<(), FilterError> {
ctx.contents
.splice(0..0, format!("{} ", self.config.greeting).into_bytes());
Some(())
Ok(())
}
fn write(&self, ctx: &mut WriteContext) -> Option<()> {
fn write(&self, ctx: &mut WriteContext) -> Result<(), FilterError> {
ctx.contents
.splice(0..0, format!("{} ", self.config.greeting).into_bytes());
Some(())
Ok(())
}
}
// ANCHOR_END: filter
Expand All @@ -81,7 +81,7 @@ impl StaticFilter for Greet {

fn try_from_config(
config: Option<Self::Configuration>,
) -> Result<Self, quilkin::filters::Error> {
) -> Result<Self, CreationError> {
Ok(Self {
config: Self::ensure_config_exists(config)?,
})
Expand All @@ -100,6 +100,7 @@ async fn main() -> quilkin::Result<()> {
config.filters.store(std::sync::Arc::new(
vec![quilkin::config::Filter {
name: Greet::NAME.into(),
label: None,
config: None,
}]
.try_into()?,
Expand Down
2 changes: 2 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ mod tests {
vec![
Filter {
name: Capture::factory().name().into(),
label: None,
config: Some(serde_json::json!({
"suffix": {
"size": 3,
Expand All @@ -247,6 +248,7 @@ mod tests {
},
Filter {
name: TokenRouter::factory().name().into(),
label: None,
config: None,
},
]
Expand Down
1 change: 1 addition & 0 deletions src/cli/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ mod tests {
config.filters.store(
crate::filters::FilterChain::try_from(vec![config::Filter {
name: "TestFilter".to_string(),
label: None,
config: None,
}])
.map(Arc::new)
Expand Down
19 changes: 10 additions & 9 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,6 @@ pub use self::{

base64_serde_type!(pub Base64Standard, base64::STANDARD);

// For some log messages on the hot path (potentially per-packet), we log 1 out
// of every `LOG_SAMPLING_RATE` occurrences to avoid spamming the logs.
pub(crate) const LOG_SAMPLING_RATE: u64 = 1000;
pub(crate) const BACKOFF_INITIAL_DELAY_MILLISECONDS: u64 = 500;
pub(crate) const BACKOFF_MAX_DELAY_SECONDS: u64 = 30;
pub(crate) const BACKOFF_MAX_JITTER_MILLISECONDS: u64 = 2000;
Expand Down Expand Up @@ -256,11 +253,12 @@ fn default_proxy_id() -> Slot<String> {
#[serde(deny_unknown_fields)]
pub struct Filter {
pub name: String,
pub label: Option<String>,
pub config: Option<serde_json::Value>,
}

impl TryFrom<crate::xds::config::listener::v3::Filter> for Filter {
type Error = Error;
type Error = CreationError;

fn try_from(filter: crate::xds::config::listener::v3::Filter) -> Result<Self, Self::Error> {
use crate::xds::config::listener::v3::filter::ConfigType;
Expand All @@ -269,15 +267,15 @@ impl TryFrom<crate::xds::config::listener::v3::Filter> for Filter {
let config = match config_type {
ConfigType::TypedConfig(any) => any,
ConfigType::ConfigDiscovery(_) => {
return Err(Error::FieldInvalid {
return Err(CreationError::FieldInvalid {
field: "config_type".into(),
reason: "ConfigDiscovery is currently unsupported".into(),
})
}
};
Some(
crate::filters::FilterRegistry::get_factory(&filter.name)
.ok_or_else(|| Error::NotFound(filter.name.clone()))?
.ok_or_else(|| CreationError::NotFound(filter.name.clone()))?
.encode_config_to_json(config)?,
)
} else {
Expand All @@ -286,21 +284,23 @@ impl TryFrom<crate::xds::config::listener::v3::Filter> for Filter {

Ok(Self {
name: filter.name,
// TODO: keep the label across xDS
label: None,
config,
})
}
}

impl TryFrom<Filter> for crate::xds::config::listener::v3::Filter {
type Error = Error;
type Error = CreationError;

fn try_from(filter: Filter) -> Result<Self, Self::Error> {
use crate::xds::config::listener::v3::filter::ConfigType;

let config = if let Some(config) = filter.config {
Some(
crate::filters::FilterRegistry::get_factory(&filter.name)
.ok_or_else(|| Error::NotFound(filter.name.clone()))?
.ok_or_else(|| CreationError::NotFound(filter.name.clone()))?
.encode_config_to_protobuf(config)?,
)
} else {
Expand All @@ -318,7 +318,8 @@ impl From<(String, FilterInstance)> for Filter {
fn from((name, instance): (String, FilterInstance)) -> Self {
Self {
name,
config: Some(serde_json::Value::clone(&instance.config)),
label: instance.label().map(String::from),
config: Some(serde_json::Value::clone(instance.config())),
}
}
}
Expand Down
14 changes: 7 additions & 7 deletions src/config/config_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::convert::TryFrom;

use bytes::Bytes;

use crate::filters::Error;
use crate::filters::CreationError;

/// The configuration of a [`Filter`][crate::filters::Filter] from either a
/// static or dynamic source.
Expand Down Expand Up @@ -51,20 +51,20 @@ impl ConfigType {
pub fn deserialize<TextConfiguration, BinaryConfiguration>(
self,
filter_name: &str,
) -> Result<(serde_json::Value, TextConfiguration), Error>
) -> Result<(serde_json::Value, TextConfiguration), CreationError>
where
BinaryConfiguration: prost::Message + Default,
TextConfiguration:
serde::Serialize + for<'de> serde::Deserialize<'de> + TryFrom<BinaryConfiguration>,
Error: From<<BinaryConfiguration as TryInto<TextConfiguration>>::Error>,
CreationError: From<<BinaryConfiguration as TryInto<TextConfiguration>>::Error>,
{
match self {
ConfigType::Static(ref config) => serde_yaml::to_string(config)
.and_then(|raw_config| {
serde_yaml::from_str::<TextConfiguration>(raw_config.as_str())
})
.map_err(|err| {
Error::DeserializeFailed(format!(
CreationError::DeserializeFailed(format!(
"filter `{filter_name}`: failed to YAML deserialize config: {err}",
))
})
Expand All @@ -74,7 +74,7 @@ impl ConfigType {
}),
ConfigType::Dynamic(config) => prost::Message::decode(Bytes::from(config.value))
.map_err(|err| {
Error::DeserializeFailed(format!(
CreationError::DeserializeFailed(format!(
"filter `{filter_name}`: config decode error: {err}",
))
})
Expand All @@ -87,12 +87,12 @@ impl ConfigType {
}

// Returns an equivalent json value for the passed in config.
fn get_json_config<T>(filter_name: &str, config: &T) -> Result<serde_json::Value, Error>
fn get_json_config<T>(filter_name: &str, config: &T) -> Result<serde_json::Value, CreationError>
where
T: serde::Serialize + for<'de> serde::Deserialize<'de>,
{
serde_json::to_value(config).map_err(|err| {
Error::DeserializeFailed(format!(
CreationError::DeserializeFailed(format!(
"filter `{filter_name}`: failed to serialize config to json: {err}",
))
})
Expand Down
2 changes: 1 addition & 1 deletion src/config/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,5 @@ pub enum ValidationError {
#[error(transparent)]
ValueInvalid(#[from] ValueInvalidArgs),
#[error(transparent)]
FilterInvalid(#[from] crate::filters::Error),
FilterInvalid(#[from] crate::filters::CreationError),
}
4 changes: 2 additions & 2 deletions src/config/slot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,11 @@ impl<T: JsonSchema + Default> JsonSchema for Slot<T> {
}

impl<T: crate::filters::Filter + Default> crate::filters::Filter for Slot<T> {
fn read(&self, ctx: &mut ReadContext) -> Option<()> {
fn read(&self, ctx: &mut ReadContext) -> Result<(), FilterError> {
self.load().read(ctx)
}

fn write(&self, ctx: &mut WriteContext) -> Option<()> {
fn write(&self, ctx: &mut WriteContext) -> Result<(), FilterError> {
self.load().write(ctx)
}
}
Expand Down
35 changes: 18 additions & 17 deletions src/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ pub mod token_router;
/// [`FilterFactory`].
pub mod prelude {
pub use super::{
ConvertProtoConfigError, CreateFilterArgs, Error, Filter, FilterInstance, ReadContext,
StaticFilter, WriteContext,
ConvertProtoConfigError, CreateFilterArgs, CreationError, Filter, FilterError,
FilterInstance, ReadContext, StaticFilter, WriteContext,
};
}

Expand All @@ -55,7 +55,7 @@ pub use self::{
concatenate_bytes::ConcatenateBytes,
debug::Debug,
drop::Drop,
error::{ConvertProtoConfigError, Error},
error::{ConvertProtoConfigError, CreationError, FilterError},
factory::{CreateFilterArgs, DynFilterFactory, FilterFactory, FilterInstance},
firewall::Firewall,
load_balancer::LoadBalancer,
Expand Down Expand Up @@ -83,13 +83,13 @@ pub use self::chain::FilterChain;
/// struct Greet;
///
/// impl Filter for Greet {
/// fn read(&self, ctx: &mut ReadContext) -> Option<()> {
/// fn read(&self, ctx: &mut ReadContext) -> Result<(), FilterError> {
/// ctx.contents.splice(0..0, b"Hello ".into_iter().copied());
/// Some(())
/// Ok(())
/// }
/// fn write(&self, ctx: &mut WriteContext) -> Option<()> {
/// fn write(&self, ctx: &mut WriteContext) -> Result<(), FilterError> {
/// ctx.contents.splice(0..0, b"Goodbye ".into_iter().copied());
/// Some(())
/// Ok(())
/// }
/// }
///
Expand All @@ -98,7 +98,7 @@ pub use self::chain::FilterChain;
/// type Configuration = ();
/// type BinaryConfiguration = ();
///
/// fn try_from_config(_: Option<Self::Configuration>) -> Result<Self, quilkin::filters::Error> {
/// fn try_from_config(_: Option<Self::Configuration>) -> Result<Self, CreationError> {
/// Ok(Self)
/// }
/// }
Expand All @@ -107,7 +107,7 @@ pub trait StaticFilter: Filter + Sized
// This where clause simply states that `Configuration`'s and
// `BinaryConfiguration`'s `Error` types are compatible with `filters::Error`.
where
Error: From<<Self::Configuration as TryFrom<Self::BinaryConfiguration>>::Error>
CreationError: From<<Self::Configuration as TryFrom<Self::BinaryConfiguration>>::Error>
+ From<<Self::BinaryConfiguration as TryFrom<Self::Configuration>>::Error>,
{
/// The globally unique name of the filter.
Expand All @@ -131,7 +131,7 @@ where
/// Instantiates a new [`StaticFilter`] from the given configuration, if any.
/// # Errors
/// If the provided configuration is invalid.
fn try_from_config(config: Option<Self::Configuration>) -> Result<Self, Error>;
fn try_from_config(config: Option<Self::Configuration>) -> Result<Self, CreationError>;

/// Instantiates a new [`StaticFilter`] from the given configuration, if any.
/// # Panics
Expand All @@ -152,15 +152,16 @@ where
/// which require a fully initialized [`Self::Configuration`].
fn ensure_config_exists(
config: Option<Self::Configuration>,
) -> Result<Self::Configuration, Error> {
config.ok_or(Error::MissingConfig(Self::NAME))
) -> Result<Self::Configuration, CreationError> {
config.ok_or(CreationError::MissingConfig(Self::NAME))
}

fn as_filter_config(
config: impl Into<Option<Self::Configuration>>,
) -> Result<crate::config::Filter, Error> {
) -> Result<crate::config::Filter, CreationError> {
Ok(crate::config::Filter {
name: Self::NAME.into(),
label: None,
config: config
.into()
.map(|config| serde_json::to_value(&config))
Expand Down Expand Up @@ -197,8 +198,8 @@ pub trait Filter: Send + Sync {
/// This function should return an `Some` if the packet processing should
/// proceed. If the packet should be rejected, it will return [`None`]
/// instead. By default, the context passes through unchanged.
fn read(&self, _: &mut ReadContext) -> Option<()> {
Some(())
fn read(&self, _: &mut ReadContext) -> Result<(), FilterError> {
Ok(())
}

/// [`Filter::write`] is invoked when the proxy is about to send data to a
Expand All @@ -207,7 +208,7 @@ pub trait Filter: Send + Sync {
///
/// This function should return an `Some` if the packet processing should
/// proceed. If the packet should be rejected, it will return [`None`]
fn write(&self, _: &mut WriteContext) -> Option<()> {
Some(())
fn write(&self, _: &mut WriteContext) -> Result<(), FilterError> {
Ok(())
}
}
Loading

0 comments on commit b4cb71e

Please sign in to comment.