diff --git a/src/parallel-route-transforms.ts b/src/parallel-route-transforms.ts index 3dd61ff..94b3d3c 100644 --- a/src/parallel-route-transforms.ts +++ b/src/parallel-route-transforms.ts @@ -1,9 +1,6 @@ import { Worker } from 'node:worker_threads'; import { setBoundedCacheEntry } from './bounded-cache.js'; -import { - getAvailableCpuCount, - getDefaultConcurrency, -} from './concurrency.js'; +import { getAvailableCpuCount, getDefaultConcurrency } from './concurrency.js'; import { executeRouteTransformTask, type RouteTransformResult, @@ -123,7 +120,7 @@ class ParallelRouteTransformExecutor implements RouteTransformExecutor { #nextRouteModuleWorkerIndex = 0; #nextSplitRouteAnalysisWorkerIndex = 0; #splitRouteAnalysisWorkers = new Map(); - #workers: WorkerState[] | undefined; + #workers: Array | undefined; constructor( private readonly workerCount: number, @@ -155,7 +152,9 @@ class ParallelRouteTransformExecutor implements RouteTransformExecutor { return this.#closePromise; } this.#closed = true; - const workers = this.#workers ?? []; + const workers = (this.#workers ?? []).filter( + (state): state is WorkerState => Boolean(state) + ); this.#workers = []; this.#closePromise = Promise.all( workers.map(async state => { @@ -174,7 +173,9 @@ class ParallelRouteTransformExecutor implements RouteTransformExecutor { return; } this.#workersDisabled = true; - const workers = this.#workers ?? []; + const workers = (this.#workers ?? []).filter( + (state): state is WorkerState => Boolean(state) + ); this.#workers = []; for (const state of workers) { for (const pending of state.pending.values()) { @@ -225,33 +226,24 @@ class ParallelRouteTransformExecutor implements RouteTransformExecutor { return state; } - #getWorkers(): WorkerState[] { + #getWorker(index: number): WorkerState | undefined { if (this.#closed || this.#workersDisabled) { - return []; + return undefined; } - if (this.#workers) { - return this.#workers; - } - const workers: WorkerState[] = []; - try { - for (let index = 0; index < this.workerCount; index += 1) { - workers.push(this.#createWorkerState()); - } - } catch (error) { - for (const state of workers) { - void state.worker.terminate(); - } - this.#workers = []; - throw error; + this.#workers ??= new Array(this.workerCount); + const existingWorker = this.#workers[index]; + if (existingWorker) { + return existingWorker; } - this.#workers = workers; - return workers; + + const worker = this.#createWorkerState(); + this.#workers[index] = worker; + return worker; } #runInWorker(task: RouteTransformTask): Promise { - const workers = this.#getWorkers(); - const workerIndex = this.#getWorkerIndex(task, workers.length); - const state = workers[workerIndex]; + const workerIndex = this.#getWorkerIndex(task, this.workerCount); + const state = this.#getWorker(workerIndex); if (!state) { return executeRouteTransformTask(task, this.options); } diff --git a/tests/parallel-route-transforms.test.ts b/tests/parallel-route-transforms.test.ts index fdc4fe5..0b94840 100644 --- a/tests/parallel-route-transforms.test.ts +++ b/tests/parallel-route-transforms.test.ts @@ -3,6 +3,7 @@ import { mapWithConcurrency } from '../src/concurrency'; import { getExportNames } from '../src/export-utils'; import { executeRouteTransformTask, + type RouteTransformResult, type RouteModuleTransformTask, } from '../src/route-transform-tasks'; import { @@ -76,6 +77,18 @@ class FakeRouteTransformWorker { } } +const resolveWorkerMessage = ( + worker: FakeRouteTransformWorker, + result: RouteTransformResult, + messageIndex = worker.messages.length - 1 +): void => { + worker.emit('message', { + id: worker.messages[messageIndex]!.id, + ok: true, + result, + } satisfies WorkerResponse); +}; + describe('parallel route transforms', () => { it.each([ [1, 0], @@ -162,17 +175,47 @@ describe('parallel route transforms', () => { const pending = executor.run(createRouteModuleTask()); expect(createdWorkers).toBe(1); - worker.emit('message', { - id: worker.messages[0]!.id, - ok: true, - result: { code: 'created lazily' }, - } satisfies WorkerResponse); + resolveWorkerMessage(worker, { code: 'created lazily' }); await expect(pending).resolves.toEqual({ code: 'created lazily' }); await executor.close(); expect(worker.terminateCalls).toBe(1); }); + it('creates only worker slots that receive scheduled work', async () => { + const workers: FakeRouteTransformWorker[] = []; + const executor = createRouteTransformExecutorForTesting( + { + parallelTransforms: 4, + splitRouteModules: true, + }, + () => { + const worker = new FakeRouteTransformWorker(); + workers.push(worker); + return worker; + } + ); + + const first = executor.run(createRouteModuleTask()); + const second = executor.run( + createRouteModuleTask({ resourcePath: '/app/routes/other.tsx' }) + ); + + expect(workers).toHaveLength(2); + + for (const worker of workers) { + resolveWorkerMessage(worker, { code: 'done' }); + } + + await expect(Promise.all([first, second])).resolves.toEqual([ + { code: 'done' }, + { code: 'done' }, + ]); + + await executor.close(); + expect(workers.map(worker => worker.terminateCalls)).toEqual([1, 1]); + }); + it('executes route client entry tasks through the shared task executor', async () => { await expect( executeRouteTransformTask({