Skip to content

Commit

Permalink
Rewrite logic
Browse files Browse the repository at this point in the history
* Support functions returning functions
* Support functions returning promises resolving into functions or streams
  • Loading branch information
Divjot Singh committed Nov 28, 2017
1 parent b557ca0 commit 07c7101
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 91 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,5 +86,7 @@ Currently we support following parts:
* [x] returning promises
* [x] returning streams

**Note that a function can return a function, or a promsie can resovle to a function/promise. Basically the definition of dynami parts is recursive, ultimately evaluating to a string or stream. Be careful!**

## License
MIT
2 changes: 1 addition & 1 deletion benchmark/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"license": "ISC",
"dependencies": {
"express": "^4.16.2",
"marinate": "0.0.2",
"marinate": "^0.1.0",
"react": "^16.1.1",
"react-dom": "^16.1.1"
}
Expand Down
2 changes: 1 addition & 1 deletion example/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"license": "MIT",
"dependencies": {
"express": "^4.16.2",
"marinate": "^0.0.2",
"marinate": "^0.1.0",
"react": "^16.1.1",
"react-dom": "^16.1.1"
}
Expand Down
136 changes: 75 additions & 61 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
const { Stream } = require('stream');
const { Stream, Duplex } = require('stream')

const log = (debug, mainItem, ...verboseItems) => debug !== 'off' &&
console.log('[marinate] ', mainItem, ...(debug === 'verbose' ? verboseItems : []))
const log = (debug, mainItem, ...verboseItems) => debug !== 'off' && console.log('[marinate] ', mainItem, ...(debug === 'verbose' ? verboseItems : []))

