Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ module.exports = {
files: ['*.ts'],
extends: ['@metamask/eslint-config-typescript'],
},
{
files: ['*.d.ts'],
rules: {
'import/unambiguous': 'off',
},
},
],
ignorePatterns: ['!.eslintrc.js', 'dist/'],
};
4 changes: 1 addition & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
},
"dependencies": {
"@metamask/safe-event-emitter": "^3.0.0",
"through2": "^2.0.3"
"readable-stream": "^3.6.2"
},
"devDependencies": {
"@lavamoat/allow-scripts": "^2.3.1",
Expand All @@ -37,14 +37,12 @@
"@metamask/eslint-config-typescript": "^7.0.1",
"@types/node": "^10.17.42",
"@types/tape": "^4.13.4",
"@types/through2": "^2.0.36",
"@typescript-eslint/eslint-plugin": "^4.28.1",
"@typescript-eslint/parser": "^4.28.1",
"eslint": "^7.29.0",
"eslint-config-prettier": "^8.3.0",
"eslint-plugin-import": "^2.23.4",
"eslint-plugin-prettier": "^3.4.0",
"mississippi": "^1.2.0",
"prettier": "^2.3.2",
"prettier-plugin-packagejson": "^2.2.11",
"rimraf": "^3.0.2",
Expand Down
2 changes: 1 addition & 1 deletion src/asStream.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Duplex as DuplexStream } from 'stream';
import { Duplex as DuplexStream } from 'readable-stream';

import { ObservableStore } from './ObservableStore';

Expand Down
4 changes: 4 additions & 0 deletions src/readable-stream.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// eslint-disable import/unambiguous
declare module 'readable-stream' {
export { Duplex, Transform, Writable, pipeline } from 'stream';
}
24 changes: 14 additions & 10 deletions src/transform.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
import { obj as TransformStream } from 'through2';
import { Transform } from 'readable-stream';

export function storeTransformStream<T, U>(syncTransformFn: (state: T) => U) {
return TransformStream((state, _encoding, cb) => {
try {
const newState = syncTransformFn(state);
cb(null, newState);
return undefined;
} catch (err) {
cb(err);
return undefined;
}
const t = new Transform({
objectMode: true,
transform: (state, _encoding, cb) => {
try {
const newState = syncTransformFn(state);
cb(undefined, newState);
return undefined;
} catch (err) {
cb(err);
return undefined;
}
},
});
return t;
}
28 changes: 19 additions & 9 deletions test/stream.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { Transform as TransformStream } from 'stream';
import {
pipeline,
Transform as TransformStream,
Writable as WritableStream,
} from 'readable-stream';
import test from 'tape';
import { pipe, to as writeStream } from 'mississippi';
import { ObservableStore, storeAsStream } from '../src';

const TEST_WAIT = 200;
Expand All @@ -20,7 +23,7 @@ test('basic stream', function (t) {
storeTwo.once('update', nextValueCheck);
});

pipe(storeAsStream(storeOne), storeAsStream(storeTwo));
pipeline(storeAsStream(storeOne), storeAsStream(storeTwo));

storeOne.putState(nextState);

Expand Down Expand Up @@ -60,9 +63,9 @@ test('double stream', function (t) {
);
});

pipe(storeAsStream(storeOne), storeAsStream(storeTwo));
pipeline(storeAsStream(storeOne), storeAsStream(storeTwo));

pipe(storeAsStream(storeOne), storeAsStream(storeThree));
pipeline(storeAsStream(storeOne), storeAsStream(storeThree));

storeOne.putState(nextState);

Expand Down Expand Up @@ -98,7 +101,11 @@ test('transform stream', function (t) {
storeTwo.once('update', nextValueCheck);
});

pipe(storeAsStream(storeOne), metaWrapperTransform, storeAsStream(storeTwo));
pipeline(
storeAsStream(storeOne),
metaWrapperTransform,
storeAsStream(storeTwo),
);

storeOne.putState(nextState);

Expand Down Expand Up @@ -135,9 +142,12 @@ test('basic - stream buffering', function (t) {

const itemsInStream = [];

const sink = writeStream.obj((value, _encoding, cb) => {
itemsInStream.push(value);
cb();
const sink = new WritableStream({
objectMode: true,
write: (value, _encoding, cb) => {
itemsInStream.push(value);
cb();
},
});

setTimeout(pipeStreams, TEST_WAIT);
Expand Down
8 changes: 6 additions & 2 deletions test/transform.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import test from 'tape';
import { pipe } from 'mississippi';
import { pipeline } from 'readable-stream';
import { ObservableStore, storeAsStream, storeTransformStream } from '../src';

test('storeTransformStream test', function (t) {
Expand All @@ -22,7 +22,11 @@ test('storeTransformStream test', function (t) {
storeTwo.once('update', nextValueCheck);
});

pipe(storeAsStream(storeOne), metaWrapperTransform, storeAsStream(storeTwo));
pipeline(
storeAsStream(storeOne),
metaWrapperTransform,
storeAsStream(storeTwo),
);

storeOne.putState(nextState);

Expand Down
Loading