From 038c6b74de02b8f88234287ca9a63102b535a4ea Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Wed, 10 Sep 2025 10:14:25 +0200 Subject: [PATCH 1/4] Add ReceiveChannel.toList(destination) Use case: collecting elements up until the point the channel is closed without losing the elements when `toList` when the exception is thrown. This function is similar to `Flow.toList(destination)`, which we already have, so the addition makes sense from the point of view of consistency as well. --- .../common/src/channels/Channels.common.kt | 37 +++++++++++++++++++ .../common/test/channels/ChannelsTest.kt | 36 +++++++++++++++++- 2 files changed, 71 insertions(+), 2 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/channels/Channels.common.kt b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt index 15534b08fe..818460e76b 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channels.common.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt @@ -194,6 +194,43 @@ public suspend fun ReceiveChannel.toList(): List = buildList { } } +/** + * Consumes the elements of this channel into the given [destination] mutable list. + * + * This function will attempt to receive elements and put them into the list until the channel is + * [closed][SendChannel.close]. + * Calling [toList] on channels that are not eventually closed is always incorrect: + * - It will suspend indefinitely if the channel is not closed, but no new elements arrive. + * - If new elements do arrive and the channel is not eventually closed, [toList] will use more and more memory + * until exhausting it. + * + * If the channel is [closed][SendChannel.close] with a cause, [toList] will rethrow that cause. + * However, the [destination] list is left in a consistent state containing all the elements received from the channel + * up to that point. + * + * The operation is _terminal_. + * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. + * + * Example: + * ``` + * val values = listOf(1, 5, 2, 9, 3, 3, 1) + * // start a new coroutine that creates a channel, + * // sends elements to it, and closes it + * // once the coroutine's body finishes + * val channel = produce { + * values.forEach { send(it) } + * } + * val destination = mutableListOf() + * channel.toList(destination) + * check(destination == values) + * ``` + */ +public suspend inline fun ReceiveChannel.toList(destination: MutableList) { + consumeEach { + destination.add(it) + } +} + @PublishedApi internal fun ReceiveChannel<*>.cancelConsumed(cause: Throwable?) { cancel(cause?.let { diff --git a/kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt b/kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt index 235609c804..83ae2f4e24 100644 --- a/kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt @@ -85,14 +85,13 @@ class ChannelsTest: TestBase() { } @Test - fun testEmptyList() = runTest { + fun testEmptyToList() = runTest { assertTrue(emptyList().asReceiveChannel().toList().isEmpty()) } @Test fun testToList() = runTest { assertEquals(testList, testList.asReceiveChannel().toList()) - } @Test @@ -104,6 +103,39 @@ class ChannelsTest: TestBase() { } } + @Test + fun testEmptyToListWithDestination() = runTest { + val initialList = listOf(-1, -2, -3) + val destination = initialList.toMutableList() + emptyList().asReceiveChannel().toList(destination) + assertEquals(initialList, destination) + } + + @Test + fun testToListWithDestination() = runTest { + val initialList = listOf(-1, -2, -3) + val destination = initialList.toMutableList() + testList.asReceiveChannel().toList(destination) + assertEquals(initialList + testList, destination) + } + + @Test + fun testToListWithDestinationOnFailedChannel() = runTest { + val initialList = listOf(-1, -2, -3) + val destination = initialList.toMutableList() + val channel = Channel(10) + val elementsToSend = (1..5).toList() + elementsToSend.forEach { + val result = channel.trySend(it) + assertTrue(result.isSuccess) + } + channel.close(TestException()) + assertFailsWith { + channel.toList(destination) + } + assertEquals(initialList + elementsToSend, destination) + } + private fun Iterable.asReceiveChannel(context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel = GlobalScope.produce(context) { for (element in this@asReceiveChannel) From 2d8f52b3d5f32f6222ae0c55405bda89380d782a Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Wed, 10 Sep 2025 12:48:49 +0200 Subject: [PATCH 2/4] Keep only one overload --- .../api/kotlinx-coroutines-core.api | 4 +- .../api/kotlinx-coroutines-core.klib.api | 1 + .../common/src/channels/Channels.common.kt | 47 +++---------------- 3 files changed, 10 insertions(+), 42 deletions(-) diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 2a97c37f6c..015a257854 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -820,7 +820,9 @@ public final class kotlinx/coroutines/channels/ChannelsKt { public static synthetic fun takeWhile$default (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel; public static final fun toChannel (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlinx/coroutines/channels/SendChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun toCollection (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/util/Collection;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public static final fun toList (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun toList (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/util/List;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final synthetic fun toList (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun toList$default (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/util/List;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; public static final fun toMap (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/util/Map;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final synthetic fun toMap (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final synthetic fun toMutableList (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api index a839ffcfa0..3c495f4acb 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api @@ -1142,6 +1142,7 @@ final suspend fun kotlinx.coroutines/yield() // kotlinx.coroutines/yield|yield() final suspend inline fun <#A: kotlin/Any?, #B: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/fold(#B, crossinline kotlin.coroutines/SuspendFunction2<#B, #A, #B>): #B // kotlinx.coroutines.flow/fold|fold@kotlinx.coroutines.flow.Flow<0:0>(0:1;kotlin.coroutines.SuspendFunction2<0:1,0:0,0:1>){0§;1§}[0] final suspend inline fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/BroadcastChannel<#A>).kotlinx.coroutines.channels/consumeEach(kotlin/Function1<#A, kotlin/Unit>) // kotlinx.coroutines.channels/consumeEach|consumeEach@kotlinx.coroutines.channels.BroadcastChannel<0:0>(kotlin.Function1<0:0,kotlin.Unit>){0§}[0] final suspend inline fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/consumeEach(kotlin/Function1<#A, kotlin/Unit>) // kotlinx.coroutines.channels/consumeEach|consumeEach@kotlinx.coroutines.channels.ReceiveChannel<0:0>(kotlin.Function1<0:0,kotlin.Unit>){0§}[0] +final suspend inline fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/toList(kotlin.collections/MutableList<#A> = ...): kotlin.collections/List<#A> // kotlinx.coroutines.channels/toList|toList@kotlinx.coroutines.channels.ReceiveChannel<0:0>(kotlin.collections.MutableList<0:0>){0§}[0] final suspend inline fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/collect(crossinline kotlin.coroutines/SuspendFunction1<#A, kotlin/Unit>) // kotlinx.coroutines.flow/collect|collect@kotlinx.coroutines.flow.Flow<0:0>(kotlin.coroutines.SuspendFunction1<0:0,kotlin.Unit>){0§}[0] final suspend inline fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/collectIndexed(crossinline kotlin.coroutines/SuspendFunction2) // kotlinx.coroutines.flow/collectIndexed|collectIndexed@kotlinx.coroutines.flow.Flow<0:0>(kotlin.coroutines.SuspendFunction2){0§}[0] final suspend inline fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/SharedFlow<#A>).kotlinx.coroutines.flow/count(): kotlin/Int // kotlinx.coroutines.flow/count|count@kotlinx.coroutines.flow.SharedFlow<0:0>(){0§}[0] diff --git a/kotlinx-coroutines-core/common/src/channels/Channels.common.kt b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt index 818460e76b..b805d88ff8 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channels.common.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt @@ -161,41 +161,9 @@ public suspend inline fun ReceiveChannel.consumeEach(action: (E) -> Unit) for (e in this) action(e) } -/** - * Returns a [List] containing all the elements sent to this channel, preserving their order. - * - * This function will attempt to receive elements and put them into the list until the channel is - * [closed][SendChannel.close]. - * Calling [toList] on channels that are not eventually closed is always incorrect: - * - It will suspend indefinitely if the channel is not closed, but no new elements arrive. - * - If new elements do arrive and the channel is not eventually closed, [toList] will use more and more memory - * until exhausting it. - * - * If the channel is [closed][SendChannel.close] with a cause, [toList] will rethrow that cause. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * Example: - * ``` - * val values = listOf(1, 5, 2, 9, 3, 3, 1) - * // start a new coroutine that creates a channel, - * // sends elements to it, and closes it - * // once the coroutine's body finishes - * val channel = produce { - * values.forEach { send(it) } - * } - * check(channel.toList() == values) - * ``` - */ -public suspend fun ReceiveChannel.toList(): List = buildList { - consumeEach { - add(it) - } -} - /** * Consumes the elements of this channel into the given [destination] mutable list. + * If none is provided, a new [ArrayList] will be created. * * This function will attempt to receive elements and put them into the list until the channel is * [closed][SendChannel.close]. @@ -220,16 +188,11 @@ public suspend fun ReceiveChannel.toList(): List = buildList { * val channel = produce { * values.forEach { send(it) } * } - * val destination = mutableListOf() - * channel.toList(destination) - * check(destination == values) + * check(channel.toList() == values) * ``` */ -public suspend inline fun ReceiveChannel.toList(destination: MutableList) { - consumeEach { - destination.add(it) - } -} +public suspend inline fun ReceiveChannel.toList(destination: MutableList = ArrayList()): List = + consumeEach(destination::add).let { destination } @PublishedApi internal fun ReceiveChannel<*>.cancelConsumed(cause: Throwable?) { @@ -238,3 +201,5 @@ internal fun ReceiveChannel<*>.cancelConsumed(cause: Throwable?) { }) } +@Deprecated("Preserving binary compatibility, was stable", level = DeprecationLevel.HIDDEN) +public suspend fun ReceiveChannel.toList(): List = toList(ArrayList()) \ No newline at end of file From 6039cf8428670b75a1319a054f3b23a6567b989c Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Wed, 10 Sep 2025 13:03:17 +0200 Subject: [PATCH 3/4] Fixup --- .../api/kotlinx-coroutines-core.klib.api | 2 +- .../common/src/channels/Channels.common.kt | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api index 3c495f4acb..c76a81d1d8 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api @@ -1104,6 +1104,7 @@ final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel< final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/single(): #A // kotlinx.coroutines.channels/single|single@kotlinx.coroutines.channels.ReceiveChannel<0:0>(){0§}[0] final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/singleOrNull(): #A? // kotlinx.coroutines.channels/singleOrNull|singleOrNull@kotlinx.coroutines.channels.ReceiveChannel<0:0>(){0§}[0] final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/toList(): kotlin.collections/List<#A> // kotlinx.coroutines.channels/toList|toList@kotlinx.coroutines.channels.ReceiveChannel<0:0>(){0§}[0] +final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/toList(kotlin.collections/MutableList<#A> = ...): kotlin.collections/List<#A> // kotlinx.coroutines.channels/toList|toList@kotlinx.coroutines.channels.ReceiveChannel<0:0>(kotlin.collections.MutableList<0:0>){0§}[0] final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/toMutableList(): kotlin.collections/MutableList<#A> // kotlinx.coroutines.channels/toMutableList|toMutableList@kotlinx.coroutines.channels.ReceiveChannel<0:0>(){0§}[0] final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/toMutableSet(): kotlin.collections/MutableSet<#A> // kotlinx.coroutines.channels/toMutableSet|toMutableSet@kotlinx.coroutines.channels.ReceiveChannel<0:0>(){0§}[0] final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/toSet(): kotlin.collections/Set<#A> // kotlinx.coroutines.channels/toSet|toSet@kotlinx.coroutines.channels.ReceiveChannel<0:0>(){0§}[0] @@ -1142,7 +1143,6 @@ final suspend fun kotlinx.coroutines/yield() // kotlinx.coroutines/yield|yield() final suspend inline fun <#A: kotlin/Any?, #B: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/fold(#B, crossinline kotlin.coroutines/SuspendFunction2<#B, #A, #B>): #B // kotlinx.coroutines.flow/fold|fold@kotlinx.coroutines.flow.Flow<0:0>(0:1;kotlin.coroutines.SuspendFunction2<0:1,0:0,0:1>){0§;1§}[0] final suspend inline fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/BroadcastChannel<#A>).kotlinx.coroutines.channels/consumeEach(kotlin/Function1<#A, kotlin/Unit>) // kotlinx.coroutines.channels/consumeEach|consumeEach@kotlinx.coroutines.channels.BroadcastChannel<0:0>(kotlin.Function1<0:0,kotlin.Unit>){0§}[0] final suspend inline fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/consumeEach(kotlin/Function1<#A, kotlin/Unit>) // kotlinx.coroutines.channels/consumeEach|consumeEach@kotlinx.coroutines.channels.ReceiveChannel<0:0>(kotlin.Function1<0:0,kotlin.Unit>){0§}[0] -final suspend inline fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/toList(kotlin.collections/MutableList<#A> = ...): kotlin.collections/List<#A> // kotlinx.coroutines.channels/toList|toList@kotlinx.coroutines.channels.ReceiveChannel<0:0>(kotlin.collections.MutableList<0:0>){0§}[0] final suspend inline fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/collect(crossinline kotlin.coroutines/SuspendFunction1<#A, kotlin/Unit>) // kotlinx.coroutines.flow/collect|collect@kotlinx.coroutines.flow.Flow<0:0>(kotlin.coroutines.SuspendFunction1<0:0,kotlin.Unit>){0§}[0] final suspend inline fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/collectIndexed(crossinline kotlin.coroutines/SuspendFunction2) // kotlinx.coroutines.flow/collectIndexed|collectIndexed@kotlinx.coroutines.flow.Flow<0:0>(kotlin.coroutines.SuspendFunction2){0§}[0] final suspend inline fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/SharedFlow<#A>).kotlinx.coroutines.flow/count(): kotlin/Int // kotlinx.coroutines.flow/count|count@kotlinx.coroutines.flow.SharedFlow<0:0>(){0§}[0] diff --git a/kotlinx-coroutines-core/common/src/channels/Channels.common.kt b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt index b805d88ff8..3bf9219be8 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channels.common.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt @@ -162,7 +162,7 @@ public suspend inline fun ReceiveChannel.consumeEach(action: (E) -> Unit) } /** - * Consumes the elements of this channel into the given [destination] mutable list. + * [Consumes][consume] the elements of this channel into the given [destination] mutable list. * If none is provided, a new [ArrayList] will be created. * * This function will attempt to receive elements and put them into the list until the channel is @@ -186,12 +186,12 @@ public suspend inline fun ReceiveChannel.consumeEach(action: (E) -> Unit) * // sends elements to it, and closes it * // once the coroutine's body finishes * val channel = produce { - * values.forEach { send(it) } + * values.forEach { send(it) } * } * check(channel.toList() == values) * ``` */ -public suspend inline fun ReceiveChannel.toList(destination: MutableList = ArrayList()): List = +public suspend fun ReceiveChannel.toList(destination: MutableList = ArrayList()): List = consumeEach(destination::add).let { destination } @PublishedApi @@ -202,4 +202,4 @@ internal fun ReceiveChannel<*>.cancelConsumed(cause: Throwable?) { } @Deprecated("Preserving binary compatibility, was stable", level = DeprecationLevel.HIDDEN) -public suspend fun ReceiveChannel.toList(): List = toList(ArrayList()) \ No newline at end of file +public suspend fun ReceiveChannel.toList(): List = toList(ArrayList()) From caed24d086c1b001e09a9aa0a92982b9927e5483 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Tue, 14 Oct 2025 12:36:05 +0200 Subject: [PATCH 4/4] Introduce ReceiveChannel.consumeTo --- .../api/kotlinx-coroutines-core.api | 5 +- .../api/kotlinx-coroutines-core.klib.api | 2 +- .../common/src/channels/Channels.common.kt | 88 ++++++++++++++++--- .../common/test/channels/ChannelsTest.kt | 14 +-- 4 files changed, 87 insertions(+), 22 deletions(-) diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 015a257854..9afc0c5ab3 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -768,6 +768,7 @@ public final class kotlinx/coroutines/channels/ChannelsKt { public static final fun consume (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object; public static final fun consumeEach (Lkotlinx/coroutines/channels/BroadcastChannel;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun consumeEach (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun consumeTo (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/util/Collection;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun consumes (Lkotlinx/coroutines/channels/ReceiveChannel;)Lkotlin/jvm/functions/Function1; public static final fun consumesAll ([Lkotlinx/coroutines/channels/ReceiveChannel;)Lkotlin/jvm/functions/Function1; public static final synthetic fun count (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; @@ -820,9 +821,7 @@ public final class kotlinx/coroutines/channels/ChannelsKt { public static synthetic fun takeWhile$default (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel; public static final fun toChannel (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlinx/coroutines/channels/SendChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun toCollection (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/util/Collection;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public static final fun toList (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/util/List;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public static final synthetic fun toList (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public static synthetic fun toList$default (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/util/List;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public static final fun toList (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun toMap (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/util/Map;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final synthetic fun toMap (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final synthetic fun toMutableList (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api index c76a81d1d8..9387ff85e9 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api @@ -1081,6 +1081,7 @@ final suspend fun <#A: kotlin/Any, #B: kotlin.collections/MutableCollection> (kotlinx.coroutines.channels/ReceiveChannel<#A?>).kotlinx.coroutines.channels/filterNotNullTo(#B): #B // kotlinx.coroutines.channels/filterNotNullTo|filterNotNullTo@kotlinx.coroutines.channels.ReceiveChannel<0:0?>(0:1){0§;1§>}[0] final suspend fun <#A: kotlin/Any> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/receiveOrNull(): #A? // kotlinx.coroutines.channels/receiveOrNull|receiveOrNull@kotlinx.coroutines.channels.ReceiveChannel<0:0>(){0§}[0] final suspend fun <#A: kotlin/Any?, #B: #A> (kotlinx.coroutines.flow/Flow<#B>).kotlinx.coroutines.flow/reduce(kotlin.coroutines/SuspendFunction2<#A, #B, #A>): #A // kotlinx.coroutines.flow/reduce|reduce@kotlinx.coroutines.flow.Flow<0:1>(kotlin.coroutines.SuspendFunction2<0:0,0:1,0:0>){0§;1§<0:0>}[0] +final suspend fun <#A: kotlin/Any?, #B: kotlin.collections/MutableCollection<#A>> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/consumeTo(#B): #B // kotlinx.coroutines.channels/consumeTo|consumeTo@kotlinx.coroutines.channels.ReceiveChannel<0:0>(0:1){0§;1§>}[0] final suspend fun <#A: kotlin/Any?, #B: kotlin.collections/MutableCollection> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/toCollection(#B): #B // kotlinx.coroutines.channels/toCollection|toCollection@kotlinx.coroutines.channels.ReceiveChannel<0:0>(0:1){0§;1§>}[0] final suspend fun <#A: kotlin/Any?, #B: kotlin.collections/MutableCollection> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/toCollection(#B): #B // kotlinx.coroutines.flow/toCollection|toCollection@kotlinx.coroutines.flow.Flow<0:0>(0:1){0§;1§>}[0] final suspend fun <#A: kotlin/Any?, #B: kotlin/Any?, #C: kotlin.collections/MutableMap> (kotlinx.coroutines.channels/ReceiveChannel>).kotlinx.coroutines.channels/toMap(#C): #C // kotlinx.coroutines.channels/toMap|toMap@kotlinx.coroutines.channels.ReceiveChannel>(0:2){0§;1§;2§>}[0] @@ -1104,7 +1105,6 @@ final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel< final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/single(): #A // kotlinx.coroutines.channels/single|single@kotlinx.coroutines.channels.ReceiveChannel<0:0>(){0§}[0] final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/singleOrNull(): #A? // kotlinx.coroutines.channels/singleOrNull|singleOrNull@kotlinx.coroutines.channels.ReceiveChannel<0:0>(){0§}[0] final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/toList(): kotlin.collections/List<#A> // kotlinx.coroutines.channels/toList|toList@kotlinx.coroutines.channels.ReceiveChannel<0:0>(){0§}[0] -final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/toList(kotlin.collections/MutableList<#A> = ...): kotlin.collections/List<#A> // kotlinx.coroutines.channels/toList|toList@kotlinx.coroutines.channels.ReceiveChannel<0:0>(kotlin.collections.MutableList<0:0>){0§}[0] final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/toMutableList(): kotlin.collections/MutableList<#A> // kotlinx.coroutines.channels/toMutableList|toMutableList@kotlinx.coroutines.channels.ReceiveChannel<0:0>(){0§}[0] final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/toMutableSet(): kotlin.collections/MutableSet<#A> // kotlinx.coroutines.channels/toMutableSet|toMutableSet@kotlinx.coroutines.channels.ReceiveChannel<0:0>(){0§}[0] final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/toSet(): kotlin.collections/Set<#A> // kotlinx.coroutines.channels/toSet|toSet@kotlinx.coroutines.channels.ReceiveChannel<0:0>(){0§}[0] diff --git a/kotlinx-coroutines-core/common/src/channels/Channels.common.kt b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt index 3bf9219be8..42ca0ba167 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channels.common.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt @@ -5,6 +5,9 @@ package kotlinx.coroutines.channels import kotlinx.coroutines.* +import kotlinx.coroutines.flow.consumeAsFlow +import kotlinx.coroutines.flow.toCollection +import kotlinx.coroutines.flow.toList import kotlinx.coroutines.selects.* import kotlin.contracts.* import kotlin.jvm.* @@ -162,10 +165,16 @@ public suspend inline fun ReceiveChannel.consumeEach(action: (E) -> Unit) } /** - * [Consumes][consume] the elements of this channel into the given [destination] mutable list. - * If none is provided, a new [ArrayList] will be created. + * [Consumes][consume] the elements of this channel into a list, preserving their order. * - * This function will attempt to receive elements and put them into the list until the channel is + * This is a convenience function equivalent to calling [consumeAsFlow] followed by [kotlinx.coroutines.flow.toList]. + * It is useful for testing code that uses channels to observe the elements the channel contains at the end of the test. + * + * There is no way to recover channel elements if the channel gets closed with an exception + * or to apply additional transformations to the elements before building the resulting collection. + * Please use [consumeAsFlow] and [kotlinx.coroutines.flow.toCollection] for such advanced use-cases. + * + * [toList] attempts to receive elements and put them into the list until the channel is * [closed][SendChannel.close]. * Calling [toList] on channels that are not eventually closed is always incorrect: * - It will suspend indefinitely if the channel is not closed, but no new elements arrive. @@ -173,11 +182,10 @@ public suspend inline fun ReceiveChannel.consumeEach(action: (E) -> Unit) * until exhausting it. * * If the channel is [closed][SendChannel.close] with a cause, [toList] will rethrow that cause. - * However, the [destination] list is left in a consistent state containing all the elements received from the channel - * up to that point. * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. + * Since this function is implemented using [consume], the function is _terminal_. + * In practice, this means that if adding new elements to the list fails with an exception, + * that exception will be used for [cancelling][ReceiveChannel.cancel] the channel and rethrown. * * Example: * ``` @@ -191,8 +199,68 @@ public suspend inline fun ReceiveChannel.consumeEach(action: (E) -> Unit) * check(channel.toList() == values) * ``` */ -public suspend fun ReceiveChannel.toList(destination: MutableList = ArrayList()): List = - consumeEach(destination::add).let { destination } +public suspend fun ReceiveChannel.toList(): List = buildList { + consumeEach(::add) +} + +/** + * [Consumes][consume] the elements of this channel into the provided mutable collection. + * + * This is a convenience function equivalent to calling [consumeAsFlow] + * followed by [kotlinx.coroutines.flow.toCollection]. + * Please use [consumeAsFlow] directly in scenarios where elements should undergo additional transformations + * before being added to the resulting collection. + * + * [consumeTo] attempts to receive elements and put them into the collection until the channel is + * [closed][SendChannel.close]. + * + * If the channel is [closed][SendChannel.close] with a cause, [consumeTo] will rethrow that cause. + * However, the elements already received up to that point will remain in the collection. + * + * Since this function is implemented using [consume], the function is _terminal_. + * In practice, this means that if adding new elements to the collection fails with an exception, + * that exception will be used for [cancelling][ReceiveChannel.cancel] the channel and rethrown. + * + * The intended use case for this function is collecting the remaining elements of a closed channel + * and processing them in a single batch. + * + * Example: + * ``` + * val doContinue = AtomicBoolean(true) + * + * // Start the sender + * val channel = produce { + * var i = 0 + * while (doContinue.load()) { + * send(i++) + * delay(10.milliseconds) + * if (i == 42) break + * } + * } + * + * // Start the consumer + * launch { + * // Read elements until we suddenly decide to stop + * // or until the channel is closed. + * while (Random.nextInt() % 100 != 42) { + * val nextElement = channel.receiveCatching() + * if (nextElement.isClosed) return@launch + * println("Received ${nextElement.getOrNull()}") + * } + * doContinue.store(false) + * delay(100.milliseconds) + * val remainingElements = mutableListOf() + * try { + * channel.consumeTo(remainingElements) + * } finally { + * println("Remaining elements: $remainingElements") + * } + * } + * ``` + */ +public suspend fun > ReceiveChannel.consumeTo(collection: C): C = + consumeEach(collection::add).let { collection } + @PublishedApi internal fun ReceiveChannel<*>.cancelConsumed(cause: Throwable?) { @@ -201,5 +269,3 @@ internal fun ReceiveChannel<*>.cancelConsumed(cause: Throwable?) { }) } -@Deprecated("Preserving binary compatibility, was stable", level = DeprecationLevel.HIDDEN) -public suspend fun ReceiveChannel.toList(): List = toList(ArrayList()) diff --git a/kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt b/kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt index 83ae2f4e24..ce25bec4ad 100644 --- a/kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt @@ -104,34 +104,34 @@ class ChannelsTest: TestBase() { } @Test - fun testEmptyToListWithDestination() = runTest { + fun testEmptyConsumeToWithDestination() = runTest { val initialList = listOf(-1, -2, -3) val destination = initialList.toMutableList() - emptyList().asReceiveChannel().toList(destination) + emptyList().asReceiveChannel().consumeTo(destination) assertEquals(initialList, destination) } @Test - fun testToListWithDestination() = runTest { + fun testConsumeToWithDestination() = runTest { val initialList = listOf(-1, -2, -3) val destination = initialList.toMutableList() - testList.asReceiveChannel().toList(destination) + testList.asReceiveChannel().consumeTo(destination) assertEquals(initialList + testList, destination) } @Test - fun testToListWithDestinationOnFailedChannel() = runTest { + fun testConsumeToWithDestinationOnFailedChannel() = runTest { val initialList = listOf(-1, -2, -3) val destination = initialList.toMutableList() val channel = Channel(10) - val elementsToSend = (1..5).toList() + val elementsToSend = (1..5) elementsToSend.forEach { val result = channel.trySend(it) assertTrue(result.isSuccess) } channel.close(TestException()) assertFailsWith { - channel.toList(destination) + channel.consumeTo(destination) } assertEquals(initialList + elementsToSend, destination) }