Skip to content

Commit 3527031

Browse files
committed
Fix a flaky check and a TSan violation in StateFlowStressTest.
There's no guarantee that any particular collector will ever encounter a value emitted by a particular emitter. It's entirely possible for the value to be overwritten by a different emitter before the collector gets a chance to collect it. It's very unlikely for a collector to miss the second half of all values emitted by a particular emitter, but it is still possible and this causes the test to be flaky. We can instead check if the collector has collected a recent enough value from *any* emitter. This should be sufficient to verify that the collector was still running near the end of the test. Also fixed a race condition in the test when printing test progress. The race condition is benign, but it causes TSan to fail for the test, which could prevent it from finding other concurrency bugs.
1 parent 66d291b commit 3527031

File tree

1 file changed

+16
-14
lines changed

1 file changed

+16
-14
lines changed

kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
package kotlinx.coroutines.flow
22

3-
import kotlinx.coroutines.testing.*
3+
import java.util.concurrent.atomic.AtomicLongArray
4+
import kotlin.random.*
45
import kotlinx.coroutines.*
6+
import kotlinx.coroutines.testing.*
57
import org.junit.*
6-
import kotlin.random.*
78

89
class StateFlowStressTest : TestBase() {
910
private val nSeconds = 3 * stressTestMultiplier
@@ -17,7 +18,7 @@ class StateFlowStressTest : TestBase() {
1718

1819
fun stress(nEmitters: Int, nCollectors: Int) = runTest {
1920
pool = newFixedThreadPoolContext(nEmitters + nCollectors, "StateFlowStressTest")
20-
val collected = Array(nCollectors) { LongArray(nEmitters) }
21+
val collected = Array(nCollectors) { AtomicLongArray(nEmitters) }
2122
val collectors = launch {
2223
repeat(nCollectors) { collector ->
2324
launch(pool) {
@@ -37,38 +38,39 @@ class StateFlowStressTest : TestBase() {
3738
}
3839
c[emitter] = current
3940

40-
}.take(batchSize).map { 1 }.sum()
41+
}.take(batchSize).count()
4142
} while (cnt == batchSize)
4243
}
4344
}
4445
}
45-
val emitted = LongArray(nEmitters)
46+
val emitted = AtomicLongArray(nEmitters)
4647
val emitters = launch {
4748
repeat(nEmitters) { emitter ->
4849
launch(pool) {
49-
var current = 1L
5050
while (true) {
51-
state.value = current * nEmitters + emitter
52-
emitted[emitter] = current
53-
current++
54-
if (current % 1000 == 0L) yield() // make it cancellable
51+
state.value = emitted.incrementAndGet(emitter) * nEmitters + emitter
52+
if (emitted[emitter] % 1000 == 0L) yield() // make it cancellable
5553
}
5654
}
5755
}
5856
}
5957
for (second in 1..nSeconds) {
6058
delay(1000)
6159
val cs = collected.map { it.sum() }
62-
println("$second: emitted=${emitted.sum()}, collected=${cs.minOrNull()}..${cs.maxOrNull()}")
60+
println("$second: emitted=${emitted.sum()}, collected=${cs.min()}..${cs.max()}")
6361
}
6462
emitters.cancelAndJoin()
6563
collectors.cancelAndJoin()
6664
// make sure nothing hanged up
67-
require(collected.all { c ->
68-
c.withIndex().all { (emitter, current) -> current > emitted[emitter] / 2 }
69-
})
65+
for (i in 0..<nCollectors) {
66+
check((0..<nEmitters).any { j -> collected[i][j] > emitted[j] * 0.9 }) {
67+
"collector #$i failed to collect any of the most recently emitted values"
68+
}
69+
}
7070
}
7171

72+
private fun AtomicLongArray.sum() = (0..<length()).sumOf(::get)
73+
7274
@Test
7375
fun testSingleEmitterAndCollector() = stress(1, 1)
7476

0 commit comments

Comments
 (0)