Shubhra/ajs 31 refactor vad with streams #390
…hubhra/design-doc
I tried to get this to work by implementing a You can see my progress on this commit. dd14362 |
@@ -80,46 +84,73 @@ export abstract class VAD extends (EventEmitter as new () => TypedEmitter<VADCal

export abstract class VADStream implements AsyncIterableIterator<VADEvent> {
I don't think this should implement AsyncIterableIterator. I think it would be better to have a method
get stream(): ReadableStream<VADEvent> {
  return this.output.readable
}
Can you elaborate on why (out of curiosity)?
Yeah, this isn't a huge issue for the VAD, but the other components (LLM/TTS/STT) expect the output to be a ReadableStream for their default implementations (see this comment). This way the default usage of the components can just be
static default = {
  async [stt/llm/tts]Node(inputStream: ReadableStream<...>): Promise<ReadableStream<...>> {
    ...
    stream = [stt/llm/tts].stream()
    stream.updateInputStream(inputStream)
    return stream.stream()
  }
}
Reference python implementation
I just want to keep all the [STT/LLM/TTS/VAD]Stream classes similar.
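The default-node pattern in this comment can be sketched in runnable form. This is only an illustration under stated assumptions: `ToyStream`, `defaultNode`, `collect`, and the uppercase transform are hypothetical stand-ins, and only the `updateInputStream` and `stream()` method names come from the comment above.

```typescript
import { ReadableStream, TransformStream } from 'node:stream/web';

// Hypothetical stand-in for an STT/LLM/TTS stream class (not the real API).
class ToyStream {
  // The transform does the "work"; here it only uppercases each chunk.
  private transform = new TransformStream<string, string>({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });

  updateInputStream(input: ReadableStream<string>): void {
    // Fire-and-forget pipe. If the source errors, pipeTo aborts the writable
    // side, so the failure still surfaces to whoever reads stream().
    input.pipeTo(this.transform.writable).catch(() => {});
  }

  stream(): ReadableStream<string> {
    return this.transform.readable;
  }
}

// Hypothetical default node: wire the input in, hand the output stream back.
async function defaultNode(input: ReadableStream<string>): Promise<ReadableStream<string>> {
  const s = new ToyStream();
  s.updateInputStream(input);
  return s.stream();
}

// Helper that drains a ReadableStream into an array.
async function collect<T>(readable: ReadableStream<T>): Promise<T[]> {
  const reader = readable.getReader();
  const out: T[] = [];
  while (true) {
    const result = await reader.read();
    if (result.done) break;
    out.push(result.value as T);
  }
  return out;
}
```

Because every component returns a plain ReadableStream, the caller does not need to know which component produced it.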
Although in this case it makes more sense for VADStream to be called VADStreamManager or something.
I see, that makes sense to me!
I'll do this in a separate PR. There are some tricky things in the legacy implementation that aren't worth wrestling with right now, since we will delete it anyway. https://linear.app/livekit/issue/AJS-48/remove-iterator-from-vadsream
agents/src/voice/agent_activity.ts
Outdated
// eslint-disable-next-line @typescript-eslint/no-unused-vars
onVADInferenceDone(ev: VADEvent): void {
  this.logger.info('VAD inference done', ev);
  // this.logger.info('VAD inference done', ev);
Ignore this. I forgot to pick this up from another PR's changes. Will resolve with the merge conflict.
this.vadStreamProcessor = this.vadTask().catch((err) => {
  this.logger.error('Error in VAD task', err);
  // raise the error
  throw err;
  this.logger.error(`Error in VAD task: ${err}`);
});
What's the proper way to handle errors from these async functions we run in Node? I'll do some digging on this. Ideally we want to raise them rather than swallow them.
@@ -37,7 +37,9 @@ export class AudioRecognition {

start() {
  this.vadStreamProcessor = this.vadTask().catch((err) => {
    this.logger.error('Error in VAD task', err);
    // raise the error
    throw err;
This will always throw as an unhandled exception, which we'd want to avoid.
You're trying to handle an error in an async task from within a non-async method. I think catch is the best you can do there.
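One way to surface the error without rethrowing inside catch is to record it and forward it to a callback, then let an async shutdown path await the task. The sketch below is an assumption for illustration, not the project's implementation; `TaskRunner`, `onError`, `lastError`, and `close` are hypothetical names.

```typescript
// Hypothetical wrapper around a fire-and-forget async task.
class TaskRunner {
  private run: () => Promise<void>;
  private onError: (err: unknown) => void;
  private task?: Promise<void>;
  lastError?: unknown;

  constructor(run: () => Promise<void>, onError: (err: unknown) => void) {
    this.run = run;
    this.onError = onError;
  }

  // Non-async entry point, like start() in the diff above.
  start(): void {
    this.task = this.run().catch((err) => {
      // Record and forward the error instead of rethrowing. Rethrowing here
      // would reject this.task, and if nothing awaits or catches that promise
      // Node reports an unhandled rejection.
      this.lastError = err;
      this.onError(err);
    });
  }

  // Async shutdown path: resolves once the task has settled.
  async close(): Promise<void> {
    await this.task;
  }
}
```

The error is no longer swallowed silently: the callback can log it, emit an event, or trigger a shutdown, and callers can inspect `lastError` after `close()`.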
Refactor the streaming logic into a clean interface. The streaming logic is similar to what we need for LLM, TTS, and STT. https://linear.app/livekit/issue/AJS-43/stream-wrapper-that-takes-multiple-sources-and-combine-them
Refactor the VAD component to use WHATWG streams.
We use two Identity TransformStreams to represent input and output.
Some parts of the VAD component are deprecated because of this refactor. Will remove once we do away with VoicePipelineAgent. Keeping it around for now because we're still developing.
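As a rough illustration of the two identity TransformStreams mentioned in the description, the sketch below uses one for input and one for output, with a loop in between. It is a toy under stated assumptions: `ToyVADStream`, `Frame`, `ToyEvent`, `pushFrame`, `endInput`, `finished`, and the energy threshold are all hypothetical, and only the `get stream()` accessor shape comes from the review comment.

```typescript
import { ReadableStream, TransformStream } from 'node:stream/web';

// Hypothetical frame and event types, for illustration only.
type Frame = { samples: number[] };
type ToyEvent = { speaking: boolean };

class ToyVADStream {
  // Identity TransformStreams: whatever is written to .writable comes out of
  // .readable unchanged. One carries input frames, the other output events.
  private input = new TransformStream<Frame, Frame>();
  private output = new TransformStream<ToyEvent, ToyEvent>();
  private inputWriter = this.input.writable.getWriter();
  readonly finished: Promise<void>;

  constructor() {
    this.finished = this.run();
  }

  pushFrame(frame: Frame): Promise<void> {
    return this.inputWriter.write(frame);
  }

  endInput(): Promise<void> {
    return this.inputWriter.close();
  }

  get stream(): ReadableStream<ToyEvent> {
    return this.output.readable;
  }

  // Reads frames from the input side and writes one event per frame.
  private async run(): Promise<void> {
    const reader = this.input.readable.getReader();
    const writer = this.output.writable.getWriter();
    while (true) {
      const result = await reader.read();
      if (result.done) break;
      const frame = result.value as Frame;
      // Toy "inference": sum of squared samples against an arbitrary threshold.
      const energy = frame.samples.reduce((sum, s) => sum + s * s, 0);
      await writer.write({ speaking: energy > 1 });
    }
    await writer.close();
  }
}
```

Note that identity TransformStreams apply backpressure by default, so a consumer should already be reading `stream` while frames are pushed; otherwise awaited `pushFrame` calls will stall.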