Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add clippy to ci and fix warnings #85

Merged
merged 5 commits into from
Aug 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ci/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM rust:1.42.0
FROM rust:1.45.0

RUN rustup component add rustfmt
RUN rustup component add rustfmt clippy

ENTRYPOINT ["cargo"]
3 changes: 3 additions & 0 deletions cloudbuild.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ steps:
- name: gcr.io/$PROJECT_ID/ci
args: ["fmt", "--", "--check"]
id: fmt-check
- name: gcr.io/$PROJECT_ID/ci
args: ["clippy"]
id: clippy
- name: gcr.io/$PROJECT_ID/ci
args: ["test"]
id: test
Expand Down
4 changes: 2 additions & 2 deletions src/extensions/filter_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ impl FilterChain {
impl Filter for FilterChain {
fn on_downstream_receive(
&self,
endpoints: &Vec<EndPoint>,
endpoints: &[EndPoint],
from: SocketAddr,
contents: Vec<u8>,
) -> Option<(Vec<EndPoint>, Vec<u8>)> {
let mut e = endpoints.clone();
let mut e = endpoints.to_vec();
markmandel marked this conversation as resolved.
Show resolved Hide resolved
let mut c = contents;
for f in &self.filters {
match f.on_downstream_receive(&e, from, c) {
Expand Down
17 changes: 6 additions & 11 deletions src/extensions/filter_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub trait Filter: Send + Sync {
/// If the packet should be rejected, return None.
fn on_downstream_receive(
&self,
endpoints: &Vec<EndPoint>,
endpoints: &[EndPoint],
iffyio marked this conversation as resolved.
Show resolved Hide resolved
from: SocketAddr,
contents: Vec<u8>,
) -> Option<(Vec<EndPoint>, Vec<u8>)>;
Expand Down Expand Up @@ -85,17 +85,12 @@ pub trait FilterFactory: Sync + Send {
}

/// FilterRegistry is the registry of all Filters that can be applied in the system.
#[derive(Default)]
iffyio marked this conversation as resolved.
Show resolved Hide resolved
pub struct FilterRegistry {
registry: HashMap<String, Box<dyn FilterFactory>>,
}

impl FilterRegistry {
pub fn new() -> FilterRegistry {
FilterRegistry {
registry: Default::default(),
}
}

/// insert registers a Filter under the provider's given name.
pub fn insert<P: 'static>(&mut self, provider: P)
where
Expand All @@ -106,13 +101,13 @@ impl FilterRegistry {

/// get returns an instance of a filter for a given Key. Returns Error if not found,
/// or if there is a configuration issue.
pub fn get(&self, key: &String, config: &serde_yaml::Value) -> Result<Box<dyn Filter>, Error> {
pub fn get(&self, key: &str, config: &serde_yaml::Value) -> Result<Box<dyn Filter>, Error> {
match self
.registry
.get(key)
.map(|p| p.create_from_config(&config))
{
None => Err(Error::NotFound(key.clone())),
None => Err(Error::NotFound(key.into())),
Some(filter) => filter,
}
}
Expand All @@ -131,7 +126,7 @@ mod tests {
impl Filter for TestFilter {
fn on_downstream_receive(
&self,
_: &Vec<EndPoint>,
_: &[EndPoint],
_: SocketAddr,
_: Vec<u8>,
) -> Option<(Vec<EndPoint>, Vec<u8>)> {
Expand All @@ -151,7 +146,7 @@ mod tests {

#[test]
fn insert_and_get() {
let mut reg = FilterRegistry::new();
let mut reg = FilterRegistry::default();
reg.insert(TestFilterFactory {});
let config = serde_yaml::Value::Null;

Expand Down
8 changes: 4 additions & 4 deletions src/extensions/filters/debug_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl DebugFilterFactory {

impl FilterFactory for DebugFilterFactory {
fn name(&self) -> String {
return String::from("quilkin.extensions.filters.debug_filter.v1alpha1.DebugFilter");
"quilkin.extensions.filters.debug_filter.v1alpha1.DebugFilter".into()
}

fn create_from_config(&self, config: &Value) -> Result<Box<dyn Filter>, Error> {
Expand All @@ -83,7 +83,7 @@ impl FilterFactory for DebugFilterFactory {
_ => None,
};

return match prefix {
match prefix {
// if no config value supplied, then no prefix, which is fine
None => Ok(Box::new(DebugFilter::new(&self.log, None))),
// return an Error if the id exists but is not a string.
Expand All @@ -97,14 +97,14 @@ impl FilterFactory for DebugFilterFactory {
Some(prefix.to_string()),
))),
},
};
}
}
}

impl Filter for DebugFilter {
fn on_downstream_receive(
&self,
endpoints: &Vec<EndPoint>,
endpoints: &[EndPoint],
from: SocketAddr,
contents: Vec<u8>,
) -> Option<(Vec<EndPoint>, Vec<u8>)> {
Expand Down
14 changes: 5 additions & 9 deletions src/extensions/filters/local_rate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,9 @@ struct Config {
}

/// Creates instances of RateLimitFilter.
#[derive(Default)]
pub struct RateLimitFilterFactory;

impl RateLimitFilterFactory {
pub fn new() -> Self {
RateLimitFilterFactory {}
}
}

/// A filter that implements rate limiting on packets based on
/// the token-bucket algorithm.
/// Packets that violate the rate limit are dropped.
Expand All @@ -95,7 +90,7 @@ impl FilterFactory for RateLimitFilterFactory {
match config.period {
Some(period) if period.lt(&Duration::from_millis(100)) => Err(Error::FieldInvalid {
field: "period".into(),
reason: format!("value must be at least 100ms"),
reason: "value must be at least 100ms".into(),
}),
_ => Ok(Box::new(RateLimitFilter::new(config))),
}
Expand Down Expand Up @@ -176,11 +171,12 @@ impl Drop for RateLimitFilter {
impl Filter for RateLimitFilter {
fn on_downstream_receive(
&self,
endpoints: &Vec<EndPoint>,
endpoints: &[EndPoint],
_from: SocketAddr,
contents: Vec<u8>,
) -> Option<(Vec<EndPoint>, Vec<u8>)> {
self.acquire_token().map(|()| (endpoints.clone(), contents))
self.acquire_token()
.map(|()| (endpoints.to_vec(), contents))
}

fn on_upstream_receive(
Expand Down
4 changes: 2 additions & 2 deletions src/extensions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ mod filter_chain;
/// default_registry returns a FilterRegistry with the default
/// set of filters that are user configurable registered to it
pub fn default_registry(base: &Logger) -> FilterRegistry {
let mut fr = FilterRegistry::new();
let mut fr = FilterRegistry::default();
fr.insert(filters::DebugFilterFactory::new(base));
fr.insert(filters::RateLimitFilterFactory::new());
fr.insert(filters::RateLimitFilterFactory::default());
fr
}
5 changes: 4 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
* limitations under the License.
*/

// Fail the build if clippy finds any warnings.
#![deny(warnings)]
iffyio marked this conversation as resolved.
Show resolved Hide resolved

pub mod config;
pub mod extensions;
mod load_balancer_policy;
pub mod metrics;
pub mod server;
pub mod proxy;
pub mod test_utils;
2 changes: 1 addition & 1 deletion src/load_balancer_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl LoadBalancerPolicy {
} => (
lb_policy,
addresses
.into_iter()
.iter()
.cloned()
.enumerate()
.map(|(offset, address)| EndPoint {
Expand Down
7 changes: 3 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ use slog::{info, o, Drain, Logger};
use prometheus::Registry;
use quilkin::config::Config;
use quilkin::extensions::default_registry;
use quilkin::server::{Metrics, Server};
use quilkin::proxy::{Metrics, Server};
use tokio::signal;
use tokio::sync::oneshot;

const VERSION: &'static str = env!("CARGO_PKG_VERSION");
const VERSION: &str = env!("CARGO_PKG_VERSION");

#[tokio::main]
async fn main() {
Expand Down Expand Up @@ -75,6 +75,5 @@ fn logger() -> Logger {
.build()
.fuse();
let drain = slog_async::Async::new(drain).build().fuse();
let log = slog::Logger::root(drain, o!());
return log;
slog::Logger::root(drain, o!())
}
2 changes: 1 addition & 1 deletion src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub fn histogram_opts(
) -> HistogramOpts {
HistogramOpts {
common_opts: opts(name, subsystem, description),
buckets: buckets.unwrap_or(Vec::from(DEFAULT_BUCKETS as &'static [f64])),
buckets: buckets.unwrap_or_else(|| Vec::from(DEFAULT_BUCKETS as &'static [f64])),
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/server/metrics.rs → src/proxy/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::server::sessions::metrics::Metrics as SessionMetrics;
use crate::proxy::sessions::metrics::Metrics as SessionMetrics;
use prometheus::{Encoder, Registry, Result as MetricsResult, TextEncoder};
use slog::{info, warn, Logger};
use std::net::SocketAddr;
Expand Down Expand Up @@ -35,7 +35,7 @@ pub fn start_metrics_server(
warn!(log, "failed to convert metrics to utf8: {:?}", err);
})
})
.unwrap_or("# failed to gather metrics".to_string())
.unwrap_or_else(|_| "# failed to gather metrics".to_string())
});

let (_, server) = warp::serve(metrics_route).bind_with_graceful_shutdown(addr, async {
Expand Down
File renamed without changes.
23 changes: 11 additions & 12 deletions src/server/server.rs → src/proxy/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use tokio::time::{delay_for, Duration, Instant};
use crate::config::{Config, ConnectionConfig, EndPoint};
use crate::extensions::{Filter, FilterChain, FilterRegistry};
use crate::load_balancer_policy::LoadBalancerPolicy;
use crate::server::sessions::{Packet, Session, SESSION_TIMEOUT_SECONDS};
use crate::proxy::sessions::{Packet, Session, SESSION_TIMEOUT_SECONDS};

use super::metrics::{start_metrics_server, Metrics};

Expand All @@ -49,11 +49,11 @@ impl Server {
/// new Server. Takes a logger, and the registry of available Filters.
pub fn new(base: Logger, filter_registry: FilterRegistry, metrics: Metrics) -> Self {
let log = base.new(o!("source" => "server::Server"));
return Server {
Server {
log,
filter_registry,
metrics,
};
}
}

/// start the async processing of incoming UDP packets. Will block until an
Expand Down Expand Up @@ -268,7 +268,7 @@ impl Server {
/// bind binds the local configured port
async fn bind(config: &Config) -> Result<UdpSocket> {
let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), config.local.port);
return UdpSocket::bind(addr).await;
UdpSocket::bind(addr).await
}

/// ensure_session makes sure there is a value session for the name in the sessions map
Expand Down Expand Up @@ -300,7 +300,7 @@ impl Server {
let mut map = sessions.write().await;
map.insert(s.key(), Mutex::new(s));
}
return Ok(());
Ok(())
}

/// prune_sessions removes expired Sessions from the SessionMap.
Expand All @@ -315,8 +315,7 @@ impl Server {
let session = v.lock().await;
let expiration = session.expiration().await;
if expiration.lt(&now) {
let value = k.clone();
remove_keys.push(value);
remove_keys.push(*k);
}
}
}
Expand Down Expand Up @@ -351,7 +350,7 @@ mod tests {
use crate::config;
use crate::config::{Config, ConnectionConfig, EndPoint, Local};
use crate::extensions::default_registry;
use crate::server::sessions::{Packet, SESSION_TIMEOUT_SECONDS};
use crate::proxy::sessions::{Packet, SESSION_TIMEOUT_SECONDS};
use crate::test_utils::{
ephemeral_socket, logger, recv_udp, recv_udp_done, TestFilter, TestFilterFactory,
};
Expand All @@ -361,7 +360,7 @@ mod tests {
#[tokio::test]
async fn run_server() {
let log = logger();
let server = Server::new(log.clone(), FilterRegistry::new(), Metrics::default());
let server = Server::new(log.clone(), FilterRegistry::default(), Metrics::default());

let socket1 = ephemeral_socket().await;
let endpoint1 = socket1.local_addr().unwrap();
Expand Down Expand Up @@ -412,7 +411,7 @@ mod tests {
#[tokio::test]
async fn run_client() {
let log = logger();
let server = Server::new(log.clone(), FilterRegistry::new(), Metrics::default());
let server = Server::new(log.clone(), FilterRegistry::default(), Metrics::default());
let socket = ephemeral_socket().await;
let endpoint_addr = socket.local_addr().unwrap();
let (recv, mut send) = socket.split();
Expand Down Expand Up @@ -446,7 +445,7 @@ mod tests {
#[tokio::test]
async fn run_with_filter() {
let log = logger();
let mut registry = FilterRegistry::new();
let mut registry = FilterRegistry::default();
registry.insert(TestFilterFactory {});

let server = Server::new(log.clone(), registry, Metrics::default());
Expand Down Expand Up @@ -692,7 +691,7 @@ mod tests {

#[tokio::test]
async fn run_receive_packet() {
let server = Server::new(logger(), FilterRegistry::new(), Metrics::default());
let server = Server::new(logger(), FilterRegistry::default(), Metrics::default());
let msg = "hello";

// without a filter
Expand Down
File renamed without changes.
File renamed without changes.
Loading