Skip to content

Add async stream interfaces#51

Open
haeseoklee wants to merge 5 commits into
uber:mainfrom
haeseoklee:add-async-stream-interfaces
Open

Add async stream interfaces#51
haeseoklee wants to merge 5 commits into
uber:mainfrom
haeseoklee:add-async-stream-interfaces

Conversation

@haeseoklee
Copy link
Copy Markdown

@haeseoklee haeseoklee commented May 20, 2026

Summary

Add Swift Concurrency-friendly interfaces on top of the existing RxSwift-based RIBs APIs.

This change keeps RxSwift as the source of truth and adds additive async/await conveniences for observing lifecycle streams, consuming workflow step outputs, confining async sequences to RIB lifecycle, and binding Swift Tasks to existing RIB lifecycle scopes.

Changes

  • Add Observable to async sequence bridges:

    • ObservableType.asAsyncStream(...)
    • ObservableType.asAsyncThrowingStream(...)
  • Add async lifecycle stream conveniences:

    • InteractorScope.isActiveSequence
    • RouterScope.lifecycleSequence
    • Working.isStartedSequence
    • LeakDetector.statusSequence
  • Add Workflow async APIs:

    • Step.asAsyncSequence(...)
    • Step.onAsyncStep(...)
    • AsyncSequence.fork(_:)
  • Add lifecycle-scoped task helpers:

    • Interactor.taskOnDeactivate(...)
    • Interactor.throwingTaskOnDeactivate(...)
    • Worker.taskOnStop(...)
    • Worker.throwingTaskOnStop(...)
    • Workflow.task(...)
    • Workflow.throwingTask(...)
  • Add async sequence lifecycle confinement:

    • AsyncSequence.confineTo(_:)
    • Uses the existing RxSwift confineTo(_:) behavior internally by bridging AsyncSequence to Observable
    • Adds AnyAsyncSequence<Element> to preserve element type information without requiring typed AsyncSequence<Element, Failure> availability
  • Add tests covering

Notes

The async APIs are additive and preserve existing RxSwift APIs.

RxSwift remains the internal runtime for lifecycle and workflow behavior. The async interfaces bridge from the existing observable streams and disposable lifecycle hooks rather than replacing them.

Close #6

@CLAassistant
Copy link
Copy Markdown

CLAassistant commented May 20, 2026

CLA assistant check
All committers have signed the CLA.

@alexvbush
Copy link
Copy Markdown
Collaborator

thank you for your submission @haeseoklee ! This looks very interesting and useful and might be a great addition to the framework! That said I will be thoroughly reviewing this this/next week to check if there is any conflicts with incoming #49

@alexvbush
Copy link
Copy Markdown
Collaborator

@haeseoklee Thanks for submitting this PR again!

I checked out your branch and ran some tests myself and it seems like your changes will merge cleanly with #49 when it eventually lands. No issues there and as you said your changes are just additive.

I'll still done more thorough testing and review of your code soon but so far here's what I found:

While running the test suite I hit two failures in the new ConcurrencyTests that look independent of any environment setup — they reproduce on this branch with a clean checkout:

1. test_asyncSequenceFork_createsWorkflowStep — fails deterministically

ConcurrencyTests.swift:116. The assertion XCTAssertEqual(workflow.completeCallCount, 1) (line 127) fails with 0 != 1 on every run.

The test body is synchronous, but fork(_:) bridges the AsyncSequence into an Observable by consuming it on a Task:

func test_asyncSequenceFork_createsWorkflowStep() {   // not async
    ...
    let step: Step<(), (), ()> = asyncSequence.fork(workflow)
    _ = step.commit().subscribe(())
    XCTAssertEqual(workflow.forkCallCount, 1)       // passes (didFork is sync)
    XCTAssertEqual(workflow.completeCallCount, 1)   // always 0 — async work hasn't run yet
}

forkCallCount passes because didFork() is called synchronously, but the stream is drained asynchronously, so the completion hasn't propagated by the time the assertion runs. The test needs to await the completion (e.g. make it async and await an expectation / the step's output) rather than asserting immediately.

2. test_asyncSequenceConfineTo_yieldsLatestElementWhenInteractorBecomesActive — flaky

ConcurrencyTests.swift:23. Passes intermittently (~1 in 3 for me); when it fails it's thirdValue (line 43) returning Optional(2) instead of Optional(3).

The test interleaves sourceContinuation.yield(...), interactor.activate()/deactivate(), and await iterator.next() synchronously, but confineTo is built on combineLatest(interactorScope.isActiveStream.values, self) — there's no ordering guarantee between the activeness stream and the source after a deactivate→yield→activate sequence, so "the latest value re-emitted on reactivation" races. It'd be worth either making the confinement semantics deterministic or restructuring the test to synchronize on the expected emissions instead of assuming step-by-step ordering.

@haeseoklee
Copy link
Copy Markdown
Author

haeseoklee commented May 30, 2026

@alexvbush
Thank you for checking this out and for the detailed feedback!

  1. test_asyncSequenceFork_createsWorkflowStep

fork(_:) consumes the underlying AsyncSequence asynchronously, so the previous test was asserting completeCallCount too early. I updated the test to be async and to wait for workflow completion before asserting.

  1. test_asyncSequenceConfineTo_yieldsLatestElementWhenInteractorBecomesActive

I updated the confinement implementation to delegate to the existing RxSwift confineTo(_:) behavior internally, so it preserves the current push-based Rx semantics, and adjusted the tests to synchronize on expected emissions instead of assuming immediate iterator ordering.

While making these changes, I also reorganized the new concurrency tests by feature area and added a bit more lifecycle/cancellation coverage.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Modernization: Add conveniences around async/await

3 participants