Skip to content

Commit b7721cf

Browse files
committed
Guide for channel basics
1 parent 12f961d commit b7721cf

File tree

9 files changed

+488
-5
lines changed

9 files changed

+488
-5
lines changed

coroutines-guide.md

Lines changed: 329 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,15 @@ This is a short guide on core features of `kotlinx.coroutines` with a series of
3333
* [Children of a coroutine](#children-of-a-coroutine)
3434
* [Combining contexts](#combining-contexts)
3535
* [Naming coroutines for debugging](#naming-coroutines-for-debugging)
36+
* [Channels](#channels)
37+
* [Channel basics](#channel-basics)
38+
* [Closing and iteration over channels](#closing-and-iteration-over-channels)
39+
* [Building channel producers](#building-channel-producers)
40+
* [Pipelines](#pipelines)
41+
* [Prime numbers with pipeline](#prime-numbers-with-pipeline)
42+
* [Fan-out](#fan-out)
43+
* [Fan-in](#fan-in)
44+
* [Buffered channels](#buffered-channels)
3645

3746
<!--- KNIT kotlinx-coroutines-core/src/test/kotlin/guide/.*\.kt -->
3847

@@ -441,8 +450,8 @@ This section covers various approaches to composition of suspending functions.
441450
### Sequential by default
442451

443452
Assume that we have two suspending functions defined elsewhere that do something useful like some kind of
444-
remote service call or computation. We'll just pretend they are useful, but each one will just actaully
445-
delay for a second for the purpose of this example:
453+
remote service call or computation. We just pretend they are useful, but actually each one just
454+
delays for a second for the purpose of this example:
446455

447456
<!--- INCLUDE .*/example-compose-([0-9]+).kt
448457
import kotlin.system.measureTimeMillis
@@ -672,7 +681,7 @@ fun main(args: Array<String>) = runBlocking<Unit> {
672681

673682
> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-03.kt)
674683
675-
There are three coroutines. The main couroutine (#1) -- `runBlocking` one,
684+
There are three coroutines. The main coroutine (#1) -- `runBlocking` one,
676685
and two coroutines computing deferred values `a` (#2) and `b` (#3).
677686
They are all executing in the context of `runBlocking` and are confined to the main thread.
678687
The output of this code is:
@@ -687,7 +696,7 @@ The `log` function prints the name of the thread in square brackets and you can
687696
thread, but the identifier of the currently executing coroutine is appended to it. This identifier
688697
is consecutively assigned to all created coroutines when debugging mode is turned on.
689698

690-
You can read more about debugging facilities in documentation for `newCoroutineContext` function.
699+
You can read more about debugging facilities in the documentation for `newCoroutineContext` function.
691700

692701
### Jumping between threads
693702

@@ -822,7 +831,7 @@ main: Who has survived request cancellation?
822831

823832
### Naming coroutines for debugging
824833

825-
Automatically assignmed ids are good when coroutines log often and you just need to correlate log records
834+
Automatically assigned ids are good when coroutines log often and you just need to correlate log records
826835
coming from the same coroutine. However, when coroutine is tied to the processing of a specific request
827836
or doing some specific background task, it is better to name it explicitly for debugging purposes.
828837
Coroutine name serves the same function as a thread name. It'll get displayed in the thread name that
@@ -861,4 +870,319 @@ The output it produces with `-Dkotlinx.coroutines.debug` JVM option is similar t
861870
[main @main#1] The answer for v1 / v2 = 42
862871
```
863872

873+
## Channels
864874

875+
Deferred values provide a convenient way to transfer a single value between coroutines.
876+
Channels provide a way to transfer a stream of values.
877+
878+
<!--- INCLUDE .*/example-channel-([0-9]+).kt
879+
import kotlinx.coroutines.experimental.channels.*
880+
-->
881+
882+
### Channel basics
883+
884+
A `Channel` is conceptually very similar to `BlockingQueue`. One key difference is that
885+
instead of a blocking `put` operation it has a suspending `send`, and instead of
886+
a blocking `take` operation it has a suspending `receive`.
887+
888+
```kotlin
889+
fun main(args: Array<String>) = runBlocking<Unit> {
890+
val channel = Channel<Int>()
891+
launch(CommonPool) {
892+
// this might be heavy CPU-consuming computation or async logic, we'll just send five squares
893+
for (x in 1..5) channel.send(x * x)
894+
}
895+
// here we print five received integers:
896+
repeat(5) { println(channel.receive()) }
897+
println("Done!")
898+
}
899+
```
900+
901+
> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-01.kt)
902+
903+
### Closing and iteration over channels
904+
905+
Unlike a queue, a channel can be closed to indicate that no more elements are coming.
906+
On the receiver side it is convenient to use a regular `for` loop to receive elements
907+
from the channel.
908+
909+
Conceptually, a `close` is like sending a special close token to the channel.
910+
The iteration stops as soon as this close token is received, so there is a guarantee
911+
that all previously sent elements before the close are received:
912+
913+
```kotlin
914+
fun main(args: Array<String>) = runBlocking<Unit> {
915+
val channel = Channel<Int>()
916+
launch(CommonPool) {
917+
for (x in 1..5) channel.send(x * x)
918+
channel.close() // we're done sending
919+
}
920+
// here we print received values using `for` loop (until the channel is closed)
921+
for (y in channel) println(y)
922+
println("Done!")
923+
}
924+
```
925+
926+
> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-02.kt)
927+
928+
### Building channel producers
929+
930+
The pattern where a coroutine is producing a sequence of elements into a channel is quite common.
931+
You could abstract such a producer into a function that takes channel as its parameter, but this goes contrary
932+
to common sense that results must be returned from functions. Here is a convenience
933+
coroutine builder named `buildChannel` that makes it easy to do it right:
934+
935+
```kotlin
936+
fun produceSquares() = buildChannel<Int>(CommonPool) {
937+
for (x in 1..5) send(x * x)
938+
}
939+
940+
fun main(args: Array<String>) = runBlocking<Unit> {
941+
val squares = produceSquares()
942+
for (y in squares) println(y)
943+
println("Done!")
944+
}
945+
```
946+
947+
> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-03.kt)
948+
949+
### Pipelines
950+
951+
Pipeline is a pattern where one coroutine is producing, possibly infinite, stream of values:
952+
953+
```kotlin
954+
fun produceNumbers() = buildChannel<Int>(CommonPool) {
955+
var x = 1
956+
while (true) send(x++) // infinite stream of integers starting from 1
957+
}
958+
```
959+
960+
And another coroutine or coroutines are receiving that stream, doing some processing, and sending the result.
961+
In the below example the numbers are just squared:
962+
963+
```kotlin
964+
fun square(numbers: ReceiveChannel<Int>) = buildChannel<Int>(CommonPool) {
965+
for (x in numbers) send(x * x)
966+
}
967+
```
968+
969+
The main code starts and connects pipeline:
970+
971+
```kotlin
972+
fun main(args: Array<String>) = runBlocking<Unit> {
973+
val numbers = produceNumbers() // produces integers from 1 and on
974+
val squares = square(numbers) // squares integers
975+
for (i in 1..5) println(squares.receive()) // print first five
976+
println("Done!") // we are done
977+
squares.cancel() // need to cancel these coroutines in a larger app
978+
numbers.cancel()
979+
}
980+
```
981+
982+
> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-04.kt)
983+
984+
We don't have to cancel these coroutines in this example app, because
985+
[coroutines are like daemon threads](#coroutines-are-like-daemon-threads),
986+
but in a larger app we'll need to stop our pipeline if we don't need it anymore.
987+
Alternatively, we could have run pipeline coroutines as
988+
[children of a coroutine](#children-of-a-coroutine).
989+
990+
### Prime numbers with pipeline
991+
992+
Let's take pipelines to the extreme, with an example that generates prime numbers using a pipeline
993+
of coroutines. We start with an infinite sequence of numbers. This time we introduce an
994+
explicit context parameter, so that caller can control where our coroutines run:
995+
996+
<!--- INCLUDE kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-05.kt
997+
import kotlin.coroutines.experimental.CoroutineContext
998+
-->
999+
1000+
```kotlin
1001+
fun numbersFrom(context: CoroutineContext, start: Int) = buildChannel<Int>(context) {
1002+
var x = start
1003+
while (true) send(x++) // infinite stream of integers from start
1004+
}
1005+
```
1006+
1007+
The following pipeline stage filters an incoming stream of numbers, removing all the numbers
1008+
that are divisible by the given prime number:
1009+
1010+
```kotlin
1011+
fun filter(context: CoroutineContext, numbers: ReceiveChannel<Int>, prime: Int) = buildChannel<Int>(context) {
1012+
for (x in numbers) if (x % prime != 0) send(x)
1013+
}
1014+
```
1015+
1016+
Now we build our pipeline by starting a stream of numbers from 2, taking a prime number from the current channel,
1017+
and launching new pipeline stage for each prime number found. The following example prints first ten prime numbers,
1018+
running the whole pipeline in the context of the main thread:
1019+
1020+
```kotlin
1021+
fun main(args: Array<String>) = runBlocking<Unit> {
1022+
var cur = numbersFrom(context, 2)
1023+
for (i in 1..10) {
1024+
val prime = cur.receive()
1025+
println(prime)
1026+
cur = filter(context, cur, prime)
1027+
}
1028+
}
1029+
```
1030+
1031+
> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-05.kt)
1032+
1033+
The output of this code is:
1034+
1035+
```
1036+
2
1037+
3
1038+
5
1039+
7
1040+
11
1041+
13
1042+
17
1043+
19
1044+
23
1045+
29
1046+
```
1047+
1048+
### Fan-out
1049+
1050+
Multiple coroutines may receive from the same channel, distributing work between themselves.
1051+
Let us start with a producer coroutine that is periodically producing integers
1052+
(ten numbers per second):
1053+
1054+
```kotlin
1055+
fun produceNumbers() = buildChannel<Int>(CommonPool) {
1056+
var x = 1 // start from 1
1057+
while (true) {
1058+
send(x++) // produce next
1059+
delay(100) // wait 0.1s
1060+
}
1061+
}
1062+
```
1063+
1064+
Then we can have several processor coroutines. In this example, they just print their id and
1065+
received number:
1066+
1067+
```kotlin
1068+
fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch(CommonPool) {
1069+
while (true) {
1070+
val x = channel.receive()
1071+
println("Processor #$id received $x")
1072+
}
1073+
}
1074+
```
1075+
1076+
Now let us launch five processors and let them work for a second. See what happens:
1077+
1078+
```kotlin
1079+
fun main(args: Array<String>) = runBlocking<Unit> {
1080+
val producer = produceNumbers()
1081+
repeat(5) { launchProcessor(it, producer) }
1082+
delay(1000)
1083+
producer.cancel() // cancel producer coroutine and thus kill them all
1084+
}
1085+
```
1086+
1087+
> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-06.kt)
1088+
1089+
The output will be similar to the the following one, albeit the processor ids that receive
1090+
each specific integer may be different:
1091+
1092+
```
1093+
Processor #2 received 1
1094+
Processor #4 received 2
1095+
Processor #0 received 3
1096+
Processor #1 received 4
1097+
Processor #3 received 5
1098+
Processor #2 received 6
1099+
Processor #4 received 7
1100+
Processor #0 received 8
1101+
Processor #1 received 9
1102+
Processor #3 received 10
1103+
```
1104+
1105+
Note, that cancelling a producer coroutine closes its channel, thus eventually terminating iteration
1106+
over the channel that processor coroutines are doing.
1107+
1108+
### Fan-in
1109+
1110+
Multiple coroutines may send to the same channel.
1111+
For example, let us have a channel of strings, and a suspending function that
1112+
repeatedly sends a specified string to this channel with a specified delay:
1113+
1114+
```kotlin
1115+
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
1116+
while (true) {
1117+
delay(time)
1118+
channel.send(s)
1119+
}
1120+
}
1121+
```
1122+
1123+
Now, let us see what happen if we launch a couple of coroutines sending strings
1124+
(in this example we launch them in the context of the main thread):
1125+
1126+
```kotlin
1127+
fun main(args: Array<String>) = runBlocking<Unit> {
1128+
val channel = Channel<String>()
1129+
launch(context) { sendString(channel, "foo", 200L) }
1130+
launch(context) { sendString(channel, "BAR!", 500L) }
1131+
repeat(6) { // receive first six
1132+
println(channel.receive())
1133+
}
1134+
}
1135+
```
1136+
1137+
> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-07.kt)
1138+
1139+
The output is:
1140+
1141+
```
1142+
foo
1143+
foo
1144+
BAR!
1145+
foo
1146+
foo
1147+
BAR!
1148+
```
1149+
1150+
### Buffered channels
1151+
1152+
The channels shown so far had no buffer. Unbuffered channels transfer elements when sender and receiver
1153+
meet each other (aka rendezvous). If send is invoked first, then it is suspended until receive is invoked,
1154+
if receive is invoked first, it is suspended until send is invoked.
1155+
1156+
Both `Channel()` factory and `buildChanner{}` builder take an optional `capacity` parameter to
1157+
specify _buffer size_. Buffer allows senders to send multiple elements before suspending,
1158+
similar to the `BlockingQueue` with a specified capacity, which blocks when buffer is full.
1159+
1160+
Take a look at the behavior of the following code:
1161+
1162+
```kotlin
1163+
fun main(args: Array<String>) = runBlocking<Unit> {
1164+
val channel = Channel<Int>(4) // create buffered channel
1165+
launch(context) { // launch sender coroutine
1166+
repeat(10) {
1167+
println("Sending $it") // print before sending each element
1168+
channel.send(it) // will suspend when buffer is full
1169+
}
1170+
}
1171+
// don't receive anything... just wait....
1172+
delay(1000)
1173+
}
1174+
```
1175+
1176+
> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-08.kt)
1177+
1178+
It prints "sending" _five_ times using a buffered channel with capacity of _four_:
1179+
1180+
```
1181+
Sending 0
1182+
Sending 1
1183+
Sending 2
1184+
Sending 3
1185+
Sending 4
1186+
```
1187+
1188+
The first four elements are added to the buffer and the sender suspends when trying to send the fifth one.
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
2+
package guide.channel.example01
3+
4+
import kotlinx.coroutines.experimental.*
5+
import kotlinx.coroutines.experimental.channels.*
6+
7+
fun main(args: Array<String>) = runBlocking<Unit> {
8+
val channel = Channel<Int>()
9+
launch(CommonPool) {
10+
// this might be heavy CPU-consuming computation or async logic, we'll just send five squares
11+
for (x in 1..5) channel.send(x * x)
12+
}
13+
// here we print five received integers:
14+
repeat(5) { println(channel.receive()) }
15+
println("Done!")
16+
}

0 commit comments

Comments
 (0)