Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changeset/nested-toarray-shared-buffer-overlap.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
'@tanstack/db': patch
---

fix(db): nested `toArray` includes dropping children when sibling parent groups share a correlation key

With three (or more) levels of nested `toArray` includes, when two children in different parent groups shared the same deepest correlation key, only one of them received the nested rows and the other came back as an empty array. The nested-pipeline routing index mapped each nested correlation key to a single parent group and the shared buffer entry was deleted after routing to the first match, so sibling groups sharing the key were dropped.

The routing index now maps a nested correlation key to all parent groups that reference it and fans buffered grandchild changes out to each. A per-level snapshot of already-materialized rows also seeds parent groups that start referencing an existing correlation key after the rows were drained (e.g. inserted after the initial load), since the pipeline does not re-emit them.
213 changes: 181 additions & 32 deletions packages/db/src/query/live/collection-config-builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1150,6 +1150,13 @@ function createOrderByComparator<T extends object>(
}
}

type SnapshotRow = {
value: any
orderByIndex: string | undefined
/** Net multiplicity (inserts − deletes) currently materialized for this row */
count: number
}

/**
* Shared buffer setup for a single nested includes level.
* Pipeline output writes into the buffer; during flush the buffer is drained
Expand All @@ -1159,6 +1166,15 @@ type NestedIncludesSetup = {
compilationResult: IncludesCompilationResult
/** Shared buffer: nestedCorrelationKey → Map<childKey, Changes> */
buffer: Map<unknown, Map<unknown, Changes<any>>>
/**
* Cumulative net-present grandchild rows per nested correlation key. The
* buffer holds only deltas since the last drain and is cleared once drained,
* so a parent group that starts referencing an existing correlation key
* *after* the rows were already drained (the pipeline does not re-emit them)
* would otherwise see nothing. The snapshot lets such late-arriving parent
* groups be seeded with the rows their siblings already received.
*/
snapshot: Map<unknown, Map<unknown, SnapshotRow>>
/** For 3+ levels of nesting */
nestedSetups?: Array<NestedIncludesSetup>
}
Expand All @@ -1184,8 +1200,14 @@ type IncludesOutputState = {
correlationToParentKeys: Map<unknown, Set<unknown>>
/** Shared nested pipeline setups (one per nested includes level) */
nestedSetups?: Array<NestedIncludesSetup>
/** nestedCorrelationKey → parentCorrelationKey */
nestedRoutingIndex?: Map<unknown, unknown>
/**
* nestedCorrelationKey → Set<parentCorrelationKey>.
* One nested correlation key can map to multiple parent groups when sibling
* parents share the same correlation value (e.g. two price ranges that
* reference the same region), so buffered grandchild changes must fan out to
* every parent group rather than a single one.
*/
nestedRoutingIndex?: Map<unknown, Set<unknown>>
/** parentCorrelationKey → Set<nestedCorrelationKeys> */
nestedRoutingReverseIndex?: Map<unknown, Set<unknown>>
}
Expand Down Expand Up @@ -1298,6 +1320,7 @@ function setupNestedPipelines(
const setup: NestedIncludesSetup = {
compilationResult: entry,
buffer,
snapshot: new Map(),
}

// Recursively set up deeper levels
Expand Down Expand Up @@ -1342,6 +1365,85 @@ function createPerEntryIncludesStates(
})
}

