Skip to content

Commit 239ba82

Browse files
authored
perf(controlplane): denormalize organization_id onto workflow_runs (#3127)
Signed-off-by: Miguel Martinez Trivino <miguel@chainloop.dev>
1 parent 856f7f4 commit 239ba82

21 files changed

Lines changed: 1034 additions & 51 deletions

app/controlplane/pkg/biz/workflowrun_integration_test.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,89 @@ func (s *workflowRunIntegrationTestSuite) TestList() {
130130
}
131131
}
132132

133+
func (s *workflowRunIntegrationTestSuite) TestListExcludesSoftDeletedWorkflows() {
134+
ctx := context.Background()
135+
136+
// Create a fresh workflow + run, soft-delete the workflow, and make sure
137+
// the run is excluded from List. Regression guard for the case where
138+
// org-scoped reads use the denormalized organization_id column on
139+
// workflow_runs and could otherwise leak runs from deleted workflows.
140+
wf, err := s.Workflow.Create(ctx, &biz.WorkflowCreateOpts{
141+
Name: "to-be-deleted", OrgID: s.org2.ID, Project: "test-project",
142+
})
143+
s.Require().NoError(err)
144+
145+
_, err = s.WorkflowRun.Create(ctx, &biz.WorkflowRunCreateOpts{
146+
WorkflowID: wf.ID.String(), ContractRevision: s.contractVersion, CASBackendID: s.casBackend.ID,
147+
})
148+
s.Require().NoError(err)
149+
150+
s.Require().NoError(s.Workflow.Delete(ctx, s.org2.ID, wf.ID.String()))
151+
152+
got, _, err := s.WorkflowRun.List(ctx, s.org2.ID, &biz.RunListFilters{}, &pagination.CursorOptions{Limit: 10})
153+
s.Require().NoError(err)
154+
for _, r := range got {
155+
s.NotEqual(wf.ID, r.Workflow.ID, "run from soft-deleted workflow leaked into List")
156+
}
157+
}
158+
159+
func (s *workflowRunIntegrationTestSuite) TestListIsolatedByOrg() {
160+
ctx := context.Background()
161+
162+
// org1 has runOrg1; org2 has runOrg2 + runOrg2Public (see setupWorkflowRunTestData).
163+
// Regression guard for the org-scoping switch from the workflows edge to
164+
// the denormalized organization_id column on workflow_runs.
165+
got, _, err := s.WorkflowRun.List(ctx, s.org.ID, &biz.RunListFilters{}, &pagination.CursorOptions{Limit: 10})
166+
s.Require().NoError(err)
167+
gotIDs := make([]uuid.UUID, 0, len(got))
168+
for _, r := range got {
169+
gotIDs = append(gotIDs, r.ID)
170+
}
171+
s.ElementsMatch([]uuid.UUID{s.runOrg1.ID}, gotIDs, "org1 List leaked runs from another org")
172+
173+
got, _, err = s.WorkflowRun.List(ctx, s.org2.ID, &biz.RunListFilters{}, &pagination.CursorOptions{Limit: 10})
174+
s.Require().NoError(err)
175+
gotIDs = gotIDs[:0]
176+
for _, r := range got {
177+
gotIDs = append(gotIDs, r.ID)
178+
}
179+
s.ElementsMatch([]uuid.UUID{s.runOrg2.ID, s.runOrg2Public.ID}, gotIDs, "org2 List did not return its own runs")
180+
}
181+
182+
func (s *workflowRunIntegrationTestSuite) TestListFilterByProjectIDs() {
183+
ctx := context.Background()
184+
185+
// Create a second workflow in a different project in org2 and a run for it.
186+
// Filtering by the original project's ID should exclude this run.
187+
otherProjectWF, err := s.Workflow.Create(ctx, &biz.WorkflowCreateOpts{
188+
Name: "wf-other-project", OrgID: s.org2.ID, Project: "other-project",
189+
})
190+
s.Require().NoError(err)
191+
192+
otherProjectRun, err := s.WorkflowRun.Create(ctx, &biz.WorkflowRunCreateOpts{
193+
WorkflowID: otherProjectWF.ID.String(), ContractRevision: s.contractVersion, CASBackendID: s.casBackend.ID,
194+
})
195+
s.Require().NoError(err)
196+
197+
// With no project filter, all org2 runs (including the new one) are returned.
198+
allRuns, _, err := s.WorkflowRun.List(ctx, s.org2.ID, &biz.RunListFilters{}, &pagination.CursorOptions{Limit: 10})
199+
s.Require().NoError(err)
200+
allIDs := make([]uuid.UUID, 0, len(allRuns))
201+
for _, r := range allRuns {
202+
allIDs = append(allIDs, r.ID)
203+
}
204+
s.Contains(allIDs, otherProjectRun.ID)
205+
206+
// Filtering by the original project's ID excludes runs from other projects.
207+
filtered, _, err := s.WorkflowRun.List(ctx, s.org2.ID,
208+
&biz.RunListFilters{ProjectIDs: []uuid.UUID{s.workflowOrg2.ProjectID}},
209+
&pagination.CursorOptions{Limit: 10})
210+
s.Require().NoError(err)
211+
for _, r := range filtered {
212+
s.NotEqual(otherProjectRun.ID, r.ID, "run from non-selected project leaked into List")
213+
}
214+
}
215+
133216
func (s *workflowRunIntegrationTestSuite) TestSaveAttestation() {
134217
assert := assert.New(s.T())
135218
ctx := context.Background()

app/controlplane/pkg/data/ent/client.go

Lines changed: 32 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
-- atlas:txmode none
2+
3+
-- Denormalize organization_id onto workflow_runs so org-scoped list/aggregate
4+
-- queries become sargable without joining workflows.
5+
--
6+
-- Why a trigger?
7+
--
8+
-- The control plane deploys as a multi-replica Deployment with rolling
9+
-- updates. When this migration runs (in the initContainer of a new pod),
10+
-- old pods are still serving traffic with code that does NOT set
11+
-- organization_id on INSERT. The moment step 6 below enforces NOT NULL,
12+
-- every INSERT from those old pods would fail with a constraint violation
13+
-- until the rolling update replaces them — a window of seconds to minutes
14+
-- in which workflow run creation is broken org-wide.
15+
--
16+
-- The BEFORE INSERT trigger below bridges that window: whenever a writer
17+
-- doesn't set organization_id, the trigger fills it from the parent
18+
-- workflow via a single PK lookup (~0.1ms). New code paths set the column
19+
-- explicitly so the trigger's IF check short-circuits; the trigger only
20+
-- does real work for inserts coming from the old replicas. Once every
21+
-- replica runs the new code, the trigger is dead weight — a follow-up
22+
-- release will drop both the trigger and its function.
23+
24+
-- 1. Nullable add (catalog-only, instant).
25+
ALTER TABLE "workflow_runs" ADD COLUMN "organization_id" uuid;
26+
27+
-- 2. FK NOT VALID (no scan, brief AccessExclusive lock).
28+
ALTER TABLE "workflow_runs"
29+
ADD CONSTRAINT "workflow_runs_organizations_workflowruns"
30+
FOREIGN KEY ("organization_id") REFERENCES "organizations" ("id")
31+
ON UPDATE NO ACTION ON DELETE CASCADE NOT VALID;
32+
33+
-- 3. Trigger function: fills organization_id from the parent workflow when
34+
-- the caller didn't set it. Removed by a follow-up migration in the next
35+
-- release once all replicas set the column explicitly.
36+
CREATE OR REPLACE FUNCTION fill_workflow_run_organization_id() RETURNS trigger AS $$
37+
BEGIN
38+
IF NEW.organization_id IS NULL THEN
39+
SELECT organization_id INTO NEW.organization_id
40+
FROM "workflows" WHERE id = NEW.workflow_id;
41+
END IF;
42+
RETURN NEW;
43+
END;
44+
$$ LANGUAGE plpgsql;
45+
46+
CREATE TRIGGER workflow_runs_fill_organization_id
47+
BEFORE INSERT ON "workflow_runs"
48+
FOR EACH ROW EXECUTE FUNCTION fill_workflow_run_organization_id();
49+
50+
-- 4. Batched backfill. Concurrent inserts from old replicas are protected by
51+
-- the trigger above, so they can't introduce new NULL rows mid-loop.
52+
-- One COMMIT per batch keeps the longest row-lock window in the millisecond
53+
-- range and avoids one giant WAL entry.
54+
DO $$
55+
DECLARE
56+
rows_done INT;
57+
BEGIN
58+
LOOP
59+
WITH batch AS (
60+
SELECT wr.id, w.organization_id
61+
FROM "workflow_runs" wr
62+
JOIN "workflows" w ON wr.workflow_id = w.id
63+
WHERE wr.organization_id IS NULL
64+
LIMIT 5000
65+
)
66+
UPDATE "workflow_runs" wr
67+
SET organization_id = b.organization_id
68+
FROM batch b
69+
WHERE wr.id = b.id;
70+
GET DIAGNOSTICS rows_done = ROW_COUNT;
71+
COMMIT;
72+
EXIT WHEN rows_done = 0;
73+
END LOOP;
74+
END $$;
75+
76+
-- 5. Validate the FK now that data is consistent. SHARE UPDATE EXCLUSIVE
77+
-- permits concurrent DML.
78+
ALTER TABLE "workflow_runs" VALIDATE CONSTRAINT "workflow_runs_organizations_workflowruns";
79+
80+
-- 6. Enforce NOT NULL. In PG 12+ this is a verify-only scan (no rewrite).
81+
-- Safe because the trigger guarantees no concurrent NULL inserts.
82+
ALTER TABLE "workflow_runs" ALTER COLUMN "organization_id" SET NOT NULL;
83+
84+
-- 7. Create the org-scoped list index without blocking writes.
85+
CREATE INDEX CONCURRENTLY "workflowrun_organization_id_created_at"
86+
ON "workflow_runs" ("organization_id", "created_at" DESC);

app/controlplane/pkg/data/ent/migrate/migrations/atlas.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:Ob1pZMVVtMju/FJnsU/Li8Fg2DjcXUxzYDuaBWk9vp0=
1+
h1:OSH+lqOh2mE49KklM6mUMDQjrL1N2nGHdz9aERNstTM=
22
20230706165452_init-schema.sql h1:VvqbNFEQnCvUVyj2iDYVQQxDM0+sSXqocpt/5H64k8M=
33
20230710111950-cas-backend.sql h1:A8iBuSzZIEbdsv9ipBtscZQuaBp3V5/VMw7eZH6GX+g=
44
20230712094107-cas-backends-workflow-runs.sql h1:a5rzxpVGyd56nLRSsKrmCFc9sebg65RWzLghKHh5xvI=
@@ -134,3 +134,4 @@ h1:Ob1pZMVVtMju/FJnsU/Li8Fg2DjcXUxzYDuaBWk9vp0=
134134
20260504100323.sql h1:FP8y59ZXFUsyskbIfl/1nE7vo4OJcOPuALy3pAJaStQ=
135135
20260511202105.sql h1:Tw9OkiWm7cT4p2pNklSUGM9DzKS38uUuYjXl8BdIwnQ=
136136
20260514150303.sql h1:0bGVXYN5rBP9Hn9x/ou8JgKotKiFbSKWGHX2dBH/wCA=
137+
20260516210119.sql h1:rfBnXQwPnrhVYAp/OIiFPGcS+Tx1x9CAMSDPGs8HIT8=

app/controlplane/pkg/data/ent/migrate/schema.go

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -782,6 +782,7 @@ var (
782782
{Name: "policy_violations_count", Type: field.TypeInt32, Nullable: true},
783783
{Name: "policy_violations_suppressed", Type: field.TypeInt32, Nullable: true},
784784
{Name: "policy_has_gates", Type: field.TypeBool, Nullable: true},
785+
{Name: "organization_id", Type: field.TypeUUID},
785786
{Name: "version_id", Type: field.TypeUUID},
786787
{Name: "workflow_id", Type: field.TypeUUID},
787788
{Name: "workflow_run_contract_version", Type: field.TypeUUID, Nullable: true},
@@ -793,20 +794,26 @@ var (
793794
PrimaryKey: []*schema.Column{WorkflowRunsColumns[0]},
794795
ForeignKeys: []*schema.ForeignKey{
795796
{
796-
Symbol: "workflow_runs_project_versions_runs",
797+
Symbol: "workflow_runs_organizations_workflowruns",
797798
Columns: []*schema.Column{WorkflowRunsColumns[20]},
799+
RefColumns: []*schema.Column{OrganizationsColumns[0]},
800+
OnDelete: schema.Cascade,
801+
},
802+
{
803+
Symbol: "workflow_runs_project_versions_runs",
804+
Columns: []*schema.Column{WorkflowRunsColumns[21]},
798805
RefColumns: []*schema.Column{ProjectVersionsColumns[0]},
799806
OnDelete: schema.NoAction,
800807
},
801808
{
802809
Symbol: "workflow_runs_workflows_workflowruns",
803-
Columns: []*schema.Column{WorkflowRunsColumns[21]},
810+
Columns: []*schema.Column{WorkflowRunsColumns[22]},
804811
RefColumns: []*schema.Column{WorkflowsColumns[0]},
805812
OnDelete: schema.Cascade,
806813
},
807814
{
808815
Symbol: "workflow_runs_workflow_contract_versions_contract_version",
809-
Columns: []*schema.Column{WorkflowRunsColumns[22]},
816+
Columns: []*schema.Column{WorkflowRunsColumns[23]},
810817
RefColumns: []*schema.Column{WorkflowContractVersionsColumns[0]},
811818
OnDelete: schema.Cascade,
812819
},
@@ -825,7 +832,7 @@ var (
825832
{
826833
Name: "workflowrun_workflow_id_created_at",
827834
Unique: false,
828-
Columns: []*schema.Column{WorkflowRunsColumns[21], WorkflowRunsColumns[1]},
835+
Columns: []*schema.Column{WorkflowRunsColumns[22], WorkflowRunsColumns[1]},
829836
Annotation: &entsql.IndexAnnotation{
830837
DescColumns: map[string]bool{
831838
WorkflowRunsColumns[1].Name: true,
@@ -835,7 +842,17 @@ var (
835842
{
836843
Name: "workflowrun_workflow_id_state_created_at",
837844
Unique: false,
838-
Columns: []*schema.Column{WorkflowRunsColumns[21], WorkflowRunsColumns[3], WorkflowRunsColumns[1]},
845+
Columns: []*schema.Column{WorkflowRunsColumns[22], WorkflowRunsColumns[3], WorkflowRunsColumns[1]},
846+
Annotation: &entsql.IndexAnnotation{
847+
DescColumns: map[string]bool{
848+
WorkflowRunsColumns[1].Name: true,
849+
},
850+
},
851+
},
852+
{
853+
Name: "workflowrun_organization_id_created_at",
854+
Unique: false,
855+
Columns: []*schema.Column{WorkflowRunsColumns[20], WorkflowRunsColumns[1]},
839856
Annotation: &entsql.IndexAnnotation{
840857
DescColumns: map[string]bool{
841858
WorkflowRunsColumns[1].Name: true,
@@ -860,12 +877,12 @@ var (
860877
{
861878
Name: "workflowrun_workflow_id",
862879
Unique: false,
863-
Columns: []*schema.Column{WorkflowRunsColumns[21]},
880+
Columns: []*schema.Column{WorkflowRunsColumns[22]},
864881
},
865882
{
866883
Name: "workflowrun_version_id_workflow_id",
867884
Unique: false,
868-
Columns: []*schema.Column{WorkflowRunsColumns[20], WorkflowRunsColumns[21]},
885+
Columns: []*schema.Column{WorkflowRunsColumns[21], WorkflowRunsColumns[22]},
869886
},
870887
{
871888
Name: "workflowrun_policy_status",
@@ -1014,9 +1031,10 @@ func init() {
10141031
WorkflowsTable.ForeignKeys[3].RefTable = WorkflowRunsTable
10151032
WorkflowContractsTable.ForeignKeys[0].RefTable = OrganizationsTable
10161033
WorkflowContractVersionsTable.ForeignKeys[0].RefTable = WorkflowContractsTable
1017-
WorkflowRunsTable.ForeignKeys[0].RefTable = ProjectVersionsTable
1018-
WorkflowRunsTable.ForeignKeys[1].RefTable = WorkflowsTable
1019-
WorkflowRunsTable.ForeignKeys[2].RefTable = WorkflowContractVersionsTable
1034+
WorkflowRunsTable.ForeignKeys[0].RefTable = OrganizationsTable
1035+
WorkflowRunsTable.ForeignKeys[1].RefTable = ProjectVersionsTable
1036+
WorkflowRunsTable.ForeignKeys[2].RefTable = WorkflowsTable
1037+
WorkflowRunsTable.ForeignKeys[3].RefTable = WorkflowContractVersionsTable
10201038
ReferrerReferencesTable.ForeignKeys[0].RefTable = ReferrersTable
10211039
ReferrerReferencesTable.ForeignKeys[1].RefTable = ReferrersTable
10221040
ReferrerWorkflowsTable.ForeignKeys[0].RefTable = ReferrersTable

0 commit comments

Comments
 (0)