Description
The problem
This is actually a big deal. Since the dawn of RxJS, as best I can tell, the ordering of teardown (unsubscription from source) and complete or error notifications has always been "notify complete/error first, then unsubscribe from source". TMK, I was never give a reason for this, it's just how it always was.
Fast forward 9-10 years, and we've run into a few issues, generally involving reentrancy. The issue presents itself in situations where a source synchronously emits back into itself before clean up of the source subscription can occur. Here are a few examples:
import { Subject, catchError, take } from 'rxjs';
const subject = new Subject<number>();
subject.pipe(
catchError((err) => {
console.log('saw an error!');
throw err;
}),
take(1)
)
.subscribe({
complete: () => {
subject.error(new Error('oops'));
}
});
subject.next(1);
// In RxJS@7 (latest) this logs
// "saw an error!"
We solved the above by unsubscribing before we emit from our operators. And we came up with the principle (I believe @cartant agreed with me on this), at least for operator development that "As soon as you know you can unsubscribe from a source, you should unsubscribe before doing anything else".
Unfortunately the problem is endemic. As we never patched this scenario:
import { Subject, tap, map } from 'rxjs';
const subject = new Subject<number>();
subject
.pipe(
tap((value) => console.log(`Side effect: ${value}`)),
map((n) => {
throw new Error('dang');
})
)
.subscribe({
error: (error) => {
console.log('handled error: ' + error?.message);
subject.next(1000);
},
});
subject.next(1);
// Logs:
"Side effect: 1"
"handled error: dang"
"Side effect: 1000"
And fixing that is going to get gnarly fast. It basically means that anyone that develops an operator that takes a user-land function would have to create a subscriber first, so it can capture that subscriber and use it to unsubscribe from the source before notifying of an error.
That's a real smell, and it got me thinking "What if I applied this principle everywhere? Including the Subscriber itself?"
Proposed solution:
Always unsubscribe before completing or erroring, but from within Subscriber.
Looking into it, it solves our problems everywhere when it comes to reentrancy with regards to complete and error notifications.
But it's a big deal, so I had to go back to the original principal from @headinthebox that Observable is the "dual" of iterable. I needed to see if a consumer could act on the information that an iterable was "done" before the producer finalized.
Digging into this in both JavaScript and DotNet showed that finalization of the producer within an Iterable (or Enumerator in DotNet's case) always occurred before the consumer had a chance to act on the information that iteration was complete (or errored):
function* iterable() {
try {
yield 1;
yield 2;
yield 3;
} finally {
console.log('finalized');
}
}
for (const value of iterable()) {
console.log(value);
}
console.log('consumer knows iteration is complete');
// logs
// 1
// 2
// 3
// finalized
// consumer knows iteration is complete
Even in the error case:
function* iterable() {
try {
yield 1;
throw new Error('oops');
} finally {
console.log('finalized');
}
}
try {
for (const value of iterable()) {
console.log(value);
}
} catch (error) {
console.log('consumer knows iteration has errored');
}
// logs
// 1
// finalized
// consumer knows iteration has errored
Even if you manually iterate:
function* iterable() {
try {
yield 1;
} finally {
console.log('finalized');
}
}
const iterator = iterable()[Symbol.iterator]();
console.log(iterator.next());
console.log(iterator.next());
// logs:
// {value: 1, done: false}
// finalized
// {value: undefined, done: true}
Even if we check out Promise, a contemporary of observable, it adheres to this behavior:
Promise.resolve(1)
.finally(() => console.log('finalized'))
.then(console.log);
// finalized
// 1
async function test() {
try {
return Promise.resolve(1);
} finally {
console.log('finalized');
}
}
test().then(console.log);
// finalized
// 1
Observable, Not so much:
import { of, finalize } from 'rxjs';
of(1)
.pipe(finalize(() => console.log('finalized')))
.subscribe(console.log);
// 1
// finalized
In every case for iterable, the dual of observable, the consumer can't possibly act on the information that the producer is complete/errored before the producer finalizes!
This means that, by principle, we should always be unsubscribing from the source prior to notification of complete or error. This would be a breaking change for RxJS.
Other thoughts:
This still doesn't solve for situations like take
, takeWhile
, and first
where we still want to unsubscribe from source prior to emitting a value. The only way to solve that would be to allow some sort of "next and complete" emission from the Observer, like an optional boolean on the next call... subscriber.next('somevalue', true)
or the like. But I'm unwilling to push for that change at this time, it's complicated, and I don't think that the API is user friendly. It's interesting, as a mental exercise, that there are actually 4 ways that observables can terminate in practice: completion, error, consumer unsubscribe, and "final value". It also points to a design flaw in JavaScript's single-step iteration design, in that you can't differentiate between { done: true, value: undefined }
being a "final value" that should be included in iteration, or just "complete", because JS has implicit "undefined" return values. Languages with classic designed iteration that take two calls to perform iteration cannot have this issue.