Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions examples/selective_subscription/.env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Your Fishjam envs, which you can get at https://fishjam.io/app
FISHJAM_ID="your-fishjam-id"
FISHJAM_MANAGEMENT_TOKEN="your-management-token"
43 changes: 43 additions & 0 deletions examples/selective_subscription/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Selective Subscription Demo

Demo application showing selective subscription functionality with [Fishjam](https://fishjam.io) and the Python Server SDK.

## Prerequisites

- Python 3.11+
- [uv](https://docs.astral.sh/uv/) package manager
- Fishjam credentials ([get them here](https://fishjam.io/app))

> [!IMPORTANT]
> All commands should be run from the `examples/selective_subscription` directory

## Quick Start

1. Install dependencies (in the `examples/selective_subscription` directory):
```bash
uv sync
```

To run the app, first copy [`.env.example`](./.env.example) to `.env` and populate your environment variables.

Once you have populated `.env`, you can run the demo with

2. Run the server:
```bash
uv run ./main.py
```

3. Open http://localhost:8000 in your browser

You create peers using the web UI at [http://localhost:8000](http://localhost:8000).

1. Create peers with names
2. Copy peer tokens and use them with a WebRTC client (e.g., [minimal-react](https://github.com/fishjam-cloud/web-client-sdk/tree/main/examples/react-client/minimal-react))
3. Once peers have tracks, manage subscriptions through the web interface

### API Endpoints

- `POST /api/peers` - Create a peer with manual subscription mode
- `POST /api/subscribe_peer` - Subscribe to all tracks from a peer
- `POST /api/subscribe_tracks` - Subscribe to specific track IDs

1 change: 1 addition & 0 deletions examples/selective_subscription/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Selective subscription demo package."""
24 changes: 24 additions & 0 deletions examples/selective_subscription/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from contextlib import asynccontextmanager

import uvicorn

from selective_subscription.app import app, room_service
from selective_subscription.config import HOST, PORT
from selective_subscription.notification_handler import NotificationHandler
from selective_subscription.worker import async_worker


@asynccontextmanager
async def lifespan(app):
async with async_worker() as worker:
notification_handler = NotificationHandler(room_service)
worker.run_in_background(notification_handler.start())
print(f"Selective subscription demo started on http://{HOST}:{PORT}")
yield


app.router.lifespan_context = lifespan


if __name__ == "__main__":
uvicorn.run("main:app", host=HOST, port=PORT, reload=True, log_level="info")
15 changes: 15 additions & 0 deletions examples/selective_subscription/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[project]
name = "selective-subscription-demo"
version = "0.1.0"
description = "Selective subscription demo using Fishjam Python SDK"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"starlette>=0.35.0",
"uvicorn>=0.25.0",
"fishjam-server-sdk",
"python-dotenv",
]

[tool.uv.sources]
fishjam-server-sdk = { workspace = true }
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Selective subscription demo package."""
102 changes: 102 additions & 0 deletions examples/selective_subscription/selective_subscription/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from pathlib import Path

from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware
from starlette.requests import Request
from starlette.responses import JSONResponse, Response
from starlette.routing import Route
from starlette.templating import Jinja2Templates

from .room_service import RoomService

room_service = RoomService()
templates = Jinja2Templates(
directory=str(Path(__file__).resolve().parent.parent / "templates")
)


async def create_peer(request: Request) -> Response:
try:
body = await request.json()
room_name = body.get("room_name")
peer_name = body.get("peer_name")

if not room_name or not peer_name:
return JSONResponse(
{"error": "room_name and peer_name are required"}, status_code=400
)

peer, token = room_service.create_peer()

return JSONResponse(
{
"peer_id": peer.id,
"token": token,
"room_name": room_name,
"peer_name": peer_name,
}
)
except Exception as e:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: isn't except Exception bit too general?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's fine for the usecase

return JSONResponse({"error": str(e)}, status_code=500)


async def subscribe_peer(request: Request) -> Response:
try:
body = await request.json()
peer_id = body.get("peer_id")
target_peer_id = body.get("target_peer_id")

if not peer_id or not target_peer_id:
return JSONResponse(
{"error": "peer_id and target_peer_id are required"}, status_code=400
)

room_service.subscribe_peer(peer_id, target_peer_id)
return JSONResponse({"status": "subscribed"})
except Exception as e:
return JSONResponse({"error": str(e)}, status_code=500)


async def subscribe_tracks(request: Request) -> Response:
try:
body = await request.json()
peer_id = body.get("peer_id")
track_ids = body.get("track_ids")

if not peer_id or not track_ids:
return JSONResponse(
{"error": "peer_id and track_ids are required"}, status_code=400
)

room_service.subscribe_tracks(peer_id, track_ids)
return JSONResponse({"status": "subscribed"})
except Exception as e:
return JSONResponse({"error": str(e)}, status_code=500)


async def serve_index(request: Request) -> Response:
return templates.TemplateResponse("index.html", {"request": request})


routes = [
Route("/", serve_index, methods=["GET"]),
Route("/api/peers", create_peer, methods=["POST"]),
Route("/api/subscribe_peer", subscribe_peer, methods=["POST"]),
Route("/api/subscribe_tracks", subscribe_tracks, methods=["POST"]),
]

middleware = [
Middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
]

app = Starlette(
routes=routes,
middleware=middleware,
)
10 changes: 10 additions & 0 deletions examples/selective_subscription/selective_subscription/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import os

import dotenv

dotenv.load_dotenv()

FISHJAM_ID = os.environ["FISHJAM_ID"]
FISHJAM_TOKEN = os.environ["FISHJAM_MANAGEMENT_TOKEN"]
HOST = os.getenv("HOST", "localhost")
PORT = int(os.getenv("PORT", "8000"))
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from fishjam._ws_notifier import FishjamNotifier
from fishjam.events import (
ServerMessagePeerConnected,
ServerMessagePeerDisconnected,
ServerMessagePeerType,
ServerMessageTrackAdded,
ServerMessageTrackRemoved,
)
from fishjam.events.allowed_notifications import AllowedNotification

from .config import FISHJAM_ID, FISHJAM_TOKEN
from .room_service import RoomService


class NotificationHandler:
def __init__(self, room_service: RoomService):
self.room_service = room_service
self._notifier = FishjamNotifier(FISHJAM_ID, FISHJAM_TOKEN)

@self._notifier.on_server_notification
async def _(notification: AllowedNotification):
match notification:
case ServerMessagePeerConnected(
peer_type=ServerMessagePeerType.PEER_TYPE_WEBRTC,
):
await handle_peer_connected(notification)
case ServerMessagePeerDisconnected(
peer_type=ServerMessagePeerType.PEER_TYPE_WEBRTC,
):
await handle_peer_disconnected(notification)
case ServerMessageTrackAdded():
await handle_track_added(notification)
case ServerMessageTrackRemoved():
await handle_track_removed(notification)

async def handle_peer_connected(notification: ServerMessagePeerConnected):
print(f"Peer connected: {notification.peer_id}")

async def handle_peer_disconnected(notification: ServerMessagePeerDisconnected):
print(f"Peer disconnected: {notification.peer_id}")

async def handle_track_added(notification: ServerMessageTrackAdded):
print(f"Track added: {notification.track}")

async def handle_track_removed(notification: ServerMessageTrackRemoved):
print(f"Track removed: {notification.track}")

async def start(self) -> None:
await self._notifier.connect()
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from typing import List

from fishjam import FishjamClient, Peer, PeerOptions, Room, RoomOptions
from fishjam.errors import NotFoundError

from .config import FISHJAM_ID, FISHJAM_TOKEN


class RoomService:
def __init__(self):
self.fishjam = FishjamClient(
fishjam_id=FISHJAM_ID, management_token=FISHJAM_TOKEN
)
self.room = self.fishjam.create_room(
RoomOptions(max_peers=10, room_type="conference")
)

def get_or_create_room(self) -> Room:
try:
self.room = self.fishjam.get_room(self.room.id)
except NotFoundError:
self.room = self.fishjam.create_room()

return self.room

def create_peer(self) -> tuple[Peer, str]:
room = self.get_or_create_room()
options = PeerOptions(subscribe_mode="manual")
peer, token = self.fishjam.create_peer(room.id, options)
return peer, token

def subscribe_peer(self, peer_id: str, target_peer_id: str):
room = self.get_or_create_room()
self.fishjam.subscribe_peer(room.id, peer_id, target_peer_id)

def subscribe_tracks(self, peer_id: str, track_ids: List[str]):
room = self.get_or_create_room()
self.fishjam.subscribe_tracks(room.id, peer_id, track_ids)
31 changes: 31 additions & 0 deletions examples/selective_subscription/selective_subscription/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from asyncio import Task, TaskGroup
from contextlib import asynccontextmanager
from typing import Any, Coroutine


class BackgroundWorker:
def __init__(self, tg: TaskGroup) -> None:
self._tg = tg
self._tasks: set[Task[None]] = set()

def run_in_background(self, coro: Coroutine[Any, Any, None]):
task = self._tg.create_task(coro)
task.add_done_callback(self._remove_task)
self._tasks.add(task)
return task

def _remove_task(self, task: Task[None]):
self._tasks.discard(task)

def cleanup(self):
for task in self._tasks:
task.cancel()
self._tasks = set()


@asynccontextmanager
async def async_worker():
async with TaskGroup() as tg:
worker = BackgroundWorker(tg)
yield worker
worker.cleanup()
Loading