diff --git a/.changeset/nested-toarray-shared-buffer-overlap.md b/.changeset/nested-toarray-shared-buffer-overlap.md new file mode 100644 index 0000000000..17571d8b27 --- /dev/null +++ b/.changeset/nested-toarray-shared-buffer-overlap.md @@ -0,0 +1,9 @@ +--- +'@tanstack/db': patch +--- + +fix(db): nested `toArray` includes dropping children when sibling parent groups share a correlation key + +With three (or more) levels of nested `toArray` includes, when two children in different parent groups shared the same deepest correlation key, only one of them received the nested rows and the other came back as an empty array. The nested-pipeline routing index mapped each nested correlation key to a single parent group and the shared buffer entry was deleted after routing to the first match, so sibling groups sharing the key were dropped. + +The routing index now maps a nested correlation key to all parent groups that reference it and fans buffered grandchild changes out to each. A per-level snapshot of already-materialized rows also seeds parent groups that start referencing an existing correlation key after the rows were drained (e.g. inserted after the initial load), since the pipeline does not re-emit them. diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index b40e6e431a..a8dbe8d113 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -1150,6 +1150,13 @@ function createOrderByComparator( } } +type SnapshotRow = { + value: any + orderByIndex: string | undefined + /** Net multiplicity (inserts − deletes) currently materialized for this row */ + count: number +} + /** * Shared buffer setup for a single nested includes level. * Pipeline output writes into the buffer; during flush the buffer is drained @@ -1159,6 +1166,15 @@ type NestedIncludesSetup = { compilationResult: IncludesCompilationResult /** Shared buffer: nestedCorrelationKey → Map */ buffer: Map>> + /** + * Cumulative net-present grandchild rows per nested correlation key. The + * buffer holds only deltas since the last drain and is cleared once drained, + * so a parent group that starts referencing an existing correlation key + * *after* the rows were already drained (the pipeline does not re-emit them) + * would otherwise see nothing. The snapshot lets such late-arriving parent + * groups be seeded with the rows their siblings already received. + */ + snapshot: Map> /** For 3+ levels of nesting */ nestedSetups?: Array } @@ -1184,8 +1200,14 @@ type IncludesOutputState = { correlationToParentKeys: Map> /** Shared nested pipeline setups (one per nested includes level) */ nestedSetups?: Array - /** nestedCorrelationKey → parentCorrelationKey */ - nestedRoutingIndex?: Map + /** + * nestedCorrelationKey → Set. + * One nested correlation key can map to multiple parent groups when sibling + * parents share the same correlation value (e.g. two price ranges that + * reference the same region), so buffered grandchild changes must fan out to + * every parent group rather than a single one. + */ + nestedRoutingIndex?: Map> /** parentCorrelationKey → Set */ nestedRoutingReverseIndex?: Map> } @@ -1298,6 +1320,7 @@ function setupNestedPipelines( const setup: NestedIncludesSetup = { compilationResult: entry, buffer, + snapshot: new Map(), } // Recursively set up deeper levels @@ -1342,6 +1365,85 @@ function createPerEntryIncludesStates( }) } +/** + * Folds a drained delta into a nested setup's cumulative snapshot, tracking the + * net multiplicity per child row and dropping rows (and empty keys) once their + * net count reaches zero. + */ +function accumulateSnapshot( + setup: NestedIncludesSetup, + nestedCorrelationKey: unknown, + childChanges: Map>, +): void { + let snap = setup.snapshot.get(nestedCorrelationKey) + if (!snap) { + snap = new Map() + setup.snapshot.set(nestedCorrelationKey, snap) + } + + for (const [childKey, changes] of childChanges) { + let row = snap.get(childKey) + if (!row) { + row = { + value: changes.value, + orderByIndex: changes.orderByIndex, + count: 0, + } + snap.set(childKey, row) + } + row.count += changes.inserts - changes.deletes + if (changes.inserts > 0) { + row.value = changes.value + if (changes.orderByIndex !== undefined) { + row.orderByIndex = changes.orderByIndex + } + } + if (row.count <= 0) { + snap.delete(childKey) + } + } + + if (snap.size === 0) { + setup.snapshot.delete(nestedCorrelationKey) + } +} + +/** + * Seeds a parent group's per-entry state with the rows already materialized for + * a nested correlation key. Used when a parent group starts referencing a key + * whose rows were drained (and cleared from the buffer) in an earlier flush, so + * the pipeline will not re-emit them. + */ +function seedParentFromSnapshot( + state: IncludesOutputState, + setupIndex: number, + parentCorrelationKey: unknown, + nestedCorrelationKey: unknown, +): void { + const setup = state.nestedSetups![setupIndex]! + const snap = setup.snapshot.get(nestedCorrelationKey) + if (!snap || snap.size === 0) return + + const entry = state.childRegistry.get(parentCorrelationKey) + if (!entry || !entry.includesStates) return + + const entryState = entry.includesStates[setupIndex]! + let byChild = entryState.pendingChildChanges.get(nestedCorrelationKey) + if (!byChild) { + byChild = new Map() + entryState.pendingChildChanges.set(nestedCorrelationKey, byChild) + } + for (const [childKey, row] of snap) { + if (byChild.has(childKey)) continue + byChild.set(childKey, { + deletes: 0, + inserts: row.count, + value: row.value, + orderByIndex: row.orderByIndex, + }) + } +} + /** * Drains shared buffers into per-entry states using the routing index. * Returns the set of parent correlation keys that had changes routed to them. @@ -1356,43 +1458,60 @@ function drainNestedBuffers(state: IncludesOutputState): Set { const toDelete: Array = [] for (const [nestedCorrelationKey, childChanges] of setup.buffer) { - const parentCorrelationKey = + const parentCorrelationKeys = state.nestedRoutingIndex!.get(nestedCorrelationKey) - if (parentCorrelationKey === undefined) { + if ( + parentCorrelationKeys === undefined || + parentCorrelationKeys.size === 0 + ) { // Unroutable — parent not yet seen; keep in buffer continue } - const entry = state.childRegistry.get(parentCorrelationKey) - if (!entry || !entry.includesStates) { - continue - } - - // Route changes into this entry's per-entry state at position i - const entryState = entry.includesStates[i]! - for (const [childKey, changes] of childChanges) { - let byChild = entryState.pendingChildChanges.get(nestedCorrelationKey) - if (!byChild) { - byChild = new Map() - entryState.pendingChildChanges.set(nestedCorrelationKey, byChild) + // A single nested correlation key can map to multiple parent groups when + // sibling parents share the same correlation value. Fan the buffered + // changes out to each ready parent group; only drop the buffer entry once + // it has been routed to at least one parent. + let routedToAny = false + for (const parentCorrelationKey of parentCorrelationKeys) { + const entry = state.childRegistry.get(parentCorrelationKey) + if (!entry || !entry.includesStates) { + continue } - const existing = byChild.get(childKey) - if (existing) { - existing.inserts += changes.inserts - existing.deletes += changes.deletes - if (changes.inserts > 0) { - existing.value = changes.value - if (changes.orderByIndex !== undefined) { - existing.orderByIndex = changes.orderByIndex + + // Route changes into this entry's per-entry state at position i + const entryState = entry.includesStates[i]! + for (const [childKey, changes] of childChanges) { + let byChild = entryState.pendingChildChanges.get(nestedCorrelationKey) + if (!byChild) { + byChild = new Map() + entryState.pendingChildChanges.set(nestedCorrelationKey, byChild) + } + const existing = byChild.get(childKey) + if (existing) { + existing.inserts += changes.inserts + existing.deletes += changes.deletes + if (changes.inserts > 0) { + existing.value = changes.value + if (changes.orderByIndex !== undefined) { + existing.orderByIndex = changes.orderByIndex + } } + } else { + byChild.set(childKey, { ...changes }) } - } else { - byChild.set(childKey, { ...changes }) } + + dirtyCorrelationKeys.add(parentCorrelationKey) + routedToAny = true } - dirtyCorrelationKeys.add(parentCorrelationKey) - toDelete.push(nestedCorrelationKey) + if (routedToAny) { + // Fold the drained delta into the cumulative snapshot so a parent group + // that starts referencing this nested key later can be seeded with it. + accumulateSnapshot(setup, nestedCorrelationKey, childChanges) + toDelete.push(nestedCorrelationKey) + } } for (const key of toDelete) { @@ -1415,7 +1534,8 @@ function updateRoutingIndex( ): void { if (!state.nestedSetups) return - for (const setup of state.nestedSetups) { + for (let i = 0; i < state.nestedSetups.length; i++) { + const setup = state.nestedSetups[i]! for (const [, change] of childChanges) { if (change.inserts > 0) { // Read the nested routing key from the INCLUDES_ROUTING stamp. @@ -1431,13 +1551,28 @@ function updateRoutingIndex( ) if (nestedCorrelationKey != null) { - state.nestedRoutingIndex!.set(nestedRoutingKey, correlationKey) + let parents = state.nestedRoutingIndex!.get(nestedRoutingKey) + if (!parents) { + parents = new Set() + state.nestedRoutingIndex!.set(nestedRoutingKey, parents) + } + const isNewParent = !parents.has(correlationKey) + parents.add(correlationKey) let reverseSet = state.nestedRoutingReverseIndex!.get(correlationKey) if (!reverseSet) { reverseSet = new Set() state.nestedRoutingReverseIndex!.set(correlationKey, reverseSet) } reverseSet.add(nestedRoutingKey) + + // If this parent group is newly associated with a nested key whose + // rows were already drained (and cleared from the buffer) in an + // earlier flush, the pipeline will not re-emit them. Seed this parent + // from the cumulative snapshot so it receives the same rows its + // siblings already have. + if (isNewParent) { + seedParentFromSnapshot(state, i, correlationKey, nestedRoutingKey) + } } } else if (change.deletes > 0 && change.inserts === 0) { // Remove from routing index @@ -1451,7 +1586,13 @@ function updateRoutingIndex( ) if (nestedCorrelationKey != null) { - state.nestedRoutingIndex!.delete(nestedRoutingKey) + const parents = state.nestedRoutingIndex!.get(nestedRoutingKey) + if (parents) { + parents.delete(correlationKey) + if (parents.size === 0) { + state.nestedRoutingIndex!.delete(nestedRoutingKey) + } + } const reverseSet = state.nestedRoutingReverseIndex!.get(correlationKey) if (reverseSet) { @@ -1479,7 +1620,15 @@ function cleanRoutingIndexOnDelete( const nestedKeys = state.nestedRoutingReverseIndex.get(correlationKey) if (nestedKeys) { for (const nestedKey of nestedKeys) { - state.nestedRoutingIndex!.delete(nestedKey) + // Remove only this parent from the nested key's parent set; other + // sibling parents may still reference the same nested correlation key. + const parents = state.nestedRoutingIndex!.get(nestedKey) + if (parents) { + parents.delete(correlationKey) + if (parents.size === 0) { + state.nestedRoutingIndex!.delete(nestedKey) + } + } } state.nestedRoutingReverseIndex.delete(correlationKey) } diff --git a/packages/db/tests/query/includes.test.ts b/packages/db/tests/query/includes.test.ts index 0124ebbeac..8e73fa48c2 100644 --- a/packages/db/tests/query/includes.test.ts +++ b/packages/db/tests/query/includes.test.ts @@ -4902,6 +4902,455 @@ describe(`includes subqueries`, () => { expect(data().runs[0].texts).toBe(run1TextsBefore) }) + + // Three collection levels (products -> priceRanges -> region). When two + // price ranges in different parent groups point at the same deepest + // correlation key (regionId 1, one under each product), each must still + // resolve its own copy of the nested `region` array. + it(`resolves nested grandchildren for sibling groups sharing a correlation key`, async () => { + type Product = { id: number; title: string } + type PriceRange = { id: number; productId: number; regionId: number } + type Region = { id: number; name: string } + + const products = createCollection( + localOnlyCollectionOptions({ + id: `shared-corr-products`, + getKey: (p) => p.id, + initialData: [ + { id: 1, title: `T-Shirt` }, + { id: 2, title: `Hoodie` }, + ], + }), + ) + + const priceRanges = createCollection( + localOnlyCollectionOptions({ + id: `shared-corr-price-ranges`, + getKey: (r) => r.id, + initialData: [ + { id: 1, productId: 1, regionId: 1 }, + { id: 2, productId: 1, regionId: 2 }, + { id: 3, productId: 2, regionId: 1 }, // same regionId as priceRange 1 + ], + }), + ) + + const regions = createCollection( + localOnlyCollectionOptions({ + id: `shared-corr-regions`, + getKey: (r) => r.id, + initialData: [ + { id: 1, name: `Europe` }, + { id: 2, name: `North America` }, + ], + }), + ) + + await Promise.all([ + products.preload(), + priceRanges.preload(), + regions.preload(), + ]) + + const collection = createLiveQueryCollection({ + id: `shared-corr-live`, + query: (q) => + q.from({ p: products }).select(({ p }) => ({ + id: p.id, + title: p.title, + priceRanges: toArray( + q + .from({ pr: priceRanges }) + .where(({ pr }) => eq(pr.productId, p.id)) + .select(({ pr }) => ({ + id: pr.id, + regionId: pr.regionId, + region: toArray( + q + .from({ r: regions }) + .where(({ r }) => eq(r.id, pr.regionId)) + .select(({ r }) => ({ id: r.id, name: r.name })), + ), + })), + ), + })), + }) + + await collection.preload() + + expect(toTree(collection)).toEqual([ + { + id: 1, + title: `T-Shirt`, + priceRanges: [ + { + id: 1, + regionId: 1, + region: [{ id: 1, name: `Europe` }], + }, + { + id: 2, + regionId: 2, + region: [{ id: 2, name: `North America` }], + }, + ], + }, + { + id: 2, + title: `Hoodie`, + priceRanges: [ + { + id: 3, + regionId: 1, + region: [{ id: 1, name: `Europe` }], + }, + ], + }, + ]) + }) + + // When a second parent group starts referencing a deepest correlation key + // that another group already resolved (the sibling price range is inserted + // after the initial load), the newly inserted group must also receive the + // nested grandchildren. + it(`fans nested grandchildren out to a sibling group inserted after load`, async () => { + type Product = { id: number; title: string } + type PriceRange = { id: number; productId: number; regionId: number } + type Region = { id: number; name: string } + + const products = createCollection( + localOnlyCollectionOptions({ + id: `shared-corr-incremental-products`, + getKey: (p) => p.id, + initialData: [ + { id: 1, title: `T-Shirt` }, + { id: 2, title: `Hoodie` }, + ], + }), + ) + const priceRanges = createCollection( + localOnlyCollectionOptions({ + id: `shared-corr-incremental-price-ranges`, + getKey: (r) => r.id, + initialData: [{ id: 1, productId: 1, regionId: 1 }], + }), + ) + const regions = createCollection( + localOnlyCollectionOptions({ + id: `shared-corr-incremental-regions`, + getKey: (r) => r.id, + initialData: [{ id: 1, name: `Europe` }], + }), + ) + + await Promise.all([ + products.preload(), + priceRanges.preload(), + regions.preload(), + ]) + + const collection = createLiveQueryCollection({ + id: `shared-corr-incremental-live`, + query: (q) => + q.from({ p: products }).select(({ p }) => ({ + id: p.id, + title: p.title, + priceRanges: toArray( + q + .from({ pr: priceRanges }) + .where(({ pr }) => eq(pr.productId, p.id)) + .select(({ pr }) => ({ + id: pr.id, + regionId: pr.regionId, + region: toArray( + q + .from({ r: regions }) + .where(({ r }) => eq(r.id, pr.regionId)) + .select(({ r }) => ({ id: r.id, name: r.name })), + ), + })), + ), + })), + }) + await collection.preload() + + // Insert a second price range under a different product, sharing regionId 1. + priceRanges.insert({ id: 3, productId: 2, regionId: 1 }) + await new Promise((r) => setTimeout(r, 50)) + + const tree = toTree(collection) + const tshirt = tree.find((p: any) => p.title === `T-Shirt`) + const hoodie = tree.find((p: any) => p.title === `Hoodie`) + expect(tshirt.priceRanges.find((pr: any) => pr.id === 1).region).toEqual([ + { id: 1, name: `Europe` }, + ]) + expect(hoodie.priceRanges.find((pr: any) => pr.id === 3).region).toEqual([ + { id: 1, name: `Europe` }, + ]) + }) + + // When two parent groups share a deepest correlation key and one of them is + // deleted, the surviving group must keep its nested grandchildren. + it(`keeps grandchildren on the surviving sibling after the other is deleted`, async () => { + type Product = { id: number; title: string } + type PriceRange = { id: number; productId: number; regionId: number } + type Region = { id: number; name: string } + + const products = createCollection( + localOnlyCollectionOptions({ + id: `shared-corr-delete-products`, + getKey: (p) => p.id, + initialData: [ + { id: 1, title: `T-Shirt` }, + { id: 2, title: `Hoodie` }, + ], + }), + ) + const priceRanges = createCollection( + localOnlyCollectionOptions({ + id: `shared-corr-delete-price-ranges`, + getKey: (r) => r.id, + initialData: [ + { id: 1, productId: 1, regionId: 1 }, + { id: 3, productId: 2, regionId: 1 }, + ], + }), + ) + const regions = createCollection( + localOnlyCollectionOptions({ + id: `shared-corr-delete-regions`, + getKey: (r) => r.id, + initialData: [{ id: 1, name: `Europe` }], + }), + ) + + await Promise.all([ + products.preload(), + priceRanges.preload(), + regions.preload(), + ]) + + const collection = createLiveQueryCollection({ + id: `shared-corr-delete-live`, + query: (q) => + q.from({ p: products }).select(({ p }) => ({ + id: p.id, + title: p.title, + priceRanges: toArray( + q + .from({ pr: priceRanges }) + .where(({ pr }) => eq(pr.productId, p.id)) + .select(({ pr }) => ({ + id: pr.id, + regionId: pr.regionId, + region: toArray( + q + .from({ r: regions }) + .where(({ r }) => eq(r.id, pr.regionId)) + .select(({ r }) => ({ id: r.id, name: r.name })), + ), + })), + ), + })), + }) + await collection.preload() + + // Delete the Hoodie's price range (the sibling sharing regionId 1). + priceRanges.delete(3) + await new Promise((r) => setTimeout(r, 50)) + + const tree = toTree(collection) + const tshirt = tree.find((p: any) => p.title === `T-Shirt`) + const hoodie = tree.find((p: any) => p.title === `Hoodie`) + expect(tshirt.priceRanges.find((pr: any) => pr.id === 1).region).toEqual([ + { id: 1, name: `Europe` }, + ]) + expect(hoodie.priceRanges).toEqual([]) + }) + + // The shared-correlation-key routing is independent of how each level is + // materialized, so the same guarantee must hold when the nested levels are + // left as live Collections (no toArray/materialize wrapper). + it(`resolves nested grandchildren for sibling groups when levels stay Collections`, async () => { + type Product = { id: number; title: string } + type PriceRange = { id: number; productId: number; regionId: number } + type Region = { id: number; name: string } + + const products = createCollection( + localOnlyCollectionOptions({ + id: `shared-corr-collection-products`, + getKey: (p) => p.id, + initialData: [ + { id: 1, title: `T-Shirt` }, + { id: 2, title: `Hoodie` }, + ], + }), + ) + const priceRanges = createCollection( + localOnlyCollectionOptions({ + id: `shared-corr-collection-price-ranges`, + getKey: (r) => r.id, + initialData: [ + { id: 1, productId: 1, regionId: 1 }, + { id: 2, productId: 1, regionId: 2 }, + { id: 3, productId: 2, regionId: 1 }, + ], + }), + ) + const regions = createCollection( + localOnlyCollectionOptions({ + id: `shared-corr-collection-regions`, + getKey: (r) => r.id, + initialData: [ + { id: 1, name: `Europe` }, + { id: 2, name: `North America` }, + ], + }), + ) + + await Promise.all([ + products.preload(), + priceRanges.preload(), + regions.preload(), + ]) + + const collection = createLiveQueryCollection({ + id: `shared-corr-collection-live`, + query: (q) => + q.from({ p: products }).select(({ p }) => ({ + id: p.id, + title: p.title, + priceRanges: q + .from({ pr: priceRanges }) + .where(({ pr }) => eq(pr.productId, p.id)) + .select(({ pr }) => ({ + id: pr.id, + regionId: pr.regionId, + region: q + .from({ r: regions }) + .where(({ r }) => eq(r.id, pr.regionId)) + .select(({ r }) => ({ id: r.id, name: r.name })), + })), + })), + }) + await collection.preload() + + // toTree recursively unwraps the nested live Collections into arrays. + expect(toTree(collection)).toEqual([ + { + id: 1, + title: `T-Shirt`, + priceRanges: [ + { id: 1, regionId: 1, region: [{ id: 1, name: `Europe` }] }, + { + id: 2, + regionId: 2, + region: [{ id: 2, name: `North America` }], + }, + ], + }, + { + id: 2, + title: `Hoodie`, + priceRanges: [ + { id: 3, regionId: 1, region: [{ id: 1, name: `Europe` }] }, + ], + }, + ]) + }) + + // Same guarantee for materialize(), which produces array/singleton + // snapshots through the same nested-includes routing. + it(`resolves nested grandchildren for sibling groups with materialize()`, async () => { + type Product = { id: number; title: string } + type PriceRange = { id: number; productId: number; regionId: number } + type Region = { id: number; name: string } + + const products = createCollection( + localOnlyCollectionOptions({ + id: `shared-corr-materialize-products`, + getKey: (p) => p.id, + initialData: [ + { id: 1, title: `T-Shirt` }, + { id: 2, title: `Hoodie` }, + ], + }), + ) + const priceRanges = createCollection( + localOnlyCollectionOptions({ + id: `shared-corr-materialize-price-ranges`, + getKey: (r) => r.id, + initialData: [ + { id: 1, productId: 1, regionId: 1 }, + { id: 2, productId: 1, regionId: 2 }, + { id: 3, productId: 2, regionId: 1 }, + ], + }), + ) + const regions = createCollection( + localOnlyCollectionOptions({ + id: `shared-corr-materialize-regions`, + getKey: (r) => r.id, + initialData: [ + { id: 1, name: `Europe` }, + { id: 2, name: `North America` }, + ], + }), + ) + + await Promise.all([ + products.preload(), + priceRanges.preload(), + regions.preload(), + ]) + + const collection = createLiveQueryCollection({ + id: `shared-corr-materialize-live`, + query: (q) => + q.from({ p: products }).select(({ p }) => ({ + id: p.id, + title: p.title, + priceRanges: materialize( + q + .from({ pr: priceRanges }) + .where(({ pr }) => eq(pr.productId, p.id)) + .select(({ pr }) => ({ + id: pr.id, + regionId: pr.regionId, + region: materialize( + q + .from({ r: regions }) + .where(({ r }) => eq(r.id, pr.regionId)) + .select(({ r }) => ({ id: r.id, name: r.name })), + ), + })), + ), + })), + }) + await collection.preload() + + expect(toTree(collection)).toEqual([ + { + id: 1, + title: `T-Shirt`, + priceRanges: [ + { id: 1, regionId: 1, region: [{ id: 1, name: `Europe` }] }, + { + id: 2, + regionId: 2, + region: [{ id: 2, name: `North America` }], + }, + ], + }, + { + id: 2, + title: `Hoodie`, + priceRanges: [ + { id: 3, regionId: 1, region: [{ id: 1, name: `Europe` }] }, + ], + }, + ]) + }) }) describe(`many sibling toArray includes with chained derived collections`, () => {