Skip to content

Commit 707c3cc

Browse files
feat(trigger): add trigger-eu-region flag to switch runs to eu-central-1 (#5173)
* feat(trigger): add trigger-eu-region flag to switch runs to eu-central-1 Global on/off feature flag routing every Trigger.dev run from the default us-east-1 to eu-central-1 via the per-trigger region option, resolved at each dispatch site through resolveTriggerRegion. * test(trigger): mock resolveTriggerRegion in delete-async route test The route now pulls in feature-flags (which imports isAppConfigEnabled from env-flags); the test's partial env-flags mock made that access throw. Stub the region module and assert the region option on the dispatch.
1 parent 844733a commit 707c3cc

20 files changed

Lines changed: 127 additions & 21 deletions

File tree

apps/sim/app/api/table/[tableId]/delete-async/route.test.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ vi.mock('@/lib/core/config/env-flags', () => ({
3939
},
4040
}))
4141
vi.mock('@/background/table-delete', () => ({ tableDeleteTask: { id: 'table-delete' } }))
42+
vi.mock('@/lib/core/async-jobs/region', () => ({
43+
resolveTriggerRegion: vi.fn().mockResolvedValue('us-east-1'),
44+
}))
4245
vi.mock('@trigger.dev/sdk', () => ({
4346
tasks: { trigger: mockTasksTrigger },
4447
task: (config: unknown) => config,
@@ -196,7 +199,7 @@ describe('POST /api/table/[tableId]/delete-async', () => {
196199
excludeRowIds: ['row_keep'],
197200
cutoff: expect.any(String),
198201
}),
199-
{ tags: ['tableId:tbl_1', 'jobId:job-id-xyz'] }
202+
{ tags: ['tableId:tbl_1', 'jobId:job-id-xyz'], region: 'us-east-1' }
200203
)
201204
})
202205

