Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ SANDBOX
vendor
perf.data*
flamegraph*.svg
Cargo.lock
27 changes: 27 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Changelog

All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

<!-- next-header -->

## [Unreleased] - ReleaseDate

### Added

Added ability to create a "half-open" - preventing messages with the half-open stream id from being routed as socket messages.
The JavaScript implementation does something similar.

### Changed

`UdxSocket::bind` is no longer async. It didn't need to be, but it does need have a tokio runtime running.

Fixes a bug where we were getting the socket address of sender wrong

### Removed


<!-- next-url -->
[Unreleased]: https://github.com/datrs/async-udx/compare/v0.1.0...HEAD
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "async-udx"
version = "0.1.0"
edition = "2021"
edition = "2024"
license = "MIT OR Apache-2.0"
description = "Rust port of libudx, a protocol for reliable, multiplex, and congestion controlled streams over udp"
authors = ["Franz Heinzmann (Frando) <frando@unbiskant.org>"]
Expand All @@ -20,7 +20,6 @@ categories = [
bitflags = "1.3.2"
tokio = { version = "1.21.2", features = ["net", "macros", "rt-multi-thread", "time", "io-util", "sync"] }
futures = "0.3.25"
derivative = "2.2.0"
tracing = "0.1.37"
bytes = "1.2.1"
log = "0.4.17"
Expand Down
10 changes: 5 additions & 5 deletions benches/throughput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tokio::{
net::TcpStream,
};

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main};
criterion_group!(server_benches, bench_throughput);
criterion_main!(server_benches);
fn rt() -> tokio::runtime::Runtime {
Expand Down Expand Up @@ -41,8 +41,8 @@ fn bench_throughput(c: &mut Criterion) {
let num_streams = n_streams;
let limit = *len / num_streams;
b.to_async(&rt).iter_custom(|iters| async move {
let socka = UdxSocket::bind("127.0.0.1:0").await.unwrap();
let sockb = UdxSocket::bind("127.0.0.1:0").await.unwrap();
let socka = UdxSocket::bind("127.0.0.1:0").unwrap();
let sockb = UdxSocket::bind("127.0.0.1:0").unwrap();
let addra = socka.local_addr().unwrap();
let addrb = sockb.local_addr().unwrap();
let mut readers = vec![];
Expand Down Expand Up @@ -191,8 +191,8 @@ async fn setup_pipe_tcp() -> io::Result<(TcpStream, TcpStream)> {
}

async fn setup_pipe_udx() -> io::Result<(UdxStream, UdxStream)> {
let socka = UdxSocket::bind("127.0.0.1:0").await?;
let sockb = UdxSocket::bind("127.0.0.1:0").await?;
let socka = UdxSocket::bind("127.0.0.1:0")?;
let sockb = UdxSocket::bind("127.0.0.1:0")?;
let addra = socka.local_addr()?;
let addrb = sockb.local_addr()?;
let streama = socka.connect(addrb, 1, 2)?;
Expand Down
6 changes: 3 additions & 3 deletions examples/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ async fn main() {
}

async fn run(total: usize, num_streams: usize) -> io::Result<()> {
let socka = UdxSocket::bind("127.0.0.1:0").await?;
let sockb = UdxSocket::bind("127.0.0.1:0").await?;
let socka = UdxSocket::bind("127.0.0.1:0")?;
let sockb = UdxSocket::bind("127.0.0.1:0")?;
let addra = socka.local_addr()?;
let addrb = sockb.local_addr()?;

Expand Down Expand Up @@ -77,7 +77,7 @@ fn usize_from_env(name: &str, default: usize) -> usize {
std::env::var(name)
.map(|x| {
x.parse::<usize>()
.expect(&format!("{} must be a number", name))
.unwrap_or_else(|_| panic!("{} must be a number", name))
})
.unwrap_or(default)
}
12 changes: 6 additions & 6 deletions examples/multi_stream.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use async_udx::{UdxSocket, UDX_DATA_MTU};
use async_udx::{UDX_DATA_MTU, UdxSocket};
use std::time::Instant;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

Expand All @@ -21,8 +21,8 @@ async fn main() {
);

let host = "127.0.0.1";
let socka = UdxSocket::bind(format!("{host}:0")).await.unwrap();
let sockb = UdxSocket::bind(format!("{host}:0")).await.unwrap();
let socka = UdxSocket::bind(format!("{host}:0")).unwrap();
let sockb = UdxSocket::bind(format!("{host}:0")).unwrap();
let addra = socka.local_addr().unwrap();
let addrb = sockb.local_addr().unwrap();
eprintln!("addra {}", addra);
Expand All @@ -33,9 +33,9 @@ async fn main() {
let i = i as u32;
let streama = socka.connect(addrb, 1000 + i, i).unwrap();
let streamb = sockb.connect(addra, i, 1000 + i).unwrap();
let read_buf = vec![0u8; MSGSIZE as usize];
let write_buf = vec![i as u8; MSGSIZE as usize];
let (reader, writer) = if i % 2 == 0 {
let read_buf = vec![0u8; MSGSIZE];
let write_buf = vec![i as u8; MSGSIZE];
let (reader, writer) = if i.is_multiple_of(2) {
(streama, streamb)
} else {
(streamb, streama)
Expand Down
4 changes: 2 additions & 2 deletions examples/rw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{future::Future, io};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::task::JoinHandle;

use async_udx::{UdxSocket, UdxStream, UDX_DATA_MTU};
use async_udx::{UDX_DATA_MTU, UdxSocket, UdxStream};

pub fn spawn<T>(name: impl ToString, future: T) -> JoinHandle<()>
where
Expand Down Expand Up @@ -34,7 +34,7 @@ async fn main() -> io::Result<()> {
.next()
.expect("invalid connect addr");
eprintln!("{} -> {}", listen_addr, connect_addr);
let sock = UdxSocket::bind(listen_addr).await?;
let sock = UdxSocket::bind(listen_addr)?;
let stream = sock.connect(connect_addr, 1, 1)?;
let max_len = UDX_DATA_MTU * 64;
let read = spawn("read", read_loop(stream.clone(), max_len));
Expand Down
4 changes: 2 additions & 2 deletions examples/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ async fn main() -> io::Result<()> {
tracing_subscriber::fmt::init();

// Bind two sockets
let socka = UdxSocket::bind("127.0.0.1:20004").await?;
let socka = UdxSocket::bind("127.0.0.1:20004")?;
let addra = socka.local_addr()?;
eprintln!("Socket A bound to {addra}");
let sockb = UdxSocket::bind("127.0.0.1:20005").await?;
let sockb = UdxSocket::bind("127.0.0.1:20005")?;
let addrb = sockb.local_addr()?;
eprintln!("Socket B bound to {addrb}");

Expand Down
7 changes: 7 additions & 0 deletions release.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
pre-release-replacements = [
{file="CHANGELOG.md", search="Unreleased", replace="{{version}}"},
{file="CHANGELOG.md", search="\\.\\.\\.HEAD", replace="...{{tag_name}}", exactly=1},
{file="CHANGELOG.md", search="ReleaseDate", replace="{{date}}"},
{file="CHANGELOG.md", search="<!-- next-header -->", replace="<!-- next-header -->\n\n## [Unreleased] - ReleaseDate\n\n### Added\n\n### Changed\n\n### Removed\n\n", exactly=1},
{file="CHANGELOG.md", search="<!-- next-url -->", replace="<!-- next-url -->\n[Unreleased]: https://github.com/datrs/async-udx/compare/{{tag_name}}...HEAD", exactly=1},
]
4 changes: 2 additions & 2 deletions src/mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ mod tracking {
/// Acquires the lock for a certain purpose
///
/// The purpose will be recorded in the list of last lock owners
pub fn lock(&self, purpose: &'static str) -> MutexGuard<T> {
pub fn lock(&self, purpose: &'static str) -> MutexGuard<'_, T> {
let now = Instant::now();
let guard = self.inner.lock();

Expand Down Expand Up @@ -146,7 +146,7 @@ mod non_tracking {
/// Acquires the lock for a certain purpose
///
/// The purpose will be recorded in the list of last lock owners
pub fn lock(&self, _purpose: &'static str) -> MutexGuard<T> {
pub fn lock(&self, _purpose: &'static str) -> MutexGuard<'_, T> {
MutexGuard {
guard: self.inner.lock(),
}
Expand Down
2 changes: 1 addition & 1 deletion src/packet.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use atomic_instant::AtomicInstant;
use bytes::{BufMut, Bytes};
use std::fmt::{self, Debug};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::{io, net::SocketAddr, sync::atomic::AtomicUsize};
use udx_udp::Transmit;

Expand Down
Loading
Loading