diff --git a/internal-packages/run-engine/src/run-queue/fairQueueSelectionStrategy.ts b/internal-packages/run-engine/src/run-queue/fairQueueSelectionStrategy.ts index 0e2205e413..87a2a2becf 100644 --- a/internal-packages/run-engine/src/run-queue/fairQueueSelectionStrategy.ts +++ b/internal-packages/run-engine/src/run-queue/fairQueueSelectionStrategy.ts @@ -209,7 +209,7 @@ export class FairQueueSelectionStrategy implements RunQueueSelectionStrategy { } #weightedShuffle(weightedItems: WeightedEnv[]): string[] { - const totalWeight = weightedItems.reduce((sum, item) => sum + item.weight, 0); + let totalWeight = weightedItems.reduce((sum, item) => sum + item.weight, 0); const result: string[] = []; const items = [...weightedItems]; @@ -224,8 +224,11 @@ export class FairQueueSelectionStrategy implements RunQueueSelectionStrategy { } index = Math.max(0, index - 1); - // Add selected item to result and remove from items + // Add selected item to result and remove from items. Decrement totalWeight + // so the next draw is scaled to the remaining items; otherwise random + // routinely overshoots the shrinking set and the tail item is over-picked. result.push(items[index].envId); + totalWeight -= items[index].weight; items.splice(index, 1); } diff --git a/internal-packages/run-engine/src/run-queue/tests/fairQueueSelectionStrategy.test.ts b/internal-packages/run-engine/src/run-queue/tests/fairQueueSelectionStrategy.test.ts index fa315d3d8a..bbaa62016f 100644 --- a/internal-packages/run-engine/src/run-queue/tests/fairQueueSelectionStrategy.test.ts +++ b/internal-packages/run-engine/src/run-queue/tests/fairQueueSelectionStrategy.test.ts @@ -1203,6 +1203,90 @@ describe("FairDequeuingStrategy", () => { expect(queuesByEnv["env-1"]).toBeDefined(); expect(queuesByEnv["env-1"].length).toBe(2); }); + + redisTest( + "weighted env shuffle stays uniform across every position for equal-weight envs", + async ({ redisOptions: redis }) => { + const keyProducer = new RunQueueFullKeyProducer(); + // Biases must be non-zero so env ordering goes through the weighted shuffle + // path rather than the plain shuffle short-circuit. + const strategy = new FairQueueSelectionStrategy({ + redis, + keys: keyProducer, + defaultEnvConcurrencyLimit: 10, + parentQueueLimit: 100, + seed: "weighted-shuffle-seed", + biases: { + concurrencyLimitBias: 0.75, + availableCapacityBias: 0.3, + queueAgeRandomization: 0, + }, + }); + + const now = Date.now(); + + // Four envs, each its own org, one queue each. Identical concurrency limit + // and current usage means identical weights, so a correct weighted shuffle + // should land each env in each position equally often. Insertion order is + // alphabetical by org, which is the order the tail-overshoot bug skews by. + const envIds = ["env-1", "env-2", "env-3", "env-4"]; + for (let i = 0; i < envIds.length; i++) { + const orgId = `org-${i + 1}`; + const projectId = `proj-${i + 1}`; + const envId = envIds[i]; + + await setupQueue({ + redis, + keyProducer, + parentQueue: "parent-queue", + score: now - 1000, + queueId: `queue-${envId}`, + orgId, + projectId, + envId, + }); + + await setupConcurrency({ + redis, + keyProducer, + env: { envId, projectId, orgId, currentConcurrency: 5, limit: 10 }, + }); + } + + const iterations = 2000; + // positionCounts[position][envId] = times envId landed in that position + const positionCounts: Array> = envIds.map(() => + Object.fromEntries(envIds.map((envId) => [envId, 0])) + ); + + for (let i = 0; i < iterations; i++) { + const envResult = await strategy.distributeFairQueuesFromParentQueue( + "parent-queue", + `consumer-${i % 3}` + ); + const result = flattenResults(envResult); + expect(result).toHaveLength(envIds.length); + + result.forEach((queueId, position) => { + const envId = keyProducer.envIdFromQueue(queueId); + positionCounts[position][envId]++; + }); + } + + // For equal-weight envs the share at every position should be ~1/N. The + // tail-overshoot bug leaves position 0 fair but skews later positions hard + // (one env far below, the tail env far above). Assert each share stays + // within 40% of uniform at every position, which the bug violates. + const expectedShare = 100 / envIds.length; + for (let position = 0; position < envIds.length; position++) { + for (const envId of envIds) { + const share = (positionCounts[position][envId] / iterations) * 100; + expect(share).toBeGreaterThan(expectedShare * 0.6); + expect(share).toBeLessThan(expectedShare * 1.4); + } + } + } + ); }); // Helper function to flatten results for counting