diff --git a/integration/kotlinx-coroutines-jdk8/src/main/kotlin/kotlinx/coroutines/experimental/channels8/Channels.kt b/integration/kotlinx-coroutines-jdk8/src/main/kotlin/kotlinx/coroutines/experimental/channels8/Channels.kt new file mode 100644 index 0000000000..dd7ca412f4 --- /dev/null +++ b/integration/kotlinx-coroutines-jdk8/src/main/kotlin/kotlinx/coroutines/experimental/channels8/Channels.kt @@ -0,0 +1,82 @@ +/* + * Copyright 2016-2017 JetBrains s.r.o. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kotlinx.coroutines.experimental.channels8 + +import kotlinx.coroutines.experimental.CommonPool +import kotlinx.coroutines.experimental.channels.ProducerJob +import kotlinx.coroutines.experimental.channels.ReceiveChannel +import kotlinx.coroutines.experimental.channels.consumeEach +import kotlinx.coroutines.experimental.channels.produce +import kotlinx.coroutines.experimental.runBlocking +import java.util.* +import java.util.function.BiConsumer +import java.util.function.Consumer +import java.util.stream.Collector +import java.util.stream.Stream +import java.util.stream.StreamSupport +import kotlin.coroutines.experimental.CoroutineContext + +/** + * Creates a [ProducerJob] to read all element of the [Stream]. + */ +public fun Stream.asReceiveChannel(context: CoroutineContext = CommonPool): ProducerJob = produce(context) { + for (element in this@asReceiveChannel) + send(element) +} + +/** + * Creates a [Stream] of elements in this [ReceiveChannel]. + */ +public fun ReceiveChannel.asStream(): Stream = StreamSupport.stream(SpliteratorAdapter(this), false) + +/** + * Applies the [collector] to the [ReceiveChannel] + */ +public suspend fun ReceiveChannel.collect(collector: Collector): R { + val container: A = collector.supplier().get() + val accumulator: BiConsumer = collector.accumulator() + consumeEach { accumulator.accept(container, it) } + return collector.finisher().apply(container) +} + +private class SpliteratorAdapter(val channel: ReceiveChannel) : Spliterator { + override fun estimateSize(): Long = Long.MAX_VALUE + + override fun forEachRemaining(action: Consumer) { + runBlocking { + for (element in channel) + action.accept(element) + } + } + + override fun tryAdvance(action: Consumer): Boolean = runBlocking { + val element = channel.receiveOrNull() + if (element != null) { + action.accept(element) + true + } else false + } + + override fun characteristics(): Int = characteristics + + override fun trySplit(): Spliterator? = null + + private companion object { + @JvmStatic + private val characteristics = Spliterator.ORDERED or Spliterator.NONNULL + } +} diff --git a/integration/kotlinx-coroutines-jdk8/src/test/kotlin/kotlinx/coroutines/experimental/channels8/ChannelsTest.kt b/integration/kotlinx-coroutines-jdk8/src/test/kotlin/kotlinx/coroutines/experimental/channels8/ChannelsTest.kt new file mode 100644 index 0000000000..fdf75a9256 --- /dev/null +++ b/integration/kotlinx-coroutines-jdk8/src/test/kotlin/kotlinx/coroutines/experimental/channels8/ChannelsTest.kt @@ -0,0 +1,28 @@ +package kotlinx.coroutines.experimental.channels8 + +import kotlinx.coroutines.experimental.TestBase +import kotlinx.coroutines.experimental.channels.asReceiveChannel +import kotlinx.coroutines.experimental.channels.toList +import kotlinx.coroutines.experimental.runBlocking +import org.junit.Assert.assertEquals +import org.junit.Test +import java.util.stream.Collectors + +class ChannelsTest : TestBase() { + private val testList = listOf(1, 2, 3) + + @Test + fun testCollect() = runBlocking { + assertEquals(testList, testList.asReceiveChannel().collect(Collectors.toList())) + } + + @Test + fun testStreamAsReceiveChannel() = runBlocking { + assertEquals(testList, testList.stream().asReceiveChannel().toList()) + } + + @Test + fun testReceiveChannelAsStream() { + assertEquals(testList, testList.asReceiveChannel().asStream().collect(Collectors.toList())) + } +} diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt index eafc1558d1..1f334816c5 100644 --- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt +++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt @@ -16,8 +16,39 @@ package kotlinx.coroutines.experimental.channels +import kotlinx.coroutines.experimental.CommonPool +import kotlinx.coroutines.experimental.Unconfined +import kotlinx.coroutines.experimental.runBlocking +import kotlin.coroutines.experimental.CoroutineContext + internal const val DEFAULT_CLOSE_MESSAGE = "Channel was closed" +/** + * Creates a [ProducerJob] to read all element of the [Iterable]. + */ +public fun Iterable.asReceiveChannel(context: CoroutineContext = Unconfined): ReceiveChannel = produce(context) { + for (element in this@asReceiveChannel) + send(element) +} + +/** + * Creates an [ActorJob] to insert elements in this [MutableCollection]. + */ +public fun MutableCollection.asSendChannel(context: CoroutineContext = Unconfined): SendChannel = actor(context) { + for (element in channel) + this@asSendChannel += element +} + +/** + * Creates a [Sequence] instance that wraps the original [ReceiveChannel] returning its entries when being emitted. + */ +public fun ReceiveChannel.asSequence(): Sequence = + generateSequence { + runBlocking { + receiveOrNull() + } + } + /** * Performs the given [action] for each received element. */ @@ -35,3 +66,1196 @@ public suspend fun BroadcastChannel.consumeEach(action: suspend (E) -> Un for (x in channel) action(x) } } + +/** + * Performs the given [action] for each received element. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun ReceiveChannel.consumeEachIndexed(action: suspend (IndexedValue) -> Unit) { + var index = 0 + for (element in this) action(IndexedValue(index++, element)) +} + +/** + * Removes at least minElements and at most maxElements from this channel and adds them to the given destination collection. + */ +public suspend fun ReceiveChannel.drainTo(destination: MutableCollection, + minElements: Int = 0, + maxElements: Int = Integer.MAX_VALUE) { + require(minElements >= 0) { "minElements cannot be negative" } + require(maxElements >= minElements) { "maxElements cannot be lesser than minElements" } + repeat(minElements) { + destination += receive() + } + repeat(maxElements - minElements) { + destination += poll() ?: return + } +} + +/** + * Returns an element at the given [index] or throws an [IndexOutOfBoundsException] if the [index] is out of bounds of this channel. + * + * The operation is _terminal_. + */ +public suspend fun ReceiveChannel.elementAt(index: Int): T { + return elementAtOrElse(index) { throw IndexOutOfBoundsException("ReceiveChannel doesn't contain element at index $index.") } +} + +/** + * Returns an element at the given [index] or the result of calling the [defaultValue] function if the [index] is out of bounds of this channel. + * + * The operation is _terminal_. + */ +public suspend fun ReceiveChannel.elementAtOrElse(index: Int, defaultValue: suspend (Int) -> T): T { + if (index < 0) + return defaultValue(index) + var count = 0 + for (element in this) { + if (index == count++) + return element + } + return defaultValue(index) +} + +/** + * Returns an element at the given [index] or `null` if the [index] is out of bounds of this channel. + * + * The operation is _terminal_. + */ +public suspend fun ReceiveChannel.elementAtOrNull(index: Int): T? { + if (index < 0) + return null + var count = 0 + for (element in this) { + if (index == count++) + return element + } + return null +} + +/** + * Returns the first element matching the given [predicate], or `null` if no such element was found. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun ReceiveChannel.find(predicate: suspend (T) -> Boolean): T? { + return firstOrNull(predicate) +} + +/** + * Returns the last element matching the given [predicate], or `null` if no such element was found. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun ReceiveChannel.findLast(predicate: suspend (T) -> Boolean): T? { + return lastOrNull(predicate) +} + +/** + * Returns first element. + * @throws [NoSuchElementException] if the channel is empty. + * + * The operation is _terminal_. + */ +public suspend fun ReceiveChannel.first(): T { + val iterator = iterator() + if (!iterator.hasNext()) + throw NoSuchElementException("ReceiveChannel is empty.") + return iterator.next() +} + +/** + * Returns the first element matching the given [predicate]. + * @throws [NoSuchElementException] if no such element is found. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun ReceiveChannel.first(predicate: suspend (T) -> Boolean): T { + for (element in this) if (predicate(element)) return element + throw NoSuchElementException("ReceiveChannel contains no element matching the predicate.") +} + +/** + * Returns the first element, or `null` if the channel is empty. + * + * The operation is _terminal_. + */ +public suspend fun ReceiveChannel.firstOrNull(): T? { + val iterator = iterator() + if (!iterator.hasNext()) + return null + return iterator.next() +} + +/** + * Returns the first element matching the given [predicate], or `null` if element was not found. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun ReceiveChannel.firstOrNull(predicate: suspend (T) -> Boolean): T? { + for (element in this) if (predicate(element)) return element + return null +} + +/** + * Returns first index of [element], or -1 if the channel does not contain element. + * + * The operation is _terminal_. + */ +public suspend fun ReceiveChannel.indexOf(element: T): Int { + var index = 0 + for (item in this) { + if (element == item) + return index + index++ + } + return -1 +} + +/** + * Returns index of the first element matching the given [predicate], or -1 if the channel does not contain such element. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun ReceiveChannel.indexOfFirst(predicate: suspend (T) -> Boolean): Int { + var index = 0 + for (item in this) { + if (predicate(item)) + return index + index++ + } + return -1 +} + +/** + * Returns index of the last element matching the given [predicate], or -1 if the channel does not contain such element. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun ReceiveChannel.indexOfLast(predicate: suspend (T) -> Boolean): Int { + var lastIndex = -1 + var index = 0 + for (item in this) { + if (predicate(item)) + lastIndex = index + index++ + } + return lastIndex +} + +/** + * Returns the last element. + * @throws [NoSuchElementException] if the channel is empty. + * + * The operation is _terminal_. + */ +public suspend fun ReceiveChannel.last(): T { + val iterator = iterator() + if (!iterator.hasNext()) + throw NoSuchElementException("ReceiveChannel is empty.") + var last = iterator.next() + while (iterator.hasNext()) + last = iterator.next() + return last +} + +/** + * Returns the last element matching the given [predicate]. + * @throws [NoSuchElementException] if no such element is found. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun ReceiveChannel.last(predicate: suspend (T) -> Boolean): T { + var last: T? = null + var found = false + for (element in this) { + if (predicate(element)) { + last = element + found = true + } + } + if (!found) throw NoSuchElementException("ReceiveChannel contains no element matching the predicate.") + @Suppress("UNCHECKED_CAST") + return last as T +} + +/** + * Returns last index of [element], or -1 if the channel does not contain element. + * + * The operation is _terminal_. + */ +public suspend fun ReceiveChannel.lastIndexOf(element: T): Int { + var lastIndex = -1 + var index = 0 + for (item in this) { + if (element == item) + lastIndex = index + index++ + } + return lastIndex +} + +/** + * Returns the last element, or `null` if the channel is empty. + * + * The operation is _terminal_. + */ +public suspend fun ReceiveChannel.lastOrNull(): T? { + val iterator = iterator() + if (!iterator.hasNext()) + return null + var last = iterator.next() + while (iterator.hasNext()) + last = iterator.next() + return last +} + +/** + * Returns the last element matching the given [predicate], or `null` if no such element was found. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun ReceiveChannel.lastOrNull(predicate: suspend (T) -> Boolean): T? { + var last: T? = null + for (element in this) { + if (predicate(element)) { + last = element + } + } + return last +} + +/** + * Returns the single element, or throws an exception if the channel is empty or has more than one element. + * + * The operation is _terminal_. + */ +public suspend fun ReceiveChannel.single(): T { + val iterator = iterator() + if (!iterator.hasNext()) + throw NoSuchElementException("ReceiveChannel is empty.") + val single = iterator.next() + if (iterator.hasNext()) + throw IllegalArgumentException("ReceiveChannel has more than one element.") + return single +} + +/** + * Returns the single element matching the given [predicate], or throws exception if there is no or more than one matching element. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun ReceiveChannel.single(predicate: suspend (T) -> Boolean): T { + var single: T? = null + var found = false + for (element in this) { + if (predicate(element)) { + if (found) throw IllegalArgumentException("ReceiveChannel contains more than one matching element.") + single = element + found = true + } + } + if (!found) throw NoSuchElementException("ReceiveChannel contains no element matching the predicate.") + @Suppress("UNCHECKED_CAST") + return single as T +} + +/** + * Returns single element, or `null` if the channel is empty or has more than one element. + * + * The operation is _terminal_. + */ +public suspend fun ReceiveChannel.singleOrNull(): T? { + val iterator = iterator() + if (!iterator.hasNext()) + return null + val single = iterator.next() + if (iterator.hasNext()) + return null + return single +} + +/** + * Returns the single element matching the given [predicate], or `null` if element was not found or more than one element was found. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun ReceiveChannel.singleOrNull(predicate: suspend (T) -> Boolean): T? { + var single: T? = null + var found = false + for (element in this) { + if (predicate(element)) { + if (found) return null + single = element + found = true + } + } + if (!found) return null + return single +} + +/** + * Returns a channel containing all elements except first [n] elements. + * + * The operation is _intermediate_ and _stateless_. + */ +public suspend fun ReceiveChannel.drop(n: Int): ReceiveChannel = produce(CommonPool) { + require(n >= 0) { "Requested element count $n is less than zero." } + var remaining: Int = n + if (remaining > 0) + for (element in this@drop) { + remaining-- + if (remaining == 0) + break + } + for (element in this@drop) { + send(element) + } +} + +/** + * Returns a channel containing all elements except first elements that satisfy the given [predicate]. + * + * The operation is _intermediate_ and _stateless_. + */ +public suspend fun ReceiveChannel.dropWhile(predicate: suspend (T) -> Boolean): ReceiveChannel = produce(Unconfined) { + for (element in this@dropWhile) { + if (!predicate(element)) + break + } + for (element in this@dropWhile) { + send(element) + } +} + +/** + * Returns a channel containing only elements matching the given [predicate]. + * + * The operation is _intermediate_ and _stateless_. + */ +public suspend fun ReceiveChannel.filter(predicate: suspend (T) -> Boolean): ReceiveChannel = produce(Unconfined) { + for (element in this@filter) { + if (predicate(element)) + send(element) + } +} + +/** + * Returns a channel containing only elements matching the given [predicate]. + * @param [predicate] function that takes the index of an element and the element itself + * and returns the result of predicate evaluation on the element. + * + * The operation is _intermediate_ and _stateless_. + */ +public suspend fun ReceiveChannel.filterIndexed(predicate: suspend (index: Int, T) -> Boolean): ReceiveChannel = produce(Unconfined) { + var index = 0 + for (element in this@filterIndexed) { + if (predicate(index++, element)) + send(element) + } +} + +/** + * Appends all elements matching the given [predicate] to the given [destination]. + * @param [predicate] function that takes the index of an element and the element itself + * and returns the result of predicate evaluation on the element. + * + * The operation is _terminal_. + */ +public suspend fun > ReceiveChannel.filterIndexedTo(destination: C, predicate: suspend (index: Int, T) -> Boolean): C { + consumeEachIndexed { (index, element) -> + if (predicate(index, element)) destination.add(element) + } + return destination +} + +/** + * Returns a channel containing all elements not matching the given [predicate]. + * + * The operation is _intermediate_ and _stateless_. + */ +public suspend fun ReceiveChannel.filterNot(predicate: suspend (T) -> Boolean): ReceiveChannel = filter() { !predicate(it) } + + +/** + * Returns a channel containing all elements that are not `null`. + * + * The operation is _intermediate_ and _stateless_. + */ +@Suppress("UNCHECKED_CAST") +public suspend fun ReceiveChannel.filterNotNull(): ReceiveChannel = filter { it != null } as ReceiveChannel + +/** + * Appends all elements that are not `null` to the given [destination]. + * + * The operation is _terminal_. + */ +public suspend fun , T : Any> ReceiveChannel.filterNotNullTo(destination: C): C { + for (element in this) if (element != null) destination.add(element) + return destination +} + +/** + * Appends all elements not matching the given [predicate] to the given [destination]. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun > ReceiveChannel.filterNotTo(destination: C, predicate: suspend (T) -> Boolean): C { + for (element in this) if (!predicate(element)) destination.add(element) + return destination +} + +/** + * Appends all elements matching the given [predicate] to the given [destination]. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun > ReceiveChannel.filterTo(destination: C, predicate: suspend (T) -> Boolean): C { + for (element in this) if (predicate(element)) destination.add(element) + return destination +} + +/** + * Returns a channel containing first [n] elements. + * + * The operation is _intermediate_ and _stateless_. + */ +public suspend fun ReceiveChannel.take(n: Int): ReceiveChannel = produce(CommonPool) { + if (n == 0) return@produce + require(n >= 0) { "Requested element count $n is less than zero." } + + var remaining: Int = n + for (element in this@take) { + send(element) + remaining-- + if (remaining == 0) + return@produce + } +} + +/** + * Returns a channel containing first elements satisfying the given [predicate]. + * + * The operation is _intermediate_ and _stateless_. + */ +public suspend fun ReceiveChannel.takeWhile(predicate: suspend (T) -> Boolean): ReceiveChannel = produce(Unconfined) { + for (element in this@takeWhile) { + if (!predicate(element)) return@produce + send(element) + } +} + +/** + * Returns a [Map] containing key-value pairs provided by [transform] function + * applied to elements of the given channel. + * + * If any of two pairs would have the same key the last one gets added to the map. + * + * The returned map preserves the entry iteration order of the original channel. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun ReceiveChannel.associate(transform: suspend (T) -> Pair): Map { + return associateTo(LinkedHashMap(), transform) +} + +/** + * Returns a [Map] containing the elements from the given channel indexed by the key + * returned from [keySelector] function applied to each element. + * + * If any two elements would have the same key returned by [keySelector] the last one gets added to the map. + * + * The returned map preserves the entry iteration order of the original channel. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun ReceiveChannel.associateBy(keySelector: suspend (T) -> K): Map { + return associateByTo(LinkedHashMap(), keySelector) +} + +/** + * Returns a [Map] containing the values provided by [valueTransform] and indexed by [keySelector] functions applied to elements of the given channel. + * + * If any two elements would have the same key returned by [keySelector] the last one gets added to the map. + * + * The returned map preserves the entry iteration order of the original channel. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun ReceiveChannel.associateBy(keySelector: suspend (T) -> K, valueTransform: suspend (T) -> V): Map { + return associateByTo(LinkedHashMap(), keySelector, valueTransform) +} + +/** + * Populates and returns the [destination] mutable map with key-value pairs, + * where key is provided by the [keySelector] function applied to each element of the given channel + * and value is the element itself. + * + * If any two elements would have the same key returned by [keySelector] the last one gets added to the map. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun > ReceiveChannel.associateByTo(destination: M, keySelector: suspend (T) -> K): M { + for (element in this) { + destination.put(keySelector(element), element) + } + return destination +} + +/** + * Populates and returns the [destination] mutable map with key-value pairs, + * where key is provided by the [keySelector] function and + * and value is provided by the [valueTransform] function applied to elements of the given channel. + * + * If any two elements would have the same key returned by [keySelector] the last one gets added to the map. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun > ReceiveChannel.associateByTo(destination: M, keySelector: suspend (T) -> K, valueTransform: suspend (T) -> V): M { + for (element in this) { + destination.put(keySelector(element), valueTransform(element)) + } + return destination +} + +/** + * Populates and returns the [destination] mutable map with key-value pairs + * provided by [transform] function applied to each element of the given channel. + * + * If any of two pairs would have the same key the last one gets added to the map. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun > ReceiveChannel.associateTo(destination: M, transform: suspend (T) -> Pair): M { + for (element in this) { + destination += transform(element) + } + return destination +} + +/** + * Appends all elements to the given [destination] collection. + * + * The operation is _terminal_. + */ +public suspend fun > ReceiveChannel.toCollection(destination: C): C { + for (item in this) { + destination.add(item) + } + return destination +} + +/** + * Returns a [List] containing all elements. + * + * The operation is _terminal_. + */ +public suspend fun ReceiveChannel.toList(): List { + return this.toMutableList() +} + +/** + * Returns a [Map] filled with all elements of this channel. + * + * The operation is _terminal_. + */ +public suspend fun ReceiveChannel>.toMap(): Map { + return toMap(LinkedHashMap()) +} + +/** + * Returns a [MutableMap] filled with all elements of this channel. + * + * The operation is _terminal_. + */ +public suspend fun > ReceiveChannel>.toMap(destination: M): M { + consumeEach { + destination += it + } + return destination +} + +/** + * Returns a [MutableList] filled with all elements of this channel. + * + * The operation is _terminal_. + */ +public suspend fun ReceiveChannel.toMutableList(): MutableList { + return toCollection(ArrayList()) +} + +/** + * Returns a [Set] of all elements. + * + * The returned set preserves the element iteration order of the original channel. + * + * The operation is _terminal_. + */ +public suspend fun ReceiveChannel.toSet(): Set { + return toCollection(LinkedHashSet()) +} + +/** + * Returns a single channel of all elements from results of [transform] function being invoked on each element of original channel. + * + * The operation is _intermediate_ and _stateless_. + */ +public suspend fun ReceiveChannel.flatMap(transform: suspend (T) -> ReceiveChannel): ReceiveChannel = produce(Unconfined) { + for (element in this@flatMap) { + for (sub in transform(element)) { + send(sub) + } + } +} + +/** + * Groups elements of the original channel by the key returned by the given [keySelector] function + * applied to each element and returns a map where each group key is associated with a list of corresponding elements. + * + * The returned map preserves the entry iteration order of the keys produced from the original channel. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun ReceiveChannel.groupBy(keySelector: suspend (T) -> K): Map> { + return groupByTo(LinkedHashMap>(), keySelector) +} + +/** + * Groups values returned by the [valueTransform] function applied to each element of the original channel + * by the key returned by the given [keySelector] function applied to the element + * and returns a map where each group key is associated with a list of corresponding values. + * + * The returned map preserves the entry iteration order of the keys produced from the original channel. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun ReceiveChannel.groupBy(keySelector: suspend (T) -> K, valueTransform: suspend (T) -> V): Map> { + return groupByTo(LinkedHashMap>(), keySelector, valueTransform) +} + +/** + * Groups elements of the original channel by the key returned by the given [keySelector] function + * applied to each element and puts to the [destination] map each group key associated with a list of corresponding elements. + * + * @return The [destination] map. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun >> ReceiveChannel.groupByTo(destination: M, keySelector: suspend (T) -> K): M { + for (element in this) { + val key = keySelector(element) + val list = destination.getOrPut(key) { ArrayList() } + list.add(element) + } + return destination +} + +/** + * Groups values returned by the [valueTransform] function applied to each element of the original channel + * by the key returned by the given [keySelector] function applied to the element + * and puts to the [destination] map each group key associated with a list of corresponding values. + * + * @return The [destination] map. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun >> ReceiveChannel.groupByTo(destination: M, keySelector: suspend (T) -> K, valueTransform: suspend (T) -> V): M { + for (element in this) { + val key = keySelector(element) + val list = destination.getOrPut(key) { ArrayList() } + list.add(valueTransform(element)) + } + return destination +} + +/** + * Returns a channel containing the results of applying the given [transform] function + * to each element in the original channel. + * + * The operation is _intermediate_ and _stateless_. + */ +public suspend fun ReceiveChannel.map(transform: suspend (T) -> R): ReceiveChannel = produce(Unconfined) { + for (element in this@map) { + send(transform(element)) + + } +} + +/** + * Returns a channel containing the results of applying the given [transform] function + * to each element and its index in the original channel. + * @param [transform] function that takes the index of an element and the element itself + * and returns the result of the transform applied to the element. + * + * The operation is _intermediate_ and _stateless_. + */ +public suspend fun ReceiveChannel.mapIndexed(transform: suspend (index: Int, T) -> R): ReceiveChannel = produce(Unconfined) { + var index = 0 + for (element in this@mapIndexed) { + send(transform(index++, element)) + + } +} + +/** + * Returns a channel containing only the non-null results of applying the given [transform] function + * to each element and its index in the original channel. + * @param [transform] function that takes the index of an element and the element itself + * and returns the result of the transform applied to the element. + * + * The operation is _intermediate_ and _stateless_. + */ +public suspend fun ReceiveChannel.mapIndexedNotNull(transform: suspend (index: Int, T) -> R?): ReceiveChannel = + mapIndexed(transform).filterNotNull() + +/** + * Applies the given [transform] function to each element and its index in the original channel + * and appends only the non-null results to the given [destination]. + * @param [transform] function that takes the index of an element and the element itself + * and returns the result of the transform applied to the element. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun > ReceiveChannel.mapIndexedNotNullTo(destination: C, transform: suspend (index: Int, T) -> R?): C { + consumeEachIndexed { (index, element) -> + transform(index, element)?.let { destination.send(it) } + } + return destination +} + +/** + * Applies the given [transform] function to each element and its index in the original channel + * and appends the results to the given [destination]. + * @param [transform] function that takes the index of an element and the element itself + * and returns the result of the transform applied to the element. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun > ReceiveChannel.mapIndexedTo(destination: C, transform: suspend (index: Int, T) -> R): C { + var index = 0 + for (item in this) + destination.send(transform(index++, item)) + return destination +} + +/** + * Returns a channel containing only the non-null results of applying the given [transform] function + * to each element in the original channel. + * + * The operation is _intermediate_ and _stateless_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun ReceiveChannel.mapNotNull(transform: suspend (T) -> R?): ReceiveChannel = + map(transform).filterNotNull() + +/** + * Applies the given [transform] function to each element in the original channel + * and appends only the non-null results to the given [destination]. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun > ReceiveChannel.mapNotNullTo(destination: C, transform: suspend (T) -> R?): C { + consumeEach { element -> transform(element)?.let { destination.send(it) } } + return destination +} + +/** + * Applies the given [transform] function to each element of the original channel + * and appends the results to the given [destination]. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun > ReceiveChannel.mapTo(destination: C, transform: suspend (T) -> R): C { + for (item in this) + destination.send(transform(item)) + return destination +} + +/** + * Returns a channel of [IndexedValue] for each element of the original channel. + * + * The operation is _intermediate_ and _stateless_. + */ +public suspend fun ReceiveChannel.withIndex(): ReceiveChannel> = produce(CommonPool) { + var index = 0 + for (element in this@withIndex) { + send(IndexedValue(index++, element)) + } +} + +/** + * Returns a channel containing only distinct elements from the given channel. + * + * The elements in the resulting channel are in the same order as they were in the source channel. + * + * The operation is _intermediate_ and _stateful_. + */ +public suspend fun ReceiveChannel.distinct(): ReceiveChannel { + return this.distinctBy { it } +} + +/** + * Returns a channel containing only elements from the given channel + * having distinct keys returned by the given [selector] function. + * + * The elements in the resulting channel are in the same order as they were in the source channel. + * + * The operation is _intermediate_ and _stateful_. + */ +public suspend fun ReceiveChannel.distinctBy(selector: suspend (T) -> K): ReceiveChannel = produce(Unconfined) { + val keys = HashSet() + for (element in this@distinctBy) { + val k = selector(element) + if (k !in keys) { + send(element) + keys += k + } + } +} + +/** + * Returns a mutable set containing all distinct elements from the given channel. + * + * The returned set preserves the element iteration order of the original channel. + * + * The operation is _terminal_. + */ +public suspend fun ReceiveChannel.toMutableSet(): MutableSet { + val set = LinkedHashSet() + for (item in this) set.add(item) + return set +} + +/** + * Returns `true` if all elements match the given [predicate]. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun ReceiveChannel.all(predicate: suspend (T) -> Boolean): Boolean { + for (element in this) if (!predicate(element)) return false + return true +} + +/** + * Returns `true` if channel has at least one element. + * + * The operation is _terminal_. + */ +public suspend fun ReceiveChannel.any(): Boolean { + for (element in this) return true + return false +} + +/** + * Returns `true` if at least one element matches the given [predicate]. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun ReceiveChannel.any(predicate: suspend (T) -> Boolean): Boolean { + for (element in this) if (predicate(element)) return true + return false +} + +/** + * Returns the number of elements in this channel. + * + * The operation is _terminal_. + */ +public suspend fun ReceiveChannel.count(): Int { + var count = 0 + for (element in this) count++ + return count +} + +/** + * Returns the number of elements matching the given [predicate]. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun ReceiveChannel.count(predicate: suspend (T) -> Boolean): Int { + var count = 0 + for (element in this) if (predicate(element)) count++ + return count +} + +/** + * Accumulates value starting with [initial] value and applying [operation] from left to right to current accumulator value and each element. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun ReceiveChannel.fold(initial: R, operation: suspend (acc: R, T) -> R): R { + var accumulator = initial + for (element in this) accumulator = operation(accumulator, element) + return accumulator +} + +/** + * Accumulates value starting with [initial] value and applying [operation] from left to right + * to current accumulator value and each element with its index in the original channel. + * @param [operation] function that takes the index of an element, current accumulator value + * and the element itself, and calculates the next accumulator value. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun ReceiveChannel.foldIndexed(initial: R, operation: suspend (index: Int, acc: R, T) -> R): R { + var index = 0 + var accumulator = initial + for (element in this) accumulator = operation(index++, accumulator, element) + return accumulator +} + +/** + * Returns the first element yielding the largest value of the given function or `null` if there are no elements. + * + * The operation is _terminal_. + */ +public suspend fun > ReceiveChannel.maxBy(selector: suspend (T) -> R): T? { + val iterator = iterator() + if (!iterator.hasNext()) return null + var maxElem = iterator.next() + var maxValue = selector(maxElem) + while (iterator.hasNext()) { + val e = iterator.next() + val v = selector(e) + if (maxValue < v) { + maxElem = e + maxValue = v + } + } + return maxElem +} + +/** + * Returns the first element having the largest value according to the provided [comparator] or `null` if there are no elements. + * + * The operation is _terminal_. + */ +public suspend fun ReceiveChannel.maxWith(comparator: Comparator): T? { + val iterator = iterator() + if (!iterator.hasNext()) return null + var max = iterator.next() + while (iterator.hasNext()) { + val e = iterator.next() + if (comparator.compare(max, e) < 0) max = e + } + return max +} + +/** + * Returns the first element yielding the smallest value of the given function or `null` if there are no elements. + * + * The operation is _terminal_. + */ +public suspend fun > ReceiveChannel.minBy(selector: suspend (T) -> R): T? { + val iterator = iterator() + if (!iterator.hasNext()) return null + var minElem = iterator.next() + var minValue = selector(minElem) + while (iterator.hasNext()) { + val e = iterator.next() + val v = selector(e) + if (minValue > v) { + minElem = e + minValue = v + } + } + return minElem +} + +/** + * Returns the first element having the smallest value according to the provided [comparator] or `null` if there are no elements. + * + * The operation is _terminal_. + */ +public suspend fun ReceiveChannel.minWith(comparator: Comparator): T? { + val iterator = iterator() + if (!iterator.hasNext()) return null + var min = iterator.next() + while (iterator.hasNext()) { + val e = iterator.next() + if (comparator.compare(min, e) > 0) min = e + } + return min +} + +/** + * Returns `true` if the channel has no elements. + * + * The operation is _terminal_. + */ +public suspend fun ReceiveChannel.none(): Boolean { + for (element in this) return false + return true +} + +/** + * Returns `true` if no elements match the given [predicate]. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun ReceiveChannel.none(predicate: suspend (T) -> Boolean): Boolean { + for (element in this) if (predicate(element)) return false + return true +} + +/** + * Accumulates value starting with the first element and applying [operation] from left to right to current accumulator value and each element. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun ReceiveChannel.reduce(operation: suspend (acc: S, T) -> S): S { + val iterator = this.iterator() + if (!iterator.hasNext()) throw UnsupportedOperationException("Empty channel can't be reduced.") + var accumulator: S = iterator.next() + while (iterator.hasNext()) { + accumulator = operation(accumulator, iterator.next()) + } + return accumulator +} + +/** + * Accumulates value starting with the first element and applying [operation] from left to right + * to current accumulator value and each element with its index in the original channel. + * @param [operation] function that takes the index of an element, current accumulator value + * and the element itself and calculates the next accumulator value. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun ReceiveChannel.reduceIndexed(operation: suspend (index: Int, acc: S, T) -> S): S { + val iterator = this.iterator() + if (!iterator.hasNext()) throw UnsupportedOperationException("Empty channel can't be reduced.") + var index = 1 + var accumulator: S = iterator.next() + while (iterator.hasNext()) { + accumulator = operation(index++, accumulator, iterator.next()) + } + return accumulator +} + +/** + * Returns the sum of all values produced by [selector] function applied to each element in the channel. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun ReceiveChannel.sumBy(selector: suspend (T) -> Int): Int { + var sum: Int = 0 + for (element in this) { + sum += selector(element) + } + return sum +} + +/** + * Returns the sum of all values produced by [selector] function applied to each element in the channel. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun ReceiveChannel.sumByDouble(selector: suspend (T) -> Double): Double { + var sum: Double = 0.0 + for (element in this) { + sum += selector(element) + } + return sum +} + +/** + * Returns an original collection containing all the non-`null` elements, throwing an [IllegalArgumentException] if there are any `null` elements. + * + * The operation is _intermediate_ and _stateless_. + */ +public suspend fun ReceiveChannel.requireNoNulls(): ReceiveChannel { + return map { it ?: throw IllegalArgumentException("null element found in $this.") } +} + +/** + * Splits the original channel into pair of lists, + * where *first* list contains elements for which [predicate] yielded `true`, + * while *second* list contains elements for which [predicate] yielded `false`. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun ReceiveChannel.partition(predicate: suspend (T) -> Boolean): Pair, List> { + val first = ArrayList() + val second = ArrayList() + for (element in this) { + if (predicate(element)) { + first.add(element) + } else { + second.add(element) + } + } + return Pair(first, second) +} + +/** + * Send each element of the original channel + * and appends the results to the given [destination]. + * + * The operation is _terminal_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun > ReceiveChannel.sendTo(destination: C): C { + for (item in this) + destination.send(item) + return destination +} + +/** + * Returns a channel of pairs built from elements of both channels with same indexes. + * Resulting channel has length of shortest input channel. + * + * The operation is _intermediate_ and _stateless_. + */ +public infix suspend fun ReceiveChannel.zip(other: ReceiveChannel): ReceiveChannel> { + return zip(other) { t1, t2 -> t1 to t2 } +} + +/** + * Returns a channel of values built from elements of both collections with same indexes using provided [transform]. Resulting channel has length of shortest input channels. + * + * The operation is _intermediate_ and _stateless_. + */ +// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448 +public suspend fun ReceiveChannel.zip(other: ReceiveChannel, transform: suspend (a: T, b: R) -> V): ReceiveChannel = produce(Unconfined) { + for (element1 in this@zip) { + val element2 = other.receiveOrNull() ?: break + send(transform(element1, element2)) + } +} diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsTest.kt new file mode 100644 index 0000000000..550bacad09 --- /dev/null +++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsTest.kt @@ -0,0 +1,202 @@ +/* + * Copyright 2016-2017 JetBrains s.r.o. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kotlinx.coroutines.experimental.channels + +import kotlinx.coroutines.experimental.CommonPool +import kotlinx.coroutines.experimental.runBlocking +import org.junit.Assert.* +import org.junit.Test + +class ChannelsTest { + + private val testList = listOf(1, 2, 3) + + @Test + fun testAsReceiveChannel() = runBlocking { + assertEquals(testList, testList.asReceiveChannel().toList()) + } + + @Test + fun testAsSequence() { + assertEquals(testList, testList.asReceiveChannel().asSequence().toList()) + } + + @Test + fun testAssociateBy() = runBlocking { + assertEquals(testList.associateBy { it % 2 }, testList.asReceiveChannel().associateBy { it % 2 }) + } + + @Test + fun testAsSequenceLazy() = runBlocking { + val numbers = produce(CommonPool) { + repeat(2) { i -> + send(i) + } + fail() + } + val take2 = numbers + .asSequence() + .take(2) + .toList() + + assertEquals(listOf(0, 1), take2) + } + + @Test + fun testDistinctBy() = runBlocking { + assertEquals(testList.distinctBy { it % 2 }.toList(), testList.asReceiveChannel().distinctBy { it % 2 }.toList()) + + } + + @Test + fun testDrainTo() = runBlocking { + val target = mutableListOf() + testList.asReceiveChannel().drainTo(target) + + assertEquals(testList, target) + } + + @Test + fun testDrop() = runBlocking { + for (i in 0..testList.size) { + assertEquals("Drop $i", testList.drop(i), testList.asReceiveChannel().drop(i).toList()) + } + } + + @Test + fun testElementAtOrElse() = runBlocking { + assertEquals(testList.elementAtOrElse(2) { 42 }, testList.asReceiveChannel().elementAtOrElse(2) { 42 }) + assertEquals(testList.elementAtOrElse(9) { 42 }, testList.asReceiveChannel().elementAtOrElse(9) { 42 }) + } + + @Test + fun testFirst() = runBlocking { + for (i in testList) { + assertEquals(testList.first { it == i }, testList.asReceiveChannel().first { it == i }) + } + try { + testList.asReceiveChannel().first { it == 9 } + fail() + } catch (nse: NoSuchElementException) { + } + } + + @Test + fun testFirstOrNull() = runBlocking { + assertEquals(testList.firstOrNull { it == 2 }, testList.asReceiveChannel().firstOrNull { it == 2 }) + assertEquals(testList.firstOrNull { it == 9 }, testList.asReceiveChannel().firstOrNull { it == 9 }) + } + + @Test + fun testFlatMap() = runBlocking { + assertEquals(testList.flatMap { (0..it).toList() }, testList.asReceiveChannel().flatMap { (0..it).asReceiveChannel() }.toList()) + + } + + @Test + fun testFold() = runBlocking { + assertEquals(testList.fold(mutableListOf(42)) { acc, e -> acc.apply { add(e) } }, + testList.asReceiveChannel().fold(mutableListOf(42)) { acc, e -> acc.apply { add(e) } }.toList()) + + } + + @Test + fun testGroupBy() = runBlocking { + assertEquals(testList.groupBy({ -it }, { it + 100 }), testList.asReceiveChannel().groupBy({ -it }, { it + 100 }).toMap()) + + } + + @Test + fun testMap() = runBlocking { + assertEquals(testList.map { it + 10 }, testList.asReceiveChannel().map { it + 10 }.toList()) + + } + + @Test + fun testEmptyList() = runBlocking { + assertTrue(emptyList().asReceiveChannel().toList().isEmpty()) + } + + @Test + fun testToList() = runBlocking { + assertEquals(testList, testList.asReceiveChannel().toList()) + + } + + @Test + fun testEmptySet() = runBlocking { + assertTrue(emptyList().asReceiveChannel().toSet().isEmpty()) + + } + + @Test + fun testToSet() = runBlocking { + assertEquals(testList.toSet(), testList.asReceiveChannel().toSet()) + } + + @Test + fun testEmptySequence() = runBlocking { + val channel = Channel() + channel.close() + + assertTrue(emptyList().asReceiveChannel().count() == 0) + } + + @Test + fun testEmptyMap() = runBlocking { + val channel = Channel>() + channel.close() + + assertTrue(channel.toMap().isEmpty()) + } + + @Test + fun testToMap() = runBlocking { + val values = testList.map { it to it.toString() } + assertEquals(values.toMap(), values.asReceiveChannel().toMap()) + } + + @Test + fun testReduce() = runBlocking { + assertEquals(testList.reduce { acc, e -> acc * e }, testList.asReceiveChannel().reduce { acc, e -> acc * e }) + } + + @Test + fun testTake() = runBlocking { + for (i in 0..testList.size) { + assertEquals(testList.take(i), testList.asReceiveChannel().take(i).toList()) + } + } + + @Test + fun testPartition() = runBlocking { + assertEquals(testList.partition { it % 2 == 0 }, testList.asReceiveChannel().partition { it % 2 == 0 }) + } + + @Test + fun testSendTo() = runBlocking { + val other = mutableListOf() + testList.asReceiveChannel().sendTo(other.asSendChannel()) + assertEquals(testList, other) + } + + @Test + fun testZip() = runBlocking { + val other = listOf("a", "b") + assertEquals(testList.zip(other), testList.asReceiveChannel().zip(other.asReceiveChannel()).toList()) + } +}