Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 20 additions & 28 deletions src/parallel-route-transforms.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
import { Worker } from 'node:worker_threads';
import { setBoundedCacheEntry } from './bounded-cache.js';
import {
getAvailableCpuCount,
getDefaultConcurrency,
} from './concurrency.js';
import { getAvailableCpuCount, getDefaultConcurrency } from './concurrency.js';
import {
executeRouteTransformTask,
type RouteTransformResult,
Expand Down Expand Up @@ -123,7 +120,7 @@ class ParallelRouteTransformExecutor implements RouteTransformExecutor {
#nextRouteModuleWorkerIndex = 0;
#nextSplitRouteAnalysisWorkerIndex = 0;
#splitRouteAnalysisWorkers = new Map<string, number>();
#workers: WorkerState[] | undefined;
#workers: Array<WorkerState | undefined> | undefined;

constructor(
private readonly workerCount: number,
Expand Down Expand Up @@ -155,7 +152,9 @@ class ParallelRouteTransformExecutor implements RouteTransformExecutor {
return this.#closePromise;
}
this.#closed = true;
const workers = this.#workers ?? [];
const workers = (this.#workers ?? []).filter(
(state): state is WorkerState => Boolean(state)
);
this.#workers = [];
this.#closePromise = Promise.all(
workers.map(async state => {
Expand All @@ -174,7 +173,9 @@ class ParallelRouteTransformExecutor implements RouteTransformExecutor {
return;
}
this.#workersDisabled = true;
const workers = this.#workers ?? [];
const workers = (this.#workers ?? []).filter(
(state): state is WorkerState => Boolean(state)
);
this.#workers = [];
for (const state of workers) {
for (const pending of state.pending.values()) {
Expand Down Expand Up @@ -225,33 +226,24 @@ class ParallelRouteTransformExecutor implements RouteTransformExecutor {
return state;
}

#getWorkers(): WorkerState[] {
#getWorker(index: number): WorkerState | undefined {
if (this.#closed || this.#workersDisabled) {
return [];
return undefined;
}
if (this.#workers) {
return this.#workers;
}
const workers: WorkerState[] = [];
try {
for (let index = 0; index < this.workerCount; index += 1) {
workers.push(this.#createWorkerState());
}
} catch (error) {
for (const state of workers) {
void state.worker.terminate();
}
this.#workers = [];
throw error;
this.#workers ??= new Array(this.workerCount);
const existingWorker = this.#workers[index];
if (existingWorker) {
return existingWorker;
}
this.#workers = workers;
return workers;

const worker = this.#createWorkerState();
this.#workers[index] = worker;
return worker;
}

#runInWorker(task: RouteTransformTask): Promise<RouteTransformResult> {
const workers = this.#getWorkers();
const workerIndex = this.#getWorkerIndex(task, workers.length);
const state = workers[workerIndex];
const workerIndex = this.#getWorkerIndex(task, this.workerCount);
const state = this.#getWorker(workerIndex);
if (!state) {
return executeRouteTransformTask(task, this.options);
}
Expand Down
53 changes: 48 additions & 5 deletions tests/parallel-route-transforms.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { mapWithConcurrency } from '../src/concurrency';
import { getExportNames } from '../src/export-utils';
import {
executeRouteTransformTask,
type RouteTransformResult,
type RouteModuleTransformTask,
} from '../src/route-transform-tasks';
import {
Expand Down Expand Up @@ -76,6 +77,18 @@ class FakeRouteTransformWorker {
}
}

const resolveWorkerMessage = (
worker: FakeRouteTransformWorker,
result: RouteTransformResult,
messageIndex = worker.messages.length - 1
): void => {
worker.emit('message', {
id: worker.messages[messageIndex]!.id,
ok: true,
result,
} satisfies WorkerResponse);
};

describe('parallel route transforms', () => {
it.each([
[1, 0],
Expand Down Expand Up @@ -162,17 +175,47 @@ describe('parallel route transforms', () => {

const pending = executor.run(createRouteModuleTask());
expect(createdWorkers).toBe(1);
worker.emit('message', {
id: worker.messages[0]!.id,
ok: true,
result: { code: 'created lazily' },
} satisfies WorkerResponse);
resolveWorkerMessage(worker, { code: 'created lazily' });
await expect(pending).resolves.toEqual({ code: 'created lazily' });

await executor.close();
expect(worker.terminateCalls).toBe(1);
});

it('creates only worker slots that receive scheduled work', async () => {
const workers: FakeRouteTransformWorker[] = [];
const executor = createRouteTransformExecutorForTesting(
{
parallelTransforms: 4,
splitRouteModules: true,
},
() => {
const worker = new FakeRouteTransformWorker();
workers.push(worker);
return worker;
}
);

const first = executor.run(createRouteModuleTask());
const second = executor.run(
createRouteModuleTask({ resourcePath: '/app/routes/other.tsx' })
);

expect(workers).toHaveLength(2);

for (const worker of workers) {
resolveWorkerMessage(worker, { code: 'done' });
}

await expect(Promise.all([first, second])).resolves.toEqual([
{ code: 'done' },
{ code: 'done' },
]);

await executor.close();
expect(workers.map(worker => worker.terminateCalls)).toEqual([1, 1]);
});

it('executes route client entry tasks through the shared task executor', async () => {
await expect(
executeRouteTransformTask({
Expand Down
Loading