-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprogress-parallel.sample.js
More file actions
71 lines (63 loc) · 1.97 KB
/
progress-parallel.sample.js
File metadata and controls
71 lines (63 loc) · 1.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/**
* Parallel progress tracking sample.
*
* @author Admilson B. F. Cossa
* SPDX-License-Identifier: Apache-2.0
*
* Shows how to watch one specific named task while many sibling tasks run in
* parallel. Progress is collected from typed task events, not console output.
*/
import assert from "node:assert/strict";
import { run } from "../dist/index.js";
// Index of the one task whose progress is watched. The task name is derived
// from it so the name filter in the event handler and the index check in the
// task body cannot drift apart.
const TARGET_INDEX = 7;
const TARGET_NAME = `embed.batch.${TARGET_INDEX}`;
const TOTAL = 16;
// The sample expects every spawned task to be in flight at the same time; the
// assertion on `maxActive` below checks exactly that.
const CONCURRENCY = TOTAL;

// taskId -> task name, filled from "task:started" events so that later
// "task:progress" events (matched by taskId) can be attributed to a name.
const taskNames = new Map();
// Progress reports observed for the target task, in arrival order.
const targetProgress = [];
// Number of task bodies currently running, and the highest value it reached.
let active = 0;
let maxActive = 0;

const results = await run.scope(async (scope) => {
  scope.onEvent((event) => {
    if (event.type === "task:started") {
      taskNames.set(event.taskId, event.name);
    }
    if (event.type === "task:progress" && taskNames.get(event.taskId) === TARGET_NAME) {
      targetProgress.push({ pct: event.pct, message: event.message });
    }
  });

  const handles = Array.from({ length: TOTAL }, (_, index) => scope.spawn(async (ctx) => {
    active++;
    maxActive = Math.max(maxActive, active);
    try {
      if (index === TARGET_INDEX) {
        // Only the target task reports progress: four evenly spaced steps.
        for (const step of [1, 2, 3, 4]) {
          ctx.report({ pct: step / 4, message: `chunk-${step}` });
          await sleep(1, ctx.signal);
        }
      } else {
        // Siblings just wait; NOTE(review): the 8 ms delay presumably keeps them
        // alive long enough to overlap with each other — confirm against the
        // scheduler's behavior.
        await sleep(8, ctx.signal);
      }
      return index;
    } finally {
      // Keep the in-flight counter accurate even when sleep() rejects on abort.
      active--;
    }
  }, { name: `embed.batch.${index}`, kind: "llm" }));

  return await Promise.all(handles);
}, { name: "progress.parallel" });

assert.equal(results.length, TOTAL);
assert.equal(maxActive, CONCURRENCY);
assert.deepEqual(targetProgress.map((event) => event.pct), [0.25, 0.5, 0.75, 1]);

// Emit a single-line JSON summary on stdout.
process.stdout.write(`${JSON.stringify({
  sample: "progress-parallel",
  target: TARGET_NAME,
  totalTasks: TOTAL,
  concurrency: CONCURRENCY,
  maxActive,
  progress: targetProgress,
})}\n`);
/**
 * Resolve after `ms` milliseconds, or reject early when `signal` aborts.
 *
 * @param {number} ms - Delay in milliseconds before the promise resolves.
 * @param {AbortSignal} [signal] - Optional cancellation signal. If it is
 *   already aborted on entry, or aborts while waiting, the promise rejects
 *   with `signal.reason`.
 * @returns {Promise<void>} Resolved by the timer or rejected by the abort,
 *   whichever happens first.
 */
function sleep(ms, signal) {
  return new Promise((resolve, reject) => {
    // An "abort" event is not dispatched again for a signal that has already
    // aborted, so that case must be handled before arming the timer.
    if (signal?.aborted) {
      reject(signal.reason);
      return;
    }

    const onAbort = () => {
      clearTimeout(timer);
      reject(signal.reason);
    };

    const timer = setTimeout(() => {
      // Detach the listener so a long-lived signal does not accumulate one
      // stale handler per completed sleep.
      signal?.removeEventListener("abort", onAbort);
      resolve();
    }, ms);

    signal?.addEventListener("abort", onAbort, { once: true });
  });
}