observer-js
is a Node.js library for monitoring WebRTC client data. It processes statistical samples from clients, organizes them into calls and participants, tracks a wide range of metrics, detects common issues, and calculates quality scores. This enables real-time insights into WebRTC session performance.
This library is a core component of the ObserverTC ecosystem, designed to provide robust server-side monitoring capabilities for WebRTC applications.
- Hierarchical Data Model: Organizes data into
Observer
->ObservedCall
->ObservedClient
->ObservedPeerConnection
and further into streams and data channels. - Comprehensive Metrics: Tracks a wide array of WebRTC statistics including RTT, jitter, packet loss, codecs, ICE states, TURN usage, bandwidth, and more.
- Automatic Entity Management: Can automatically create and manage call and client entities based on incoming data samples.
- Issue Detection: Built-in detectors for common WebRTC problems.
- Quality Scoring: Calculates quality scores for calls and clients.
- Event-Driven: Emits events for significant state changes, new entities, and detected issues.
- Configurable Update Policies: Flexible control over how and when metrics are processed and updated.
- TypeScript Support: Written in TypeScript, providing strong typing and intellisense.
- Extensible: Supports custom application data (
appData
) and integration with external schema definitions (e.g.,observertc/schemas
).
npm install @observertc/observer-js
# or
yarn add @observertc/observer-js
import { Observer, ObserverConfig } from '@observertc/observer-js';
import { ClientSample } from '@observertc/schemas'; // Assuming you use the official schemas
// 1. Configure the Observer
const observerConfig: ObserverConfig = {
updatePolicy: 'update-on-interval',
updateIntervalInMs: 5000, // Update observer every 5 seconds
defaultCallUpdatePolicy: 'update-on-any-client-updated', // Calls update when any client sends data
};
const observer = new Observer(observerConfig);
// 2. Listen to events
observer.on('newcall', (call) => {
console.log(`[Observer] New call detected: ${call.callId}`);
call.on('newclient', (client) => {
console.log(`[Call: ${call.callId}] New client joined: ${client.clientId}`);
client.on('issue', (issue) => {
console.warn(`[Client: ${client.clientId}] Issue: ${issue.type} - ${issue.severity} - ${issue.description}`);
});
});
call.on('update', () => {
console.log(
`[Call: ${call.callId}] Metrics updated. Score: ${call.score?.toFixed(1)}, Clients: ${call.numberOfClients}`
);
});
});
// 3. Process Client Samples
// (ClientSample typically comes from your application after processing getStats() output)
function processClientStats(rawStats: any, callId: string, clientId: string) {
// Transform rawStats into the ClientSample format
// This is a placeholder for your actual transformation logic
const sample: ClientSample = {
callId,
clientId,
timestamp: Date.now(),
// ...populate with transformed stats from rawStats, adhering to the ClientSample schema
// from github.com/observertc/schemas
};
observer.accept(sample);
}
// Example usage:
// const myRawClientStats = getStatsFromClient();
// processClientStats(myRawClientStats, 'myMeeting123', 'userABC');
// 4. Cleanup when done
// process.on('SIGINT', () => observer.close());
The following sections provide a comprehensive guide to observer-js
.
(This section is identical to the introductory paragraph at the top of this README)
- Client-Side: Your application collects WebRTC statistics (e.g., via
RTCPeerConnection.getStats()
). - Transformation: These raw stats are transformed into the
ClientSample
schema (ideally from observertc/schemas). - Ingestion: The
ClientSample
is passed to theobserver.accept()
method. - Processing:
observer-js
processes the sample, updating or creating relevant entities (ObservedCall
,ObservedClient
,ObservedPeerConnection
, etc.) and their metrics. - Analysis: Metrics are analyzed for issue detection and quality scoring.
- Events: Events are emitted for significant state changes, new issues, or updates.
Observer
: The root object, managing multiple calls and global settings.ObservedCall
: Represents a distinct call session.ObservedClient
: Represents an individual participant within a call.ObservedPeerConnection
: Represents a WebRTC RTCPeerConnection of a client.ObservedInboundRtpStream
/ObservedOutboundRtpStream
: Tracks individual media streams.ObservedDataChannel
: Tracks data channels.
ObservedTURN
: Tracks global TURN server usage metrics across the observer.
When observer.accept(sample)
is called:
- If an
ObservedCall
forsample.callId
doesn't exist, it's typically created. - If an
ObservedClient
forsample.clientId
within that call doesn't exist, it's typically created. - Peer connections, streams, and data channels are similarly managed based on IDs in the sample.
The library aggregates a wide array of metrics at each level of the hierarchy, including (but not limited to):
- RTT, jitter, packet loss
- Bytes sent/received (audio, video, data)
- Codec information
- ICE connection details, TURN usage
- Stream/track states (muted, enabled)
- Frame rates, resolutions
- Bandwidth estimations
A Detectors
system analyzes metrics to identify common WebRTC issues (e.g., high packet loss, low audio levels, frozen video, connection setup problems). Issues are reported via events.
ScoreCalculator
components assess the quality of calls and clients based on metrics and detected issues, typically resulting in a numerical score (e.g., 0.0 to 5.0).
The library uses Node.js EventEmitter
to signal various occurrences, allowing applications to react to changes in real-time.
Manages all monitored calls and global observer state.
Configuration (ObserverConfig
)
export type ObserverConfig<AppData extends Record<string, unknown> = Record<string, unknown>> = {
updatePolicy?: 'update-on-any-call-updated' | 'update-when-all-call-updated' | 'update-on-interval';
updateIntervalInMs?: number; // Used if updatePolicy is 'update-on-interval'
defaultCallUpdatePolicy?: ObservedCallSettings['updatePolicy'];
defaultCallUpdateIntervalInMs?: number;
appData?: AppData; // Custom data for this observer instance
};
Constructor
new Observer<AppData>(config?: ObserverConfig<AppData>)
config
: Optional. Defaults:updatePolicy: 'update-when-all-call-updated'
.
Key Properties
observedCalls: Map<string, ObservedCall>
: Active calls.observedTURN: ObservedTURN
: Aggregated TURN metrics.appData: AppData | undefined
: Custom application data.closed: boolean
: True ifclose()
has been called.- Counters:
totalAddedCall
,totalRemovedCall
, RTT buckets,totalClientIssues
,numberOfClientsUsingTurn
,numberOfClients
,numberOfPeerConnections
, etc.
Key Methods
createObservedCall<T>(settings: ObservedCallSettings<T>): ObservedCall<T>
getObservedCall<T>(callId: string): ObservedCall<T> | undefined
accept(sample: ClientSample): void
: A convenience method to feed WebRTC stats. Ifsample.callId
andsample.clientId
are provided, it will route the sample to the appropriateObservedCall
andObservedClient
, creating them if they don't exist. The core sample processing for an existing client happens within theObservedClient
's ownaccept
or update mechanism.update(): void
: Manually trigger an update cycle (behavior depends onupdatePolicy
).close(): void
: Cleans up resources for the observer and all its calls.createEventMonitor<CTX>(ctx?: CTX): ObserverEventMonitor<CTX>
: For contextual event listening.
Events (ObserverEvents
)
'newcall' (call: ObservedCall)
'call-updated' (call: ObservedCall)
'client-event' (client: ObservedClient, event: ClientEvent)
'client-issue' (client: ObservedClient, issue: ClientIssue)
'client-metadata' (client: ObservedClient, metadata: ClientMetaData)
'client-extension-stats' (client: ObservedClient, stats: ExtensionStat)
'update' ()
'close' ()
Represents a single call session.
Configuration (ObservedCallSettings
)
export type ObservedCallSettings<AppData extends Record<string, unknown> = Record<string, unknown>> = {
callId: string;
appData?: AppData;
updatePolicy?: 'update-on-any-client-updated' | 'update-when-all-client-updated' | 'update-on-interval';
updateIntervalInMs?: number; // Used if updatePolicy is 'update-on-interval'
remoteTrackResolvePolicy?: 'mediasoup-sfu'; // For specific SFU integration
};
Key Properties
callId: string
appData: AppData | undefined
numberOfClients: number
score: number | undefined
: Overall call quality score.observedClients: Map<string, ObservedClient>
- Counters:
totalAddedClients
,totalRemovedClients
,numberOfIssues
, RTT buckets, total bytes sent/received (audio/video/data), etc.
Key Methods
createObservedClient<T>(settings: ObservedClientSettings<T>): ObservedClient<T>
getObservedClient<T>(clientId: string): ObservedClient<T> | undefined
update(): void
close(): void
createEventMonitor<CTX>(ctx?: CTX): ObservedCallEventMonitor<CTX>
Events (Emitted via ObservedCall
instance)
'newclient' (client: ObservedClient)
'empty' ()
: When the last client leaves.'not-empty' ()
: When the first client joins an empty call.'update' ()
'close' ()
Represents a participant in a call.
Configuration (ObservedClientSettings
)
export type ObservedClientSettings<AppData extends Record<string, unknown> = Record<string, unknown>> = {
clientId: string;
appData?: AppData;
// Potentially other client-specific settings
};
Key Properties
clientId: string
call: ObservedCall
: Reference to the parent call.appData: AppData | undefined
score: number | undefined
: Client quality score.numberOfPeerConnections: number
usingTURN: boolean
observedPeerConnections: Map<string, ObservedPeerConnection>
- Counters:
numberOfIssues
, RTT buckets, total bytes sent/received,availableIncomingBitrate
,availableOutgoingBitrate
, etc.
Key Methods
accept(sample: ClientSample): void
: (Or a similar internal update method called byObserver.accept
orObservedCall
) Processes aClientSample
specific to this client, updating its metrics, peer connections, streams, etc. This is the primary point where a client's detailed WebRTC statistics are processed.createObservedPeerConnection<T>(settings: ObservedPeerConnectionSettings<T>): ObservedPeerConnection<T>
getObservedPeerConnection<T>(peerConnectionId: string): ObservedPeerConnection<T> | undefined
update(): void
close(): void
createEventMonitor<CTX>(ctx?: CTX): ObservedClientEventMonitor<CTX>
Events (Emitted via ObservedClient
instance)
'joined' ()
'left' ()
'update' ()
'close' ()
'newpeerconnection' (pc: ObservedPeerConnection)
'issue' (issue: ClientIssue)
(and other specific issue events)
Represents an RTCPeerConnection
.
- Tracks ICE connection state, data channel stats, stream stats.
- Holds
ObservedInboundRtpStream
,ObservedOutboundRtpStream
, andObservedDataChannel
instances.
- Track metrics for individual media streams (audio/video) like codec, packets lost/received, jitter, bytes, etc.
- Tracks metrics for data channels like state, messages sent/received, bytes.
This is the primary input data structure passed to observer.accept()
. It's a comprehensive object that should mirror the information obtainable from WebRTC getStats()
and other client-side states. Key fields include:
callId
,clientId
,timestamp
peerConnections: RTCPeerConnectionStats[]
inboundRtpStreams: RTCInboundRtpStreamStats[]
outboundRtpStreams: RTCOutboundRtpStreamStats[]
remoteInboundRtpStreams: RTCRemoteInboundRtpStreamStats[]
remoteOutboundRtpStreams: RTCRemoteOutboundRtpStreamStats[]
dataChannels: RTCDataChannelStats[]
iceLocalCandidates: RTCIceCandidateStats[]
,iceRemoteCandidates: RTCIceCandidateStats[]
,iceCandidatePairs: RTCIceCandidatePairStats[]
mediaSources: RTCAudioSourceStats[] / RTCVideoSourceStats[]
tracks: RTCMediaStreamTrackStats[]
certificates: RTCCertificateStats[]
codecs: RTCCodecStats[]
transports: RTCIceTransportStats[]
(or similar depending on spec version)browser
,engine
,platform
,os
(client environment metadata)userMediaErrors
,iceConnectionStates
,connectionStates
(client-reported events/states)extensionStats
(for custom data)
(Refer to the observertc/schemas repository, particularly the ClientSample.ts
definition, for the exact and complete structure.)
Control how frequently entities re-calculate metrics and emit update
events.
Observer Level (ObserverConfig.updatePolicy
)
'update-on-any-call-updated'
: Observer updates if any of its calls update.'update-when-all-call-updated'
: Observer updates after all its calls update. (Default)'update-on-interval'
: Observer updates atObserverConfig.updateIntervalInMs
.
Call Level (ObservedCallSettings.updatePolicy
or ObserverConfig.defaultCallUpdatePolicy
)
'update-on-any-client-updated'
: Call updates if any of its clients update.'update-when-all-client-updated'
: Call updates after all its clients update.'update-on-interval'
: Call updates at itsupdateIntervalInMs
.
ObserverConfig.updateIntervalInMs
ObserverConfig.defaultCallUpdateIntervalInMs
ObservedCallSettings.updateIntervalInMs
Associate custom context with Observer
, ObservedCall
, and ObservedClient
instances using generics.
interface MyCallAppData {
meetingTitle: string;
scheduledAt: Date;
}
const call = observer.createObservedCall<MyCallAppData>({
callId: 'call1',
appData: { meetingTitle: 'Team Sync', scheduledAt: new Date() },
});
console.log(call.appData?.meetingTitle);
The observer-js
library provides two primary ways to associate custom information with its entities: appData
and attachments
. Understanding their distinct purposes is key for effective use.
appData
(Application Data)
- Purpose:
appData
is designed to hold structured, typed, and relatively static metadata about an entity (Observer
,ObservedCall
,ObservedClient
). This data is typically set at the time of entity creation and is directly accessible as a property of the entity instance. - Typing: It is strongly typed using generics (e.g.,
Observer<MyObserverAppData>
). This provides type safety and autocompletion in TypeScript environments. - Mutability: While technically mutable (if the object assigned is mutable), it's generally intended for information that defines or describes the entity and doesn't change frequently during its lifecycle.
- Accessibility: Directly accessible via
entity.appData
. - Use Cases:
- Storing application-specific identifiers (e.g.,
userId
,roomId
,meetingType
). - Configuration flags relevant to how your application interprets this entity.
- Descriptive information (e.g.,
clientDeviceType
,callRegion
).
- Storing application-specific identifiers (e.g.,
attachments
(Arbitrary Attachments)
- Purpose:
attachments
(if implemented as aMap<string, unknown>
or similar mechanism on entities) are meant for associating arbitrary, often dynamic, or less structured data with an entity. This can be useful for temporary state, inter-plugin communication, or data that doesn't fit neatly into a predefinedappData
schema. - Typing: Typically less strictly typed (e.g.,
unknown
orany
values in a Map). Consumers of attachments need to perform their own type checks or assertions. - Mutability: Designed to be more dynamic. Attachments can be added, updated, or removed throughout the entity's lifecycle.
- Accessibility: Accessed via methods like
entity.setAttachment(key, value)
,entity.getAttachment(key)
,entity.removeAttachment(key)
. - Use Cases:
- Storing temporary state calculated by one part of your application to be read by another (e.g., a custom issue detector plugin might attach intermediate findings).
- Caching results of expensive computations related to the entity.
- Allowing different modules or plugins to associate their own private data with an observer entity without needing to modify its core
appData
type. - Storing large binary data or complex objects that are not part of the core descriptive metadata.
When to Use Which:
- Use
appData
for:- Core, descriptive metadata that is known at creation time or changes infrequently.
- Data that benefits from strong typing and is integral to your application's understanding of the entity.
- Use
attachments
for:- Dynamic, temporary, or loosely structured data.
- Data added by different, potentially independent, parts of your system or plugins.
- Information that doesn't need to be part of the primary, typed
appData
schema.
If attachments
are not yet a formal feature, this section can serve as a design consideration or be adapted if you introduce such a mechanism. If attachments
are already present, ensure the description matches their actual implementation.
For SFU scenarios, especially with MediaSoup:
ObservedCallSettings.remoteTrackResolvePolicy: 'mediasoup-sfu'
// filepath: /path/to/your/app.ts
import { Observer, ObserverConfig } from '@observertc/observer-js'; // Adjust path
import { ClientSample } from '@observertc/schemas'; // Adjust path if using official schemas
const observerConfig: ObserverConfig = {
updatePolicy: 'update-on-interval',
updateIntervalInMs: 5000,
defaultCallUpdatePolicy: 'update-on-any-client-updated',
};
const observer = new Observer(observerConfig);
observer.on('newcall', (call) => {
console.log(`[Observer] New call: ${call.callId}`);
call.on('update', () => {
console.log(`[Call: ${call.callId}] Updated. Clients: ${call.numberOfClients}, Score: ${call.score?.toFixed(1)}`);
});
call.on('newclient', (client) => {
console.log(`[Call: ${call.callId}] New client: ${client.clientId}`);
client.on('update', () => {
// console.log(`[Client: ${client.clientId}] Updated. Score: ${client.score?.toFixed(1)}`);
});
client.on('issue', (issue) => {
console.warn(`[Client: ${client.clientId}] Issue: ${issue.type} - ${issue.severity} - ${issue.description}`);
});
});
});
// Function to transform your app's WebRTC stats to ClientSample
function mapStatsToClientSample(appStats: any, callId: string, clientId: string): ClientSample {
// Detailed mapping logic here based on ClientSample.ts schema
// from github.com/observertc/schemas
return {
callId,
clientId,
timestamp: Date.now(),
// ... map all relevant stats fields ...
} as ClientSample; // Ensure all required fields are present
}
// Example: Receiving stats and processing
const rawStatsFromClient = {
/* ... your client's getStats() output ... */
};
const callId = 'meeting-alpha-123';
const clientId = 'user-xyz-789';
const sample = mapStatsToClientSample(rawStatsFromClient, callId, clientId);
observer.accept(sample);
// Later, on application shutdown:
// observer.close();
// ... observer setup ...
const call = observer.createObservedCall({
callId: 'scheduled-webinar-456',
updatePolicy: 'update-on-interval',
updateIntervalInMs: 10000,
});
const client1 = call.createObservedClient({ clientId: 'presenter-01' });
// Samples for 'presenter-01' in call 'scheduled-webinar-456' will update this client.
const call = observer.getObservedCall('meeting-alpha-123');
if (call) {
const callMonitor = call.createEventMonitor({ callId: call.callId, started: new Date() });
callMonitor.on('client-joined', (client, context) => {
console.log(`EVENT_MONITOR (${context.callId}): Client ${client.clientId} joined at ${new Date()}`);
});
callMonitor.on('issue-detected', (client, issue, context) => {
console.error(`EVENT_MONITOR (${context.callId}): Issue on ${client.clientId} - ${issue.description}`);
});
}
- Resource Management: Always call
observer.close()
,call.close()
, andclient.close()
when entities are no longer needed to free resources and stop timers. - Error Handling: Wrap calls to library methods in
try...catch
blocks where appropriate, especially for operations that might throw errors based on state (e.g., creating an entity that already exists if not usinggetOrCreate
patterns). - Event Listener Cleanup: If dynamically adding/removing listeners, ensure they are properly removed (e.g., using
emitter.off()
oremitter.removeListener()
) to prevent memory leaks, especially for short-lived monitored entities. ClientSample
Accuracy: The quality of monitoring heavily depends on the completeness and correctness of theClientSample
data provided. Ensure thorough mapping fromgetStats()
.- Update Policies: Choose update policies carefully based on the desired granularity of updates and performance considerations.
- Memory Leaks: Ensure
close()
is called on all entities. Check for unremoved event listeners. - No Events / Missing Updates:
- Verify
observer.accept()
is being called with correctly formattedClientSample
data. - Ensure
callId
andclientId
in samples match expectations. - Check if
updatePolicy
andupdateIntervalInMs
are configured as intended.
- Verify
- Debugging: Utilize
console.log
within event handlers at different levels (Observer, Call, Client) to trace data flow and state changes. UseappData
to add correlation IDs for easier debugging.
The library is written in TypeScript and provides type definitions.
Use generics with Observer
, ObservedCall
, and ObservedClient
to type your custom appData
.
interface MyClientAppData {
userId: string;
role: 'admin' | 'user';
}
const client = call.createObservedClient<MyClientAppData>({
clientId: 'user1',
appData: { userId: 'u-123', role: 'admin' },
});
// client.appData will be typed as MyClientAppData | undefined
(Placeholder for contribution guidelines - e.g., link to CONTRIBUTING.md, coding standards, pull request process)
This project is licensed under the MIT License.