Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
4f81dfa
Linux build (#37)
lhoward Jul 18, 2023
3d685a5
Use OpenCombine on Linux (#37)
lhoward Jul 18, 2023
633491c
Android build
lhoward Oct 1, 2024
04f2c3d
workaround for swiftlang/swift#77315 compiler crash
lhoward Nov 1, 2024
9d66bde
Fix race condition in `AsyncCurrentValueSubject`
wfltaylor Jan 31, 2024
d6dbbae
Fix race condition in `AsyncThrowingCurrentValueSubject`
wfltaylor Jan 31, 2024
4246f11
fix: remove overlapping operators from swift-async-algorithms
lachenmayer Dec 6, 2023
484ea76
docs: remove sentence about overlaps in README
lachenmayer Dec 6, 2023
dd30442
re-add variadic merge
lachenmayer Dec 6, 2023
f41a9fb
Revert "Merge pull request #32 from sideeffect-io/fix/multicast-upstr…
lachenmayer Dec 18, 2023
e3de960
fix test compilation
lachenmayer Apr 18, 2024
8f7a200
update for new withTaskCancellationHandler parameter order
lhoward Nov 21, 2024
cfdfcec
don't use @_implementationOnly
lhoward Nov 21, 2024
84d16b6
use Atomics package instead of Locking where possible
lhoward Nov 21, 2024
b16126f
update to use await fulfillment(of:timeout:) testing API
lhoward Nov 21, 2024
ec16e3b
only take lock once in handleNewConsumer()
lhoward Nov 22, 2024
d050ac1
use Array instead of OrderedSet in awaitings, seems to fix memory leak
lhoward Nov 22, 2024
206d7b6
remove Package.resolved
lhoward Jan 3, 2025
d51c4aa
don't import Foundation if FoundationEssentials/Dispatch available
lhoward Jan 3, 2025
8f1da30
unregister channels after iteration complete
lhoward Feb 11, 2026
13e7d02
Fix impossible AND condition in MergeStateMachine error/termination c…
lhoward Mar 22, 2026
06741fe
Fix race condition in AsyncThrowingCurrentValueSubject.handleNewConsumer
lhoward Mar 22, 2026
c62574d
Fix race condition in AsyncReplaySubject.handleNewConsumer
lhoward Mar 22, 2026
b97d381
Fix race condition in AsyncThrowingReplaySubject.handleNewConsumer
lhoward Mar 22, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
xcuserdata/
DerivedData/
.swiftpm/xcode/package.xcworkspace/contents.xcworkspacedata
Package.resolved
16 changes: 0 additions & 16 deletions Package.resolved

This file was deleted.

34 changes: 21 additions & 13 deletions Package.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// swift-tools-version:5.5
// swift-tools-version:6.0
// The swift-tools-version declares the minimum version of Swift required to build this package.

import PackageDescription
Expand All @@ -16,23 +16,31 @@ let package = Package(
name: "AsyncExtensions",
targets: ["AsyncExtensions"]),
],
dependencies: [.package(url: "https://github.com/apple/swift-collections.git", .upToNextMajor(from: "1.0.3"))],
dependencies: [
.package(url: "https://github.com/apple/swift-collections.git", .upToNextMajor(from: "1.0.3")),
.package(url: "https://github.com/apple/swift-async-algorithms.git", .upToNextMajor(from: "1.0.0")),
.package(url: "https://github.com/OpenCombine/OpenCombine.git", from: "0.14.0"),
.package(url: "https://github.com/apple/swift-atomics.git", .upToNextMajor(from: "1.2.0")),
],
targets: [
.target(
name: "AsyncExtensions",
dependencies: [.product(name: "Collections", package: "swift-collections")],
path: "Sources"
// ,
// swiftSettings: [
// .unsafeFlags([
// "-Xfrontend", "-warn-concurrency",
// "-Xfrontend", "-enable-actor-data-race-checks",
// ])
// ]
dependencies: [
.product(name: "Collections", package: "swift-collections"),
.product(name: "Atomics", package: "swift-atomics")
],
path: "Sources",
swiftSettings: [.swiftLanguageMode(.v5)]
),
.testTarget(
name: "AsyncExtensionsTests",
dependencies: ["AsyncExtensions"],
path: "Tests"),
dependencies: [
"AsyncExtensions",
.product(name: "OpenCombine", package: "OpenCombine", condition: .when(platforms: [.linux])),
.product(name: "AsyncAlgorithms", package: "swift-async-algorithms")
],
path: "Tests",
swiftSettings: [.swiftLanguageMode(.v5)]
),
]
)
8 changes: 1 addition & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

