Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "mtai",
"version": "0.2.5-alpha.5",
"version": "0.2.6-alpha.1",
"private": false,
"publishConfig": {
"access": "public",
Expand Down
187 changes: 177 additions & 10 deletions core/src/dh2d/connections.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,160 @@ async function gc() {
})
}

export async function connect(logger: Logger, elements: DH2DPlayerElements, config: Required<DH2DSessionConfig>, abortable: Abortable): Promise<DH2DConnection> {
function monitorVideoStall(video: HTMLVideoElement, maxStallDuration: number, onStall: (info: {
currentTime: number;
duration: number;
buffered: { start: number; end: number }[];
readyState: number;
readyStateDesc: string;
reason: string;
}) => void, logFn: (info: any) => void) {
let lastTime = 0;
let lastUpdate = 0;
let intervalId: number | null = null;
let started = false;
let stallTimer: number | null = null;
let inStall = false;

function resetTimeTracking() {
lastTime = video.currentTime;
lastUpdate = Date.now();
}

function enterPlayingState() {
inStall = false;
resetTimeTracking();
stallTimer && clearTimeout(stallTimer);
stallTimer = null;
}

function startMonitoring() {
if (!started) {
started = true;
enterPlayingState();

video.addEventListener("timeupdate", onTimeUpdate);
video.addEventListener("waiting", onWaiting);
intervalId = setInterval(checkStall, 1000);
} else {
enterPlayingState();
}
}

function onTimeUpdate() {
if (video.currentTime !== lastTime) {
enterPlayingState();
}
}

function checkStall() {
if (!video.paused && !video.ended && !inStall) {
const now = Date.now();
if (now - lastUpdate >= maxStallDuration) {
triggerStall();
}
}
}

function onWaiting() {
if (!inStall) {
stallTimer && clearTimeout(stallTimer);
stallTimer = setTimeout(() => {
triggerStall();
}, maxStallDuration);
}
}

function getBufferedRanges() {
const ranges = [];
for (let i = 0; i < video.buffered.length; i++) {
ranges.push({
start: video.buffered.start(i),
end: video.buffered.end(i)
});
}
return ranges;
}

function analyzeReason() {
const readyStateMap = {
0: "no media data loaded",
1: "metadata loaded, no data to play",
2: "data for current position only",
3: "data to play a little ahead",
4: "enough data to keep playing"
} as Record<number, string>;

const buffered = getBufferedRanges();
const current = video.currentTime;

let hasDataAhead = buffered.some(range => range.end > current + 0.05);

let reason;
if (!hasDataAhead) {
reason = "network starvation (no buffered data ahead)";
} else if (video.readyState <= 2) {
reason = "decoding or format delay";
} else {
reason = "unknown stall cause";
}

return {
reason,
readyState: video.readyState,
readyStateDesc: readyStateMap[video.readyState] || "unknown state"
};
}

function triggerStall() {
if (!inStall) {
inStall = true;
const bufferedRanges = getBufferedRanges();
const reasonInfo = analyzeReason();

const info = {
currentTime: video.currentTime,
duration: video.duration,
buffered: bufferedRanges,
readyState: reasonInfo.readyState,
readyStateDesc: reasonInfo.readyStateDesc,
reason: reasonInfo.reason
};

// Delegate logging if provided
if (typeof logFn === "function") {
logFn({
event: "stall-detected",
...info,
bufferedGaps: bufferedRanges.map(r =>
`[${r.start.toFixed(2)}s → ${r.end.toFixed(2)}s]`
)
});
}

onStall(info);
}
}

// Start monitoring immediately if video is already playing, otherwise wait for playing event
if (!video.paused && !video.ended) {
startMonitoring();
} else {
video.addEventListener("playing", startMonitoring);
}

return function cancelMonitor() {
intervalId && clearInterval(intervalId);
intervalId = null;
stallTimer && clearTimeout(stallTimer);
stallTimer = null;
video.removeEventListener("playing", startMonitoring);
video.removeEventListener("timeupdate", onTimeUpdate);
video.removeEventListener("waiting", onWaiting);
};
}

export async function connect(logger: Logger, evts: ReturnType<typeof events<DHConnectionEvents>>, elements: DH2DPlayerElements, config: Required<DH2DSessionConfig>, abortable: Abortable): Promise<DH2DConnection> {
const { endpoint } = rootConfig()
const rootLogger = logger.push((_, ...args) => _('[connect]', ...args))
let videoVisible = false
Expand Down Expand Up @@ -63,8 +216,7 @@ export async function connect(logger: Logger, elements: DH2DPlayerElements, conf
abortable.onabort(() => {
reject(new Error('connection aborted'))
})
const evt = events<DHConnectionEvents>()
const connection: Diff<DH2DConnection, typeof evt> = {
const connection: Diff<DH2DConnection, typeof evts> = {
get ws() { return ws },
get pc() { return pc },
get audioPc() { return audioPc },
Expand Down Expand Up @@ -105,14 +257,14 @@ export async function connect(logger: Logger, elements: DH2DPlayerElements, conf
}
Promise.all(tasks).then(() => resolve({
...connection,
...(({ on, off }) => ({ on, off }))(evt),
...(({ on, off }) => ({ on, off }))(evts),
})).catch(reject)
}
if (DHOutputMessageType.includes(message.type)) {
evt.emit("message", message)
evts.emit("message", message)
}
} else {
evt.emit("audio", event.data)
evts.emit("audio", event.data)
}
}
})
Expand Down Expand Up @@ -212,9 +364,6 @@ export async function disconnect(logger: Logger, connection: DH2DConnection, ele
connection.ws.close()
connection.pc.close()
connection.audioPc?.close()
for (const events of DHConnectionEventTypes) {
connection.off(events)
}
await gc()
} catch (e) {
logger.error(e)
Expand All @@ -224,7 +373,7 @@ export async function disconnect(logger: Logger, connection: DH2DConnection, ele
}
}

