Skip to content

Commit 60bc9f1

Browse files
committed
finalize
1 parent d4cbfcf commit 60bc9f1

File tree

3 files changed

+22
-18
lines changed

3 files changed

+22
-18
lines changed

agents/src/stt/stream_adapter.ts

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,14 @@ export class StreamAdapterWrapper extends SpeechStream {
5353

5454
async #run() {
5555
const forwardInput = async () => {
56-
for await (const input of this.input) {
57-
if (input === SpeechStream.FLUSH_SENTINEL) {
56+
while (true) {
57+
const { done, value } = await this.inputReader.read();
58+
if (done) break;
59+
60+
if (value === SpeechStream.FLUSH_SENTINEL) {
5861
this.#vadStream.flush();
5962
} else {
60-
this.#vadStream.pushFrame(input);
63+
this.#vadStream.pushFrame(value);
6164
}
6265
}
6366
this.#vadStream.endInput();
@@ -67,18 +70,18 @@ export class StreamAdapterWrapper extends SpeechStream {
6770
for await (const ev of this.#vadStream) {
6871
switch (ev.type) {
6972
case VADEventType.START_OF_SPEECH:
70-
this.output.put({ type: SpeechEventType.START_OF_SPEECH });
73+
this.outputWriter.write({ type: SpeechEventType.START_OF_SPEECH });
7174
break;
7275
case VADEventType.END_OF_SPEECH:
73-
this.output.put({ type: SpeechEventType.END_OF_SPEECH });
76+
this.outputWriter.write({ type: SpeechEventType.END_OF_SPEECH });
7477

7578
try {
7679
const event = await this.#stt.recognize(ev.frames);
7780
if (!event.alternatives![0].text) {
7881
continue;
7982
}
8083

81-
this.output.put(event);
84+
this.outputWriter.write(event);
8285
break;
8386
} catch (error) {
8487
let logger = log();

agents/src/stt/stt.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -151,16 +151,15 @@ export abstract class SpeechStream implements AsyncIterableIterator<SpeechEvent>
151151
AudioFrame | typeof SpeechStream.FLUSH_SENTINEL
152152
>;
153153
protected outputWriter: WritableStreamDefaultWriter<SpeechEvent>;
154-
154+
protected closed = false;
155+
protected inputClosed = false;
155156
abstract label: string;
156157
#stt: STT;
157158
private deferredInputStream: DeferredReadableStream<AudioFrame>;
158159
private logger = log();
159160
private inputWriter: WritableStreamDefaultWriter<AudioFrame | typeof SpeechStream.FLUSH_SENTINEL>;
160161
private outputReader: ReadableStreamDefaultReader<SpeechEvent>;
161162
private metricsStream: ReadableStream<SpeechEvent>;
162-
private closed = false;
163-
private inputClosed = false;
164163

165164
constructor(stt: STT) {
166165
this.#stt = stt;

plugins/deepgram/src/stt.ts

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,6 @@ export class SpeechStream extends stt.SpeechStream {
125125
constructor(stt: STT, opts: STTOptions) {
126126
super(stt);
127127
this.#opts = opts;
128-
this.closed = false;
129128
this.#audioEnergyFilter = new AudioEnergyFilter();
130129

131130
this.#run();
@@ -134,7 +133,7 @@ export class SpeechStream extends stt.SpeechStream {
134133
async #run(maxRetry = 32) {
135134
let retries = 0;
136135
let ws: WebSocket;
137-
while (!this.input.closed) {
136+
while (!this.inputClosed) {
138137
const streamURL = new URL(API_BASE_URL_V1);
139138
const params = {
140139
model: this.#opts.model,
@@ -193,7 +192,7 @@ export class SpeechStream extends stt.SpeechStream {
193192
}
194193
}
195194

196-
this.closed = true;
195+
this.close();
197196
}
198197

199198
updateOptions(opts: Partial<STTOptions>) {
@@ -222,7 +221,10 @@ export class SpeechStream extends stt.SpeechStream {
222221
samples100Ms,
223222
);
224223

225-
for await (const data of this.input) {
224+
while (true) {
225+
const { done, value: data } = await this.inputReader.read();
226+
if (done) break;
227+
226228
let frames: AudioFrame[];
227229
if (data === SpeechStream.FLUSH_SENTINEL) {
228230
frames = stream.flush();
@@ -270,7 +272,7 @@ export class SpeechStream extends stt.SpeechStream {
270272
// It's also possible we receive a transcript without a SpeechStarted event.
271273
if (this.#speaking) return;
272274
this.#speaking = true;
273-
this.queue.put({ type: stt.SpeechEventType.START_OF_SPEECH });
275+
this.outputWriter.write({ type: stt.SpeechEventType.START_OF_SPEECH });
274276
break;
275277
}
276278
// see this page:
@@ -288,16 +290,16 @@ export class SpeechStream extends stt.SpeechStream {
288290
if (alternatives[0] && alternatives[0].text) {
289291
if (!this.#speaking) {
290292
this.#speaking = true;
291-
this.queue.put({ type: stt.SpeechEventType.START_OF_SPEECH });
293+
this.outputWriter.write({ type: stt.SpeechEventType.START_OF_SPEECH });
292294
}
293295

294296
if (isFinal) {
295-
this.queue.put({
297+
this.outputWriter.write({
296298
type: stt.SpeechEventType.FINAL_TRANSCRIPT,
297299
alternatives: [alternatives[0], ...alternatives.slice(1)],
298300
});
299301
} else {
300-
this.queue.put({
302+
this.outputWriter.write({
301303
type: stt.SpeechEventType.INTERIM_TRANSCRIPT,
302304
alternatives: [alternatives[0], ...alternatives.slice(1)],
303305
});
@@ -309,7 +311,7 @@ export class SpeechStream extends stt.SpeechStream {
309311
// a non-empty transcript (deepgram doesn't have a SpeechEnded event)
310312
if (isEndpoint && this.#speaking) {
311313
this.#speaking = false;
312-
this.queue.put({ type: stt.SpeechEventType.END_OF_SPEECH });
314+
this.outputWriter.write({ type: stt.SpeechEventType.END_OF_SPEECH });
313315
}
314316

315317
break;

0 commit comments

Comments
 (0)