Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,7 @@ public final class kotlinx/coroutines/channels/ChannelsKt {
public static final fun consume (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
public static final fun consumeEach (Lkotlinx/coroutines/channels/BroadcastChannel;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun consumeEach (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun consumeTo (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/util/Collection;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun consumes (Lkotlinx/coroutines/channels/ReceiveChannel;)Lkotlin/jvm/functions/Function1;
public static final fun consumesAll ([Lkotlinx/coroutines/channels/ReceiveChannel;)Lkotlin/jvm/functions/Function1;
public static final synthetic fun count (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1081,6 +1081,7 @@ final suspend fun <#A: kotlin/Any, #B: kotlin.collections/MutableCollection<in #
final suspend fun <#A: kotlin/Any, #B: kotlinx.coroutines.channels/SendChannel<#A>> (kotlinx.coroutines.channels/ReceiveChannel<#A?>).kotlinx.coroutines.channels/filterNotNullTo(#B): #B // kotlinx.coroutines.channels/filterNotNullTo|filterNotNullTo@kotlinx.coroutines.channels.ReceiveChannel<0:0?>(0:1){0§<kotlin.Any>;1§<kotlinx.coroutines.channels.SendChannel<0:0>>}[0]
final suspend fun <#A: kotlin/Any> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/receiveOrNull(): #A? // kotlinx.coroutines.channels/receiveOrNull|receiveOrNull@kotlinx.coroutines.channels.ReceiveChannel<0:0>(){0§<kotlin.Any>}[0]
final suspend fun <#A: kotlin/Any?, #B: #A> (kotlinx.coroutines.flow/Flow<#B>).kotlinx.coroutines.flow/reduce(kotlin.coroutines/SuspendFunction2<#A, #B, #A>): #A // kotlinx.coroutines.flow/reduce|reduce@kotlinx.coroutines.flow.Flow<0:1>(kotlin.coroutines.SuspendFunction2<0:0,0:1,0:0>){0§<kotlin.Any?>;1§<0:0>}[0]
final suspend fun <#A: kotlin/Any?, #B: kotlin.collections/MutableCollection<#A>> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/consumeTo(#B): #B // kotlinx.coroutines.channels/consumeTo|consumeTo@kotlinx.coroutines.channels.ReceiveChannel<0:0>(0:1){0§<kotlin.Any?>;1§<kotlin.collections.MutableCollection<0:0>>}[0]
final suspend fun <#A: kotlin/Any?, #B: kotlin.collections/MutableCollection<in #A>> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/toCollection(#B): #B // kotlinx.coroutines.channels/toCollection|toCollection@kotlinx.coroutines.channels.ReceiveChannel<0:0>(0:1){0§<kotlin.Any?>;1§<kotlin.collections.MutableCollection<in|0:0>>}[0]
final suspend fun <#A: kotlin/Any?, #B: kotlin.collections/MutableCollection<in #A>> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/toCollection(#B): #B // kotlinx.coroutines.flow/toCollection|toCollection@kotlinx.coroutines.flow.Flow<0:0>(0:1){0§<kotlin.Any?>;1§<kotlin.collections.MutableCollection<in|0:0>>}[0]
final suspend fun <#A: kotlin/Any?, #B: kotlin/Any?, #C: kotlin.collections/MutableMap<in #A, in #B>> (kotlinx.coroutines.channels/ReceiveChannel<kotlin/Pair<#A, #B>>).kotlinx.coroutines.channels/toMap(#C): #C // kotlinx.coroutines.channels/toMap|toMap@kotlinx.coroutines.channels.ReceiveChannel<kotlin.Pair<0:0,0:1>>(0:2){0§<kotlin.Any?>;1§<kotlin.Any?>;2§<kotlin.collections.MutableMap<in|0:0,in|0:1>>}[0]
Expand Down
82 changes: 75 additions & 7 deletions kotlinx-coroutines-core/common/src/channels/Channels.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
package kotlinx.coroutines.channels

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.toCollection
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.selects.*
import kotlin.contracts.*
import kotlin.jvm.*
Expand Down Expand Up @@ -162,9 +165,16 @@ public suspend inline fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit)
}

/**
* Returns a [List] containing all the elements sent to this channel, preserving their order.
* [Consumes][consume] the elements of this channel into a list, preserving their order.
*
* This function will attempt to receive elements and put them into the list until the channel is
* This is a convenience function equivalent to calling [consumeAsFlow] followed by [kotlinx.coroutines.flow.toList].
* It is useful for testing code that uses channels to observe the elements the channel contains at the end of the test.
*
* There is no way to recover channel elements if the channel gets closed with an exception
* or to apply additional transformations to the elements before building the resulting collection.
* Please use [consumeAsFlow] and [kotlinx.coroutines.flow.toCollection] for such advanced use-cases.
*
* [toList] attempts to receive elements and put them into the list until the channel is
* [closed][SendChannel.close].
* Calling [toList] on channels that are not eventually closed is always incorrect:
* - It will suspend indefinitely if the channel is not closed, but no new elements arrive.
Expand All @@ -173,8 +183,9 @@ public suspend inline fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit)
*
* If the channel is [closed][SendChannel.close] with a cause, [toList] will rethrow that cause.
*
* The operation is _terminal_.
* This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
* Since this function is implemented using [consume], the function is _terminal_.
* In practice, this means that if adding new elements to the list fails with an exception,
* that exception will be used for [cancelling][ReceiveChannel.cancel] the channel and rethrown.
*
* Example:
* ```
Expand All @@ -189,11 +200,68 @@ public suspend inline fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit)
* ```
*/
public suspend fun <E> ReceiveChannel<E>.toList(): List<E> = buildList {
consumeEach {
add(it)
}
consumeEach(::add)
}

/**
* [Consumes][consume] the elements of this channel into the provided mutable collection.
*
* This is a convenience function equivalent to calling [consumeAsFlow]
* followed by [kotlinx.coroutines.flow.toCollection].
* Please use [consumeAsFlow] directly in scenarios where elements should undergo additional transformations
* before being added to the resulting collection.
*
* [consumeTo] attempts to receive elements and put them into the collection until the channel is
* [closed][SendChannel.close].
*
* If the channel is [closed][SendChannel.close] with a cause, [consumeTo] will rethrow that cause.
* However, the elements already received up to that point will remain in the collection.
*
* Since this function is implemented using [consume], the function is _terminal_.
* In practice, this means that if adding new elements to the collection fails with an exception,
* that exception will be used for [cancelling][ReceiveChannel.cancel] the channel and rethrown.
*
* The intended use case for this function is collecting the remaining elements of a closed channel
* and processing them in a single batch.
*
* Example:
* ```
* val doContinue = AtomicBoolean(true)
*
* // Start the sender
* val channel = produce {
* var i = 0
* while (doContinue.load()) {
* send(i++)
* delay(10.milliseconds)
* if (i == 42) break
* }
* }
*
* // Start the consumer
* launch {
* // Read elements until we suddenly decide to stop
* // or until the channel is closed.
* while (Random.nextInt() % 100 != 42) {
* val nextElement = channel.receiveCatching()
* if (nextElement.isClosed) return@launch
* println("Received ${nextElement.getOrNull()}")
* }
* doContinue.store(false)
* delay(100.milliseconds)
* val remainingElements = mutableListOf<Int>()
* try {
* channel.consumeTo(remainingElements)
* } finally {
* println("Remaining elements: $remainingElements")
* }
* }
* ```
*/
public suspend fun <E, C: MutableCollection<E>> ReceiveChannel<E>.consumeTo(collection: C): C =
consumeEach(collection::add).let { collection }


@PublishedApi
internal fun ReceiveChannel<*>.cancelConsumed(cause: Throwable?) {
cancel(cause?.let {
Expand Down
36 changes: 34 additions & 2 deletions kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,13 @@ class ChannelsTest: TestBase() {
}

@Test
fun testEmptyList() = runTest {
fun testEmptyToList() = runTest {
assertTrue(emptyList<Nothing>().asReceiveChannel().toList().isEmpty())
}

@Test
fun testToList() = runTest {
assertEquals(testList, testList.asReceiveChannel().toList())

}

@Test
Expand All @@ -104,6 +103,39 @@ class ChannelsTest: TestBase() {
}
}

@Test
fun testEmptyConsumeToWithDestination() = runTest {
val initialList = listOf(-1, -2, -3)
val destination = initialList.toMutableList()
emptyList<Nothing>().asReceiveChannel().consumeTo(destination)
assertEquals(initialList, destination)
}

@Test
fun testConsumeToWithDestination() = runTest {
val initialList = listOf(-1, -2, -3)
val destination = initialList.toMutableList()
testList.asReceiveChannel().consumeTo(destination)
assertEquals(initialList + testList, destination)
}

@Test
fun testConsumeToWithDestinationOnFailedChannel() = runTest {
val initialList = listOf(-1, -2, -3)
val destination = initialList.toMutableList()
val channel = Channel<Int>(10)
val elementsToSend = (1..5)
elementsToSend.forEach {
val result = channel.trySend(it)
assertTrue(result.isSuccess)
}
channel.close(TestException())
assertFailsWith<TestException> {
channel.consumeTo(destination)
}
assertEquals(initialList + elementsToSend, destination)
}

private fun <E> Iterable<E>.asReceiveChannel(context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<E> =
GlobalScope.produce(context) {
for (element in this@asReceiveChannel)
Expand Down