Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ authors = ["Bavfalcon9 <olybear9@gmail.com>"]
edition = "2021"

[features]
default = [ "async_tokio" ]
default = [ "async-std" ]
mcpe = []
debug = []
debug_all = []
Expand All @@ -19,4 +19,4 @@ tokio = { version = "1.15.0", features = ["full"], optional = true }
byteorder = "1.4.3"
futures = "0.3.19"
futures-executor = "0.3.19"
async-std = { version = "1.10.0", optional = true }
async-std = { version = "1.10.0", optional = true, features = [ "unstable" ] }
402 changes: 247 additions & 155 deletions src/connection/mod.rs

Large diffs are not rendered by default.

71 changes: 48 additions & 23 deletions src/connection/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use std::collections::HashMap;

use crate::protocol::frame::FragmentMeta;
use crate::protocol::frame::Frame;
use crate::protocol::reliability::Reliability;
use crate::protocol::RAKNET_HEADER_FRAME_OVERHEAD;
use crate::server::current_epoch;

pub enum NetQueueError<E> {
Expand Down Expand Up @@ -59,12 +61,31 @@ pub struct RecoveryQueue<Item> {
queue: HashMap<u32, (u64, Item)>,
}

impl<Item> RecoveryQueue<Item> {
impl<Item> RecoveryQueue<Item>
where
    Item: Clone,
{
    /// Creates an empty recovery queue.
    pub fn new() -> Self {
        Self {
            queue: HashMap::new(),
        }
    }

    /// Inserts `item` under an explicit sequence id, timestamped with the
    /// current epoch so it can later be aged out by [`flush_old`].
    pub fn insert_id(&mut self, seq: u32, item: Item) {
        self.queue.insert(seq, (current_epoch(), item));
    }

    /// Removes and returns every item whose age exceeds `threshold`
    /// (same unit as `current_epoch()` — presumably seconds; TODO confirm).
    ///
    /// The epoch is sampled exactly once so the collection pass and the
    /// removal pass agree on the cutoff. The original sampled it twice
    /// (once in `filter`, once in `retain`), so an entry crossing the
    /// threshold between the two calls could be dropped without ever being
    /// returned to the caller for resending.
    pub fn flush_old(&mut self, threshold: u64) -> Vec<Item> {
        let now = current_epoch();
        let old = self
            .queue
            .iter()
            .filter(|(_, (time, _))| (*time + threshold) < now)
            .map(|(_, (_, item))| item.clone())
            .collect::<Vec<_>>();
        self.queue
            .retain(|_, (time, _)| (*time + threshold) >= now);
        old
    }
}

impl<Item> NetQueue<Item> for RecoveryQueue<Item> {
Expand Down Expand Up @@ -368,46 +389,50 @@ impl FragmentQueue {

/// This will split a given frame into a bunch of smaller frames within the specified
/// restriction.
pub fn split_insert(&mut self, frame: Frame, mtu: u16) -> Result<u16, FragmentQueueError> {
let max_mtu = mtu - 60;
pub fn split_insert(&mut self, buffer: &[u8], mtu: u16) -> Result<u16, FragmentQueueError> {
    // Step to the next fragment id. The original wrote
    // `self.fragment_id += self.fragment_id.wrapping_add(1);`, which adds
    // (id + 1) to the id each call — roughly doubling it — instead of
    // advancing by one, and overflows the `+=` long before the intended
    // wrap-around point.
    self.fragment_id = self.fragment_id.wrapping_add(1);

    let id = self.fragment_id;

    // `Self::split` performs the actual chunking; it recomputes the usable
    // MTU itself, so the unused (and potentially underflowing)
    // `mtu - RAKNET_HEADER_FRAME_OVERHEAD` local the original computed here
    // has been dropped.
    if let Ok(frames) = Self::split(buffer, id, mtu) {
        // `HashMap::insert` replaces any stale entry left under this id
        // after wrap-around, so no separate contains/remove pass is needed.
        self.fragments.insert(id, (frames.len() as u32, frames));
        return Ok(id);
    }

    Err(FragmentQueueError::DoesNotNeedSplit)
}

pub fn split(buffer: &[u8], id: u16, mtu: u16) -> Result<Vec<Frame>, FragmentQueueError> {
let max_mtu = mtu - RAKNET_HEADER_FRAME_OVERHEAD;

if frame.body.len() > max_mtu.into() {
let splits = frame
.body
if buffer.len() > max_mtu.into() {
let splits = buffer
.chunks(max_mtu.into())
.map(|c| c.to_vec())
.collect::<Vec<Vec<u8>>>();
let id = self.fragment_id.wrapping_add(1);
let mut index = 0;
let mut frames: Vec<Frame> = Vec::new();
let mut index: u32 = 0;

for buf in splits.iter() {
let mut f = Frame::init();
let mut f = Frame::new(Reliability::ReliableOrd, Some(buf.clone()));
f.fragment_meta = Some(FragmentMeta {
index,
size: splits.len() as u32,
id,
});

f.reliability = frame.reliability;
f.flags = frame.flags;
f.size = buf.len() as u16;
f.body = buf.clone();
f.order_index = frame.order_index;
f.order_channel = frame.order_channel;
f.reliable_index = frame.reliable_index;

index += 1;

frames.push(f);
}

if self.fragments.contains_key(&id) {
self.fragments.remove(&id);
}

self.fragments.insert(id, (splits.len() as u32, frames));
return Ok(id);
return Ok(frames);
}

return Err(FragmentQueueError::DoesNotNeedSplit);
Expand Down
117 changes: 99 additions & 18 deletions src/connection/queue/send.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,29 @@
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;

#[cfg(feature = "async-std")]
use async_std::net::UdpSocket;
use binary_utils::Streamable;
#[cfg(feature = "tokio")]
use tokio::net::UdpSocket;

use crate::protocol::ack::{Ack, Ackable, Record, SingleRecord};
use crate::protocol::frame::FramePacket;
use crate::protocol::frame::{Frame, FramePacket};
use crate::protocol::packet::Packet;
use crate::protocol::reliability::Reliability;
use crate::protocol::{RAKNET_HEADER_FRAME_OVERHEAD, RAKNET_HEADER_OVERHEAD};
use crate::util::SafeGenerator;

use super::{NetQueue, RecoveryQueue};
use super::{FragmentQueue, NetQueue, OrderedQueue, RecoveryQueue};

/// This queue is used to prioritize packets being sent out
/// Packets that are old, are either dropped or requested again.
/// You can define this behavior with the `timeout` property.
#[derive(Debug, Clone)]
pub struct SendQueue {
mtu_size: u16,

/// The amount of time that needs to pass for a packet to be
/// dropped or requested again.
timeout: u16,
Expand All @@ -20,40 +35,106 @@ pub struct SendQueue {
/// The current sequence number. This is incremented every time
/// a packet is sent reliably. We can resend these if they are
/// NAcked.
send_seq: u32,
send_seq: SafeGenerator<u32>,

/// The current reliable index number.
/// a packet is sent reliably an sequenced.
reliable_seq: SafeGenerator<u16>,
reliable_seq: SafeGenerator<u32>,

/// The current recovery queue.
ack: RecoveryQueue<Vec<u8>>,
ack: RecoveryQueue<FramePacket>,

/// The fragment queue.
fragment_queue: FragmentQueue,

order_channels: HashMap<u8, OrderedQueue<Vec<u8>>>,

ready: Vec<Frame>,

socket: Arc<UdpSocket>,

address: SocketAddr,
}

impl SendQueue {
pub fn new() -> Self {
/// Builds a send queue bound to `address` over the shared `socket`.
///
/// `mtu_size` bounds outgoing datagram size, while `timeout` and
/// `max_tries` control when unacknowledged packets are dropped or
/// retransmitted.
pub fn new(
    mtu_size: u16,
    timeout: u16,
    max_tries: u16,
    socket: Arc<UdpSocket>,
    address: SocketAddr,
) -> Self {
    // Every queue and sequence generator starts empty; state accumulates
    // only as packets are queued for delivery.
    Self {
        socket,
        address,
        mtu_size,
        timeout,
        max_tries,
        send_seq: SafeGenerator::new(),
        reliable_seq: SafeGenerator::new(),
        ack: RecoveryQueue::new(),
        fragment_queue: FragmentQueue::new(),
        order_channels: HashMap::new(),
        ready: Vec::new(),
    }
}

/// Send a packet based on its reliability.
/// Note, reliability will be set to `Reliability::ReliableOrd` if
/// the buffer is larger than max MTU.
/// Queues `packet` for delivery with the requested `reliability` on the
/// given order `channel`, optionally flushing `immediate`ly.
///
/// NOTE(review): work in progress — the computed `reliable` value is never
/// used and the dispatch below is commented out, so this method currently
/// has no observable effect.
pub async fn insert(
    &mut self,
    packet: Vec<u8>,
    reliability: Reliability,
    channel: u8,
    immediate: bool,
) {
    // Payloads too large for a single datagram must be fragmented, and
    // fragments require ordering, so the reliability is upgraded to
    // ReliableOrd.
    // NOTE(review): the threshold ADDS the frame overhead to the MTU;
    // fragmentation logic elsewhere uses `mtu - RAKNET_HEADER_FRAME_OVERHEAD`
    // as the usable size — confirm this should not be a subtraction.
    let reliable = if packet.len() > (self.mtu_size + RAKNET_HEADER_FRAME_OVERHEAD) as usize {
        Reliability::ReliableOrd
    } else {
        reliability
    };

    // match reliability {
    //     Reliability::Unreliable => {
    //         // we can just send this packet out immediately.
    //         let frame = Frame::new(Reliability::Unreliable, Some(packet));
    //         self.send_frame(frame).await;
    //     },
    //     Reliability::Reliable => {
    //         // we need to send this packet out reliably.
    //         let frame = Frame::new(Reliability::Reliable, Some(packet));
    //         self.send_frame(frame).await;
    //     },
    // }
}

/// Wraps `frame` in a fresh `FramePacket`, stamps the datagram sequence
/// number (and a reliable index when the reliability calls for one), then
/// serializes and sends it.
async fn send_frame(&mut self, mut frame: Frame) {
    let mut pk = FramePacket::new();
    pk.sequence = self.send_seq.next();
    pk.reliability = frame.reliability;

    // Reliable frames get a monotonically increasing index so the peer can
    // ACK/NACK them individually.
    if pk.reliability.is_reliable() {
        frame.reliable_index = Some(self.reliable_seq.next());
    }

    // NOTE(review): `frame` is mutated above but never attached to `pk`
    // before `parse()` — as written the serialized packet appears to carry
    // no frame at all. Confirm whether a push of `frame` into `pk` is
    // missing here.
    if let Ok(buf) = pk.parse() {
        self.send_socket(&buf[..]).await;
    }
}

pub fn insert(&mut self, packet: Vec<u8>, reliable: bool) -> u32 {
// let next_id = self.fragment_seq.next();
todo!()
/// Sends raw bytes to this queue's peer address over the shared socket.
///
/// NOTE(review): `unwrap()` panics on any socket error (host unreachable,
/// buffer full, …); consider propagating an `io::Result` to the caller
/// instead of taking the whole connection down.
async fn send_socket(&mut self, packet: &[u8]) {
    self.socket.send_to(packet, &self.address).await.unwrap();
}

pub fn next_seq(&mut self) -> u32 {
let seq = self.send_seq;
self.send_seq = self.send_seq.wrapping_add(1);
return seq;
/// Serializes `packet` and hands it to the send pipeline, flushing
/// `immediate`ly when requested.
///
/// NOTE(review): unimplemented stub — currently a no-op.
pub async fn send_packet(&mut self, packet: Packet, immediate: bool) {
    // parse the packet
}
}

impl Ackable for SendQueue {
type NackItem = FramePacket;

fn ack(&mut self, ack: Ack) {
if ack.is_nack() {
return;
Expand All @@ -74,12 +155,12 @@ impl Ackable for SendQueue {
}
}

fn nack(&mut self, nack: Ack) -> Vec<Vec<u8>> {
fn nack(&mut self, nack: Ack) -> Vec<FramePacket> {
if !nack.is_nack() {
return Vec::new();
}

let mut resend_queue = Vec::<Vec<u8>>::new();
let mut resend_queue = Vec::<FramePacket>::new();

// we need to get the packets to resend.
for record in nack.records.iter() {
Expand Down
4 changes: 3 additions & 1 deletion src/protocol/ack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ use binary_utils::Streamable;
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt, BE};

pub(crate) trait Ackable {
type NackItem;

/// When an ack packet is recieved.
/// We should ack the queue
fn ack(&mut self, index: Ack) {}

/// When an NACK packet is recieved.
/// We should nack the queue
/// This should return the packets that need to be resent.
fn nack(&mut self, packet: Ack) -> Vec<Vec<u8>> {
fn nack(&mut self, packet: Ack) -> Vec<Self::NackItem> {
todo!()
}
}
Expand Down
19 changes: 19 additions & 0 deletions src/protocol/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,25 @@ impl Frame {
}
}

/// Initializes a new frame with the given reliability.
/// Initializes a new frame with the given reliability.
///
/// A `body` of `None` yields an empty payload. The cached `size` mirrors
/// the payload length; a body longer than `u16::MAX` bytes would truncate
/// in the cast — callers are expected to fragment well before that point
/// (TODO confirm against the fragmentation path).
pub fn new(reliability: Reliability, body: Option<Vec<u8>>) -> Self {
    // Bind the payload once instead of inspecting the Option twice
    // (`if let` for the size, `unwrap_or` for the body) as the original did.
    let body = body.unwrap_or_default();
    Self {
        flags: 0,
        size: body.len() as u16,
        reliable_index: None,
        sequence_index: None,
        order_index: None,
        order_channel: None,
        fragment_meta: None,
        reliability,
        body,
    }
}

/// Whether or not the frame is fragmented.
pub fn is_fragmented(&self) -> bool {
self.fragment_meta.is_some()
Expand Down
5 changes: 5 additions & 0 deletions src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,8 @@ pub use magic::*;

pub const MAX_FRAGS: u32 = 1024;
pub const MAX_ORD_CHANS: u8 = 32;

// Worst-case per-datagram overhead when a frame is included:
// IP header (20) + UDP header (8) + RakNet header (8) + 4 + 20 = 60 bytes.
// NOTE(review): the original comment names four components for five terms;
// confirm the breakdown of the trailing `4 + 20` (frame header + maximum
// frame metadata, presumably).
pub const RAKNET_HEADER_FRAME_OVERHEAD: u16 = 20 + 8 + 8 + 4 + 20;
// Overhead with no frame payload: IP header (20) + UDP header (8) + RakNet header (8).
pub const RAKNET_HEADER_OVERHEAD: u16 = 20 + 8 + 8;
Loading