Skip to content

Commit

Permalink
fix: eventstream
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangsx committed Jun 13, 2023
1 parent 0104b14 commit a0303a3
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 8 deletions.
12 changes: 7 additions & 5 deletions model/chatdemo/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import {AxiosInstance, AxiosRequestConfig, CreateAxiosDefaults} from "axios";
import {CreateAxiosProxy} from "../../utils/proxyAgent";
import es from "event-stream";
import {ErrorData, Event, EventStream, MessageData, parseJSON} from "../../utils";
import {randomUUID} from "crypto";
import {v4} from "uuid";
import moment from "moment";

Expand Down Expand Up @@ -50,10 +49,10 @@ export class ChatDemo extends Chat {
case Event.done:
break;
case Event.message:
result.content += (data as MessageData).content
result.content += (data as MessageData).content || '';
break;
case Event.error:
result.error = (data as ErrorData).error
result.error = (data as ErrorData).error;
break;
}
}, () => {
Expand All @@ -76,7 +75,6 @@ export class ChatDemo extends Chat {
res.data.pipe(es.split(/\r?\n\r?\n/)).pipe(es.map(async (chunk: any, cb: any) => {
const dataStr = chunk.replace('data: ', '');
if (!dataStr) {
stream.end();
return;
}
const data = parseJSON(dataStr, {} as any);
Expand All @@ -85,7 +83,11 @@ export class ChatDemo extends Chat {
stream.end();
return;
}
const [{delta: {content = ""}}] = data.choices;
const [{delta: {content = ""}, finish_reason}] = data.choices;
if (finish_reason === 'stop') {
stream.end();
return;
}
stream.write(Event.message, {content});
}))
} catch (e: any) {
Expand Down
13 changes: 10 additions & 3 deletions utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export function parseJSON<T>(str: string, defaultObj: T): T {
try {
return JSON.parse(str)
} catch (e) {
console.error(str, e);
console.log(str);
return defaultObj;
}
}
Expand Down Expand Up @@ -90,6 +90,10 @@ export type DataCB<T extends Event> = (event: T, data: Data<T>) => void
export class EventStream {
private readonly pt: PassThrough = new PassThrough();

constructor() {
this.pt.setEncoding('utf-8');
}

write<T extends Event>(event: T, data: Data<T>) {
this.pt.write(`event: ${event}\n`, 'utf-8');
this.pt.write(`data: ${JSON.stringify(data)}\n\n`, 'utf-8');
Expand All @@ -105,8 +109,11 @@ export class EventStream {

read(dataCB: DataCB<Event>, closeCB: () => void) {
this.pt.setEncoding('utf-8');
this.pt.pipe(es.split('\n\n').pipe(es.map(async (chunk: any, cb: any) => {
this.pt.pipe(es.split('\n\n')).pipe(es.map(async (chunk: any, cb: any) => {
const res = chunk.toString()
if (!res) {
return;
}
const [eventStr, dataStr] = res.split('\n');
const event: Event = eventStr.replace('event: ', '');
if (!(event in Event)) {
Expand All @@ -115,7 +122,7 @@ export class EventStream {
}
const data = parseJSON(dataStr.replace('data: ', ''), {} as Data<Event>);
return dataCB(event, data);
})))
}))
this.pt.on("close", closeCB)
}
}
Expand Down

0 comments on commit a0303a3

Please sign in to comment.