koheing/stream-executor


stream-executor

MIT License Main contributions welcome

  • A functional stream programming library
  • Inspired by RxJS
  • Useful for
    • managing and reusing the processes inside an action at a fine-grained level
    • keeping an action manageable when its processing becomes complicated
    • executing asynchronous processes sequentially

npm i stream-executor

Usage

1. chain stream (like RxJS)

using stream-executor

import { createStream, map, which, filter, tap } from 'stream-executor'
let isSucceeded = false

const chainResult = createStream(1)
  .chain(
    map((it) => it * 10),
    which(
      (it) => it > 1,
      tap((it) => (isSucceeded = true)),
      tap((it) => console.log('not succeeded'))
    ),
    filter((it) => it >= 10)
  )
  .execute()

console.log(isSucceeded) // true
console.log(chainResult) // 10

not using stream-executor

let isSucceeded = false

const initialValue = 1
let value = 0

if (value >= 0) {
  value = initialValue * 10
}

if (value > 1) {
  isSucceeded = true
} else {
  console.log('not succeeded')
}

// filter: keep the value only when it is at least 10
const result = value >= 10 ? value : undefined

console.log(isSucceeded) // true
console.log(result)      // 10

2. batch stream (like switch without break)

using stream-executor

import { createStream, ifRight, which } from 'stream-executor'

const mammal = { no: 999, name: 'UNKNOWN', type: 'bird' }
let isLoading = true

createStream(mammal)
  .batch(
    (_) => (isLoading = true),
    which(
      ({ type }) => type === 'bird',
      (it) => calculateSize(it),
      (_) => console.log('Not Bird')
    ),
    ifRight(
      ({ type, name }) => type === 'bird' && name === 'UNKNOWN',
      (mammal) => registerDB(mammal)
    ),
    (_) => (isLoading = false),
    (_) => console.log('end')
  )
  .execute()

console.log(isLoading)    // false

not using stream-executor

let isLoading: boolean
const mammal = { no: 999, name: 'UNKNOWN', type: 'bird' }

isLoading = true

if (mammal.type === 'bird') {
  calculateSize(mammal)
} else {
  console.log('Not Bird')
}

if (mammal.type === 'bird' && mammal.name === 'UNKNOWN') {
  console.log('maybe new species')
  registerDB(mammal)
}

isLoading = false

console.log('end')
console.log(isLoading)    // false
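
The batch behavior shown above can be approximated with a small self-contained sketch. It assumes that every action receives the same initial value and that return values are ignored; `runBatch` and `Action` are hypothetical names for illustration, not part of stream-executor.

```typescript
// Hypothetical stand-in for a batch executor; not the library's implementation.
type Action<T> = (value: T) => unknown

function runBatch<T>(initialValue: T, ...actions: Action<T>[]): void {
  for (const action of actions) {
    // Assumption: each action sees the initial value and its result is discarded.
    action(initialValue)
  }
}

const log: string[] = []
runBatch(
  { type: 'bird', name: 'UNKNOWN' },
  (it) => log.push(`start:${it.type}`),
  (it) => { if (it.type === 'bird') log.push('size') },
  (it) => { if (it.type === 'bird' && it.name === 'UNKNOWN') log.push('register') },
  () => log.push('end')
)
console.log(log) // ['start:bird', 'size', 'register', 'end']
```

Because no action can stop the ones after it, the flow reads like a switch statement whose cases all fall through.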

3. asyncChain stream

using stream-executor

import { createStream, tap, map } from 'stream-executor'
const result = await createStream(1)
  .asyncChain(
    tap(async (it) => console.log(await it)),             // 1
    map(async (it) => await callAPI(it)),    
    map(async (it) => parseToModel(await it))      // Record<string, any>
  )
  .execute()
console.log(result) // Record<string, any>

not using stream-executor

(async () => {
  let result
  const value = 1
  result = await callAPI(value)
  result = await parseToModel(result)
  console.log(result)
})()
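
As a rough illustration of sequential asynchronous execution, the sketch below awaits each step before starting the next. Note one deliberate simplification: in the library example each step receives a promise and awaits it itself, whereas this sketch resolves the value between steps. `runAsyncChain`, `fakeCallAPI`, and `fakeParseToModel` are hypothetical names, not part of stream-executor.

```typescript
// Hypothetical stand-in for an async chain; not the library's implementation.
type AsyncStep = (value: any) => unknown

async function runAsyncChain(initialValue: unknown, ...steps: AsyncStep[]): Promise<unknown> {
  let current: unknown = initialValue
  for (const step of steps) {
    // Each step finishes before the next one starts.
    current = await step(current)
  }
  return current
}

// Hypothetical replacements for callAPI and parseToModel.
const fakeCallAPI = async (id: number): Promise<string> => JSON.stringify({ id })
const fakeParseToModel = (json: string): Record<string, any> => JSON.parse(json)

const pending = runAsyncChain(1, fakeCallAPI, fakeParseToModel)
pending.then((model) => console.log(model)) // { id: 1 }
```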

Important

1. About createStream

  • The argument of createStream is not deep copied. Use the deepCopy method if you need a deep copy.

import { createStream, tap, deepCopy } from 'stream-executor'
const input = { value: 1 }
const result = createStream(input)
  .chain(tap((it) => (it.value += 9)))
  .execute()

console.log(input) // { value: 10 }
console.log(result) // { value: 10 }

const input2 = { value: 1 }
const result2 = createStream(deepCopy(input2))
  .chain(tap((it) => (it.value += 9)))
  .execute()

console.log(input2) // { value: 1 }
console.log(result2) // { value: 10 }

2. About deepCopy

  • Getters and functions in the object are removed by deepCopy.

import { createStream, tap, deepCopy } from 'stream-executor'
class Wrapper<T extends number> {
  value: T
  constructor(value: T) {
    this.value = value
  }
  get doubledValue() {
    return this.value * 2
  }
  hello() {
    console.log('world')
  }
}
const input = new Wrapper(1)
const result = createStream(deepCopy(input))
  .chain(tap((it) => (it.value += 9)))
  .execute()

console.log(input)  // Wrapper{ value: 1, doubledValue: 2, __proto__: { hello: () => console.log('world') } }
console.log(result) // { value: 10, __proto__: {} }
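
The loss of getters and methods resembles what a JSON round trip does to a class instance. The sketch below uses `JSON.parse(JSON.stringify(...))` purely as an assumption to illustrate the effect; how deepCopy is actually implemented is not stated here, and `Box` is a hypothetical class.

```typescript
class Box {
  constructor(public value: number) {}
  get doubledValue(): number {
    return this.value * 2
  }
  hello(): string {
    return 'world'
  }
}

// Assumption: a JSON round trip approximates the copy behavior described above.
const original = new Box(1)
const copy = JSON.parse(JSON.stringify(original))

copy.value += 9
console.log(original.value)      // 1 (the source object is untouched)
console.log(copy.value)          // 10
console.log(copy.doubledValue)   // undefined (the getter lived on the prototype)
console.log(typeof copy.hello)   // 'undefined' (methods are not copied)
console.log(copy instanceof Box) // false (the copy is a plain object)
```

If later steps need the getter or the method, copy the data first and rebuild the instance afterwards, for example with `new Box(copy.value)`.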

3. About createStream().chain():

  • Subsequent processes are not called once a process returns undefined.

import { createStream, tap, filter, map } from 'stream-executor'
const result = createStream(1)
  .chain(
    tap((it) => console.log(it)), // 1
    filter((it) => it > 2),       // return undefined
    map((it) => it + 9)           // not called
  )
  .execute()
console.log(result) // undefined
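
The short-circuit rule can be modeled in a few lines. This is a minimal sketch assuming that an undefined result halts the chain; `runChain` and `Step` are hypothetical names, not part of stream-executor.

```typescript
// Hypothetical stand-in for a chain executor; not the library's implementation.
type Step = (value: any) => any

function runChain(initialValue: unknown, ...steps: Step[]): unknown {
  let current: unknown = initialValue
  for (const step of steps) {
    current = step(current)
    // Assumption: an undefined result stops the chain immediately.
    if (current === undefined) return undefined
  }
  return current
}

const calls: string[] = []
const halted = runChain(
  1,
  (it: number) => { calls.push('filter'); return it > 2 ? it : undefined },
  (it: number) => { calls.push('map'); return it + 9 }
)
console.log(halted) // undefined
console.log(calls)  // ['filter']
```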

4. About the arguments of execute()

  • Pass an error handler to the execute method if you'd like to customize error handling.

let error: any
createStream(1)
  .batch(
    (it) => console.log(it),
    ..
  )
  .execute((err: any) => {
    // keep the error for later inspection, then log it
    error = err
    console.error(err)
  })
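
The handler pattern above can be sketched without the library. The fallback of logging when no handler is supplied is an assumption made for this example, and `executeAll` and `ErrorHandler` are hypothetical names.

```typescript
type ErrorHandler = (err: unknown) => void

function executeAll(actions: Array<() => void>, onError?: ErrorHandler): void {
  try {
    for (const action of actions) action()
  } catch (err) {
    // Assumption: fall back to logging when the caller supplies no handler.
    if (onError) onError(err)
    else console.error(err)
  }
}

let captured: unknown
const ran: string[] = []
executeAll(
  [
    () => ran.push('first'),
    () => { throw new Error('boom') },
    () => ran.push('never'),
  ],
  (err) => { captured = err }
)
console.log(ran)                         // ['first']
console.log((captured as Error).message) // 'boom'
```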

5. Replace chain or batch executor

  • Set option.chainClass or option.batchClass if you'd like to replace the execution process.
  • These classes are initialized with initialValue as an argument.

import { BaseExecutor, createStream } from 'stream-executor'
class MockChainExecutor implements BaseExecutor {
  constructor(public initialValue: any) {}
  stream(...args: any[]) {
    return this
  }
  execute() {
    console.log('MockChainExecutor called')
  }
}

class MockBatchExecutor implements BaseExecutor {
  constructor(public initialValue: any) {}
  stream(...args: any[]) {
    return this
  }
  execute() {
    console.log('MockBatchExecutor called')
  }
}

createStream(1, { chainClass: MockChainExecutor })
  .chain((it) => it)
  .execute() // 'MockChainExecutor called'

createStream(1, { batchClass: MockBatchExecutor })
  .batch((it) => it)
  .execute() // 'MockBatchExecutor called'
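
One way such class injection can work is sketched below: a factory falls back to a default executor class unless the options supply another one. All names here (`makeStream`, `Executor`, `DefaultChain`, `EchoChain`) are hypothetical, and the library's internals may differ.

```typescript
type Step = (value: any) => any

interface Executor {
  stream(...steps: Step[]): Executor
  execute(): unknown
}
type ExecutorClass = new (initialValue: any) => Executor

// Default behavior for this sketch: pipe the value through every step.
class DefaultChain implements Executor {
  private steps: Step[] = []
  constructor(private initialValue: any) {}
  stream(...steps: Step[]): Executor {
    this.steps = steps
    return this
  }
  execute(): unknown {
    return this.steps.reduce((acc, step) => step(acc), this.initialValue)
  }
}

// Replacement that ignores the steps, much like the mock classes above.
class EchoChain implements Executor {
  constructor(private initialValue: any) {}
  stream(): Executor {
    return this
  }
  execute(): unknown {
    return `echo:${this.initialValue}`
  }
}

function makeStream(initialValue: any, options: { chainClass?: ExecutorClass } = {}) {
  const ChainClass = options.chainClass ?? DefaultChain
  return {
    chain: (...steps: Step[]) => new ChainClass(initialValue).stream(...steps),
  }
}

const byDefault = makeStream(1).chain((it) => it + 1).execute()
const replaced = makeStream(1, { chainClass: EchoChain }).chain((it) => it + 1).execute()
console.log(byDefault) // 2
console.log(replaced)  // 'echo:1'
```

Passing the class rather than an instance lets the factory construct the executor with the initial value itself, which matches the note that these classes are initialized with initialValue.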

Utils

The helper methods available in createStream (see their descriptions in the source) are:

  1. map
  2. tap
  3. filter
  4. which
  5. ifRight
  6. asTypeOf
  7. asInstanceOf
  8. stop
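
To give a feel for the branching helpers, here is a sketch of how functions shaped like which and ifRight could be written as plain higher-order functions. The names `whichSketch` and `ifRightSketch` are hypothetical, and the return-value behavior is an assumption rather than the library's documented contract.

```typescript
type Predicate<T> = (value: T) => boolean
type Handler<T> = (value: T) => unknown

// Runs onRight when the predicate holds, otherwise onLeft (if one is given).
const whichSketch =
  <T>(predicate: Predicate<T>, onRight: Handler<T>, onLeft?: Handler<T>) =>
  (value: T): unknown =>
    predicate(value) ? onRight(value) : onLeft ? onLeft(value) : undefined

// Same as whichSketch, but with no branch for the false case.
const ifRightSketch = <T>(predicate: Predicate<T>, onRight: Handler<T>) =>
  whichSketch(predicate, onRight)

const classify = whichSketch<number>((n) => n > 1, () => 'big', () => 'small')
const onlyBig = ifRightSketch<number>((n) => n > 1, (n) => n * 2)

console.log(classify(10)) // 'big'
console.log(classify(0))  // 'small'
console.log(onlyBig(10))  // 20
console.log(onlyBig(0))   // undefined
```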