const typeOf = obj => {
if (obj instanceof Stream) {
Expand All @@ -20,76 +19,91 @@ const typeOf = obj => {

const write = stream => value => new Promise(resolve => stream.write(value, resolve))

const bufferToReadableStream = buffer => {
const stream = new Duplex()
stream.push(buffer)
stream.push(null)
return stream
}

const stringToReadableStream = string => bufferToReadableStream(Buffer.from(string))

const resolveAfterPipeAndEnd = (streamToPipe, streamToPipeInto, options) => new Promise((resolve, reject) => {
streamToPipe.pipe(streamToPipeInto, options)
streamToPipe.on('end', resolve)
})

// debug - off|basic|verbose
module.exports = (staticParts, ...dynamicParts) => async (streamToWriteInto, debug = 'basic') => {
const handlePart = async ({ part, streamToWriteInto, options }) => {
const { debug, stringToBufferThreshold } = options

const writeToStream = write(streamToWriteInto)

// for..of will iterate only when internal `await`s are resolved.
for (let [index, staticPart] of staticParts.entries()) {
log(debug, 'Rendering static part', staticPart)
await writeToStream(staticPart)
switch (typeOf(part)) {
case 'function': {
await handlePart({ part: part(), streamToWriteInto, options })
break
}
case 'promise': {
await handlePart({ part: await part, streamToWriteInto, options })
break
}
case 'string': {
if (part.length >= stringToBufferThreshold) {
log(debug, `Rendering string part as stream`, part)
await resolveAfterPipeAndEnd(stringToReadableStream(part), streamToWriteInto, { end: false })
log(debug, `Rendered string part as stream`)
} else {
log(debug, 'Rendering string part', part)
await writeToStream(part)
}
break
}
case 'stream': {
const time = Date.now()
log(debug, `Rendering stream part [${time}]`)
await resolveAfterPipeAndEnd(part, streamToWriteInto, { end: false })
log(debug, `Rendered stream part [${time}] in ${Date.now() - time}ms`)
break
}
default: {
console.log({ part })
throw new Error(`Unknown type [${typeOf(part)}] of above part`);
}
}
}

const defaultOptions = {
debug: 'off', // "debug": 'off' | 'basic' | 'verbose'
stringToBufferThreshold: Infinity, // "stringToBufferThreshold": min string.length before converting it into a Buffer
};

module.exports = (staticParts, ...dynamicParts) => async (streamToWriteInto, options = defaultOptions) => {
const { debug, stringToBufferThreshold } = options;

// if staticPart is last element, we just write it and end the stream
if (index === staticParts.length - 1) {
for (let [index, staticPart] of staticParts.entries()) { // for..of will iterate only when internal `await`s are resolved.
await handlePart({
part: staticPart, // take this string and
streamToWriteInto, // directly write into the stream or pipe it
options, // based on the options
});

if (index === staticParts.length - 1) { // if staticPart is last element, we just end the stream
log(debug, 'Finished Marinating')
streamToWriteInto.end()
}
// for intermediate staticParts, we need to plug in the dynamic parts before streaming the next staticPart
else {

} else { // for intermediate staticParts, we need to plug in the dynamic parts before streaming the next staticPart
const dynamicPart = dynamicParts[index]
const typeOfDynamicPart = typeOf(dynamicPart)

if (['string', 'function'].includes(typeOfDynamicPart)) {
await handlePart({
part: dynamicPart, // take this dynamic part (string | function returning (string | promise | stream))
streamToWriteInto, // write it into stream
options, // based on options
})

switch (typeOf(dynamicPart)) {
case 'string': {
// dynamicPart is just a string, so we simply write it.
log(debug, 'Rendering string part', dynamicPart)
await writeToStream(dynamicPart)
break
}
case 'function': {
// dynamic part is a function
log(debug, `Rendering function part`)

// we get the return value of function
const result = dynamicPart()

switch (typeOf(result)) {
case 'stream': {
// dynamicPart is a stream, we pipe and end before iterating
const time = Date.now()
log(debug, `Rendering stream part [${time}]`)
await resolveAfterPipeAndEnd(result, streamToWriteInto, { end: false })
log(debug, `Rendered stream part [${time}] in ${Date.now() - time}ms`)
break
}
case 'string': {
await writeToStream(result)
log(debug, `Rendered function part as string`, result)
break
}
case 'promise': {
try {
const time = Date.now()
log(debug, `Awaiting function part's promise [${time}]`)
// dynamicPart is a promise, we resolve it before iterating
const value = await result
if (typeof value === 'string') {
await writeToStream(value)
log(debug, `Resolved function part as string in ${Date.now() - time}ms`, value)
} else {
log('verbose', "Error! Promise doesn't resolve to a string. Resolved value is: ", { value })
}
} catch (err) {
log('verbose', "Error! Promise reject", { err })
}
}
}
break
}
} else {
throw new Error(`\${dynamicParts} can only be of type (string | function returning (string | promise | stream)). Found type ${typeOfDynamicPart}`)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "marinate",
"version": "0.0.2",
"version": "0.1.0",
"description": "A library to pipe a template string to a stream, \"marinating\" strings, functions, promises and streams together",
"main": "index.js",
"scripts": {
Expand All @@ -16,4 +16,4 @@
"react": "^16.1.1",
"react-dom": "^16.1.1"
}
}
}
62 changes: 36 additions & 26 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,51 +4,61 @@ const marinate = require('./index');
const outputs = [`
hello
`,
'123', `
'123', `
`,
'456', `
'456', `
`,
'789', `
'789', `
`,
'Hello world',`
`
'Hello world', `
`,
'YOLO',`
`,
];
let outputPointer = 0;

const file$ = new Readable({
const stringToReadableStream = str => {
const file$ = new Readable({
read(size) {
if (!this.content) this.push(null)
else {
this.push(this.content.slice(0, size))
this.content = this.content.slice(size)
}
if (!this.content) this.push(null)
else {
this.push(this.content.slice(0, size))
this.content = this.content.slice(size)
}
}
});
});

file$.push(Buffer.from('Hello world', 'ascii'));
file$.push(Buffer.from(str, 'ascii'));
return file$;
}

const res = new Writable({
const createRes = () => {
let outputPointer = 0;
return new Writable({
decodeStrings: false,
write(actual, encoding, next) {
const expected = outputs[outputPointer++]
const expected = outputs[outputPointer++]

if (actual instanceof Buffer) actual = actual.toString()
if (actual instanceof Buffer) actual = actual.toString()

try {
console.assert(actual === expected)
} catch (err) {
console.log('Test failed.', { actual, expected })
}
next()
try {
console.assert(actual === expected)
} catch (err) {
console.log('Test failed.', { actual, expected })
}
next()
}
})
})
}

const template = marinate`
hello
${'123'}
${() => '456'}
${() => new Promise(r => setTimeout(r, 2000, '789'))}
${() => file$}
${() => stringToReadableStream('Hello world')}
${() => new Promise(r => setTimeout(r, 2000, stringToReadableStream('YOLO')))}
`

template(res, 'off')
template(createRes(), { debug: 'off' })
template(createRes(), { debug: 'off', stringToBufferThreshold: 0 })
template(createRes(), { debug: 'off', stringToBufferThreshold: 5 })

0 comments on commit 07c7101

Please sign in to comment.