Skip to content

Commit 8890d7a

Browse files
authored
feat(run-engine,webapp): always report worker queue length metrics (#4029)
## Summary

The `runqueue.workerQueue.length` gauge only reported a worker queue's depth while runs were being dequeued from it. When dequeues stop, the metric goes stale or missing, so a queue that has backed up because nothing is draining it can't be alerted on.

This adds a small observer that refreshes the observed set of worker queues from the `WorkerInstanceGroup` records on an interval, so every active worker queue (and its scheduled split variant) keeps reporting its length regardless of dequeue activity.

The observer is off by default and enabled per service via `RUN_ENGINE_WORKER_QUEUE_OBSERVER_ENABLED`, reads from the read replica, and skips a configurable set of cloud providers (`RUN_ENGINE_WORKER_QUEUE_OBSERVER_EXCLUDED_CLOUD_PROVIDERS`, default `digitalocean`). When enabled it is the source of truth for the observed set, so the per-dequeue registration is skipped on that instance, and it groups by worker queue so the per-instance duplicates collapse to the true depth.

Also removes the unused `GET`/`POST /api/v1/workers` endpoints. Their only consumer was a CLI command group that is no longer registered.

## Verification

Verified end to end against a local stack: the gauge reports each worker queue's length with no dequeues happening, excludes the configured providers, includes hidden groups, and the removed endpoints now respond as if they never existed. Added a run-engine test (`workerQueueObservation.test.ts`).
1 parent c06005b commit 8890d7a

9 files changed

Lines changed: 331 additions & 76 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: breaking
4+
---
5+
6+
Remove the unused worker group management API endpoints (GET and POST /api/v1/workers).
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: improvement
4+
---
5+
6+
Optionally report worker queue length metrics continuously (enabled per-service by setting the `RUN_ENGINE_WORKER_QUEUE_OBSERVER_ENABLED` env var to `1`) so a queue's depth keeps being emitted even when nothing is dequeuing from it.

apps/webapp/app/env.server.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -828,6 +828,12 @@ const EnvironmentSchema = z
828828
RUN_ENGINE_PROCESS_WORKER_QUEUE_DEBOUNCE_MS: z.coerce.number().int().default(200),
829829
RUN_ENGINE_DEQUEUE_BLOCKING_TIMEOUT_SECONDS: z.coerce.number().int().default(10),
830830
RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS: z.coerce.number().int().default(1000),
831+
// Off by default. Enable on a single service (e.g. the engine worker) so only one
832+
// instance reports worker queue length, rather than every replica.
833+
RUN_ENGINE_WORKER_QUEUE_OBSERVER_ENABLED: z.string().default("0"),
834+
RUN_ENGINE_WORKER_QUEUE_OBSERVER_INTERVAL_MS: z.coerce.number().int().default(30_000),
835+
// Comma-separated cloud providers to exclude from worker queue length observation.
836+
RUN_ENGINE_WORKER_QUEUE_OBSERVER_EXCLUDED_CLOUD_PROVIDERS: z.string().default("digitalocean"),
831837
RUN_ENGINE_MASTER_QUEUE_COOLOFF_PERIOD_MS: z.coerce.number().int().default(10_000),
832838
RUN_ENGINE_MASTER_QUEUE_COOLOFF_COUNT_THRESHOLD: z.coerce.number().int().default(10),
833839
RUN_ENGINE_MASTER_QUEUE_CONSUMER_DEQUEUE_COUNT: z.coerce.number().int().default(10),

apps/webapp/app/routes/api.v1.workers.ts

Lines changed: 0 additions & 73 deletions
This file was deleted.

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { RunEngine } from "@internal/run-engine";
22
import { $replica, prisma } from "~/db.server";
33
import { env } from "~/env.server";
44
import { createBatchGlobalRateLimiter } from "~/runEngine/concerns/batchGlobalRateLimiter.server";
5+
import { SCHEDULED_WORKER_QUEUE_SUFFIX } from "~/runEngine/concerns/workerQueueSplit.server";
56
import { logger } from "~/services/logger.server";
67
import { defaultMachine, getCurrentPlan } from "~/services/platform.v3.server";
78
import { singleton } from "~/utils/singleton";
@@ -121,6 +122,18 @@ function createRunEngine() {
121122
},
122123
tracer,
123124
meter,
125+
workerQueueObserver: {
126+
enabled: env.RUN_ENGINE_WORKER_QUEUE_OBSERVER_ENABLED === "1",
127+
intervalMs: env.RUN_ENGINE_WORKER_QUEUE_OBSERVER_INTERVAL_MS,
128+
// Also observe the scheduled split variant of each worker queue. The suffix
129+
// naming convention lives in the webapp, so it is passed in here.
130+
additionalQueueSuffixes: [SCHEDULED_WORKER_QUEUE_SUFFIX],
131+
excludedCloudProviders: env.RUN_ENGINE_WORKER_QUEUE_OBSERVER_EXCLUDED_CLOUD_PROVIDERS.split(
132+
","
133+
)
134+
.map((provider) => provider.trim())
135+
.filter(Boolean),
136+
},
124137
defaultMaxTtl: env.RUN_ENGINE_DEFAULT_MAX_TTL,
125138
heartbeatTimeoutsMs: {
126139
PENDING_EXECUTING: env.RUN_ENGINE_TIMEOUT_PENDING_EXECUTING,

internal-packages/run-engine/src/engine/index.ts

Lines changed: 100 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import {
3333
import { Worker } from "@trigger.dev/redis-worker";
3434
import { assertNever } from "assert-never";
3535
import { EventEmitter } from "node:events";
36-
import { setTimeout } from "node:timers/promises";
36+
import { setInterval, setTimeout } from "node:timers/promises";
3737
import { BatchQueue } from "../batch-queue/index.js";
3838
import type {
3939
BatchItem,
@@ -100,6 +100,7 @@ export class RunEngine {
100100
private heartbeatTimeouts: HeartbeatTimeouts;
101101
private repairSnapshotTimeoutMs: number;
102102
private batchQueue: BatchQueue;
103+
private workerQueueObserverAbortController?: AbortController;
103104

104105
prisma: PrismaClient;
105106
readOnlyPrisma: PrismaReplicaClient;
@@ -474,6 +475,96 @@ export class RunEngine {
474475
machines: this.options.machines,
475476
billingCache: this.billingCache,
476477
});
478+
479+
this.#startWorkerQueueObserver();
480+
}
481+
482+
/**
483+
* Refreshes the set of worker queues observed by the `runqueue.workerQueue.length`
484+
* gauge from the WorkerInstanceGroup records, so the gauge reports each worker queue's
485+
* length even when nothing is dequeuing from it. Includes hidden groups; excludes
486+
* groups whose cloud provider is configured to be excluded (groups with no cloud
487+
* provider are always included).
488+
*
489+
* Only MANAGED groups are observed. UNMANAGED groups are created per project
490+
* (masterQueue `<projectId>-<name>`), so observing them would grow the set, and the
491+
* per-tick Redis fanout, with the number of self-hosted-worker projects rather than
492+
* with the managed regions this gauge is meant to track.
493+
*/
494+
async refreshWorkerQueueObservation() {
495+
const suffixes = this.options.workerQueueObserver?.additionalQueueSuffixes ?? [];
496+
const excludedCloudProviders = new Set(
497+
(this.options.workerQueueObserver?.excludedCloudProviders ?? []).map((p) => p.toLowerCase())
498+
);
499+
500+
// Read from the replica: this is a periodic metrics-only read and worker groups change
501+
// rarely, so a little replication lag is fine and keeps it off the primary.
502+
const workerGroups = await this.readOnlyPrisma.workerInstanceGroup.findMany({
503+
where: { type: "MANAGED" },
504+
select: { masterQueue: true, cloudProvider: true },
505+
});
506+
507+
const workerQueues: string[] = [];
508+
509+
for (const { masterQueue, cloudProvider } of workerGroups) {
510+
if (cloudProvider && excludedCloudProviders.has(cloudProvider.toLowerCase())) {
511+
continue;
512+
}
513+
514+
workerQueues.push(masterQueue);
515+
516+
for (const suffix of suffixes) {
517+
workerQueues.push(`${masterQueue}${suffix}`);
518+
}
519+
}
520+
521+
this.runQueue.setObservableWorkerQueues(workerQueues);
522+
}
523+
524+
#startWorkerQueueObserver() {
525+
if (!this.options.workerQueueObserver?.enabled) {
526+
return;
527+
}
528+
529+
const intervalMs = this.options.workerQueueObserver.intervalMs ?? 30_000;
530+
this.workerQueueObserverAbortController = new AbortController();
531+
532+
this.#runWorkerQueueObserver(
533+
intervalMs,
534+
this.workerQueueObserverAbortController.signal
535+
).catch((error) => {
536+
this.logger.error("Worker queue observer loop crashed", {
537+
error: error instanceof Error ? error.message : String(error),
538+
});
539+
});
540+
}
541+
542+
async #runWorkerQueueObserver(intervalMs: number, signal: AbortSignal) {
543+
const refresh = async () => {
544+
try {
545+
await this.refreshWorkerQueueObservation();
546+
} catch (error) {
547+
this.logger.error("Failed to refresh worker queue observation", {
548+
error: error instanceof Error ? error.message : String(error),
549+
});
550+
}
551+
};
552+
553+
// Refresh once immediately so a freshly started instance reports queue lengths
554+
// without waiting for the first interval, then keep it fresh on an interval.
555+
await refresh();
556+
557+
try {
558+
for await (const _ of setInterval(intervalMs, null, { signal })) {
559+
await refresh();
560+
}
561+
} catch (error) {
562+
if (error instanceof Error && error.name !== "AbortError") {
563+
throw error;
564+
}
565+
566+
this.logger.debug("Worker queue observer stopped");
567+
}
477568
}
478569

479570
//MARK: - Run functions
@@ -1322,8 +1413,11 @@ export class RunEngine {
13221413
blockingPop?: boolean;
13231414
blockingPopTimeoutSeconds?: number;
13241415
}): Promise<DequeuedMessage[]> {
1325-
if (!skipObserving) {
1326-
// We only do this with "prod" worker queues because we don't want to observe dev (e.g. environment) worker queues
1416+
// We only do this with "prod" worker queues because we don't want to observe dev (e.g.
1417+
// environment) worker queues. When the worker queue observer is enabled it is the source
1418+
// of truth for the observed set (and applies the cloud-provider exclusions), so the
1419+
// per-dequeue registration is skipped.
1420+
if (!skipObserving && !this.options.workerQueueObserver?.enabled) {
13271421
this.runQueue.registerObservableWorkerQueue(workerQueue);
13281422
}
13291423

@@ -2061,6 +2155,9 @@ export class RunEngine {
20612155

20622156
async quit() {
20632157
try {
2158+
// stop the worker queue observer loop
2159+
this.workerQueueObserverAbortController?.abort();
2160+
20642161
//stop the run queue
20652162
await this.runQueue.quit();
20662163
await this.worker.stop();

0 commit comments

Comments
 (0)