
[BUG] buffer_with_time_or_count loses data #703

@laurens-teirlynck


Describe the bug
buffer_with_time_or_count loses data when the timer (rather than the count) triggers the emission and the on_next handler releases the GIL, for example while it blocks on network I/O or time.sleep.

Related issue: #702. That issue can be solved by using a scheduler, since its on_next method does not release the GIL; this one cannot.
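Releasing the GIL matters because it lets the upstream source keep producing items while on_next is still running. The stdlib-only sketch below is independent of reactivex, and the names `producer` and `slow_on_next` are hypothetical stand-ins. It shows that a thread blocked in `time.sleep` does not stop another thread from making progress:

```python
import threading
import time

produced = []               # items appended by the producer thread
stop = threading.Event()    # tells the producer to finish


def producer() -> None:
    """Hypothetical stand-in for the upstream source: emits an item every 10 ms."""
    i = 0
    while not stop.is_set():
        produced.append(i)
        i += 1
        time.sleep(0.01)


def slow_on_next() -> int:
    """Hypothetical stand-in for on_next: blocks in time.sleep, which releases the GIL."""
    before = len(produced)
    time.sleep(0.5)  # the GIL is released here, so the producer thread keeps running
    return len(produced) - before


thread = threading.Thread(target=producer, daemon=True)
thread.start()
progress = slow_on_next()
stop.set()
thread.join()
print(f"producer emitted {progress} items while on_next was sleeping")
```

With a non-zero count, any operator sitting between the source and on_next has to cope with items arriving concurrently with a running on_next call.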

To Reproduce
I have a microservice that reads data from an external source, buffers it, aggregates it, and sends it to another external service. Below is a simplified dummy snippet that is similar to my microservice and exhibits the same issue.

The issue only occurs when the timer, not the count, triggers the on_next call.

import time
from datetime import timedelta, datetime
from random import random

import reactivex
from reactivex import operators
from reactivex.scheduler import ThreadPoolScheduler


def main():
    scheduler = ThreadPoolScheduler(max_workers=4)

    reactivex.from_iterable(iterable()).pipe(
        operators.buffer_with_time_or_count(
            # set the timespan to only 2 seconds so that the timer triggers the on_next
            timespan=timedelta(seconds=2),
            count=10000,
        ),
    ).subscribe(
        on_next=on_next,
        on_error=print,
        on_completed=print,
        scheduler=scheduler,
    )

    time.sleep(1000)


def iterable():
    i = 0
    while True:
        yield i
        time.sleep(1 / 10)  # input network delay
        i += 1


def on_next(data):
    print(datetime.utcnow(), data[0], data[-1])
    time.sleep(random() * 5)  # mock output network delay


if __name__ == '__main__':
    main()

Script output
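Running the script as is gives something along the lines of the output below. Note the gaps between consecutive buffers: for example, items 30 to 75 are never delivered to on_next.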

2023-10-09 08:29:16.817278 0 29
2023-10-09 08:29:24.663460 76 104
2023-10-09 08:29:29.889651 127 155
2023-10-09 08:29:36.256481 188 216
2023-10-09 08:29:43.130694 254 282
2023-10-09 08:29:47.561785 297 325
2023-10-09 08:29:55.475441 374 402
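To make the loss explicit, a small helper can check that each buffer starts right after the previous one ends. `find_gaps` is a hypothetical helper, not part of reactivex; it assumes the source emits consecutive integers, as iterable() in the script does, and takes the (first, last) pairs that on_next prints:

```python
from typing import List, Tuple


def find_gaps(buffers: List[Tuple[int, int]]) -> List[Tuple[int, int]]:
    """Return (first, last) ranges of items missing between consecutive buffers.

    Assumes the source emits consecutive, increasing integers.
    """
    gaps = []
    for (_, prev_last), (next_first, _) in zip(buffers, buffers[1:]):
        if next_first > prev_last + 1:
            gaps.append((prev_last + 1, next_first - 1))
    return gaps


# (first, last) pairs copied from the timer-triggered output above
timer_output = [(0, 29), (76, 104), (127, 155), (188, 216),
                (254, 282), (297, 325), (374, 402)]
print(find_gaps(timer_output))
# [(30, 75), (105, 126), (156, 187), (217, 253), (283, 296), (326, 373)]
```

Applied to the count-triggered output further down (0-99, 100-199, ...), the same check returns an empty list.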

However, if I replace

operators.buffer_with_time_or_count(
    timespan=timedelta(seconds=3),  # mostly timer triggered
    count=10000,
),

with

operators.buffer_with_time_or_count(
    timespan=timedelta(minutes=30),
    count=100,  # count triggered
),

the output now looks like:

2023-10-09 08:32:29.754270 0 99
2023-10-09 08:32:41.434829 100 199
2023-10-09 08:32:56.777624 200 299
2023-10-09 08:33:10.419743 300 399
2023-10-09 08:33:24.681490 400 499
2023-10-09 08:33:35.890146 500 599

Expected behavior
buffer_with_time_or_count should not lose any data. The data loss occurs regardless of which scheduler I use (or none at all).
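For comparison, here is a minimal stdlib-only sketch of the expected behavior: a time-or-count buffer that keeps every item even when on_next is slow and releases the GIL. `TimeOrCountBuffer` is hypothetical; it is not RxPY's implementation and makes no claim about where the actual bug lies. The idea is to swap out the current batch under a short lock and call on_next outside that lock, so items produced during a slow on_next land in the next batch:

```python
import threading
import time
from typing import Callable, List


class TimeOrCountBuffer:
    """Hypothetical, simplified time-or-count buffer (NOT RxPY's implementation)."""

    def __init__(self, count: int, timespan: float,
                 on_next: Callable[[List[int]], None]) -> None:
        self._count = count
        self._timespan = timespan
        self._on_next = on_next
        self._items: List[int] = []
        self._items_lock = threading.Lock()  # guards self._items only
        self._emit_lock = threading.Lock()   # serializes on_next calls, keeps batches in order
        self._stopped = threading.Event()
        self._timer = threading.Thread(target=self._run_timer, daemon=True)

    def start(self) -> None:
        self._timer.start()

    def _run_timer(self) -> None:
        # Event.wait returns False on timeout, so this flushes every `timespan`
        # seconds until stop() sets the event.
        while not self._stopped.wait(self._timespan):
            self._flush()

    def _flush(self) -> None:
        with self._emit_lock:
            with self._items_lock:  # short critical section: swap out the batch
                batch, self._items = self._items, []
            if batch:
                # May block and release the GIL; push() can still append meanwhile.
                self._on_next(batch)

    def push(self, item: int) -> None:
        with self._items_lock:
            self._items.append(item)
            full = len(self._items) >= self._count
        if full:
            self._flush()  # count-triggered emission

    def stop(self) -> None:
        self._stopped.set()
        self._timer.join()
        self._flush()  # emit whatever is still buffered


received: List[List[int]] = []


def slow_on_next(batch: List[int]) -> None:
    received.append(batch)
    time.sleep(0.03)  # mock output network delay; releases the GIL


buffer = TimeOrCountBuffer(count=10_000, timespan=0.02, on_next=slow_on_next)
buffer.start()
for i in range(200):
    buffer.push(i)
    time.sleep(0.001)  # mock input network delay
buffer.stop()

flattened = [item for batch in received for item in batch]
print(len(received), "batches, no data lost:", flattened == list(range(200)))
```

The separate emit lock reflects the Rx contract that an observer's on_next is never called concurrently; it also guarantees that batches are delivered in the order they were swapped out.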

Additional context

  • OS: Linux/macOS
  • RxPY: 4.0.0
  • Python: 3.10.0/3.11.0
