Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: multicast playground #111

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ I've also been giving talks about Learning Rx using many of the examples listed
14. [Pagination with Rx (using Subjects)](#14-pagination-with-rx-using-subjects)
15. [Orchestrating Observable: make parallel network calls, then combine the result into a single data point (using flatmap & zip)](#15-orchestrating-observable-make-parallel-network-calls-then-combine-the-result-into-a-single-data-point-using-flatmap--zip)
16. [Simple Timeout example (using timeout)](#16-simple-timeout-example-using-timeout)
17. [Setup and teardown resources (using `using`)](#17-setup-and-teardown-resources-using-using)
18. [Multicast playground](#18-multicast-playground)

## Description

Expand Down Expand Up @@ -161,7 +163,7 @@ Cases demonstrated here:
4. run a task constantly every 3s, but after running it 5 times, terminate automatically
5. run a task A, pause for sometime, then execute Task B, then terminate

### 11. RxBus : event bus using RxJava (using RxRelay (never terminating Subjects) and debouncedBuffer)
### 11. RxBus : event bus using RxJava (using RxRelay (never terminating Subjects) and debouncedBuffer)

There are accompanying blog posts that do a much better job of explaining the details on this demo:

Expand Down Expand Up @@ -222,6 +224,20 @@ This is a simple example demonstrating the use of the `.timeout` operator. Butto

Notice how we can provide a custom Observable that indicates how to react under a timeout Exception.

### 17. Setup and teardown resources (using `using`)

The [operator `using`](http://reactivex.io/documentation/operators/using.html) is relatively less known and notoriously hard to Google. It's a beautiful API that helps to setup a (costly) resource, use it and then dispose off in a clean way.

The nice thing about this operator is that it provides a mechansim to use potentially costly resources in a tightly scoped manner. using -> setup, use and dispose. Think DB connections (like Realm instances), socket connections, thread locks etc.

### 18. Multicast Playground

Multicasting in Rx is like a dark art. Not too many folks know how to pull it off without concern. This example condiers two subscribers (in the forms of buttons) and allows you to add/remove subscribers at different points of time and see how the different operators behave under those circumstances.

The source observale is a timer (`interval`) observable and the reason this was chosen was to intentionally pick a non-terminating observable, so you can test/confirm if your multicast experiment will leak.

_I also gave a talk about [Multicasting in detail at 360|Andev](https://speakerdeck.com/kaushikgopal/rx-by-example-volume-3-the-multicast-edition). If you have the inclination and time, I highly suggest watching that talk first (specifically the Multicast operator permutation segment) and then messing around with the example here._

## Rx 2.x

All the examples here have been migrated to use RxJava 2.X.
Expand Down
6 changes: 5 additions & 1 deletion app/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ apply plugin: 'com.f2prateek.javafmt'
apply plugin: 'kotlin-android'

dependencies {
compile 'com.android.support:multidex:1.0.1'
compile "com.android.support:support-v13:${supportLibVersion}"
compile "com.android.support:appcompat-v7:${supportLibVersion}"
compile "com.android.support:recyclerview-v7:${supportLibVersion}"

compile 'com.github.kaushikgopal:CoreTextUtils:c703fa12b6'
compile "com.jakewharton:butterknife:${butterKnifeVersion}"
annotationProcessor "com.jakewharton:butterknife-compiler:${butterKnifeVersion}"
kapt "com.jakewharton:butterknife-compiler:${butterKnifeVersion}"
compile 'com.jakewharton.timber:timber:4.5.1'
compile "com.squareup.retrofit2:retrofit:${retrofitVersion}"
compile "com.squareup.retrofit2:converter-gson:${retrofitVersion}"
Expand All @@ -44,8 +45,10 @@ dependencies {
// explicitly depend on RxJava's latest version for bug fixes and new features.
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

compile 'com.jakewharton.rx:replaying-share-kotlin:2.0.0'
compile "com.github.akarnokd:rxjava2-extensions:0.16.0"
compile 'com.jakewharton.rxrelay2:rxrelay:2.0.0'

compile 'com.jakewharton.rxbinding2:rxbinding:2.0.0'
compile 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'

Expand All @@ -65,6 +68,7 @@ android {
targetSdkVersion sdkVersion
versionCode 2
versionName "1.2"
multiDexEnabled true
}
buildTypes {
release {
Expand Down
4 changes: 2 additions & 2 deletions app/src/main/java/com/morihacky/android/rxjava/MyApp.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package com.morihacky.android.rxjava;

import android.app.Application;
import android.support.multidex.MultiDexApplication;
import com.morihacky.android.rxjava.volley.MyVolley;
import com.squareup.leakcanary.LeakCanary;
import com.squareup.leakcanary.RefWatcher;
import timber.log.Timber;

public class MyApp extends Application {
public class MyApp extends MultiDexApplication {

private static MyApp _instance;
private RefWatcher _refWatcher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,16 @@ void demoNetworkDetector() {
clickedOn(new NetworkDetectorFragment());
}

@OnClick(R.id.btn_demo_using)
void demoUsing() {
clickedOn(new UsingFragment());
}

@OnClick(R.id.btn_demo_multicastPlayground)
void demoMulticastPlayground() {
clickedOn(new MulticastPlaygroundFragment());
}

private void clickedOn(@NonNull Fragment fragment) {
final String tag = fragment.getClass().toString();
getActivity()
Expand Down
11 changes: 11 additions & 0 deletions app/src/main/kotlin/com/morihacky/android/rxjava/ext/RxExt.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.morihacky.android.rxjava.ext

import io.reactivex.disposables.CompositeDisposable
import io.reactivex.disposables.Disposable

operator fun CompositeDisposable.plus(disposable: Disposable): CompositeDisposable {
add(disposable)
return this
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package com.morihacky.android.rxjava.fragments

import android.content.Context
import android.os.Bundle
import android.os.Handler
import android.os.Looper
import android.view.LayoutInflater
import android.view.View
import android.view.ViewGroup
import android.widget.*
import butterknife.BindView
import butterknife.ButterKnife
import butterknife.OnClick
import com.jakewharton.rx.replayingShare
import com.morihacky.android.rxjava.R
import io.reactivex.Observable
import io.reactivex.disposables.Disposable
import java.util.concurrent.TimeUnit

class MulticastPlaygroundFragment : BaseFragment() {

@BindView(R.id.list_threading_log) lateinit var logList: ListView
@BindView(R.id.dropdown) lateinit var pickOperatorDD: Spinner
@BindView(R.id.msg_text) lateinit var messageText: TextView

private lateinit var sharedObservable: Observable<Long>
private lateinit var adapter: LogAdapter

private var logs: MutableList<String> = ArrayList()
private var disposable1: Disposable? = null
private var disposable2: Disposable? = null

override fun onCreateView(inflater: LayoutInflater?,
container: ViewGroup?,
savedInstanceState: Bundle?): View? {
val layout = inflater!!.inflate(R.layout.fragment_multicast_playground, container, false)
ButterKnife.bind(this, layout)

_setupLogger()
_setupDropdown()

return layout
}

@OnClick(R.id.btn_1)
fun onBtn1Click() {

disposable1?.let {
it.dispose()
_log("subscriber 1 disposed")
disposable1 = null
return
}

disposable1 =
sharedObservable
.doOnSubscribe { _log("subscriber 1 (subscribed)") }
.subscribe({ long -> _log("subscriber 1: onNext $long") })

}

@OnClick(R.id.btn_2)
fun onBtn2Click() {
disposable2?.let {
it.dispose()
_log("subscriber 2 disposed")
disposable2 = null
return
}

disposable2 =
sharedObservable
.doOnSubscribe { _log("subscriber 2 (subscribed)") }
.subscribe({ long -> _log("subscriber 2: onNext $long") })
}

@OnClick(R.id.btn_3)
fun onBtn3Click() {
logs = ArrayList<String>()
adapter.clear()
}

// -----------------------------------------------------------------------------------
// Method that help wiring up the example (irrelevant to RxJava)

private fun _log(logMsg: String) {

if (_isCurrentlyOnMainThread()) {
logs.add(0, logMsg + " (main thread) ")
adapter.clear()
adapter.addAll(logs)
} else {
logs.add(0, logMsg + " (NOT main thread) ")

// You can only do below stuff on main thread.
Handler(Looper.getMainLooper()).post {
adapter.clear()
adapter.addAll(logs)
}
}
}

private fun _setupLogger() {
logs = ArrayList<String>()
adapter = LogAdapter(activity, ArrayList<String>())
logList.adapter = adapter
}

private fun _setupDropdown() {
pickOperatorDD.adapter = ArrayAdapter<String>(context,
android.R.layout.simple_spinner_dropdown_item,
arrayOf(".publish().refCount()",
".publish().autoConnect(2)",
".replay(1).autoConnect(2)",
".replay(1).refCount()",
".replayingShare()"))


pickOperatorDD.onItemSelectedListener = object : AdapterView.OnItemSelectedListener {

override fun onItemSelected(p0: AdapterView<*>?, p1: View?, index: Int, p3: Long) {

val sourceObservable = Observable.interval(0L, 3, TimeUnit.SECONDS)
.doOnSubscribe { _log("observer (subscribed)") }
.doOnDispose { _log("observer (disposed)") }
.doOnTerminate { _log("observer (terminated)") }

sharedObservable =
when (index) {
0 -> {
messageText.setText(R.string.msg_demo_multicast_publishRefCount)
sourceObservable.publish().refCount()
}
1 -> {
messageText.setText(R.string.msg_demo_multicast_publishAutoConnect)
sourceObservable.publish().autoConnect(2)
}
2 -> {
messageText.setText(R.string.msg_demo_multicast_replayAutoConnect)
sourceObservable.replay(1).autoConnect(2)
}
3 -> {
messageText.setText(R.string.msg_demo_multicast_replayRefCount)
sourceObservable.replay(1).refCount()
}
4 -> {
messageText.setText(R.string.msg_demo_multicast_replayingShare)
sourceObservable.replayingShare()
}
else -> throw RuntimeException("got to pick an op yo!")
}
}

override fun onNothingSelected(p0: AdapterView<*>?) {}
}
}

private fun _isCurrentlyOnMainThread(): Boolean {
return Looper.myLooper() == Looper.getMainLooper()
}

private inner class LogAdapter(context: Context, logs: List<String>) :
ArrayAdapter<String>(context, R.layout.item_log, R.id.item_log, logs)

}

Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,21 @@ class PlaygroundFragment : BaseFragment() {
private var _logsList: ListView? = null
private var _adapter: LogAdapter? = null

private var _attempt = 0
private var _logs: MutableList<String> = ArrayList()

override fun onCreateView(inflater: LayoutInflater?,
container: ViewGroup?,
savedInstanceState: Bundle?): View? {
return inflater!!.inflate(R.layout.fragment_concurrency_schedulers, container, false)
}

override fun onActivityCreated(savedInstanceState: Bundle?) {
super.onActivityCreated(savedInstanceState)
val view = inflater?.inflate(R.layout.fragment_concurrency_schedulers, container, false)

_logsList = activity.findViewById(R.id.list_threading_log) as ListView
_logsList = view?.findViewById(R.id.list_threading_log) as ListView
_setupLogger()

activity.findViewById(R.id.btn_start_operation).setOnClickListener { _ ->
view.findViewById(R.id.btn_start_operation).setOnClickListener { _ ->
_log("Button clicked")
}

_setupLogger()
return view
}

// -----------------------------------------------------------------------------------
Expand All @@ -44,23 +40,23 @@ class PlaygroundFragment : BaseFragment() {

if (_isCurrentlyOnMainThread()) {
_logs.add(0, logMsg + " (main thread) ")
_adapter!!.clear()
_adapter!!.addAll(_logs)
_adapter?.clear()
_adapter?.addAll(_logs)
} else {
_logs.add(0, logMsg + " (NOT main thread) ")

// You can only do below stuff on main thread.
Handler(Looper.getMainLooper()).post {
_adapter!!.clear()
_adapter!!.addAll(_logs)
_adapter?.clear()
_adapter?.addAll(_logs)
}
}
}

private fun _setupLogger() {
_logs = ArrayList<String>()
_adapter = LogAdapter(activity, ArrayList<String>())
_logsList!!.adapter = _adapter
_logsList?.adapter = _adapter
}

private fun _isCurrentlyOnMainThread(): Boolean {
Expand Down
Loading