**AsyncExtensions** provides a collection of operators that intends to ease the creation and combination of `AsyncSequences`.

**AsyncExtensions** can be seen as a companion to Apple [swift-async-algorithms](https://github.com/apple/swift-async-algorithms). For now there is an overlap between both libraries, but when **swift-async-algorithms** becomes stable the overlapping operators while be deprecated in **AsyncExtensions**. Nevertheless **AsyncExtensions** will continue to provide the operators that the community needs and are not provided by Apple.
**AsyncExtensions** can be seen as a companion to Apple [swift-async-algorithms](https://github.com/apple/swift-async-algorithms), which provides operators that the community needs and are not provided by Apple.

## Adding AsyncExtensions as a Dependency

Expand Down Expand Up @@ -44,11 +44,6 @@ AsyncStream)
* [AsyncThrowingReplaySubject](./Sources/AsyncSubjects/AsyncThrowingReplaySubject.swift): Throwing subject with a shared output. Maintains and replays a buffered amount of values

### Combiners
* [`zip(_:_:)`](./Sources/Combiners/Zip/AsyncZip2Sequence.swift): Zips two `AsyncSequence` into an AsyncSequence of tuple of elements
* [`zip(_:_:_:)`](./Sources/Combiners/Zip/AsyncZip3Sequence.swift): Zips three `AsyncSequence` into an AsyncSequence of tuple of elements
* [`zip(_:)`](./Sources/Combiners/Zip/AsyncZipSequence.swift): Zips any async sequences into an array of elements
* [`merge(_:_:)`](./Sources/Combiners/Merge/AsyncMerge2Sequence.swift): Merges two `AsyncSequence` into an AsyncSequence of elements
* [`merge(_:_:_:)`](./Sources/Combiners/Merge/AsyncMerge3Sequence.swift): Merges three `AsyncSequence` into an AsyncSequence of elements
* [`merge(_:)`](./Sources/Combiners/Merge/AsyncMergeSequence.swift): Merges any `AsyncSequence` into an AsyncSequence of elements
* [`withLatest(_:)`](./Sources/Combiners/WithLatestFrom/AsyncWithLatestFromSequence.swift): Combines elements from self with the last known element from an other `AsyncSequence`
* [`withLatest(_:_:)`](./Sources/Combiners/WithLatestFrom/AsyncWithLatestFrom2Sequence.swift): Combines elements from self with the last known elements from two other async sequences
Expand All @@ -58,7 +53,6 @@ AsyncStream)
* [AsyncFailSequence](./Sources/Creators/AsyncFailSequence.swift): Creates an `AsyncSequence` that immediately fails
* [AsyncJustSequence](./Sources/Creators/AsyncJustSequence.swift): Creates an `AsyncSequence` that emits an element an finishes
* [AsyncThrowingJustSequence](./Sources/Creators/AsyncThrowingJustSequence.swift): Creates an `AsyncSequence` that emits an elements and finishes bases on a throwing closure
* [AsyncLazySequence](./Sources/Creators/AsyncLazySequence.swift): Creates an `AsyncSequence` of the elements from the base sequence
* [AsyncTimerSequence](./Sources/Creators/AsyncTimerSequence.swift): Creates an `AsyncSequence` that emits a date value periodically
* [AsyncStream Pipe](./Sources/Creators/AsyncStream+Pipe.swift): Creates an AsyncStream and returns a tuple standing for its inputs and outputs

Expand Down
33 changes: 19 additions & 14 deletions Sources/AsyncChannels/AsyncBufferedChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// Created by Thibault Wittemberg on 07/01/2022.
//

import Atomics
import DequeModule
import OrderedCollections

Expand Down Expand Up @@ -69,27 +70,24 @@ public final class AsyncBufferedChannel<Element: Sendable>: AsyncSequence, Senda
enum State: @unchecked Sendable {
case idle
case queued(Deque<Value>)
case awaiting(OrderedSet<Awaiting>)
case awaiting([Awaiting])
case finished

static var initial: State {
.idle
}
}

let ids: ManagedCriticalState<Int>
let ids: ManagedAtomic<Int>
let state: ManagedCriticalState<State>

public init() {
self.ids = ManagedCriticalState(0)
self.ids = ManagedAtomic(0)
self.state = ManagedCriticalState(.initial)
}

func generateId() -> Int {
self.ids.withCriticalRegion { ids in
ids += 1
return ids
}
ids.wrappingIncrementThenLoad(by: 1, ordering: .relaxed)
}

