Out of order events with imitate #239
Comments
This is run with cycle.js, if that makes any difference. |
Hi! Thanks for reporting. Just to be sure, which version of xstream and cycle/run? |
Also. I just found a workaround by using |
const xs = require('xstream').default;
const START_STATE = {
ver: 0, // counts up to see out of order
a: false, // we toggle this on a timer
b: false, // we toggle this on a different timer
c: false, // we derive this from a and b
};
function loop() {
// imitate cycle
const update$ = xs.create();
// the folded state, with ever increasing version number
const state$ = update$
.fold((p, c) => ({...p, ver: p.ver + 1, ...c}), START_STATE)
.debug('state');
// the timer updates
const aUpdate$ = xs.periodic(500).map(n => ({a: n % 2 === 0}));
const bUpdate$ = xs.periodic(100).map(n => ({b: n % 3 === 0}));
// derived update from a/b
const derivedUpdate$ = state$
.filter(state => state.c !== state.a && state.b)
.map(state => ({c: state.a && state.b}));
// the proxy that sees the error
const proxy$ = xs.from(state$).debug('proxy');
// merge the update streams
const realUpdate$ = xs.merge(
aUpdate$,
bUpdate$,
derivedUpdate$
);
// and cycle back to top
update$.imitate(realUpdate$);
return {
state$,
proxy$,
};
}
const {state$, proxy$} = loop();
state$.addListener({listener: next => {}});
proxy$.addListener({listener: next => {}});
@staltz here's a test case that reproduces the problem every time for me. It might be a bit contrived, i.e. there may be a more succinct way to construct a test case. I see the problem in the proxy at
|
Wrote it as a test case too. https://gist.github.com/lolgesten/318dd558786d6da3d56338c123bdf3c7 |
Looking at xstream code I think I got it wrong that this has to do with observables.
I think that means we get two |
Maybe this isn't a bug; I just need to be aware of that shortcut. With that in mind, the fix is:
const delay = require('xstream/extra/delay').default;
// merge the update streams
const realUpdate$ = xs.merge(
aUpdate$,
bUpdate$,
derivedUpdate$
).compose(delay(1)); // delay update to allow all observers of state$ to see value |
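To illustrate why deferring the feedback restores the order, here is a minimal sketch that does not use xstream at all. `createEmitter`, `createScheduler` and `observe` are hypothetical names invented for this example; the scheduler only stands in for the timer behind `delay(1)`, and the emitter is not how xstream delivers events internally.

```javascript
// Hypothetical stand-in for a stream: delivers each value to listeners in
// subscription order, synchronously. Not xstream's API.
function createEmitter() {
  const listeners = [];
  return {
    subscribe(fn) { listeners.push(fn); },
    emit(value) { listeners.forEach((fn) => fn(value)); },
  };
}

// Hypothetical scheduler standing in for the timer behind delay(1):
// deferred tasks are held until flush() runs them in FIFO order.
function createScheduler() {
  const tasks = [];
  return {
    defer(task) { tasks.push(task); },
    flush() { while (tasks.length > 0) tasks.shift()(); },
  };
}

// Builds a feedback loop and returns the versions seen by the second listener.
function observe(useDelay) {
  const state = createEmitter();
  const scheduler = createScheduler();
  const seenByProxy = [];

  // First listener: derives the next version and feeds it back into the loop.
  state.subscribe((ver) => {
    if (ver >= 2) return;
    const feedBack = () => state.emit(ver + 1);
    if (useDelay) scheduler.defer(feedBack);
    else feedBack();
  });

  // Second listener: plays the role of proxy$.
  state.subscribe((ver) => seenByProxy.push(ver));

  state.emit(0);
  scheduler.flush();
  return seenByProxy;
}

console.log('synchronous feedback:', observe(false));
console.log('deferred feedback:   ', observe(true));
```

With synchronous feedback the second listener records `[2, 1, 0]`, because each nested emit finishes before the outer one reaches it. With deferred feedback it records `[0, 1, 2]`, at the cost of no longer being synchronous.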
@lolgesten A delay will fix it, but the issue is still real. I think |
Ah ok! I agree that it'd be nicer if I didn't have to lose the synchronicity (is that a word? :) ) |
Here's the smallest possible repro I've come up with.
import xs from 'xstream';
const imitNumber$ = xs.create<number>();
const numberAdd$ = imitNumber$.filter((n) => n < 100).map((n) => n + 100);
const source$ = xs.create<number>();
const realNumber$ = xs.merge(source$, numberAdd$);
imitNumber$.imitate(realNumber$);
realNumber$.subscribe({next: (n) => console.log('real', n)});
imitNumber$.subscribe({next: (n) => console.log('imit', n)});
source$.shamefullySendNext(1);
The output is:
So what we see is that the
The problem occurs because
Our fix to this problem is to patch
The important bit is that even if
import {Stream} from 'xstream';
export const patchImitate = () => {
Stream.prototype.imitate = function (source$: any) {
return imitate(source$, this);
};
};
const imitate = <T>(source$: Stream<T>, target$: Stream<T>) => {
const queue: T[] = [];
let running = false;
source$.subscribe({
next: (v) => {
queue.push(v);
if (running) {
return;
}
running = true;
while (queue.length) {
const next = queue.shift();
target$.shamefullySendNext(next!);
}
running = false;
},
error: (e) => {
target$.shamefullySendError(e);
},
complete: () => {
target$.shamefullySendComplete();
},
});
}; |
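The queueing idea in the patch above can be tried in isolation. The following is a rough sketch under stated assumptions: `Emitter`, `QueuedEmitter` and `run` are made-up names for this example, the emitter is a plain synchronous stand-in rather than xstream's `Stream`, and the feedback rule (add 100 to values below 100) is borrowed from the repro.

```javascript
// Minimal synchronous emitter; a stand-in for a stream, not xstream's implementation.
class Emitter {
  constructor() {
    this.listeners = [];
  }
  subscribe(fn) {
    this.listeners.push(fn);
  }
  // Naive delivery: a listener that emits re-entrantly pre-empts later listeners.
  emit(value) {
    for (const fn of this.listeners) fn(value);
  }
}

// Same emitter, but re-entrant emissions wait in a FIFO queue until the
// current value has reached every listener.
class QueuedEmitter extends Emitter {
  constructor() {
    super();
    this.queue = [];
    this.running = false;
  }
  emit(value) {
    this.queue.push(value);
    if (this.running) return; // the outer call will drain the queue
    this.running = true;
    try {
      while (this.queue.length > 0) super.emit(this.queue.shift());
    } finally {
      this.running = false;
    }
  }
}

// Feedback loop shaped like the repro: the first listener feeds n + 100 back
// in for n < 100, the second listener records what it sees.
function run(emitter) {
  const seen = [];
  emitter.subscribe((n) => { if (n < 100) emitter.emit(n + 100); });
  emitter.subscribe((n) => seen.push(n));
  emitter.emit(1);
  return seen;
}

const naiveOrder = run(new Emitter());
const queuedOrder = run(new QueuedEmitter());
console.log('naive ', naiveOrder);
console.log('queued', queuedOrder);
```

In this sketch the naive emitter lets the second listener see `[101, 1]`, while the queued one yields `[1, 101]`. Unlike a delay, the queue keeps everything within the same synchronous call.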
I'm observing out of order events. First some pseudo to explain what I got.
My real scenario is more complicated with lots of state evaluation, so it's entirely possible that the problem is introduced somewhere else.
I observe the correct state order on `state`, but the `proxy` sees another.
`config` merged into state.
`config` `state$` we spot state `IDLE` and `config`, produce update `INITED`
`appState`
`proxy` sees the events out of order. :(