Skip to content
This repository was archived by the owner on Aug 23, 2022. It is now read-only.

Add main parts of SFU example #15

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ serde_json = "1.0.75"
bytes = "1.1.0"
lazy_static = "1.4.0"
rand = "0.8.4"
flume = "0.10.12"
hyper-tungstenite = "0.6.0"
futures = "0.3"
uuid = { version = "0.8", features = ["serde", "v4"] }

[profile.dev]
opt-level = 0
Expand Down Expand Up @@ -148,3 +152,8 @@ bench = false
name = "ice-restart"
path = "examples/ice-restart/ice-restart.rs"
bench = false

[[example]]
name = "sfu"
path = "examples/sfu/main.rs"
bench = false
17 changes: 17 additions & 0 deletions examples/sfu/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# SFU

This is a simplified example of how you may build an SFU (Selective Forwarding Unit) with `webrtc-rs`. It demonstrates the following:

- Other peers joining the call
- Trickle ICE
- Re-negotiation
- Audio with Opus and video with VP8

## Building

Run the build:
```
cargo build --example sfu
```

Navigate to `http://localhost:8081` in one browser/tab, and then open it again in another browser/tab.
172 changes: 172 additions & 0 deletions examples/sfu/index.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
<!DOCTYPE html>
<html lang="en">
<head>
<title>SFU Example</title>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
</head>
<body>

<main>
<h1>Peer ID: <span id="uuid"></span></h1>
<div id="signalingContainer" style="display: none">
<h2>Browser base64 Session Description</h2>
<textarea id="localSessionDescription" readonly></textarea>

<h2>Golang base64 Session Description</h2>
<textarea id="remoteSessionDescription"></textarea>
</div>

<h2>Local Video</h2>
<video id="local" width="160" height="120" autoplay muted></video>
<h2>Remote Videos</h2>
<div id="remoteVideos"></div> <br />

<div class="grid">
<div>
<h4>Local SDP</h4>
<pre>{pc?.localDescription?.sdp}</pre>
</div>
<div>
<h4>Remote SDP</h4>
<pre>{pc?.remoteDescription?.sdp}</pre>
</div>
</div>

<h2>Logs</h2>
<div id="logs"></div>
</main>

<script>
let pc
let uuidEl = document.querySelector('#uuid')

function randomId() {
return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
let r = Math.random() * 16 | 0, v = c == 'x' ? r : (r & 0x3 | 0x8)
return v.toString(16)
})
}

let uuid = randomId()
uuidEl.innerHTML = uuid

let ws = new WebSocket("ws://localhost:8081")

ws.onopen = async _e => {
console.log("Connection established. Creating peer.")
pc = await createPeerConnection()
connect()
}

ws.onmessage = async (event) => {
let msg = JSON.parse(event.data)

switch (msg.event) {
case 'answer': {
let answer = JSON.parse(msg.data)
console.warn("Got this answer:", answer)
if (!answer) { return console.log('failed to parse offer') }

await pc.setRemoteDescription(answer)

return
}
case 'offer': {
let offer = JSON.parse(msg.data)
if (!offer) { return console.log('failed to parse offer') }

console.warn("Got this offer:", offer)

console.log(pc.getSenders())
await pc.setRemoteDescription(offer)

const answer = await pc.createAnswer()
console.warn('Sending answer.', answer)
await pc.setLocalDescription(answer)

ws.send(JSON.stringify({
event: "answer",
data: answer.sdp,
uuid: msg.uuid
}))

return
}
case 'candidate': {
let candidate = JSON.parse(msg.data)
if (!candidate) {
return console.log('failed to parse candidate')
}

pc.addIceCandidate(candidate)
}
}
}

const connect = async () => {
const offer = await pc.createOffer()
await pc.setLocalDescription(offer)

console.warn('Sending offer.', offer)
ws.send(JSON.stringify({
event: "offer",
data: offer.sdp,
uuid: uuid
}))
}

const createPeerConnection = async () => {
let stream = await navigator.mediaDevices.getUserMedia({ video: true, audio: true })

let pc = new RTCPeerConnection({
iceServers: [
{
urls: 'stun:stun.l.google.com:19302'
}
]
})

pc.ontrack = function (event) {
console.log("Got a new track!", event)

if (event.track.kind === 'audio') {
return
}

let el = document.createElement(event.track.kind)
el.srcObject = event.streams[0]
el.autoplay = true
el.controls = true
document.getElementById('remoteVideos').appendChild(el)

event.track.onmute = function(event) {
el.play()
}

event.streams[0].onremovetrack = ({track}) => {
if (el.parentNode) {
el.parentNode.removeChild(el)
}
}
}

pc.oniceconnectionstatechange = e => console.warn(pc.iceConnectionState)

pc.onicecandidate = e => {
if (!e.candidate) {
return
}

ws.send(JSON.stringify({event: 'candidate', uuid, data: JSON.stringify(e.candidate)}))
}

stream.getTracks().forEach(track => pc.addTrack(track, stream));
let el = document.getElementById('local')
el.srcObject = stream

return pc
}
</script>
</body>
</html>
96 changes: 96 additions & 0 deletions examples/sfu/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use anyhow::Result;
use sfu::signal::SocketMessage;
use std::sync::Arc;
use webrtc::{track::track_remote::TrackRemote, rtp::packet::Packet, rtp_transceiver::rtp_codec::RTPCodecType};
use flume::{Sender, Receiver};

mod sfu;

