Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add moq-sub to subscribe to media from moq relays #133

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 34 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[workspace]
members = ["moq-transport", "moq-relay", "moq-pub", "moq-api", "moq-clock"]
members = ["moq-transport", "moq-relay", "moq-pub", "moq-api", "moq-clock", "moq-sub"]
resolver = "2"
21 changes: 21 additions & 0 deletions dev/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ You should have it installed already if you're a video nerd, otherwise:
brew install ffmpeg
```

### moq-sub

You can use `moq-sub` to subscribe to media streams from a MoQ relay and pipe them to the standard output. By piping the
command to a video player, e.g. `ffplay` or `mpv`, you can play a MoQ broadcast natively.

Currently, `moq-sub` simply dumps all received segments of the first video and the first audio track directly to `stdout`.

### moq-api

`moq-api` uses a redis instance to store active origins for clustering.
Expand All @@ -68,6 +75,7 @@ We run the redis instance via a container automatically as part of `dev/api`.
./dev/cert
./dev/relay
./dev/pub
./dev/sub
```

They will each print out a URL you can use to publish/watch broadcasts.
Expand Down Expand Up @@ -98,13 +106,26 @@ By default, the broadcast name is `dev` but you can overwrite it with the `NAME`

> Watch URL: https://quic.video/watch/dev?server=localhost:4443

By default, the audio track is exlcluded, because the web player does not yet support audio playback. To include audio,
set the `AUDIO=1` env var.

If you're debugging encoding issues, you can use this script to dump the file to disk instead, defaulting to
`dev/output.mp4`.

```bash
./dev/pub-file
```

### moq-sub

The following command subscribes to a stream from a MoQ relay and plays it with `ffplay`.
By default, the URL is `https://localhost:4433/dev`, so it will play the stream published with `dev/pub` to the relay
started with `dev/relay`. You can change the broadcast name by setting the `NAME` env var.

```bash
./dev/sub
```

### moq-api

The following commands runs an API server, listening for HTTP requests on `http://localhost:4442` by default.
Expand Down
10 changes: 7 additions & 3 deletions dev/pub
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,20 @@ URL="${URL:-"https://$ADDR/$NAME"}"
# Default to a source video
INPUT="${INPUT:-dev/source.mp4}"

# Only inlcude audio track if AUDIO=1 is set
# TODO enable audio by default again once fixed.
if [[ ! -v AUDIO ]]; then
NO_AUDIO=1
fi

# Print out the watch URL
echo "Watch URL: https://quic.video/watch/$NAME?server=$ADDR"

# Run ffmpeg and pipe the output to moq-pub
# TODO enable audio again once fixed.
ffmpeg -hide_banner -v quiet \
-stream_loop -1 -re \
-i "$INPUT" \
-c copy \
-an \
-c copy ${NO_AUDIO:+-an} \
-f mp4 -movflags cmaf+separate_moof+delay_moov+skip_trailer \
-frag_duration 1 \
- | cargo run --bin moq-pub -- "$URL" "$@"
21 changes: 21 additions & 0 deletions dev/sub
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/bin/bash
set -euo pipefail

# Change directory to the root of the project
cd "$(dirname "$0")/.."

# Use debug logging by default
export RUST_LOG="${RUST_LOG:-debug}"

# Connect to localhost by default.
HOST="${HOST:-localhost}"
PORT="${PORT:-4443}"
ADDR="${ADDR:-$HOST:$PORT}"

# Use the broadcast name "dev" by default
NAME="${NAME:-dev}"

# Combine the host and name into a URL.
URL="${URL:-"https://$ADDR/$NAME"}"

cargo run --bin moq-sub -- "$URL" "$@" | ffplay -
46 changes: 46 additions & 0 deletions moq-sub/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
[package]
name = "moq-sub"
description = "Media over QUIC"
authors = []
repository = "https://github.com/kixelated/moq-rs"
license = "MIT OR Apache-2.0"

version = "0.1.0"
edition = "2021"

