Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ All notable changes to this project will be documented in this file.
* Provides `Infallible` versions of `combineLatest` without `resultSelector` requirement.
* Provides `Infallible` versions of `CombineLatest+Collection` helpers.
* Explicitly declare `APPLICATION_EXTENSION_API_ONLY` for CocoaPods
* Add a new parameter `detached: Bool = false` to the `AsyncSequence.asObservable` function to allow iterating over the `AsyncSequence` on a background thread.

## 6.5.0

Expand Down
10 changes: 7 additions & 3 deletions RxSwift/Observable+Concurrency.swift
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ public extension AsyncSequence {
/// values of the asynchronous sequence's type
///
/// - returns: An `Observable` of the async sequence's type
func asObservable() -> Observable<Element> {
func asObservable(detached: Bool = false) -> Observable<Element> {
Observable.create { observer in
let task = Task {
let taskBlock: @Sendable () async -> Void = {
do {
for try await value in self {
observer.onNext(value)
Expand All @@ -73,7 +73,11 @@ public extension AsyncSequence {
observer.onError(error)
}
}


let task: Task<Void, Never> = detached
? Task.detached(operation: taskBlock)
: Task(operation: taskBlock)

return Disposables.create { task.cancel() }
}
}
Expand Down
96 changes: 96 additions & 0 deletions Tests/RxSwiftTests/Observable+ConcurrencyTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -90,5 +90,101 @@ extension ObservableConcurrencyTests {
task.cancel()
}

func testAsyncSequenceToObservableRunsOnBackgroundThread() async throws {

let asyncSequence = AsyncStream<Int> { continuation in
for i in 1...5 {
continuation.yield(i)
}
continuation.finish()
}

let expectation = XCTestExpectation(description: "Observable completes")

DispatchQueue.main.async {
let observable = asyncSequence.asObservable(detached: true)

var threadIsNotMain = false
var values = [Int]()

_ = observable.subscribe(
onNext: { value in
values.append(value)
threadIsNotMain = !Thread.isMainThread
},
onCompleted: {
XCTAssertEqual(values, [1, 2, 3, 4, 5])
XCTAssertTrue(threadIsNotMain, "AsyncSequence.asObservable should not run on main thread")
expectation.fulfill()
}
)
}

await fulfillment(of: [expectation], timeout: 5.0)
}

func testAsyncSequenceToObservableWithSleep() async throws {
let asyncSequence = AsyncStream<Int> { continuation in
Task {
for i in 1...3 {
try? await Task.sleep(nanoseconds: 100_000_000)
continuation.yield(i)
}
continuation.finish()
}
}

let expectation = XCTestExpectation(description: "Observable with sleep completes")

DispatchQueue.main.async {
let startTime = Date()
var values = [Int]()
var executionThreads = Set<String>()

_ = asyncSequence.asObservable(detached: true).subscribe(
onNext: { value in
values.append(value)
let threadName = Thread.current.description
executionThreads.insert(threadName)
},
onCompleted: {
let duration = Date().timeIntervalSince(startTime)
XCTAssertGreaterThanOrEqual(duration, 0.3)
XCTAssertEqual(values, [1, 2, 3])
XCTAssertFalse(executionThreads.contains(where: { $0.contains("main") }))

expectation.fulfill()
}
)
}

await fulfillment(of: [expectation], timeout: 5.0)
}

func testAsyncSequenceToObservableWithError() async throws {
struct TestError: Error {}

let asyncSequence = AsyncThrowingStream<Int, Error> { continuation in
for i in 1...3 {
continuation.yield(i)
}
continuation.finish(throwing: TestError())
}

let expectation = XCTestExpectation(description: "Observable with error completes")
var receivedError: Error?

_ = asyncSequence.asObservable(detached: true).subscribe(
onNext: { _ in },
onError: { error in
receivedError = error
expectation.fulfill()
}
)

await fulfillment(of: [expectation], timeout: 5.0)
XCTAssertTrue(receivedError is TestError)
}

}
#endif