@@ -32,6 +32,15 @@ import org.scalatest.junit.JUnitSuite
32
32
import rx .lang .scala ._
33
33
import rx .lang .scala .schedulers ._
34
34
35
+ /**
36
+ * Demo how the different operators can be used. In Eclipse, you can right-click
37
+ * a test and choose "Run As" > "Scala JUnit Test".
38
+ *
39
+ * For each operator added to Observable.java, we add a little usage demo here.
40
+ * It does not need to test the functionality (that's already done by the tests in
41
+ * RxJava core), but it should demonstrate how it can be used, to make sure that
42
+ * the method signature makes sense.
43
+ */
35
44
@ Ignore // Since this doesn't do automatic testing, don't increase build time unnecessarily
36
45
class RxScalaDemo extends JUnitSuite {
37
46
@@ -78,13 +87,9 @@ class RxScalaDemo extends JUnitSuite {
78
87
val first = Observable .from(List (10 , 11 , 12 ))
79
88
val second = Observable .from(List (10 , 11 , 12 ))
80
89
81
- val b1 = (first zip second) map (p => p._1 == p._2) forall (b => b)
82
-
83
- val equality = (a : Any , b : Any ) => a == b
84
- val b2 = (first zip second) map (p => equality(p._1, p._2)) forall (b => b)
90
+ val b = (first zip second) forall { case (a, b) => a == b }
85
91
86
- assertTrue(b1.toBlockingObservable.single)
87
- assertTrue(b2.toBlockingObservable.single)
92
+ assertTrue(b.toBlockingObservable.single)
88
93
}
89
94
90
95
@ Test def testObservableComparisonWithForComprehension () {
@@ -93,7 +98,7 @@ class RxScalaDemo extends JUnitSuite {
93
98
94
99
val booleans = for ((n1, n2) <- (first zip second)) yield (n1 == n2)
95
100
96
- val b1 = booleans.forall(_ == true ) // without `== true`, b1 is assigned the forall function
101
+ val b1 = booleans.forall(identity)
97
102
98
103
assertTrue(b1.toBlockingObservable.single)
99
104
}
@@ -216,18 +221,21 @@ class RxScalaDemo extends JUnitSuite {
216
221
}).flatten.toBlockingObservable.foreach(println(_))
217
222
}
218
223
219
- @ Ignore // TODO something's bad here
220
224
@ Test def timingTest1 () {
221
225
val numbersByModulo3 = Observable .interval(1000 millis).take(9 ).groupBy(_ % 3 )
222
226
223
227
val t0 = System .currentTimeMillis
224
228
225
229
(for ((modulo, numbers) <- numbersByModulo3) yield {
226
230
println(" Observable for modulo" + modulo + " started at t = " + (System .currentTimeMillis - t0))
227
- numbers.take(1 ) // <- TODO very unexpected
228
- // numbers
231
+ numbers.map(n => s " ${n} is in the modulo- $modulo group " )
229
232
}).flatten.toBlockingObservable.foreach(println(_))
230
233
}
234
+
235
+ @ Test def testOlympicYearTicks () {
236
+ Olympics .yearTicks.subscribe(println(_))
237
+ waitFor(Olympics .yearTicks)
238
+ }
231
239
232
240
@ Test def groupByExample () {
233
241
val medalsByCountry = Olympics .mountainBikeMedals.groupBy(medal => medal.country)
@@ -238,8 +246,10 @@ class RxScalaDemo extends JUnitSuite {
238
246
firstMedalOfEachCountry.subscribe(medal => {
239
247
println(s " ${medal.country} wins its first medal in ${medal.year}" )
240
248
})
249
+
250
+ Olympics .yearTicks.subscribe(year => println(s " \n Year $year starts. " ))
241
251
242
- waitFor(firstMedalOfEachCountry )
252
+ waitFor(Olympics .yearTicks )
243
253
}
244
254
245
255
@ Test def groupByUntilExample () {
@@ -250,30 +260,36 @@ class RxScalaDemo extends JUnitSuite {
250
260
}
251
261
252
262
@ Test def combineLatestExample () {
253
- val first_counter = Observable .interval(250 millis)
254
- val second_counter = Observable .interval(550 millis)
255
- val combined_counter = first_counter .combineLatest(second_counter ,
263
+ val firstCounter = Observable .interval(250 millis)
264
+ val secondCounter = Observable .interval(550 millis)
265
+ val combinedCounter = firstCounter .combineLatest(secondCounter ,
256
266
(x : Long , y : Long ) => List (x,y)) take 10
257
267
258
- combined_counter subscribe {x => println(s " Emitted group: $x" )}
268
+ combinedCounter subscribe {x => println(s " Emitted group: $x" )}
269
+ waitFor(combinedCounter)
259
270
}
260
271
272
+ @ Test def olympicsExampleWithoutPublish () {
273
+ val medals = Olympics .mountainBikeMedals.doOnEach(_ => println(" onNext" ))
274
+ medals.subscribe(println(_)) // triggers an execution of medals Observable
275
+ waitFor(medals) // triggers another execution of medals Observable
276
+ }
261
277
262
- @ Test def olympicsExample () {
263
- val medals = Olympics .mountainBikeMedals.publish
264
- medals.subscribe(println(_))
278
+ @ Test def olympicsExampleWithPublish () {
279
+ val medals = Olympics .mountainBikeMedals.doOnEach(_ => println( " onNext " )). publish
280
+ medals.subscribe(println(_)) // triggers an execution of medals Observable
265
281
medals.connect
266
- // waitFor(medals)
282
+ waitFor(medals) // triggers another execution of medals Observable
267
283
}
268
284
269
285
@ Test def exampleWithoutPublish () {
270
- val unshared = List (1 to 4 ).toObservable
286
+ val unshared = Observable .from (1 to 4 )
271
287
unshared.subscribe(n => println(s " subscriber 1 gets $n" ))
272
288
unshared.subscribe(n => println(s " subscriber 2 gets $n" ))
273
289
}
274
290
275
291
@ Test def exampleWithPublish () {
276
- val unshared = List (1 to 4 ).toObservable
292
+ val unshared = Observable .from (1 to 4 )
277
293
val shared = unshared.publish
278
294
shared.subscribe(n => println(s " subscriber 1 gets $n" ))
279
295
shared.subscribe(n => println(s " subscriber 2 gets $n" ))
@@ -402,7 +418,7 @@ class RxScalaDemo extends JUnitSuite {
402
418
}
403
419
404
420
@ Test def timestampExample () {
405
- val timestamped = Observable .interval(100 millis).take(3 ).timestamp.toBlockingObservable
421
+ val timestamped = Observable .interval(100 millis).take(6 ).timestamp.toBlockingObservable
406
422
for ((millis, value) <- timestamped if value > 0 ) {
407
423
println(value + " at t = " + millis)
408
424
}
@@ -441,35 +457,57 @@ class RxScalaDemo extends JUnitSuite {
441
457
val oc3 : rx.Notification [_ <: Int ] = oc2.asJavaNotification
442
458
val oc4 : rx.Notification [_ <: Any ] = oc2.asJavaNotification
443
459
}
444
-
445
- @ Test def elementAtReplacement () {
446
- assertEquals(" b" , List (" a" , " b" , " c" ).toObservable.drop(1 ).first.toBlockingObservable.single)
447
- }
448
-
449
- @ Test def elementAtOrDefaultReplacement () {
450
- assertEquals(" b" , List (" a" , " b" , " c" ).toObservable.drop(1 ).firstOrElse(" !" ).toBlockingObservable.single)
451
- assertEquals(" !!" , List (" a" , " b" , " c" ).toObservable.drop(10 ).firstOrElse(" !!" ).toBlockingObservable.single)
452
- }
453
460
454
461
@ Test def takeWhileWithIndexAlternative {
455
462
val condition = true
456
463
List (" a" , " b" ).toObservable.zipWithIndex.takeWhile{case (elem, index) => condition}.map(_._1)
457
464
}
458
465
459
- @ Test def createExample () {
466
+ def calculateElement (index : Int ): String = {
467
+ println(" omg I'm calculating so hard" )
468
+ index match {
469
+ case 0 => " a"
470
+ case 1 => " b"
471
+ case _ => throw new IllegalArgumentException
472
+ }
473
+ }
474
+
475
+ /**
476
+ * This is a bad way of using Observable.create, because even if the consumer unsubscribes,
477
+ * all elements are calculated.
478
+ */
479
+ @ Test def createExampleBad () {
460
480
val o = Observable .create[String ](observer => {
461
- // this is bad because you cannot unsubscribe!
462
- observer.onNext(" a" )
463
- observer.onNext(" b" )
481
+ observer.onNext(calculateElement(0 ))
482
+ observer.onNext(calculateElement(1 ))
464
483
observer.onCompleted()
465
484
Subscription {}
466
485
})
467
- o.subscribe(println(_))
486
+ o.take(1 ).subscribe(println(_))
487
+ }
488
+
489
+ /**
490
+ * This is the good way of doing it: If the consumer unsubscribes, no more elements are
491
+ * calculated.
492
+ */
493
+ @ Test def createExampleGood () {
494
+ val o = Observable [String ](subscriber => {
495
+ var i = 0
496
+ while (i < 2 && ! subscriber.isUnsubscribed) {
497
+ subscriber.onNext(calculateElement(i))
498
+ i += 1
499
+ }
500
+ if (! subscriber.isUnsubscribed) subscriber.onCompleted()
501
+ })
502
+ o.take(1 ).subscribe(println(_))
468
503
}
469
504
470
505
def output (s : String ): Unit = println(s)
471
506
472
- // blocks until obs has completed
507
+ /** Subscribes to obs and waits until obs has completed. Note that if you subscribe to
508
+ * obs yourself and also call waitFor(obs), all side-effects of subscribing to obs
509
+ * will happen twice.
510
+ */
473
511
def waitFor [T ](obs : Observable [T ]): Unit = {
474
512
obs.toBlockingObservable.toIterable.last
475
513
}
0 commit comments