concatAll() unexpectedly overlaps inner Observable subscriptions #3338

Closed
@mattflix

Description

EDIT: Note that the following repro code always produces the expected output with RxJS 4.x. The output with RxJS 5.x is unintuitive (IMO) and can actually vary depending on the formulation of the inner observables (see further discussion of such cases in subsequent comments).


RxJS version: 5.5.0 (and all previous 5.x versions, seemingly)

Code to reproduce:

const { Observable } = require(`rxjs`);

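// Builds a cold Observable that logs when the task begins (on subscription)
// and when it ends (on teardown).
function Task(name) {
  return (
    Observable
      .defer(() => {
        // Runs on each subscription: stands in for acquiring a resource.
        console.log(`begin task ${ name }`);
        return (
          Observable
            .of(`simulated work`)
            .delay(100)
            .finally(() => {
              // Runs on teardown: stands in for releasing the resource.
              console.log(`end task ${ name }`);
            })
        );
      })
  );
}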

Observable
  .of(
    Task(`1`),
    Task(`2`),
    Task(`3`)
  )
  .concatAll()
  .subscribe();

Expected behavior:

begin task 1
end task 1
begin task 2
end task 2
begin task 3
end task 3

Actual behavior:

begin task 1
begin task 2
end task 1
begin task 3
end task 2
end task 3

Additional information:
I discovered this behavior when using concatAll() as the basis for a serialized "task queue" that executes tasks (represented as Observables) one at a time, much like my (simplified) sample code above. Some of these tasks allocate resources in defer() (or in a similar manner) and release them in finally() (or in a similar manner).

My task queue implementation seemed to make logical sense, yet I was running into errors caused by concurrent access to resources that were only supposed to be accessed in a mutually exclusive manner. I was stumped: I couldn't see the problem in my task queue or my tasks that was leading to this unexpected concurrent access.

After much head-scratching and debugging, I eventually tracked the problem to the InnerSubscriber implementation within rxjs, which contains logic that subscribes to the next sequence before unsubscribing from the previous sequence:

    InnerSubscriber.prototype._error = function (error) {
        this.parent.notifyError(error, this); // <== subscribes to the next sequence
        this.unsubscribe();
    };
    InnerSubscriber.prototype._complete = function () {
        this.parent.notifyComplete(this); // <== subscribes to the next sequence
        this.unsubscribe();
    };

Ah-ha.

This causes the "creation" logic (defer() callback or other invocation) of the next sequence to execute before the "disposal" logic (finally() or other invocation) of the previous sequence has executed, making it impossible for the inner observables managed by concatAll() to behave like serialized tasks -- since rxjs actually "overlaps" the tasks at this critical moment.

As an experiment, I simply reversed the logic:

    InnerSubscriber.prototype._error = function (error) {
        this.unsubscribe(); // <== unsubscribe from previous sequence first!
        this.parent.notifyError(error, this);
    };
    InnerSubscriber.prototype._complete = function () {
        this.unsubscribe(); // <== unsubscribe from previous sequence first!
        this.parent.notifyComplete(this);
    };

Then, using the original "Code to reproduce" above, I actually see the expected behavior.
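
To make the difference between the two orderings concrete without depending on RxJS internals, here is a toy model in plain JavaScript. It is only a sketch and not RxJS code: `runQueue`, `subscribeNext`, `complete`, and the `unsubscribeFirst` flag are hypothetical names invented for illustration, and the `pending` array stands in for the timer that `delay(100)` would use.

```javascript
// Toy model only: NOT the RxJS implementation. All names are hypothetical.
// Each "task" logs "begin" when subscribed and "end" when torn down.
function runQueue(names, unsubscribeFirst) {
  const log = [];
  const pending = []; // simulated timer queue: completions that arrive "later"
  let next = 0;

  // Analogous to the parent subscribing to the next inner sequence.
  function subscribeNext() {
    if (next >= names.length) return;
    const name = names[next++];
    log.push(`begin task ${name}`); // "creation" logic, like the defer() callback
    pending.push(() => complete(name));
  }

  // Analogous to the inner subscriber handling completion.
  function complete(name) {
    const teardown = () => log.push(`end task ${name}`); // like finally()
    if (unsubscribeFirst) {
      teardown();      // release the previous task first...
      subscribeNext(); // ...then start the next one
    } else {
      subscribeNext(); // the next task starts...
      teardown();      // ...before the previous one is released
    }
  }

  subscribeNext();
  while (pending.length > 0) pending.shift()(); // drain simulated completions
  return log;
}

// Notify-then-unsubscribe: "begin" of the next task precedes "end" of the previous.
console.log(runQueue(["1", "2", "3"], false).join("\n"));
// Unsubscribe-then-notify: each task ends before the next begins.
console.log(runQueue(["1", "2", "3"], true).join("\n"));
```

With `unsubscribeFirst` set to `false`, the model yields the same interleaving as the "Actual behavior" listing above; with `true`, it yields the "Expected behavior" listing. The model says nothing about how the reversed ordering would affect other operators.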

I don't know, however, what other ramifications this change might have on other use cases. In the case of concatAll() (and variants) however, it would definitely seem "more correct" and "less surprising" for rxjs to always unsubscribe from the previous sequence before subscribing to the next.
