Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions lib/contribute/broadcast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import * as Catalog from "../media/catalog"
import { isAudioTrackSettings, isVideoTrackSettings } from "../common/settings"

export interface BroadcastConfig {
namespace: string
namespace: string[]
connection: Connection
media: MediaStream

Expand All @@ -26,7 +26,7 @@ export class Broadcast {
readonly config: BroadcastConfig
readonly catalog: Catalog.Root
readonly connection: Connection
readonly namespace: string
readonly namespace: string[]

#running: Promise<void>

Expand Down Expand Up @@ -148,7 +148,7 @@ export class Broadcast {
// Send a SUBSCRIBE_OK
await subscriber.ack()

const stream = await subscriber.group({ group: 0 })
const stream = await subscriber.subgroup({ group: 0, subgroup: 0 })
await stream.write({ object: 0, payload: bytes })
await stream.close()
}
Expand All @@ -162,7 +162,7 @@ export class Broadcast {

const init = await track.init()

const stream = await subscriber.group({ group: 0 })
const stream = await subscriber.subgroup({ group: 0, subgroup: 0 })
await stream.write({ object: 0, payload: init })
await stream.close()
}
Expand Down Expand Up @@ -190,8 +190,9 @@ export class Broadcast {

async #serveSegment(subscriber: SubscribeRecv, segment: Segment) {
// Create a new stream for each segment.
const stream = await subscriber.group({
const stream = await subscriber.subgroup({
group: segment.id,
subgroup: 0, // @todo: figure out the right way to do this
priority: 127, // TODO,default to mid value, see: https://github.com/moq-wg/moq-transport/issues/504
})

Expand Down
12 changes: 8 additions & 4 deletions lib/media/catalog/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Connection } from "../../transport"
import { asError } from "../../common/error"

export interface CommonTrackFields {
namespace?: string
namespace?: string[]
packaging?: string
renderGroup?: number
altGroup?: number
Expand All @@ -28,6 +28,9 @@ export function decode(raw: Uint8Array): Root {
const str = decoder.decode(raw)

const catalog = JSON.parse(str)
// namespace comes serialized as a "/" joined string, cast it back to a tuple (array)
catalog.commonTrackFields.namespace = catalog.commonTrackFields.namespace?.split("/").filter(Boolean) // remove empty strings

if (!isRoot(catalog)) {
throw new Error("invalid catalog")
}
Expand All @@ -43,7 +46,7 @@ export function decode(raw: Uint8Array): Root {
return catalog
}

export async function fetch(connection: Connection, namespace: string): Promise<Root> {
export async function fetch(connection: Connection, namespace: string[]): Promise<Root> {
const subscribe = await connection.subscribe(namespace, ".catalog")
try {
const segment = await subscribe.data()
Expand All @@ -61,6 +64,7 @@ export async function fetch(connection: Connection, namespace: string): Promise<
throw new Error("invalid catalog chunk")
}
} catch (e) {
console.error("Catalog fetch error: ", e)
const err = asError(e)

// Close the subscription after we're done.
Expand All @@ -78,7 +82,7 @@ export function isRoot(catalog: any): catalog is Root {
}

export interface Track {
namespace?: string
namespace?: string[]
name: string
depends?: any[]
packaging?: string
Expand Down Expand Up @@ -174,7 +178,7 @@ function isCatalogFieldValid(catalog: any, field: string): boolean {
}

function isValidNamespace(namespace: any): boolean {
return typeof namespace === "string"
return Array.isArray(namespace) && namespace.every((ns) => typeof ns === "string")
}

let isValidField: (value: any) => boolean
Expand Down
4 changes: 2 additions & 2 deletions lib/playback/backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import MediaWorker from "web-worker:./worker/index.ts"

import { RingShared } from "../common/ring"
import { Root, isAudioTrack } from "../media/catalog"
import { GroupHeader } from "../transport/objects"
import { SubgroupHeader } from "../transport/objects"

export interface PlayerConfig {
canvas: OffscreenCanvas
Expand Down Expand Up @@ -123,7 +123,7 @@ export interface Init {
export interface Segment {
init: string // name of the init track
kind: "audio" | "video"
header: GroupHeader
header: SubgroupHeader
buffer: Uint8Array
stream: ReadableStream<Uint8Array>
}
13 changes: 8 additions & 5 deletions lib/playback/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { asError } from "../common/error"
import Backend from "./backend"

import { Client } from "../transport/client"
import { GroupReader } from "../transport/objects"
import { SubgroupReader } from "../transport/objects"

export type Range = Message.Range
export type Timeline = Message.Timeline
Expand Down Expand Up @@ -71,7 +71,7 @@ export default class Player {
const client = new Client({ url: config.url, fingerprint: config.fingerprint, role: "subscriber" })
const connection = await client.connect()

const catalog = await Catalog.fetch(connection, config.namespace)
const catalog = await Catalog.fetch(connection, [config.namespace])
console.log("catalog", catalog)

const canvas = config.canvas.transferControlToOffscreen()
Expand All @@ -81,13 +81,15 @@ export default class Player {
}

async #run() {
// Key is "/" serialized namespace for lookup ease
// Value is Track.initTrack. @todo: type this properly
const inits = new Set<[string, string]>()
const tracks = new Array<Catalog.Track>()

this.#catalog.tracks.forEach((track, index) => {
if (index == this.#tracknum || Catalog.isAudioTrack(track)) {
if (!track.namespace) throw new Error("track has no namespace")
if (track.initTrack) inits.add([track.namespace, track.initTrack])
if (track.initTrack) inits.add([track.namespace.join("/"), track.initTrack])
tracks.push(track)
}
})
Expand All @@ -103,7 +105,7 @@ export default class Player {
}

async #runInit(namespace: string, name: string) {
const sub = await this.#connection.subscribe(namespace, name)
const sub = await this.#connection.subscribe([namespace], name)
try {
const init = await Promise.race([sub.data(), this.#running])
if (!init) throw new Error("no init data")
Expand Down Expand Up @@ -143,7 +145,7 @@ export default class Player {
const segment = await Promise.race([sub.data(), this.#running])
if (!segment) continue

if (!(segment instanceof GroupReader)) {
if (!(segment instanceof SubgroupReader)) {
throw new Error(`expected group reader for segment: ${track.name}`)
}

Expand Down Expand Up @@ -281,6 +283,7 @@ export default class Player {
try {
await this.#running
} catch (e) {
console.error("Error in Player.closed():", e)
return asError(e)
}
}
Expand Down
4 changes: 2 additions & 2 deletions lib/playback/worker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import * as MP4 from "../../media/mp4"
import * as Message from "./message"
import { asError } from "../../common/error"
import { Deferred } from "../../common/async"
import { GroupReader, Reader } from "../../transport/objects"
import { SubgroupReader, Reader } from "../../transport/objects"

class Worker {
// Timeline receives samples, buffering them and choosing the timestamp to render.
Expand Down Expand Up @@ -71,7 +71,7 @@ class Worker {
const container = new MP4.Parser(await init.promise)

const timeline = msg.kind === "audio" ? this.#timeline.audio : this.#timeline.video
const reader = new GroupReader(msg.header, new Reader(msg.buffer, msg.stream))
const reader = new SubgroupReader(msg.header, new Reader(msg.buffer, msg.stream))

// Create a queue that will contain each MP4 frame.
const queue = new TransformStream<MP4.Frame>({})
Expand Down
4 changes: 2 additions & 2 deletions lib/playback/worker/message.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { GroupHeader } from "../../transport/objects"
import { SubgroupHeader } from "../../transport/objects"
import { RingShared } from "../../common/ring"

export interface Config {
Expand All @@ -25,7 +25,7 @@ export interface Init {
export interface Segment {
init: string // name of the init object
kind: "audio" | "video"
header: GroupHeader
header: SubgroupHeader
buffer: Uint8Array
stream: ReadableStream<Uint8Array>
}
Expand Down
7 changes: 5 additions & 2 deletions lib/transport/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,16 @@ export class Client {
const setup = new Setup.Stream(reader, writer)

// Send the setup message.
await setup.send.client({ versions: [Setup.Version.DRAFT_05], role: this.config.role })
await setup.send.client({
versions: [Setup.Version.DRAFT_06],
role: this.config.role,
})

// Receive the setup message.
// TODO verify the SETUP response.
const server = await setup.recv.server()

if (server.version != Setup.Version.DRAFT_05) {
if (server.version != Setup.Version.DRAFT_06) {
throw new Error(`unsupported server version: ${server.version}`)
}

Expand Down
4 changes: 2 additions & 2 deletions lib/transport/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,15 @@ export class Connection {
await Promise.all([this.#runControl(), this.#runObjects()])
}

announce(namespace: string) {
announce(namespace: string[]) {
return this.#publisher.announce(namespace)
}

announced() {
return this.#subscriber.announced()
}

subscribe(namespace: string, track: string) {
subscribe(namespace: string[], track: string) {
return this.#subscriber.subscribe(namespace, track)
}

Expand Down
Loading
Loading