Skip to content

Commit

Permalink
feat: update the stream service to better use fromEvent
Browse files Browse the repository at this point in the history
  • Loading branch information
jmcdo29 committed Nov 4, 2021
1 parent 5ced785 commit b7ffd2f
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 12 deletions.
5 changes: 3 additions & 2 deletions packages/cli/src/ogma.command.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Command, CommandRunner, Option } from 'nest-commander';
import { Color, OgmaLog } from '@ogma/common';
import { from, iif, of } from 'rxjs';
import { filter, map, mergeMap, tap } from 'rxjs/operators';
import { filter, map, mergeMap, takeUntil, tap } from 'rxjs/operators';
import { FileService } from './file.service';
import { badFormat } from './messages';
import { OgmaGetterService } from './ogma-getters.service';
Expand Down Expand Up @@ -43,7 +43,7 @@ export class OgmaCommand implements CommandRunner {

private async runForStdin(options: { color: boolean }): Promise<void> {
return new Promise((resolve, reject) => {
const log$ = this.streamService.readFromStream(process.stdin);
const { log: log$, done: done$ } = this.streamService.readFromStream(process.stdin);
log$
.pipe(
mergeMap((val) => {
Expand All @@ -52,6 +52,7 @@ export class OgmaCommand implements CommandRunner {
filter((log) => this.checkOgmaFormat(log)),
map((jsonLogString) => JSON.parse(jsonLogString)),
tap((logString) => this.writeLog(logString, options.color)),
takeUntil(done$),
)
.subscribe({
error: reject,
Expand Down
18 changes: 8 additions & 10 deletions packages/cli/src/stream.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@ import { Readable } from 'stream';

@Injectable()
export class StreamService {
readFromStream(stream: Readable): Observable<string> {
stream.on('readable', () => {
while (null !== stream.read()) {
/* no op for data event */
}
});
return fromEvent(stream, 'data').pipe(
filter((val) => Buffer.isBuffer(val)),
map((val: Buffer) => val.toString('utf-8')),
);
readFromStream(stream: Readable): { log: Observable<string>; done: Observable<any> } {
return {
log: fromEvent(stream, 'data').pipe(
filter((val) => Buffer.isBuffer(val)),
map((val: Buffer) => val.toString('utf-8')),
),
done: fromEvent(stream, 'end').pipe(map(() => true)),
};
}
}

0 comments on commit b7ffd2f

Please sign in to comment.