var hasBufferedElements: Bool {
Expand Down Expand Up @@ -155,12 +153,12 @@ public final class AsyncBufferedChannel<Element: Sendable>: AsyncSequence, Senda

func next(onSuspend: (() -> Void)? = nil) async -> Element? {
let awaitingId = self.generateId()
let cancellation = ManagedCriticalState<Bool>(false)
let cancellation = ManagedAtomic<Bool>(false)

return await withTaskCancellationHandler {
await withUnsafeContinuation { [state] (continuation: UnsafeContinuation<Element?, Never>) in
let decision = state.withCriticalRegion { state -> AwaitingDecision in
let isCancelled = cancellation.withCriticalRegion { $0 }
let isCancelled = cancellation.load(ordering: .acquiring)
guard !isCancelled else { return .resume(nil) }

switch state {
Expand All @@ -184,7 +182,13 @@ public final class AsyncBufferedChannel<Element: Sendable>: AsyncSequence, Senda
return .suspend
}
case .awaiting(var awaitings):
awaitings.updateOrAppend(Awaiting(id: awaitingId, continuation: continuation))
let awaiting = Awaiting(id: awaitingId, continuation: continuation)

if let index = awaitings.firstIndex(where: { $0 == awaiting }) {
awaitings[index] = awaiting
} else {
awaitings.append(awaiting)
}
state = .awaiting(awaitings)
return .suspend
case .finished:
Expand All @@ -200,12 +204,13 @@ public final class AsyncBufferedChannel<Element: Sendable>: AsyncSequence, Senda
}
} onCancel: { [state] in
let awaiting = state.withCriticalRegion { state -> Awaiting? in
cancellation.withCriticalRegion { cancellation in
cancellation = true
}
cancellation.store(true, ordering: .releasing)
switch state {
case .awaiting(var awaitings):
let awaiting = awaitings.remove(.placeHolder(id: awaitingId))
let index = awaitings.firstIndex(where: { $0 == .placeHolder(id: awaitingId) })
guard let index else { return nil }
let awaiting = awaitings[index]
awaitings.remove(at: index)
if awaitings.isEmpty {
state = .idle
} else {
Expand Down
33 changes: 19 additions & 14 deletions Sources/AsyncChannels/AsyncThrowingBufferedChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// Created by Thibault Wittemberg on 07/01/2022.
//

import Atomics
import DequeModule
import OrderedCollections

Expand Down Expand Up @@ -80,27 +81,24 @@ public final class AsyncThrowingBufferedChannel<Element, Failure: Error>: AsyncS
enum State: @unchecked Sendable {
case idle
case queued(Deque<Value>)
case awaiting(OrderedSet<Awaiting>)
case awaiting([Awaiting])
case terminated(Termination)

static var initial: State {
.idle
}
}

let ids: ManagedCriticalState<Int>
let ids: ManagedAtomic<Int>
let state: ManagedCriticalState<State>

public init() {
self.ids = ManagedCriticalState(0)
self.ids = ManagedAtomic(0)
self.state = ManagedCriticalState(.initial)
}

func generateId() -> Int {
self.ids.withCriticalRegion { ids in
ids += 1
return ids
}
ids.wrappingIncrementThenLoad(by: 1, ordering: .relaxed)
}

var hasBufferedElements: Bool {
Expand Down Expand Up @@ -176,12 +174,12 @@ public final class AsyncThrowingBufferedChannel<Element, Failure: Error>: AsyncS

func next(onSuspend: (() -> Void)? = nil) async throws -> Element? {
let awaitingId = self.generateId()
let cancellation = ManagedCriticalState<Bool>(false)
let cancellation = ManagedAtomic<Bool>(false)

return try await withTaskCancellationHandler {
try await withUnsafeThrowingContinuation { [state] (continuation: UnsafeContinuation<Element?, Error>) in
let decision = state.withCriticalRegion { state -> AwaitingDecision in
let isCancelled = cancellation.withCriticalRegion { $0 }
let isCancelled = cancellation.load(ordering: .acquiring)
guard !isCancelled else { return .resume(nil) }

switch state {
Expand All @@ -208,7 +206,13 @@ public final class AsyncThrowingBufferedChannel<Element, Failure: Error>: AsyncS
return .suspend
}
case .awaiting(var awaitings):
awaitings.updateOrAppend(Awaiting(id: awaitingId, continuation: continuation))
let awaiting = Awaiting(id: awaitingId, continuation: continuation)

if let index = awaitings.firstIndex(where: { $0 == awaiting }) {
awaitings[index] = awaiting
} else {
awaitings.append(awaiting)
}
state = .awaiting(awaitings)
return .suspend
case .terminated(.finished):
Expand All @@ -227,12 +231,13 @@ public final class AsyncThrowingBufferedChannel<Element, Failure: Error>: AsyncS
}
} onCancel: { [state] in
let awaiting = state.withCriticalRegion { state -> Awaiting? in
cancellation.withCriticalRegion { cancellation in
cancellation = true
}
cancellation.store(true, ordering: .releasing)
switch state {
case .awaiting(var awaitings):
let awaiting = awaitings.remove(.placeHolder(id: awaitingId))
let index = awaitings.firstIndex(where: { $0 == .placeHolder(id: awaitingId) })
guard let index else { return nil }
let awaiting = awaitings[index]
awaitings.remove(at: index)
if awaitings.isEmpty {
state = .idle
} else {
Expand Down
71 changes: 43 additions & 28 deletions Sources/AsyncSubjects/AsyncCurrentValueSubject.swift
Original file line number Diff line number Diff line change
Expand Up @@ -67,54 +67,57 @@ public final class AsyncCurrentValueSubject<Element>: AsyncSubject where Element
/// Sends a value to all consumers
/// - Parameter element: the value to send
public func send(_ element: Element) {
self.state.withCriticalRegion { state in
let channels = self.state.withCriticalRegion { state -> [AsyncBufferedChannel<Element>] in
state.current = element
for channel in state.channels.values {
channel.send(element)
}
return Array(state.channels.values)
}

for channel in channels {
channel.send(element)
}
}

/// Finishes the async sequences with a normal ending.
/// - Parameter termination: The termination to finish the subject.
public func send(_ termination: Termination<Failure>) {
self.state.withCriticalRegion { state in
let channels = self.state.withCriticalRegion { state -> [AsyncBufferedChannel<Element>] in
state.terminalState = termination
let channels = Array(state.channels.values)
state.channels.removeAll()
for channel in channels {
channel.finish()
}
return channels
}

for channel in channels {
channel.finish()
}
}

func handleNewConsumer() -> (iterator: AsyncBufferedChannel<Element>.Iterator, unregister: @Sendable () -> Void) {
let asyncBufferedChannel = AsyncBufferedChannel<Element>()
var consumerId: Int!
var unregister: (@Sendable () -> Void)?

let (terminalState, current) = self.state.withCriticalRegion { state -> (Termination?, Element) in
(state.terminalState, state.current)
}

if let terminalState = terminalState, terminalState.isFinished {
asyncBufferedChannel.finish()
return (asyncBufferedChannel.makeAsyncIterator(), {})
}

asyncBufferedChannel.send(current)

let consumerId = self.state.withCriticalRegion { state -> Int in
state.ids += 1
state.channels[state.ids] = asyncBufferedChannel
return state.ids
self.state.withCriticalRegion { state in
let terminalState = state.terminalState
if let terminalState, terminalState.isFinished {
asyncBufferedChannel.finish()
} else {
state.ids &+= 1
consumerId = state.ids
state.channels[consumerId] = asyncBufferedChannel
asyncBufferedChannel.send(state.current)
}
}

let unregister = { @Sendable [state] in
state.withCriticalRegion { state in
state.channels[consumerId] = nil
if let consumerId {
unregister = { @Sendable [state, consumerId] in
state.withCriticalRegion { state in
state.channels[consumerId] = nil
}
}
}

return (asyncBufferedChannel.makeAsyncIterator(), unregister)
return (asyncBufferedChannel.makeAsyncIterator(), unregister ?? {})
}

public func makeAsyncIterator() -> AsyncIterator {
Expand All @@ -124,6 +127,7 @@ public final class AsyncCurrentValueSubject<Element>: AsyncSubject where Element
public struct Iterator: AsyncSubjectIterator {
var iterator: AsyncBufferedChannel<Element>.Iterator
let unregister: @Sendable () -> Void
var isFinished = false

init(asyncSubject: AsyncCurrentValueSubject) {
(self.iterator, self.unregister) = asyncSubject.handleNewConsumer()
Expand All @@ -134,11 +138,22 @@ public final class AsyncCurrentValueSubject<Element>: AsyncSubject where Element
}

public mutating func next() async -> Element? {
await withTaskCancellationHandler {
// Don't proceed if we've already finished
guard !isFinished else { return nil }

let result = await withTaskCancellationHandler {
await self.iterator.next()
} onCancel: { [unregister] in
unregister()
}

// If iteration completed normally (returned nil), unregister the channel
if result == nil {
isFinished = true
unregister()
}

return result
}
}
}
Loading