Skip to content

feat: BigQuery Data Warehouse Connector — Phase 1 (Schema, Types, Validation)#391

Open
Shrotriya-lalit wants to merge 9 commits into
Openpanel-dev:mainfrom
Shrotriya-lalit:feat/bigquery-connector-phase1
Open

feat: BigQuery Data Warehouse Connector — Phase 1 (Schema, Types, Validation)#391
Shrotriya-lalit wants to merge 9 commits into
Openpanel-dev:mainfrom
Shrotriya-lalit:feat/bigquery-connector-phase1

Conversation

@Shrotriya-lalit

@Shrotriya-lalit Shrotriya-lalit commented Jun 8, 2026

Copy link
Copy Markdown

BigQuery Data Warehouse Connector — Phase 1: Schema, Types, Validation

Adds the foundational schema for a multi-provider Data Warehouse Connector — BigQuery first, with Snowflake, Redshift, Databricks, and Postgres ready to be added without any new tables.


What's in this PR

Three shared Prisma tables

Table Purpose
warehouse_connections One row per named connection (any provider)
warehouse_syncs One sync job per connection — source table, column mapping, schedule
warehouse_sync_runs One run record per execution — row count, bytes, status, errors

Enums

  • WarehouseType: bigquery, snowflake, redshift, databricks, postgres
  • WarehouseSyncMode: append, full, onetime
  • WarehouseSyncSchedule: hourly, daily, weekly
  • WarehouseSyncRunStatus: pending, running, completed, failed
  • WarehouseSyncMappingType: events, profiles

Key design decisions

