Help to build control flow func #193

Open
eric-basley opened this issue Feb 16, 2019 · 3 comments

@eric-basley

I can't find a way to build a control-flow helper that pulls values from a stream only while the number of pending calls stays below a concurrency limit.

const urls = reduce((stream, url) => stream(url), flyd.stream(), R.times(makeUrl(), 1000));
const results = throughLimit(requestUrl, urls, { maxConcurrency });
// only maxConcurrency requests can be pending at a time
// results from requestUrl() are pushed one by one into results

Thanks for your help.

Eric.

@eric-basley
Author

I've written this implementation, but the behavior of count() is very weird!

const throughLimit = (fn, stream, maxConcurrency) => {
  const running = flyd.stream(0);
  const cache = [];
  return flyd.combine((s, count, self, changed) => {
    if (count() < maxConcurrency) {
      count(count() + 1);
      const value = includes(s, changed) ? s() : cache.pop();
      if (value) fn(value, res => { count(count() - 1); self(res); });
    } else if (includes(s, changed)) {
      cache.unshift(s());
    }
  }, [stream, running]);
};

Can we update one of the dependency streams inside a combine() body?

@eric-basley
Author

Let's forget the first, buggy implementation!
Here's a first version that mixes streams with recursive callbacks.

It's called as: const resStream = throughLimit(makeRequest, urlsStream, 10)

Does anyone have a better idea?

const throughLimit = (fn, stream, maxConcurrency) => {
  let running = 0; // number of calls to fn currently in flight
  const cache = []; // values waiting for a free slot (FIFO: unshift + pop)
  const callFn = (value, cb) => {
    fn(value, res => {
      cb(res);
      // cb has just released a slot; reuse it for the oldest queued value
      if (cache.length) {
        const next = cache.pop();
        running++;
        callFn(next, cb);
      }
    });
  };
  return flyd.combine((s, self, changed) => {
    if (running < maxConcurrency) {
      running++;
      callFn(s(), res => {
        running--;
        self(res);
      });
    } else {
      cache.unshift(s());
    }
  }, [stream]);
};
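
One possible simplification, offered only as a sketch: keep the queueing logic in a small helper that knows nothing about flyd and have a single drain loop start work whenever a slot is free. The name createLimiter and its emit parameter are hypothetical, not part of flyd; fn is assumed to follow the same (value, callback) convention as above.

```javascript
// Hypothetical helper (not a flyd API): keeps at most `maxConcurrency`
// calls to `fn` in flight and hands each result to `emit`.
const createLimiter = (fn, maxConcurrency, emit) => {
  const queue = []; // values waiting for a free slot, oldest first
  let running = 0;  // calls to fn that have not called back yet

  // Start queued work until the limit is reached or the queue is empty.
  const drain = () => {
    while (running < maxConcurrency && queue.length > 0) {
      const value = queue.shift();
      running++;
      fn(value, res => {
        running--;
        emit(res);
        drain(); // a slot was just released, try to fill it
      });
    }
  };

  // The returned function is what gets called for every incoming value.
  return value => {
    queue.push(value);
    drain();
  };
};
```

With flyd, this could presumably be wired up by creating an output stream with flyd.stream(), passing it as emit, and subscribing the returned function to the source with flyd.on. Results are emitted in completion order, not in input order.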

@nordfjord
Collaborator

You could use Ramda to help you there.

Maybe something like this could do the trick for you?

const urlStream = flyd.stream() // Stream string

const joinedUrlStream = urlStream
  .pipe(flyd.scan(R.flip(R.append), [])) // Stream [string]

const requestStream = joinedUrlStream
  .map(R.splitEvery(1000)) // Stream [[string]]
  .map(async splitUrls => {
    let result = []
    for (const urls of splitUrls) {
      result = result.concat(await Promise.all(urls.map(makeRequest)))
    }
    return result
  }) // Stream Promise [Result]

const responseStream = requestStream
  .chain(flyd.fromPromise) // Stream [Result]
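
Note that the loop above works in batches: each chunk has to finish completely before the next one starts, so one slow request holds up the whole chunk. If a sliding window is preferred, a promise-based pool could be sketched as follows. The name promisePool is hypothetical, and worker stands in for a promise-returning function such as makeRequest.

```javascript
// Hypothetical helper: runs `worker` over `items` with at most
// `maxConcurrency` promises pending, keeping results in input order.
const promisePool = async (items, worker, maxConcurrency) => {
  const results = new Array(items.length);
  let next = 0; // index of the next item nobody has claimed yet

  // Each runner claims one item at a time until none are left.
  const runner = async () => {
    while (next < items.length) {
      const index = next++;
      results[index] = await worker(items[index]);
    }
  };

  // Start up to maxConcurrency runners and wait for all of them.
  const count = Math.min(maxConcurrency, items.length);
  await Promise.all(Array.from({ length: count }, () => runner()));
  return results;
};
```

It could replace the for loop inside the async map callback, for example as promisePool(urls, makeRequest, 10). If any worker rejects, the returned promise rejects as well, and runners that are still going are not cancelled.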
