
DataChannel: document state-machine events, concurrency fixes #1014

Open
pblazej wants to merge 10 commits into main from blaze/dc-refactor


@pblazej pblazej commented May 21, 2026

Follow-up to #977. No public API change.

  • Fix three concurrency bugs: TTLDictionary iterated outside its lock; check-then-mutate dedup race; rtcAmount over-counted on sendData failure
  • Fold e2eeManager under _state's lock behind set(e2eeManager:)
  • Run publisher transport + DC pre-flight waits concurrently via async let
  • Align DC-open timeout with client-sdk-js / -rust (7s → 15s)
  • Document and rename ChannelEvent state-machine cases (sendRequested / sendDispatched)
  • Tidy a few rough edges

🤖 Generated with Claude Code

pblazej and others added 2 commits May 21, 2026 09:05
…ent events

Three small alignments with the rest of the LiveKit SDK family, plus a docs
pass on the internal event enum:

* `defaultPublisherDataChannelOpen`: 7s → 15s. Matches client-sdk-js
  (`peerConnectionTimeout`) and client-sdk-rust (`ICE_CONNECT_TIMEOUT`);
  Android uses 20s. The previous 7s was the shortest of any SDK and may
  be a factor in first-send-times-out reports against neighbouring SDKs.
* `Room.ensurePublisherConnected` now waits on the transport-connected
  completer and the data-channel-open completer concurrently via
  `async let`, instead of sequentially. Matches JS/Rust's single
  combined readiness gate. Caller-Task cancellation is already
  propagated through `AsyncCompleter.wait()`'s own
  `withTaskCancellationHandler`, so no additional work is needed.
* `ChannelEvent.Detail`: rename `publishData` → `sendRequested` and
  `publishedData` → `sendDispatched` so the request/post-dispatch
  distinction is obvious at a glance, and add docstrings to every case
  describing exactly which producer yields it and what the consumer
  does in response.

No behaviour change beyond the timeout bump; public API unchanged.
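
A minimal sketch of the sequential versus `async let` gate described above. `waitForTransport` and `waitForDataChannel` are hypothetical stand-ins for the two completer waits, not SDK functions, and the sleep durations are arbitrary:

```swift
// Hypothetical stand-ins for the two completer waits; not the SDK's API.
func waitForTransport() async throws {
    try await Task.sleep(nanoseconds: 50_000_000) // 50 ms
}

func waitForDataChannel() async throws {
    try await Task.sleep(nanoseconds: 50_000_000) // 50 ms
}

// Sequential: the second wait only starts once the first has finished,
// so the total is roughly the sum of both waits.
func ensureConnectedSequentially() async throws {
    try await waitForTransport()
    try await waitForDataChannel()
}

// Concurrent: both child tasks start immediately, so the total is roughly
// the longer of the two waits. Cancelling the calling task cancels both.
func ensureConnectedConcurrently() async throws {
    async let transport: Void = waitForTransport()
    async let dataChannel: Void = waitForDataChannel()
    try await transport
    try await dataChannel
}
```

Because `async let` children are scoped to the enclosing function, leaving the function (by return or throw) implicitly cancels and awaits any child that has not finished yet.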

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Drops the bare `var e2eeManager: E2EEManager?` (cross-thread read on the
receive path, write from Room's connect) by folding the field into the
existing `_state: StateSync<State>` alongside `lossy`, `reliable`, etc.
Reads use dynamic member lookup (`_state.e2eeManager`); writes go
through a new `set(e2eeManager:)` method matching the existing
`set(reliable:)` / `set(lossy:)` style. No separate `StateSync`
instance — one lock for the whole struct.

This differs from `Room`'s pattern (separate `_e2eeManager` outside
`_state`) on purpose: Room keeps it out because `Room.State.onDidMutate`
triggers the engine-delegate callback chain on every mutation;
`DataChannelPair._state` has no `onDidMutate` registered, so the field
can live inside without side effects.

Room's two pairs of assignments updated to use `set(e2eeManager:)`.
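
For readers unfamiliar with the pattern, here is a rough sketch of a field living inside a single lock-protected state struct, read via dynamic member lookup and written through a setter. `LockedState`, `PairState`, and `Pair` are hypothetical simplifications, not the SDK's `StateSync` or `DataChannelPair`, and the `String?` field is a placeholder for the real manager type:

```swift
import Foundation

// Hypothetical stand-in for a lock-protected state wrapper; illustrative only.
@dynamicMemberLookup
final class LockedState<Value>: @unchecked Sendable {
    private var value: Value
    private let lock = NSLock()

    init(_ value: Value) { self.value = value }

    // Reads a single field while holding the lock.
    subscript<T>(dynamicMember keyPath: KeyPath<Value, T>) -> T {
        lock.lock(); defer { lock.unlock() }
        return value[keyPath: keyPath]
    }

    // Runs a mutation while holding the lock and returns its result.
    @discardableResult
    func mutate<R>(_ body: (inout Value) -> R) -> R {
        lock.lock(); defer { lock.unlock() }
        return body(&value)
    }
}

struct PairState {
    var reliableOpen = false
    var e2eeManager: String? // placeholder type for illustration
}

final class Pair {
    private let _state = LockedState(PairState())

    // Writes go through a setter, mirroring the `set(reliable:)` style.
    func set(e2eeManager: String?) {
        _state.mutate { $0.e2eeManager = e2eeManager }
    }

    // Reads go through dynamic member lookup under the same lock.
    var currentManager: String? { _state.e2eeManager }
}
```

One lock guards every field of the struct, so no second synchronization primitive is needed for the new field.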

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

github-actions Bot commented May 21, 2026

⚠️ This PR does not contain any files in the .changes directory.

@pblazej pblazej changed the title from "DataChannelPair: align with sibling SDKs, document state-machine events" to "DataChannel: align with other SDKs, document state-machine events" May 21, 2026
1. `receiveStates()` was iterating `_state.reliableReceivedState`
   outside the lock — dynamic-member lookup returns the underlying
   `TTLDictionary` *reference* under lock, but the subsequent `.map`
   ran without it. A concurrent `didReceiveMessageWith` write could
   trip `Dictionary`'s "mutated during enumeration" trap. Wrapped
   the whole map in `_state.read`.

2. The reliable-receive dedup was a check-then-mutate split across
   two lock acquisitions: read `reliableReceivedState[sid]`, then
   `_state.mutate` to update. Two concurrent receives for the same
   sender with the same new sequence could both pass the dedup
   check before either updated, causing duplicate delivery to the
   `DataChannelDelegate`. Collapsed into a single `_state.mutate`
   that returns whether the packet is a duplicate.

3. `processSendQueue` incremented `buffer.rtcAmount` *before* the
   `channel.sendData` success check, so a failed send left the
   backpressure counter inflated until a later `bufferedAmountChanged`
   ran into the "Unexpected buffer size" guard in `updateTarget` and
   reset it to 0. Moved the increment after the `sendData` guard so
   the counter only reflects bytes actually queued by WebRTC.
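
The first two fixes share one idea: do the whole read-decide-write (or the whole iteration) inside a single critical section. A minimal sketch under assumptions: `SequenceTracker` is a hypothetical type, and the "sequence not greater than the last seen one" rule is an illustrative dedup policy, not necessarily the one `DataChannelPair` applies:

```swift
import Foundation

// Hypothetical dedup tracker; names and dedup rule are illustrative.
final class SequenceTracker: @unchecked Sendable {
    private var lastSequence: [String: UInt32] = [:]
    private let lock = NSLock()

    // Checks and records in one critical section, so two concurrent callers
    // with the same (sender, sequence) cannot both observe "not a duplicate".
    func isDuplicate(sender: String, sequence: UInt32) -> Bool {
        lock.lock(); defer { lock.unlock() }
        if let last = lastSequence[sender], sequence <= last {
            return true
        }
        lastSequence[sender] = sequence
        return false
    }

    // Copies the entries while holding the lock, so the iteration can never
    // overlap a concurrent write to the dictionary.
    func snapshot() -> [(sender: String, sequence: UInt32)] {
        lock.lock(); defer { lock.unlock() }
        return lastSequence.map { (sender: $0.key, sequence: $0.value) }
    }
}
```

Returning the decision from inside the locked region is what removes the window between the check and the update.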

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@pblazej pblazej changed the title from "DataChannel: align with other SDKs, document state-machine events" to "DataChannel: document state-machine events, correctness" May 21, 2026
* `set(reliable:)` and `set(lossy:)` were 90% identical; both now
  delegate to a private `setChannel(_:kind:)` helper that picks the
  field via the `ChannelKind` switch.
* The `// MARK: - Cache` section header was misleading — that block
  is the backpressure-counter / retry-buffer update helpers, not a
  cache. Renamed to `// MARK: - Buffer helpers`.
* `infos()` collapsed from `_state.read { … }.compactMap(\.self).map { … }`
  into a single `_state.read { state in [state.lossy, state.reliable].compactMap { $0?.toLKInfoType() } }` —
  one closure, one lock hold, no two-step nil filter.
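
The `infos()` change boils down to folding the nil filter and the transform into one `compactMap`. A small sketch with a hypothetical `Channel` type and `toInfo()` method standing in for the real channel and `toLKInfoType()`:

```swift
// Hypothetical channel type; `toInfo()` stands in for the real conversion.
struct Channel {
    let label: String
    func toInfo() -> String { "info(\(label))" }
}

let lossy: Channel? = nil
let reliable: Channel? = Channel(label: "_reliable")

// Two-step form: drop nils first, then transform each element.
let twoStep = [lossy, reliable].compactMap { $0 }.map { $0.toInfo() }

// Single-step form: optional chaining yields nil for a missing channel,
// and compactMap drops it in the same pass.
let oneStep = [lossy, reliable].compactMap { $0?.toInfo() }
```

Both forms produce the same array; the single-step form simply avoids the intermediate one.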

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@pblazej pblazej changed the title from "DataChannel: document state-machine events, correctness" to "DataChannel: document state-machine events, concurrency fixes" May 21, 2026
Adds four `@Suite(.tags(.dataChannel))` unit tests that exercise the
parts of `DataChannelPair` reachable without an actual `LKRTCDataChannel`:

* `openCompleter` times out with `LiveKitError(.timedOut)` when channels
  never arrive (smoke test for the 7s→15s timeout bump in this PR).
* `reset(throwing: someError)` resumes parked sends with that exact
  error.
* `reset(throwing: nil)` resumes parked sends with
  `LiveKitError(.cancelled)`.
* `openCompleter.wait()` honors caller-Task cancellation and throws
  `LiveKitError(.cancelled)` — the cancellation propagation we relied
  on for the combined `async let` gate in `Room.ensurePublisherConnected`.

Whole suite runs in ~100ms (no E2E setup). Anything that needs real
`sendData` dispatch or `bufferedAmount` drains is still covered by
`RealiableDataChannelTests` / `EncryptedDataChannelTests`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@pblazej pblazej force-pushed the blaze/dc-refactor branch from 3351d83 to bd8975b Compare May 21, 2026 08:29
pblazej and others added 2 commits May 21, 2026 10:32
Both `CompleterTests` and the new `DataChannelPairTests` had file-local
copies of the same two helpers. Move them to
`Tests/LiveKitCoreTests/TestHelpers.swift` (a regular `internal` file
inside the test target — `LiveKitTestSupport` can't import `Testing`).

`expectLiveKitError` now forwards `#_sourceLocation` so failures point
at the test that called it, not the helper itself.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Three improvements to `RealiableDataChannelTests`:

* The previous size-only assertion (`receivedString.count == ...`)
  passes even if reliable delivery reordered packets or substituted
  one drop for one duplicate. Each send now carries a 4-byte LE
  sequence prefix and the receiver asserts the received indices
  equal `[0, 1, …, iterations-1]` — exact, in send order, no dupes.
* Adds a no-reconnect baseline. The single existing test always went
  through dual reconnect, so a regression in the basic reliable-send
  path was ambiguous with a reconnect-resume regression. The
  `.none` case gives bisection signal.
* Single `@Test` becomes `@Test(arguments: [.none, .sender, .receiver, .both])`
  parameterized over a `ReconnectMode` enum — four reconnect-shape
  variants share 90% of the test body and now report as distinct
  cases. Also pulls magic timing literals into named locals and
  comments the `withRooms`-teardown polling loop.

All four cases pass; total runtime ~45s.
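
A sketch of the 4-byte little-endian sequence prefix idea, assuming hypothetical helpers `makePayload` and `readIndex` (the actual test may encode and decode differently):

```swift
import Foundation

// Prepends the send index as 4 little-endian bytes. Helper names are
// hypothetical, not taken from the test file.
func makePayload(index: UInt32, body: Data) -> Data {
    var payload = withUnsafeBytes(of: index.littleEndian) { Data($0) }
    payload.append(body)
    return payload
}

// Reads the index back; returns nil when the payload is too short.
func readIndex(from payload: Data) -> UInt32? {
    guard payload.count >= 4 else { return nil }
    // Assemble byte by byte (most significant first) to avoid alignment assumptions.
    return payload.prefix(4).reversed().reduce(UInt32(0)) { ($0 << 8) | UInt32($1) }
}

// The receiver can then compare the decoded indices to the expected range,
// which catches reordering, drops, and duplicates that a size check would miss.
let iterations = 5
let received = (0 ..< iterations)
    .map { makePayload(index: UInt32($0), body: Data("chunk".utf8)) }
    .compactMap { readIndex(from: $0) }
let inOrderNoDupes = received == (0 ..< iterations).map { UInt32($0) }
```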

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

Introduces more failure modes/parametrized tests

pblazej and others added 2 commits May 21, 2026 11:04
Extends the `reliableDelivery(mode:)` parameterization with two more
reconnect timings:

* `.simultaneous` — sender and receiver both reconnect at 300ms.
  Covers the cross-reconnect race where the server's `lastMessageSeq`
  advertisement and the client's retry-buffer replay arrive at a peer
  that is itself mid-handshake. Existing `.both` staggered the two
  at 200ms / 400ms and avoided that overlap.

* `.bothLate` — sender at 2000ms, receiver at 3000ms. At the 50ms
  send cadence this puts both reconnects mid-burst, with ~40 packets
  already in the retry buffer waiting to be trimmed/replayed. The
  previous early-reconnect timings only exercised a 4–8 entry replay
  set; this stresses the larger-replay path (`trim(toAmount:)`,
  `retryReliable(lastSequence:)` with non-trivial input).

Refactor: delays are now properties on `ReconnectMode` instead of
local constants in the test body, and the `reconnectsSender` /
`reconnectsReceiver` bool helpers are replaced by the
optional-delay form. Receive deadline bumped 10s → 15s so .bothLate
has room to drain after its 3s reconnect.
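
Roughly, the optional-delay form could look like the sketch below. The `.simultaneous` and `.bothLate` delays are the ones stated above; which side takes 200ms and which takes 400ms in `.both`, and the delays for the single-side modes, are assumptions. `packetsSent(beforeMs:)` is a hypothetical helper that only illustrates the replay-size arithmetic at the 50ms cadence:

```swift
enum ReconnectMode: CaseIterable {
    case none, sender, receiver, both, simultaneous, bothLate

    // nil means "this side never reconnects", replacing a separate Bool helper.
    var senderDelayMs: Int? {
        switch self {
        case .none, .receiver: return nil
        case .sender, .both: return 200 // assumed assignment of the 200ms slot
        case .simultaneous: return 300
        case .bothLate: return 2000
        }
    }

    var receiverDelayMs: Int? {
        switch self {
        case .none, .sender: return nil
        case .receiver, .both: return 400 // assumed assignment of the 400ms slot
        case .simultaneous: return 300
        case .bothLate: return 3000
        }
    }
}

// Approximate number of packets already sent when a reconnect fires.
func packetsSent(beforeMs delay: Int, cadenceMs: Int = 50) -> Int {
    delay / cadenceMs
}
```

With these numbers, a 2000ms sender reconnect lands after about 40 sends, while the 200ms and 400ms reconnects land after about 4 and 8.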

3× local runs, all 6 cases pass each time; ~67s per run.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Storing the handler task as AnyTaskCancellable on the Descriptor coupled
its lifetime to the descriptor: when the trailer arrived, closeStream
dropped the descriptor and the wrapper's deinit cancelled the handler.
For RPC v2, the user method handler runs *after* reader.readAll() returns
(awaiting I/O, then publishing a response stream), so any cancellation
point in that window surfaced as APPLICATION_ERROR (1500) on the caller —
the failure mode behind the BM-RPC-004/005/006 benchmark regressions.

Spawn the handler via Task.detachedDiscarding instead: lifetime is
decoupled from the descriptor, and the manager actor's event loop stays
free of user-handler work. Abnormal stream conditions still propagate to
the handler through `source` throwing (encryption mismatch, length
exceeded, terminated on manager deinit).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
openStreams[info.id] = descriptor
// Detached: handler lifetime is not tied to the descriptor — abnormal stream
// conditions are signalled through `source` throwing instead.
Task.detachedDiscarding {

Fixes a pre-existing abnormal-cancellation bug exposed by RPC v2 benchmarks (failing run) — the old Task was wrapped in AnyTaskCancellable whose deinit cancelled mid-handler awaits when the trailer arrived.


Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

A recent protobuf update broke compatibility with Swift 6.0. I'm seriously considering dropping support as well; the App Store requirement is already 26.
