Shubhra/ajs 31 refactor vad with streams #390

Merged
merged 42 commits into from
May 19, 2025
42 commits
c27c7bc
init
Shubhrakanti May 1, 2025
cac38aa
deafults
Shubhrakanti May 1, 2025
01eac6e
agent import woking in example
Shubhrakanti May 4, 2025
a431af5
update gitignore
Shubhrakanti May 5, 2025
aabbaaa
get basic audio forwarding working
Shubhrakanti May 5, 2025
7c0c60b
forwarding audio to vad
Shubhrakanti May 6, 2025
52b7f81
fix test
Shubhrakanti May 6, 2025
90afce8
clean up room ui
Shubhrakanti May 7, 2025
a821d7f
clean up session code
Shubhrakanti May 7, 2025
e79d8f1
clean up Activity input stream resolver
Shubhrakanti May 7, 2025
c2966d7
clean up audio recognition and activity
Shubhrakanti May 7, 2025
2e0f79d
clean up
Shubhrakanti May 7, 2025
cce6eba
Update Livekit SDK version (#384)
Shubhrakanti May 7, 2025
86af51d
Run tests on dev branch (#382)
Shubhrakanti May 5, 2025
fd4594c
update audio stream cancel
Shubhrakanti May 7, 2025
31f8aa9
Merge remote-tracking branch 'origin/dev-1.0' into shubhra/design-doc
Shubhrakanti May 7, 2025
da1fe68
update logging
Shubhrakanti May 7, 2025
155a15f
working with source
Shubhrakanti May 7, 2025
ba2d45e
update naming
Shubhrakanti May 7, 2025
79c79bb
clean up
Shubhrakanti May 7, 2025
4b59d14
add commnet on stream compatibility
Shubhrakanti May 7, 2025
ae3c28c
Merge branch 'dev-1.0' of https://github.com/livekit/agents-js into s…
Shubhrakanti May 7, 2025
ba56ffd
clean up log
Shubhrakanti May 7, 2025
8eab520
clean up
Shubhrakanti May 7, 2025
dfdd424
undo
Shubhrakanti May 7, 2025
4447274
clean up
Shubhrakanti May 7, 2025
569802a
remove unused code
Shubhrakanti May 7, 2025
7cbbe1d
update types
Shubhrakanti May 8, 2025
366da68
fix type
Shubhrakanti May 8, 2025
584149b
fix bug with track not subscribing
Shubhrakanti May 8, 2025
be2137a
remove line
Shubhrakanti May 10, 2025
e212a01
Use DeferredStream helper (#388)
lukasIO May 13, 2025
7cb1e15
fix lint errors
Shubhrakanti May 13, 2025
7ab8914
fix test failtures
Shubhrakanti May 13, 2025
4ca5c99
save progress
Shubhrakanti May 14, 2025
dd14362
try using VAD source
Shubhrakanti May 14, 2025
bc50d9c
working with identity transform streams
Shubhrakanti May 15, 2025
7c3f31e
clean up
Shubhrakanti May 15, 2025
8a8d093
Merge branch 'dev-1.0' into shubhra/ajs-31-refactor-vad-with-streams
Shubhrakanti May 15, 2025
0084207
Merge branch 'dev-1.0' into shubhra/ajs-31-refactor-vad-with-streams
Shubhrakanti May 19, 2025
01b85f6
refactor with identity transofrm stream
Shubhrakanti May 19, 2025
c4a4fa1
build issue
Shubhrakanti May 19, 2025
97 changes: 65 additions & 32 deletions agents/src/vad.ts
@@ -4,11 +4,15 @@
import type { AudioFrame } from '@livekit/rtc-node';
import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter';
import { EventEmitter } from 'node:events';
import type { ReadableStream } from 'node:stream/web';
import type {
ReadableStream,
ReadableStreamDefaultReader,
WritableStreamDefaultWriter,
} from 'node:stream/web';
import { log } from './log.js';
import type { VADMetrics } from './metrics/base.js';
import { DeferredReadableStream } from './stream/deferred_stream.js';
import { AsyncIterableQueue } from './utils.js';
import { IdentityTransform } from './stream/identity_transform.js';

export enum VADEventType {
START_OF_SPEECH,
@@ -80,46 +84,70 @@ export abstract class VAD extends (EventEmitter as new () => TypedEmitter<VADCal

export abstract class VADStream implements AsyncIterableIterator<VADEvent> {

Contributor Author

I don't think this should implement AsyncIterableIterator. I think it would be better to have a method

get stream(): ReadableStream<VADEvent> {
  return this.output.readable
}

Contributor

Can you elaborate on why (out of curiosity)?

Contributor Author

@Shubhrakanti Shubhrakanti May 15, 2025

Yeah, this isn't a huge issue for the VAD, but the other components (LLM/TTS/STT) expect the output to be a ReadableStream in their default implementations (see this comment). That way, the default usage of the components can just be:

static default = {
  async [stt/llm/tts]Node(inputStream: ReadableStream<...>): Promise<ReadableStream<...>> {
    ...
    stream = [stt/llm/tts].stream()
    stream.updateInputStream(inputStream)
    return stream.stream()
  }
}

Reference python implementation

I just want to keep all the [STT/LLM/TTS/VAD]Stream classes similar.

Contributor Author

Although in this case it makes more sense for VADStream to be called VADStreamManager or something.

Contributor

I see, that makes sense to me!

Contributor Author

I'll do this in a separate PR. The legacy implementation has some tricky parts that aren't worth wrestling with right now, since we will delete it anyway. https://linear.app/livekit/issue/AJS-48/remove-iterator-from-vadsream
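
The pattern discussed in this thread (a stream class that accepts its input through `updateInputStream` and exposes its output as a `ReadableStream`) can be sketched roughly as follows. This is only an illustration under assumptions: `UppercaseStream`, `uppercaseNode`, and `collect` are hypothetical names that do not exist in agents-js, the output is exposed as a getter as in the first comment rather than a `stream()` method, and a plain `TransformStream` stands in for the real STT/LLM/TTS/VAD processing.

```typescript
// Hypothetical stand-in for an [STT/LLM/TTS/VAD]Stream class.
class UppercaseStream {
  // The transform plays the role of the component's processing step.
  private output = new TransformStream<string, string>({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });

  // Attach the input after construction.
  updateInputStream(input: ReadableStream<string>): void {
    input.pipeTo(this.output.writable).catch((err) => {
      console.error(`Error piping input: ${err}`);
    });
  }

  // Expose the output as a ReadableStream instead of implementing an iterator.
  get stream(): ReadableStream<string> {
    return this.output.readable;
  }
}

// A default "node": ReadableStream in, ReadableStream out.
async function uppercaseNode(
  inputStream: ReadableStream<string>,
): Promise<ReadableStream<string>> {
  const stream = new UppercaseStream();
  stream.updateInputStream(inputStream);
  return stream.stream;
}

// Helper that drains a stream into an array.
async function collect<T>(stream: ReadableStream<T>): Promise<T[]> {
  const out: T[] = [];
  const reader = stream.getReader();
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    out.push(value as T);
  }
  return out;
}
```

Because every component returns a `ReadableStream`, nodes written this way can be chained by passing one node's result as the next node's input.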

protected static readonly FLUSH_SENTINEL = Symbol('FLUSH_SENTINEL');
protected input = new AsyncIterableQueue<AudioFrame | typeof VADStream.FLUSH_SENTINEL>();
protected queue = new AsyncIterableQueue<VADEvent>();
protected output = new AsyncIterableQueue<VADEvent>();
protected input = new IdentityTransform<AudioFrame | typeof VADStream.FLUSH_SENTINEL>();
protected output = new IdentityTransform<VADEvent>();
protected inputWriter: WritableStreamDefaultWriter<AudioFrame | typeof VADStream.FLUSH_SENTINEL>;
protected inputReader: ReadableStreamDefaultReader<AudioFrame | typeof VADStream.FLUSH_SENTINEL>;
protected outputWriter: WritableStreamDefaultWriter<VADEvent>;
protected outputReader: ReadableStreamDefaultReader<VADEvent>;
protected closed = false;
protected inputClosed = false;

#vad: VAD;
#lastActivityTime = BigInt(0);
private logger = log();
private deferredInputStream: DeferredReadableStream<AudioFrame>;

private metricsStream: ReadableStream<VADEvent>;
constructor(vad: VAD) {
this.#vad = vad;
this.deferredInputStream = new DeferredReadableStream<AudioFrame>();

this.inputWriter = this.input.writable.getWriter();
this.inputReader = this.input.readable.getReader();
this.outputWriter = this.output.writable.getWriter();

const [outputStream, metricsStream] = this.output.readable.tee();
this.metricsStream = metricsStream;
this.outputReader = outputStream.getReader();

this.pumpDeferredStream();
this.monitorMetrics();
this.mainTask();
}

protected async mainTask() {
// This is just a placeholder since VAD isn't implemented with the streams API yet.
/**
* Reads from the deferred input stream and forwards chunks to the input writer.
*
* Note: we can't just do this.deferredInputStream.stream.pipeTo(this.input.writable)
* because the inputWriter locks the this.input.writable stream. All writes must go through
* the inputWriter.
*/
private async pumpDeferredStream() {
const reader = this.deferredInputStream.stream.getReader();
try {
const inputStream = this.deferredInputStream.stream;
const reader = inputStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}
this.pushFrame(value);
if (done) break;
await this.inputWriter.write(value);
}
} catch (error) {
this.logger.error('Error in VADStream mainTask:', error);
} catch (e) {
this.logger.error(`Error pumping deferred stream: ${e}`);
throw e;
} finally {
reader.releaseLock();
}
}

protected async monitorMetrics() {
let inferenceDurationTotal = 0;
let inferenceCount = 0;

for await (const event of this.queue) {
this.output.put(event);
switch (event.type) {
const metricsReader = this.metricsStream.getReader();
while (true) {
const { done, value } = await metricsReader.read();
if (done) {
break;
}
switch (value.type) {
case VADEventType.START_OF_SPEECH:
inferenceCount++;
if (inferenceCount >= 1 / this.#vad.capabilities.updateInterval) {
@@ -143,51 +171,56 @@ export abstract class VADStream implements AsyncIterableIterator<VADEvent> {
break;
}
}
this.output.close();
}

updateInputStream(audioStream: ReadableStream<AudioFrame>) {
this.deferredInputStream.setSource(audioStream);
}

/** @deprecated Use `updateInputStream` instead */
pushFrame(frame: AudioFrame) {
if (this.input.closed) {
// TODO(AJS-395): remove this method
if (this.inputClosed) {
throw new Error('Input is closed');
}
if (this.closed) {
throw new Error('Stream is closed');
}
this.input.put(frame);
this.inputWriter.write(frame);
}

flush() {
if (this.input.closed) {
if (this.inputClosed) {
throw new Error('Input is closed');
}
if (this.closed) {
throw new Error('Stream is closed');
}
this.input.put(VADStream.FLUSH_SENTINEL);
this.inputWriter.write(VADStream.FLUSH_SENTINEL);
}

endInput() {
if (this.input.closed) {
if (this.inputClosed) {
throw new Error('Input is closed');
}
if (this.closed) {
throw new Error('Stream is closed');
}
this.input.close();
this.inputClosed = true;
this.input.writable.close();
}

next(): Promise<IteratorResult<VADEvent>> {
return this.output.next();
async next(): Promise<IteratorResult<VADEvent>> {
return this.outputReader.read().then(({ done, value }) => {
if (done) {
return { done: true, value: undefined };
}
return { done: false, value };
});
}

close() {
this.input.close();
this.queue.close();
this.output.close();
this.input.writable.close();
this.closed = true;
}
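
The note on `pumpDeferredStream` above (a writable that is locked by a writer cannot be the target of `pipeTo`) can be checked with a small standalone sketch. It assumes that a default `TransformStream` behaves like the `IdentityTransform` helper imported in this diff, which is not shown here; `pump` and `demoLockedWritable` are hypothetical names.

```typescript
// Forward every chunk from `source` through an existing writer.
async function pump<T>(
  source: ReadableStream<T>,
  writer: WritableStreamDefaultWriter<T>,
): Promise<void> {
  const reader = source.getReader();
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      await writer.write(value as T);
    }
  } finally {
    reader.releaseLock();
  }
}

async function demoLockedWritable(): Promise<{
  locked: boolean;
  pipeRejected: boolean;
  received: number[];
}> {
  // A TransformStream without a transformer passes chunks through unchanged.
  const identity = new TransformStream<number, number>();
  const writer = identity.writable.getWriter(); // this locks identity.writable
  const locked = identity.writable.locked;

  // pipeTo rejects because the destination is already locked by `writer`.
  const unused = new ReadableStream<number>({
    start(c) {
      c.enqueue(0);
      c.close();
    },
  });
  let pipeRejected = false;
  try {
    await unused.pipeTo(identity.writable);
  } catch {
    pipeRejected = true;
  }

  // Pumping through the writer works. Reading runs concurrently with the pump
  // so that backpressure does not stall the writes.
  const source = new ReadableStream<number>({
    start(c) {
      c.enqueue(1);
      c.enqueue(2);
      c.close();
    },
  });
  const pumping = pump(source, writer).then(() => writer.close());
  const received: number[] = [];
  const reader = identity.readable.getReader();
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    received.push(value as number);
  }
  await pumping;
  return { locked, pipeRejected, received };
}
```

Holding a single writer also lets other callers, such as the deprecated `pushFrame` in the diff, share the same entry point into the stream.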

4 changes: 2 additions & 2 deletions agents/src/voice/audio_recognition.ts
@@ -48,10 +48,10 @@ export class AudioRecognition {
async start() {
const [vadInputStream, sttInputStream] = this.deferredInputStream.stream.tee();
this.vadStreamProcessor = this.vadTask(vadInputStream).catch((err) => {
throw err;
this.logger.error(`Error in VAD task: ${err}`);
});
this.sttStreamProcessor = this.sttTask(sttInputStream).catch((err) => {
throw err;
this.logger.error(`Error in STT task: ${err}`);
});
}
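
The change above splits one input with `tee()` and logs task errors instead of rethrowing them from `.catch`. A minimal sketch of that shape, with hypothetical names (`runBranches`, `logError`) and numbers standing in for audio frames, might look like this:

```typescript
// Split one source into two branches and run a task on each.
// A failure in one task is logged rather than rethrown, so it does not
// surface as an unhandled rejection or stop the other branch.
async function runBranches(
  source: ReadableStream<number>,
  logError: (message: string) => void,
): Promise<number[]> {
  const [branchA, branchB] = source.tee();
  const seen: number[] = [];

  // Task A reads one chunk and then fails.
  const taskA = (async () => {
    const reader = branchA.getReader();
    await reader.read();
    throw new Error('boom');
  })().catch((err) => {
    logError(`Error in task A: ${err}`);
  });

  // Task B drains its branch normally.
  const taskB = (async () => {
    const reader = branchB.getReader();
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      seen.push(value as number);
    }
  })().catch((err) => {
    logError(`Error in task B: ${err}`);
  });

  await Promise.all([taskA, taskB]);
  return seen;
}
```

One trade-off of this approach is that the caller no longer observes the failure through the returned promise, so the log becomes the only signal that a branch stopped.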

13 changes: 9 additions & 4 deletions plugins/silero/src/vad.ts
@@ -157,7 +157,12 @@ export class VADStream extends baseStream {
// used to avoid drift when the sampleRate ratio is not an integer
let inputCopyRemainingFrac = 0.0;

for await (const frame of this.input) {
while (true) {
const { done, value: frame } = await this.inputReader.read();
if (done) {
break;
}

if (typeof frame === 'symbol') {
continue; // ignore flush sentinel for now
}
@@ -255,7 +260,7 @@ export class VADStream extends baseStream {
pubSilenceDuration += inferenceDuration;
}

this.queue.put({
this.outputWriter.write({
type: VADEventType.INFERENCE_DONE,
samplesIndex: pubCurrentSample,
timestamp: pubTimestamp,
@@ -309,7 +314,7 @@ export class VADStream extends baseStream {
pubSilenceDuration = 0;
pubSpeechDuration = speechThresholdDuration;

this.queue.put({
this.outputWriter.write({
type: VADEventType.START_OF_SPEECH,
samplesIndex: pubCurrentSample,
timestamp: pubTimestamp,
@@ -336,7 +341,7 @@ export class VADStream extends baseStream {
pubSpeechDuration = 0;
pubSilenceDuration = silenceThresholdDuration;

this.queue.put({
this.outputWriter.write({
type: VADEventType.END_OF_SPEECH,
samplesIndex: pubCurrentSample,
timestamp: pubTimestamp,
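
The plugin changes above replace `for await` over a queue with an explicit reader loop and replace `queue.put` with writes to an output writer. A simplified sketch of that loop follows. All names and types here (`detect`, `runDetect`, `LevelEvent`, the threshold) are hypothetical, numbers stand in for audio frames, and plain `TransformStream` instances stand in for the `IdentityTransform` helper. Unlike the diff, this sketch awaits each write so that backpressure from the consumer slows the producer.

```typescript
const FLUSH_SENTINEL = Symbol('FLUSH_SENTINEL');
type Input = number | typeof FLUSH_SENTINEL;

interface LevelEvent {
  index: number;
  loud: boolean;
}

// Read inputs until the stream ends, skip flush sentinels, and emit one event per sample.
async function detect(
  inputReader: ReadableStreamDefaultReader<Input>,
  outputWriter: WritableStreamDefaultWriter<LevelEvent>,
  threshold: number,
): Promise<void> {
  let index = 0;
  while (true) {
    const { done, value } = await inputReader.read();
    if (done) break;
    if (typeof value === 'symbol') continue; // ignore the flush sentinel
    await outputWriter.write({ index, loud: (value as number) > threshold });
    index++;
  }
  await outputWriter.close();
}

// Wire the loop between two pass-through streams and collect the events.
async function runDetect(samples: Input[]): Promise<LevelEvent[]> {
  const input = new TransformStream<Input, Input>();
  const output = new TransformStream<LevelEvent, LevelEvent>();
  const task = detect(input.readable.getReader(), output.writable.getWriter(), 0.5);

  const inputWriter = input.writable.getWriter();
  const feeding = (async () => {
    for (const sample of samples) await inputWriter.write(sample);
    await inputWriter.close();
  })();

  const events: LevelEvent[] = [];
  const reader = output.readable.getReader();
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    events.push(value as LevelEvent);
  }
  await Promise.all([task, feeding]);
  return events;
}
```

Feeding, processing, and reading all run concurrently here; awaiting the writes before starting the reader would stall, because a default `TransformStream` buffers very little.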