export async function untilFailed(logger: Logger, connection: DH2DConnection, config: Required<DH2DSessionConfig>, abortable: Abortable) {
export async function untilFailed(logger: Logger, connection: DH2DConnection, player: DH2DPlayerElements, config: Required<DH2DSessionConfig>, abortable: Abortable) {
logger = logger.push((_, ...args) => _('[untilFailed]', ...args))
logger.log('enter')
let running = true
Expand All @@ -235,6 +384,24 @@ export async function untilFailed(logger: Logger, connection: DH2DConnection, co
running = false
resolve()
})
const maxStallDuration = config.maxStallDuration
if (maxStallDuration > 0) {
dispose.push(
monitorVideoStall(
player.video,
config.maxStallDuration,
(info) => {
logger.error(
`reconnect due to video stall: ${info.reason} at ${info.currentTime.toFixed(2)}s`, info)
running = false
resolve()
},
(msg) => {
logger.log(msg)
}
)
)
}
connection.ws.addEventListener("close", () => {
if (!running) return
logger.log('reconnect due to websocket close')
Expand Down
1 change: 1 addition & 0 deletions core/src/dh2d/hooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ function defaultConfig(config?: DH2DSessionConfig): Required<DH2DSessionConfig>
frameRate: config?.frameRate || 25,
reconnectInterval: config?.reconnectInterval || 20 * 60 * 1000,
connectTimeout: config?.connectTimeout || 60 * 1000,
maxStallDuration: config?.maxStallDuration || 10 * 1000,
maxAudioVideoDurationDifference: config?.maxAudioVideoDurationDifference || 10 * 1000,
pingInterval: config?.pingInterval || 10 * 1000,
pingTimeout: config?.pingTimeout || 5 * 1000,
Expand Down
12 changes: 8 additions & 4 deletions core/src/dh2d/sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { isLoggedIn } from "../auth"
import { box } from "../box"
import { events } from "../events"
import { hooks } from "../hooks"
import { Box, DHInputMessage, DHOutputMessage } from "../types"
import { Box, DHConnectionEvents, DHConnectionEventTypes, DHInputMessage, DHOutputMessage } from "../types"
import { DH2DConnection, DH2DSession, DH2DSessionEvents, DH2DSessionEventTypes, DH2DSessionStatus } from "./types"
import { DH2DSessionConfig } from "./types"

Expand Down Expand Up @@ -123,6 +123,7 @@ export function createDH2DSession(parent: HTMLElement, config?: DH2DSessionConfi

const completed = (async () => {
let sessionLogger = rootLogger
const evts = events<DHConnectionEvents>()
function emitStatus(_status: typeof DH2DSessionStatus[number]) {
sessionLogger.log(`status changing from ${status} to ${_status}`)
status = _status
Expand All @@ -140,7 +141,7 @@ export function createDH2DSession(parent: HTMLElement, config?: DH2DSessionConfi
send = notConnected
return
}
let connection = await withChild(rootAbortable, (_) => hooks.dh2d.connect(rootLogger, player, realConfig, _))
let connection = await withChild(rootAbortable, (_) => hooks.dh2d.connect(rootLogger, evts, player, realConfig, _))
try {
if (aborted) {
emitStatus("closed")
Expand All @@ -164,7 +165,7 @@ export function createDH2DSession(parent: HTMLElement, config?: DH2DSessionConfi
}
emitStatus("connected")
while (!aborted) {
await withChild(rootAbortable, _ => hooks.dh2d.untilFailed(sessionLogger, connection, realConfig, _))
await withChild(rootAbortable, _ => hooks.dh2d.untilFailed(sessionLogger, connection, player, realConfig, _))
emitStatus("reconnecting")
send = enqueueMessage
await withChild(rootAbortable, _ => hooks.dh2d.disconnect(sessionLogger, connection, player, _))
Expand All @@ -179,7 +180,7 @@ export function createDH2DSession(parent: HTMLElement, config?: DH2DSessionConfi
send = notConnected
return
}
connection = await withChild(rootAbortable, _ => hooks.dh2d.connect(rootLogger, player, realConfig, _))
connection = await withChild(rootAbortable, _ => hooks.dh2d.connect(rootLogger, evts, player, realConfig, _))
realConfig.sessionId = connection.sessionId
sessionLogger = rootLogger.push((_, ...args) => _(`[s:${connection.sessionId}]`, ...args))
connection.on("message", onMessage)
Expand All @@ -192,6 +193,9 @@ export function createDH2DSession(parent: HTMLElement, config?: DH2DSessionConfi
drainMessage(connection.ws)
}
} finally {
for (const evt of DHConnectionEventTypes) {
evts.off(evt)
}
emitStatus("closed")
send = notConnected
hooks.dh2d.disconnect(sessionLogger, connection, player, abortable())
Expand Down
6 changes: 6 additions & 0 deletions core/src/dh2d/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ export type DH2DSessionConfig = {
*/
reconnectInterval?: number

/**
* The maximum duration to allow video to stall before triggering a reconnect.
* @default 10 * 1000 (10 seconds)
*/
maxStallDuration?: number

/**
* The timeout for the establishing connection.
* @default 60 * 1000 (60 seconds)
Expand Down
8 changes: 4 additions & 4 deletions examples/react-app/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion examples/react-app/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"preview": "vite preview"
},
"dependencies": {
"mtai": "^0.2.5-alpha.3",
"mtai": "^0.2.6-alpha.1",
"react": "^19.1.0",
"react-dom": "^19.1.0"
},
Expand Down
6 changes: 1 addition & 5 deletions examples/react-app/src/App.tsx
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
import { useState, useRef, useEffect } from 'react'
import type { DH2DSession, ComponentStatus, Avatar, Voice } from 'mtai'
import { getShareCode, observeComponents, updateComponent, cancelUpdateComponent, setShareCode, getLlmModels, getTtsVoices, getAvatars, setConfig } from 'mtai'
import { getShareCode, observeComponents, updateComponent, cancelUpdateComponent, setShareCode, getLlmModels, getTtsVoices, getAvatars } from 'mtai'
import { DH2D } from './dh2d'
import './App.css'

// setConfig({
// endpoint: 'http://192.168.4.50:32101'
// })
interface ModalProps {
isOpen: boolean;
title: string;
Expand Down
1 change: 1 addition & 0 deletions examples/react-app/src/dh2d.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ export const DH2D: FC<{

session.on('message', (message) => {
if (message.type === 'status_change') {
console.log('status_change', message.status);
upstreamStatus.value = message.status;
status.value = message.status;
} else if (message.type === 'asr_session') {
Expand Down
3 changes: 0 additions & 3 deletions examples/react-app/src/main.tsx
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import { StrictMode } from 'react'
import { createRoot } from 'react-dom/client'
import './index.css'
import App from './App.tsx'

createRoot(document.getElementById('root')!).render(
// <StrictMode>
<App />
// </StrictMode>,
)