Skip to content

Commit 968ce36

Browse files
author
Stijn
committed
Test the output value to validate backpressure
1 parent 2331257 commit 968ce36

File tree

1 file changed

+92
-7
lines changed

1 file changed

+92
-7
lines changed

sample/Async/AsyncSequenceIntegrationTests.swift

Lines changed: 92 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77

88
import XCTest
99
import KMPNativeCoroutinesAsync
10-
import NativeCoroutinesSampleShared
10+
import KMPNativeCoroutinesCore
11+
@preconcurrency import NativeCoroutinesSampleShared
1112

1213
class AsyncSequenceIntegrationTests: XCTestCase {
1314

@@ -33,26 +34,110 @@ class AsyncSequenceIntegrationTests: XCTestCase {
3334
await assertJobCompleted(integrationTests)
3435
}
3536

36-
func testValueBackPressure() async {
37+
func testValueBackPressure() async throws {
3738
let integrationTests = FlowIntegrationTests()
3839
let sendValueCount: Int32 = 10
3940
let sequence = asyncSequence(for: integrationTests.getFlow(count: sendValueCount, delay: 100))
4041
do {
4142
var receivedValueCount: Int32 = 0
42-
for try await _ in sequence {
43-
let emittedCount = integrationTests.emittedCount
43+
// This will emit a value and return on the same queue which is the expected behaviour of async streams
44+
// Old implementation returned on cooporative queue making the consumption of the sample not block the emit. This is not expected for async streams.
45+
var result: [String] = []
46+
for try await value in sequence {
4447
// Note the AsyncSequence buffers at most a single item
45-
XCTAssert(emittedCount == receivedValueCount || emittedCount == receivedValueCount + 1, "Back pressure isn't applied")
46-
delay(0.2)
48+
result.append("\(value)")
49+
delay(0.2) // This effectively blocks the emit as is expected for async streams. Due to the build in buffer policy of AysyncThrowing stream by default nothing is lost
4750
receivedValueCount += 1
4851
}
49-
XCTAssertEqual(receivedValueCount, sendValueCount, "Should have received all values")
52+
XCTAssertEqual(result, [
53+
"0",
54+
"1",
55+
"2",
56+
"3",
57+
"4",
58+
"5",
59+
"6",
60+
"7",
61+
"8",
62+
"9"
63+
], """
64+
Should have received all values. You might notice that the middle, the emittedCount skips a beat.
65+
This is normal als the delay here block the same caller queue until it is ready to receive another value.
66+
When it is ready the AsyncThrowingStream emits a value from its, by default infinate, buffer. Hence effectively
67+
allowing for all values to be passed into swift. This might not be abovious from just testing the emit count as that one skips a beat.
68+
But in this case is the expected behaviour if you align with how streams behave in swift structured concurrency world.
69+
""")
5070
} catch {
5171
XCTFail("Sequence should complete without an error")
5272
}
5373
await assertJobCompleted(integrationTests)
5474
}
5575

76+
func testValueBackPressureDelibaratlyLoosesValues() async throws {
77+
let integrationTests = FlowIntegrationTests()
78+
let sendValueCount: Int32 = 10
79+
// Delibarately disable buffer policy, sometimes wanted in real time data processing or when processing audio streams that have to be real time.
80+
// In that case you would want buffers to be dropped when consumption is too slow. With AsyncThrowingStream you have that behaviour as an option
81+
// with the older implementation `NativeFlowAsyncSequence` the result is returned on a cooparative queue making it async to the sample stream. Which
82+
// you might not want.
83+
let sequence = asyncSequence(for: integrationTests.getFlow(count: sendValueCount, delay: 100), bufferingPolicy: .bufferingNewest(0))
84+
do {
85+
// This will emit a value and return on the same queue which is the expected behaviour of async streams
86+
// Old implementation returned on cooporative queue making the consumption of the sample not block the emit. This is not expected for async streams.
87+
var result: [String] = []
88+
for try await value in sequence {
89+
// Note the AsyncSequence buffers at most a single item
90+
result.append("\(value)")
91+
delay(0.2) // This effectively blocks the emit as is expected for async streams. Due to the build in buffer policy of AysyncThrowing stream by default nothing is lost
92+
}
93+
94+
XCTAssertTrue(result.count < 10, """
95+
This test shows that you have the option to force drops of items if consumption is too slow.
96+
The values dropped are a bit random so a test with exact values is not possible. But it should be less then
97+
10 values or the buffer is still active.
98+
""")
99+
} catch {
100+
XCTFail("Sequence should complete without an error")
101+
}
102+
await assertJobCompleted(integrationTests)
103+
}
104+
105+
func testValueBackPressureWhenConsumptionIsAsync() async throws {
106+
let integrationTests = FlowIntegrationTests()
107+
let sendValueCount: Int32 = 10
108+
109+
let sequence = asyncSequence(for: integrationTests.getFlow(count: sendValueCount, delay: 100), bufferingPolicy: .bufferingNewest(0))
110+
111+
do {
112+
var result: [String] = []
113+
for try await value in sequence {
114+
try await Task{
115+
result.append("\(value)")
116+
try await Task.sleep(nanoseconds: 200)
117+
}.value
118+
}
119+
120+
XCTAssertEqual(result, [
121+
"0",
122+
"1",
123+
"2",
124+
"3",
125+
"4",
126+
"5",
127+
"6",
128+
"7",
129+
"8",
130+
"9"], """
131+
This will now correctly count for emit and not use the buffer policy but still have all the elements as
132+
consumption is async.
133+
""")
134+
} catch {
135+
XCTFail("Sequence should complete without an error")
136+
}
137+
await assertJobCompleted(integrationTests)
138+
}
139+
140+
56141
func testNilValueReceived() async {
57142
let integrationTests = FlowIntegrationTests()
58143
let sendValueCount = randomInt(min: 5, max: 20)

0 commit comments

Comments
 (0)