feat: add configurable BullMQ worker stall options to prevent duplicate job retries#6495

Open
Nek-11 wants to merge 7 commits into
FlowiseAI:mainfrom
Nek-11:fix/bullmq-worker-stall-options
Nek-11 (Contributor) commented Jun 8, 2026

Problem

BullMQ's default stall detection uses a 30-second interval with 1 automatic retry (maxStalledCount: 1). Long-running jobs — such as document store upserts that involve web scraping and LLM summarisation — frequently exceed this threshold and get silently retried.

This causes the entire job to re-run, reinserting all vector embeddings into the database. Even with a record manager in full cleanup mode, the mid-stall inconsistency means duplicates slip through, resulting in a bloated vector store.

Solution

Expose BullMQ's stalledInterval and maxStalledCount Worker options as environment variables so operators can tune them for their workload:

| Variable | Default | Description |
| --- | --- | --- |
| WORKER_STALLED_INTERVAL | 300000 (5 min) | How often BullMQ checks for stalled jobs (ms) |
| WORKER_MAX_STALLED_COUNT | 0 | How many times a stalled job is retried before failing |

The new defaults are intentionally conservative:

  • A 5-minute stall interval gives long-running jobs breathing room before they are considered stalled
  • 0 retries means a stalled job fails cleanly rather than being silently re-executed, preventing duplicate side effects

Operators who prefer the old retry behaviour can set WORKER_MAX_STALLED_COUNT=1 to restore it.
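
To illustrate the difference between the two settings, here is a deliberately simplified model of the retry-or-fail decision. It is not BullMQ's actual implementation; `resolveStall` and `StallOutcome` are hypothetical names, and the model assumes a job is failed once its stall count exceeds `maxStalledCount`.

```typescript
// Simplified model only; BullMQ makes this decision inside its own stall checker.
type StallOutcome = "retry" | "fail";

// stallCount is the number of times the job has been detected as stalled,
// including the stall currently being handled.
function resolveStall(stallCount: number, maxStalledCount: number): StallOutcome {
  return stallCount > maxStalledCount ? "fail" : "retry";
}

// New default (0): the first stall fails the job, so nothing is re-executed.
const withNewDefault = resolveStall(1, 0);

// Old behaviour (1): the first stall re-queues the job, the second fails it.
const oldFirstStall = resolveStall(1, 1);
const oldSecondStall = resolveStall(2, 1);
```

Under this model, `WORKER_MAX_STALLED_COUNT=1` allows exactly one automatic re-run, which is the re-run that can duplicate the upsert.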

Changes

  • packages/server/src/queue/BaseQueue.ts: added WORKER_STALLED_INTERVAL and WORKER_MAX_STALLED_COUNT constants read from env, passed to the BullMQ Worker constructor
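
A minimal sketch of how this wiring could look. The PR uses module-level constants in BaseQueue.ts; the `buildStallOptions` helper, the `StallOptions` interface, and the `Env` type below are hypothetical and exist only to keep the example self-contained. `stalledInterval` and `maxStalledCount` are the BullMQ Worker option names referenced in this PR.

```typescript
// Hypothetical helper for illustration; not the code in BaseQueue.ts.
type Env = Record<string, string | undefined>;

interface StallOptions {
  stalledInterval: number; // milliseconds between stalled-job checks
  maxStalledCount: number; // stalled retries allowed before the job fails
}

function buildStallOptions(env: Env): StallOptions {
  return {
    // Fall back to the PR's defaults when the variable is unset or empty.
    stalledInterval: env.WORKER_STALLED_INTERVAL
      ? parseInt(env.WORKER_STALLED_INTERVAL, 10)
      : 300000,
    maxStalledCount: env.WORKER_MAX_STALLED_COUNT
      ? parseInt(env.WORKER_MAX_STALLED_COUNT, 10)
      : 0,
  };
}

// Usage sketch (assumes bullmq is installed and that queueName, processor
// and connection are defined elsewhere):
//   new Worker(queueName, processor, { connection, ...buildStallOptions(process.env) })
```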

Nek-11 added 5 commits May 15, 2025 15:22
BullMQ's default stall detection (30s interval, 1 retry) causes long-running
jobs like document store upserts to be silently retried, resulting in duplicate
vector embeddings in the database.

Adds two new environment variables to control this behaviour:
- WORKER_STALLED_INTERVAL (ms): how often BullMQ checks for stalled jobs.
  Defaults to 300000 (5 min) instead of BullMQ's 30s default.
- WORKER_MAX_STALLED_COUNT: how many times a stalled job is retried before
  failing. Defaults to 0 (fail immediately, no retry) instead of BullMQ's
  default of 1.

Setting WORKER_MAX_STALLED_COUNT=0 prevents duplicate processing on retry
while still allowing the job to be re-triggered manually if needed.

gemini-code-assist (bot, Contributor) left a comment

Code Review

This pull request introduces two new environment variables, WORKER_STALLED_INTERVAL and WORKER_MAX_STALLED_COUNT, to configure the stalled job interval and maximum stalled count for the queue worker in BaseQueue.ts. The reviewer suggested explicitly specifying the radix in parseInt and implementing fail-fast validation to throw an error if the environment variables contain invalid non-numeric values, rather than silently falling back to defaults.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment thread packages/server/src/queue/BaseQueue.ts Outdated
Comment on lines +9 to +10
const WORKER_STALLED_INTERVAL = process.env.WORKER_STALLED_INTERVAL ? parseInt(process.env.WORKER_STALLED_INTERVAL) : 300000
const WORKER_MAX_STALLED_COUNT = process.env.WORKER_MAX_STALLED_COUNT ? parseInt(process.env.WORKER_MAX_STALLED_COUNT) : 0

medium

When parsing environment variables with parseInt, it is safer to specify the radix (base 10) explicitly to avoid any parsing ambiguity. Additionally, if the environment variable is set to an invalid non-numeric value, parseInt will return NaN. To promote fail-fast behavior, we should throw an error for invalid input types from external sources (like environment variables) rather than silently falling back to a default value.

const WORKER_STALLED_INTERVAL = (() => {
  const val = process.env.WORKER_STALLED_INTERVAL;
  if (!val) return 300000;
  const parsed = parseInt(val, 10);
  if (isNaN(parsed)) throw new Error("Invalid WORKER_STALLED_INTERVAL");
  return parsed;
})();
const WORKER_MAX_STALLED_COUNT = (() => {
  const val = process.env.WORKER_MAX_STALLED_COUNT;
  if (!val) return 0;
  const parsed = parseInt(val, 10);
  if (isNaN(parsed)) throw new Error("Invalid WORKER_MAX_STALLED_COUNT");
  return parsed;
})();
References
  1. When handling potentially invalid data from external sources (like an API response), prefer throwing an error for invalid input types rather than silently returning a default or empty value. This promotes fail-fast behavior.
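
The two IIFEs in the suggestion differ only in the variable name and default, so they could share one helper. The sketch below is an assumption-laden variant, not the code that was merged into the PR: `readNonNegativeInt` and `Env` are hypothetical names, and the digits-only check is stricter than the reviewer's `isNaN` test because `parseInt("30s", 10)` returns 30 rather than NaN.

```typescript
// Hypothetical helper; takes the environment as a parameter so it is easy to test.
type Env = Record<string, string | undefined>;

function readNonNegativeInt(env: Env, name: string, fallback: number): number {
  const raw = env[name];
  // Unset or blank: use the default.
  if (raw === undefined || raw.trim() === "") {
    return fallback;
  }
  const trimmed = raw.trim();
  // Reject anything that is not purely digits ("30s", "-1", "1.5", "abc").
  if (!/^\d+$/.test(trimmed)) {
    throw new Error(`Invalid ${name}: expected a non-negative integer, got "${raw}"`);
  }
  return parseInt(trimmed, 10);
}

// Usage sketch, with process.env passed in at the call site:
//   const WORKER_STALLED_INTERVAL = readNonNegativeInt(process.env, "WORKER_STALLED_INTERVAL", 300000)
//   const WORKER_MAX_STALLED_COUNT = readNonNegativeInt(process.env, "WORKER_MAX_STALLED_COUNT", 0)
```

Whether a negative or partially numeric value should be rejected is a design choice; the reviewer's version accepts both.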

Nek-11 (Contributor, Author) commented Jun 9, 2026

/gemini review again please

gemini-code-assist (bot, Contributor) left a comment

Code Review

This pull request introduces configurable worker stalled interval and max stalled count settings parsed from environment variables, along with validation checks. The review feedback suggests using nullish coalescing (??) instead of ternary operators for more idiomatic TypeScript environment variable fallbacks, and wrapping the if statement bodies in curly braces for better maintainability.
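
One caveat with the `??` suggestion, sketched below under the assumption that the fallback is applied to the raw string before parsing (the exact code under review is not shown in this thread). `withTernary`, `withNullish`, and `Env` are hypothetical names. The two forms agree when the variable is unset but differ when it is set to an empty string, because `??` only falls back on `null` or `undefined`.

```typescript
type Env = Record<string, string | undefined>;

// Truthiness check: an empty string also falls back to the default.
function withTernary(env: Env): string {
  return env.WORKER_STALLED_INTERVAL ? env.WORKER_STALLED_INTERVAL : "300000";
}

// Nullish coalescing: only undefined (or null) falls back to the default.
function withNullish(env: Env): string {
  return env.WORKER_STALLED_INTERVAL ?? "300000";
}

const unset: Env = {};
const empty: Env = { WORKER_STALLED_INTERVAL: "" };

const a = parseInt(withTernary(unset), 10); // 300000
const b = parseInt(withNullish(unset), 10); // 300000
const c = parseInt(withTernary(empty), 10); // 300000
const d = parseInt(withNullish(empty), 10); // NaN, because parseInt("", 10) is NaN
```

So with `??`, a line such as `WORKER_STALLED_INTERVAL=` in an env file would reach the NaN validation and throw rather than quietly using the default.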

Comment thread packages/server/src/queue/BaseQueue.ts Outdated
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>