keywords = ["quic", "http3", "webtransport", "media", "live"]
categories = ["multimedia", "network-programming", "web-programming"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
moq-transport = { path = "../moq-transport" }

# QUIC
quinn = "0.10"
webtransport-quinn = "0.6.1"
url = "2"

# Crypto
rustls = { version = "0.21", features = ["dangerous_configuration"] }
rustls-native-certs = "0.6"
rustls-pemfile = "1"

# Async stuff
tokio = { version = "1", features = ["full"] }

# CLI, logging, error handling
clap = { version = "4", features = ["derive"] }
log = { version = "0.4", features = ["std"] }
env_logger = "0.9"
mp4 = "0.13"
anyhow = { version = "1", features = ["backtrace"] }
serde_json = "1"
rfc6381-codec = "0.1"
tracing = "0.1"
tracing-subscriber = "0.3"

[build-dependencies]
clap = { version = "4", features = ["derive"] }
clap_mangen = "0.2"
url = "2"
10 changes: 10 additions & 0 deletions moq-sub/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# moq-sub

A command line tool for subscribing to media via Media over QUIC (MoQ).

Takes an URL to MoQ relay with a broadcast name in the path part of the URL. It will connect to the relay, subscribe to
the broadcast, and dump the media segments of the first video and first audio track to STDOUT.

```
moq-sub https://localhost:4443/dev | ffplay -
```
1 change: 1 addition & 0 deletions moq-sub/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod media;
138 changes: 138 additions & 0 deletions moq-sub/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
use std::{fs, io, net, path, sync::Arc, time};

use anyhow::Context;
use clap::Parser;
use moq_transport::cache::broadcast;
use url::Url;

use moq_sub::media::Media;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
env_logger::init();

// Disable tracing so we don't get a bunch of Quinn spam.
let tracer = tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::WARN)
.finish();
tracing::subscriber::set_global_default(tracer).unwrap();

let config = Config::parse();

let (publisher, subscriber) = broadcast::new("");
let out = tokio::io::stdout();
let mut media = Media::new(subscriber, out).await?;

// Create a list of acceptable root certificates.
let mut roots = rustls::RootCertStore::empty();

if config.tls_root.is_empty() {
// Add the platform's native root certificates.
for cert in rustls_native_certs::load_native_certs().context("could not load platform certs")? {
roots
.add(&rustls::Certificate(cert.0))
.context("failed to add root cert")?;
}
} else {
// Add the specified root certificates.
for root in &config.tls_root {
let root = fs::File::open(root).context("failed to open root cert file")?;
let mut root = io::BufReader::new(root);

let root = rustls_pemfile::certs(&mut root).context("failed to read root cert")?;
anyhow::ensure!(root.len() == 1, "expected a single root cert");
let root = rustls::Certificate(root[0].to_owned());

roots.add(&root).context("failed to add root cert")?;
}
}

let mut tls_config = rustls::ClientConfig::builder()
.with_safe_defaults()
.with_root_certificates(roots)
.with_no_client_auth();

// Allow disabling TLS verification altogether.
if config.tls_disable_verify {
let noop = NoCertificateVerification {};
tls_config.dangerous().set_certificate_verifier(Arc::new(noop));
}

tls_config.alpn_protocols = vec![webtransport_quinn::ALPN.to_vec()]; // this one is important

let arc_tls_config = std::sync::Arc::new(tls_config);
let quinn_client_config = quinn::ClientConfig::new(arc_tls_config);

let mut endpoint = quinn::Endpoint::client(config.bind)?;
endpoint.set_default_client_config(quinn_client_config);

log::info!("connecting to relay: url={}", config.url);

let session = webtransport_quinn::connect(&endpoint, &config.url)
.await
.context("failed to create WebTransport session")?;
log::trace!("WebTransport session established");

let session = moq_transport::session::Client::subscriber(session, publisher)
.await
.context("failed to create MoQ Transport session")?;
log::trace!("MoQ transport session established");

tokio::select! {
res = session.run() => res.context("session error")?,
res = media.run() => res.context("media error")?,
}

Ok(())
}

pub struct NoCertificateVerification {}

impl rustls::client::ServerCertVerifier for NoCertificateVerification {
fn verify_server_cert(
&self,
_end_entity: &rustls::Certificate,
_intermediates: &[rustls::Certificate],
_server_name: &rustls::ServerName,
_scts: &mut dyn Iterator<Item = &[u8]>,
_ocsp_response: &[u8],
_now: time::SystemTime,
) -> Result<rustls::client::ServerCertVerified, rustls::Error> {
Ok(rustls::client::ServerCertVerified::assertion())
}
}

#[derive(Parser, Clone, Debug)]
pub struct Config {
/// Listen for UDP packets on the given address.
#[arg(long, default_value = "[::]:0")]
pub bind: net::SocketAddr,

/// Connect to the given URL starting with https://
#[arg(value_parser = moq_url)]
pub url: Url,

/// Use the TLS root CA at this path, encoded as PEM.
///
/// This value can be provided multiple times for multiple roots.
/// If this is empty, system roots will be used instead
#[arg(long)]
pub tls_root: Vec<path::PathBuf>,

/// Danger: Disable TLS certificate verification.
///
/// Fine for local development, but should be used in caution in production.
#[arg(long)]
pub tls_disable_verify: bool,
}

fn moq_url(s: &str) -> Result<Url, String> {
let url = Url::try_from(s).map_err(|e| e.to_string())?;

// Make sure the scheme is moq
if url.scheme() != "https" {
return Err("url scheme must be https:// for WebTransport".to_string());
}

Ok(url)
}
Loading
Loading