diff --git a/Sources/Relays/CurrentValueRelay.swift b/Sources/Relays/CurrentValueRelay.swift index 6fa08ec..5d9f448 100644 --- a/Sources/Relays/CurrentValueRelay.swift +++ b/Sources/Relays/CurrentValueRelay.swift @@ -19,6 +19,8 @@ import Combine public class CurrentValueRelay: Relay { public var value: Output { storage.value } private let storage: CurrentValueSubject + private let subscriptionsLock = Lock() + // All modifications to `subscriptions` should be protected by `lock` to ensure thread-safety. private var subscriptions = [Subscription< CurrentValueSubject, AnySubscriber @@ -40,7 +42,9 @@ public class CurrentValueRelay: Relay { public func receive(subscriber: S) where Output == S.Input, Failure == S.Failure { let subscription = Subscription(upstream: storage, downstream: AnySubscriber(subscriber)) + subscriptionsLock.lock() subscriptions.append(subscription) + subscriptionsLock.unlock() subscriber.receive(subscription: subscription) } @@ -53,8 +57,9 @@ public class CurrentValueRelay: Relay { } deinit { - // Send a finished event upon dealloation + // Send a finished event upon deallocation subscriptions.forEach { $0.forceFinish() } + subscriptionsLock.cleanupLock() } } diff --git a/Tests/CurrentValueRelayTests.swift b/Tests/CurrentValueRelayTests.swift index fb8035b..bc9cbd8 100644 --- a/Tests/CurrentValueRelayTests.swift +++ b/Tests/CurrentValueRelayTests.swift @@ -332,5 +332,13 @@ class CurrentValueRelayTests: XCTestCase { XCTAssertTrue(StoredObject2.storedObjectReleased) XCTAssertNil(container) } + + func testConcurrentAccessToCurrentValueRelaySubscriptions() throws { + nonisolated(unsafe) let relay = try XCTUnwrap(relay) + + DispatchQueue.concurrentPerform(iterations: 1000) { _ in + _ = relay.sink { _ in } + } + } } #endif