Skip to content

Commit a64160a

Browse files
Thibault Wittembergtwittemb
authored andcommitted
codex: address warnings and deprecations
1 parent 3ca72ea commit a64160a

34 files changed

Lines changed: 191 additions & 256 deletions

Sources/AsyncChannels/AsyncBufferedChannel.swift

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -157,27 +157,7 @@ public final class AsyncBufferedChannel<Element: Sendable>: AsyncSequence, Senda
157157
let awaitingId = self.generateId()
158158
let cancellation = ManagedCriticalState<Bool>(false)
159159

160-
return await withTaskCancellationHandler { [state] in
161-
let awaiting = state.withCriticalRegion { state -> Awaiting? in
162-
cancellation.withCriticalRegion { cancellation in
163-
cancellation = true
164-
}
165-
switch state {
166-
case .awaiting(var awaitings):
167-
let awaiting = awaitings.remove(.placeHolder(id: awaitingId))
168-
if awaitings.isEmpty {
169-
state = .idle
170-
} else {
171-
state = .awaiting(awaitings)
172-
}
173-
return awaiting
174-
default:
175-
return nil
176-
}
177-
}
178-
179-
awaiting?.continuation?.resume(returning: nil)
180-
} operation: {
160+
return await withTaskCancellationHandler {
181161
await withUnsafeContinuation { [state] (continuation: UnsafeContinuation<Element?, Never>) in
182162
let decision = state.withCriticalRegion { state -> AwaitingDecision in
183163
let isCancelled = cancellation.withCriticalRegion { $0 }
@@ -218,6 +198,26 @@ public final class AsyncBufferedChannel<Element: Sendable>: AsyncSequence, Senda
218198
onSuspend?()
219199
}
220200
}
201+
} onCancel: { [state] in
202+
let awaiting = state.withCriticalRegion { state -> Awaiting? in
203+
cancellation.withCriticalRegion { cancellation in
204+
cancellation = true
205+
}
206+
switch state {
207+
case .awaiting(var awaitings):
208+
let awaiting = awaitings.remove(.placeHolder(id: awaitingId))
209+
if awaitings.isEmpty {
210+
state = .idle
211+
} else {
212+
state = .awaiting(awaitings)
213+
}
214+
return awaiting
215+
default:
216+
return nil
217+
}
218+
}
219+
220+
awaiting?.continuation?.resume(returning: nil)
221221
}
222222
}
223223

Sources/AsyncChannels/AsyncThrowingBufferedChannel.swift

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -178,27 +178,7 @@ public final class AsyncThrowingBufferedChannel<Element, Failure: Error>: AsyncS
178178
let awaitingId = self.generateId()
179179
let cancellation = ManagedCriticalState<Bool>(false)
180180

181-
return try await withTaskCancellationHandler { [state] in
182-
let awaiting = state.withCriticalRegion { state -> Awaiting? in
183-
cancellation.withCriticalRegion { cancellation in
184-
cancellation = true
185-
}
186-
switch state {
187-
case .awaiting(var awaitings):
188-
let awaiting = awaitings.remove(.placeHolder(id: awaitingId))
189-
if awaitings.isEmpty {
190-
state = .idle
191-
} else {
192-
state = .awaiting(awaitings)
193-
}
194-
return awaiting
195-
default:
196-
return nil
197-
}
198-
}
199-
200-
awaiting?.continuation?.resume(returning: nil)
201-
} operation: {
181+
return try await withTaskCancellationHandler {
202182
try await withUnsafeThrowingContinuation { [state] (continuation: UnsafeContinuation<Element?, Error>) in
203183
let decision = state.withCriticalRegion { state -> AwaitingDecision in
204184
let isCancelled = cancellation.withCriticalRegion { $0 }
@@ -245,6 +225,26 @@ public final class AsyncThrowingBufferedChannel<Element, Failure: Error>: AsyncS
245225
onSuspend?()
246226
}
247227
}
228+
} onCancel: { [state] in
229+
let awaiting = state.withCriticalRegion { state -> Awaiting? in
230+
cancellation.withCriticalRegion { cancellation in
231+
cancellation = true
232+
}
233+
switch state {
234+
case .awaiting(var awaitings):
235+
let awaiting = awaitings.remove(.placeHolder(id: awaitingId))
236+
if awaitings.isEmpty {
237+
state = .idle
238+
} else {
239+
state = .awaiting(awaitings)
240+
}
241+
return awaiting
242+
default:
243+
return nil
244+
}
245+
}
246+
247+
awaiting?.continuation?.resume(returning: nil)
248248
}
249249
}
250250

