diff --git a/packages/trailbase-db-collection/src/trailbase.ts b/packages/trailbase-db-collection/src/trailbase.ts index b47728d86..8f05716c6 100644 --- a/packages/trailbase-db-collection/src/trailbase.ts +++ b/packages/trailbase-db-collection/src/trailbase.ts @@ -140,6 +140,12 @@ export function trailBaseCollectionOptions< const internalSyncMode = config.syncMode ?? `eager` let fullSyncCompleted = false + // Tracks whether subscribe("*") succeeded. Set to true inside start() when SSE + // connects. When false (e.g. 403 due to row-level access rules that can't be + // evaluated at wildcard subscription time), onInsert/onUpdate/onDelete skip + // awaitIds() — which relies on SSE events to populate seenIds — to avoid a + // 120 s timeout that would roll back every optimistic mutation. + let sseAvailable = false const awaitIds = ( ids: Array, @@ -294,12 +300,25 @@ export function trailBaseCollectionOptions< } async function start() { - const eventStream = await config.recordApi.subscribe(`*`) - const reader = (eventReader = eventStream.getReader()) - - // Start listening for subscriptions first. Otherwise, we'd risk a gap - // between the initial fetch and starting to listen. - listen(reader) + // Attempt to subscribe to live updates. Some TrailBase configurations + // deny wildcard subscriptions (403) when table access rules use + // row-level predicates (_ROW_.*) that can't be evaluated without a + // concrete record. In that case we fall back to polling only. + let liveUpdatesAvailable = false + try { + const eventStream = await config.recordApi.subscribe(`*`) + const reader = (eventReader = eventStream.getReader()) + + // Start listening for subscriptions first. Otherwise, we'd risk a gap + // between the initial fetch and starting to listen. + listen(reader) + liveUpdatesAvailable = true + sseAvailable = true + } catch { + console.debug( + `[trailbase] subscribe/* unavailable — falling back to polling only`, + ) + } try { // Eager mode: perform initial fetch to populate everything @@ -309,7 +328,7 @@ export function trailBaseCollectionOptions< fullSyncCompleted = true } } catch (e) { - cancelEventReader() + if (liveUpdatesAvailable) cancelEventReader() throw e } finally { // Mark ready both if everything went well or if there's an error to @@ -317,6 +336,8 @@ export function trailBaseCollectionOptions< markReady() } + if (!liveUpdatesAvailable) return + // Lastly, start a periodic cleanup task that will be removed when the // reader closes. const periodicCleanupTask = setInterval(() => { @@ -337,7 +358,7 @@ export function trailBaseCollectionOptions< }) }, 120 * 1000) - reader.closed.finally(() => clearInterval(periodicCleanupTask)) + eventReader!.closed.finally(() => clearInterval(periodicCleanupTask)) } start() @@ -380,10 +401,13 @@ export function trailBaseCollectionOptions< }), ) - // The optimistic mutation overlay is removed on return, so at this point - // we have to ensure that the new record was properly added to the local - // DB by the subscription. - await awaitIds(ids.map((id) => String(id))) + // When SSE is available: wait for the subscription event confirming the + // server has persisted the record before removing the optimistic overlay. + // When SSE is unavailable (polling-only mode): skip awaitIds — seenIds is + // never populated without SSE events, so waiting would time out after 120 s + // and roll back the optimistic insert. The collection will reflect server + // state after the next polling cycle. + if (sseAvailable) await awaitIds(ids.map((id) => String(id))) return ids }, @@ -401,10 +425,7 @@ export function trailBaseCollectionOptions< }), ) - // The optimistic mutation overlay is removed on return, so at this point - // we have to ensure that the new record was properly updated in the local - // DB by the subscription. - await awaitIds(ids) + if (sseAvailable) await awaitIds(ids) }, onDelete: async (params: DeleteMutationFnParams) => { const ids: Array = await Promise.all( @@ -419,10 +440,7 @@ export function trailBaseCollectionOptions< }), ) - // The optimistic mutation overlay is removed on return, so at this point - // we have to ensure that the new record was properly updated in the local - // DB by the subscription. - await awaitIds(ids) + if (sseAvailable) await awaitIds(ids) }, utils: { cancel: cancelEventReader,