-
Notifications
You must be signed in to change notification settings - Fork 94
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implementation of endpoint_send_filter #58
Merged
Merged
Changes from 1 commit
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,13 +30,15 @@ use tokio::sync::{mpsc, watch, RwLock}; | |
use tokio::time::{Duration, Instant}; | ||
|
||
use crate::config::EndPoint; | ||
use crate::extensions::{Filter, FilterChain}; | ||
|
||
/// SESSION_TIMEOUT_SECONDS is the default session timeout - which is one minute. | ||
pub const SESSION_TIMEOUT_SECONDS: u64 = 60; | ||
|
||
/// Session encapsulates a UDP stream session | ||
pub struct Session { | ||
log: Logger, | ||
chain: Arc<FilterChain>, | ||
send: SendHalf, | ||
/// dest is where to send data to | ||
dest: EndPoint, | ||
|
@@ -75,6 +77,7 @@ impl Session { | |
/// from its ephemeral port from endpoint(s) | ||
pub async fn new( | ||
base: &Logger, | ||
chain: Arc<FilterChain>, | ||
from: SocketAddr, | ||
dest: EndPoint, | ||
sender: mpsc::Sender<Packet>, | ||
|
@@ -84,6 +87,7 @@ impl Session { | |
let (closer, closed) = watch::channel::<bool>(false); | ||
let mut s = Session { | ||
log: base.new(o!("source" => "server::Session", "from" => from, "dest_name" => dest.name.clone(), "dest_address" => dest.address.clone())), | ||
chain, | ||
send, | ||
from, | ||
dest, | ||
|
@@ -186,7 +190,15 @@ impl Session { | |
/// Sends a packet to the Session's dest. | ||
pub async fn send_to(&mut self, buf: &[u8]) -> Result<usize> { | ||
debug!(self.log, "Sending packet"; "dest_name" => &self.dest.name, "dest_address" => &self.dest.address, "contents" => from_utf8(buf).unwrap()); | ||
return self.send.send_to(buf, &self.dest.address).await; | ||
|
||
if let Some(data) = self | ||
.chain | ||
.endpoint_send_filter(&self.dest, self.from, buf.to_vec()) | ||
{ | ||
return self.send.send_to(data.as_slice(), &self.dest.address).await; | ||
} | ||
|
||
Ok(0) | ||
} | ||
|
||
/// is_closed returns if the Session is closed or not. | ||
|
@@ -208,7 +220,7 @@ mod tests { | |
use tokio::time; | ||
use tokio::time::delay_for; | ||
|
||
use crate::test_utils::{ephemeral_socket, logger, recv_udp}; | ||
use crate::test_utils::{ephemeral_socket, logger, recv_udp, TestFilter}; | ||
|
||
use super::*; | ||
|
||
|
@@ -226,9 +238,15 @@ mod tests { | |
}; | ||
let (send_packet, mut recv_packet) = mpsc::channel::<Packet>(5); | ||
|
||
let mut sess = Session::new(&log, local_addr, endpoint, send_packet) | ||
.await | ||
.unwrap(); | ||
let mut sess = Session::new( | ||
&log, | ||
Arc::new(FilterChain::new(vec![])), | ||
local_addr, | ||
endpoint, | ||
send_packet, | ||
) | ||
.await | ||
.unwrap(); | ||
|
||
let initial_expiration: Instant; | ||
{ | ||
|
@@ -272,6 +290,8 @@ mod tests { | |
async fn session_send_to() { | ||
let log = logger(); | ||
let msg = "hello"; | ||
|
||
// without a filter | ||
let (sender, _) = mpsc::channel::<Packet>(1); | ||
let (local_addr, wait) = recv_udp().await; | ||
let endpoint = EndPoint { | ||
|
@@ -280,11 +300,41 @@ mod tests { | |
connection_ids: vec![], | ||
}; | ||
|
||
let mut session = Session::new(&log, local_addr, endpoint.clone(), sender) | ||
.await | ||
.unwrap(); | ||
let mut session = Session::new( | ||
&log, | ||
Arc::new(FilterChain::new(vec![])), | ||
local_addr, | ||
endpoint.clone(), | ||
sender, | ||
) | ||
.await | ||
.unwrap(); | ||
session.send_to(msg.as_bytes()).await.unwrap(); | ||
assert_eq!(msg, wait.await.unwrap()); | ||
|
||
// with a filters | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 'filters' should be 'filter' ? |
||
let (sender, _) = mpsc::channel::<Packet>(1); | ||
let (local_addr, wait) = recv_udp().await; | ||
let endpoint = EndPoint { | ||
name: "endpoint".to_string(), | ||
address: local_addr, | ||
connection_ids: vec![], | ||
}; | ||
let mut session = Session::new( | ||
&log, | ||
Arc::new(FilterChain::new(vec![Arc::new(TestFilter {})])), | ||
local_addr, | ||
endpoint.clone(), | ||
sender, | ||
) | ||
.await | ||
.unwrap(); | ||
|
||
session.send_to(msg.as_bytes()).await.unwrap(); | ||
assert_eq!( | ||
format!("{}:esf:{}:{}", msg, endpoint.name, local_addr), | ||
wait.await.unwrap() | ||
); | ||
} | ||
|
||
#[tokio::test] | ||
|
@@ -300,9 +350,15 @@ mod tests { | |
}; | ||
|
||
info!(log, ">> creating sessions"); | ||
let sess = Session::new(&log, local_addr, endpoint, send_packet) | ||
.await | ||
.unwrap(); | ||
let sess = Session::new( | ||
&log, | ||
Arc::new(FilterChain::new(vec![])), | ||
local_addr, | ||
endpoint, | ||
send_packet, | ||
) | ||
.await | ||
.unwrap(); | ||
info!(log, ">> session created and running"); | ||
|
||
assert!(!sess.is_closed(), "session should not be closed"); | ||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we change the return type for this function to e.g
Result<Option<size>>
or similar enum that would allow the caller to differentiate between a dropped packet and a packet without payload (that could have size 0?)? In the drop case the returned value would be e.gOk(None)
so that we won't have to worry about cases where size 0 might be special.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Strong agree on this one, indeed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Excellent suggestion. Will jump on it 👍