Skip to content

Commit b48c4c3

Browse files
feat(trigger): add trigger-eu-region flag to switch runs to eu-central-1
Global on/off feature flag routing every Trigger.dev run from the default us-east-1 to eu-central-1 via the per-trigger region option, resolved at each dispatch site through resolveTriggerRegion.
1 parent 844733a commit b48c4c3

19 files changed

Lines changed: 123 additions & 20 deletions

File tree

apps/sim/app/api/table/[tableId]/delete-async/route.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,14 +86,15 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
8686
// Trigger.dev runs the delete outside the web container (survives deploys) and retries —
8787
// safe: the keyset + cutoff walk just deletes whatever remains.
8888
try {
89-
const [{ tableDeleteTask }, { tasks }] = await Promise.all([
89+
const [{ tableDeleteTask }, { tasks }, { resolveTriggerRegion }] = await Promise.all([
9090
import('@/background/table-delete'),
9191
import('@trigger.dev/sdk'),
92+
import('@/lib/core/async-jobs/region'),
9293
])
9394
await tasks.trigger<typeof tableDeleteTask>(
9495
'table-delete',
9596
{ jobId, tableId, workspaceId, filter, excludeRowIds, cutoff: cutoff.toISOString() },
96-
{ tags: [`tableId:${tableId}`, `jobId:${jobId}`] }
97+
{ tags: [`tableId:${tableId}`, `jobId:${jobId}`], region: await resolveTriggerRegion() }
9798
)
9899
} catch (error) {
99100
// A failed dispatch must not leave a ghost `running` job holding the

apps/sim/app/api/table/[tableId]/export-async/route.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,14 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
6161
const payload: TableExportPayload = { jobId, tableId, workspaceId, format }
6262
if (isTriggerDevEnabled) {
6363
try {
64-
const [{ tableExportTask }, { tasks }] = await Promise.all([
64+
const [{ tableExportTask }, { tasks }, { resolveTriggerRegion }] = await Promise.all([
6565
import('@/background/table-export'),
6666
import('@trigger.dev/sdk'),
67+
import('@/lib/core/async-jobs/region'),
6768
])
6869
await tasks.trigger<typeof tableExportTask>('table-export', payload, {
6970
tags: [`tableId:${tableId}`, `jobId:${jobId}`],
71+
region: await resolveTriggerRegion(),
7072
})
7173
} catch (error) {
7274
// A failed dispatch must not leave a ghost `running` job holding the

apps/sim/app/api/table/[tableId]/import-async/route.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,12 +83,14 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
8383
if (isTriggerDevEnabled) {
8484
// Trigger.dev runs the import outside the web container, so it survives app deploys.
8585
try {
86-
const [{ tableImportTask }, { tasks }] = await Promise.all([
86+
const [{ tableImportTask }, { tasks }, { resolveTriggerRegion }] = await Promise.all([
8787
import('@/background/table-import'),
8888
import('@trigger.dev/sdk'),
89+
import('@/lib/core/async-jobs/region'),
8990
])
9091
await tasks.trigger<typeof tableImportTask>('table-import', importPayload, {
9192
tags: [`tableId:${tableId}`, `jobId:${importId}`],
93+
region: await resolveTriggerRegion(),
9294
})
9395
} catch (error) {
9496
// A failed dispatch must not leave a ghost `running` job holding the

apps/sim/app/api/table/import-async/route.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,12 +115,14 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
115115
if (isTriggerDevEnabled) {
116116
// Trigger.dev runs the import outside the web container, so it survives app deploys.
117117
try {
118-
const [{ tableImportTask }, { tasks }] = await Promise.all([
118+
const [{ tableImportTask }, { tasks }, { resolveTriggerRegion }] = await Promise.all([
119119
import('@/background/table-import'),
120120
import('@trigger.dev/sdk'),
121+
import('@/lib/core/async-jobs/region'),
121122
])
122123
await tasks.trigger<typeof tableImportTask>('table-import', importPayload, {
123124
tags: [`tableId:${table.id}`, `jobId:${importId}`],
125+
region: await resolveTriggerRegion(),
124126
})
125127
} catch (error) {
126128
// A failed dispatch must not leave a ghost `running` job holding the

apps/sim/app/api/webhooks/agentmail/route.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import {
1919
agentMailMessageSchema,
2020
webhookSvixHeadersSchema,
2121
} from '@/lib/api/contracts/webhooks'
22+
import { resolveTriggerRegion } from '@/lib/core/async-jobs/region'
2223
import { isTriggerDevEnabled } from '@/lib/core/config/env-flags'
2324
import {
2425
assertContentLengthWithinLimit,
@@ -234,6 +235,7 @@ export const POST = withRouteHandler(async (req: Request) => {
234235
{ taskId },
235236
{
236237
tags: [`workspaceId:${result.id}`, `taskId:${taskId}`],
238+
region: await resolveTriggerRegion(),
237239
}
238240
)
239241
await db

apps/sim/lib/a2a/push-notifications.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,13 +111,15 @@ export async function notifyTaskStateChange(taskId: string, state: TaskState): P
111111

112112
if (isTriggerDevEnabled) {
113113
try {
114-
const { a2aPushNotificationTask } = await import(
115-
'@/background/a2a-push-notification-delivery'
116-
)
114+
const [{ a2aPushNotificationTask }, { resolveTriggerRegion }] = await Promise.all([
115+
import('@/background/a2a-push-notification-delivery'),
116+
import('@/lib/core/async-jobs/region'),
117+
])
117118
await a2aPushNotificationTask.trigger(
118119
{ taskId, state },
119120
{
120121
tags: [`taskId:${taskId}`],
122+
region: await resolveTriggerRegion(),
121123
}
122124
)
123125
logger.info('Push notification queued to trigger.dev', { taskId, state })

apps/sim/lib/billing/cleanup-dispatcher.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { getPlanType, type PlanCategory } from '@/lib/billing/plan-helpers'
1010
import { chunkArray } from '@/lib/cleanup/batch-delete'
1111
import { getJobQueue } from '@/lib/core/async-jobs'
1212
import { shouldExecuteInline } from '@/lib/core/async-jobs/config'
13+
import { resolveTriggerRegion } from '@/lib/core/async-jobs/region'
1314
import type { EnqueueOptions } from '@/lib/core/async-jobs/types'
1415
import { isTriggerAvailable } from '@/lib/knowledge/documents/service'
1516
import { isOrganizationWorkspace, WORKSPACE_MODE } from '@/lib/workspaces/policy'
@@ -314,13 +315,15 @@ export async function dispatchCleanupJobs(jobType: CleanupJobType): Promise<{
314315
if (batch.length === 0) return
315316
const currentBatch = batch
316317
batch = []
318+
const region = await resolveTriggerRegion()
317319
const batchResult = await tasks.batchTrigger(
318320
jobType,
319321
currentBatch.map((payload) => ({
320322
payload,
321323
options: {
322324
tags: [`plan:${payload.plan}`, `jobType:${jobType}`],
323325
concurrencyKey: getCleanupConcurrencyKey(jobType),
326+
region,
324327
},
325328
}))
326329
)

apps/sim/lib/copilot/tools/server/table/user-table.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -134,12 +134,14 @@ function shouldImportInBackground(record: { name: string; size: number }): boole
134134
async function dispatchImportJob(payload: TableImportPayload): Promise<void> {
135135
if (isTriggerDevEnabled) {
136136
try {
137-
const [{ tableImportTask }, { tasks }] = await Promise.all([
137+
const [{ tableImportTask }, { tasks }, { resolveTriggerRegion }] = await Promise.all([
138138
import('@/background/table-import'),
139139
import('@trigger.dev/sdk'),
140+
import('@/lib/core/async-jobs/region'),
140141
])
141142
await tasks.trigger<typeof tableImportTask>('table-import', payload, {
142143
tags: [`tableId:${payload.tableId}`, `jobId:${payload.importId}`],
144+
region: await resolveTriggerRegion(),
143145
})
144146
} catch (error) {
145147
await releaseJobClaim(payload.tableId, payload.importId).catch(() => {})
@@ -166,14 +168,15 @@ async function dispatchDeleteJob(params: {
166168
const { jobId, tableId, workspaceId, filter, cutoff, maxRows } = params
167169
if (isTriggerDevEnabled) {
168170
try {
169-
const [{ tableDeleteTask }, { tasks }] = await Promise.all([
171+
const [{ tableDeleteTask }, { tasks }, { resolveTriggerRegion }] = await Promise.all([
170172
import('@/background/table-delete'),
171173
import('@trigger.dev/sdk'),
174+
import('@/lib/core/async-jobs/region'),
172175
])
173176
await tasks.trigger<typeof tableDeleteTask>(
174177
'table-delete',
175178
{ jobId, tableId, workspaceId, filter, cutoff: cutoff.toISOString(), maxRows },
176-
{ tags: [`tableId:${tableId}`, `jobId:${jobId}`] }
179+
{ tags: [`tableId:${tableId}`, `jobId:${jobId}`], region: await resolveTriggerRegion() }
177180
)
178181
} catch (error) {
179182
await releaseJobClaim(tableId, jobId).catch(() => {})
@@ -208,14 +211,15 @@ async function dispatchUpdateJob(params: {
208211
const { jobId, tableId, workspaceId, filter, data, cutoff, maxRows } = params
209212
if (isTriggerDevEnabled) {
210213
try {
211-
const [{ tableUpdateTask }, { tasks }] = await Promise.all([
214+
const [{ tableUpdateTask }, { tasks }, { resolveTriggerRegion }] = await Promise.all([
212215
import('@/background/table-update'),
213216
import('@trigger.dev/sdk'),
217+
import('@/lib/core/async-jobs/region'),
214218
])
215219
await tasks.trigger<typeof tableUpdateTask>(
216220
'table-update',
217221
{ jobId, tableId, workspaceId, filter, data, cutoff: cutoff.toISOString(), maxRows },
218-
{ tags: [`tableId:${tableId}`, `jobId:${jobId}`] }
222+
{ tags: [`tableId:${tableId}`, `jobId:${jobId}`], region: await resolveTriggerRegion() }
219223
)
220224
} catch (error) {
221225
await releaseJobClaim(tableId, jobId).catch(() => {})

apps/sim/lib/core/async-jobs/backends/trigger-dev.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { createLogger } from '@sim/logger'
22
import { taskContext } from '@trigger.dev/core/v3'
33
import { runs, type TriggerOptions, tasks } from '@trigger.dev/sdk'
4+
import { resolveTriggerRegion } from '@/lib/core/async-jobs/region'
45
import {
56
type EnqueueOptions,
67
JOB_STATUS,
@@ -84,6 +85,7 @@ export class TriggerDevJobQueue implements JobQueueBackend {
8485
if (options?.delayMs && options.delayMs > 0) {
8586
triggerOptions.delay = new Date(Date.now() + options.delayMs)
8687
}
88+
triggerOptions.region = await resolveTriggerRegion()
8789
const handle = await tasks.trigger(taskId, enrichedPayload, triggerOptions)
8890

8991
logger.debug('Enqueued job via trigger.dev', { jobId: handle.id, type, taskId, tags })
@@ -125,6 +127,7 @@ export class TriggerDevJobQueue implements JobQueueBackend {
125127
const taskId = JOB_TYPE_TO_TASK_ID[type]
126128
if (!taskId) throw new Error(`Unknown job type: ${type}`)
127129

130+
const region = await resolveTriggerRegion()
128131
const batchItems = items.map(({ payload, options }) => {
129132
const enrichedPayload =
130133
options?.metadata && typeof payload === 'object' && payload !== null
@@ -133,12 +136,12 @@ export class TriggerDevJobQueue implements JobQueueBackend {
133136
const tags = buildTags(options)
134137
const batchItem: {
135138
payload: unknown
136-
options?: { concurrencyKey?: string; tags?: string[] }
139+
options?: { concurrencyKey?: string; tags?: string[]; region?: string }
137140
} = { payload: enrichedPayload }
138-
const batchOpts: { concurrencyKey?: string; tags?: string[] } = {}
141+
const batchOpts: { concurrencyKey?: string; tags?: string[]; region?: string } = { region }
139142
if (options?.concurrencyKey) batchOpts.concurrencyKey = options.concurrencyKey
140143
if (tags.length > 0) batchOpts.tags = tags
141-
if (Object.keys(batchOpts).length > 0) batchItem.options = batchOpts
144+
batchItem.options = batchOpts
142145
return batchItem
143146
})
144147

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/**
2+
* @vitest-environment node
3+
*/
4+
import { beforeEach, describe, expect, it, vi } from 'vitest'
5+
6+
const { mockIsFeatureEnabled } = vi.hoisted(() => ({
7+
mockIsFeatureEnabled: vi.fn(),
8+
}))
9+
10+
vi.mock('@/lib/core/config/feature-flags', () => ({
11+
isFeatureEnabled: mockIsFeatureEnabled,
12+
}))
13+
14+
import {
15+
resolveTriggerRegion,
16+
TRIGGER_REGION_EU_CENTRAL,
17+
TRIGGER_REGION_US_EAST,
18+
} from '@/lib/core/async-jobs/region'
19+
20+
describe('resolveTriggerRegion', () => {
21+
beforeEach(() => {
22+
vi.clearAllMocks()
23+
})
24+
25+
it('returns eu-central-1 when the flag is enabled', async () => {
26+
mockIsFeatureEnabled.mockResolvedValue(true)
27+
expect(await resolveTriggerRegion()).toBe(TRIGGER_REGION_EU_CENTRAL)
28+
expect(mockIsFeatureEnabled).toHaveBeenCalledWith('trigger-eu-region')
29+
})
30+
31+
it('returns us-east-1 when the flag is disabled', async () => {
32+
mockIsFeatureEnabled.mockResolvedValue(false)
33+
expect(await resolveTriggerRegion()).toBe(TRIGGER_REGION_US_EAST)
34+
})
35+
36+
it('evaluates globally, passing no gating context', async () => {
37+
mockIsFeatureEnabled.mockResolvedValue(false)
38+
await resolveTriggerRegion()
39+
expect(mockIsFeatureEnabled).toHaveBeenCalledTimes(1)
40+
expect(mockIsFeatureEnabled.mock.calls[0]).toEqual(['trigger-eu-region'])
41+
})
42+
})

0 commit comments

Comments
 (0)