Security

  • configEncrypted (AES-256-GCM) stores provider credentials — same encrypt()/decrypt() as GSC connector
  • displayIdentifier + displayEmail extracted at creation time as plain text — connection list never requires decryption
  • Composite FK warehouse_syncs(projectId, connectionId) → warehouse_connections(projectId, id) — blocks cross-tenant exploit at DB level (project A cannot reference project B's connection)
  • name_nonempty_check constraint — DB-level guard against empty connection names

Sync modes

  • append — cursor-based incremental via insertTime column; cursorOverlapMinutes (default 10) rewinds cursor to catch late-arriving rows; dedup handles re-ingested events
  • full — full reload every run + stale event cleanup via __op_sync_id stamp (handles BQ row deletions and updates)
  • onetime — historical backfill, runs once then disables automatically

Operational fields

  • cursorOverlapMinutes Int @default(10) — late-arriving row safety window for append mode
  • syncDelayMinutes Int @default(0) — delay after cron fires before executing (allows upstream BQ pipelines to land)
  • errorRetryCount + isErrorPaused — auto-pause circuit breaker
  • failureCount BigInt — tracks individually failed rows (bad data/type mismatch), separate from rowCount
  • bytesProcessed BigInt — tracks GCP billing units per run
  • createdBy String? — userId of sync creator for avatar display in UI

Performance

  • Explicit indexes on all FK columns — PostgreSQL does not auto-create these
    • warehouse_syncs(projectId), warehouse_syncs(connectionId), warehouse_sync_runs(syncId)

Validation (packages/validation/src/index.ts)

  • zBigQueryWarehouseConfig — GCP project ID + region + SA JSON (validated for required fields)
  • zWarehouseConfig — discriminated union on type, ready for Snowflake arm
  • zBigQueryColumnMappingEvents — all event column fields: profileId, deviceId, eventId, eventName, eventNameStatic, timestamp, insertTime, revenue, jsonProperties
  • zBigQueryColumnMappingProfilesprofileIdColumn, firstName, lastName, email, avatar, createdAt, jsonProperties
  • zBigQuerySyncConfig with superRefine cross-field rules:
    • schedule required for append and full modes, not for onetime
    • insertTime required when syncMode = 'append' with events mapping
    • eventName and eventNameStatic are mutually exclusive
  • jsonProperties on both mappings — maps a BQ JSON column whose keys are flattened into event/profile properties at ingest time

Migrations applied

Migration What it does
20260610115042_warehouse_restructure 3 tables + 5 enums
20260610115043_warehouse_security_fks Composite FK + name_nonempty_check
20260610115044_warehouse_phase1_finalize 3 FK indexes + failureCount + createdBy
20260610115045_warehouse_onetime_mode onetime enum value + schedule made nullable
20260610115046_warehouse_sync_overlap_delay cursorOverlapMinutes + syncDelayMinutes

Verification

  • prisma migrate diff — no schema drift
  • ✅ 21/21 Zod validator probes pass
  • ✅ All FK constraints + composite FK confirmed in live DB
  • ✅ Cross-tenant attack blocked (composite FK probe)
  • name_nonempty_check blocks empty names at DB level
  • ✅ All 3 performance indexes present

What comes next

Phase Title
Phase 2 Connection Management — SA JSON paste, test connection, rotate key, settings tab UI
Phase 3 Sync Configuration — add-sync modal, column mapping UI, sync list
Phase 4 Sync Execution — BullMQ worker, ClickHouse write, manual trigger
Phase 5 Cron Scheduling — hourly cron, isSyncDue(), syncDelayMinutes
Phase 6 Run History UI — live polling, status badges, interrupted run detection

Summary by CodeRabbit

  • New Features
    • Added provider-agnostic Warehouse integrations with named per-project connections, dataset/table syncs, mapping/modes (including one-time), schedules, partition filters, overlap/delay controls, and region/health metadata.
    • Expanded sync run tracking with richer status/metrics, retry and error controls, and failure/byte diagnostics.
    • Event metadata now supports optional descriptions.
    • Added validation for GCP/BigQuery identifiers, service-account JSON, and warehouse/sync/column-mapping configurations.
  • Bug Fixes
    • Improved data integrity and tenant isolation with tightened constraints, composite relationships, and stronger uniqueness/non-empty rules.

Phase 1 of the BigQuery warehouse connector. Adds Prisma models for
BigQueryConnection, BigQuerySync and BigQuerySyncRun with four supporting
enums, two Prisma migrations, typed JSON column via PrismaJson namespace,
Zod schemas (zBigQuerySyncConfig + column mapping variants) in
@openpanel/validation, and the @google-cloud/bigquery dependency.
- Add zBqColRef validator for column references (supports dot-notation
  for STRUCT nested fields like user.profile.email)
- Add mappingType discriminator to both mapping schemas so the union is
  discriminated and TypeScript can narrow the type cleanly
- Add superRefine cross-validation: append mode events syncs must declare
  an insertTime column (the TIMESTAMP cursor)
- Update plan with verified BigQuery Node.js client type mappings:
  INT64 needs wrapIntegers:true, TIMESTAMP/.value not .toISOString(),
  DATETIME has no timezone, BYTES→Buffer, Big for NUMERIC/BIGNUMERIC
Real-world orgs connect multiple data sources to one project (e.g.
jm-ebg and jm-ebg-cdp on the same ROAS project). The original
@unique on projectId wrongly enforced a single connection per project.

- Remove @unique from BigQueryConnection.projectId
- Add name String field (user label, e.g. "CDP Source")
- Replace with @@unique([projectId, name]) — names unique within project
- Change Project.bigQueryConnection? → bigQueryConnections[]
Schema additions:
- BigQueryConnection: gcpRegion (GDPR region compliance), lastTestedAt/lastTestStatus (connection health)
- BigQuerySync: lastSyncStatus typed as enum (was String?), errorRetryCount+isErrorPaused circuit breaker, partitionFilter for cost-safe full-refresh on partitioned tables
- BigQuerySyncRun: rowCount BigInt (INT max ~2.1B insufficient), bytesProcessed for cost tracking

Zod additions:
- zGcpProjectId: GCP project ID format regex (rejects project numbers and display names)
- zBqIdentifier: dataset/table name validator (no hyphens, per BQ naming rules)
- zServiceAccountJson: SA JSON structure check (rejects authorized_user creds before encryption)
- zBigQueryConnectionCreate: connection creation schema with name/region/SA JSON
- zBigQuerySyncConfig: dataset/tableName now use zBqIdentifier, partitionFilter field added
@coderabbitai

coderabbitai Bot commented Jun 8, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: d62a4966-dd01-4c0e-9523-a801bcf42c86

📥 Commits

Reviewing files that changed from the base of the PR and between ca03135 and acc4107.

⛔ Files ignored due to path filters (1)
  • packages/db/src/generated/empty is excluded by !**/generated/**
📒 Files selected for processing (6)
  • packages/db/prisma/migrations/20260611000000_warehouse_run_composite_fk/migration.sql
  • packages/db/prisma/migrations/20260611000001_warehouse_name_btrim_check/migration.sql
  • packages/db/prisma/migrations/20260611000002_warehouse_connection_last_test_error/migration.sql
  • packages/db/prisma/schema.prisma
  • packages/db/src/types.ts
  • packages/validation/src/index.ts
✅ Files skipped from review due to trivial changes (2)
  • packages/db/prisma/migrations/20260611000002_warehouse_connection_last_test_error/migration.sql
  • packages/db/prisma/migrations/20260611000001_warehouse_name_btrim_check/migration.sql
🚧 Files skipped from review as they are similar to previous changes (4)
  • packages/db/prisma/migrations/20260611000000_warehouse_run_composite_fk/migration.sql
  • packages/db/src/types.ts
  • packages/db/prisma/schema.prisma
  • packages/validation/src/index.ts

📝 Walkthrough

Walkthrough

Adds BigQuery connector migrations and generalizes them into a provider-agnostic Warehouse schema; updates Prisma models and type declarations; adds Zod validation for warehouse and BigQuery configuration; and configures runtime dependencies.

Changes

Warehouse & BigQuery connector

Layer / File(s) Summary
EventMeta schema addition
packages/db/prisma/migrations/20260607120000_add_event_meta_description/migration.sql
Adds nullable description column to public.event_meta via ALTER TABLE.
BigQuery foundation schema
packages/db/prisma/migrations/20260608090217_add_bigquery_connector/migration.sql
Introduces BigQuery enums and creates bigquery_connections, bigquery_syncs, bigquery_sync_runs tables with UUID keys, configuration JSONB fields, sync metadata, and FK/index constraints.
BigQuery schema evolution
packages/db/prisma/migrations/20260608091000_bigquery_multi_connection/migration.sql, packages/db/prisma/migrations/20260608120000_bigquery_harden/migration.sql
Enables multiple named connections per project; adds region, health/test tracking, retry, and partition filter fields; upgrades sync status to enum; widens row count to BIGINT and adds bytesProcessed.
BigQuery referential integrity
packages/db/prisma/migrations/20260608140000_bigquery_referential_integrity/migration.sql
Adds missing FK from bigquery_sync_runs to projects; enforces tenant isolation via composite FK/index on syncs; backfills and validates empty connection names; adds database-level non-empty CHECK constraint.
Warehouse restructure
packages/db/prisma/migrations/20260610115042_warehouse_restructure/migration.sql
Drops BigQuery-specific tables and enums; creates provider-agnostic warehouse_* enums and tables (warehouse_connections, warehouse_syncs, warehouse_sync_runs) with uniqueness constraints, name validation, and composite FKs preventing cross-tenant references.
Warehouse phase finalization
packages/db/prisma/migrations/20260610115043_warehouse_security_fks/migration.sql, packages/db/prisma/migrations/20260610115044_warehouse_phase1_finalize/migration.sql, packages/db/prisma/migrations/20260610115045_warehouse_onetime_mode/migration.sql, packages/db/prisma/migrations/20260610115046_warehouse_sync_overlap_delay/migration.sql, packages/db/prisma/migrations/20260611000000_warehouse_run_composite_fk/migration.sql, packages/db/prisma/migrations/20260611000001_warehouse_name_btrim_check/migration.sql, packages/db/prisma/migrations/20260611000002_warehouse_connection_last_test_error/migration.sql
Removes redundant single-column FK; adds performance indexes on FK columns; introduces failureCount and createdBy columns; supports onetime syncs (schedule nullable); adds cursor overlap and sync delay config; enforces composite FK on sync runs for tenant safety; upgrades name constraint to reject whitespace; adds lastTestError for test failure tracking.
Warehouse validation schemas
packages/validation/src/index.ts
Implements GCP/BigQuery identifier validators, service account JSON parsing with key structure validation, warehouse connection/config validators with discriminated types, column mapping schemas for events and profiles (with required/optional field references), and zBigQuerySyncConfig with cross-field refinements requiring schedule for non-onetime modes, cursor timestamps for append mode, and mutual exclusivity of event naming fields.
Prisma schema models and relations
packages/db/prisma/schema.prisma
Declares warehouse enums; adds Project relations to WarehouseConnection, WarehouseSync, and WarehouseSyncRun; defines three new models with encrypted config storage, JSONB column mappings, cursor tracking, state flags, run counters, timestamps, and uniqueness/index constraints; extends EventMeta with optional description.
Type and dependency wiring
packages/db/package.json, packages/db/src/types.ts
Adds @google-cloud/bigquery@^8.3.1 npm dependency; exposes warehouse column-mapping type in global PrismaJson namespace for schema JSON column type safety.

Sequence Diagram

sequenceDiagram
  participant Client
  participant API
  participant Database
  participant CloudStorage
  Client->>API: request create warehouse connection (projectId, name, serviceAccountJson)
  API->>Database: validate & store WarehouseConnection
  API->>CloudStorage: verify service account credentials
  Client->>API: request create sync (connectionId, dataset, table, mapping, syncMode)
  API->>Database: validate & store WarehouseSync with schedule/delay config
  Client->>API: trigger or schedule sync execution
  API->>Database: create WarehouseSyncRun with pending status
  API->>CloudStorage: extract rows and compute bytesProcessed
  API->>Database: update WarehouseSyncRun (status, rowCount, failureCount, bytesProcessed)
Loading

🎯 4 (Complex) | ⏱️ ~50 minutes

🐰 Through BigQuery's gates I tunneled deep,
Then widened paths for warehouses to flow,
With validators keeping secrets safe to keep,
Composite keys guard each schema row,
Migrations march where tenant borders grow.

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately describes the primary change: introducing BigQuery warehouse connector infrastructure with schema, types, and validation as the foundational phase 1.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@packages/db/package.json`:
- Line 17: Update the `@google-cloud/bigquery` dependency in
packages/db/package.json from ^7.9.1 to ^8.3.1 (look for the dependency key
"`@google-cloud/bigquery`") and run your test suite and any integration checks
that exercise BigQuery calls to catch breaking API changes; after updating,
re-run npm audit / GitHub advisory checks and address any newly surfaced
advisories or required code adjustments in functions that call BigQuery client
methods (e.g., places creating BigQuery clients or invoking methods like
dataset/table/query) to align with the v8 API.

In
`@packages/db/prisma/migrations/20260608090217_add_bigquery_connector/migration.sql`:
- Around line 50-53: The migration creates bigquery_sync_runs with a projectId
column but no FK, allowing runs to be assigned to the wrong project; add
referential integrity by adding a foreign key on bigquery_sync_runs.projectId
referencing the canonical projects table (projects.id) and also ensure
run-to-sync consistency by adding a composite foreign key (syncId, projectId)
referencing bigquery_syncs(id, projectId) (first add a unique constraint on
bigquery_syncs(id, projectId) if needed); update the CREATE TABLE for
bigquery_sync_runs to include these constraints (names like
fk_bigquery_sync_runs_project and fk_bigquery_sync_runs_sync_project) so every
run is tied to an existing project and the syncId matches that project.
- Around line 69-73: The two separate FKs allow mismatched project vs connection
tenants on bigquery_syncs; drop the existing bigquery_syncs_connectionId_fkey
and replace it with a composite foreign key (e.g.
bigquery_syncs_projectId_connectionId_fkey) on (projectId, connectionId) that
REFERENCES "public"."bigquery_connections"(projectId, id) WITH ON DELETE CASCADE
ON UPDATE CASCADE so the referenced connection must belong to the same project;
keep or leave the existing projectId -> projects FK
(bigquery_syncs_projectId_fkey) as-is.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 90092f45-30c5-4c2c-aaa2-84bd90a711d7

📥 Commits

Reviewing files that changed from the base of the PR and between b94d256 and 154e5e2.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (8)
  • packages/db/package.json
  • packages/db/prisma/migrations/20260607120000_add_event_meta_description/migration.sql
  • packages/db/prisma/migrations/20260608090217_add_bigquery_connector/migration.sql
  • packages/db/prisma/migrations/20260608091000_bigquery_multi_connection/migration.sql
  • packages/db/prisma/migrations/20260608120000_bigquery_harden/migration.sql
  • packages/db/prisma/schema.prisma
  • packages/db/src/types.ts
  • packages/validation/src/index.ts

Comment thread packages/db/package.json Outdated
Comment on lines +50 to +53
CREATE TABLE "public"."bigquery_sync_runs" (
"id" UUID NOT NULL DEFAULT gen_random_uuid(),
"syncId" UUID NOT NULL,
"projectId" TEXT NOT NULL,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Constrain bigquery_sync_runs.projectId to prevent mismatched run attribution.

Line 53 stores projectId, but there is no FK for it, and Line 76 only validates syncId. A run row can carry a wrong project id, breaking run-history correctness and tenant-scoped queries.

Suggested migration direction
+-- Keep sync/project coupling enforceable
+ALTER TABLE "public"."bigquery_syncs"
+  ADD CONSTRAINT "bigquery_syncs_id_projectId_key" UNIQUE ("id", "projectId");
+
+-- Enforce run project consistency with parent sync
+ALTER TABLE "public"."bigquery_sync_runs"
+  ADD CONSTRAINT "bigquery_sync_runs_syncId_projectId_fkey"
+  FOREIGN KEY ("syncId", "projectId")
+  REFERENCES "public"."bigquery_syncs"("id", "projectId")
+  ON DELETE CASCADE ON UPDATE CASCADE;
+
+-- Optional explicit FK for direct project integrity
+ALTER TABLE "public"."bigquery_sync_runs"
+  ADD CONSTRAINT "bigquery_sync_runs_projectId_fkey"
+  FOREIGN KEY ("projectId")
+  REFERENCES "public"."projects"("id")
+  ON DELETE CASCADE ON UPDATE CASCADE;

Also applies to: 75-76

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@packages/db/prisma/migrations/20260608090217_add_bigquery_connector/migration.sql`
around lines 50 - 53, The migration creates bigquery_sync_runs with a projectId
column but no FK, allowing runs to be assigned to the wrong project; add
referential integrity by adding a foreign key on bigquery_sync_runs.projectId
referencing the canonical projects table (projects.id) and also ensure
run-to-sync consistency by adding a composite foreign key (syncId, projectId)
referencing bigquery_syncs(id, projectId) (first add a unique constraint on
bigquery_syncs(id, projectId) if needed); update the CREATE TABLE for
bigquery_sync_runs to include these constraints (names like
fk_bigquery_sync_runs_project and fk_bigquery_sync_runs_sync_project) so every
run is tied to an existing project and the syncId matches that project.

Comment on lines +69 to +73
-- AddForeignKey
ALTER TABLE "public"."bigquery_syncs" ADD CONSTRAINT "bigquery_syncs_connectionId_fkey" FOREIGN KEY ("connectionId") REFERENCES "public"."bigquery_connections"("id") ON DELETE CASCADE ON UPDATE CASCADE;

-- AddForeignKey
ALTER TABLE "public"."bigquery_syncs" ADD CONSTRAINT "bigquery_syncs_projectId_fkey" FOREIGN KEY ("projectId") REFERENCES "public"."projects"("id") ON DELETE CASCADE ON UPDATE CASCADE;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Enforce project/connection tenant consistency on bigquery_syncs.

Line 70 and Line 73 create independent foreign keys, so a sync can reference a connectionId from project B while storing projectId from project A. That breaks tenant isolation and can misroute data.

Suggested migration direction
+-- Ensure referenced columns are jointly unique
+ALTER TABLE "public"."bigquery_connections"
+  ADD CONSTRAINT "bigquery_connections_id_projectId_key" UNIQUE ("id", "projectId");
+
+-- Enforce sync belongs to the same project as its connection
+ALTER TABLE "public"."bigquery_syncs"
+  ADD CONSTRAINT "bigquery_syncs_connectionId_projectId_fkey"
+  FOREIGN KEY ("connectionId", "projectId")
+  REFERENCES "public"."bigquery_connections"("id", "projectId")
+  ON DELETE CASCADE ON UPDATE CASCADE;
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@packages/db/prisma/migrations/20260608090217_add_bigquery_connector/migration.sql`
around lines 69 - 73, The two separate FKs allow mismatched project vs
connection tenants on bigquery_syncs; drop the existing
bigquery_syncs_connectionId_fkey and replace it with a composite foreign key
(e.g. bigquery_syncs_projectId_connectionId_fkey) on (projectId, connectionId)
that REFERENCES "public"."bigquery_connections"(projectId, id) WITH ON DELETE
CASCADE ON UPDATE CASCADE so the referenced connection must belong to the same
project; keep or leave the existing projectId -> projects FK
(bigquery_syncs_projectId_fkey) as-is.

@Shrotriya-lalit Shrotriya-lalit force-pushed the feat/bigquery-connector-phase1 branch from 154e5e2 to 7920219 Compare June 8, 2026 10:24

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In
`@packages/db/prisma/migrations/20260608091000_bigquery_multi_connection/migration.sql`:
- Around line 6-9: The migration creates bigquery_connections.name with an
empty-string backfill which leaves existing rows with '' and still allows future
empty-string inserts; update the migration to backfill a meaningful non-empty
value for existing rows (e.g., update "bigquery_connections" set "name" =
concat('connection_', id) or another deterministic non-empty token for rows
where name = '') and then add a DB-level constraint to prevent empty names
(e.g., ALTER TABLE "bigquery_connections" ADD CONSTRAINT ... CHECK
(char_length(name) > 0) or name <> '') so future inserts cannot use ''. Ensure
the ALTER that drops the default comes after the backfill and the CHECK
constraint is added as part of the migration so both existing and new rows are
validated.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: ce3b42d9-fd05-4483-8001-e2ff1812b45c

📥 Commits

Reviewing files that changed from the base of the PR and between 154e5e2 and 7920219.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (8)
  • packages/db/package.json
  • packages/db/prisma/migrations/20260607120000_add_event_meta_description/migration.sql
  • packages/db/prisma/migrations/20260608090217_add_bigquery_connector/migration.sql
  • packages/db/prisma/migrations/20260608091000_bigquery_multi_connection/migration.sql
  • packages/db/prisma/migrations/20260608120000_bigquery_harden/migration.sql
  • packages/db/prisma/schema.prisma
  • packages/db/src/types.ts
  • packages/validation/src/index.ts
✅ Files skipped from review due to trivial changes (1)
  • packages/db/src/types.ts
🚧 Files skipped from review as they are similar to previous changes (6)
  • packages/db/package.json
  • packages/db/prisma/migrations/20260607120000_add_event_meta_description/migration.sql
  • packages/db/prisma/schema.prisma
  • packages/validation/src/index.ts
  • packages/db/prisma/migrations/20260608090217_add_bigquery_connector/migration.sql
  • packages/db/prisma/migrations/20260608120000_bigquery_harden/migration.sql

Comment on lines +6 to +9
ALTER TABLE "public"."bigquery_connections" ADD COLUMN "name" TEXT NOT NULL DEFAULT '';

-- Remove the default now that column exists
ALTER TABLE "public"."bigquery_connections" ALTER COLUMN "name" DROP DEFAULT;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Backfill leaves invalid empty names and the DB still permits future empty-string inserts.

Line 6 seeds existing rows with '', and after Line 9 those rows remain empty. That conflicts with the non-empty connection-name contract in validation and can leak invalid records into future flows. Please backfill to a non-empty value and enforce non-empty at the database layer.

Suggested migration adjustment
-ALTER TABLE "public"."bigquery_connections" ADD COLUMN "name" TEXT NOT NULL DEFAULT '';
-
--- Remove the default now that column exists
-ALTER TABLE "public"."bigquery_connections" ALTER COLUMN "name" DROP DEFAULT;
+ALTER TABLE "public"."bigquery_connections" ADD COLUMN "name" TEXT;
+UPDATE "public"."bigquery_connections"
+SET "name" = 'default'
+WHERE "name" IS NULL OR btrim("name") = '';
+ALTER TABLE "public"."bigquery_connections" ALTER COLUMN "name" SET NOT NULL;
+ALTER TABLE "public"."bigquery_connections"
+ADD CONSTRAINT "bigquery_connections_name_nonempty_chk"
+CHECK (btrim("name") <> '');
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
ALTER TABLE "public"."bigquery_connections" ADD COLUMN "name" TEXT NOT NULL DEFAULT '';
-- Remove the default now that column exists
ALTER TABLE "public"."bigquery_connections" ALTER COLUMN "name" DROP DEFAULT;
ALTER TABLE "public"."bigquery_connections" ADD COLUMN "name" TEXT;
UPDATE "public"."bigquery_connections"
SET "name" = 'default'
WHERE "name" IS NULL OR btrim("name") = '';
ALTER TABLE "public"."bigquery_connections" ALTER COLUMN "name" SET NOT NULL;
ALTER TABLE "public"."bigquery_connections"
ADD CONSTRAINT "bigquery_connections_name_nonempty_chk"
CHECK (btrim("name") <> '');
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@packages/db/prisma/migrations/20260608091000_bigquery_multi_connection/migration.sql`
around lines 6 - 9, The migration creates bigquery_connections.name with an
empty-string backfill which leaves existing rows with '' and still allows future
empty-string inserts; update the migration to backfill a meaningful non-empty
value for existing rows (e.g., update "bigquery_connections" set "name" =
concat('connection_', id) or another deterministic non-empty token for rows
where name = '') and then add a DB-level constraint to prevent empty names
(e.g., ALTER TABLE "bigquery_connections" ADD CONSTRAINT ... CHECK
(char_length(name) > 0) or name <> '') so future inserts cannot use ''. Ensure
the ALTER that drops the default comes after the backfill and the CHECK
constraint is added as part of the migration so both existing and new rows are
validated.

…d dependency upgrade

- Upgrade @google-cloud/bigquery from ^7.9.1 to ^8.3.1 (latest stable)
- Add FK bigquery_sync_runs.projectId -> projects(id) (was missing, allowing orphan runs)
- Add composite FK bigquery_syncs(projectId, connectionId) -> bigquery_connections(projectId, id)
  to prevent cross-tenant data: a sync can no longer reference a connection from a different project
- Add UNIQUE INDEX bigquery_connections(projectId, id) to back the composite FK
- Add CHECK(char_length(name) > 0) on bigquery_connections to enforce non-empty names at DB level
- Backfill any dev rows with empty name using concat('connection_', id)
…Zod validators

Adds the foundational schema for a multi-provider Data Warehouse Connector
(BigQuery first, extensible to Snowflake/Redshift/Databricks/Postgres).

Three shared Prisma tables:
- warehouse_connections: one row per named connection, any provider
- warehouse_syncs: one sync job per connection
- warehouse_sync_runs: one run record per execution

Key design decisions:
- configEncrypted (AES-256-GCM) + displayIdentifier/displayEmail for
  UI display without decryption
- Composite FK warehouse_syncs(projectId,connectionId) →
  warehouse_connections(projectId,id) blocks cross-tenant exploit at DB level
- Three sync modes: append (cursor), full (reload + stale cleanup), onetime (backfill)
- cursorOverlapMinutes (default 10) rewinds append cursor to catch late-arriving rows
- syncDelayMinutes (default 0) delays cron execution to allow BQ pipelines to land
- Performance indexes on all FK columns (PostgreSQL does not create these automatically)
- jsonProperties column mapping flattens a JSON column into event/profile properties

Validation (packages/validation/src/index.ts):
- zBigQueryWarehouseConfig + zWarehouseConfig discriminated union
- zBigQuerySyncConfig with superRefine cross-field rules:
  schedule required for append/full, insertTime required for append,
  eventName/eventNameStatic mutually exclusive

Migrations applied: 20260610115042–20260610115046
21/21 Zod validator probes pass. Zero schema drift (prisma migrate diff).

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (2)
packages/db/prisma/schema.prisma (1)

794-810: ⚖️ Poor tradeoff

Consider adding composite FK for tenant isolation consistency.

WarehouseSync uses a composite FK (projectId, connectionId) → WarehouseConnection(projectId, id) to prevent cross-tenant references. WarehouseSyncRun has separate single-column FKs for syncId and projectId, so the database does not enforce that projectId matches the parent sync's projectId.

Since runs are created by backend workers (not user input), the risk is limited to application bugs, not direct exploitation. However, adding a composite FK would maintain consistency with the sync→connection pattern and prevent any future data integrity drift.

This would require:

  1. A unique index on WarehouseSync(projectId, id) (similar to what exists on WarehouseConnection)
  2. A composite FK on WarehouseSyncRun(projectId, syncId) → WarehouseSync(projectId, id)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/db/prisma/schema.prisma` around lines 794 - 810, Add a composite
foreign-key relation to enforce tenant isolation by making WarehouseSync expose
a unique (or indexed+unique) key on (projectId, id) and updating
WarehouseSyncRun to reference WarehouseSync via the pair (projectId, syncId);
specifically, add a @@unique or @@index+@@unique for WarehouseSync on
(projectId, id) (if not already present) and change the relation in
WarehouseSyncRun to use relation(fields: [projectId, syncId], references:
[projectId, id]) so the run row cannot reference a sync from a different project
(update the relation name/attributes on the WarehouseSyncRun model accordingly
and remove the single-column-only FK behavior).
packages/db/src/types.ts (1)

31-31: ⚡ Quick win

Naming inconsistency: IWarehouseColumnMapping lacks IPrisma prefix.

All other types in the PrismaJson namespace follow the pattern IPrisma<TypeName> (e.g., IPrismaImportConfig, IPrismaNotificationRuleConfig), but this new type is named IWarehouseColumnMapping without the prefix.

While this works (because it matches the Prisma schema annotation /// [IWarehouseColumnMapping]), the inconsistency may confuse maintainers. Consider renaming both the schema annotation and this type to IPrismaWarehouseColumnMapping to maintain consistency with the established pattern.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/db/src/types.ts` at line 31, Rename the inconsistent type and its
Prisma schema annotation to use the established IPrisma prefix: change the alias
type IWarehouseColumnMapping (currently pointing to IWarehouseColumnMappingType)
to IPrismaWarehouseColumnMapping and update the underlying type name if needed
(IWarehouseColumnMappingType → IPrismaWarehouseColumnMappingType or keep
original type but rename the alias). Also update the Prisma schema annotation
/// [IWarehouseColumnMapping] to /// [IPrismaWarehouseColumnMapping] and
search/replace any references to IWarehouseColumnMapping (in the PrismaJson
namespace and across the codebase) so all usages reference
IPrismaWarehouseColumnMapping to maintain naming consistency.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In `@packages/db/prisma/schema.prisma`:
- Around line 794-810: Add a composite foreign-key relation to enforce tenant
isolation by making WarehouseSync expose a unique (or indexed+unique) key on
(projectId, id) and updating WarehouseSyncRun to reference WarehouseSync via the
pair (projectId, syncId); specifically, add a @@unique or @@index+@@unique for
WarehouseSync on (projectId, id) (if not already present) and change the
relation in WarehouseSyncRun to use relation(fields: [projectId, syncId],
references: [projectId, id]) so the run row cannot reference a sync from a
different project (update the relation name/attributes on the WarehouseSyncRun
model accordingly and remove the single-column-only FK behavior).

In `@packages/db/src/types.ts`:
- Line 31: Rename the inconsistent type and its Prisma schema annotation to use
the established IPrisma prefix: change the alias type IWarehouseColumnMapping
(currently pointing to IWarehouseColumnMappingType) to
IPrismaWarehouseColumnMapping and update the underlying type name if needed
(IWarehouseColumnMappingType → IPrismaWarehouseColumnMappingType or keep
original type but rename the alias). Also update the Prisma schema annotation
/// [IWarehouseColumnMapping] to /// [IPrismaWarehouseColumnMapping] and
search/replace any references to IWarehouseColumnMapping (in the PrismaJson
namespace and across the codebase) so all usages reference
IPrismaWarehouseColumnMapping to maintain naming consistency.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: cdddf040-421e-4c69-afaf-2bcb03567596

📥 Commits

Reviewing files that changed from the base of the PR and between 632bde3 and ca03135.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (8)
  • packages/db/prisma/migrations/20260610115042_warehouse_restructure/migration.sql
  • packages/db/prisma/migrations/20260610115043_warehouse_security_fks/migration.sql
  • packages/db/prisma/migrations/20260610115044_warehouse_phase1_finalize/migration.sql
  • packages/db/prisma/migrations/20260610115045_warehouse_onetime_mode/migration.sql
  • packages/db/prisma/migrations/20260610115046_warehouse_sync_overlap_delay/migration.sql
  • packages/db/prisma/schema.prisma
  • packages/db/src/types.ts
  • packages/validation/src/index.ts
✅ Files skipped from review due to trivial changes (2)
  • packages/db/prisma/migrations/20260610115046_warehouse_sync_overlap_delay/migration.sql
  • packages/db/prisma/migrations/20260610115045_warehouse_onetime_mode/migration.sql

…v#391

- Rename PrismaJson type alias IWarehouseColumnMapping →
  IPrismaWarehouseColumnMapping to match the established IPrisma* prefix
  convention used by all other types in the namespace
- Update schema.prisma annotation to match (/// [IPrismaWarehouseColumnMapping])
- Add @@unique([id, projectId]) on WarehouseSync to back a composite FK
- Replace single-column syncId FK on WarehouseSyncRun with composite
  (syncId, projectId) → warehouse_syncs(id, projectId) to enforce that
  a run's projectId always matches its parent sync's projectId — closes
  the same cross-tenant gap that the existing composite FK on WarehouseSync
  already closes one level up

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In
`@packages/db/prisma/migrations/20260611000000_warehouse_run_composite_fk/migration.sql`:
- Around line 14-18: Before adding the composite FK ("syncId","projectId") →
warehouse_syncs("id","projectId") in the migration, run a pre-check that counts
mismatched rows between warehouse_sync_runs and warehouse_syncs (use the
provided SELECT with IS DISTINCT FROM) and, if >0, perform a normalization
UPDATE to set warehouse_sync_runs.projectId = warehouse_syncs.projectId for rows
where they differ; place this normalization step after the DROP CONSTRAINT
"warehouse_sync_runs_syncId_fkey" and before ADD CONSTRAINT
"warehouse_sync_runs_syncId_projectId_fkey" so the new FK add will not fail
(keep the unique index creation as-is).
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 3de3952b-f359-4897-8bff-2f7f2775e8af

📥 Commits

Reviewing files that changed from the base of the PR and between ca03135 and 3e7dde4.

📒 Files selected for processing (3)
  • packages/db/prisma/migrations/20260611000000_warehouse_run_composite_fk/migration.sql
  • packages/db/prisma/schema.prisma
  • packages/db/src/types.ts
🚧 Files skipped from review as they are similar to previous changes (2)
  • packages/db/src/types.ts
  • packages/db/prisma/schema.prisma

Before adding the (syncId, projectId) composite FK, normalize any runs
whose projectId doesn't match their parent sync's projectId. Without this
the ADD CONSTRAINT fails if mismatched rows exist. Tables are dev-only now
so this is a no-op, but makes the migration safe to apply to any environment.
Validation fixes (packages/validation/src/index.ts):
- Gap 1: reject whitespace-only connection names (Zod .refine + btrim DB check)
- Gap 2: require createdAt cursor for profiles in append mode (superRefine)
- Gap 4: block SQL injection tokens in partitionFilter (-- /* */ ;)
- Gap B: export IBigQueryWarehouseConfig named type

Schema + migrations:
- migration 20260611000001: tighten name_nonempty_check to char_length(btrim(name)) > 0
- migration 20260611000002: add lastTestError String? to warehouse_connections
  so testConnection and connect can surface a human-readable failure reason
  (permission denied, project not found, key revoked) alongside the boolean status

Regenerated Prisma client to include lastTestError field.
@Shrotriya-lalit

Copy link
Copy Markdown
Author

@coderabbitai resume

@coderabbitai

coderabbitai Bot commented Jun 15, 2026

Copy link
Copy Markdown
Contributor
✅ Action performed

Reviews resumed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant