Stack overflows caused by MergeLimitedSink operators #2615

Closed
4 of 12 tasks
geoffmacd opened this issue Jul 26, 2024 · 9 comments · May be fixed by #2616

@geoffmacd

Short description of the issue:

At Dropbox, we use RxSwift heavily in a serial queue that must be FIFO to process something that requires us to wait sometimes (for reasons I'm not going to go into). We have one main observable that represents our input, and it uses concatMap in conjunction with .just() and .delay() to achieve this. The delay, which is rare, is under 3 seconds. The input generally arrives all at once (tens of thousands of elements in a short period of time). Since at least 2021, our top crash has been a stack overflow in Rx code that we've never been able to address. This crash affects a minority of users on launch and was rarely reproduced until now...

We found a reproduction case (see sample code) that can cause stack overflows when using the standard RxSwift concatMap() operator (or concat / merge(maxConcurrent:) in Merge.swift) in combination with randomly delayed sequences. This example function produces extremely deep stack traces (or crashes directly with a stack overflow if you are lucky). What seems to matter is not whether .delay is used everywhere or nowhere, but that there is a random mix of delayed and non-delayed "just" elements.

If you run this code and a crash doesn't happen, you can at least see a very deep stack inside MergeLimitedSinkIter.on using the Thread API (just print Thread.callStackReturnAddresses.count). This is the source of the stack overflow crash we are seeing for some users.
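
For reference, here is a minimal sketch of the measurement described above. `StackProbe` and `recurse` are hypothetical names used only for illustration; the only real API involved is Foundation's `Thread.callStackReturnAddresses`. The recursion is a stand-in for the nested calls, and in the actual repro a call like `probe.sample()` could presumably go inside the `concatMap` closure instead.

```swift
import Foundation

/// Hypothetical helper: remembers the largest call-stack frame count it has seen.
final class StackProbe {
    private(set) var maxFrames = 0

    func sample() {
        maxFrames = max(maxFrames, Thread.callStackReturnAddresses.count)
    }
}

/// Stand-in for deeply nested callbacks: recurses `levels` times, sampling at each level.
func recurse(levels: Int, probe: StackProbe) {
    probe.sample()
    if levels > 0 {
        recurse(levels: levels - 1, probe: probe)
    }
    // Sampling again on the way out also keeps the recursive call from being a tail call.
    probe.sample()
}

let probe = StackProbe()
recurse(levels: 20, probe: probe)
print("deepest stack seen: \(probe.maxFrames) frames")
```

The absolute frame count depends on the platform and build settings, so only the relative growth between runs is meaningful.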

It is a concurrency issue where a .just() emitting immediately on the current queue seems to break all internal uses of MergeLimitedSinkIter (that is, concat/concatMap/merge(maxConcurrent:)).

Expected outcome:

The code sample below should be protected against stack overflows by intelligently scheduling the next inner subscribe.
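
To illustrate what "scheduling the next inner subscribe" can mean, here is a toy model in plain Swift. It is not RxSwift's MergeLimitedSink and not the fix proposed in #2616; every name in it (`syncSource`, `DepthCounter`, `Trampoline`, `naiveConcat`, `trampolinedConcat`) is hypothetical. It only assumes that an inner source such as `just` completes synchronously on the caller's stack, and it compares subscribing to the next source directly from the completion handler with queueing that subscribe on a simple trampoline.

```swift
/// Stand-in for an inner sequence like `just`: it completes synchronously,
/// on the same call stack that subscribed to it.
func syncSource(_ onCompleted: () -> Void) {
    onCompleted()
}

/// Tracks how many "subscribe" calls are nested at once.
final class DepthCounter {
    private var current = 0
    private(set) var maxDepth = 0
    func enter() { current += 1; maxDepth = max(maxDepth, current) }
    func leave() { current -= 1 }
}

/// Runs queued work in a loop instead of letting it nest.
final class Trampoline {
    private var queue: [() -> Void] = []
    private var draining = false

    func schedule(_ work: @escaping () -> Void) {
        queue.append(work)
        guard !draining else { return } // already draining: the loop below will pick it up
        draining = true
        defer { draining = false }
        while !queue.isEmpty {
            let next = queue.removeFirst()
            next()
        }
    }
}

/// Naive concat: each completion subscribes to the next source directly,
/// so nesting grows with the number of synchronous sources.
func naiveConcat(count: Int, counter: DepthCounter) {
    func subscribeNext(_ remaining: Int) {
        counter.enter()
        defer { counter.leave() }
        guard remaining > 0 else { return }
        syncSource { subscribeNext(remaining - 1) }
    }
    subscribeNext(count)
}

/// Trampolined concat: each completion only enqueues the next subscribe.
func trampolinedConcat(count: Int, counter: DepthCounter) {
    let trampoline = Trampoline()
    func subscribeNext(_ remaining: Int) {
        counter.enter()
        defer { counter.leave() }
        guard remaining > 0 else { return }
        syncSource { trampoline.schedule { subscribeNext(remaining - 1) } }
    }
    trampoline.schedule { subscribeNext(count) }
}

let naiveCounter = DepthCounter()
naiveConcat(count: 1_000, counter: naiveCounter)
let trampolinedCounter = DepthCounter()
trampolinedConcat(count: 1_000, counter: trampolinedCounter)
print(naiveCounter.maxDepth, trampolinedCounter.maxDepth) // prints "1001 1"
```

In this model the nesting depth of the naive version grows linearly with the number of synchronous sources, while the trampolined version stays flat. Whether the real operator can be restructured this way is a separate question.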

What actually happens:

A stack overflow that looks like this:

[Image: Pasted Graphic 1 (stack trace screenshot)]

Self contained code example that reproduces the issue:

// Assumes `import RxSwift` and a `disposeBag` property on the enclosing type.
func generateStackOverflow() {
    print("starting rx concatMap/just subscribe")

    let scheduler = SerialDispatchQueueScheduler(qos: .userInteractive, internalSerialQueueName: "1")
    // stack overflow
    Observable.from(Array(repeating: 1, count: 100_000))
        .observe(on: scheduler)
        .concatMap {
            // produces super large stack traces when mixing ConcatMap+Just+Delay
            if Int.random(in: 0 ... 100) != 0 {
                return Single.just($0)
            } else {
                return Single.just($0).delay(.nanoseconds(Int.random(in: 1 ... 7)), scheduler: scheduler)
            }
        }
        .subscribe(onCompleted: {
            print("finished rx concatMap")
        })
        .disposed(by: disposeBag)
}

RxSwift/RxCocoa/RxBlocking/RxTest version/commit

We are on 6.6.0, but this has not been addressed or even noted.

Platform/Environment

  • iOS
  • macOS
  • tvOS
  • watchOS
  • playgrounds

How easy is to reproduce? (chances of successful reproduce after running the self contained code)

  • easy, 100% repro
  • sometimes, 10%-100%
  • hard, 2% - 10%
  • extremely hard, %0 - 2%

Xcode version:

15.4

Level of RxSwift knowledge:
(this is so we can understand your level of knowledge
and formulate the response in an appropriate manner)

  • just starting
  • I have a small code base
  • I have a significant code base
@geoffmacd changed the title from "Stack overflows caused by MergeLimitedSInk operators" to "Stack overflows caused by MergeLimitedSink operators" on Jul 26, 2024
@danielt1263
Collaborator

The simplest solution is to add .delay(.seconds(0), scheduler: scheduler) to the just... Yes?

@geoffmacd
Author

To both sides of the if statement? Yes, I suppose that would work, but it was a surprise that using the API like this could cause this in the first place. It's weird to have to know to balance it like this.
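
A sketch of the suggested workaround applied to the original repro, so that both branches go through `.delay`. The function name `runBalancedConcatMap`, its `elementCount` parameter, and the semaphore used to wait for completion are hypothetical additions for illustration; the RxSwift calls are the ones already used in the sample code. This is untested against the crash itself and only shows the shape of the change.

```swift
import Foundation
import RxSwift

/// Runs the repro with a zero-length delay on the previously non-delayed branch.
/// Returns true if the sequence completed before the timeout.
func runBalancedConcatMap(elementCount: Int) -> Bool {
    let scheduler = SerialDispatchQueueScheduler(qos: .userInteractive, internalSerialQueueName: "1")
    let disposeBag = DisposeBag()
    let finished = DispatchSemaphore(value: 0)

    Observable.from(Array(repeating: 1, count: elementCount))
        .observe(on: scheduler)
        .concatMap { value -> Single<Int> in
            if Int.random(in: 0 ... 100) != 0 {
                // Workaround: route the plain `just` through the scheduler as well.
                return Single.just(value).delay(.seconds(0), scheduler: scheduler)
            } else {
                return Single.just(value).delay(.nanoseconds(Int.random(in: 1 ... 7)), scheduler: scheduler)
            }
        }
        .subscribe(onCompleted: { finished.signal() })
        .disposed(by: disposeBag)

    // Block the caller until completion; the work itself runs on the scheduler's queue.
    return finished.wait(timeout: .now() + 30) == .success
}
```

The trade-off is that every element now takes a hop through the scheduler, which is the cost the later comments on performance are trying to measure.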

@danielt1263
Collaborator

Very weird indeed. Especially given that if the body of the closure is only { Single.just($0) } or if it's only { Single.just($0).delay(.nanoseconds(Int.random(in: 1 ... 7)), scheduler: scheduler) }, then the call stack is fine.

@geoffmacd
Author

Yes, my point is that devs should feel free to pass any given Observable back to concatMap() and not fear stack overflows. I believe my fix should protect against that.

@danielt1263
Collaborator

What is the performance hit? I see if I use: return Observable.just($0).delay(.nanoseconds(0), scheduler: scheduler) in the if block it takes 14 seconds to run through the 100_000 elements whereas if I use return Observable.generate(initialState: $0, condition: { _ in false }, iterate: { $0 }) it only takes 6 seconds. (Both have a stack size of 31)

@danielt1263
Collaborator

I'm seeing that if I take your fix and use your initial code, the stack size goes up to 42 but the runtime is only 3 seconds... Looks like a good fix to me. (I assume all the tests that use MergeLimitedSinkIter still pass?)

@geoffmacd
Author

Yes, tests pass both in this repo (they didn't auto-run in CI, though) and in roughly 3000 unit tests in our own codebase that rely heavily on this function.

When I tested this function on a device (iPhone XS):

        let t1 = Date()
        let scheduler = SerialDispatchQueueScheduler(qos: .userInteractive, internalSerialQueueName: "1")
        // stack overflow
        Observable.from(Array(repeating: 1, count: 100_000))
            .observe(on: scheduler)
            .concatMap {
                return Single.just($0).delay(.nanoseconds(0), scheduler: scheduler)
            }
            .subscribe(
                onCompleted: {
                    print("finished rx concatMap in \(Date().timeIntervalSince(t1)) s")
                }
            )
            .disposed(by: disposeBag)

I got 6.6 seconds runtime with my fix and about the same on master.

@geoffmacd
Author

I found an even simpler repro function for this issue. Only the first inner subscription needs to be a delay (or anything that queues); the DelaySink will then emit all the elements (Justs) at once.

    func generateStackOverflow() {
        print("starting rx concatMap + just + delay")

        let scheduler = SerialDispatchQueueScheduler(qos: .userInteractive, internalSerialQueueName: "1")
        // stack overflow
        Observable.from(Array(0 ..< 100_000))
            .observe(on: scheduler)
            .concatMap {
                // produces super large stack traces when mixing ConcatMap+Just+Delay
                if $0 == 0 {
                    // only the first element is delayed; the `DelaySink` will then emit all elements that arrived during this delay
                    return Single.just($0).delay(.seconds(1), scheduler: scheduler)
                } else {
                    return Single.just($0)
                }
            }
            .subscribe(
                onCompleted: {
                    print("finished rx concatMap + just + delay")
                }
            )
            .disposed(by: disposeBag)
    }

@freak4pc
Member

freak4pc commented Oct 3, 2024

Let's move the discussion over to #2616

@freak4pc freak4pc closed this as completed Oct 3, 2024

3 participants