/**
* Folds a drained delta into a nested setup's cumulative snapshot, tracking the
* net multiplicity per child row and dropping rows (and empty keys) once their
* net count reaches zero.
*/
function accumulateSnapshot(
setup: NestedIncludesSetup,
nestedCorrelationKey: unknown,
childChanges: Map<unknown, Changes<any>>,
): void {
let snap = setup.snapshot.get(nestedCorrelationKey)
if (!snap) {
snap = new Map()
setup.snapshot.set(nestedCorrelationKey, snap)
}

for (const [childKey, changes] of childChanges) {
let row = snap.get(childKey)
if (!row) {
row = {
value: changes.value,
orderByIndex: changes.orderByIndex,
count: 0,
}
snap.set(childKey, row)
}
row.count += changes.inserts - changes.deletes
if (changes.inserts > 0) {
row.value = changes.value
if (changes.orderByIndex !== undefined) {
row.orderByIndex = changes.orderByIndex
}
Comment on lines +1387 to +1399

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎯 Functional Correctness | 🟠 Major | ⚡ Quick win

Clone snapshot values before storing or replaying them.

row.value currently aliases changes.value; later cleanup deletes INCLUDES_ROUTING from changes.value at Line 1984. A late replay can then seed a row without routing metadata, preventing deeper nested routing from being rebuilt via Lines 1544-1547.

🐛 Proposed fix
+function cloneSnapshotValue<T>(value: T): T {
+  if (value == null || typeof value !== `object`) return value
+  if (Array.isArray(value)) return [...value] as T
+  return { ...(value as Record<PropertyKey, unknown>) } as T
+}
+
 function accumulateSnapshot(
   setup: NestedIncludesSetup,
   nestedCorrelationKey: unknown,
   childChanges: Map<unknown, Changes<any>>,
@@
       row = {
-        value: changes.value,
+        value: cloneSnapshotValue(changes.value),
         orderByIndex: changes.orderByIndex,
         count: 0,
       }
@@
     if (changes.inserts > 0) {
-      row.value = changes.value
+      row.value = cloneSnapshotValue(changes.value)
       if (changes.orderByIndex !== undefined) {
         row.orderByIndex = changes.orderByIndex
       }
@@
     byChild.set(childKey, {
       deletes: 0,
       inserts: row.count,
-      value: row.value,
+      value: cloneSnapshotValue(row.value),
       orderByIndex: row.orderByIndex,
     })

Also applies to: 1436-1443

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/db/src/query/live/collection-config-builder.ts` around lines 1387 -
1399, Clone snapshot values before assigning them into row state in
collection-config-builder’s snapshot/replay logic, because row.value is
currently sharing the same object as changes.value and later mutation of
changes.value can strip routing metadata. Update the row initialization and
replay paths in the affected update/merge flow (including the shared logic
around the count/orderByIndex handling) to store a cloned copy instead of the
original object, so later cleanup cannot affect previously buffered rows and
nested routing can be rebuilt correctly.

}
if (row.count <= 0) {
snap.delete(childKey)
}
}

if (snap.size === 0) {
setup.snapshot.delete(nestedCorrelationKey)
}
}

/**
* Seeds a parent group's per-entry state with the rows already materialized for
* a nested correlation key. Used when a parent group starts referencing a key
* whose rows were drained (and cleared from the buffer) in an earlier flush, so
* the pipeline will not re-emit them.
*/
function seedParentFromSnapshot(
state: IncludesOutputState,
setupIndex: number,
parentCorrelationKey: unknown,
nestedCorrelationKey: unknown,
): void {
const setup = state.nestedSetups![setupIndex]!
const snap = setup.snapshot.get(nestedCorrelationKey)
if (!snap || snap.size === 0) return

const entry = state.childRegistry.get(parentCorrelationKey)
if (!entry || !entry.includesStates) return

const entryState = entry.includesStates[setupIndex]!
let byChild = entryState.pendingChildChanges.get(nestedCorrelationKey)
if (!byChild) {
byChild = new Map()
entryState.pendingChildChanges.set(nestedCorrelationKey, byChild)
}
for (const [childKey, row] of snap) {
if (byChild.has(childKey)) continue
byChild.set(childKey, {
deletes: 0,
inserts: row.count,
value: row.value,
orderByIndex: row.orderByIndex,
})
}
}

/**
* Drains shared buffers into per-entry states using the routing index.
* Returns the set of parent correlation keys that had changes routed to them.
Expand All @@ -1356,43 +1458,60 @@ function drainNestedBuffers(state: IncludesOutputState): Set<unknown> {
const toDelete: Array<unknown> = []

for (const [nestedCorrelationKey, childChanges] of setup.buffer) {
const parentCorrelationKey =
const parentCorrelationKeys =
state.nestedRoutingIndex!.get(nestedCorrelationKey)
if (parentCorrelationKey === undefined) {
if (
parentCorrelationKeys === undefined ||
parentCorrelationKeys.size === 0
) {
// Unroutable — parent not yet seen; keep in buffer
continue
}

const entry = state.childRegistry.get(parentCorrelationKey)
if (!entry || !entry.includesStates) {
continue
}

// Route changes into this entry's per-entry state at position i
const entryState = entry.includesStates[i]!
for (const [childKey, changes] of childChanges) {
let byChild = entryState.pendingChildChanges.get(nestedCorrelationKey)
if (!byChild) {
byChild = new Map()
entryState.pendingChildChanges.set(nestedCorrelationKey, byChild)
// A single nested correlation key can map to multiple parent groups when
// sibling parents share the same correlation value. Fan the buffered
// changes out to each ready parent group; only drop the buffer entry once
// it has been routed to at least one parent.
let routedToAny = false
for (const parentCorrelationKey of parentCorrelationKeys) {
const entry = state.childRegistry.get(parentCorrelationKey)
if (!entry || !entry.includesStates) {
continue
}
const existing = byChild.get(childKey)
if (existing) {
existing.inserts += changes.inserts
existing.deletes += changes.deletes
if (changes.inserts > 0) {
existing.value = changes.value
if (changes.orderByIndex !== undefined) {
existing.orderByIndex = changes.orderByIndex

// Route changes into this entry's per-entry state at position i
const entryState = entry.includesStates[i]!
for (const [childKey, changes] of childChanges) {
let byChild = entryState.pendingChildChanges.get(nestedCorrelationKey)
if (!byChild) {
byChild = new Map()
entryState.pendingChildChanges.set(nestedCorrelationKey, byChild)
}
const existing = byChild.get(childKey)
if (existing) {
existing.inserts += changes.inserts
existing.deletes += changes.deletes
if (changes.inserts > 0) {
existing.value = changes.value
if (changes.orderByIndex !== undefined) {
existing.orderByIndex = changes.orderByIndex
}
}
} else {
byChild.set(childKey, { ...changes })
}
} else {
byChild.set(childKey, { ...changes })
}

dirtyCorrelationKeys.add(parentCorrelationKey)
routedToAny = true
}

dirtyCorrelationKeys.add(parentCorrelationKey)
toDelete.push(nestedCorrelationKey)
if (routedToAny) {
// Fold the drained delta into the cumulative snapshot so a parent group
// that starts referencing this nested key later can be seeded with it.
accumulateSnapshot(setup, nestedCorrelationKey, childChanges)
toDelete.push(nestedCorrelationKey)
}
}

for (const key of toDelete) {
Expand All @@ -1415,7 +1534,8 @@ function updateRoutingIndex(
): void {
if (!state.nestedSetups) return

for (const setup of state.nestedSetups) {
for (let i = 0; i < state.nestedSetups.length; i++) {
const setup = state.nestedSetups[i]!
for (const [, change] of childChanges) {
if (change.inserts > 0) {
// Read the nested routing key from the INCLUDES_ROUTING stamp.
Expand All @@ -1431,13 +1551,28 @@ function updateRoutingIndex(
)

if (nestedCorrelationKey != null) {
state.nestedRoutingIndex!.set(nestedRoutingKey, correlationKey)
let parents = state.nestedRoutingIndex!.get(nestedRoutingKey)
if (!parents) {
parents = new Set()
state.nestedRoutingIndex!.set(nestedRoutingKey, parents)
}
const isNewParent = !parents.has(correlationKey)
parents.add(correlationKey)
let reverseSet = state.nestedRoutingReverseIndex!.get(correlationKey)
if (!reverseSet) {
reverseSet = new Set()
state.nestedRoutingReverseIndex!.set(correlationKey, reverseSet)
}
reverseSet.add(nestedRoutingKey)

// If this parent group is newly associated with a nested key whose
// rows were already drained (and cleared from the buffer) in an
// earlier flush, the pipeline will not re-emit them. Seed this parent
// from the cumulative snapshot so it receives the same rows its
// siblings already have.
if (isNewParent) {
seedParentFromSnapshot(state, i, correlationKey, nestedRoutingKey)
}
}
} else if (change.deletes > 0 && change.inserts === 0) {
// Remove from routing index
Expand All @@ -1451,7 +1586,13 @@ function updateRoutingIndex(
)

if (nestedCorrelationKey != null) {
state.nestedRoutingIndex!.delete(nestedRoutingKey)
const parents = state.nestedRoutingIndex!.get(nestedRoutingKey)
if (parents) {
parents.delete(correlationKey)
if (parents.size === 0) {
state.nestedRoutingIndex!.delete(nestedRoutingKey)
}
}
const reverseSet =
state.nestedRoutingReverseIndex!.get(correlationKey)
if (reverseSet) {
Expand Down Expand Up @@ -1479,7 +1620,15 @@ function cleanRoutingIndexOnDelete(
const nestedKeys = state.nestedRoutingReverseIndex.get(correlationKey)
if (nestedKeys) {
for (const nestedKey of nestedKeys) {
state.nestedRoutingIndex!.delete(nestedKey)
// Remove only this parent from the nested key's parent set; other
// sibling parents may still reference the same nested correlation key.
const parents = state.nestedRoutingIndex!.get(nestedKey)
if (parents) {
parents.delete(correlationKey)
if (parents.size === 0) {
state.nestedRoutingIndex!.delete(nestedKey)
}
}
}
state.nestedRoutingReverseIndex.delete(correlationKey)
}
Expand Down
Loading
Loading