diff --git a/.changes/typed-disconnect-error b/.changes/typed-disconnect-error new file mode 100644 index 000000000..a29bde27f --- /dev/null +++ b/.changes/typed-disconnect-error @@ -0,0 +1 @@ +patch type="fixed" "Report transport-level disconnects as LiveKitError(.network) instead of LiveKitError(.cancelled) so consumers can distinguish network failures from user-initiated cancellation" diff --git a/.swiftlint.yml b/.swiftlint.yml index 86d8c26ea..4dd1bc1d8 100644 --- a/.swiftlint.yml +++ b/.swiftlint.yml @@ -40,3 +40,14 @@ custom_rules: regex: "@objc(?![(\\[])\\s+(?:(?:public|open|final|internal|package)\\s+)*class\\b" message: "Use @objcMembers instead of @objc for classes to implicitly expose members to Objective-C." severity: warning + public_typed_throws: + name: "Public typed throws" + # `func\s+(?!_objc_)\w+` skips Obj-C-only bridge shims, which by + # convention are named `_objc_*` and are hidden from Swift via + # `@available(swift, obsoleted: 1.0)`. Typed throws is a Swift-side + # concern, so those shims are exempt. + regex: "^\\s*(@\\w+(\\([^)]*\\))?\\s+)*(?:(?:static|class|final|override|convenience|nonisolated)\\s+)*(open|public)\\s+(?:(?:static|class|final|override|convenience|nonisolated)\\s+)*func\\s+(?!_objc_)\\w+(?:<[^>]*>)?\\s*\\((?:[^()]|\\([^()]*\\))*\\)\\s*(async\\s+)?\\bthrows\\b(?!\\s*\\()" + message: "Public throwing methods should declare a typed throws clause (e.g. `throws(LiveKitError)`). Suppress with // swiftlint:disable:next public_typed_throws if propagation is unbounded; include a one-line comment explaining why." + severity: error + included: + - "Sources/LiveKit/.*\\.swift" diff --git a/AGENTS.md b/AGENTS.md index d3d66f283..901f99311 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -130,6 +130,16 @@ private static let playAndRecordOptions: AVAudioSession.CategoryOptions = [.mixW - For non-recoverable errors, propagate with `throws` using `LiveKitError` with proper type/code - Anticipate invalid states at compile time using algebraic data types, typestates, etc. - Unsafe APIs like subscript `[0]` should be wrapped and leverage optional `?` +- Public throwing methods declare typed throws: `throws(LiveKitError)`. Enforced by the `public_typed_throws` SwiftLint rule +- At I/O edges (Foundation/AVAudio/WebRTC/protobuf), wrap with `LiveKitError(from: error)` — passes through existing `LiveKitError`, classifies `CancellationError`/`URLError`/`StreamError`, wraps everything else as `.unknown` with `internalError` set +- Inside `throws(LiveKitError)` contexts, use `try checkCancellation()` (typed helper in `Errors.swift`) instead of `try Task.checkCancellation()` + +#### Typed-throws / Obj-C tradeoffs + +- `@objc` forbids typed throws. Pair `@nonobjc public func foo() throws(LiveKitError)` with an `@objc(originalSelector)` shim named `_objc_foo`, hidden from Swift via `@available(swift, obsoleted: 1.0)` (the rule's regex exempts `_objc_*`) +- `@objc` protocol conformers (e.g. `LocalTrackProtocol`) stay untyped — suppress with a one-line reason +- `Task<_, Failure: Error>` and `withCheckedThrowingContinuation` have no typed-failure initializers; convert at the `await` site with `LiveKitError(from:)` +- Stored typed-throws closures need macOS 15+; until the floor moves, store untyped and convert at the call site ### Coding Style diff --git a/Sources/LiveKit/Audio/AudioSessionEngineObserver.swift b/Sources/LiveKit/Audio/AudioSessionEngineObserver.swift index 3d83b85e2..935f6b537 100644 --- a/Sources/LiveKit/Audio/AudioSessionEngineObserver.swift +++ b/Sources/LiveKit/Audio/AudioSessionEngineObserver.swift @@ -102,7 +102,7 @@ public class AudioSessionEngineObserver: AudioEngineObserver, Loggable, @uncheck /// of the WebRTC engine lifecycle. /// /// - Throws: ``LiveKitError`` if the audio session fails to configure or activate. - public func acquire(requirement: SessionRequirement) throws -> SessionRequirementHandle { + public func acquire(requirement: SessionRequirement) throws(LiveKitError) -> SessionRequirementHandle { let id = UUID() try set(requirement: requirement, for: id) return SessionRequirementHandle(releaseImpl: { [weak self] in @@ -111,7 +111,7 @@ public class AudioSessionEngineObserver: AudioEngineObserver, Loggable, @uncheck }) } - private func set(requirement: SessionRequirement, for id: UUID) throws { + private func set(requirement: SessionRequirement, for id: UUID) throws(LiveKitError) { try updateRequirements { if requirement == .none { $0.removeValue(forKey: id) @@ -121,21 +121,21 @@ public class AudioSessionEngineObserver: AudioEngineObserver, Loggable, @uncheck } } - fileprivate func removeRequirement(for id: UUID) throws { + fileprivate func removeRequirement(for id: UUID) throws(LiveKitError) { try updateRequirements { $0.removeValue(forKey: id) } } - private func updateRequirements(_ block: (inout [UUID: SessionRequirement]) -> Void) throws { - try _state.mutate { - let oldState = $0 - block(&$0.sessionRequirements) - guard $0.sessionRequirements != oldState.sessionRequirements else { return } + private func updateRequirements(_ block: (inout [UUID: SessionRequirement]) -> Void) throws(LiveKitError) { + try _state.mutate { state throws(LiveKitError) in + let oldState = state + block(&state.sessionRequirements) + guard state.sessionRequirements != oldState.sessionRequirements else { return } do { - try configureIfNeeded(oldState: oldState, newState: $0) + try configureIfNeeded(oldState: oldState, newState: state) } catch { - $0 = oldState + state = oldState throw LiveKitError(.audioSession, message: "Failed to configure audio session") } } diff --git a/Sources/LiveKit/Audio/Manager/AudioManager.swift b/Sources/LiveKit/Audio/Manager/AudioManager.swift index 672a64c51..47ae7def3 100644 --- a/Sources/LiveKit/Audio/Manager/AudioManager.swift +++ b/Sources/LiveKit/Audio/Manager/AudioManager.swift @@ -67,6 +67,9 @@ public struct SessionRequirement: OptionSet, Sendable { /// If not released explicitly, the requirement is released automatically on deinit. public final class SessionRequirementHandle: @unchecked Sendable { private struct State { + // Stored as untyped throws because typed-throws function types in + // stored properties need macOS 15+ runtime support; the wrapping + // public API still exposes throws(LiveKitError). var releaseImpl: (@Sendable () throws -> Void)? } @@ -83,17 +86,22 @@ public final class SessionRequirementHandle: @unchecked Sendable { /// Releases the associated audio session requirement. /// /// Releasing the same handle multiple times is a no-op. - public func release() throws { + public func release() throws(LiveKitError) { try releaseIfNeeded() } - private func releaseIfNeeded() throws { + private func releaseIfNeeded() throws(LiveKitError) { let releaseImpl = _state.mutate { state -> (@Sendable () throws -> Void)? in let releaseImpl = state.releaseImpl state.releaseImpl = nil return releaseImpl } - try releaseImpl?() + do { + try releaseImpl?() + } catch { + // Constructed via acquire() whose closure throws only LiveKitError. + throw LiveKitError(from: error) + } } } @@ -326,7 +334,7 @@ public class AudioManager: Loggable { /// Defaults to `true`. public var isVoiceProcessingEnabled: Bool { RTC.audioDeviceModule.isVoiceProcessingEnabled } - public func setVoiceProcessingEnabled(_ enabled: Bool) throws { + public func setVoiceProcessingEnabled(_ enabled: Bool) throws(LiveKitError) { let result = RTC.audioDeviceModule.setVoiceProcessingEnabled(enabled) try checkAdmResult(code: result) } @@ -363,7 +371,7 @@ public class AudioManager: Loggable { /// In this mode, you can provide audio buffers by calling `AudioManager.shared.mixer.capture(appAudio:)` continuously. /// Remote audio will not play out automatically. Get remote mixed audio buffers with `AudioManager.shared.add(localAudioRenderer:)` or individual tracks with ``RemoteAudioTrack/add(audioRenderer:)``. /// - Note: While enabled, the SDK will not configure `AVAudioSession`. Configure it yourself if your app does its own audio I/O. - public func setManualRenderingMode(_ enabled: Bool) throws { + public func setManualRenderingMode(_ enabled: Bool) throws(LiveKitError) { let result = RTC.audioDeviceModule.setManualRenderingMode(enabled) try checkAdmResult(code: result) } @@ -387,14 +395,14 @@ public class AudioManager: Loggable { /// - Note: Microphone permission is required. iOS may prompt if not already granted. /// - Note: This persists across ``Room`` lifecycles and connections until disabled. /// - Throws: An error if the underlying audio device module fails to apply the setting. - public func setRecordingAlwaysPreparedMode(_ enabled: Bool) async throws { + public func setRecordingAlwaysPreparedMode(_ enabled: Bool) async throws(LiveKitError) { let result = RTC.audioDeviceModule.setRecordingAlwaysPreparedMode(enabled) try checkAdmResult(code: result) } /// Starts mic input to the SDK even without any ``Room`` or a connection. /// Audio buffers will flow into ``LocalAudioTrack/add(audioRenderer:)`` and ``capturePostProcessingDelegate``. - public func startLocalRecording() throws { + public func startLocalRecording() throws(LiveKitError) { // Always unmute APM if muted by last session. RTC.audioProcessingModule.isMuted = false // TODO: Possibly not required anymore with new libs // Start recording on the ADM. @@ -403,7 +411,7 @@ public class AudioManager: Loggable { } /// Stops mic input after it was started with ``startLocalRecording()`` - public func stopLocalRecording() throws { + public func stopLocalRecording() throws(LiveKitError) { let result = RTC.audioDeviceModule.stopRecording() try checkAdmResult(code: result) } @@ -423,7 +431,7 @@ public class AudioManager: Loggable { /// This is useful when you need to set up connections without touching the audio /// device yet (e.g., CallKit flows), or to guarantee the engine remains off /// regardless of subscription/publication requests. - public func setEngineAvailability(_ availability: AudioEngineAvailability) throws { + public func setEngineAvailability(_ availability: AudioEngineAvailability) throws(LiveKitError) { let result = RTC.audioDeviceModule.setEngineAvailability(availability.toRTCType()) try checkAdmResult(code: result) } @@ -450,7 +458,7 @@ public class AudioManager: Loggable { /// Acquires an audio session requirement for external ownership. /// /// On platforms without `AVAudioSession`, this returns a no-op handle. - public func acquireSessionRequirement(_ requirement: SessionRequirement) throws -> SessionRequirementHandle { + public func acquireSessionRequirement(_ requirement: SessionRequirement) throws(LiveKitError) -> SessionRequirementHandle { #if os(iOS) || os(visionOS) || os(tvOS) try audioSession.acquire(requirement: requirement) #else @@ -546,7 +554,7 @@ let kAudioEngineErrorAudioSessionCategoryRecordingRequired = -4102 let kAudioEngineErrorInsufficientDevicePermission = -4101 extension AudioManager { - func checkAdmResult(code: Int) throws { + func checkAdmResult(code: Int) throws(LiveKitError) { if code == kAudioEngineErrorFailedToConfigureAudioSession { throw LiveKitError(.audioSession, message: "Failed to configure audio session") } else if code == kAudioEngineErrorInsufficientDevicePermission { diff --git a/Sources/LiveKit/Audio/PlayerNodePool.swift b/Sources/LiveKit/Audio/PlayerNodePool.swift index 8511ecc3a..4fcf2ca6b 100644 --- a/Sources/LiveKit/Audio/PlayerNodePool.swift +++ b/Sources/LiveKit/Audio/PlayerNodePool.swift @@ -63,33 +63,40 @@ class AVAudioPlayerNodePool: @unchecked Sendable, Loggable { } @discardableResult - func play(_ buffer: AVAudioPCMBuffer, loop: Bool = false) throws -> SoundPlayback { - let acquired = try executionQueue.sync { () throws -> AcquiredNode in - guard let index = items.firstIndex(where: { $0.state == .idle }) else { - throw LiveKitError(.audioEngine, message: "No available player nodes") - } - - items[index].state = .inUse - items[index].generation &+= 1 + func play(_ buffer: AVAudioPCMBuffer, loop: Bool = false) throws(LiveKitError) -> SoundPlayback { + let acquired: AcquiredNode + do { + acquired = try executionQueue.sync { () throws -> AcquiredNode in + guard let index = items.firstIndex(where: { $0.state == .idle }) else { + throw LiveKitError(.audioEngine, message: "No available player nodes") + } - let node = items[index].node - let generation = items[index].generation - node.volume = 1.0 - node.pan = 0.0 + items[index].state = .inUse + items[index].generation &+= 1 - if loop { - node.scheduleBuffer(buffer, at: nil, options: .loops) - } else { - node.scheduleBuffer(buffer, completionCallbackType: .dataPlayedBack) { [weak self] _ in - self?.executionQueue.async { [weak self] in - self?.releaseCompletedSlot(index: index, generation: generation) + let node = items[index].node + let generation = items[index].generation + node.volume = 1.0 + node.pan = 0.0 + + if loop { + node.scheduleBuffer(buffer, at: nil, options: .loops) + } else { + node.scheduleBuffer(buffer, completionCallbackType: .dataPlayedBack) { [weak self] _ in + self?.executionQueue.async { [weak self] in + self?.releaseCompletedSlot(index: index, generation: generation) + } } } - } - node.play() + node.play() - return AcquiredNode(index: index, node: node, generation: generation) + return AcquiredNode(index: index, node: node, generation: generation) + } + } catch let error as LiveKitError { + throw error + } catch { + throw LiveKitError(.audioEngine, internalError: error) } return NodePlayback(node: acquired.node) { [weak self] in diff --git a/Sources/LiveKit/Audio/SoundPlayer+Types.swift b/Sources/LiveKit/Audio/SoundPlayer+Types.swift index 3eb193320..d66ac48eb 100644 --- a/Sources/LiveKit/Audio/SoundPlayer+Types.swift +++ b/Sources/LiveKit/Audio/SoundPlayer+Types.swift @@ -85,7 +85,7 @@ public struct SoundHandle: Hashable, Sendable { let id: UUID /// Plays this prepared sound with the provided options. - public func play(options: SoundPlaybackOptions = SoundPlaybackOptions()) async throws { + public func play(options: SoundPlaybackOptions = SoundPlaybackOptions()) async throws(LiveKitError) { try await SoundPlayer.shared.play(self, options: options) } @@ -149,7 +149,7 @@ class PreparedSound { cleanUp() } - func localBuffer(for playerNodeFormat: AVAudioFormat) throws -> AVAudioPCMBuffer { + func localBuffer(for playerNodeFormat: AVAudioFormat) throws(LiveKitError) -> AVAudioPCMBuffer { if let cachedLocalBuffer, let cachedLocalBufferFormat, cachedLocalBufferFormat == playerNodeFormat { return cachedLocalBuffer } diff --git a/Sources/LiveKit/Audio/SoundPlayer.swift b/Sources/LiveKit/Audio/SoundPlayer.swift index 1f6733a5e..732341998 100644 --- a/Sources/LiveKit/Audio/SoundPlayer.swift +++ b/Sources/LiveKit/Audio/SoundPlayer.swift @@ -85,7 +85,7 @@ public final class SoundPlayer: Loggable { /// - Note: Repeated playback of the same short clip should generally reuse a prepared sound /// instead of decoding from disk each time. @discardableResult - public func prepare(fileURL: URL, named name: String? = nil) async throws -> SoundHandle { + public func prepare(fileURL: URL, named name: String? = nil) async throws(LiveKitError) -> SoundHandle { let readBuffer = try await Self.decodeBuffer(from: fileURL) let sessionRequirementHandle = try AudioManager.shared.acquireSessionRequirement(.playbackOnly) let soundId = UUID() @@ -135,7 +135,7 @@ extension SoundPlayer { return format } - func makePlayerNodeFormat(for outputFormat: AVAudioFormat) throws -> AVAudioFormat { + func makePlayerNodeFormat(for outputFormat: AVAudioFormat) throws(LiveKitError) -> AVAudioFormat { guard let format = AVAudioFormat(commonFormat: .pcmFormatFloat32, sampleRate: outputFormat.sampleRate, channels: outputFormat.channelCount, @@ -168,21 +168,25 @@ extension SoundPlayer { invalidateLocalState() } - func reconnectEngine(outputFormat: AVAudioFormat, playerNodeFormat: AVAudioFormat) throws { + func reconnectEngine(outputFormat: AVAudioFormat, playerNodeFormat: AVAudioFormat) throws(LiveKitError) { playerNodePool.stop() engine.stop() engine.disconnect(playerNodePool) playerNodePool.setMaximumFramesToRender(engine.outputNode.auAudioUnit.maximumFramesToRender) engine.connect(playerNodePool, to: engine.mainMixerNode, format: outputFormat, playerNodeFormat: playerNodeFormat) - try engine.start() + do { + try engine.start() + } catch { + throw LiveKitError(.audioEngine, internalError: error) + } localEngineState.connectedOutputFormat = outputFormat localEngineState.playerNodeFormat = playerNodeFormat localEngineState.needsReconnect = false } @discardableResult - func startEngineIfNeeded() throws -> AVAudioFormat { + func startEngineIfNeeded() throws(LiveKitError) -> AVAudioFormat { guard let outputFormat else { throw LiveKitError(.soundPlayer, message: "Invalid output format") } @@ -245,7 +249,7 @@ extension SoundPlayer { await soundState.stop(destination: destination) } - func play(_ sound: SoundHandle, options: SoundPlaybackOptions = SoundPlaybackOptions()) async throws { + func play(_ sound: SoundHandle, options: SoundPlaybackOptions = SoundPlaybackOptions()) async throws(LiveKitError) { guard let soundState = sounds[sound.id] else { throw LiveKitError(.soundPlayer, message: "Sound not prepared") } @@ -272,12 +276,12 @@ extension SoundPlayer { } } - static func decodeBuffer(from fileURL: URL) async throws -> AVAudioPCMBuffer { + static func decodeBuffer(from fileURL: URL) async throws(LiveKitError) -> AVAudioPCMBuffer { guard fileURL.isFileURL else { throw LiveKitError(.invalidParameter, message: "Only file URLs are supported") } - return try await Task.detached(priority: .userInitiated) { + let task = Task.detached(priority: .userInitiated) { () throws -> AVAudioPCMBuffer in let audioFile = try AVAudioFile(forReading: fileURL) guard let readBuffer = AVAudioPCMBuffer(pcmFormat: audioFile.processingFormat, frameCapacity: AVAudioFrameCount(audioFile.length)) @@ -286,6 +290,13 @@ extension SoundPlayer { } try audioFile.read(into: readBuffer, frameCount: AVAudioFrameCount(audioFile.length)) return readBuffer - }.value + } + do { + return try await task.value + } catch let error as LiveKitError { + throw error + } catch { + throw LiveKitError(.soundPlayer, internalError: error) + } } } diff --git a/Sources/LiveKit/Broadcast/IPC/BroadcastAudioCodec.swift b/Sources/LiveKit/Broadcast/IPC/BroadcastAudioCodec.swift index 8c596c8f1..f5dd31c35 100644 --- a/Sources/LiveKit/Broadcast/IPC/BroadcastAudioCodec.swift +++ b/Sources/LiveKit/Broadcast/IPC/BroadcastAudioCodec.swift @@ -94,6 +94,8 @@ struct BroadcastAudioCodec { } extension AudioStreamBasicDescription: Codable { + // Encodable.encode requires untyped throws. + // swiftlint:disable:next public_typed_throws public func encode(to encoder: any Encoder) throws { var container = encoder.unkeyedContainer() try container.encode(mSampleRate) diff --git a/Sources/LiveKit/Core/DataChannelPair.swift b/Sources/LiveKit/Core/DataChannelPair.swift index e0d04a72b..da4456d48 100644 --- a/Sources/LiveKit/Core/DataChannelPair.swift +++ b/Sources/LiveKit/Core/DataChannelPair.swift @@ -299,7 +299,7 @@ class DataChannelPair: NSObject, @unchecked Sendable, Loggable { } } - func reset() { + func reset(throwing error: Error? = nil) { let (lossy, reliable) = _state.mutate { let result = ($0.lossy, $0.reliable) $0.reliable = nil @@ -312,38 +312,47 @@ class DataChannelPair: NSObject, @unchecked Sendable, Loggable { lossy?.close() reliable?.close() - openCompleter.reset() + openCompleter.reset(throwing: error) } // MARK: - Send - func send(userPacket: Livekit_UserPacket, kind: Livekit_DataPacket.Kind) async throws { + func send(userPacket: Livekit_UserPacket, kind: Livekit_DataPacket.Kind) async throws(LiveKitError) { try await send(dataPacket: .with { $0.kind = kind // TODO: field is deprecated $0.user = userPacket }) } - func send(dataPacket packet: Livekit_DataPacket) async throws { + func send(dataPacket packet: Livekit_DataPacket) async throws(LiveKitError) { let packet = try withEncryption(withSequence(packet)) - let serializedData = try packet.serializedData() + let serializedData: Data + do { + serializedData = try packet.serializedData() + } catch { + throw LiveKitError(.failedToConvertData, internalError: error) + } let rtcData = RTC.createDataBuffer(data: serializedData) - try await withCheckedThrowingContinuation { continuation in - let request = PublishDataRequest( - data: rtcData, - sequence: packet.sequence, - continuation: continuation - ) - let event = ChannelEvent( - channelKind: ChannelKind(packet.kind), // TODO: field is deprecated - detail: .publishData(request) - ) - eventContinuation.yield(event) + do { + try await withCheckedThrowingContinuation { continuation in + let request = PublishDataRequest( + data: rtcData, + sequence: packet.sequence, + continuation: continuation + ) + let event = ChannelEvent( + channelKind: ChannelKind(packet.kind), // TODO: field is deprecated + detail: .publishData(request) + ) + eventContinuation.yield(event) + } + } catch { + throw LiveKitError(from: error) } } - private func withEncryption(_ packet: Livekit_DataPacket) throws -> Livekit_DataPacket { + private func withEncryption(_ packet: Livekit_DataPacket) throws(LiveKitError) -> Livekit_DataPacket { guard let e2eeManager, e2eeManager.isDataChannelEncryptionEnabled, let payload = Livekit_EncryptedPacketPayload(dataPacket: packet) else { return packet } var packet = packet diff --git a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift index 52333a44b..7535c1623 100644 --- a/Sources/LiveKit/Core/PreConnectAudioBuffer.swift +++ b/Sources/LiveKit/Core/PreConnectAudioBuffer.swift @@ -73,7 +73,7 @@ public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable { /// The room connection needs to be established and the remote participant needs to subscribe to the audio track /// before the timeout is reached. Otherwise, the audio stream will be flushed without sending. /// - recorder: Optional custom recorder instance. If not provided, a new one will be created. - public func startRecording(timeout: TimeInterval = Constants.timeout, recorder: LocalAudioTrackRecorder? = nil) async throws { + public func startRecording(timeout: TimeInterval = Constants.timeout, recorder: LocalAudioTrackRecorder? = nil) async throws(LiveKitError) { room?.add(delegate: self) let roomOptions = room?._state.roomOptions @@ -123,7 +123,7 @@ public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable { /// - room: The room instance to send the audio data. /// - agents: The agents to send the audio data to. /// - topic: The topic to send the audio data. - public func sendAudioData(to room: Room, agents: [Participant.Identity], on topic: String = dataTopic) async throws { + public func sendAudioData(to room: Room, agents: [Participant.Identity], on topic: String = dataTopic) async throws(LiveKitError) { guard !agents.isEmpty else { return } guard !state.sent else { return } @@ -146,19 +146,27 @@ public final class PreConnectAudioBuffer: NSObject, Sendable, Loggable { ], destinationIdentities: agents ) - let writer = try await room.localParticipant.streamBytes(options: streamOptions) - var sentSize = 0 - for await chunk in audioStream { - do { - try await writer.write(chunk) - } catch { - try await writer.close(reason: error.localizedDescription) - throw error + let sentSize: Int + do { + let writer = try await room.localParticipant.streamBytes(options: streamOptions) + + var size = 0 + for await chunk in audioStream { + do { + try await writer.write(chunk) + } catch { + try? await writer.close(reason: error.localizedDescription) + throw error + } + size += chunk.count } - sentSize += chunk.count + try await writer.close() + sentSize = size + } catch { + // Wrap StreamError (or any other) as LiveKitError(.dataStream). + throw LiveKitError(from: error) } - try await writer.close() log("Sent \(recorder.duration(sentSize))s = \(sentSize / 1024)KB of audio data to \(agents.count) agent(s) \(agents)", .info) } diff --git a/Sources/LiveKit/Core/RegionManager.swift b/Sources/LiveKit/Core/RegionManager.swift index e0d94a831..564245eb8 100644 --- a/Sources/LiveKit/Core/RegionManager.swift +++ b/Sources/LiveKit/Core/RegionManager.swift @@ -69,7 +69,7 @@ actor RegionManager: Loggable { _ = startSettingsFetchIfNeeded(token: token) } - func resolveBest(token: String) async throws -> RegionInfo { + func resolveBest(token: String) async throws(LiveKitError) -> RegionInfo { try await requestSettingsIfNeeded(token: token) guard let selected = state.remaining.first else { throw LiveKitError(.regionManager, message: "No more remaining regions.") @@ -119,7 +119,7 @@ actor RegionManager: Loggable { do { let data = try await Self.fetchRegionSettings(providedUrl: providedUrl, token: token) let allRegions = try Self.parseRegionSettings(data: data) - try Task.checkCancellation() + try checkCancellation() applyFetchedRegions(allRegions) return allRegions } catch { @@ -132,14 +132,18 @@ actor RegionManager: Loggable { return task } - private func requestSettingsIfNeeded(token: String) async throws { + private func requestSettingsIfNeeded(token: String) async throws(LiveKitError) { guard providedUrl.isCloud else { throw LiveKitError(.onlyForCloud) } guard shouldRequestSettings() else { return } let task = startSettingsFetchIfNeeded(token: token) - _ = try await task.value + do { + _ = try await task.value + } catch { + throw LiveKitError(from: error) + } } private func applyFetchedRegions(_ allRegions: [RegionInfo]) { @@ -157,12 +161,18 @@ actor RegionManager: Loggable { // MARK: - Static helpers (non-isolated) - private nonisolated static func fetchRegionSettings(providedUrl: URL, token: String) async throws -> Data { + private nonisolated static func fetchRegionSettings(providedUrl: URL, token: String) async throws(LiveKitError) -> Data { var request = URLRequest(url: providedUrl.regionSettingsUrl(), cachePolicy: .reloadIgnoringLocalAndRemoteCacheData) request.addValue("Bearer \(token)", forHTTPHeaderField: "authorization") - let (data, response) = try await URLSession.shared.data(for: request) + let data: Data + let response: URLResponse + do { + (data, response) = try await URLSession.shared.data(for: request) + } catch { + throw LiveKitError(.network, internalError: error) + } guard let httpResponse = response as? HTTPURLResponse else { throw LiveKitError(.regionManager, message: "Failed to fetch region settings") } @@ -187,7 +197,7 @@ actor RegionManager: Loggable { return data } - private nonisolated static func parseRegionSettings(data: Data) throws -> [RegionInfo] { + private nonisolated static func parseRegionSettings(data: Data) throws(LiveKitError) -> [RegionInfo] { do { let regionSettings = try Livekit_RegionSettings(jsonUTF8Data: data) let allRegions = regionSettings.regions.compactMap { $0.toLKType() } diff --git a/Sources/LiveKit/Core/Room+Engine.swift b/Sources/LiveKit/Core/Room+Engine.swift index 66a5c8c6c..a0c316e06 100644 --- a/Sources/LiveKit/Core/Room+Engine.swift +++ b/Sources/LiveKit/Core/Room+Engine.swift @@ -39,10 +39,10 @@ extension Room { } // Resets state of transports - func cleanUpRTC() async { + func cleanUpRTC(withError disconnectError: Error? = nil) async { // Close data channels - publisherDataChannel.reset() - subscriberDataChannel.reset() + publisherDataChannel.reset(throwing: disconnectError) + subscriberDataChannel.reset(throwing: disconnectError) await _state.transport?.close() @@ -53,7 +53,7 @@ extension Room { } } - func publisherShouldNegotiate() async throws { + func publisherShouldNegotiate() async throws(LiveKitError) { log() let publisher = try requirePublisher() @@ -61,15 +61,15 @@ extension Room { _state.mutate { $0.hasPublished = true } } - func send(userPacket: Livekit_UserPacket, kind: Livekit_DataPacket.Kind) async throws { + func send(userPacket: Livekit_UserPacket, kind: Livekit_DataPacket.Kind) async throws(LiveKitError) { try await send(dataPacket: .with { $0.user = userPacket $0.kind = kind }) } - func send(dataPacket packet: Livekit_DataPacket) async throws { - func ensurePublisherConnected() async throws { + func send(dataPacket packet: Livekit_DataPacket) async throws(LiveKitError) { + func ensurePublisherConnected() async throws(LiveKitError) { // Only needed when subscriber is primary in dual PC mode guard case .subscriberPrimary = _state.transport else { return @@ -161,7 +161,7 @@ extension Room { singlePCMode: isSinglePC, delegate: self) - await publisher.set { [weak self] offer, offerId in + await publisher.set { [weak self] offer, offerId throws(LiveKitError) in guard let self else { return } log("Publisher onOffer with offerId: \(offerId), sdp: \(offer.sdp)") try await signalClient.send(offer: offer, offerId: offerId) @@ -274,7 +274,7 @@ extension Room { } // Check cancellation after WebSocket connected - try Task.checkCancellation() + try checkCancellation() connectSpan?.record("signal") connectSpan?.record("join_recv") @@ -282,14 +282,14 @@ extension Room { try await configureTransports(connectResponse: connectResponse, singlePeerConnection: singlePC) connectSpan?.record("pc_created") // Check cancellation after configuring transports - try Task.checkCancellation() + try checkCancellation() // Resume after configuring transports... await signalClient.resumeQueues() // Wait for transport... try await primaryTransportConnectedCompleter.wait(timeout: _state.connectOptions.primaryTransportConnectTimeout) - try Task.checkCancellation() + try checkCancellation() connectSpan?.record("engine") connectSpan?.record("pc_connected") @@ -338,12 +338,12 @@ extension Room { participantSid: localParticipant.sid, adaptiveStream: _state.roomOptions.adaptiveStream, singlePeerConnection: singlePC) - try Task.checkCancellation() + try checkCancellation() // Update configuration try await configureTransports(connectResponse: connectResponse, singlePeerConnection: singlePC) - try Task.checkCancellation() + try checkCancellation() // Resume after configuring transports... await signalClient.resumeQueues() @@ -357,7 +357,7 @@ extension Room { log("[Connect] Subscriber transport failed to connect, error: \(error)", .error) throw error } - try Task.checkCancellation() + try checkCancellation() // send SyncState before offer try await sendSyncState() @@ -540,7 +540,7 @@ extension Room { // MARK: - Private helpers extension Room { - func requirePublisher() throws -> Transport { + func requirePublisher() throws(LiveKitError) -> Transport { guard let publisher = _state.transport?.publisher else { log("Publisher is nil", .error) throw LiveKitError(.invalidState, message: "Publisher is nil") diff --git a/Sources/LiveKit/Core/Room+Region.swift b/Sources/LiveKit/Core/Room+Region.swift index 91095ccf9..15abfb5e6 100644 --- a/Sources/LiveKit/Core/Room+Region.swift +++ b/Sources/LiveKit/Core/Room+Region.swift @@ -54,7 +54,10 @@ extension Room { // // With LiveKit Cloud, it will also determine the best edge data center for // the current client to connect to if a token is provided. - public func prepareConnection(url providedUrlString: String, token: String? = nil) async throws { + public func prepareConnection(url providedUrlString: String, token: String? = nil) async throws(LiveKitError) { + // Obj-C interop: prepareConnection isn't tested via Obj-C tests today, + // but keep this in mind: typed throws on @objcMembers async methods + // strips the Obj-C completion-handler bridge. // Must be in disconnected state. guard _state.connectionState == .disconnected else { throw LiveKitError(.stateMismatch, message: "Cannot prepare connection when in state \(_state.connectionState)") @@ -132,9 +135,9 @@ extension Room { try await fullConnectSequence(nextUrl, token) return nextUrl } catch { - // Re-throw if is cancel. + // Re-throw if cancel (wrap as LiveKitError(.cancelled)). if error is CancellationError { - throw error + throw LiveKitError(.cancelled) } if let liveKitError = error as? LiveKitError, liveKitError.type == .validation { @@ -152,7 +155,7 @@ extension Room { await regionManager.markFailed(region: region) } - try Task.checkCancellation() + try checkCancellation() await cleanUp(isFullReconnect: true) diff --git a/Sources/LiveKit/Core/Room+TransportDelegate.swift b/Sources/LiveKit/Core/Room+TransportDelegate.swift index de299cb59..a8bff156f 100644 --- a/Sources/LiveKit/Core/Room+TransportDelegate.swift +++ b/Sources/LiveKit/Core/Room+TransportDelegate.swift @@ -32,12 +32,15 @@ extension Room: TransportDelegate { func transport(_ transport: Transport, didUpdateState pcState: LKRTCPeerConnectionState) { log("target: \(transport.target), connectionState: \(pcState.description)") + let pcError = LiveKitError(.network, + message: "Transport \(transport.target) state changed to \(pcState.description)") + // primary connected if transport.isPrimary { if pcState.isConnected { primaryTransportConnectedCompleter.resume(returning: ()) } else if pcState.isDisconnected { - primaryTransportConnectedCompleter.reset() + primaryTransportConnectedCompleter.reset(throwing: pcError) } } @@ -46,7 +49,7 @@ extension Room: TransportDelegate { if pcState.isConnected { publisherTransportConnectedCompleter.resume(returning: ()) } else if pcState.isDisconnected { - publisherTransportConnectedCompleter.reset() + publisherTransportConnectedCompleter.reset(throwing: pcError) } } diff --git a/Sources/LiveKit/Core/Room.swift b/Sources/LiveKit/Core/Room.swift index 1ec3773bf..7209428f9 100644 --- a/Sources/LiveKit/Core/Room.swift +++ b/Sources/LiveKit/Core/Room.swift @@ -40,7 +40,7 @@ public class Room: NSObject, @unchecked Sendable, ObservableObject, Loggable { public var sid: Sid? { _state.sid } /// Server assigned id of the Room. *async* version of ``Room/sid``. - public func sid() async throws -> Sid { + public func sid() async throws(LiveKitError) -> Sid { try await _sidCompleter.wait() } @@ -323,11 +323,15 @@ public class Room: NSObject, @unchecked Sendable, ObservableObject, Loggable { } } + // Pure Swift API: typed throws. @objcMembers can't bridge typed-throws + // async methods, so we add a separate @objc shim below to preserve the + // Obj-C selector -connectWithUrl:token:connectOptions:roomOptions:completionHandler:. + @nonobjc // swiftlint:disable:next cyclomatic_complexity function_body_length public func connect(url urlString: String, token: String, connectOptions: ConnectOptions? = nil, - roomOptions: RoomOptions? = nil) async throws + roomOptions: RoomOptions? = nil) async throws(LiveKitError) { guard let providedUrl = URL(string: urlString), providedUrl.isValidForConnect else { log("URL parse failed", .error) @@ -355,7 +359,7 @@ public class Room: NSObject, @unchecked Sendable, ObservableObject, Loggable { await cleanUp() - try Task.checkCancellation() + try checkCancellation() // enable E2EE if let e2eeOptions = state.roomOptions.e2eeOptions { @@ -437,7 +441,7 @@ public class Room: NSObject, @unchecked Sendable, ObservableObject, Loggable { // Connect sequence successful log("Connect sequence completed") // Final check if cancelled, don't fire connected events - try Task.checkCancellation() + try checkCancellation() connectSpan?.record("room_connected") @@ -463,12 +467,26 @@ public class Room: NSObject, @unchecked Sendable, ObservableObject, Loggable { } await cleanUp(withError: error) - throw error // Re-throw the original error + throw LiveKitError(from: error) } log("Connected to \(String(describing: self))", .info) } + /// Obj-C bridge for ``connect(url:token:connectOptions:roomOptions:)``. + /// Hidden from Swift via `@available(swift, obsoleted: 1.0)`; bridged to + /// -connectWithUrl:token:connectOptions:roomOptions:completionHandler: in Obj-C. + /// @objc forbids typed throws, so this stays untyped. + @available(swift, obsoleted: 1.0, message: "Use connect(url:token:connectOptions:roomOptions:)") + @objc(connectWithUrl:token:connectOptions:roomOptions:completionHandler:) + public func _objc_connect(url urlString: String, + token: String, + connectOptions: ConnectOptions? = nil, + roomOptions: RoomOptions? = nil) async throws + { + try await connect(url: urlString, token: token, connectOptions: connectOptions, roomOptions: roomOptions) + } + public func disconnect() async { let shouldDisconnect = _state.mutate { switch $0.connectionState { @@ -545,15 +563,16 @@ extension Room { log("withError: \(String(describing: disconnectError)), isFullReconnect: \(isFullReconnect)") // Reset completers - _sidCompleter.reset() - primaryTransportConnectedCompleter.reset() - publisherTransportConnectedCompleter.reset() + _sidCompleter.reset(throwing: disconnectError) + primaryTransportConnectedCompleter.reset(throwing: disconnectError) + publisherTransportConnectedCompleter.reset(throwing: disconnectError) + await activeParticipantCompleters.reset(throwing: disconnectError) await signalClient.cleanUp(withError: disconnectError) // Cancel all track stats timers before closing transports to prevent // stats collection from accessing destroyed WebRTC channels. cancelTimers() - await cleanUpRTC() + await cleanUpRTC(withError: disconnectError) await cleanUpParticipants(isFullReconnect: isFullReconnect) // Cleanup for E2EE diff --git a/Sources/LiveKit/Core/SignalClient.swift b/Sources/LiveKit/Core/SignalClient.swift index 3e0ea7baa..6f79b9e58 100644 --- a/Sources/LiveKit/Core/SignalClient.swift +++ b/Sources/LiveKit/Core/SignalClient.swift @@ -171,7 +171,7 @@ actor SignalClient: Loggable { let connectResponse = try await _connectResponseCompleter.wait() // Check cancellation after received join response - try Task.checkCancellation() + try checkCancellation() // Successfully connected _state.mutate { @@ -181,10 +181,10 @@ actor SignalClient: Loggable { return connectResponse } catch let connectionError { - // Skip validation if user cancelled + // Skip validation if user cancelled (wrap as LiveKitError(.cancelled)). if connectionError is CancellationError { await cleanUp(withError: connectionError) - throw connectionError + throw LiveKitError(.cancelled) } // Skip validation if reconnect mode @@ -253,9 +253,9 @@ actor SignalClient: Loggable { $0.lastJoinResponse = nil } - _connectResponseCompleter.reset() + _connectResponseCompleter.reset(throwing: disconnectError) - await _addTrackCompleters.reset() + await _addTrackCompleters.reset(throwing: disconnectError) await _requestQueue.clear() await _responseQueue.clear() @@ -270,7 +270,7 @@ actor SignalClient: Loggable { private extension SignalClient { // Send request or enqueue while reconnecting - func _sendRequest(_ request: Livekit_SignalRequest) async throws { + func _sendRequest(_ request: Livekit_SignalRequest) async throws(LiveKitError) { guard connectionState != .disconnected else { log("connectionState is .disconnected", .error) throw LiveKitError(.invalidState, message: "connectionState is .disconnected") @@ -418,7 +418,7 @@ extension SignalClient { // MARK: - Send methods extension SignalClient { - func send(offer: LKRTCSessionDescription, offerId: UInt32) async throws { + func send(offer: LKRTCSessionDescription, offerId: UInt32) async throws(LiveKitError) { let r = Livekit_SignalRequest.with { $0.offer = offer.toPBType(offerId: offerId) } @@ -426,7 +426,7 @@ extension SignalClient { try await _sendRequest(r) } - func send(answer: LKRTCSessionDescription, offerId: UInt32) async throws { + func send(answer: LKRTCSessionDescription, offerId: UInt32) async throws(LiveKitError) { let r = Livekit_SignalRequest.with { $0.answer = answer.toPBType(offerId: offerId) } @@ -445,7 +445,7 @@ extension SignalClient { try await _sendRequest(r) } - func sendMuteTrack(trackSid: Track.Sid, muted: Bool) async throws { + func sendMuteTrack(trackSid: Track.Sid, muted: Bool) async throws(LiveKitError) { let r = Livekit_SignalRequest.with { $0.mute = Livekit_MuteTrackRequest.with { $0.sid = trackSid.stringValue @@ -487,7 +487,7 @@ extension SignalClient { return try await completer.wait() } - func sendUpdateTrackSettings(trackSid: Track.Sid, settings: TrackSettings) async throws { + func sendUpdateTrackSettings(trackSid: Track.Sid, settings: TrackSettings) async throws(LiveKitError) { let r = Livekit_SignalRequest.with { $0.trackSetting = Livekit_UpdateTrackSettings.with { $0.trackSids = [trackSid.stringValue] @@ -502,7 +502,7 @@ extension SignalClient { try await _sendRequest(r) } - func sendUpdateVideoLayers(trackSid: Track.Sid, layers: [Livekit_VideoLayer]) async throws { + func sendUpdateVideoLayers(trackSid: Track.Sid, layers: [Livekit_VideoLayer]) async throws(LiveKitError) { let r = Livekit_SignalRequest.with { $0.updateLayers = Livekit_UpdateVideoLayers.with { $0.trackSid = trackSid.stringValue @@ -515,7 +515,7 @@ extension SignalClient { func sendUpdateSubscription(participantSid: Participant.Sid, trackSid: Track.Sid, - isSubscribed: Bool) async throws + isSubscribed: Bool) async throws(LiveKitError) { let p = Livekit_ParticipantTracks.with { $0.participantSid = participantSid.stringValue @@ -534,7 +534,7 @@ extension SignalClient { } func sendUpdateSubscriptionPermission(allParticipants: Bool, - trackPermissions: [ParticipantTrackPermission]) async throws + trackPermissions: [ParticipantTrackPermission]) async throws(LiveKitError) { let r = Livekit_SignalRequest.with { $0.subscriptionPermission = Livekit_SubscriptionPermission.with { @@ -548,7 +548,7 @@ extension SignalClient { func sendUpdateParticipant(name: String? = nil, metadata: String? = nil, - attributes: [String: String]? = nil) async throws + attributes: [String: String]? = nil) async throws(LiveKitError) { let r = Livekit_SignalRequest.with { $0.updateMetadata = Livekit_UpdateParticipantMetadata.with { @@ -726,7 +726,7 @@ extension Livekit_SignalRequest { } private extension SignalClient { - func requireWebSocket() async throws -> WebSocket { + func requireWebSocket() async throws(LiveKitError) -> WebSocket { guard let result = _state.socket else { log("WebSocket is nil", .error) throw LiveKitError(.invalidState, message: "WebSocket is nil") diff --git a/Sources/LiveKit/Core/Transport.swift b/Sources/LiveKit/Core/Transport.swift index e5ecff708..e35138fbd 100644 --- a/Sources/LiveKit/Core/Transport.swift +++ b/Sources/LiveKit/Core/Transport.swift @@ -23,7 +23,7 @@ internal import LiveKitWebRTC actor Transport: NSObject, Loggable { // MARK: - Types - typealias OnOfferBlock = @Sendable (LKRTCSessionDescription, UInt32) async throws -> Void + typealias OnOfferBlock = @Sendable (LKRTCSessionDescription, UInt32) async throws(LiveKitError) -> Void // MARK: - Public @@ -112,11 +112,11 @@ actor Transport: NSObject, Loggable { _isRestartingIce = true } - func add(iceCandidate candidate: IceCandidate) async throws { + func add(iceCandidate candidate: IceCandidate) async { await _iceCandidatesQueue.process(candidate, if: remoteDescription != nil && !_isRestartingIce) } - func set(remoteDescription sd: LKRTCSessionDescription, offerId: UInt32) async throws { + func set(remoteDescription sd: LKRTCSessionDescription, offerId: UInt32) async throws(LiveKitError) { if signalingState != .haveLocalOffer { log("Received answer with unexpected signaling state: \(signalingState), expected .haveLocalOffer", .warning) } @@ -130,15 +130,19 @@ actor Transport: NSObject, Loggable { try await set(remoteDescription: sd) } - func set(remoteDescription sd: LKRTCSessionDescription) async throws { - try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in - _pc.setRemoteDescription(sd) { error in - if let error { - continuation.resume(throwing: error) - } else { - continuation.resume() + func set(remoteDescription sd: LKRTCSessionDescription) async throws(LiveKitError) { + do { + try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + _pc.setRemoteDescription(sd) { error in + if let error { + continuation.resume(throwing: LiveKitError(.webRTC, internalError: error)) + } else { + continuation.resume() + } } } + } catch { + throw LiveKitError(from: error) } await _iceCandidatesQueue.resume() @@ -157,7 +161,7 @@ actor Transport: NSObject, Loggable { } } - func createAndSendOffer(iceRestart: Bool = false) async throws { + func createAndSendOffer(iceRestart: Bool = false) async throws(LiveKitError) { guard let _onOffer else { log("_onOffer is nil", .error) return @@ -176,7 +180,7 @@ actor Transport: NSObject, Loggable { } // Actually negotiate - func _negotiateSequence() async throws { + func _negotiateSequence() async throws(LiveKitError) { _latestOfferId += 1 var offer = try await createOffer(for: constraints) if singlePCMode { @@ -321,20 +325,24 @@ extension Transport: LKRTCPeerConnectionDelegate { // MARK: - Private private extension Transport { - func createOffer(for constraints: [String: String]? = nil) async throws -> LKRTCSessionDescription { + func createOffer(for constraints: [String: String]? = nil) async throws(LiveKitError) -> LKRTCSessionDescription { let mediaConstraints = LKRTCMediaConstraints(mandatoryConstraints: constraints, optionalConstraints: nil) - return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in - _pc.offer(for: mediaConstraints) { sd, error in - if let error { - continuation.resume(throwing: error) - } else if let sd { - continuation.resume(returning: sd) - } else { - continuation.resume(throwing: LiveKitError(.invalidState, message: "No session description and no error were provided.")) + do { + return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + _pc.offer(for: mediaConstraints) { sd, error in + if let error { + continuation.resume(throwing: LiveKitError(.webRTC, internalError: error)) + } else if let sd { + continuation.resume(returning: sd) + } else { + continuation.resume(throwing: LiveKitError(.invalidState, message: "No session description and no error were provided.")) + } } } + } catch { + throw LiveKitError(from: error) } } } @@ -342,32 +350,40 @@ private extension Transport { // MARK: - Internal extension Transport { - func createAnswer(for constraints: [String: String]? = nil) async throws -> LKRTCSessionDescription { + func createAnswer(for constraints: [String: String]? = nil) async throws(LiveKitError) -> LKRTCSessionDescription { let mediaConstraints = LKRTCMediaConstraints(mandatoryConstraints: constraints, optionalConstraints: nil) - return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in - _pc.answer(for: mediaConstraints) { sd, error in - if let error { - continuation.resume(throwing: error) - } else if let sd { - continuation.resume(returning: sd) - } else { - continuation.resume(throwing: LiveKitError(.invalidState, message: "No session description and no error were provided.")) + do { + return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + _pc.answer(for: mediaConstraints) { sd, error in + if let error { + continuation.resume(throwing: LiveKitError(.webRTC, internalError: error)) + } else if let sd { + continuation.resume(returning: sd) + } else { + continuation.resume(throwing: LiveKitError(.invalidState, message: "No session description and no error were provided.")) + } } } + } catch { + throw LiveKitError(from: error) } } - func set(localDescription sd: LKRTCSessionDescription) async throws { - try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in - _pc.setLocalDescription(sd) { error in - if let error { - continuation.resume(throwing: error) - } else { - continuation.resume() + func set(localDescription sd: LKRTCSessionDescription) async throws(LiveKitError) { + do { + try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + _pc.setLocalDescription(sd) { error in + if let error { + continuation.resume(throwing: LiveKitError(.webRTC, internalError: error)) + } else { + continuation.resume() + } } } + } catch { + throw LiveKitError(from: error) } } @@ -391,7 +407,7 @@ extension Transport { return transceiver } - func remove(track sender: LKRTCRtpSender) throws { + func remove(track sender: LKRTCRtpSender) throws(LiveKitError) { guard _pc.removeTrack(sender) else { throw LiveKitError(.webRTC, message: "Failed to remove track") } diff --git a/Sources/LiveKit/DataStream/Incoming/ByteStreamReader.swift b/Sources/LiveKit/DataStream/Incoming/ByteStreamReader.swift index 3ba4c010b..9a32e613a 100644 --- a/Sources/LiveKit/DataStream/Incoming/ByteStreamReader.swift +++ b/Sources/LiveKit/DataStream/Incoming/ByteStreamReader.swift @@ -33,10 +33,15 @@ public final class ByteStreamReader: NSObject, AsyncSequence, Sendable { /// once the stream closes normally. /// /// - Returns: The data consisting of all concatenated chunks. - /// - Throws: ``StreamError`` if an error occurs while reading the stream. - /// - public func readAll() async throws -> Data { - try await source.collect() + /// - Throws: ``LiveKitError`` (`.dataStream`) wrapping the underlying `StreamError` + /// if an error occurs while reading the stream. + @nonobjc + public func readAll() async throws(LiveKitError) -> Data { + do { + return try await source.collect() + } catch { + throw LiveKitError(from: error) + } } /// An asynchronous iterator of incoming chunks. @@ -61,14 +66,15 @@ extension ByteStreamReader { /// - nameOverride: The name to use for the written file. If not specified, file name and extension will be automatically /// inferred from the stream information. /// - Returns: The URL of the written file on disk. - /// - Throws: ``StreamError`` if an error occurs while reading the stream. - /// + /// - Throws: ``LiveKitError`` (`.dataStream`) wrapping the underlying `StreamError` + /// or `AsyncFileStream.Error` if an error occurs while reading the stream or writing to disk. + @nonobjc public func writeToFile( in directory: URL = FileManager.default.temporaryDirectory, name nameOverride: String? = nil - ) async throws -> URL { + ) async throws(LiveKitError) -> URL { guard directory.hasDirectoryPath else { - throw StreamError.notDirectory + throw LiveKitError(.dataStream, internalError: StreamError.notDirectory) } let fileName = Self.resolveFileName( preferredName: nameOverride ?? info.name, @@ -77,14 +83,18 @@ extension ByteStreamReader { ) let fileURL = directory.appendingPathComponent(fileName) - try await Task { - let writer = try AsyncFileStream(writingTo: fileURL) - defer { writer.close() } + do { + try await Task { + let writer = try AsyncFileStream(writingTo: fileURL) + defer { writer.close() } - for try await chunk in self { - try await writer.write(chunk) - } - }.value + for try await chunk in self { + try await writer.write(chunk) + } + }.value + } catch { + throw LiveKitError(from: error) + } return fileURL } @@ -134,4 +144,19 @@ public extension ByteStreamReader { } } } + + @available(swift, obsoleted: 1.0, message: "Use readAll()") + @objc(readAllWithCompletionHandler:) + func _objc_readAll() async throws -> Data { + try await readAll() + } + + @available(swift, obsoleted: 1.0, message: "Use writeToFile(in:name:)") + @objc(writeToFileInDirectory:name:completionHandler:) + func _objc_writeToFile( + in directory: URL = FileManager.default.temporaryDirectory, + name nameOverride: String? = nil + ) async throws -> URL { + try await writeToFile(in: directory, name: nameOverride) + } } diff --git a/Sources/LiveKit/DataStream/Incoming/TextStreamReader.swift b/Sources/LiveKit/DataStream/Incoming/TextStreamReader.swift index c81982dfc..99a3476c7 100644 --- a/Sources/LiveKit/DataStream/Incoming/TextStreamReader.swift +++ b/Sources/LiveKit/DataStream/Incoming/TextStreamReader.swift @@ -33,10 +33,15 @@ public final class TextStreamReader: NSObject, AsyncSequence, Sendable { /// once the stream closes normally. /// /// - Returns: The string consisting of all concatenated chunks. - /// - Throws: ``StreamError`` if an error occurs while reading the stream. - /// - public func readAll() async throws -> String { - try await collect() + /// - Throws: ``LiveKitError`` (`.dataStream`) wrapping the underlying `StreamError` + /// if an error occurs while reading the stream. + @nonobjc + public func readAll() async throws(LiveKitError) -> String { + do { + return try await collect() + } catch { + throw LiveKitError(from: error) + } } /// An asynchronous iterator of incoming chunks. @@ -76,4 +81,10 @@ public extension TextStreamReader { } } } + + @available(swift, obsoleted: 1.0, message: "Use readAll()") + @objc(readAllWithCompletionHandler:) + func _objc_readAll() async throws -> String { + try await readAll() + } } diff --git a/Sources/LiveKit/DataStream/Outgoing/ByteStreamWriter.swift b/Sources/LiveKit/DataStream/Outgoing/ByteStreamWriter.swift index d338a2b7b..6886ec6b8 100644 --- a/Sources/LiveKit/DataStream/Outgoing/ByteStreamWriter.swift +++ b/Sources/LiveKit/DataStream/Outgoing/ByteStreamWriter.swift @@ -32,22 +32,44 @@ public final class ByteStreamWriter: NSObject, Sendable { /// Write data to the stream. /// /// - Parameter data: Data to be sent. - /// - Throws: Throws an error if the stream has been closed or data - /// cannot be sent to remote participants. - /// - public func write(_ data: Data) async throws { - try await destination.write(data) + /// - Throws: ``LiveKitError`` (`.dataStream`) wrapping the underlying `StreamError` + /// if the stream has been closed or data cannot be sent to remote participants. + @nonobjc + public func write(_ data: Data) async throws(LiveKitError) { + do { + try await destination.write(data) + } catch { + throw LiveKitError(from: error) + } } /// Close the stream. /// /// - Parameter reason: A textual description of why the stream is being closed. Absense /// of a reason indicates a normal closure. - /// - Throws: Throws an error if the stream has already been closed or closure - /// cannot be communicated to remote participants. - /// - public func close(reason: String? = nil) async throws { - try await destination.close(reason: reason) + /// - Throws: ``LiveKitError`` (`.dataStream`) wrapping the underlying `StreamError` + /// if the stream has already been closed or closure cannot be communicated to remote participants. + @nonobjc + public func close(reason: String? = nil) async throws(LiveKitError) { + do { + try await destination.close(reason: reason) + } catch { + throw LiveKitError(from: error) + } + } + + // MARK: - Obj-C bridges + + @available(swift, obsoleted: 1.0, message: "Use write(_:)") + @objc(write:completionHandler:) + public func _objc_write(_ data: Data) async throws { + try await write(data) + } + + @available(swift, obsoleted: 1.0, message: "Use close(reason:)") + @objc(closeWithReason:completionHandler:) + public func _objc_close(reason: String? = nil) async throws { + try await close(reason: reason) } init(info: ByteStreamInfo, destination: StreamWriterDestination) { diff --git a/Sources/LiveKit/DataStream/Outgoing/TextStreamWriter.swift b/Sources/LiveKit/DataStream/Outgoing/TextStreamWriter.swift index cd09dd7aa..6b6a476db 100644 --- a/Sources/LiveKit/DataStream/Outgoing/TextStreamWriter.swift +++ b/Sources/LiveKit/DataStream/Outgoing/TextStreamWriter.swift @@ -32,22 +32,44 @@ public final class TextStreamWriter: NSObject, Sendable { /// Write text to the stream. /// /// - Parameter text: Text to be sent. - /// - Throws: Throws an error if the stream has been closed or text - /// cannot be sent to remote participants. - /// - public func write(_ text: String) async throws { - try await destination.write(text) + /// - Throws: ``LiveKitError`` (`.dataStream`) wrapping the underlying `StreamError` + /// if the stream has been closed or text cannot be sent to remote participants. + @nonobjc + public func write(_ text: String) async throws(LiveKitError) { + do { + try await destination.write(text) + } catch { + throw LiveKitError(from: error) + } } /// Close the stream. /// /// - Parameter reason: A textual description of why the stream is being closed. Absense /// of a reason indicates a normal closure. - /// - Throws: Throws an error if the stream has already been closed or closure - /// cannot be communicated to remote participants. - /// - public func close(reason: String? = nil) async throws { - try await destination.close(reason: reason) + /// - Throws: ``LiveKitError`` (`.dataStream`) wrapping the underlying `StreamError` + /// if the stream has already been closed or closure cannot be communicated to remote participants. + @nonobjc + public func close(reason: String? = nil) async throws(LiveKitError) { + do { + try await destination.close(reason: reason) + } catch { + throw LiveKitError(from: error) + } + } + + // MARK: - Obj-C bridges + + @available(swift, obsoleted: 1.0, message: "Use write(_:)") + @objc(write:completionHandler:) + public func _objc_write(_ text: String) async throws { + try await write(text) + } + + @available(swift, obsoleted: 1.0, message: "Use close(reason:)") + @objc(closeWithReason:completionHandler:) + public func _objc_close(reason: String? = nil) async throws { + try await close(reason: reason) } init(info: TextStreamInfo, destination: StreamWriterDestination) { diff --git a/Sources/LiveKit/Errors.swift b/Sources/LiveKit/Errors.swift index dacf9d058..f99b9943a 100644 --- a/Sources/LiveKit/Errors.swift +++ b/Sources/LiveKit/Errors.swift @@ -68,6 +68,9 @@ public enum LiveKitErrorType: Int, Sendable { // LiveKit Cloud case onlyForCloud = 1101 case regionManager = 1102 + + // Data streams + case dataStream = 1201 } extension LiveKitErrorType: CustomStringConvertible { @@ -131,6 +134,8 @@ extension LiveKitErrorType: CustomStringConvertible { "Only for LiveKit Cloud" case .regionManager: "Region manager error" + case .dataStream: + "Data stream error" default: "Unknown" } } @@ -182,23 +187,44 @@ public class LiveKitError: NSError, @unchecked Sendable, Loggable { } } -extension LiveKitError { - static func from(error: Error?) -> LiveKitError? { - guard let error else { return nil } - if let error = error as? LiveKitError { - return error +public extension LiveKitError { + /// Wraps any `Error` as a `LiveKitError`. Pass-through for an existing + /// `LiveKitError` (its type/message/internalError are forwarded); + /// `CancellationError` becomes `.cancelled`; network errors become + /// `.network`; `StreamError` becomes `.dataStream`; everything else + /// becomes `.unknown` with `internalError` set. + /// + /// Designed for `throws(LiveKitError)` boundary catches: + /// ```swift + /// } catch { + /// throw LiveKitError(from: error) + /// } + /// ``` + convenience init(from error: any Error) { + if let lk = error as? LiveKitError { + self.init(lk.type, message: lk.message, internalError: lk.internalError) + return } - if error is CancellationError { - return LiveKitError(.cancelled) + self.init(.cancelled) + return } - if error.isNetworkError { - return LiveKitError(.network, internalError: error) + self.init(.network, internalError: error) + return + } + if error is StreamError { + self.init(.dataStream, internalError: error) + return } + self.init(.unknown, internalError: error) + } +} - log("Uncategorized error for: \(String(describing: error))") - return LiveKitError(.unknown) +extension LiveKitError { + static func from(error: Error?) -> LiveKitError? { + guard let error else { return nil } + return LiveKitError(from: error) } static func from(reason: Livekit_DisconnectReason) -> LiveKitError { @@ -206,6 +232,15 @@ extension LiveKitError { } } +/// Throws `LiveKitError(.cancelled)` if the current Task is cancelled. +/// +/// Typed-throws counterpart to `Task.checkCancellation()` for use inside +/// `throws(LiveKitError)` contexts. +@inlinable +func checkCancellation() throws(LiveKitError) { + if Task.isCancelled { throw LiveKitError(.cancelled) } +} + extension Error { /// Returns `true` for URLError, CFNetwork, and POSIX socket errors. var isNetworkError: Bool { diff --git a/Sources/LiveKit/Participant/LocalParticipant.swift b/Sources/LiveKit/Participant/LocalParticipant.swift index 146fda51b..3e72f6419 100644 --- a/Sources/LiveKit/Participant/LocalParticipant.swift +++ b/Sources/LiveKit/Participant/LocalParticipant.swift @@ -33,7 +33,7 @@ public class LocalParticipant: Participant, @unchecked Sendable { /// publish a new audio track to the Room @discardableResult - public func publish(audioTrack: LocalAudioTrack, options: AudioPublishOptions? = nil) async throws -> LocalTrackPublication { + public func publish(audioTrack: LocalAudioTrack, options: AudioPublishOptions? = nil) async throws(LiveKitError) -> LocalTrackPublication { let result = try await _publishSerialRunner.run { try await self._publish(track: audioTrack, options: options) } @@ -43,7 +43,7 @@ public class LocalParticipant: Participant, @unchecked Sendable { /// publish a new video track to the Room @discardableResult - public func publish(videoTrack: LocalVideoTrack, options: VideoPublishOptions? = nil) async throws -> LocalTrackPublication { + public func publish(videoTrack: LocalVideoTrack, options: VideoPublishOptions? = nil) async throws(LiveKitError) -> LocalTrackPublication { let result = try await _publishSerialRunner.run { try await self._publish(track: videoTrack, options: options) } @@ -65,7 +65,7 @@ public class LocalParticipant: Participant, @unchecked Sendable { /// unpublish an existing published track /// this will also stop the track - public func unpublish(publication: LocalTrackPublication, notify _notify: Bool = true) async throws { + public func unpublish(publication: LocalTrackPublication, notify _notify: Bool = true) async throws(LiveKitError) { let room = try requireRoom() func _notifyDidUnpublish() async { @@ -103,7 +103,7 @@ public class LocalParticipant: Participant, @unchecked Sendable { try await track.stop() } - try await track.onUnpublish() + await track.onUnpublish() await _notifyDidUnpublish() } @@ -114,7 +114,7 @@ public class LocalParticipant: Participant, @unchecked Sendable { /// - Parameters: /// - data: Data to send /// - options: Provide options with a ``DataPublishOptions`` class. - public func publish(data: Data, options: DataPublishOptions? = nil) async throws { + public func publish(data: Data, options: DataPublishOptions? = nil) async throws(LiveKitError) { let room = try requireRoom() let options = options ?? room._state.roomOptions.defaultDataPublishOptions @@ -150,7 +150,7 @@ public class LocalParticipant: Participant, @unchecked Sendable { * participant/track. Any omitted participants will not receive any permissions. */ public func setTrackSubscriptionPermissions(allParticipantsAllowed: Bool, - trackPermissions: [ParticipantTrackPermission] = []) async throws + trackPermissions: [ParticipantTrackPermission] = []) async throws(LiveKitError) { self.allParticipantsAllowed = allParticipantsAllowed self.trackPermissions = trackPermissions @@ -161,7 +161,7 @@ public class LocalParticipant: Participant, @unchecked Sendable { /// Sets and updates the metadata of the local participant. /// /// Note: this requires `CanUpdateOwnMetadata` permission encoded in the token. - public func set(metadata: String) async throws { + public func set(metadata: String) async throws(LiveKitError) { let room = try requireRoom() try await room.signalClient.sendUpdateParticipant(metadata: metadata) _state.mutate { $0.metadata = metadata } @@ -170,19 +170,19 @@ public class LocalParticipant: Participant, @unchecked Sendable { /// Sets and updates the name of the local participant. /// /// Note: this requires `CanUpdateOwnMetadata` permission encoded in the token. - public func set(name: String) async throws { + public func set(name: String) async throws(LiveKitError) { let room = try requireRoom() try await room.signalClient.sendUpdateParticipant(name: name) _state.mutate { $0.name = name } } - public func set(attributes: [String: String]) async throws { + public func set(attributes: [String: String]) async throws(LiveKitError) { let room = try requireRoom() try await room.signalClient.sendUpdateParticipant(attributes: attributes) _state.mutate { $0.attributes = attributes } } - func sendTrackSubscriptionPermissions() async throws { + func sendTrackSubscriptionPermissions() async throws(LiveKitError) { let room = try requireRoom() guard room._state.connectionState == .connected else { return } @@ -545,7 +545,7 @@ extension LocalParticipant { try await track.start() // Starting the Track could be time consuming especially for camera etc. // Check cancellation after track starts. - try Task.checkCancellation() + try checkCancellation() do { var dimensions: Dimensions? // Only for Video diff --git a/Sources/LiveKit/Participant/Participant.swift b/Sources/LiveKit/Participant/Participant.swift index c3d383787..1dcdff26a 100644 --- a/Sources/LiveKit/Participant/Participant.swift +++ b/Sources/LiveKit/Participant/Participant.swift @@ -311,7 +311,7 @@ extension Participant { // MARK: - Private helpers extension Participant { - func requireRoom() throws -> Room { + func requireRoom() throws(LiveKitError) -> Room { guard let room = _room else { log("Room is nil", .error) throw LiveKitError(.invalidState, message: "Room is nil") @@ -320,7 +320,7 @@ extension Participant { return room } - func requireIdentity() throws -> Participant.Identity { + func requireIdentity() throws(LiveKitError) -> Participant.Identity { guard let identity else { log("Identity is nil", .error) throw LiveKitError(.invalidState, message: "Identity is nil") diff --git a/Sources/LiveKit/Participant/RemoteParticipant+Active.swift b/Sources/LiveKit/Participant/RemoteParticipant+Active.swift index 4603b07ce..5afbf8dee 100644 --- a/Sources/LiveKit/Participant/RemoteParticipant+Active.swift +++ b/Sources/LiveKit/Participant/RemoteParticipant+Active.swift @@ -23,7 +23,7 @@ public extension RemoteParticipant { /// - timeout: The timeout for the operation. /// - Throws: `LiveKitError` if the participant is not active within the timeout. @discardableResult - func waitUntilActive(timeout: TimeInterval = .defaultParticipantActiveTimeout) async throws -> Self { + func waitUntilActive(timeout: TimeInterval = .defaultParticipantActiveTimeout) async throws(LiveKitError) -> Self { let room = try requireRoom() let identity = try requireIdentity() try await room.activeParticipantCompleters.completer(for: identity.stringValue).wait(timeout: timeout) diff --git a/Sources/LiveKit/Support/Async/AsyncCompleter.swift b/Sources/LiveKit/Support/Async/AsyncCompleter.swift index a00a0a297..886ad90cd 100644 --- a/Sources/LiveKit/Support/Async/AsyncCompleter.swift +++ b/Sources/LiveKit/Support/Async/AsyncCompleter.swift @@ -48,15 +48,15 @@ actor CompleterMapActor { completer.resume(returning: value) } - func resume(throwing error: any Error, for key: String) { - let completer = completer(for: key) + func resume(throwing error: LiveKitError, for key: String) { + guard let completer = _completerMap[key] else { return } completer.resume(throwing: error) } - func reset() { + func reset(throwing error: Error? = nil) { // Reset call completers... for (_, value) in _completerMap { - value.reset() + value.reset(throwing: error) } // Clear all completers... _completerMap.removeAll() @@ -69,8 +69,8 @@ final class AsyncCompleter: @unchecked Sendable, Loggable { let continuation: CheckedContinuation let timeoutBlock: DispatchWorkItem - func cancel() { - continuation.resume(throwing: LiveKitError(.cancelled)) + func cancel(throwing error: LiveKitError? = nil) { + continuation.resume(throwing: error ?? LiveKitError(.cancelled)) timeoutBlock.cancel() } @@ -79,8 +79,8 @@ final class AsyncCompleter: @unchecked Sendable, Loggable { timeoutBlock.cancel() } - func resume(with result: Result) { - continuation.resume(with: result) + func resume(with result: Result) { + continuation.resume(with: result.mapError { $0 as Error }) timeoutBlock.cancel() } } @@ -92,10 +92,14 @@ final class AsyncCompleter: @unchecked Sendable, Loggable { // Internal states private var _defaultTimeout: DispatchTimeInterval private var _entries: [UUID: WaitEntry] = [:] - private var _result: Result? + private var _result: Result? private let _lock: some Lock = createLock() + var waiterCount: Int { + _lock.sync { _entries.count } + } + init(label: String, defaultTimeout: TimeInterval) { self.label = label _defaultTimeout = defaultTimeout.toDispatchTimeInterval @@ -111,17 +115,17 @@ final class AsyncCompleter: @unchecked Sendable, Loggable { } } - func reset() { + func reset(throwing error: Error? = nil) { _lock.sync { for entry in _entries.values { - entry.cancel() + entry.cancel(throwing: LiveKitError.from(error: error)) } _entries.removeAll() _result = nil } } - func resume(with result: Result) { + func resume(with result: Result) { _lock.sync { if let _result { log("\(label) already resolved \(_entries) with \(_result)", .debug) @@ -139,12 +143,12 @@ final class AsyncCompleter: @unchecked Sendable, Loggable { resume(with: .success(value)) } - func resume(throwing error: Error) { + func resume(throwing error: LiveKitError) { log("\(label)", .error) resume(with: .failure(error)) } - func wait(timeout: TimeInterval? = nil) async throws -> T { + func wait(timeout: TimeInterval? = nil) async throws(LiveKitError) -> T { // Read value if let result = _lock.sync({ _result }) { // Already resolved... @@ -162,38 +166,43 @@ final class AsyncCompleter: @unchecked Sendable, Loggable { let entryId = UUID() // Create a cancel-aware timed continuation - return try await withTaskCancellationHandler { - try await withCheckedThrowingContinuation { continuation in - // Create time-out block - let timeoutBlock = DispatchWorkItem { [weak self] in - guard let self else { return } - log("\(label) id: \(entryId) timed out") - _lock.sync { - if let entry = self._entries[entryId] { - entry.timeout() + do { + return try await withTaskCancellationHandler { + try await withCheckedThrowingContinuation { continuation in + // Create time-out block + let timeoutBlock = DispatchWorkItem { [weak self] in + guard let self else { return } + log("\(label) id: \(entryId) timed out") + _lock.sync { + if let entry = self._entries[entryId] { + entry.timeout() + } + self._entries.removeValue(forKey: entryId) } - self._entries.removeValue(forKey: entryId) } - } - _lock.sync { - // Schedule time-out block - let computedTimeout = (timeout?.toDispatchTimeInterval ?? _defaultTimeout) - _timerQueue.asyncAfter(deadline: .now() + computedTimeout, execute: timeoutBlock) - // Store entry - _entries[entryId] = WaitEntry(continuation: continuation, timeoutBlock: timeoutBlock) + _lock.sync { + // Schedule time-out block + let computedTimeout = (timeout?.toDispatchTimeInterval ?? _defaultTimeout) + _timerQueue.asyncAfter(deadline: .now() + computedTimeout, execute: timeoutBlock) + // Store entry + _entries[entryId] = WaitEntry(continuation: continuation, timeoutBlock: timeoutBlock) - log("\(label) id: \(entryId) waiting for \(computedTimeout)") + log("\(label) id: \(entryId) waiting for \(computedTimeout)") + } } - } - } onCancel: { - // Cancel only this completer when Task gets cancelled - _lock.sync { - if let entry = self._entries[entryId] { - entry.cancel() + } onCancel: { + // Cancel only this completer when Task gets cancelled + _lock.sync { + if let entry = self._entries[entryId] { + entry.cancel() + } + self._entries.removeValue(forKey: entryId) } - self._entries.removeValue(forKey: entryId) } + } catch { + // All internal resume paths use LiveKitError; this is a safety net. + throw LiveKitError(from: error) } } } diff --git a/Sources/LiveKit/Support/Async/AsyncSerialDelegate.swift b/Sources/LiveKit/Support/Async/AsyncSerialDelegate.swift index 6d8b0fecd..95b558dc9 100644 --- a/Sources/LiveKit/Support/Async/AsyncSerialDelegate.swift +++ b/Sources/LiveKit/Support/Async/AsyncSerialDelegate.swift @@ -28,7 +28,7 @@ final class AsyncSerialDelegate: Sendable { _state.mutate { $0.delegate = delegate as AnyObject } } - func notifyAsync(_ fnc: @Sendable @escaping (T) async -> Void) async throws { + func notifyAsync(_ fnc: @Sendable @escaping (T) async -> Void) async throws(LiveKitError) { guard let delegate = _state.read({ $0.delegate }) as? T else { return } try await _serialRunner.run { await fnc(delegate) diff --git a/Sources/LiveKit/Support/Audio/AudioMixRecorder.swift b/Sources/LiveKit/Support/Audio/AudioMixRecorder.swift index 2096edaa0..7c8dac190 100644 --- a/Sources/LiveKit/Support/Audio/AudioMixRecorder.swift +++ b/Sources/LiveKit/Support/Audio/AudioMixRecorder.swift @@ -131,14 +131,18 @@ public class AudioMixRecorder: Loggable, @unchecked Sendable { // MARK: - Public Methods - public func start() throws { + public func start() throws(LiveKitError) { guard !audioEngine.isRunning else { log("Already running", .warning) return } log() - try audioEngine.start() + do { + try audioEngine.start() + } catch { + throw LiveKitError(.audioEngine, internalError: error) + } // Calculate interval based on buffer size and sample rate let interval = Double(maxFrameCount) / Double(processingFormat.sampleRate) startRenderTimer(interval: interval) diff --git a/Sources/LiveKit/Support/Audio/AudioPlayerRenderer.swift b/Sources/LiveKit/Support/Audio/AudioPlayerRenderer.swift index 8ac4e20f3..9a45146e9 100644 --- a/Sources/LiveKit/Support/Audio/AudioPlayerRenderer.swift +++ b/Sources/LiveKit/Support/Audio/AudioPlayerRenderer.swift @@ -28,7 +28,7 @@ public class AudioPlayerRenderer: AudioRenderer, Loggable, @unchecked Sendable { engine.attach(playerNode) } - public func start() async throws { + public func start() async throws(LiveKitError) { log("Starting audio engine...") let format = engine.outputNode.outputFormat(forBus: 0) @@ -36,7 +36,11 @@ public class AudioPlayerRenderer: AudioRenderer, Loggable, @unchecked Sendable { engine.connect(playerNode, to: engine.mainMixerNode, format: format) - try engine.start() + do { + try engine.start() + } catch { + throw LiveKitError(.audioEngine, internalError: error) + } log("Audio engine started") } diff --git a/Sources/LiveKit/Support/Network/HTTP.swift b/Sources/LiveKit/Support/Network/HTTP.swift index d0c108494..699120bc1 100644 --- a/Sources/LiveKit/Support/Network/HTTP.swift +++ b/Sources/LiveKit/Support/Network/HTTP.swift @@ -23,7 +23,7 @@ class HTTP: NSObject { delegate: nil, delegateQueue: operationQueue) - static func requestValidation(from url: URL, token: String) async throws { + static func requestValidation(from url: URL, token: String) async throws(LiveKitError) { var request = URLRequest(url: url, cachePolicy: .reloadIgnoringLocalAndRemoteCacheData, timeoutInterval: .defaultHTTPConnect) @@ -31,10 +31,16 @@ class HTTP: NSObject { request.addValue("Bearer \(token)", forHTTPHeaderField: "Authorization") // Make the data request - let (data, response) = try await session.data(for: request) + let data: Data + let response: URLResponse + do { + (data, response) = try await session.data(for: request) + } catch { + throw LiveKitError(.network, internalError: error) + } guard let httpResponse = response as? HTTPURLResponse else { - throw URLError(.badServerResponse) + throw LiveKitError(.network, message: "Invalid HTTP response from \(url)") } guard (200 ..< 300).contains(httpResponse.statusCode) else { diff --git a/Sources/LiveKit/Support/Network/WebSocket.swift b/Sources/LiveKit/Support/Network/WebSocket.swift index e4c4f951b..9f41408c4 100644 --- a/Sources/LiveKit/Support/Network/WebSocket.swift +++ b/Sources/LiveKit/Support/Network/WebSocket.swift @@ -37,7 +37,7 @@ actor WebSocket: Loggable, AsyncSequence { return config } - init(url: URL, token: String, connectOptions: ConnectOptions?) async throws { + init(url: URL, token: String, connectOptions: ConnectOptions?) async throws(LiveKitError) { var request = URLRequest(url: url, cachePolicy: .useProtocolCachePolicy, timeoutInterval: connectOptions?.socketConnectTimeoutInterval ?? .defaultSocketConnect) @@ -54,13 +54,17 @@ actor WebSocket: Loggable, AsyncSequence { delegate: delegate, delegateQueue: nil) task = urlSession.webSocketTask(with: request) - try await withTaskCancellationHandler { - try await withCheckedThrowingContinuation { continuation in - delegate.setConnectContinuation(continuation) - task.resume() + do { + try await withTaskCancellationHandler { + try await withCheckedThrowingContinuation { continuation in + delegate.setConnectContinuation(continuation) + task.resume() + } + } onCancel: { + self.close() } - } onCancel: { - self.close() + } catch { + throw LiveKitError(from: error) } } @@ -95,7 +99,7 @@ actor WebSocket: Loggable, AsyncSequence { // rather than CancellationError. Return nil (end-of-sequence) // instead of propagating, so `subscribe` doesn't call onFailure. if task.closeCode != .invalid || Task.isCancelled { return nil } - throw LiveKitError.from(error: error) ?? error + throw LiveKitError(from: error) } } onCancel: { task.cancel(with: .normalClosure, reason: nil) @@ -141,8 +145,7 @@ actor WebSocket: Loggable, AsyncSequence { _continuation.mutate { if let error { - let lkError = LiveKitError.from(error: error) ?? LiveKitError(.unknown) - $0?.resume(throwing: lkError) + $0?.resume(throwing: LiveKitError(from: error)) } else { $0?.resume() } diff --git a/Sources/LiveKit/Support/Schedulers/SerialRunnerActor.swift b/Sources/LiveKit/Support/Schedulers/SerialRunnerActor.swift index f29f3fecf..d0f920b70 100644 --- a/Sources/LiveKit/Support/Schedulers/SerialRunnerActor.swift +++ b/Sources/LiveKit/Support/Schedulers/SerialRunnerActor.swift @@ -19,7 +19,7 @@ import Foundation actor SerialRunnerActor { private var previousTask: Task? - func run(block: @Sendable @escaping () async throws -> Value) async throws -> Value { + func run(block: @Sendable @escaping () async throws -> Value) async throws(LiveKitError) -> Value { let task = Task { [previousTask] in // Always wait for the previous task to maintain serial ordering if let previousTask { @@ -36,12 +36,18 @@ actor SerialRunnerActor { previousTask = task - return try await withTaskCancellationHandler { - // Await the current task's result - try await task.value - } onCancel: { - // Ensure the task is canceled when requested - task.cancel() + do { + return try await withTaskCancellationHandler { + // Await the current task's result + try await task.value + } onCancel: { + // Ensure the task is canceled when requested + task.cancel() + } + } catch { + // Convert non-LK errors (block throws, CancellationError from Task system) + // into LiveKitError so callers see a single typed error. + throw LiveKitError(from: error) } } } diff --git a/Sources/LiveKit/Support/Utils.swift b/Sources/LiveKit/Support/Utils.swift index b185067d7..b04de671e 100644 --- a/Sources/LiveKit/Support/Utils.swift +++ b/Sources/LiveKit/Support/Utils.swift @@ -135,7 +135,7 @@ class Utils: Loggable { reconnectMode: ReconnectMode? = nil, participantSid: Participant.Sid? = nil, adaptiveStream: Bool - ) throws -> URL { + ) throws(LiveKitError) -> URL { // use default options if nil let connectOptions = connectOptions ?? ConnectOptions() @@ -208,7 +208,7 @@ class Utils: Loggable { reconnectMode: ReconnectMode? = nil, participantSid: Participant.Sid? = nil, adaptiveStream: Bool - ) throws -> URL { + ) throws(LiveKitError) -> URL { let connectOptions = connectOptions ?? ConnectOptions() guard var builder = URLComponents(url: url, resolvingAgainstBaseURL: false) else { @@ -247,7 +247,7 @@ class Utils: Loggable { /// Converts a WebSocket URL to its HTTP validation counterpart. /// - `wss://host/rtc?...` → `https://host/rtc/validate?...` /// - `wss://host/rtc/v1?...` → `https://host/rtc/v1/validate?...` - static func toValidateUrl(_ wsUrl: URL) throws -> URL { + static func toValidateUrl(_ wsUrl: URL) throws(LiveKitError) -> URL { guard var components = URLComponents(url: wsUrl, resolvingAgainstBaseURL: false) else { throw LiveKitError(.failedToParseUrl) } @@ -266,7 +266,7 @@ class Utils: Loggable { reconnectMode: ReconnectMode?, participantSid: Participant.Sid?, adaptiveStream: Bool - ) throws -> String { + ) throws(LiveKitError) -> String { var joinRequest = Livekit_JoinRequest() joinRequest.clientInfo = Livekit_ClientInfo.with { $0.sdk = .swift @@ -290,11 +290,16 @@ class Utils: Loggable { } } - let joinRequestData = try joinRequest.serializedData() - let wrappedData = try Livekit_WrappedJoinRequest.with { - $0.compression = .none - $0.joinRequest = joinRequestData - }.serializedData() + let wrappedData: Data + do { + let joinRequestData = try joinRequest.serializedData() + wrappedData = try Livekit_WrappedJoinRequest.with { + $0.compression = .none + $0.joinRequest = joinRequestData + }.serializedData() + } catch { + throw LiveKitError(.failedToConvertData, internalError: error) + } return wrappedData.base64EncodedString() } diff --git a/Sources/LiveKit/Support/Video/DeviceManager.swift b/Sources/LiveKit/Support/Video/DeviceManager.swift index d36ae44f4..4bbe2bf9a 100644 --- a/Sources/LiveKit/Support/Video/DeviceManager.swift +++ b/Sources/LiveKit/Support/Video/DeviceManager.swift @@ -28,7 +28,7 @@ class DeviceManager: @unchecked Sendable, Loggable { } // Async version, waits until inital device fetch is complete - func devices() async throws -> [AVCaptureDevice] { + func devices() async throws(LiveKitError) -> [AVCaptureDevice] { try await _devicesCompleter.wait() } @@ -83,7 +83,7 @@ class DeviceManager: @unchecked Sendable, Loggable { private var _multiCamDeviceSetsObservation: NSKeyValueObservation? /// Find multi-cam compatible devices. - func multiCamCompatibleDevices(for devices: Set) async throws -> [AVCaptureDevice] { + func multiCamCompatibleDevices(for devices: Set) async throws(LiveKitError) -> [AVCaptureDevice] { let deviceSets = try await _multiCamDeviceSetsCompleter.wait() let compatibleDevices = deviceSets.filter { $0.isSuperset(of: devices) } diff --git a/Sources/LiveKit/Token/CachingTokenSource.swift b/Sources/LiveKit/Token/CachingTokenSource.swift index 0f649233f..162411123 100644 --- a/Sources/LiveKit/Token/CachingTokenSource.swift +++ b/Sources/LiveKit/Token/CachingTokenSource.swift @@ -68,6 +68,8 @@ public actor CachingTokenSource: TokenSourceConfigurable, Loggable { self.validator = validator } + // Conforms to TokenSourceConfigurable protocol whose requirement is untyped throws. + // swiftlint:disable:next public_typed_throws public func fetch(_ options: TokenRequestOptions) async throws -> TokenSourceResponse { if let (cachedOptions, cachedResponse) = await store.retrieve(), cachedOptions == options, diff --git a/Sources/LiveKit/Token/LiteralTokenSource.swift b/Sources/LiveKit/Token/LiteralTokenSource.swift index 400f9df83..932064c77 100644 --- a/Sources/LiveKit/Token/LiteralTokenSource.swift +++ b/Sources/LiveKit/Token/LiteralTokenSource.swift @@ -46,10 +46,11 @@ public struct LiteralTokenSource: TokenSourceFixed { self.roomName = roomName } + // Conforms to TokenSourceFixed protocol whose requirement is untyped throws. /// Returns the fixed credentials without any network requests. /// /// - Returns: A `TokenSourceResponse` containing the pre-configured credentials - public func fetch() async throws -> TokenSourceResponse { + public func fetch() async throws -> TokenSourceResponse { // swiftlint:disable:this public_typed_throws TokenSourceResponse(serverURL: serverURL, participantToken: participantToken, participantName: participantName, roomName: roomName) } } diff --git a/Sources/LiveKit/Track/Capturers/ARCameraCapturer.swift b/Sources/LiveKit/Track/Capturers/ARCameraCapturer.swift index 5020a13ce..db6fec576 100644 --- a/Sources/LiveKit/Track/Capturers/ARCameraCapturer.swift +++ b/Sources/LiveKit/Track/Capturers/ARCameraCapturer.swift @@ -36,14 +36,18 @@ public class ARCameraCapturer: VideoCapturer, @unchecked Sendable { super.init(delegate: delegate) } - override public func startCapture() async throws -> Bool { + override public func startCapture() async throws(LiveKitError) -> Bool { let didStart = try await super.startCapture() // Already started guard didStart else { return false } try await ensureCameraAccessAuthorized() - try await arKitSession.run([cameraFrameProvider]) + do { + try await arKitSession.run([cameraFrameProvider]) + } catch { + throw LiveKitError(.deviceAccessDenied, internalError: error) + } guard let format = CameraVideoFormat.supportedVideoFormats(for: .main, cameraPositions: [.left]).first, let frameUpdates = cameraFrameProvider.cameraFrameUpdates(for: format) @@ -63,7 +67,7 @@ public class ARCameraCapturer: VideoCapturer, @unchecked Sendable { return true } - private func ensureCameraAccessAuthorized() async throws { + private func ensureCameraAccessAuthorized() async throws(LiveKitError) { let queryResult = await arKitSession.queryAuthorization(for: [.cameraAccess]) switch queryResult[.cameraAccess] { case .denied: throw LiveKitError(.deviceAccessDenied) @@ -77,7 +81,7 @@ public class ARCameraCapturer: VideoCapturer, @unchecked Sendable { } } - override public func stopCapture() async throws -> Bool { + override public func stopCapture() async throws(LiveKitError) -> Bool { let didStop = try await super.stopCapture() // Already stopped guard didStop else { return false } diff --git a/Sources/LiveKit/Track/Capturers/CameraCapturer.swift b/Sources/LiveKit/Track/Capturers/CameraCapturer.swift index 4288bacc5..38f791b13 100644 --- a/Sources/LiveKit/Track/Capturers/CameraCapturer.swift +++ b/Sources/LiveKit/Track/Capturers/CameraCapturer.swift @@ -36,14 +36,14 @@ public class CameraCapturer: VideoCapturer, @unchecked Sendable { @objc public var options: CameraCaptureOptions { _cameraCapturerState.options } - @objc - public static func captureDevices() async throws -> [AVCaptureDevice] { + @nonobjc + public static func captureDevices() async throws(LiveKitError) -> [AVCaptureDevice] { try await DeviceManager.shared.devices() } /// Checks whether both front and back capturing devices exist, and can be switched. - @objc - public static func canSwitchPosition() async throws -> Bool { + @nonobjc + public static func canSwitchPosition() async throws(LiveKitError) -> Bool { let devices = try await captureDevices() return devices.contains(where: { $0.position == .front }) && devices.contains(where: { $0.position == .back }) @@ -114,9 +114,9 @@ public class CameraCapturer: VideoCapturer, @unchecked Sendable { } /// Switches the camera position between `.front` and `.back` if supported by the device. - @objc + @nonobjc @discardableResult - public func switchCameraPosition() async throws -> Bool { + public func switchCameraPosition() async throws(LiveKitError) -> Bool { // Cannot toggle if current position is unknown guard position != .unspecified else { log("Failed to toggle camera position", .error) @@ -127,9 +127,9 @@ public class CameraCapturer: VideoCapturer, @unchecked Sendable { } /// Sets the camera's position to `.front` or `.back` when supported. - @objc + @nonobjc @discardableResult - public func set(cameraPosition position: AVCaptureDevice.Position) async throws -> Bool { + public func set(cameraPosition position: AVCaptureDevice.Position) async throws(LiveKitError) -> Bool { log("set(cameraPosition:) \(position)") let newOptions = options.copyWith( device: .value(nil), @@ -139,9 +139,9 @@ public class CameraCapturer: VideoCapturer, @unchecked Sendable { } /// Sets new options at runtime and resstarts capturing. - @objc + @nonobjc @discardableResult - public func set(options newOptions: CameraCaptureOptions) async throws -> Bool { + public func set(options newOptions: CameraCaptureOptions) async throws(LiveKitError) -> Bool { log("set(options:) \(options)") // Update to new options @@ -152,7 +152,7 @@ public class CameraCapturer: VideoCapturer, @unchecked Sendable { } // swiftlint:disable:next cyclomatic_complexity function_body_length - override public func startCapture() async throws -> Bool { + override public func startCapture() async throws(LiveKitError) -> Bool { let didStart = try await super.startCapture() // Already started @@ -285,7 +285,11 @@ public class CameraCapturer: VideoCapturer, @unchecked Sendable { log("starting camera capturer device: \(device), format: \(selectedFormat), fps: \(selectedFps)(\(fpsRange))", .info) - try await capturer.startCapture(with: device, format: selectedFormat.format, fps: selectedFps) + do { + try await capturer.startCapture(with: device, format: selectedFormat.format, fps: selectedFps) + } catch { + throw LiveKitError(.webRTC, internalError: error) + } // Update internal vars _cameraCapturerState.mutate { $0.device = device } @@ -293,7 +297,7 @@ public class CameraCapturer: VideoCapturer, @unchecked Sendable { return true } - override public func stopCapture() async throws -> Bool { + override public func stopCapture() async throws(LiveKitError) -> Bool { let didStop = try await super.stopCapture() // Already stopped @@ -308,6 +312,45 @@ public class CameraCapturer: VideoCapturer, @unchecked Sendable { return true } + + // MARK: - Obj-C bridges + + // + // @objc disallows typed throws; preserve the originally auto-generated + // selectors for Obj-C consumers while keeping the typed APIs Swift-only. + + @available(swift, obsoleted: 1.0, message: "Use captureDevices()") + @objc(captureDevicesWithCompletionHandler:) + public static func _objc_captureDevices() async throws -> [AVCaptureDevice] { + try await captureDevices() + } + + @available(swift, obsoleted: 1.0, message: "Use canSwitchPosition()") + @objc(canSwitchPositionWithCompletionHandler:) + public static func _objc_canSwitchPosition() async throws -> Bool { + try await canSwitchPosition() + } + + @available(swift, obsoleted: 1.0, message: "Use switchCameraPosition()") + @objc(switchCameraPositionWithCompletionHandler:) + @discardableResult + public func _objc_switchCameraPosition() async throws -> Bool { + try await switchCameraPosition() + } + + @available(swift, obsoleted: 1.0, message: "Use set(cameraPosition:)") + @objc(setCameraPosition:completionHandler:) + @discardableResult + public func _objc_set(cameraPosition position: AVCaptureDevice.Position) async throws -> Bool { + try await set(cameraPosition: position) + } + + @available(swift, obsoleted: 1.0, message: "Use set(options:)") + @objc(setOptions:completionHandler:) + @discardableResult + public func _objc_set(options newOptions: CameraCaptureOptions) async throws -> Bool { + try await set(options: newOptions) + } } class VideoCapturerDelegateAdapter: NSObject, LKRTCVideoCapturerDelegate, Loggable { diff --git a/Sources/LiveKit/Track/Capturers/InAppCapturer.swift b/Sources/LiveKit/Track/Capturers/InAppCapturer.swift index 40f095d8d..bd28357f1 100644 --- a/Sources/LiveKit/Track/Capturers/InAppCapturer.swift +++ b/Sources/LiveKit/Track/Capturers/InAppCapturer.swift @@ -32,25 +32,29 @@ public class InAppScreenCapturer: VideoCapturer, @unchecked Sendable { super.init(delegate: delegate) } - override public func startCapture() async throws -> Bool { + override public func startCapture() async throws(LiveKitError) -> Bool { let didStart = try await super.startCapture() // Already started guard didStart else { return false } // TODO: force pixel format kCVPixelFormatType_420YpCbCr8BiPlanarFullRange - try await RPScreenRecorder.shared().startCapture { [weak self] sampleBuffer, type, _ in - guard let self else { return } - // Only process .video - if type == .video { - capture(sampleBuffer: sampleBuffer, capturer: capturer, options: options) + do { + try await RPScreenRecorder.shared().startCapture { [weak self] sampleBuffer, type, _ in + guard let self else { return } + // Only process .video + if type == .video { + capture(sampleBuffer: sampleBuffer, capturer: capturer, options: options) + } } + } catch { + throw LiveKitError(.webRTC, internalError: error) } return true } - override public func stopCapture() async throws -> Bool { + override public func stopCapture() async throws(LiveKitError) -> Bool { let didStop = try await super.stopCapture() // Already stopped diff --git a/Sources/LiveKit/Track/Capturers/MacOSScreenCapturer.swift b/Sources/LiveKit/Track/Capturers/MacOSScreenCapturer.swift index 610aeba53..0516ac42e 100644 --- a/Sources/LiveKit/Track/Capturers/MacOSScreenCapturer.swift +++ b/Sources/LiveKit/Track/Capturers/MacOSScreenCapturer.swift @@ -53,7 +53,7 @@ public class MacOSScreenCapturer: VideoCapturer, @unchecked Sendable { super.init(delegate: delegate) } - override public func startCapture() async throws -> Bool { + override public func startCapture() async throws(LiveKitError) -> Bool { let didStart = try await super.startCapture() // Already started @@ -104,18 +104,22 @@ public class MacOSScreenCapturer: VideoCapturer, @unchecked Sendable { // Why does SCStream hold strong reference to delegate? let stream = SCStream(filter: filter, configuration: configuration, delegate: self) - try stream.addStreamOutput(self, type: .screen, sampleHandlerQueue: nil) - if #available(macOS 13.0, *) { - try stream.addStreamOutput(self, type: .audio, sampleHandlerQueue: nil) + do { + try stream.addStreamOutput(self, type: .screen, sampleHandlerQueue: nil) + if #available(macOS 13.0, *) { + try stream.addStreamOutput(self, type: .audio, sampleHandlerQueue: nil) + } + try await stream.startCapture() + } catch { + throw LiveKitError(.webRTC, internalError: error) } - try await stream.startCapture() _screenCapturerState.mutate { $0.scStream = stream } return true } - override public func stopCapture() async throws -> Bool { + override public func stopCapture() async throws(LiveKitError) -> Bool { let didStop = try await super.stopCapture() // Already stopped @@ -130,8 +134,12 @@ public class MacOSScreenCapturer: VideoCapturer, @unchecked Sendable { $0.resendTimer = nil } - try await stream.stopCapture() - try stream.removeStreamOutput(self, type: .screen) + do { + try await stream.stopCapture() + try stream.removeStreamOutput(self, type: .screen) + } catch { + throw LiveKitError(.webRTC, internalError: error) + } _screenCapturerState.mutate { $0.scStream = nil diff --git a/Sources/LiveKit/Track/Capturers/VideoCapturer.swift b/Sources/LiveKit/Track/Capturers/VideoCapturer.swift index 421acf16a..e6417d4dd 100644 --- a/Sources/LiveKit/Track/Capturers/VideoCapturer.swift +++ b/Sources/LiveKit/Track/Capturers/VideoCapturer.swift @@ -136,9 +136,9 @@ public class VideoCapturer: NSObject, @unchecked Sendable, Loggable, VideoCaptur /// /// ``startCapture()`` and ``stopCapture()`` calls must be balanced. For example, if ``startCapture()`` is called 2 times, ``stopCapture()`` must be called 2 times also. /// Returns true when capturing should start, returns fals if capturing already started. - @objc + @nonobjc @discardableResult - public func startCapture() async throws -> Bool { + public func startCapture() async throws(LiveKitError) -> Bool { let didStart = _state.mutate { // Counter was 0, so did start capturing with this call let didStart = $0.startStopCounter == 0 @@ -162,9 +162,9 @@ public class VideoCapturer: NSObject, @unchecked Sendable, Loggable, VideoCaptur /// /// See ``startCapture()`` for more details. /// Returns true when capturing should stop, returns fals if capturing already stopped. - @objc + @nonobjc @discardableResult - public func stopCapture() async throws -> Bool { + public func stopCapture() async throws(LiveKitError) -> Bool { let didStop = _state.mutate { // Counter was already 0, so did NOT stop capturing with this call if $0.startStopCounter <= 0 { @@ -188,12 +188,40 @@ public class VideoCapturer: NSObject, @unchecked Sendable, Loggable, VideoCaptur return true } - @objc + @nonobjc @discardableResult - public func restartCapture() async throws -> Bool { + public func restartCapture() async throws(LiveKitError) -> Bool { try await stopCapture() return try await startCapture() } + + // MARK: - Obj-C bridges + + // + // @objc disallows typed throws; these shims preserve the original + // selectors (-startCaptureWithCompletionHandler:, etc.) for Obj-C + // consumers while letting Swift see the typed versions above. + + @available(swift, obsoleted: 1.0, message: "Use startCapture()") + @objc(startCaptureWithCompletionHandler:) + @discardableResult + public func _objc_startCapture() async throws -> Bool { + try await startCapture() + } + + @available(swift, obsoleted: 1.0, message: "Use stopCapture()") + @objc(stopCaptureWithCompletionHandler:) + @discardableResult + public func _objc_stopCapture() async throws -> Bool { + try await stopCapture() + } + + @available(swift, obsoleted: 1.0, message: "Use restartCapture()") + @objc(restartCaptureWithCompletionHandler:) + @discardableResult + public func _objc_restartCapture() async throws -> Bool { + try await restartCapture() + } } extension VideoCapturer { diff --git a/Sources/LiveKit/Track/Local/LocalAudioTrack.swift b/Sources/LiveKit/Track/Local/LocalAudioTrack.swift index 944bd68a1..93fe4319a 100644 --- a/Sources/LiveKit/Track/Local/LocalAudioTrack.swift +++ b/Sources/LiveKit/Track/Local/LocalAudioTrack.swift @@ -82,23 +82,26 @@ public class LocalAudioTrack: Track, LocalTrackProtocol, AudioTrackProtocol, @un captureOptions: options) } + // Conforms to @objc protocol LocalTrackProtocol; @objc disallows typed throws. + // swiftlint:disable:next public_typed_throws public func mute() async throws { try await super._mute() } + // swiftlint:disable:next public_typed_throws public func unmute() async throws { try await super._unmute() } // MARK: - Internal - override func startCapture() async throws { + override func startCapture() async throws(LiveKitError) { // AudioDeviceModule's InitRecording() and StartRecording() automatically get called by WebRTC, but // explicitly init & start it early to detect audio engine failures (mic not accessible for some reason, etc.). try AudioManager.shared.startLocalRecording() } - override func stopCapture() async throws { + override func stopCapture() async throws(LiveKitError) { cleanUpFrameWatcher() } } @@ -124,7 +127,7 @@ extension LocalAudioTrack { final class AudioFrameWatcher: AudioRenderer, Loggable { private let completer = AsyncCompleter(label: "Frame watcher", defaultTimeout: 5) - func wait() async throws { + func wait() async throws(LiveKitError) { try await completer.wait() } diff --git a/Sources/LiveKit/Track/Local/LocalVideoTrack.swift b/Sources/LiveKit/Track/Local/LocalVideoTrack.swift index 825a9fe3a..0b50a3145 100644 --- a/Sources/LiveKit/Track/Local/LocalVideoTrack.swift +++ b/Sources/LiveKit/Track/Local/LocalVideoTrack.swift @@ -43,21 +43,24 @@ public class LocalVideoTrack: Track, LocalTrackProtocol, @unchecked Sendable { reportStatistics: reportStatistics) } + // Conforms to @objc protocol LocalTrackProtocol; @objc disallows typed throws. + // swiftlint:disable:next public_typed_throws public func mute() async throws { try await super._mute() } + // swiftlint:disable:next public_typed_throws public func unmute() async throws { try await super._unmute() } // MARK: - Internal - override func startCapture() async throws { + override func startCapture() async throws(LiveKitError) { try await capturer.startCapture() } - override func stopCapture() async throws { + override func stopCapture() async throws(LiveKitError) { try await capturer.stopCapture() } } diff --git a/Sources/LiveKit/Track/Recorders/LocalAudioTrackRecorder.swift b/Sources/LiveKit/Track/Recorders/LocalAudioTrackRecorder.swift index ea17cc107..90ecbbb1a 100644 --- a/Sources/LiveKit/Track/Recorders/LocalAudioTrackRecorder.swift +++ b/Sources/LiveKit/Track/Recorders/LocalAudioTrackRecorder.swift @@ -64,7 +64,7 @@ public final class LocalAudioTrackRecorder: NSObject, Sendable, AudioRenderer { /// Starts capturing audio from the local track and returns a stream of audio data. /// - Returns: A stream of audio data. /// - Throws: An error if the audio track cannot be started. - public func start() async throws -> Stream { + public func start() async throws(LiveKitError) -> Stream { stop() try await track.startCapture() diff --git a/Sources/LiveKit/Track/Track.swift b/Sources/LiveKit/Track/Track.swift index afd7ff666..b65004f02 100644 --- a/Sources/LiveKit/Track/Track.swift +++ b/Sources/LiveKit/Track/Track.swift @@ -214,12 +214,12 @@ public class Track: NSObject, @unchecked Sendable, Loggable { } // Intended for child class to override - func startCapture() async throws {} + func startCapture() async throws(LiveKitError) {} // Intended for child class to override - func stopCapture() async throws {} + func stopCapture() async throws(LiveKitError) {} - public final func start() async throws { + public final func start() async throws(LiveKitError) { try await _startStopSerialRunner.run { [weak self] in guard let self else { return } guard _state.trackState != .started else { @@ -227,12 +227,12 @@ public class Track: NSObject, @unchecked Sendable, Loggable { return } try await startCapture() - if self is RemoteTrack { try await enable() } + if self is RemoteTrack { enable() } _state.mutate { $0.trackState = .started } } } - public final func stop() async throws { + public final func stop() async throws(LiveKitError) { try await _startStopSerialRunner.run { [weak self] in guard let self else { return } guard _state.trackState != .stopped else { @@ -240,14 +240,14 @@ public class Track: NSObject, @unchecked Sendable, Loggable { return } try await stopCapture() - if self is RemoteTrack { try await disable() } + if self is RemoteTrack { disable() } _state.mutate { $0.trackState = .stopped } } } // Returns true if didEnable @discardableResult - func enable() async throws -> Bool { + func enable() -> Bool { guard !mediaTrack.isEnabled else { return false } mediaTrack.isEnabled = true return true @@ -255,7 +255,7 @@ public class Track: NSObject, @unchecked Sendable, Loggable { // Returns true if didDisable @discardableResult - func disable() async throws -> Bool { + func disable() -> Bool { guard mediaTrack.isEnabled else { return false } mediaTrack.isEnabled = false return true @@ -286,7 +286,7 @@ public class Track: NSObject, @unchecked Sendable, Loggable { // Returns true if state updated @discardableResult - func onPublish() async throws -> Bool { + func onPublish() async -> Bool { // For LocalTrack only... guard self is LocalTrack else { return false } guard _state.publishState != .published else { return false } @@ -296,7 +296,7 @@ public class Track: NSObject, @unchecked Sendable, Loggable { // Returns true if state updated @discardableResult - func onUnpublish() async throws -> Bool { + func onUnpublish() async -> Bool { // For LocalTrack only... guard self is LocalTrack else { return false } guard _state.publishState != .unpublished else { return false } @@ -335,10 +335,10 @@ extension Track { // workaround for error: // @objc can only be used with members of classes, @objc protocols, and concrete extensions of classes // - func _mute() async throws { + func _mute() async throws(LiveKitError) { // LocalTrack only, already muted guard self is LocalTrack, !isMuted else { return } - try await disable() // Disable track first + disable() // Disable track first // Only stop if VideoTrack if self is LocalVideoTrack { try await stop() @@ -346,14 +346,14 @@ extension Track { set(muted: true, shouldSendSignal: true) } - func _unmute() async throws { + func _unmute() async throws(LiveKitError) { // LocalTrack only, already un-muted guard self is LocalTrack, isMuted else { return } // Only start if VideoTrack if self is LocalVideoTrack { try await start() } - try await enable() // Enable track + enable() // Enable track set(muted: false, shouldSendSignal: true) } } diff --git a/Sources/LiveKit/TrackPublications/LocalTrackPublication.swift b/Sources/LiveKit/TrackPublications/LocalTrackPublication.swift index 6e5057b5c..eac35a7ab 100644 --- a/Sources/LiveKit/TrackPublications/LocalTrackPublication.swift +++ b/Sources/LiveKit/TrackPublications/LocalTrackPublication.swift @@ -42,7 +42,7 @@ public class LocalTrackPublication: TrackPublication, @unchecked Sendable { private let _debounce = Debounce(delay: 0.1) - public func mute() async throws { + public func mute() async throws(LiveKitError) { guard let track = track as? LocalTrack else { throw LiveKitError(.invalidState, message: "track is nil or not a LocalTrack") } @@ -50,7 +50,7 @@ public class LocalTrackPublication: TrackPublication, @unchecked Sendable { try await track._mute() } - public func unmute() async throws { + public func unmute() async throws(LiveKitError) { guard let track = track as? LocalTrack else { throw LiveKitError(.invalidState, message: "track is nil or not a LocalTrack") } @@ -78,14 +78,14 @@ public class LocalTrackPublication: TrackPublication, @unchecked Sendable { } extension LocalTrackPublication { - func suspend() async throws { + func suspend() async throws(LiveKitError) { // Do nothing if already muted guard !isMuted else { return } try await mute() _suspended = true } - func resume() async throws { + func resume() async throws(LiveKitError) { // Do nothing if was not suspended guard _suspended else { return } try await unmute() diff --git a/Sources/LiveKit/TrackPublications/RemoteTrackPublication.swift b/Sources/LiveKit/TrackPublications/RemoteTrackPublication.swift index c74e9bd45..a1f949226 100644 --- a/Sources/LiveKit/TrackPublications/RemoteTrackPublication.swift +++ b/Sources/LiveKit/TrackPublications/RemoteTrackPublication.swift @@ -60,7 +60,7 @@ public class RemoteTrackPublication: TrackPublication, @unchecked Sendable { } /// Subscribe or unsubscribe from this track. - public func set(subscribed newValue: Bool) async throws { + public func set(subscribed newValue: Bool) async throws(LiveKitError) { guard _state.isSubscribePreferred != newValue else { return } let participant = try await requireParticipant() @@ -85,7 +85,7 @@ public class RemoteTrackPublication: TrackPublication, @unchecked Sendable { /// Enable or disable server from sending down data for this track. /// /// This is useful when the participant is off screen, you may disable streaming down their video to reduce bandwidth requirements. - public func set(enabled newValue: Bool) async throws { + public func set(enabled newValue: Bool) async throws(LiveKitError) { // No-op if already the desired value let trackSettings = _state.trackSettings guard trackSettings.isEnabled != newValue else { return } @@ -98,7 +98,7 @@ public class RemoteTrackPublication: TrackPublication, @unchecked Sendable { } /// Set preferred video FPS for this track. - public func set(preferredFPS newValue: UInt) async throws { + public func set(preferredFPS newValue: UInt) async throws(LiveKitError) { // No-op if already the desired value let trackSettings = _state.trackSettings guard trackSettings.preferredFPS != newValue else { return } @@ -114,7 +114,7 @@ public class RemoteTrackPublication: TrackPublication, @unchecked Sendable { /// /// Based on this value, server will decide which layer to send. /// Use ``RemoteTrackPublication/set(videoQuality:)`` to explicitly set layer instead. - public func set(preferredDimensions newValue: Dimensions) async throws { + public func set(preferredDimensions newValue: Dimensions) async throws(LiveKitError) { // No-op if already the desired value let trackSettings = _state.trackSettings guard trackSettings.dimensions != newValue else { return } @@ -131,7 +131,7 @@ public class RemoteTrackPublication: TrackPublication, @unchecked Sendable { /// This indicates the highest quality the client can accept. if network /// bandwidth does not allow, server will automatically reduce quality to /// optimize for uninterrupted video. - public func set(videoQuality newValue: VideoQuality) async throws { + public func set(videoQuality newValue: VideoQuality) async throws(LiveKitError) { // No-op if already the desired value let trackSettings = _state.trackSettings guard trackSettings.videoQuality != newValue else { return } @@ -209,7 +209,7 @@ private extension RemoteTrackPublication { return room._state.connectionState } - func checkUserCanModifyTrackSettings() async throws { + func checkUserCanModifyTrackSettings() async throws(LiveKitError) { // adaptiveStream must be disabled and must be subscribed if isAdaptiveStreamEnabled || !isSubscribed { throw LiveKitError(.invalidState, message: "adaptiveStream must be disabled and track must be subscribed") @@ -266,7 +266,7 @@ extension RemoteTrackPublication { } // attempt to send track settings - func send(trackSettings newValue: TrackSettings) async throws { + func send(trackSettings newValue: TrackSettings) async throws(LiveKitError) { let participant = try await requireParticipant() let room = try participant.requireRoom() diff --git a/Sources/LiveKit/TrackPublications/TrackPublication.swift b/Sources/LiveKit/TrackPublications/TrackPublication.swift index f8c9135b4..d9dc38363 100644 --- a/Sources/LiveKit/TrackPublications/TrackPublication.swift +++ b/Sources/LiveKit/TrackPublications/TrackPublication.swift @@ -216,7 +216,7 @@ extension TrackPublication: TrackDelegateInternal { // MARK: - Internal helpers extension TrackPublication { - func requireParticipant() async throws -> Participant { + func requireParticipant() async throws(LiveKitError) -> Participant { guard let participant else { log("Participant is nil", .error) throw LiveKitError(.invalidState, message: "Participant is nil") diff --git a/Tests/LiveKitAudioTests/AudioEngineObserver.swift b/Tests/LiveKitAudioTests/AudioEngineObserver.swift index c3a118934..fca254fa1 100644 --- a/Tests/LiveKitAudioTests/AudioEngineObserver.swift +++ b/Tests/LiveKitAudioTests/AudioEngineObserver.swift @@ -62,7 +62,7 @@ final class TestEngineObserver: AudioEngineObserver, @unchecked Sendable { testObserver.shouldSucceed = false // Stop - #expect(throws: (any Error).self) { try AudioManager.shared.stopLocalRecording() } + #expect(throws: LiveKitError.self) { try AudioManager.shared.stopLocalRecording() } #expect(!AudioManager.shared.isEngineRunning) testObserver.shouldSucceed = true @@ -73,7 +73,7 @@ final class TestEngineObserver: AudioEngineObserver, @unchecked Sendable { testObserver.shouldSucceed = false // Attempt to start, should fail - #expect(throws: (any Error).self) { try AudioManager.shared.startLocalRecording() } + #expect(throws: LiveKitError.self) { try AudioManager.shared.startLocalRecording() } #expect(!AudioManager.shared.isEngineRunning) // Switch to manual mode diff --git a/Tests/LiveKitAudioTests/AudioEnginePermission.swift b/Tests/LiveKitAudioTests/AudioEnginePermission.swift index 864df6320..e7bdb9722 100644 --- a/Tests/LiveKitAudioTests/AudioEnginePermission.swift +++ b/Tests/LiveKitAudioTests/AudioEnginePermission.swift @@ -36,7 +36,7 @@ import LiveKitTestSupport AudioManager.shared.set(engineObservers: []) // Attempt to start, should fail - #expect(throws: (any Error).self) { try AudioManager.shared.startLocalRecording() } + #expect(throws: LiveKitError.self) { try AudioManager.shared.startLocalRecording() } #expect(!AudioManager.shared.isEngineRunning) // Set audio session engine observers diff --git a/Tests/LiveKitAudioTests/PublishDeviceOptimization.swift b/Tests/LiveKitAudioTests/PublishDeviceOptimization.swift index dcc0ede8a..1286b12a8 100644 --- a/Tests/LiveKitAudioTests/PublishDeviceOptimization.swift +++ b/Tests/LiveKitAudioTests/PublishDeviceOptimization.swift @@ -27,20 +27,20 @@ import LiveKitTestSupport // Default publish flow @Test func defaultMicPublish() async throws { - var sw = Stopwatch(label: "Test: Normal publish sequence") + var span = Span(label: "Test: Normal publish sequence") let room1Opts = RoomTestingOptions(url: url, token: token, canPublish: true) try await TestEnvironment.withRooms([room1Opts]) { rooms in - sw.split(label: "Connected to room") + span.record("Connected to room") // Alias to Rooms let room1 = rooms[0] try await room1.localParticipant.setMicrophone(enabled: true) - sw.split(label: "Did publish mic") + span.record("Did publish mic") } - sw.split(label: "Sequence complete") - print(sw) + span.record("Sequence complete") + print(span) - print("Total time: \(sw.total())") + print("Total time: \(span.total())") } // No-VP publish flow @@ -48,39 +48,39 @@ import LiveKitTestSupport // Turn off Apple's VP try AudioManager.shared.setVoiceProcessingEnabled(false) - var sw = Stopwatch(label: "Test: No-VP publish sequence") + var span = Span(label: "Test: No-VP publish sequence") let room1Opts = RoomTestingOptions(url: url, token: token, canPublish: true) try await TestEnvironment.withRooms([room1Opts]) { rooms in - sw.split(label: "Connected to room") + span.record("Connected to room") // Alias to Rooms let room1 = rooms[0] try await room1.localParticipant.setMicrophone(enabled: true) - sw.split(label: "Did publish mic") + span.record("Did publish mic") } - sw.split(label: "Sequence complete") - print(sw) + span.record("Sequence complete") + print(span) - print("Total time: \(sw.total())") + print("Total time: \(span.total())") } // Concurrent device acquisition publish flow @Test func concurrentMicPublish() async throws { - var sw = Stopwatch(label: "Test: Normal publish sequence") + var span = Span(label: "Test: Normal publish sequence") let room1Opts = RoomTestingOptions(url: url, token: token, enableMicrophone: true, canPublish: true) try await TestEnvironment.withRooms([room1Opts]) { rooms in - sw.split(label: "Connected to room") + span.record("Connected to room") // Alias to Rooms let room1 = rooms[0] // Mic should be already enabled at this point let isMicEnabled = room1.localParticipant.isMicrophoneEnabled() #expect(isMicEnabled, "Mic should be enabled at this point") - sw.split(label: "Did publish mic") + span.record("Did publish mic") } - sw.split(label: "Sequence complete") - print(sw) + span.record("Sequence complete") + print(span) - print("Total time: \(sw.total())") + print("Total time: \(span.total())") } } diff --git a/Tests/LiveKitCoreTests/CompleterTests.swift b/Tests/LiveKitCoreTests/CompleterTests.swift index 177d9abd6..6c63b96aa 100644 --- a/Tests/LiveKitCoreTests/CompleterTests.swift +++ b/Tests/LiveKitCoreTests/CompleterTests.swift @@ -108,4 +108,125 @@ struct CompleterTests { print("Unknown error: \(error)") } } + + @Test func resetThrowingPropagatesTypedError() async { + let completer = AsyncCompleter(label: "reset-throwing", defaultTimeout: 30) + let task = Task { try await completer.wait() } + await waitForRegistration(of: completer) + + completer.reset(throwing: LiveKitError(.network, message: "transport failed")) + + await expectLiveKitError(.network, from: task) + } + + @Test func taskCancellationStillProducesCancelled() async { + let completer = AsyncCompleter(label: "task-cancel", defaultTimeout: 30) + let task = Task { try await completer.wait() } + await waitForRegistration(of: completer) + + task.cancel() + + await expectLiveKitError(.cancelled, from: task) + } + + @Test func resetClearsResultForReuse() async throws { + let completer = AsyncCompleter(label: "reuse-after-throw", defaultTimeout: 30) + + let firstTask = Task { try await completer.wait() } + await waitForRegistration(of: completer) + completer.reset(throwing: LiveKitError(.network)) + _ = await firstTask.result + + let secondTask = Task { try await completer.wait() } + await waitForRegistration(of: completer) + completer.resume(returning: ()) + try await secondTask.value + } +} + +private func waitForRegistration(of completer: AsyncCompleter) async { + while completer.waiterCount == 0 { + await Task.yield() + } +} + +private func expectLiveKitError(_ expected: LiveKitErrorType, from task: Task) async { + do { + _ = try await task.value + Issue.record("Expected LiveKitError(.\(expected)) to be thrown") + } catch let error as LiveKitError { + #expect(error.type == expected) + } catch { + Issue.record("Expected LiveKitError, got \(error)") + } +} + +@Suite(.tags(.concurrency)) +struct CompleterMapActorTests { + @Test func resetThrowingFanOutsTypedErrorToAllCompleters() async { + let map = CompleterMapActor(label: "map-test", defaultTimeout: 30) + + let completerA = await map.completer(for: "a") + let completerB = await map.completer(for: "b") + + let taskA = Task { try await completerA.wait() } + let taskB = Task { try await completerB.wait() } + + await waitForRegistration(of: completerA) + await waitForRegistration(of: completerB) + + await map.reset(throwing: LiveKitError(.network, message: "fan-out")) + + await expectLiveKitError(.network, from: taskA) + await expectLiveKitError(.network, from: taskB) + } + + @Test func resetWithoutErrorDefaultsToCancelled() async { + let map = CompleterMapActor(label: "map-test", defaultTimeout: 30) + let completer = await map.completer(for: "a") + let task = Task { try await completer.wait() } + + await waitForRegistration(of: completer) + + await map.reset() + + await expectLiveKitError(.cancelled, from: task) + } + + @Test func resumeThrowingForMissingKeyIsNoOp() async throws { + let map = CompleterMapActor(label: "no-op-test", defaultTimeout: 30) + + // No completer for the key yet — resume(throwing:) must not auto-create. + await map.resume(throwing: LiveKitError(.participantRemoved), for: "absent") + + // Subsequent wait on the same key must NOT see a stale "remembered" failure. + let completer = await map.completer(for: "absent") + let task = Task { try await completer.wait() } + await waitForRegistration(of: completer) + completer.resume(returning: ()) + try await task.value + } + + @Test func resumeReturningForMissingKeyRemembersSuccess() async throws { + let map = CompleterMapActor(label: "remember-success", defaultTimeout: 30) + + // resume(returning:) on a missing key creates and remembers the value. + await map.resume(returning: (), for: "key") + + // A later wait must see the success immediately. + let completer = await map.completer(for: "key") + try await completer.wait() + } + + @Test func resumeThrowingReachesExistingWaiter() async { + let map = CompleterMapActor(label: "existing-waiter", defaultTimeout: 30) + + let completer = await map.completer(for: "key") + let task = Task { try await completer.wait() } + await waitForRegistration(of: completer) + + await map.resume(throwing: LiveKitError(.network), for: "key") + + await expectLiveKitError(.network, from: task) + } } diff --git a/Tests/LiveKitCoreTests/MockDataChannelPair.swift b/Tests/LiveKitCoreTests/MockDataChannelPair.swift index 97a2f8ef0..d5fcaa06b 100644 --- a/Tests/LiveKitCoreTests/MockDataChannelPair.swift +++ b/Tests/LiveKitCoreTests/MockDataChannelPair.swift @@ -24,7 +24,7 @@ class MockDataChannelPair: DataChannelPair, @unchecked Sendable { self.packetHandler = packetHandler } - override func send(dataPacket packet: Livekit_DataPacket) async throws { + override func send(dataPacket packet: Livekit_DataPacket) async throws(LiveKitError) { packetHandler(packet) } } diff --git a/Tests/LiveKitCoreTests/Participant/RemoteParticipantTests.swift b/Tests/LiveKitCoreTests/Participant/RemoteParticipantTests.swift index 8940840d2..51313a565 100644 --- a/Tests/LiveKitCoreTests/Participant/RemoteParticipantTests.swift +++ b/Tests/LiveKitCoreTests/Participant/RemoteParticipantTests.swift @@ -39,7 +39,7 @@ struct RemoteParticipantTests { let disconnected = try #require(rooms[0].remoteParticipants.values.first) disconnected.set(info: .init(), connectionState: .disconnected) - await #expect(throws: (any Error).self) { try await disconnected.waitUntilActive(timeout: self.timeout) } + await #expect(throws: LiveKitError.self) { try await disconnected.waitUntilActive(timeout: self.timeout) } } } diff --git a/Tests/LiveKitCoreTests/Room/RoomTests.swift b/Tests/LiveKitCoreTests/Room/RoomTests.swift index 66ad13352..7f82e81f4 100644 --- a/Tests/LiveKitCoreTests/Room/RoomTests.swift +++ b/Tests/LiveKitCoreTests/Room/RoomTests.swift @@ -152,8 +152,7 @@ private struct WeakRoomRefs: @unchecked Sendable { localParticipant = room.localParticipant for remoteParticipant in room.remoteParticipants.values { - weak var weakRP: RemoteParticipant? = remoteParticipant - remoteParticipantChecks.append { weakRP == nil } + remoteParticipantChecks.append { [weak remoteParticipant] in remoteParticipant == nil } } state = room._state diff --git a/Tests/LiveKitCoreTests/Track/TrackTests.swift b/Tests/LiveKitCoreTests/Track/TrackTests.swift index 730724127..52543d9fe 100644 --- a/Tests/LiveKitCoreTests/Track/TrackTests.swift +++ b/Tests/LiveKitCoreTests/Track/TrackTests.swift @@ -29,11 +29,11 @@ class TestTrack: LocalAudioTrack, @unchecked Sendable { super.init(name: "test_audio_track", source: .microphone, track: _track, reportStatistics: false, captureOptions: AudioCaptureOptions()) } - override func startCapture() async throws { + override func startCapture() async throws(LiveKitError) { try? await Task.sleep(nanoseconds: UInt64(Double.random(in: 0.0 ... 1.0) * 1_000_000)) } - override func stopCapture() async throws { + override func stopCapture() async throws(LiveKitError) { try? await Task.sleep(nanoseconds: UInt64(Double.random(in: 0.0 ... 1.0) * 1_000_000)) } } diff --git a/Tests/LiveKitTestSupport/Tracks.swift b/Tests/LiveKitTestSupport/Tracks.swift index 046b4df8b..3cca75a7a 100644 --- a/Tests/LiveKitTestSupport/Tracks.swift +++ b/Tests/LiveKitTestSupport/Tracks.swift @@ -192,8 +192,8 @@ public class VideoTrackWatcher: TrackDelegate, VideoRenderer, @unchecked Sendabl /// LocalAudioTrack subclass that bypasses AudioManager, avoiding audio engine errors in test environments. public class TestAudioTrack: LocalAudioTrack, @unchecked Sendable { - override public func startCapture() async throws {} - override public func stopCapture() async throws {} + override public func startCapture() async throws(LiveKitError) {} + override public func stopCapture() async throws(LiveKitError) {} // Bypass frame-waiting since no real audio engine is running. override public func startWaitingForFrames() async throws {}