From c551ba2be5805c3b0e69f4b348697699721dc4ce Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Mon, 22 Jun 2026 18:19:29 +0100 Subject: [PATCH] fix(run-engine): decrement totalWeight in fair-queue weighted env shuffle The weighted env shuffle drew its random pivot against the full set's total weight on every iteration but never reduced that total as items were removed. Once items had been picked, the pivot routinely overshot the remaining weight, the selection loop ran off the end, and the last remaining item was chosen, over-picking whichever env sat at the tail of the set. With fair-queue biases enabled (the default), this left the first env slot fair but skewed later positions by env iteration order instead of by the intended concurrency and capacity weighting. Decrement totalWeight before splicing, matching the sibling queue-order and top-env selection paths. --- .../run-queue/fairQueueSelectionStrategy.ts | 7 +- .../tests/fairQueueSelectionStrategy.test.ts | 84 +++++++++++++++++++ 2 files changed, 89 insertions(+), 2 deletions(-) diff --git a/internal-packages/run-engine/src/run-queue/fairQueueSelectionStrategy.ts b/internal-packages/run-engine/src/run-queue/fairQueueSelectionStrategy.ts index 0e2205e413a..87a2a2becf5 100644 --- a/internal-packages/run-engine/src/run-queue/fairQueueSelectionStrategy.ts +++ b/internal-packages/run-engine/src/run-queue/fairQueueSelectionStrategy.ts @@ -209,7 +209,7 @@ export class FairQueueSelectionStrategy implements RunQueueSelectionStrategy { } #weightedShuffle(weightedItems: WeightedEnv[]): string[] { - const totalWeight = weightedItems.reduce((sum, item) => sum + item.weight, 0); + let totalWeight = weightedItems.reduce((sum, item) => sum + item.weight, 0); const result: string[] = []; const items = [...weightedItems]; @@ -224,8 +224,11 @@ export class FairQueueSelectionStrategy implements RunQueueSelectionStrategy { } index = Math.max(0, index - 1); - // Add selected item to result and remove from items + // Add selected item to result and remove from items. Decrement totalWeight + // so the next draw is scaled to the remaining items; otherwise random + // routinely overshoots the shrinking set and the tail item is over-picked. result.push(items[index].envId); + totalWeight -= items[index].weight; items.splice(index, 1); } diff --git a/internal-packages/run-engine/src/run-queue/tests/fairQueueSelectionStrategy.test.ts b/internal-packages/run-engine/src/run-queue/tests/fairQueueSelectionStrategy.test.ts index fa315d3d8a2..bbaa62016fd 100644 --- a/internal-packages/run-engine/src/run-queue/tests/fairQueueSelectionStrategy.test.ts +++ b/internal-packages/run-engine/src/run-queue/tests/fairQueueSelectionStrategy.test.ts @@ -1203,6 +1203,90 @@ describe("FairDequeuingStrategy", () => { expect(queuesByEnv["env-1"]).toBeDefined(); expect(queuesByEnv["env-1"].length).toBe(2); }); + + redisTest( + "weighted env shuffle stays uniform across every position for equal-weight envs", + async ({ redisOptions: redis }) => { + const keyProducer = new RunQueueFullKeyProducer(); + // Biases must be non-zero so env ordering goes through the weighted shuffle + // path rather than the plain shuffle short-circuit. + const strategy = new FairQueueSelectionStrategy({ + redis, + keys: keyProducer, + defaultEnvConcurrencyLimit: 10, + parentQueueLimit: 100, + seed: "weighted-shuffle-seed", + biases: { + concurrencyLimitBias: 0.75, + availableCapacityBias: 0.3, + queueAgeRandomization: 0, + }, + }); + + const now = Date.now(); + + // Four envs, each its own org, one queue each. Identical concurrency limit + // and current usage means identical weights, so a correct weighted shuffle + // should land each env in each position equally often. Insertion order is + // alphabetical by org, which is the order the tail-overshoot bug skews by. + const envIds = ["env-1", "env-2", "env-3", "env-4"]; + for (let i = 0; i < envIds.length; i++) { + const orgId = `org-${i + 1}`; + const projectId = `proj-${i + 1}`; + const envId = envIds[i]; + + await setupQueue({ + redis, + keyProducer, + parentQueue: "parent-queue", + score: now - 1000, + queueId: `queue-${envId}`, + orgId, + projectId, + envId, + }); + + await setupConcurrency({ + redis, + keyProducer, + env: { envId, projectId, orgId, currentConcurrency: 5, limit: 10 }, + }); + } + + const iterations = 2000; + // positionCounts[position][envId] = times envId landed in that position + const positionCounts: Array> = envIds.map(() => + Object.fromEntries(envIds.map((envId) => [envId, 0])) + ); + + for (let i = 0; i < iterations; i++) { + const envResult = await strategy.distributeFairQueuesFromParentQueue( + "parent-queue", + `consumer-${i % 3}` + ); + const result = flattenResults(envResult); + expect(result).toHaveLength(envIds.length); + + result.forEach((queueId, position) => { + const envId = keyProducer.envIdFromQueue(queueId); + positionCounts[position][envId]++; + }); + } + + // For equal-weight envs the share at every position should be ~1/N. The + // tail-overshoot bug leaves position 0 fair but skews later positions hard + // (one env far below, the tail env far above). Assert each share stays + // within 40% of uniform at every position, which the bug violates. + const expectedShare = 100 / envIds.length; + for (let position = 0; position < envIds.length; position++) { + for (const envId of envIds) { + const share = (positionCounts[position][envId] / iterations) * 100; + expect(share).toBeGreaterThan(expectedShare * 0.6); + expect(share).toBeLessThan(expectedShare * 1.4); + } + } + } + ); }); // Helper function to flatten results for counting