apps/sim/app/api/table/[tableId]/delete-async/route.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,14 +86,15 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
8686
// Trigger.dev runs the delete outside the web container (survives deploys) and retries —
8787
// safe: the keyset + cutoff walk just deletes whatever remains.
8888
try {
89-
const [{ tableDeleteTask }, { tasks }] = await Promise.all([
89+
const [{ tableDeleteTask }, { tasks }, { resolveTriggerRegion }] = await Promise.all([
9090
import('@/background/table-delete'),
9191
import('@trigger.dev/sdk'),
92+
import('@/lib/core/async-jobs/region'),
9293
])
9394
await tasks.trigger<typeof tableDeleteTask>(
9495
'table-delete',
9596
{ jobId, tableId, workspaceId, filter, excludeRowIds, cutoff: cutoff.toISOString() },
96-
{ tags: [`tableId:${tableId}`, `jobId:${jobId}`] }
97+
{ tags: [`tableId:${tableId}`, `jobId:${jobId}`], region: await resolveTriggerRegion() }
9798
)
9899
} catch (error) {
99100
// A failed dispatch must not leave a ghost `running` job holding the

apps/sim/app/api/table/[tableId]/export-async/route.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,14 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
6161
const payload: TableExportPayload = { jobId, tableId, workspaceId, format }
6262
if (isTriggerDevEnabled) {
6363
try {
64-
const [{ tableExportTask }, { tasks }] = await Promise.all([
64+
const [{ tableExportTask }, { tasks }, { resolveTriggerRegion }] = await Promise.all([
6565
import('@/background/table-export'),
6666
import('@trigger.dev/sdk'),
67+
import('@/lib/core/async-jobs/region'),
6768
])
6869
await tasks.trigger<typeof tableExportTask>('table-export', payload, {
6970
tags: [`tableId:${tableId}`, `jobId:${jobId}`],
71+
region: await resolveTriggerRegion(),
7072
})
7173
} catch (error) {
7274
// A failed dispatch must not leave a ghost `running` job holding the

apps/sim/app/api/table/[tableId]/import-async/route.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,12 +83,14 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
8383
if (isTriggerDevEnabled) {
8484
// Trigger.dev runs the import outside the web container, so it survives app deploys.
8585
try {
86-
const [{ tableImportTask }, { tasks }] = await Promise.all([
86+
const [{ tableImportTask }, { tasks }, { resolveTriggerRegion }] = await Promise.all([
8787
import('@/background/table-import'),
8888
import('@trigger.dev/sdk'),
89+
import('@/lib/core/async-jobs/region'),
8990
])
9091
await tasks.trigger<typeof tableImportTask>('table-import', importPayload, {
9192
tags: [`tableId:${tableId}`, `jobId:${importId}`],
93+
region: await resolveTriggerRegion(),
9294
})
9395
} catch (error) {
9496
// A failed dispatch must not leave a ghost `running` job holding the

apps/sim/app/api/table/import-async/route.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,12 +115,14 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
115115
if (isTriggerDevEnabled) {
116116
// Trigger.dev runs the import outside the web container, so it survives app deploys.
117117
try {
118-
const [{ tableImportTask }, { tasks }] = await Promise.all([
118+
const [{ tableImportTask }, { tasks }, { resolveTriggerRegion }] = await Promise.all([
119119
import('@/background/table-import'),
120120
import('@trigger.dev/sdk'),
121+
import('@/lib/core/async-jobs/region'),
121122
])
122123
await tasks.trigger<typeof tableImportTask>('table-import', importPayload, {
123124
tags: [`tableId:${table.id}`, `jobId:${importId}`],
125+
region: await resolveTriggerRegion(),
124126
})
125127
} catch (error) {
126128
// A failed dispatch must not leave a ghost `running` job holding the

apps/sim/app/api/webhooks/agentmail/route.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import {
1919
agentMailMessageSchema,
2020
webhookSvixHeadersSchema,
2121
} from '@/lib/api/contracts/webhooks'
22+
import { resolveTriggerRegion } from '@/lib/core/async-jobs/region'
2223
import { isTriggerDevEnabled } from '@/lib/core/config/env-flags'
2324
import {
2425
assertContentLengthWithinLimit,
@@ -234,6 +235,7 @@ export const POST = withRouteHandler(async (req: Request) => {
234235
{ taskId },
235236
{
236237
tags: [`workspaceId:${result.id}`, `taskId:${taskId}`],
238+
region: await resolveTriggerRegion(),
237239
}
238240
)
239241
await db

apps/sim/lib/a2a/push-notifications.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,13 +111,15 @@ export async function notifyTaskStateChange(taskId: string, state: TaskState): P
111111

112112
if (isTriggerDevEnabled) {
113113
try {
114-
const { a2aPushNotificationTask } = await import(
115-
'@/background/a2a-push-notification-delivery'
116-
)
114+
const [{ a2aPushNotificationTask }, { resolveTriggerRegion }] = await Promise.all([
115+
import('@/background/a2a-push-notification-delivery'),
116+
import('@/lib/core/async-jobs/region'),
117+
])
117118
await a2aPushNotificationTask.trigger(
118119
{ taskId, state },
119120
{
120121
tags: [`taskId:${taskId}`],
122+
region: await resolveTriggerRegion(),
121123
}
122124
)
123125
logger.info('Push notification queued to trigger.dev', { taskId, state })

apps/sim/lib/billing/cleanup-dispatcher.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { getPlanType, type PlanCategory } from '@/lib/billing/plan-helpers'
1010
import { chunkArray } from '@/lib/cleanup/batch-delete'
1111
import { getJobQueue } from '@/lib/core/async-jobs'
1212
import { shouldExecuteInline } from '@/lib/core/async-jobs/config'
13+
import { resolveTriggerRegion } from '@/lib/core/async-jobs/region'
1314
import type { EnqueueOptions } from '@/lib/core/async-jobs/types'
1415
import { isTriggerAvailable } from '@/lib/knowledge/documents/service'
1516
import { isOrganizationWorkspace, WORKSPACE_MODE } from '@/lib/workspaces/policy'
@@ -314,13 +315,15 @@ export async function dispatchCleanupJobs(jobType: CleanupJobType): Promise<{
314315
if (batch.length === 0) return
315316
const currentBatch = batch
316317
batch = []
318+
const region = await resolveTriggerRegion()
317319
const batchResult = await tasks.batchTrigger(
318320
jobType,
319321
currentBatch.map((payload) => ({
320322
payload,
321323
options: {
322324
tags: [`plan:${payload.plan}`, `jobType:${jobType}`],
323325
concurrencyKey: getCleanupConcurrencyKey(jobType),
326+
region,
324327
},
325328
}))
326329
)

apps/sim/lib/copilot/tools/server/table/user-table.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -134,12 +134,14 @@ function shouldImportInBackground(record: { name: string; size: number }): boole
134134
async function dispatchImportJob(payload: TableImportPayload): Promise<void> {
135135
if (isTriggerDevEnabled) {
136136
try {
137-
const [{ tableImportTask }, { tasks }] = await Promise.all([
137+
const [{ tableImportTask }, { tasks }, { resolveTriggerRegion }] = await Promise.all([
138138
import('@/background/table-import'),
139139
import('@trigger.dev/sdk'),
140+
import('@/lib/core/async-jobs/region'),
140141
])
141142
await tasks.trigger<typeof tableImportTask>('table-import', payload, {
142143
tags: [`tableId:${payload.tableId}`, `jobId:${payload.importId}`],
144+
region: await resolveTriggerRegion(),
143145
})
144146
} catch (error) {
145147
await releaseJobClaim(payload.tableId, payload.importId).catch(() => {})
@@ -166,14 +168,15 @@ async function dispatchDeleteJob(params: {
166168
const { jobId, tableId, workspaceId, filter, cutoff, maxRows } = params
167169
if (isTriggerDevEnabled) {
168170
try {
169-
const [{ tableDeleteTask }, { tasks }] = await Promise.all([
171+
const [{ tableDeleteTask }, { tasks }, { resolveTriggerRegion }] = await Promise.all([
170172
import('@/background/table-delete'),
171173
import('@trigger.dev/sdk'),
174+
import('@/lib/core/async-jobs/region'),
172175
])
173176
await tasks.trigger<typeof tableDeleteTask>(
174177
'table-delete',
175178
{ jobId, tableId, workspaceId, filter, cutoff: cutoff.toISOString(), maxRows },
176-
{ tags: [`tableId:${tableId}`, `jobId:${jobId}`] }
179+
{ tags: [`tableId:${tableId}`, `jobId:${jobId}`], region: await resolveTriggerRegion() }
177180
)
178181
} catch (error) {
179182
await releaseJobClaim(tableId, jobId).catch(() => {})
@@ -208,14 +211,15 @@ async function dispatchUpdateJob(params: {
208211
const { jobId, tableId, workspaceId, filter, data, cutoff, maxRows } = params
209212
if (isTriggerDevEnabled) {
210213
try {
211-
const [{ tableUpdateTask }, { tasks }] = await Promise.all([
214+
const [{ tableUpdateTask }, { tasks }, { resolveTriggerRegion }] = await Promise.all([
212215
import('@/background/table-update'),
213216
import('@trigger.dev/sdk'),
217+
import('@/lib/core/async-jobs/region'),
214218
])
215219
await tasks.trigger<typeof tableUpdateTask>(
216220
'table-update',
217221
{ jobId, tableId, workspaceId, filter, data, cutoff: cutoff.toISOString(), maxRows },
218-
{ tags: [`tableId:${tableId}`, `jobId:${jobId}`] }
222+
{ tags: [`tableId:${tableId}`, `jobId:${jobId}`], region: await resolveTriggerRegion() }
219223
)
220224
} catch (error) {
221225
await releaseJobClaim(tableId, jobId).catch(() => {})

apps/sim/lib/core/async-jobs/backends/trigger-dev.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { createLogger } from '@sim/logger'
22
import { taskContext } from '@trigger.dev/core/v3'
33
import { runs, type TriggerOptions, tasks } from '@trigger.dev/sdk'
4+
import { resolveTriggerRegion } from '@/lib/core/async-jobs/region'
45
import {
56
type EnqueueOptions,
67
JOB_STATUS,
@@ -84,6 +85,7 @@ export class TriggerDevJobQueue implements JobQueueBackend {
8485
if (options?.delayMs && options.delayMs > 0) {
8586
triggerOptions.delay = new Date(Date.now() + options.delayMs)
8687
}
88+
triggerOptions.region = await resolveTriggerRegion()
8789
const handle = await tasks.trigger(taskId, enrichedPayload, triggerOptions)
8890

8991
logger.debug('Enqueued job via trigger.dev', { jobId: handle.id, type, taskId, tags })
@@ -125,6 +127,7 @@ export class TriggerDevJobQueue implements JobQueueBackend {
125127
const taskId = JOB_TYPE_TO_TASK_ID[type]
126128
if (!taskId) throw new Error(`Unknown job type: ${type}`)
127129

130+
const region = await resolveTriggerRegion()
128131
const batchItems = items.map(({ payload, options }) => {
129132
const enrichedPayload =
130133
options?.metadata && typeof payload === 'object' && payload !== null
@@ -133,12 +136,12 @@ export class TriggerDevJobQueue implements JobQueueBackend {
133136
const tags = buildTags(options)
134137
const batchItem: {
135138
payload: unknown
136-
options?: { concurrencyKey?: string; tags?: string[] }
139+
options?: { concurrencyKey?: string; tags?: string[]; region?: string }
137140
} = { payload: enrichedPayload }
138-
const batchOpts: { concurrencyKey?: string; tags?: string[] } = {}
141+
const batchOpts: { concurrencyKey?: string; tags?: string[]; region?: string } = { region }
139142
if (options?.concurrencyKey) batchOpts.concurrencyKey = options.concurrencyKey
140143
if (tags.length > 0) batchOpts.tags = tags
141-
if (Object.keys(batchOpts).length > 0) batchItem.options = batchOpts
144+
batchItem.options = batchOpts
142145
return batchItem
143146
})
144147

0 commit comments

Comments
 (0)