From f563389d0b5041dfe474f176e712552a5876d3fa Mon Sep 17 00:00:00 2001 From: haeseoklee Date: Thu, 21 May 2026 00:03:10 +0900 Subject: [PATCH 1/5] Add async stream interfaces --- RIBs.xcodeproj/project.pbxproj | 20 +++ .../Concurrency/Lifecycle+AsyncSequence.swift | 47 ++++++ .../Observable+AsyncSequence.swift | 70 +++++++++ RIBs/Classes/Workflow/Workflow.swift | 33 +++++ RIBsTests/AsyncStreamTests.swift | 140 ++++++++++++++++++ 5 files changed, 310 insertions(+) create mode 100644 RIBs/Classes/Concurrency/Lifecycle+AsyncSequence.swift create mode 100644 RIBs/Classes/Concurrency/Observable+AsyncSequence.swift create mode 100644 RIBsTests/AsyncStreamTests.swift diff --git a/RIBs.xcodeproj/project.pbxproj b/RIBs.xcodeproj/project.pbxproj index 049e88a..6974f4a 100644 --- a/RIBs.xcodeproj/project.pbxproj +++ b/RIBs.xcodeproj/project.pbxproj @@ -33,6 +33,9 @@ AFB7D4031FC81C8F00045D2B /* Foundation+ExtensionsTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = AF82F6731FC81B5F006DF7BC /* Foundation+ExtensionsTests.swift */; }; AFB7D4051FC81D6100045D2B /* LaunchRouterTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = AFB7D4041FC81D6100045D2B /* LaunchRouterTests.swift */; }; BF5FC0F122808377004235F1 /* RxRelay.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = BF5FC0F022808377004235F1 /* RxRelay.framework */; }; + C0A100012B10000100A10001 /* Observable+AsyncSequence.swift in Sources */ = {isa = PBXBuildFile; fileRef = C0A100032B10000100A10001 /* Observable+AsyncSequence.swift */; }; + C0A100022B10000100A10001 /* Lifecycle+AsyncSequence.swift in Sources */ = {isa = PBXBuildFile; fileRef = C0A100042B10000100A10001 /* Lifecycle+AsyncSequence.swift */; }; + C0A100062B10000100A10001 /* AsyncStreamTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = C0A100052B10000100A10001 /* AsyncStreamTests.swift */; }; /* End PBXBuildFile section */ /* Begin PBXContainerItemProxy section */ @@ -75,6 +78,9 @@ AF9966B11FC40D7E00CAEAA2 /* RxSwift.framework */ = {isa = PBXFileReference; lastKnownFileType = wrapper.framework; name = RxSwift.framework; path = ../Carthage/Build/iOS/RxSwift.framework; sourceTree = ""; }; AFB7D4041FC81D6100045D2B /* LaunchRouterTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = LaunchRouterTests.swift; sourceTree = ""; }; BF5FC0F022808377004235F1 /* RxRelay.framework */ = {isa = PBXFileReference; lastKnownFileType = wrapper.framework; name = RxRelay.framework; path = ../Carthage/Build/iOS/RxRelay.framework; sourceTree = ""; }; + C0A100032B10000100A10001 /* Observable+AsyncSequence.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Observable+AsyncSequence.swift"; sourceTree = ""; }; + C0A100042B10000100A10001 /* Lifecycle+AsyncSequence.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Lifecycle+AsyncSequence.swift"; sourceTree = ""; }; + C0A100052B10000100A10001 /* AsyncStreamTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AsyncStreamTests.swift; sourceTree = ""; }; E8E789432378AD000043E59E /* Package.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; name = Package.swift; path = ../Package.swift; sourceTree = SOURCE_ROOT; }; /* End PBXFileReference section */ @@ -110,6 +116,7 @@ 413177271F8EEFEF005F08F0 /* Router.swift */, 413177241F8EEFEF005F08F0 /* ViewableRouter.swift */, 413177281F8EEFEF005F08F0 /* ViewControllable.swift */, + C0A100002B10000100A10001 /* Concurrency */, 418C17551F97DB0E003C03F7 /* DI */, 4131773B1F8EF981005F08F0 /* Extensions */, 413177361F8EF70A005F08F0 /* LeakDetector */, @@ -119,6 +126,15 @@ path = Classes; sourceTree = ""; }; + C0A100002B10000100A10001 /* Concurrency */ = { + isa = PBXGroup; + children = ( + C0A100042B10000100A10001 /* Lifecycle+AsyncSequence.swift */, + C0A100032B10000100A10001 /* Observable+AsyncSequence.swift */, + ); + path = Concurrency; + sourceTree = ""; + }; 413177201F8EEFEF005F08F0 /* Workflow */ = { isa = PBXGroup; children = ( @@ -195,6 +211,7 @@ isa = PBXGroup; children = ( 8B9882F21F86E1CF00ABE009 /* Info.plist */, + C0A100052B10000100A10001 /* AsyncStreamTests.swift */, AFB7D4041FC81D6100045D2B /* LaunchRouterTests.swift */, AF90B40B1FBA157700920384 /* Mocks.swift */, AF90B4091FBA14DB00920384 /* RouterTests.swift */, @@ -382,6 +399,7 @@ isa = PBXSourcesBuildPhase; buildActionMask = 2147483647; files = ( + C0A100022B10000100A10001 /* Lifecycle+AsyncSequence.swift in Sources */, 4131773A1F8EF711005F08F0 /* LeakDetector.swift in Sources */, 418C175C1F97F19F003C03F7 /* Component.swift in Sources */, 4131773D1F8EF98A005F08F0 /* Foundation+Extensions.swift in Sources */, @@ -392,6 +410,7 @@ 4131772D1F8EF5FF005F08F0 /* Interactor.swift in Sources */, 413177321F8EF5FF005F08F0 /* ViewableRouter.swift in Sources */, 413177351F8EF605005F08F0 /* Workflow.swift in Sources */, + C0A100012B10000100A10001 /* Observable+AsyncSequence.swift in Sources */, 4131772F1F8EF5FF005F08F0 /* PresentableInteractor.swift in Sources */, 4131772C1F8EF5FF005F08F0 /* Builder.swift in Sources */, 413177331F8EF5FF005F08F0 /* ViewControllable.swift in Sources */, @@ -404,6 +423,7 @@ isa = PBXSourcesBuildPhase; buildActionMask = 2147483647; files = ( + C0A100062B10000100A10001 /* AsyncStreamTests.swift in Sources */, AFB7D4031FC81C8F00045D2B /* Foundation+ExtensionsTests.swift in Sources */, AF90B4111FBA185E00920384 /* ComponentTests.swift in Sources */, AFB7D4051FC81D6100045D2B /* LaunchRouterTests.swift in Sources */, diff --git a/RIBs/Classes/Concurrency/Lifecycle+AsyncSequence.swift b/RIBs/Classes/Concurrency/Lifecycle+AsyncSequence.swift new file mode 100644 index 0000000..e3b17bb --- /dev/null +++ b/RIBs/Classes/Concurrency/Lifecycle+AsyncSequence.swift @@ -0,0 +1,47 @@ +// +// Copyright (c) 2017. Uber Technologies +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +public extension InteractorScope { + + /// The lifecycle of this interactor exposed as an async stream. + var isActiveSequence: AsyncStream { + return isActiveStream.asAsyncStream(bufferingPolicy: .bufferingNewest(1)) + } +} + +public extension RouterScope { + + /// The lifecycle events of this router exposed as an async stream. + var lifecycleSequence: AsyncStream { + return lifecycle.asAsyncStream() + } +} + +public extension Working { + + /// The lifecycle of this worker exposed as an async stream. + var isStartedSequence: AsyncStream { + return isStartedStream.asAsyncStream(bufferingPolicy: .bufferingNewest(1)) + } +} + +public extension LeakDetector { + + /// The leak detection status exposed as an async stream. + var statusSequence: AsyncStream { + return status.asAsyncStream(bufferingPolicy: .bufferingNewest(1)) + } +} diff --git a/RIBs/Classes/Concurrency/Observable+AsyncSequence.swift b/RIBs/Classes/Concurrency/Observable+AsyncSequence.swift new file mode 100644 index 0000000..a975290 --- /dev/null +++ b/RIBs/Classes/Concurrency/Observable+AsyncSequence.swift @@ -0,0 +1,70 @@ +// +// Copyright (c) 2017. Uber Technologies +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import RxSwift + +public extension ObservableType { + + /// Convert this observable into an async stream. + /// + /// The underlying Rx subscription is disposed when the async stream is terminated. + func asAsyncStream( + bufferingPolicy: AsyncStream.Continuation.BufferingPolicy = .unbounded + ) -> AsyncStream { + return AsyncStream(Element.self, bufferingPolicy: bufferingPolicy) { continuation in + let disposable = subscribe( + onNext: { element in + continuation.yield(element) + }, + onError: { _ in + continuation.finish() + }, + onCompleted: { + continuation.finish() + } + ) + + continuation.onTermination = { _ in + disposable.dispose() + } + } + } + + /// Convert this observable into an async throwing stream. + /// + /// The underlying Rx subscription is disposed when the async stream is terminated. + func asAsyncThrowingStream( + bufferingPolicy: AsyncThrowingStream.Continuation.BufferingPolicy = .unbounded + ) -> AsyncThrowingStream { + return AsyncThrowingStream(Element.self, bufferingPolicy: bufferingPolicy) { continuation in + let disposable = subscribe( + onNext: { element in + continuation.yield(element) + }, + onError: { error in + continuation.finish(throwing: error) + }, + onCompleted: { + continuation.finish() + } + ) + + continuation.onTermination = { _ in + disposable.dispose() + } + } + } +} diff --git a/RIBs/Classes/Workflow/Workflow.swift b/RIBs/Classes/Workflow/Workflow.swift index 472ffa3..5149762 100644 --- a/RIBs/Classes/Workflow/Workflow.swift +++ b/RIBs/Classes/Workflow/Workflow.swift @@ -146,6 +146,30 @@ open class Step { return Step(workflow: workflow, observable: confinedNextStep) } + /// Executes the given async closure for this step. + /// + /// - parameter onStep: The async closure to execute for the `Step`. + /// - returns: The next step. + public final func onAsyncStep(_ onStep: @escaping (ActionableItemType, ValueType) async throws -> (NextActionableItemType, NextValueType)) -> Step { + return self.onStep { actionableItem, value in + Observable.create { observer in + let task = Task { + do { + let result = try await onStep(actionableItem, value) + observer.onNext(result) + observer.onCompleted() + } catch { + observer.onError(error) + } + } + + return Disposables.create { + task.cancel() + } + } + } + } + /// Executes the given closure when the `Step` produces an error. /// /// - parameter onError: The closure to execute when an error occurs. @@ -175,6 +199,15 @@ open class Step { public final func asObservable() -> Observable<(ActionableItemType, ValueType)> { return observable } + + /// Convert the `Workflow` step into an async throwing sequence. + /// + /// - returns: The async sequence representation of this `Workflow` step. + public final func asAsyncSequence( + bufferingPolicy: AsyncThrowingStream<(ActionableItemType, ValueType), Error>.Continuation.BufferingPolicy = .unbounded + ) -> AsyncThrowingStream<(ActionableItemType, ValueType), Error> { + return observable.asAsyncThrowingStream(bufferingPolicy: bufferingPolicy) + } } /// `Workflow` related obervable extensions. diff --git a/RIBsTests/AsyncStreamTests.swift b/RIBsTests/AsyncStreamTests.swift new file mode 100644 index 0000000..bea6bff --- /dev/null +++ b/RIBsTests/AsyncStreamTests.swift @@ -0,0 +1,140 @@ +// +// Copyright (c) 2017. Uber Technologies +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import RxSwift +import XCTest +@testable import RIBs + +final class AsyncStreamTests: XCTestCase { + + func test_observableAsAsyncStream_emitsValuesAndCompletes() async { + var iterator = Observable.from([1, 2]).asAsyncStream().makeAsyncIterator() + + let firstValue = await iterator.next() + let secondValue = await iterator.next() + let completedValue = await iterator.next() + + XCTAssertEqual(firstValue, 1) + XCTAssertEqual(secondValue, 2) + XCTAssertNil(completedValue) + } + + func test_observableAsAsyncThrowingStream_throwsOnError() async { + var iterator = Observable + .error(AsyncStreamTestError.error) + .asAsyncThrowingStream() + .makeAsyncIterator() + + do { + _ = try await iterator.next() + XCTFail("Expected async sequence to throw") + } catch AsyncStreamTestError.error { + // Expected. + } catch { + XCTFail("Unexpected error: \(error)") + } + } + + func test_interactorIsActiveSequence_emitsCurrentValueAndChanges() async { + let interactor = Interactor() + var iterator = interactor.isActiveSequence.makeAsyncIterator() + + let initialValue = await iterator.next() + interactor.activate() + let activeValue = await iterator.next() + interactor.deactivate() + let inactiveValue = await iterator.next() + + XCTAssertEqual(initialValue, false) + XCTAssertEqual(activeValue, true) + XCTAssertEqual(inactiveValue, false) + } + + func test_workerIsStartedSequence_emitsCurrentValueAndChanges() async { + let interactor = Interactor() + let worker = Worker() + var iterator = worker.isStartedSequence.makeAsyncIterator() + + let initialValue = await iterator.next() + interactor.activate() + worker.start(interactor) + let startedValue = await iterator.next() + worker.stop() + let stoppedValue = await iterator.next() + + XCTAssertEqual(initialValue, false) + XCTAssertEqual(startedValue, true) + XCTAssertEqual(stoppedValue, false) + } + + func test_routerLifecycleSequence_emitsDidLoad() async { + let router = Router(interactor: Interactor()) + var iterator = router.lifecycleSequence.makeAsyncIterator() + let lifecycleTask = Task { + await iterator.next() + } + + router.load() + let lifecycle = await lifecycleTask.value + + XCTAssertEqual(lifecycle, .didLoad) + } + + func test_stepAsAsyncSequence_emitsStepOutput() async throws { + let workflow = Workflow() + let step = workflow + .onStep { actionableItem in + Observable.just((actionableItem.count, actionableItem)) + } + let sequence = step.asAsyncSequence() + let stepTask = Task { () -> (Int, String)? in + var iterator = sequence.makeAsyncIterator() + return try await iterator.next() + } + + _ = workflow.subscribe("test") + let value = try await stepTask.value + + XCTAssertEqual(value?.0, 4) + XCTAssertEqual(value?.1, "test") + } + + func test_onAsyncStep_emitsAsyncResult() async throws { + let workflow = Workflow() + let step = workflow + .onStep { actionableItem in + Observable.just((actionableItem, actionableItem)) + } + .onAsyncStep { actionableItem, value in + return (actionableItem + 1, value + 2) + } + let sequence = step.asAsyncSequence() + let stepTask = Task { () -> (Int, Int)? in + var iterator = sequence.makeAsyncIterator() + return try await iterator.next() + } + + _ = workflow.subscribe(1) + let value = try await stepTask.value + + XCTAssertEqual(value?.0, 2) + XCTAssertEqual(value?.1, 3) + } +} + +private enum AsyncStreamTestError: Error { + case error +} From 19fb4a5fd7c35c5852e0ba3527c09ed545fb1ff4 Mon Sep 17 00:00:00 2001 From: haeseoklee Date: Fri, 22 May 2026 01:19:06 +0900 Subject: [PATCH 2/5] Add async lifecycle helpers --- Package.resolved | 18 +++ Package.swift | 4 +- .../Concurrency/AsyncSequence+RIBs.swift | 73 +++++++++ RIBs/Classes/Concurrency/Task+RIBs.swift | 130 ++++++++++++++++ RIBs/Classes/Workflow/Workflow.swift | 12 ++ RIBsTests/ConcurrencyTests.swift | 142 ++++++++++++++++++ 6 files changed, 378 insertions(+), 1 deletion(-) create mode 100644 RIBs/Classes/Concurrency/AsyncSequence+RIBs.swift create mode 100644 RIBs/Classes/Concurrency/Task+RIBs.swift create mode 100644 RIBsTests/ConcurrencyTests.swift diff --git a/Package.resolved b/Package.resolved index ce9302c..480722b 100644 --- a/Package.resolved +++ b/Package.resolved @@ -27,6 +27,24 @@ "revision": "5dd1907d64f0d36f158f61a466bab75067224893", "version": "6.9.0" } + }, + { + "package": "swift-async-algorithms", + "repositoryURL": "https://github.com/apple/swift-async-algorithms", + "state": { + "branch": null, + "revision": "d0b4a06d0f173a2f3be27d3ea21b3c3aa18db440", + "version": "1.1.4" + } + }, + { + "package": "swift-collections", + "repositoryURL": "https://github.com/apple/swift-collections.git", + "state": { + "branch": null, + "revision": "fea17c02d767f46b23070fdfdacc28a03a39232a", + "version": "1.5.1" + } } ] }, diff --git a/Package.swift b/Package.swift index 53d3f9b..be7428a 100644 --- a/Package.swift +++ b/Package.swift @@ -11,6 +11,7 @@ let package = Package( ], dependencies: [ .package(url: "https://github.com/ReactiveX/RxSwift", "6.9.0"..<"7.0.0"), + .package(url: "https://github.com/apple/swift-async-algorithms", from: "1.0.0"), .package(url: "https://github.com/mattgallagher/CwlPreconditionTesting.git", from: "2.2.2"), // for testTarget only ], targets: [ @@ -18,7 +19,8 @@ let package = Package( name: "RIBs", dependencies: [ .product(name: "RxSwift", package: "RxSwift"), - .product(name: "RxRelay", package: "RxSwift") + .product(name: "RxRelay", package: "RxSwift"), + .product(name: "AsyncAlgorithms", package: "swift-async-algorithms") ], path: "RIBs" ), diff --git a/RIBs/Classes/Concurrency/AsyncSequence+RIBs.swift b/RIBs/Classes/Concurrency/AsyncSequence+RIBs.swift new file mode 100644 index 0000000..1ce7e82 --- /dev/null +++ b/RIBs/Classes/Concurrency/AsyncSequence+RIBs.swift @@ -0,0 +1,73 @@ +// +// Copyright (c) 2017. Uber Technologies +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import AsyncAlgorithms +import RxSwift + +/// A type-erased async sequence used to preserve element type information without requiring typed +/// `AsyncSequence` availability. +public struct AnyAsyncSequence: AsyncSequence { + + public struct AsyncIterator: AsyncIteratorProtocol { + + private var nextElement: () async throws -> Element? + + fileprivate init(_ iterator: Iterator) where Iterator.Element == Element { + var iterator = iterator + nextElement = { + try await iterator.next() + } + } + + public mutating func next() async throws -> Element? { + return try await nextElement() + } + } + + private let makeIterator: () -> AsyncIterator + + public init(_ sequence: Sequence) where Sequence.Element == Element { + makeIterator = { + AsyncIterator(sequence.makeAsyncIterator()) + } + } + + public func makeAsyncIterator() -> AsyncIterator { + return makeIterator() + } +} + +public extension AsyncSequence { + + /// Confines the async sequence's elements to the given interactor scope. + /// + /// Elements are only yielded while the interactor scope is active. Values emitted while inactive are ignored, + /// except for the latest value that can be emitted when the scope becomes active again. + /// + /// - parameter interactorScope: The interactor scope whose activeness this async sequence is confined to. + /// - returns: The async sequence confined to this interactor's activeness lifecycle. + func confineTo(_ interactorScope: InteractorScope) -> AnyAsyncSequence { + return AnyAsyncSequence( + combineLatest(interactorScope.isActiveStream.values, self) + .filter { isActive, _ in + isActive + } + .map { _, element in + element + } + ) + } +} diff --git a/RIBs/Classes/Concurrency/Task+RIBs.swift b/RIBs/Classes/Concurrency/Task+RIBs.swift new file mode 100644 index 0000000..8a62716 --- /dev/null +++ b/RIBs/Classes/Concurrency/Task+RIBs.swift @@ -0,0 +1,130 @@ +// +// Copyright (c) 2017. Uber Technologies +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import RxSwift + +public extension Interactor { + + /// Runs a task that is cancelled when this interactor deactivates. + /// + /// If the interactor is inactive when this method is invoked, the task is cancelled immediately. + @discardableResult + func taskOnDeactivate( + priority: TaskPriority? = nil, + operation: @escaping () async -> () + ) -> Task { + let task = Task(priority: priority) { + await operation() + } + Disposables.create { + task.cancel() + } + .disposeOnDeactivate(interactor: self) + return task + } + + /// Runs a throwing task that is cancelled when this interactor deactivates. + /// + /// If the interactor is inactive when this method is invoked, the task is cancelled immediately. + @discardableResult + func throwingTaskOnDeactivate( + priority: TaskPriority? = nil, + operation: @escaping () async throws -> () + ) -> Task { + let task = Task(priority: priority) { + try await operation() + } + Disposables.create { + task.cancel() + } + .disposeOnDeactivate(interactor: self) + return task + } +} + +public extension Worker { + + /// Runs a task that is cancelled when this worker stops. + /// + /// If the worker is stopped when this method is invoked, the task is cancelled immediately. + @discardableResult + func taskOnStop( + priority: TaskPriority? = nil, + operation: @escaping () async -> () + ) -> Task { + let task = Task(priority: priority) { + await operation() + } + Disposables.create { + task.cancel() + } + .disposeOnStop(self) + return task + } + + /// Runs a throwing task that is cancelled when this worker stops. + /// + /// If the worker is stopped when this method is invoked, the task is cancelled immediately. + @discardableResult + func throwingTaskOnStop( + priority: TaskPriority? = nil, + operation: @escaping () async throws -> () + ) -> Task { + let task = Task(priority: priority) { + try await operation() + } + Disposables.create { + task.cancel() + } + .disposeOnStop(self) + return task + } +} + +public extension Workflow { + + /// Runs a task that is cancelled when this workflow is disposed. + @discardableResult + func task( + priority: TaskPriority? = nil, + operation: @escaping () async -> () + ) -> Task { + let task = Task(priority: priority) { + await operation() + } + Disposables.create { + task.cancel() + } + .disposeWith(workflow: self) + return task + } + + /// Runs a throwing task that is cancelled when this workflow is disposed. + @discardableResult + func throwingTask( + priority: TaskPriority? = nil, + operation: @escaping () async throws -> () + ) -> Task { + let task = Task(priority: priority) { + try await operation() + } + Disposables.create { + task.cancel() + } + .disposeWith(workflow: self) + return task + } +} diff --git a/RIBs/Classes/Workflow/Workflow.swift b/RIBs/Classes/Workflow/Workflow.swift index 5149762..5ebb684 100644 --- a/RIBs/Classes/Workflow/Workflow.swift +++ b/RIBs/Classes/Workflow/Workflow.swift @@ -211,6 +211,18 @@ open class Step { } /// `Workflow` related obervable extensions. +public extension AsyncSequence { + + /// Fork the step from this async sequence. + /// + /// - parameter workflow: The workflow this step belongs to. + /// - returns: The newly forked step in the workflow. + func fork(_ workflow: Workflow) -> Step where Element == (ActionableItemType, ValueType) { + workflow.didFork() + return Step(workflow: workflow, observable: asObservable()) + } +} + public extension ObservableType { /// Fork the step from this obervable. diff --git a/RIBsTests/ConcurrencyTests.swift b/RIBsTests/ConcurrencyTests.swift new file mode 100644 index 0000000..93f29ca --- /dev/null +++ b/RIBsTests/ConcurrencyTests.swift @@ -0,0 +1,142 @@ +// +// Copyright (c) 2017. Uber Technologies +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import RxSwift +import XCTest +@testable import RIBs + +final class ConcurrencyTests: XCTestCase { + + func test_asyncSequenceConfineTo_yieldsLatestElementWhenInteractorBecomesActive() async throws { + let interactor = Interactor() + var sourceContinuation: AsyncStream.Continuation? + let source = AsyncStream { continuation in + sourceContinuation = continuation + } + var iterator = source.confineTo(interactor).makeAsyncIterator() + + sourceContinuation?.yield(1) + interactor.activate() + let firstValue = try await iterator.next() + sourceContinuation?.yield(2) + let secondValue = try await iterator.next() + interactor.deactivate() + sourceContinuation?.yield(3) + interactor.activate() + let thirdValue = try await iterator.next() + + XCTAssertEqual(firstValue, 1) + XCTAssertEqual(secondValue, 2) + XCTAssertEqual(thirdValue, 3) + } + + func test_taskOnDeactivate_cancelsTaskWhenInteractorDeactivates() async { + let interactor = Interactor() + interactor.activate() + + let task = interactor.taskOnDeactivate { + while !Task.isCancelled { + await Task.yield() + } + } + XCTAssertFalse(task.isCancelled) + + interactor.deactivate() + await Task.yield() + + XCTAssertTrue(task.isCancelled) + } + + func test_taskOnDeactivate_cancelsImmediatelyWhenInteractorIsInactive() async { + let interactor = Interactor() + + let task = interactor.taskOnDeactivate { + while !Task.isCancelled { + await Task.yield() + } + } + await Task.yield() + + XCTAssertTrue(task.isCancelled) + } + + func test_taskOnStop_cancelsTaskWhenWorkerStops() async { + let interactor = Interactor() + let worker = Worker() + interactor.activate() + worker.start(interactor) + + let task = worker.taskOnStop { + while !Task.isCancelled { + await Task.yield() + } + } + XCTAssertFalse(task.isCancelled) + + worker.stop() + await Task.yield() + + XCTAssertTrue(task.isCancelled) + } + + func test_workflowTask_cancelsWhenWorkflowDisposableIsDisposed() async { + let workflow = Workflow<()>() + let task = workflow.task { + while !Task.isCancelled { + await Task.yield() + } + } + let disposable = workflow + .onStep { _ in + Observable.just(((), ())) + } + .commit() + .subscribe(()) + + XCTAssertFalse(task.isCancelled) + disposable.dispose() + await Task.yield() + + XCTAssertTrue(task.isCancelled) + } + + func test_asyncSequenceFork_createsWorkflowStep() { + let workflow = TestWorkflow() + let asyncSequence = AsyncStream<((), ())> { continuation in + continuation.yield(((), ())) + continuation.finish() + } + + let step: Step<(), (), ()> = asyncSequence.fork(workflow) + _ = step.commit().subscribe(()) + + XCTAssertEqual(workflow.forkCallCount, 1) + XCTAssertEqual(workflow.completeCallCount, 1) + } +} + +private class TestWorkflow: Workflow<()> { + var completeCallCount = 0 + var forkCallCount = 0 + + override func didComplete() { + completeCallCount += 1 + } + + override func didFork() { + forkCallCount += 1 + } +} From 360443cf9162e97bd07200f137f3d82341f02469 Mon Sep 17 00:00:00 2001 From: haeseoklee Date: Fri, 22 May 2026 01:26:30 +0900 Subject: [PATCH 3/5] Add concurrency helpers to Xcode project --- RIBs.xcodeproj/project.pbxproj | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/RIBs.xcodeproj/project.pbxproj b/RIBs.xcodeproj/project.pbxproj index 6974f4a..60f192c 100644 --- a/RIBs.xcodeproj/project.pbxproj +++ b/RIBs.xcodeproj/project.pbxproj @@ -35,6 +35,8 @@ BF5FC0F122808377004235F1 /* RxRelay.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = BF5FC0F022808377004235F1 /* RxRelay.framework */; }; C0A100012B10000100A10001 /* Observable+AsyncSequence.swift in Sources */ = {isa = PBXBuildFile; fileRef = C0A100032B10000100A10001 /* Observable+AsyncSequence.swift */; }; C0A100022B10000100A10001 /* Lifecycle+AsyncSequence.swift in Sources */ = {isa = PBXBuildFile; fileRef = C0A100042B10000100A10001 /* Lifecycle+AsyncSequence.swift */; }; + C0A100072B10000200A10001 /* AsyncSequence+RIBs.swift in Sources */ = {isa = PBXBuildFile; fileRef = C0A100092B10000200A10001 /* AsyncSequence+RIBs.swift */; }; + C0A100082B10000200A10001 /* Task+RIBs.swift in Sources */ = {isa = PBXBuildFile; fileRef = C0A1000A2B10000200A10001 /* Task+RIBs.swift */; }; C0A100062B10000100A10001 /* AsyncStreamTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = C0A100052B10000100A10001 /* AsyncStreamTests.swift */; }; /* End PBXBuildFile section */ @@ -81,6 +83,8 @@ C0A100032B10000100A10001 /* Observable+AsyncSequence.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Observable+AsyncSequence.swift"; sourceTree = ""; }; C0A100042B10000100A10001 /* Lifecycle+AsyncSequence.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Lifecycle+AsyncSequence.swift"; sourceTree = ""; }; C0A100052B10000100A10001 /* AsyncStreamTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AsyncStreamTests.swift; sourceTree = ""; }; + C0A100092B10000200A10001 /* AsyncSequence+RIBs.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "AsyncSequence+RIBs.swift"; sourceTree = ""; }; + C0A1000A2B10000200A10001 /* Task+RIBs.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Task+RIBs.swift"; sourceTree = ""; }; E8E789432378AD000043E59E /* Package.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; name = Package.swift; path = ../Package.swift; sourceTree = SOURCE_ROOT; }; /* End PBXFileReference section */ @@ -129,8 +133,10 @@ C0A100002B10000100A10001 /* Concurrency */ = { isa = PBXGroup; children = ( + C0A100092B10000200A10001 /* AsyncSequence+RIBs.swift */, C0A100042B10000100A10001 /* Lifecycle+AsyncSequence.swift */, C0A100032B10000100A10001 /* Observable+AsyncSequence.swift */, + C0A1000A2B10000200A10001 /* Task+RIBs.swift */, ); path = Concurrency; sourceTree = ""; @@ -399,6 +405,7 @@ isa = PBXSourcesBuildPhase; buildActionMask = 2147483647; files = ( + C0A100072B10000200A10001 /* AsyncSequence+RIBs.swift in Sources */, C0A100022B10000100A10001 /* Lifecycle+AsyncSequence.swift in Sources */, 4131773A1F8EF711005F08F0 /* LeakDetector.swift in Sources */, 418C175C1F97F19F003C03F7 /* Component.swift in Sources */, @@ -411,6 +418,7 @@ 413177321F8EF5FF005F08F0 /* ViewableRouter.swift in Sources */, 413177351F8EF605005F08F0 /* Workflow.swift in Sources */, C0A100012B10000100A10001 /* Observable+AsyncSequence.swift in Sources */, + C0A100082B10000200A10001 /* Task+RIBs.swift in Sources */, 4131772F1F8EF5FF005F08F0 /* PresentableInteractor.swift in Sources */, 4131772C1F8EF5FF005F08F0 /* Builder.swift in Sources */, 413177331F8EF5FF005F08F0 /* ViewControllable.swift in Sources */, From 926aea92bf2c7e4a932b4dd45fecc15236131df5 Mon Sep 17 00:00:00 2001 From: haeseoklee Date: Sun, 31 May 2026 04:58:39 +0900 Subject: [PATCH 4/5] Refine async concurrency interfaces --- Package.resolved | 18 ----------------- Package.swift | 4 +--- .../Concurrency/AsyncSequence+RIBs.swift | 20 +++++++++---------- .../Concurrency/Lifecycle+AsyncSequence.swift | 3 +++ .../Observable+AsyncSequence.swift | 3 +++ RIBs/Classes/Concurrency/Task+RIBs.swift | 3 +++ RIBs/Classes/Workflow/Workflow.swift | 3 +++ 7 files changed, 23 insertions(+), 31 deletions(-) diff --git a/Package.resolved b/Package.resolved index 480722b..ce9302c 100644 --- a/Package.resolved +++ b/Package.resolved @@ -27,24 +27,6 @@ "revision": "5dd1907d64f0d36f158f61a466bab75067224893", "version": "6.9.0" } - }, - { - "package": "swift-async-algorithms", - "repositoryURL": "https://github.com/apple/swift-async-algorithms", - "state": { - "branch": null, - "revision": "d0b4a06d0f173a2f3be27d3ea21b3c3aa18db440", - "version": "1.1.4" - } - }, - { - "package": "swift-collections", - "repositoryURL": "https://github.com/apple/swift-collections.git", - "state": { - "branch": null, - "revision": "fea17c02d767f46b23070fdfdacc28a03a39232a", - "version": "1.5.1" - } } ] }, diff --git a/Package.swift b/Package.swift index be7428a..53d3f9b 100644 --- a/Package.swift +++ b/Package.swift @@ -11,7 +11,6 @@ let package = Package( ], dependencies: [ .package(url: "https://github.com/ReactiveX/RxSwift", "6.9.0"..<"7.0.0"), - .package(url: "https://github.com/apple/swift-async-algorithms", from: "1.0.0"), .package(url: "https://github.com/mattgallagher/CwlPreconditionTesting.git", from: "2.2.2"), // for testTarget only ], targets: [ @@ -19,8 +18,7 @@ let package = Package( name: "RIBs", dependencies: [ .product(name: "RxSwift", package: "RxSwift"), - .product(name: "RxRelay", package: "RxSwift"), - .product(name: "AsyncAlgorithms", package: "swift-async-algorithms") + .product(name: "RxRelay", package: "RxSwift") ], path: "RIBs" ), diff --git a/RIBs/Classes/Concurrency/AsyncSequence+RIBs.swift b/RIBs/Classes/Concurrency/AsyncSequence+RIBs.swift index 1ce7e82..ea7645a 100644 --- a/RIBs/Classes/Concurrency/AsyncSequence+RIBs.swift +++ b/RIBs/Classes/Concurrency/AsyncSequence+RIBs.swift @@ -14,7 +14,8 @@ // limitations under the License. // -import AsyncAlgorithms +#if swift(>=5.6) && canImport(_Concurrency) + import RxSwift /// A type-erased async sequence used to preserve element type information without requiring typed @@ -54,20 +55,19 @@ public extension AsyncSequence { /// Confines the async sequence's elements to the given interactor scope. /// - /// Elements are only yielded while the interactor scope is active. Values emitted while inactive are ignored, - /// except for the latest value that can be emitted when the scope becomes active again. + /// Elements are only yielded while the interactor scope is active. While the sequence is being iterated, + /// values emitted while inactive are ignored except for the latest value, which can be emitted when the scope + /// becomes active again. /// /// - parameter interactorScope: The interactor scope whose activeness this async sequence is confined to. /// - returns: The async sequence confined to this interactor's activeness lifecycle. func confineTo(_ interactorScope: InteractorScope) -> AnyAsyncSequence { return AnyAsyncSequence( - combineLatest(interactorScope.isActiveStream.values, self) - .filter { isActive, _ in - isActive - } - .map { _, element in - element - } + asObservable() + .confineTo(interactorScope) + .values ) } } + +#endif diff --git a/RIBs/Classes/Concurrency/Lifecycle+AsyncSequence.swift b/RIBs/Classes/Concurrency/Lifecycle+AsyncSequence.swift index e3b17bb..228679a 100644 --- a/RIBs/Classes/Concurrency/Lifecycle+AsyncSequence.swift +++ b/RIBs/Classes/Concurrency/Lifecycle+AsyncSequence.swift @@ -14,6 +14,8 @@ // limitations under the License. // +#if swift(>=5.6) && canImport(_Concurrency) + public extension InteractorScope { /// The lifecycle of this interactor exposed as an async stream. @@ -45,3 +47,4 @@ public extension LeakDetector { return status.asAsyncStream(bufferingPolicy: .bufferingNewest(1)) } } +#endif diff --git a/RIBs/Classes/Concurrency/Observable+AsyncSequence.swift b/RIBs/Classes/Concurrency/Observable+AsyncSequence.swift index a975290..3eebb84 100644 --- a/RIBs/Classes/Concurrency/Observable+AsyncSequence.swift +++ b/RIBs/Classes/Concurrency/Observable+AsyncSequence.swift @@ -14,6 +14,8 @@ // limitations under the License. // +#if swift(>=5.6) && canImport(_Concurrency) + import RxSwift public extension ObservableType { @@ -68,3 +70,4 @@ public extension ObservableType { } } } +#endif diff --git a/RIBs/Classes/Concurrency/Task+RIBs.swift b/RIBs/Classes/Concurrency/Task+RIBs.swift index 8a62716..5f74adc 100644 --- a/RIBs/Classes/Concurrency/Task+RIBs.swift +++ b/RIBs/Classes/Concurrency/Task+RIBs.swift @@ -14,6 +14,8 @@ // limitations under the License. // +#if swift(>=5.6) && canImport(_Concurrency) + import RxSwift public extension Interactor { @@ -128,3 +130,4 @@ public extension Workflow { return task } } +#endif diff --git a/RIBs/Classes/Workflow/Workflow.swift b/RIBs/Classes/Workflow/Workflow.swift index 5ebb684..e8139dd 100644 --- a/RIBs/Classes/Workflow/Workflow.swift +++ b/RIBs/Classes/Workflow/Workflow.swift @@ -202,6 +202,9 @@ open class Step { /// Convert the `Workflow` step into an async throwing sequence. /// + /// This method only bridges the step output. It does not commit the workflow or register workflow completion + /// and error side effects. Call `commit()` explicitly when this step should act as part of a running workflow. + /// /// - returns: The async sequence representation of this `Workflow` step. public final func asAsyncSequence( bufferingPolicy: AsyncThrowingStream<(ActionableItemType, ValueType), Error>.Continuation.BufferingPolicy = .unbounded From 162f6a7cfb11bd36ba3e80faa9ae2b0dd8e2b05e Mon Sep 17 00:00:00 2001 From: haeseoklee Date: Sun, 31 May 2026 04:59:19 +0900 Subject: [PATCH 5/5] Reorganize concurrency tests --- RIBs.xcodeproj/project.pbxproj | 36 +- RIBsTests/AsyncStreamTests.swift | 140 ------- .../InteractorConcurrencyTests.swift | 247 ++++++++++++ .../LeakDetectorConcurrencyTests.swift | 38 ++ .../ObservableConcurrencyTests.swift | 109 ++++++ .../Concurrency/RouterConcurrencyTests.swift | 44 +++ .../Concurrency/WorkerConcurrencyTests.swift | 176 +++++++++ .../WorkflowConcurrencyTests.swift | 354 ++++++++++++++++++ RIBsTests/ConcurrencyTests.swift | 142 ------- 9 files changed, 1000 insertions(+), 286 deletions(-) delete mode 100644 RIBsTests/AsyncStreamTests.swift create mode 100644 RIBsTests/Concurrency/InteractorConcurrencyTests.swift create mode 100644 RIBsTests/Concurrency/LeakDetectorConcurrencyTests.swift create mode 100644 RIBsTests/Concurrency/ObservableConcurrencyTests.swift create mode 100644 RIBsTests/Concurrency/RouterConcurrencyTests.swift create mode 100644 RIBsTests/Concurrency/WorkerConcurrencyTests.swift create mode 100644 RIBsTests/Concurrency/WorkflowConcurrencyTests.swift delete mode 100644 RIBsTests/ConcurrencyTests.swift diff --git a/RIBs.xcodeproj/project.pbxproj b/RIBs.xcodeproj/project.pbxproj index 60f192c..538972c 100644 --- a/RIBs.xcodeproj/project.pbxproj +++ b/RIBs.xcodeproj/project.pbxproj @@ -37,7 +37,12 @@ C0A100022B10000100A10001 /* Lifecycle+AsyncSequence.swift in Sources */ = {isa = PBXBuildFile; fileRef = C0A100042B10000100A10001 /* Lifecycle+AsyncSequence.swift */; }; C0A100072B10000200A10001 /* AsyncSequence+RIBs.swift in Sources */ = {isa = PBXBuildFile; fileRef = C0A100092B10000200A10001 /* AsyncSequence+RIBs.swift */; }; C0A100082B10000200A10001 /* Task+RIBs.swift in Sources */ = {isa = PBXBuildFile; fileRef = C0A1000A2B10000200A10001 /* Task+RIBs.swift */; }; - C0A100062B10000100A10001 /* AsyncStreamTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = C0A100052B10000100A10001 /* AsyncStreamTests.swift */; }; + C0A101022B10001000A10001 /* ObservableConcurrencyTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = C0A101012B10001000A10001 /* ObservableConcurrencyTests.swift */; }; + C0A101042B10001000A10001 /* InteractorConcurrencyTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = C0A101032B10001000A10001 /* InteractorConcurrencyTests.swift */; }; + C0A101062B10001000A10001 /* WorkerConcurrencyTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = C0A101052B10001000A10001 /* WorkerConcurrencyTests.swift */; }; + C0A101082B10001000A10001 /* RouterConcurrencyTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = C0A101072B10001000A10001 /* RouterConcurrencyTests.swift */; }; + C0A1010A2B10001000A10001 /* LeakDetectorConcurrencyTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = C0A101092B10001000A10001 /* LeakDetectorConcurrencyTests.swift */; }; + C0A1010C2B10001000A10001 /* WorkflowConcurrencyTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = C0A1010B2B10001000A10001 /* WorkflowConcurrencyTests.swift */; }; /* End PBXBuildFile section */ /* Begin PBXContainerItemProxy section */ @@ -82,7 +87,12 @@ BF5FC0F022808377004235F1 /* RxRelay.framework */ = {isa = PBXFileReference; lastKnownFileType = wrapper.framework; name = RxRelay.framework; path = ../Carthage/Build/iOS/RxRelay.framework; sourceTree = ""; }; C0A100032B10000100A10001 /* Observable+AsyncSequence.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Observable+AsyncSequence.swift"; sourceTree = ""; }; C0A100042B10000100A10001 /* Lifecycle+AsyncSequence.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Lifecycle+AsyncSequence.swift"; sourceTree = ""; }; - C0A100052B10000100A10001 /* AsyncStreamTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AsyncStreamTests.swift; sourceTree = ""; }; + C0A101012B10001000A10001 /* ObservableConcurrencyTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ObservableConcurrencyTests.swift; sourceTree = ""; }; + C0A101032B10001000A10001 /* InteractorConcurrencyTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = InteractorConcurrencyTests.swift; sourceTree = ""; }; + C0A101052B10001000A10001 /* WorkerConcurrencyTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = WorkerConcurrencyTests.swift; sourceTree = ""; }; + C0A101072B10001000A10001 /* RouterConcurrencyTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = RouterConcurrencyTests.swift; sourceTree = ""; }; + C0A101092B10001000A10001 /* LeakDetectorConcurrencyTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = LeakDetectorConcurrencyTests.swift; sourceTree = ""; }; + C0A1010B2B10001000A10001 /* WorkflowConcurrencyTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = WorkflowConcurrencyTests.swift; sourceTree = ""; }; C0A100092B10000200A10001 /* AsyncSequence+RIBs.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "AsyncSequence+RIBs.swift"; sourceTree = ""; }; C0A1000A2B10000200A10001 /* Task+RIBs.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Task+RIBs.swift"; sourceTree = ""; }; E8E789432378AD000043E59E /* Package.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; name = Package.swift; path = ../Package.swift; sourceTree = SOURCE_ROOT; }; @@ -217,7 +227,7 @@ isa = PBXGroup; children = ( 8B9882F21F86E1CF00ABE009 /* Info.plist */, - C0A100052B10000100A10001 /* AsyncStreamTests.swift */, + C0A1000B2B10000300A10001 /* Concurrency */, AFB7D4041FC81D6100045D2B /* LaunchRouterTests.swift */, AF90B40B1FBA157700920384 /* Mocks.swift */, AF90B4091FBA14DB00920384 /* RouterTests.swift */, @@ -229,6 +239,19 @@ path = RIBsTests; sourceTree = ""; }; + C0A1000B2B10000300A10001 /* Concurrency */ = { + isa = PBXGroup; + children = ( + C0A101032B10001000A10001 /* InteractorConcurrencyTests.swift */, + C0A101092B10001000A10001 /* LeakDetectorConcurrencyTests.swift */, + C0A101012B10001000A10001 /* ObservableConcurrencyTests.swift */, + C0A101072B10001000A10001 /* RouterConcurrencyTests.swift */, + C0A101052B10001000A10001 /* WorkerConcurrencyTests.swift */, + C0A1010B2B10001000A10001 /* WorkflowConcurrencyTests.swift */, + ); + path = Concurrency; + sourceTree = ""; + }; AF5101421FBBA64A009C0DB3 /* Frameworks */ = { isa = PBXGroup; children = ( @@ -431,7 +454,12 @@ isa = PBXSourcesBuildPhase; buildActionMask = 2147483647; files = ( - C0A100062B10000100A10001 /* AsyncStreamTests.swift in Sources */, + C0A101042B10001000A10001 /* InteractorConcurrencyTests.swift in Sources */, + C0A1010A2B10001000A10001 /* LeakDetectorConcurrencyTests.swift in Sources */, + C0A101022B10001000A10001 /* ObservableConcurrencyTests.swift in Sources */, + C0A101082B10001000A10001 /* RouterConcurrencyTests.swift in Sources */, + C0A101062B10001000A10001 /* WorkerConcurrencyTests.swift in Sources */, + C0A1010C2B10001000A10001 /* WorkflowConcurrencyTests.swift in Sources */, AFB7D4031FC81C8F00045D2B /* Foundation+ExtensionsTests.swift in Sources */, AF90B4111FBA185E00920384 /* ComponentTests.swift in Sources */, AFB7D4051FC81D6100045D2B /* LaunchRouterTests.swift in Sources */, diff --git a/RIBsTests/AsyncStreamTests.swift b/RIBsTests/AsyncStreamTests.swift deleted file mode 100644 index bea6bff..0000000 --- a/RIBsTests/AsyncStreamTests.swift +++ /dev/null @@ -1,140 +0,0 @@ -// -// Copyright (c) 2017. Uber Technologies -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -import RxSwift -import XCTest -@testable import RIBs - -final class AsyncStreamTests: XCTestCase { - - func test_observableAsAsyncStream_emitsValuesAndCompletes() async { - var iterator = Observable.from([1, 2]).asAsyncStream().makeAsyncIterator() - - let firstValue = await iterator.next() - let secondValue = await iterator.next() - let completedValue = await iterator.next() - - XCTAssertEqual(firstValue, 1) - XCTAssertEqual(secondValue, 2) - XCTAssertNil(completedValue) - } - - func test_observableAsAsyncThrowingStream_throwsOnError() async { - var iterator = Observable - .error(AsyncStreamTestError.error) - .asAsyncThrowingStream() - .makeAsyncIterator() - - do { - _ = try await iterator.next() - XCTFail("Expected async sequence to throw") - } catch AsyncStreamTestError.error { - // Expected. - } catch { - XCTFail("Unexpected error: \(error)") - } - } - - func test_interactorIsActiveSequence_emitsCurrentValueAndChanges() async { - let interactor = Interactor() - var iterator = interactor.isActiveSequence.makeAsyncIterator() - - let initialValue = await iterator.next() - interactor.activate() - let activeValue = await iterator.next() - interactor.deactivate() - let inactiveValue = await iterator.next() - - XCTAssertEqual(initialValue, false) - XCTAssertEqual(activeValue, true) - XCTAssertEqual(inactiveValue, false) - } - - func test_workerIsStartedSequence_emitsCurrentValueAndChanges() async { - let interactor = Interactor() - let worker = Worker() - var iterator = worker.isStartedSequence.makeAsyncIterator() - - let initialValue = await iterator.next() - interactor.activate() - worker.start(interactor) - let startedValue = await iterator.next() - worker.stop() - let stoppedValue = await iterator.next() - - XCTAssertEqual(initialValue, false) - XCTAssertEqual(startedValue, true) - XCTAssertEqual(stoppedValue, false) - } - - func test_routerLifecycleSequence_emitsDidLoad() async { - let router = Router(interactor: Interactor()) - var iterator = router.lifecycleSequence.makeAsyncIterator() - let lifecycleTask = Task { - await iterator.next() - } - - router.load() - let lifecycle = await lifecycleTask.value - - XCTAssertEqual(lifecycle, .didLoad) - } - - func test_stepAsAsyncSequence_emitsStepOutput() async throws { - let workflow = Workflow() - let step = workflow - .onStep { actionableItem in - Observable.just((actionableItem.count, actionableItem)) - } - let sequence = step.asAsyncSequence() - let stepTask = Task { () -> (Int, String)? in - var iterator = sequence.makeAsyncIterator() - return try await iterator.next() - } - - _ = workflow.subscribe("test") - let value = try await stepTask.value - - XCTAssertEqual(value?.0, 4) - XCTAssertEqual(value?.1, "test") - } - - func test_onAsyncStep_emitsAsyncResult() async throws { - let workflow = Workflow() - let step = workflow - .onStep { actionableItem in - Observable.just((actionableItem, actionableItem)) - } - .onAsyncStep { actionableItem, value in - return (actionableItem + 1, value + 2) - } - let sequence = step.asAsyncSequence() - let stepTask = Task { () -> (Int, Int)? in - var iterator = sequence.makeAsyncIterator() - return try await iterator.next() - } - - _ = workflow.subscribe(1) - let value = try await stepTask.value - - XCTAssertEqual(value?.0, 2) - XCTAssertEqual(value?.1, 3) - } -} - -private enum AsyncStreamTestError: Error { - case error -} diff --git a/RIBsTests/Concurrency/InteractorConcurrencyTests.swift b/RIBsTests/Concurrency/InteractorConcurrencyTests.swift new file mode 100644 index 0000000..0206f16 --- /dev/null +++ b/RIBsTests/Concurrency/InteractorConcurrencyTests.swift @@ -0,0 +1,247 @@ +// +// Copyright (c) 2017. Uber Technologies +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import RxSwift +import XCTest +@testable import RIBs + +final class InteractorConcurrencyTests: XCTestCase { + + func test_isActiveSequence_emitsCurrentValueAndChanges() async { + let interactor = Interactor() + var iterator = interactor.isActiveSequence.makeAsyncIterator() + + let initialValue = await iterator.next() + interactor.activate() + let activeValue = await iterator.next() + interactor.deactivate() + let inactiveValue = await iterator.next() + + XCTAssertEqual(initialValue, false) + XCTAssertEqual(activeValue, true) + XCTAssertEqual(inactiveValue, false) + } + + func test_isActiveSequence_completesWhenInteractorDeinitializes() async { + var interactor: Interactor? = Interactor() + var iterator = interactor?.isActiveSequence.makeAsyncIterator() + + let initialValue = await iterator?.next() + interactor = nil + let completedValue = await iterator?.next() + + XCTAssertEqual(initialValue, false) + XCTAssertNil(completedValue) + } + + func test_asyncSequenceConfineTo_onlyEmitsValueWhenInteractorIsActive() async throws { + let interactor = Interactor() + var sourceContinuation: AsyncStream.Continuation! + let source = AsyncStream { continuation in + sourceContinuation = continuation + } + let activeExpectation = expectation( + description: "Should emit when interactor is active" + ) + + var receivedValue: Int? + + let task = Task { + var iterator = source.confineTo(interactor).makeAsyncIterator() + + while let value = try await iterator.next() { + receivedValue = value + + if value == 2 { + activeExpectation.fulfill() + break + } + } + } + + sourceContinuation.yield(1) + + interactor.activate() + sourceContinuation.yield(2) + + await fulfillment(of: [activeExpectation], timeout: 1.0) + XCTAssertEqual(receivedValue, 2) + task.cancel() + sourceContinuation.finish() + } + + func test_asyncSequenceConfineTo_throwsWhenBaseSequenceThrows() async { + let interactor = Interactor() + let source = AsyncThrowingStream { continuation in + continuation.finish(throwing: InteractorConcurrencyTestError.error) + } + var iterator = source.confineTo(interactor).makeAsyncIterator() + + do { + _ = try await iterator.next() + XCTFail("Expected confined async sequence to throw") + } catch InteractorConcurrencyTestError.error { + // Expected. + } catch { + XCTFail("Unexpected error: \(error)") + } + } + + func test_asyncSequenceConfineTo_yieldsLatestElementAfterBaseSequenceCompletes() async throws { + let interactor = Interactor() + var sourceContinuation: AsyncStream.Continuation? + let source = AsyncStream { continuation in + sourceContinuation = continuation + } + var iterator = source.confineTo(interactor).makeAsyncIterator() + let valueReceived = expectation(description: "Value received") + var receivedValue: Int? + + let valueTask = Task { + receivedValue = try await iterator.next() + valueReceived.fulfill() + } + sourceContinuation?.yield(1) + sourceContinuation?.finish() + interactor.activate() + await fulfillment(of: [valueReceived], timeout: 1) + try await valueTask.value + + XCTAssertEqual(receivedValue, 1) + } + + func test_taskOnDeactivate_cancelsTaskWhenInteractorDeactivates() async { + let interactor = Interactor() + interactor.activate() + let taskCancelled = expectation(description: "Task cancelled") + + let task = interactor.taskOnDeactivate { + while !Task.isCancelled { + await Task.yield() + } + taskCancelled.fulfill() + } + XCTAssertFalse(task.isCancelled) + + interactor.deactivate() + await fulfillment(of: [taskCancelled], timeout: 1) + + XCTAssertTrue(task.isCancelled) + } + + func test_taskOnDeactivate_cancelsImmediatelyWhenInteractorIsInactive() async { + let interactor = Interactor() + let taskCancelled = expectation(description: "Task cancelled") + + let task = interactor.taskOnDeactivate { + while !Task.isCancelled { + await Task.yield() + } + taskCancelled.fulfill() + } + await fulfillment(of: [taskCancelled], timeout: 1) + + XCTAssertTrue(task.isCancelled) + } + + func test_throwingTaskOnDeactivate_cancelsTaskWhenInteractorDeactivates() async { + let interactor = Interactor() + interactor.activate() + + let task = interactor.throwingTaskOnDeactivate { + try await Task.sleep(nanoseconds: 10_000_000_000) + } + XCTAssertFalse(task.isCancelled) + + interactor.deactivate() + + do { + try await task.value + XCTFail("Expected task to throw CancellationError") + } catch is CancellationError { + // Expected. + } catch { + XCTFail("Unexpected error: \(error)") + } + + XCTAssertTrue(task.isCancelled) + } + + func test_throwingTaskOnDeactivate_cancelsImmediatelyWhenInteractorIsInactive() async { + let interactor = Interactor() + + let task = interactor.throwingTaskOnDeactivate { + try await Task.sleep(nanoseconds: 10_000_000_000) + } + + do { + try await task.value + XCTFail("Expected task to throw CancellationError") + } catch is CancellationError { + // Expected. + } catch { + XCTFail("Unexpected error: \(error)") + } + + XCTAssertTrue(task.isCancelled) + } + + func test_taskOnDeactivate_cancelsTaskWhenInteractorDeinitializes() async { + var interactor: Interactor? = Interactor() + interactor?.activate() + let taskCancelled = expectation(description: "Task cancelled") + + let task = interactor!.taskOnDeactivate { + while !Task.isCancelled { + await Task.yield() + } + taskCancelled.fulfill() + } + XCTAssertFalse(task.isCancelled) + + interactor = nil + await fulfillment(of: [taskCancelled], timeout: 1) + + XCTAssertTrue(task.isCancelled) + } + + func test_throwingTaskOnDeactivate_cancelsTaskWhenInteractorDeinitializes() async { + var interactor: Interactor? = Interactor() + interactor?.activate() + + let task = interactor!.throwingTaskOnDeactivate { + try await Task.sleep(nanoseconds: 10_000_000_000) + } + XCTAssertFalse(task.isCancelled) + + interactor = nil + + do { + try await task.value + XCTFail("Expected task to throw CancellationError") + } catch is CancellationError { + // Expected. + } catch { + XCTFail("Unexpected error: \(error)") + } + + XCTAssertTrue(task.isCancelled) + } +} + +private enum InteractorConcurrencyTestError: Error { + case error +} diff --git a/RIBsTests/Concurrency/LeakDetectorConcurrencyTests.swift b/RIBsTests/Concurrency/LeakDetectorConcurrencyTests.swift new file mode 100644 index 0000000..fe458e1 --- /dev/null +++ b/RIBsTests/Concurrency/LeakDetectorConcurrencyTests.swift @@ -0,0 +1,38 @@ +// +// Copyright (c) 2017. Uber Technologies +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import Foundation +import XCTest +@testable import RIBs + +final class LeakDetectorConcurrencyTests: XCTestCase { + + func test_statusSequence_emitsCurrentValueAndChanges() async { + let leakDetector = LeakDetector() + let object = NSObject() + var iterator = leakDetector.statusSequence.makeAsyncIterator() + + let initialValue = await iterator.next() + let handle = leakDetector.expectDeallocate(object: object, inTime: 10) + let inProgressValue = await iterator.next() + handle.cancel() + let completedValue = await iterator.next() + + XCTAssertEqual(initialValue, .DidComplete) + XCTAssertEqual(inProgressValue, .InProgress) + XCTAssertEqual(completedValue, .DidComplete) + } +} diff --git a/RIBsTests/Concurrency/ObservableConcurrencyTests.swift b/RIBsTests/Concurrency/ObservableConcurrencyTests.swift new file mode 100644 index 0000000..03fdbc4 --- /dev/null +++ b/RIBsTests/Concurrency/ObservableConcurrencyTests.swift @@ -0,0 +1,109 @@ +// +// Copyright (c) 2017. Uber Technologies +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import RxSwift +import XCTest +@testable import RIBs + +final class ObservableConcurrencyTests: XCTestCase { + + func test_observableAsAsyncStream_emitsValuesAndCompletes() async { + var iterator = Observable.from([1, 2]).asAsyncStream().makeAsyncIterator() + + let firstValue = await iterator.next() + let secondValue = await iterator.next() + let completedValue = await iterator.next() + + XCTAssertEqual(firstValue, 1) + XCTAssertEqual(secondValue, 2) + XCTAssertNil(completedValue) + } + + func test_observableAsAsyncStream_completesOnError() async { + var iterator = Observable + .error(ObservableConcurrencyTestError.error) + .asAsyncStream() + .makeAsyncIterator() + + let value = await iterator.next() + + XCTAssertNil(value) + } + + func test_observableAsAsyncThrowingStream_emitsValuesAndCompletes() async throws { + var iterator = Observable.from([1, 2]).asAsyncThrowingStream().makeAsyncIterator() + + let firstValue = try await iterator.next() + let secondValue = try await iterator.next() + let completedValue = try await iterator.next() + + XCTAssertEqual(firstValue, 1) + XCTAssertEqual(secondValue, 2) + XCTAssertNil(completedValue) + } + + func test_observableAsAsyncThrowingStream_throwsOnError() async { + var iterator = Observable + .error(ObservableConcurrencyTestError.error) + .asAsyncThrowingStream() + .makeAsyncIterator() + + do { + _ = try await iterator.next() + XCTFail("Expected async sequence to throw") + } catch ObservableConcurrencyTestError.error { + // Expected. + } catch { + XCTFail("Unexpected error: \(error)") + } + } + + func test_observableAsAsyncStream_disposesSubscriptionWhenTaskIsCancelled() async { + let disposed = expectation(description: "Subscription disposed") + let observable = Observable.never().do(onDispose: { + disposed.fulfill() + }) + + let task = Task { + var iterator = observable.asAsyncStream().makeAsyncIterator() + _ = await iterator.next() + } + + await Task.yield() + task.cancel() + await fulfillment(of: [disposed], timeout: 1) + } + + func test_observableAsAsyncThrowingStream_disposesSubscriptionWhenTaskIsCancelled() async { + let disposed = expectation(description: "Subscription disposed") + let observable = Observable.never().do(onDispose: { + disposed.fulfill() + }) + + let task = Task { + var iterator = observable.asAsyncThrowingStream().makeAsyncIterator() + _ = try? await iterator.next() + } + + await Task.yield() + task.cancel() + await fulfillment(of: [disposed], timeout: 1) + } +} + +private enum ObservableConcurrencyTestError: Error { + case error +} diff --git a/RIBsTests/Concurrency/RouterConcurrencyTests.swift b/RIBsTests/Concurrency/RouterConcurrencyTests.swift new file mode 100644 index 0000000..50c348a --- /dev/null +++ b/RIBsTests/Concurrency/RouterConcurrencyTests.swift @@ -0,0 +1,44 @@ +// +// Copyright (c) 2017. Uber Technologies +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import XCTest +@testable import RIBs + +final class RouterConcurrencyTests: XCTestCase { + + func test_lifecycleSequence_emitsDidLoad() async { + let router = Router(interactor: Interactor()) + var iterator = router.lifecycleSequence.makeAsyncIterator() + let lifecycleTask = Task { + await iterator.next() + } + + router.load() + let lifecycle = await lifecycleTask.value + + XCTAssertEqual(lifecycle, .didLoad) + } + + func test_lifecycleSequence_completesWhenRouterDeinitializes() async { + var router: Router? = Router(interactor: Interactor()) + var iterator = router?.lifecycleSequence.makeAsyncIterator() + + router = nil + let completedValue = await iterator?.next() + + XCTAssertNil(completedValue) + } +} diff --git a/RIBsTests/Concurrency/WorkerConcurrencyTests.swift b/RIBsTests/Concurrency/WorkerConcurrencyTests.swift new file mode 100644 index 0000000..a8736c9 --- /dev/null +++ b/RIBsTests/Concurrency/WorkerConcurrencyTests.swift @@ -0,0 +1,176 @@ +// +// Copyright (c) 2017. Uber Technologies +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import XCTest +@testable import RIBs + +final class WorkerConcurrencyTests: XCTestCase { + + func test_isStartedSequence_emitsCurrentValueAndChanges() async { + let interactor = Interactor() + let worker = Worker() + var iterator = worker.isStartedSequence.makeAsyncIterator() + + let initialValue = await iterator.next() + interactor.activate() + worker.start(interactor) + let startedValue = await iterator.next() + worker.stop() + let stoppedValue = await iterator.next() + + XCTAssertEqual(initialValue, false) + XCTAssertEqual(startedValue, true) + XCTAssertEqual(stoppedValue, false) + } + + func test_isStartedSequence_completesWhenWorkerDeinitializes() async { + var worker: Worker? = Worker() + var iterator = worker?.isStartedSequence.makeAsyncIterator() + + let initialValue = await iterator?.next() + worker = nil + let completedValue = await iterator?.next() + + XCTAssertEqual(initialValue, false) + XCTAssertNil(completedValue) + } + + func test_taskOnStop_cancelsTaskWhenWorkerStops() async { + let interactor = Interactor() + let worker = Worker() + let taskCancelled = expectation(description: "Task cancelled") + interactor.activate() + worker.start(interactor) + + let task = worker.taskOnStop { + while !Task.isCancelled { + await Task.yield() + } + taskCancelled.fulfill() + } + XCTAssertFalse(task.isCancelled) + + worker.stop() + await fulfillment(of: [taskCancelled], timeout: 1) + + XCTAssertTrue(task.isCancelled) + } + + func test_taskOnStop_cancelsImmediatelyWhenWorkerIsStopped() async { + let worker = Worker() + let taskCancelled = expectation(description: "Task cancelled") + + let task = worker.taskOnStop { + while !Task.isCancelled { + await Task.yield() + } + taskCancelled.fulfill() + } + await fulfillment(of: [taskCancelled], timeout: 1) + + XCTAssertTrue(task.isCancelled) + } + + func test_throwingTaskOnStop_cancelsTaskWhenWorkerStops() async { + let interactor = Interactor() + let worker = Worker() + interactor.activate() + worker.start(interactor) + + let task = worker.throwingTaskOnStop { + try await Task.sleep(nanoseconds: 10_000_000_000) + } + XCTAssertFalse(task.isCancelled) + + worker.stop() + + do { + try await task.value + XCTFail("Expected task to throw CancellationError") + } catch is CancellationError { + // Expected. + } catch { + XCTFail("Unexpected error: \(error)") + } + + XCTAssertTrue(task.isCancelled) + } + + func test_throwingTaskOnStop_cancelsImmediatelyWhenWorkerIsStopped() async { + let worker = Worker() + + let task = worker.throwingTaskOnStop { + try await Task.sleep(nanoseconds: 10_000_000_000) + } + + do { + try await task.value + XCTFail("Expected task to throw CancellationError") + } catch is CancellationError { + // Expected. + } catch { + XCTFail("Unexpected error: \(error)") + } + + XCTAssertTrue(task.isCancelled) + } + + func test_taskOnStop_cancelsTaskWhenWorkerDeinitializes() async { + let interactor = Interactor() + var worker: Worker? = Worker() + let taskCancelled = expectation(description: "Task cancelled") + interactor.activate() + worker?.start(interactor) + + let task = worker!.taskOnStop { + while !Task.isCancelled { + await Task.yield() + } + taskCancelled.fulfill() + } + XCTAssertFalse(task.isCancelled) + + worker = nil + await fulfillment(of: [taskCancelled], timeout: 1) + + XCTAssertTrue(task.isCancelled) + } + + func test_throwingTaskOnStop_cancelsTaskWhenWorkerDeinitializes() async { + let interactor = Interactor() + var worker: Worker? = Worker() + interactor.activate() + worker?.start(interactor) + + let task = worker!.throwingTaskOnStop { + try await Task.sleep(nanoseconds: 10_000_000_000) + } + XCTAssertFalse(task.isCancelled) + + worker = nil + + do { + try await task.value + XCTFail("Expected task to throw CancellationError") + } catch is CancellationError { + // Expected. + } catch { + XCTFail("Unexpected error: \(error)") + } + + XCTAssertTrue(task.isCancelled) + } +} diff --git a/RIBsTests/Concurrency/WorkflowConcurrencyTests.swift b/RIBsTests/Concurrency/WorkflowConcurrencyTests.swift new file mode 100644 index 0000000..b15c042 --- /dev/null +++ b/RIBsTests/Concurrency/WorkflowConcurrencyTests.swift @@ -0,0 +1,354 @@ +// +// Copyright (c) 2017. Uber Technologies +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import RxSwift +import XCTest +@testable import RIBs + +final class WorkflowConcurrencyTests: XCTestCase { + + func test_workflowTask_cancelsWhenWorkflowDisposableIsDisposed() async { + let workflow = Workflow<()>() + let taskCancelled = expectation(description: "Task cancelled") + let task = workflow.task { + while !Task.isCancelled { + await Task.yield() + } + taskCancelled.fulfill() + } + let disposable = workflow + .onStep { _ in + Observable.just(((), ())) + } + .commit() + .subscribe(()) + + XCTAssertFalse(task.isCancelled) + disposable.dispose() + await fulfillment(of: [taskCancelled], timeout: 1) + + XCTAssertTrue(task.isCancelled) + } + + func test_workflowThrowingTask_cancelsWhenWorkflowDisposableIsDisposed() async { + let workflow = Workflow<()>() + let task = workflow.throwingTask { + try await Task.sleep(nanoseconds: 10_000_000_000) + } + let disposable = workflow + .onStep { _ in + Observable.just(((), ())) + } + .commit() + .subscribe(()) + + XCTAssertFalse(task.isCancelled) + disposable.dispose() + + do { + try await task.value + XCTFail("Expected task to throw CancellationError") + } catch is CancellationError { + // Expected. + } catch { + XCTFail("Unexpected error: \(error)") + } + + XCTAssertTrue(task.isCancelled) + } + + func test_stepAsAsyncSequence_emitsStepOutput() async throws { + let workflow = Workflow() + let step = workflow + .onStep { actionableItem in + Observable.just((actionableItem.count, actionableItem)) + } + let sequence = step.asAsyncSequence() + let stepTask = Task { () -> (Int, String)? in + var iterator = sequence.makeAsyncIterator() + return try await iterator.next() + } + + _ = step.commit() + _ = workflow.subscribe("test") + let value = try await stepTask.value + + XCTAssertEqual(value?.0, 4) + XCTAssertEqual(value?.1, "test") + } + + func test_stepAsAsyncSequence_invokesWorkflowDidComplete() async throws { + let workflow = WorkflowConcurrencyTestWorkflow() + let step = workflow + .onStep { actionableItem in + Observable.just((actionableItem, actionableItem)) + } + let sequence = step.asAsyncSequence() + let stepTask = Task { () -> (Int, Int)? in + var iterator = sequence.makeAsyncIterator() + let value = try await iterator.next() + _ = try await iterator.next() + return value + } + + _ = step.commit() + _ = workflow.subscribe(1) + let value = try await stepTask.value + + XCTAssertEqual(value?.0, 1) + XCTAssertEqual(value?.1, 1) + XCTAssertEqual(workflow.completeCallCount, 1) + XCTAssertEqual(workflow.errorCallCount, 0) + } + + func test_stepAsAsyncSequence_invokesWorkflowDidReceiveError() async { + let workflow = WorkflowConcurrencyTestWorkflow() + let step = workflow + .onStep { _ in + Observable<(Int, Int)>.error(WorkflowConcurrencyTestError.error) + } + let sequence = step.asAsyncSequence() + let stepTask = Task { () -> (Int, Int)? in + var iterator = sequence.makeAsyncIterator() + return try await iterator.next() + } + + _ = step.commit() + _ = workflow.subscribe(1) + + do { + _ = try await stepTask.value + XCTFail("Expected async sequence to throw") + } catch WorkflowConcurrencyTestError.error { + // Expected. + } catch { + XCTFail("Unexpected error: \(error)") + } + + XCTAssertEqual(workflow.completeCallCount, 0) + XCTAssertEqual(workflow.errorCallCount, 1) + } + + func test_onAsyncStep_emitsAsyncResult() async throws { + let workflow = Workflow() + let step = workflow + .onStep { actionableItem in + Observable.just((actionableItem, actionableItem)) + } + .onAsyncStep { actionableItem, value in + return (actionableItem + 1, value + 2) + } + let sequence = step.asAsyncSequence() + let stepTask = Task { () -> (Int, Int)? in + var iterator = sequence.makeAsyncIterator() + return try await iterator.next() + } + + _ = step.commit() + _ = workflow.subscribe(1) + let value = try await stepTask.value + + XCTAssertEqual(value?.0, 2) + XCTAssertEqual(value?.1, 3) + } + + func test_onAsyncStep_invokesWorkflowDidReceiveError() async { + let workflow = WorkflowConcurrencyTestWorkflow() + let step = workflow + .onStep { actionableItem in + Observable.just((actionableItem, actionableItem)) + } + .onAsyncStep { _, _ -> (Int, Int) in + throw WorkflowConcurrencyTestError.error + } + let sequence = step.asAsyncSequence() + let stepTask = Task { () -> (Int, Int)? in + var iterator = sequence.makeAsyncIterator() + return try await iterator.next() + } + + _ = step.commit() + _ = workflow.subscribe(1) + + do { + _ = try await stepTask.value + XCTFail("Expected async sequence to throw") + } catch WorkflowConcurrencyTestError.error { + // Expected. + } catch { + XCTFail("Unexpected error: \(error)") + } + + XCTAssertEqual(workflow.completeCallCount, 0) + XCTAssertEqual(workflow.errorCallCount, 1) + } + + func test_onAsyncStep_cancelsTaskWhenWorkflowDisposableIsDisposed() async { + let workflow = Workflow<()>() + let taskStarted = expectation(description: "Async step task started") + let taskCancelled = expectation(description: "Async step task cancelled") + let disposable = workflow + .onStep { _ in + Observable.just(((), ())) + } + .onAsyncStep { _, _ -> ((), ()) in + taskStarted.fulfill() + while !Task.isCancelled { + await Task.yield() + } + taskCancelled.fulfill() + return ((), ()) + } + .commit() + .subscribe(()) + + await fulfillment(of: [taskStarted], timeout: 1) + disposable.dispose() + await fulfillment(of: [taskCancelled], timeout: 1) + } + + func test_asyncSequenceFork_createsWorkflowStep() async { + let workflow = WorkflowConcurrencyTestWorkflow<()>() + let completed = expectation(description: "Workflow completed") + workflow.onComplete = { + completed.fulfill() + } + let asyncSequence = AsyncStream<((), ())> { continuation in + continuation.yield(((), ())) + continuation.finish() + } + + let step: Step<(), (), ()> = asyncSequence.fork(workflow) + _ = step.commit().subscribe(()) + await fulfillment(of: [completed], timeout: 1) + + XCTAssertEqual(workflow.forkCallCount, 1) + XCTAssertEqual(workflow.completeCallCount, 1) + } + + func test_asyncSequenceForkedStepAsAsyncSequence_emitsStepOutput() async throws { + let workflow = WorkflowConcurrencyTestWorkflow<()>() + let asyncSequence = AsyncStream<(Int, String)> { continuation in + continuation.yield((1, "one")) + continuation.finish() + } + + let step: Step<(), Int, String> = asyncSequence.fork(workflow) + let outputSequence = step.asAsyncSequence() + var iterator = outputSequence.makeAsyncIterator() + + let value = try await iterator.next() + let completedValue = try await iterator.next() + + XCTAssertEqual(value?.0, 1) + XCTAssertEqual(value?.1, "one") + XCTAssertNil(completedValue) + XCTAssertEqual(workflow.forkCallCount, 1) + XCTAssertEqual(workflow.completeCallCount, 0) + } + + func test_asyncSequenceFork_nestedStepsDoNotRepeat() async { + var outerStep1RunCount = 0 + var outerStep2RunCount = 0 + var outerStep3RunCount = 0 + + let workflow = WorkflowConcurrencyTestWorkflow<()>() + let completed = expectation(description: "Workflow completed") + workflow.onComplete = { + completed.fulfill() + } + let asyncSequence = AsyncStream<((), ())> { continuation in + continuation.yield(((), ())) + continuation.finish() + } + + let step: Step<(), (), ()> = asyncSequence.fork(workflow) + _ = step + .onStep { _, _ -> Observable<((), ())> in + outerStep1RunCount += 1 + return Observable.just(((), ())) + } + .onStep { _, _ -> Observable<((), ())> in + outerStep2RunCount += 1 + return Observable.just(((), ())) + } + .onStep { _, _ -> Observable<((), ())> in + outerStep3RunCount += 1 + return Observable.just(((), ())) + } + .commit() + .subscribe(()) + await fulfillment(of: [completed], timeout: 1) + + XCTAssertEqual(outerStep1RunCount, 1, "Step 1 should not have been run more than once") + XCTAssertEqual(outerStep2RunCount, 1, "Step 2 should not have been run more than once") + XCTAssertEqual(outerStep3RunCount, 1, "Step 3 should not have been run more than once") + XCTAssertEqual(workflow.completeCallCount, 1) + XCTAssertEqual(workflow.errorCallCount, 0) + XCTAssertEqual(workflow.forkCallCount, 1) + } + + func test_asyncSequenceFork_workflowReceivesError() async { + let workflow = WorkflowConcurrencyTestWorkflow<()>() + let receivedError = expectation(description: "Workflow received error") + workflow.onError = { + receivedError.fulfill() + } + let asyncSequence = AsyncThrowingStream<((), ()), Error> { continuation in + continuation.finish(throwing: WorkflowConcurrencyTestError.error) + } + + let step: Step<(), (), ()> = asyncSequence.fork(workflow) + _ = step.commit().subscribe(()) + await fulfillment(of: [receivedError], timeout: 1) + + XCTAssertEqual(workflow.completeCallCount, 0) + XCTAssertEqual(workflow.errorCallCount, 1) + XCTAssertEqual(workflow.forkCallCount, 1) + guard case WorkflowConcurrencyTestError.error? = workflow.receivedError as? WorkflowConcurrencyTestError else { + XCTFail("Expected workflow to receive WorkflowConcurrencyTestError.error") + return + } + } +} + +private enum WorkflowConcurrencyTestError: Error { + case error +} + +private final class WorkflowConcurrencyTestWorkflow: Workflow { + var completeCallCount = 0 + var errorCallCount = 0 + var forkCallCount = 0 + var receivedError: Error? + var onComplete: (() -> Void)? + var onError: (() -> Void)? + + override func didComplete() { + completeCallCount += 1 + onComplete?() + } + + override func didFork() { + forkCallCount += 1 + } + + override func didReceiveError(_ error: Error) { + errorCallCount += 1 + receivedError = error + onError?() + } +} diff --git a/RIBsTests/ConcurrencyTests.swift b/RIBsTests/ConcurrencyTests.swift deleted file mode 100644 index 93f29ca..0000000 --- a/RIBsTests/ConcurrencyTests.swift +++ /dev/null @@ -1,142 +0,0 @@ -// -// Copyright (c) 2017. Uber Technologies -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -import RxSwift -import XCTest -@testable import RIBs - -final class ConcurrencyTests: XCTestCase { - - func test_asyncSequenceConfineTo_yieldsLatestElementWhenInteractorBecomesActive() async throws { - let interactor = Interactor() - var sourceContinuation: AsyncStream.Continuation? - let source = AsyncStream { continuation in - sourceContinuation = continuation - } - var iterator = source.confineTo(interactor).makeAsyncIterator() - - sourceContinuation?.yield(1) - interactor.activate() - let firstValue = try await iterator.next() - sourceContinuation?.yield(2) - let secondValue = try await iterator.next() - interactor.deactivate() - sourceContinuation?.yield(3) - interactor.activate() - let thirdValue = try await iterator.next() - - XCTAssertEqual(firstValue, 1) - XCTAssertEqual(secondValue, 2) - XCTAssertEqual(thirdValue, 3) - } - - func test_taskOnDeactivate_cancelsTaskWhenInteractorDeactivates() async { - let interactor = Interactor() - interactor.activate() - - let task = interactor.taskOnDeactivate { - while !Task.isCancelled { - await Task.yield() - } - } - XCTAssertFalse(task.isCancelled) - - interactor.deactivate() - await Task.yield() - - XCTAssertTrue(task.isCancelled) - } - - func test_taskOnDeactivate_cancelsImmediatelyWhenInteractorIsInactive() async { - let interactor = Interactor() - - let task = interactor.taskOnDeactivate { - while !Task.isCancelled { - await Task.yield() - } - } - await Task.yield() - - XCTAssertTrue(task.isCancelled) - } - - func test_taskOnStop_cancelsTaskWhenWorkerStops() async { - let interactor = Interactor() - let worker = Worker() - interactor.activate() - worker.start(interactor) - - let task = worker.taskOnStop { - while !Task.isCancelled { - await Task.yield() - } - } - XCTAssertFalse(task.isCancelled) - - worker.stop() - await Task.yield() - - XCTAssertTrue(task.isCancelled) - } - - func test_workflowTask_cancelsWhenWorkflowDisposableIsDisposed() async { - let workflow = Workflow<()>() - let task = workflow.task { - while !Task.isCancelled { - await Task.yield() - } - } - let disposable = workflow - .onStep { _ in - Observable.just(((), ())) - } - .commit() - .subscribe(()) - - XCTAssertFalse(task.isCancelled) - disposable.dispose() - await Task.yield() - - XCTAssertTrue(task.isCancelled) - } - - func test_asyncSequenceFork_createsWorkflowStep() { - let workflow = TestWorkflow() - let asyncSequence = AsyncStream<((), ())> { continuation in - continuation.yield(((), ())) - continuation.finish() - } - - let step: Step<(), (), ()> = asyncSequence.fork(workflow) - _ = step.commit().subscribe(()) - - XCTAssertEqual(workflow.forkCallCount, 1) - XCTAssertEqual(workflow.completeCallCount, 1) - } -} - -private class TestWorkflow: Workflow<()> { - var completeCallCount = 0 - var forkCallCount = 0 - - override func didComplete() { - completeCallCount += 1 - } - - override func didFork() { - forkCallCount += 1 - } -}