#[derive(Debug, Clone)]
pub enum PeerChanCommand {
SendIceCandidate {
uuid: String,
candidate: String,
},
ReceiveIceCandidate {
uuid: String,
candidate: String,
},
SendOffer {
uuid: String
},
ReceiveOffer {
uuid: String,
sdp: String,
tx: Sender<SocketMessage>
},
ReceiveAnswer {
uuid: String,
sdp: String,
},
OnTrack {
uuid: String,
track: Arc<TrackRemote>
},
DistributePacket {
uuid: String,
packet: Packet,
kind: RTPCodecType
}
}

#[tokio::main]
async fn main() -> Result<()> {
let new_conn_rx = sfu::signal::ws_sdp_signaler(8081).await;

let (peer_chan_tx, peer_chan_rx) = flume::unbounded::<PeerChanCommand>();

let tx_clone_1 = peer_chan_tx.clone();
let tx_clone_2 = peer_chan_tx.clone();

tokio::spawn(async move {
sfu::media::handle_peer_connection_commands(peer_chan_rx, tx_clone_1.clone()).await.unwrap();
});

while let Ok((uuid, socket_tx, socket_rx)) = new_conn_rx.recv_async().await {
handle_new_connection(&uuid, tx_clone_2.clone(), socket_tx, socket_rx).await.unwrap();
};

Ok(())
}

// Handler to spin off for every new connection.
async fn handle_new_connection(_uuid: &String, peer_chan_tx: Sender<PeerChanCommand>, socket_tx: Sender<SocketMessage>, socket_rx: Receiver<SocketMessage>) -> Result<()> {
tokio::spawn(async move {
println!("Handling a new connection.");
while let Ok(signal) = socket_rx.recv_async().await {
match signal {
SocketMessage { event, uuid: id, data: sdp } if event == "offer" => {
println!("\nReceiving offer: {:?}, for uuid: {:?}\n", sdp, id);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should use log::info or so for these, no printlns

peer_chan_tx.send(PeerChanCommand::ReceiveOffer {
uuid: id.to_owned(),
tx: socket_tx.clone(),
sdp
}).unwrap();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

expect instead of unwrap if we don't think this can happen. Alternatively if the receiver is gone should this task halt?

},
SocketMessage { event, uuid: id, data: sdp } if event == "answer" => {
println!("\nReceiving answer: {:?}, for uuid: {:?}\n", sdp, id);
peer_chan_tx.send(PeerChanCommand::ReceiveAnswer {
uuid: id.to_owned(),
sdp
}).unwrap();
},
SocketMessage { event, uuid: id, data: candidate } if event == "candidate" => {
println!("\nReceiving ice candidate, for uuid: {:?}\n", id);
peer_chan_tx.send(PeerChanCommand::ReceiveIceCandidate {
uuid: id.to_owned(),
candidate
}).unwrap();
},
_ => ()
}
};
});

Ok(())
}
3 changes: 3 additions & 0 deletions examples/sfu/sfu.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod signal;
pub mod api;
pub mod media;
81 changes: 81 additions & 0 deletions examples/sfu/sfu/api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use anyhow::Result;
use webrtc::api::API;
use webrtc::api::interceptor_registry::register_default_interceptors;
use webrtc::api::media_engine::{MediaEngine, MIME_TYPE_VP8, MIME_TYPE_OPUS};
use webrtc::api::APIBuilder;
use webrtc::ice_transport::ice_server::RTCIceServer;
use webrtc::interceptor::registry::Registry;
use webrtc::peer_connection::configuration::RTCConfiguration;
use webrtc::rtp_transceiver::rtp_codec::{RTPCodecType, RTCRtpCodecCapability, RTCRtpCodecParameters};


pub fn prepare_api() -> Result<API, anyhow::Error> {
let audio = true;
let video = true;

// Create a MediaEngine object to configure the supported codec
let mut m = MediaEngine::default();

// Setup the codecs you want to use.
if audio {
m.register_codec(
RTCRtpCodecParameters {
capability: RTCRtpCodecCapability {
mime_type: MIME_TYPE_OPUS.to_owned(),
..Default::default()
},
payload_type: 120,
..Default::default()
},
RTPCodecType::Audio,
)?;
}

// We'll use a VP8 and Opus but you can also define your own
if video {
m.register_codec(
RTCRtpCodecParameters {
capability: RTCRtpCodecCapability {
mime_type: MIME_TYPE_VP8.to_owned(),
clock_rate: 90000,
channels: 0,
sdp_fmtp_line: "".to_owned(),
rtcp_feedback: vec![],
},
payload_type: 96,
..Default::default()
},
RTPCodecType::Video,
)?;
}

// Create a InterceptorRegistry. This is the user configurable RTP/RTCP Pipeline.
// This provides NACKs, RTCP Reports and other features. If you use `webrtc.NewPeerConnection`
// this is enabled by default. If you are manually managing You MUST create a InterceptorRegistry
// for each PeerConnection.
let mut registry = Registry::new();

// Use the default set of Interceptors
registry = register_default_interceptors(registry, &mut m)?;

// Create the API object with the MediaEngine
let api = APIBuilder::new()
.with_media_engine(m)
.with_interceptor_registry(registry)
.build();

Ok(api)
}

pub fn prepare_configuration() -> Result<RTCConfiguration, anyhow::Error> {
// Prepare the configuration
let config = RTCConfiguration {
ice_servers: vec![RTCIceServer {
urls: vec!["stun:stun.l.google.com:19302".to_owned()],
..Default::default()
}],
..Default::default()
};

Ok(config)
}
Loading