Sources/Combiners/Merge/MergeStateMachine.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -197,8 +197,6 @@ struct MergeStateMachine<Element>: Sendable {
197197

198198
func next() async -> RegulatedElement<Element> {
199199
await withTaskCancellationHandler {
200-
self.unsuspendAndClearOnCancel()
201-
} operation: {
202200
self.requestNextRegulatedElements()
203201

204202
let regulatedElement = await withUnsafeContinuation { (continuation: UnsafeContinuation<RegulatedElement<Element>, Never>) in
@@ -245,6 +243,8 @@ struct MergeStateMachine<Element>: Sendable {
245243
}
246244

247245
return regulatedElement
246+
} onCancel: {
247+
self.unsuspendAndClearOnCancel()
248248
}
249249
}
250250
}

Sources/Combiners/WithLatestFrom/AsyncWithLatestFrom2Sequence.swift

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -172,12 +172,7 @@ where Other1: Sendable, Other2: Sendable, Other1.Element: Sendable, Other2.Eleme
172172
let shouldReturnNil = self.isTerminated.withCriticalRegion { $0 }
173173
guard !shouldReturnNil else { return nil }
174174

175-
return try await withTaskCancellationHandler { [isTerminated, othersTask] in
176-
isTerminated.withCriticalRegion { isTerminated in
177-
isTerminated = true
178-
}
179-
othersTask?.cancel()
180-
} operation: { [othersTask, othersState, onBaseElement] in
175+
return try await withTaskCancellationHandler { [othersTask, othersState, onBaseElement] in
181176
do {
182177
while true {
183178
guard let baseElement = try await self.base.next() else {
@@ -219,6 +214,11 @@ where Other1: Sendable, Other2: Sendable, Other1.Element: Sendable, Other2.Eleme
219214
othersTask?.cancel()
220215
throw error
221216
}
217+
} onCancel: { [isTerminated, othersTask] in
218+
isTerminated.withCriticalRegion { isTerminated in
219+
isTerminated = true
220+
}
221+
othersTask?.cancel()
222222
}
223223
}
224224
}

Sources/Combiners/WithLatestFrom/AsyncWithLatestFromSequence.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,9 +121,7 @@ where Other: Sendable, Other.Element: Sendable {
121121
public mutating func next() async rethrows -> Element? {
122122
guard !self.isTerminated else { return nil }
123123

124-
return try await withTaskCancellationHandler { [otherTask] in
125-
otherTask?.cancel()
126-
} operation: { [otherTask, otherState, onBaseElement] in
124+
return try await withTaskCancellationHandler { [otherTask, otherState, onBaseElement] in
127125
do {
128126
while true {
129127
guard let baseElement = try await self.base.next() else {
@@ -157,6 +155,8 @@ where Other: Sendable, Other.Element: Sendable {
157155
otherTask?.cancel()
158156
throw error
159157
}
158+
} onCancel: { [otherTask] in
159+
otherTask?.cancel()
160160
}
161161
}
162162
}

Sources/Combiners/Zip/Zip2Runtime.swift

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -148,12 +148,6 @@ where Base1: Sendable, Base1.Element: Sendable, Base2: Sendable, Base2.Element:
148148

149149
func next() async rethrows -> (Base1.Element, Base2.Element)? {
150150
try await withTaskCancellationHandler {
151-
let output = self.stateMachine.withCriticalRegion { stateMachine in
152-
stateMachine.rootTaskIsCancelled()
153-
}
154-
155-
self.handle(rootTaskIsCancelledOutput: output)
156-
} operation: {
157151
let results = await withUnsafeContinuation { (continuation: UnsafeContinuation<(Result<Base1.Element, Error>, Result<Base2.Element, Error>)?, Never>) in
158152
let output = self.stateMachine.withCriticalRegion { stateMachine in
159153
stateMachine.newDemandFromConsumer(suspendedDemand: continuation)
@@ -173,6 +167,12 @@ where Base1: Sendable, Base1.Element: Sendable, Base2: Sendable, Base2.Element:
173167
self.handle(demandIsFulfilledOutput: output)
174168

175169
return try (results.0._rethrowGet(), results.1._rethrowGet())
170+
} onCancel: {
171+
let output = self.stateMachine.withCriticalRegion { stateMachine in
172+
stateMachine.rootTaskIsCancelled()
173+
}
174+
175+
self.handle(rootTaskIsCancelledOutput: output)
176176
}
177177
}
178178

Sources/Combiners/Zip/Zip3Runtime.swift

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -186,12 +186,6 @@ where Base1: Sendable, Base1.Element: Sendable, Base2: Sendable, Base2.Element:
186186

187187
func next() async rethrows -> (Base1.Element, Base2.Element, Base3.Element)? {
188188
try await withTaskCancellationHandler {
189-
let output = self.stateMachine.withCriticalRegion { stateMachine in
190-
stateMachine.rootTaskIsCancelled()
191-
}
192-
193-
self.handle(rootTaskIsCancelledOutput: output)
194-
} operation: {
195189
let results = await withUnsafeContinuation { (continuation: UnsafeContinuation<(Result<Base1.Element, Error>, Result<Base2.Element, Error>, Result<Base3.Element, Error>)?, Never>) in
196190
let output = self.stateMachine.withCriticalRegion { stateMachine in
197191
stateMachine.newDemandFromConsumer(suspendedDemand: continuation)
@@ -211,6 +205,12 @@ where Base1: Sendable, Base1.Element: Sendable, Base2: Sendable, Base2.Element:
211205
self.handle(demandIsFulfilledOutput: output)
212206

213207
return try (results.0._rethrowGet(), results.1._rethrowGet(), results.2._rethrowGet())
208+
} onCancel: {
209+
let output = self.stateMachine.withCriticalRegion { stateMachine in
210+
stateMachine.rootTaskIsCancelled()
211+
}
212+
213+
self.handle(rootTaskIsCancelledOutput: output)
214214
}
215215
}
216216

Sources/Combiners/Zip/ZipRuntime.swift

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -120,12 +120,6 @@ where Base: Sendable, Base.Element: Sendable {
120120

121121
func next() async rethrows -> [Base.Element]? {
122122
try await withTaskCancellationHandler {
123-
let output = self.stateMachine.withCriticalRegion { stateMachine in
124-
stateMachine.rootTaskIsCancelled()
125-
}
126-
127-
self.handle(rootTaskIsCancelledOutput: output)
128-
} operation: {
129123
let results = await withUnsafeContinuation { (continuation: UnsafeContinuation<[Int: Result<Base.Element, Error>]?, Never>) in
130124
let output = self.stateMachine.withCriticalRegion { stateMachine in
131125
stateMachine.newDemandFromConsumer(suspendedDemand: continuation)
@@ -145,6 +139,12 @@ where Base: Sendable, Base.Element: Sendable {
145139
self.handle(demandIsFulfilledOutput: output)
146140

147141
return try results.sorted { $0.key < $1.key }.map { try $0.value._rethrowGet() }
142+
} onCancel: {
143+
let output = self.stateMachine.withCriticalRegion { stateMachine in
144+
stateMachine.rootTaskIsCancelled()
145+
}
146+
147+
self.handle(rootTaskIsCancelledOutput: output)
148148
}
149149
}
150150

Sources/Creators/AsyncTimerSequence.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,11 +78,11 @@ public struct AsyncTimerSequence: AsyncSequence {
7878
}
7979

8080
public mutating func next() async -> Element? {
81-
await withTaskCancellationHandler { [task] in
82-
task.cancel()
83-
} operation: {
81+
await withTaskCancellationHandler {
8482
guard !Task.isCancelled else { return nil }
8583
return await self.iterator.next()
84+
} onCancel: { [task] in
85+
task.cancel()
8686
}
8787
}
8888
}

Sources/Operators/AsyncMulticastSequence.swift

Lines changed: 28 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -101,47 +101,42 @@ where Base.Element == Subject.Element, Subject.Failure == Error, Base.AsyncItera
101101

102102
/// Allow the `AsyncIterator` to produce elements.
103103
public func connect() {
104+
self.isConnected.apply(criticalState: true)
104105
self.connectedGate.send(())
105106
}
106107

107108
func next() async {
108-
guard !Task.isCancelled else { return }
109-
110-
let (canAccessBase, iterator) = self.state.withCriticalRegion { state -> (Bool, Base.AsyncIterator?) in
111-
switch state {
112-
case .available(let iterator):
113-
state = .busy
114-
return (true, iterator)
115-
case .busy:
116-
return (false, nil)
109+
await Task {
110+
let (canAccessBase, iterator) = self.state.withCriticalRegion { state -> (Bool, Base.AsyncIterator?) in
111+
switch state {
112+
case .available(let iterator):
113+
state = .busy
114+
return (true, iterator)
115+
case .busy:
116+
return (false, nil)
117+
}
118+
}
119+
120+
guard canAccessBase, var iterator = iterator else { return }
121+
122+
let toSend: Result<Element?, Error>
123+
do {
124+
let element = try await iterator.next()
125+
toSend = .success(element)
126+
} catch {
127+
toSend = .failure(error)
117128
}
118-
}
119129

120-
guard canAccessBase, var iterator = iterator else { return }
121-
defer {
122130
self.state.withCriticalRegion { state in
123131
state = .available(iterator)
124132
}
125-
}
126-
127-
guard !Task.isCancelled else { return }
128133

129-
let toSend: Result<Element?, Error>
130-
do {
131-
let element = try await iterator.next()
132-
toSend = .success(element)
133-
} catch {
134-
guard !Task.isCancelled else { return }
135-
toSend = .failure(error)
136-
}
137-
138-
guard !Task.isCancelled else { return }
139-
140-
switch toSend {
141-
case .success(.some(let element)): self.subject.send(element)
142-
case .success(.none): self.subject.send(.finished)
143-
case .failure(let error): self.subject.send(.failure(error))
144-
}
134+
switch toSend {
135+
case .success(.some(let element)): self.subject.send(element)
136+
case .success(.none): self.subject.send(.finished)
137+
case .failure(let error): self.subject.send(.failure(error))
138+
}
139+
}.value
145140
}
146141

147142
public func makeAsyncIterator() -> AsyncIterator {
@@ -163,14 +158,8 @@ where Base.Element == Subject.Element, Subject.Failure == Error, Base.AsyncItera
163158
public mutating func next() async rethrows -> Element? {
164159
guard !Task.isCancelled else { return nil }
165160

166-
let shouldWaitForGate = self.isConnected.withCriticalRegion { isConnected -> Bool in
167-
if !isConnected {
168-
isConnected = true
169-
return true
170-
}
171-
return false
172-
}
173-
if shouldWaitForGate {
161+
let isConnected = self.isConnected.withCriticalRegion { $0 }
162+
if !isConnected {
174163
await self.connectedGateIterator.next()
175164
}
176165

0 commit comments

Comments
 (0)