Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: different behavior of throttleLast() (compared to 1.x) #5516

Closed
wychi opened this issue Jul 25, 2017 · 4 comments
Closed

2.x: different behavior of throttleLast() (compared to 1.x) #5516

wychi opened this issue Jul 25, 2017 · 4 comments
Labels

Comments

@wychi
Copy link

wychi commented Jul 25, 2017

Hello there,

I am not sure if there is a bug or not, but I got different result when using throttleLast()

Here is my testing result. As you can see, there is a difference in handling onNext(5).
5 is emitted in 1.x but lost in 2.x

rxjava 1.x throttleLast()
emit onNext 0
emit onNext 0 end
progress 0
emit onNext 1
emit onNext 1 end
progress 1
emit onNext 2
emit onNext 2 end
progress 2
emit onNext 3
emit onNext 3 end
progress 3
emit onNext 4
emit onNext 4 end
progress 4
emit onNext 5
emit onNext 5 end
emit onComplete 
emit onComplete - end 
wait before terminate
progress 5
doOnComplete
emit onNext 6
emit onNext 6 end
emit onNext 7
emit onNext 7 end
emit onNext 8
emit onNext 8 end
emit onNext 9
emit onNext 9 end
emit onNext 10
emit onNext 10 end
emit onNext 11
emit onNext 11 end
emit onNext 12
emit onNext 12 end
emit onNext 13
emit onNext 13 end
emit onNext 14
emit onNext 14 end
rxjava 2.x throttleLast()
emit onNext 0
emit onNext 0 end
progress 0
emit onNext 1
emit onNext 1 end
progress 1
emit onNext 2
emit onNext 2 end
progress 2
emit onNext 3
emit onNext 3 end
progress 3
emit onNext 4
emit onNext 4 end
progress 4
emit onNext 5
emit onNext 5 end
emit onComplete 
emit onComplete - end 
doOnComplete
wait before terminate
emit onNext 6
emit onNext 6 end
emit onNext 7
emit onNext 7 end
emit onNext 8
emit onNext 8 end
emit onNext 9
emit onNext 9 end
emit onNext 10
emit onNext 10 end
emit onNext 11
emit onNext 11 end
emit onNext 12
emit onNext 12 end
emit onNext 13
emit onNext 13 end
emit onNext 14
emit onNext 14 end

Here is my testing code

    compile "io.reactivex.rxjava2:rxjava:2.1.1"
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
    compile 'io.reactivex:rxjava:1.3.0'
package allstar.wychi.allstardemo;


import org.junit.Test;

import java.util.concurrent.TimeUnit;

import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;

import static org.junit.Assert.assertEquals;

public class ExampleUnitTest {

    @Test
    public void test1x() {
        final rx.subjects.PublishSubject<Integer> subject = rx.subjects.PublishSubject.create();

        System.out.println("rxjava 1.x throttleLast()");
        subject
                .throttleLast(60, TimeUnit.MILLISECONDS)
                .observeOn(rx.schedulers.Schedulers.io())
                .doOnNext(new rx.functions.Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        System.out.println("progress " + integer);
                        try {
                            Thread.sleep(16);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                })
                .doOnError(new rx.functions.Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        System.out.println("doOnError " + throwable.getMessage());
                    }
                })
                .doOnCompleted(new rx.functions.Action0() {
                    @Override
                    public void call() {
                        System.out.println("doOnComplete");
                    }
                })
                .subscribe();

        new Thread(new Runnable() {
            @Override
            public void run() {
                for(int i=0; i<100; i++) {

                    System.out.println("emit onNext " + i);
                    subject.onNext(i);
                    System.out.println("emit onNext " + i + " end");
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

            }
        }).start();

        try {
            Thread.sleep(520);
            System.out.println("emit onComplete ");
            subject.onCompleted();
            System.out.println("emit onComplete - end ");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try {
            System.out.println("wait before terminate");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void test2x() {
        System.out.println("rxjava 2.x throttleLast()");

        final PublishSubject<Integer> subject = PublishSubject.create();

        subject
                .throttleLast(60, TimeUnit.MILLISECONDS)
                .observeOn(Schedulers.io())
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println("progress " + integer);
                        try {
                            Thread.sleep(16);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                })
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        System.out.println("doOnError " + throwable.getMessage());
                    }
                })
                .doOnComplete(new Action() {
                    @Override
                    public void run() throws Exception {
                        System.out.println("doOnComplete");
                    }
                })
                .subscribe();

        new Thread(new Runnable() {
            @Override
            public void run() {
                for(int i=0; i<100; i++) {

                    System.out.println("emit onNext " + i);
                    subject.onNext(i);
                    System.out.println("emit onNext " + i + " end");
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

            }
        }).start();

        try {
            Thread.sleep(520);
            System.out.println("emit onComplete ");
            subject.onComplete();
            System.out.println("emit onComplete - end ");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try {
            System.out.println("wait before terminate");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Just want to know which behavior is expected.
Thanks

@akarnokd akarnokd added the 2.x label Jul 25, 2017
@akarnokd
Copy link
Member

ThrottleLast is practically the same as sample which in 2.x by default doesn't emit the last throttled item when the upstream completes. There is, however, a sample overload that let's you configure that.

@wychi
Copy link
Author

wychi commented Jul 25, 2017

@akarnokd thanks for your answer.

According to that statement, the behavior in 2.x is expected.
Is my understanding correct?

But I am still wondering why I get different result in testing code.
Is it just something wrong in testing code? Or there is actual implementation different in 1.x?

Thanks

@akarnokd
Copy link
Member

Yes, this is the expected behavior.

But I am still wondering why I get different result in testing code.

I've already told you. 2.x doesn't emit the last throttled item when the upstream completes (the underlying sample operator was fixed in #4955) and 1.x does emit the very last (#3757).

@wychi
Copy link
Author

wychi commented Jul 25, 2017

Got it. Thank you a lot. Now I know the history behind this.

@wychi wychi closed this as completed Jul 25, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants