Skip to content

Commit d28e784

Browse files
authored
Merge pull request #32 from AthennaIO/develop
feat(worker): add concurrency option
2 parents f1013fa + f551382 commit d28e784

11 files changed

Lines changed: 141 additions & 93 deletions

File tree

.github/workflows/ci.yml

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -36,29 +36,3 @@ jobs:
3636

3737
- name: Test code compilation
3838
run: npm run build
39-
40-
windows:
41-
runs-on: windows-latest
42-
strategy:
43-
matrix:
44-
node-version:
45-
- 21.x
46-
steps:
47-
- uses: actions/checkout@v2
48-
- name: Use Node.js ${{ matrix.node-version }}
49-
uses: actions/setup-node@v1
50-
with:
51-
node-version: ${{ matrix.node-version }}
52-
53-
- name: Install dependencies
54-
run: npm install
55-
56-
- name: Run tests
57-
run: npm run test:coverage
58-
env:
59-
AWS_REGION: ${{ secrets.AWS_REGION }}
60-
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
61-
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
62-
63-
- name: Test code compilation
64-
run: npm run build

bin/test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,5 @@ await Runner.setTsEnv()
1616
.addPath('tests/unit/**/*.ts')
1717
.setCliArgs(process.argv.slice(2))
1818
.setGlobalTimeout(30000)
19+
.setForceExit()
1920
.run()
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/**
2+
* @athenna/queue
3+
*
4+
* (c) João Lenon <lenon@athenna.io>
5+
*
6+
* For the full copyright and license information, please view the LICENSE
7+
* file that was distributed with this source code.
8+
*/
9+
10+
import { Exception } from '@athenna/common'
11+
12+
export class WorkerTimeoutException extends Exception {
13+
public constructor(taskName: string) {
14+
const message = `The worker task ${taskName} has timed out.`
15+
16+
super({
17+
message,
18+
code: 'E_WORKER_TIMEOUT_ERROR'
19+
})
20+
}
21+
}

src/kernels/WorkerKernel.ts

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,6 @@
77
* file that was distributed with this source code.
88
*/
99

10-
/**
11-
* @athenna/http
12-
*
13-
* (c) João Lenon <lenon@athenna.io>
14-
*
15-
* For the full copyright and license information, please view the LICENSE
16-
* file that was distributed with this source code.
17-
*/
18-
1910
import 'reflect-metadata'
2011

2112
import { debug } from '#src/debug'
@@ -132,6 +123,10 @@ export class WorkerKernel {
132123
ioc.alias(meta.camelAlias, meta.alias)
133124
}
134125

126+
if (meta.concurrency) {
127+
builder.concurrency(meta.concurrency)
128+
}
129+
135130
builder.connection(meta.connection).handler(ctx => {
136131
const worker =
137132
ioc.use(meta.name) ||

src/types/WorkerOptions.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,13 @@ export type WorkerOptions = {
1515
*/
1616
name?: string
1717

18+
/**
19+
* Define how much instances of the same worker could run in parallel.
20+
*
21+
* @default 1
22+
*/
23+
concurrency?: number
24+
1825
/**
1926
* The queue connection that will be used to get the configurations.
2027
*

src/worker/WorkerTaskBuilder.ts

Lines changed: 69 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,13 @@
77
* file that was distributed with this source code.
88
*/
99

10-
import { Is } from '@athenna/common'
1110
import { Log } from '@athenna/logger'
1211
import { Queue } from '#src/facades/Queue'
12+
import { Is, Parser } from '@athenna/common'
1313
import { WorkerImpl } from '#src/worker/WorkerImpl'
1414
import type { Context, ConnectionOptions } from '#src/types'
1515
import type { WorkerHandler } from '#src/types/WorkerHandler'
16+
import { WorkerTimeoutException } from '#src/exceptions/WorkerTimeoutException'
1617

1718
export class WorkerTaskBuilder {
1819
public worker: {
@@ -22,14 +23,14 @@ export class WorkerTaskBuilder {
2223
name?: string
2324

2425
/**
25-
* The queue connection of the worker task.
26+
* Define the maximun number of concurrent processes of the same worker.
2627
*/
27-
connection?: string
28+
concurrency?: number
2829

2930
/**
30-
* The interval instance of the worker task.
31+
* The queue connection of the worker task.
3132
*/
32-
interval?: NodeJS.Timeout
33+
connection?: string
3334

3435
/**
3536
* Define if the worker task is registered.
@@ -45,13 +46,10 @@ export class WorkerTaskBuilder {
4546
* The handler of the worker task.
4647
*/
4748
handler?: (ctx: Context) => any | Promise<any>
48-
49-
/**
50-
* Define if the worker task is running.
51-
*/
52-
isRunning?: boolean
5349
} = {}
5450

51+
private timers: NodeJS.Timeout[] = []
52+
5553
public constructor() {
5654
this.worker.connection = Config.get('queue.default')
5755
}
@@ -70,6 +68,20 @@ export class WorkerTaskBuilder {
7068
return this
7169
}
7270

71+
/**
72+
* Set the max number of concurrent worker tasks.
73+
*
74+
* @example
75+
* ```ts
76+
* new WorkerTaskBuilder().name('my_worker_task').concurrency(5)
77+
* ```
78+
*/
79+
public concurrency(concurrency: number) {
80+
this.worker.concurrency = concurrency
81+
82+
return this
83+
}
84+
7385
/**
7486
* Set the handler of the worker task.
7587
*
@@ -208,25 +220,62 @@ export class WorkerTaskBuilder {
208220
return
209221
}
210222

223+
if (this.timers.length) {
224+
return
225+
}
226+
227+
const n = this.worker.concurrency ?? 1
228+
229+
for (let i = 0; i < n; i++) {
230+
this.spawn()
231+
}
232+
}
233+
234+
/**
235+
* Use spawn to force a worker instance to run.
236+
*/
237+
private spawn() {
211238
const intervalToRun =
212239
this.worker.options?.workerInterval ||
213240
Config.get(
214241
`queue.connections.${this.worker.connection}.workerInterval`,
215242
1000
216243
)
217244

245+
const timeoutMs =
246+
this.worker.options?.workerTimeoutMs ??
247+
Config.get(
248+
`queue.connections.${this.worker.connection}.workerTimeoutMs`,
249+
Parser.timeToMs('5m')
250+
)
251+
218252
const initialOffset = this.computeInitialOffset(intervalToRun)
219253

220-
this.worker.interval = setTimeout(async () => {
221-
if (!this.worker.isRunning) {
222-
this.worker.isRunning = true
254+
const loop = async () => {
255+
if (!this.worker.isRegistered) {
256+
return
257+
}
223258

224-
await this.run()
225-
this.worker.isRunning = false
259+
try {
260+
await Promise.race([
261+
this.run(),
262+
new Promise((resolve, reject) =>
263+
setTimeout(
264+
() => reject(new WorkerTimeoutException(this.worker.name)),
265+
timeoutMs
266+
)
267+
)
268+
])
269+
} catch (err) {
270+
Log.channelOrVanilla('exception').error(err)
271+
} finally {
272+
const delay = intervalToRun + this.computeJitter(intervalToRun)
273+
274+
this.timers.push(setTimeout(loop, delay))
226275
}
276+
}
227277

228-
this.scheduleNext(intervalToRun)
229-
}, initialOffset)
278+
this.timers.push(setTimeout(loop, initialOffset))
230279
}
231280

232281
/**
@@ -242,17 +291,11 @@ export class WorkerTaskBuilder {
242291
return
243292
}
244293

245-
if (!this.worker.interval) {
246-
return
247-
}
248-
249294
this.worker.isRegistered = false
250-
this.worker.isRunning = false
251295

252-
if (this.worker.interval) {
253-
clearTimeout(this.worker.interval)
254-
this.worker.interval = undefined
255-
}
296+
this.timers.forEach(t => clearTimeout(t))
297+
298+
this.timers = []
256299
}
257300

258301
/**
@@ -304,29 +347,4 @@ export class WorkerTaskBuilder {
304347

305348
return Math.floor(Math.random() * (max + 1))
306349
}
307-
308-
/**
309-
* Schedule the next worker task.
310-
*/
311-
private scheduleNext(baseMs: number) {
312-
if (!this.worker.isRegistered) {
313-
return
314-
}
315-
316-
const delay = baseMs + this.computeJitter(baseMs)
317-
318-
this.worker.interval = setTimeout(async () => {
319-
if (this.worker.isRunning) {
320-
return this.scheduleNext(baseMs)
321-
}
322-
323-
this.worker.isRunning = true
324-
325-
await this.run()
326-
327-
this.worker.isRunning = false
328-
329-
this.scheduleNext(baseMs)
330-
}, delay)
331-
}
332350
}

tests/unit/drivers/AwsSqsDriverTest.ts

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ import { Is, Path } from '@athenna/common'
1111
import { EnvHelper } from '@athenna/config'
1212
import { LoggerProvider } from '@athenna/logger'
1313
import { BaseTest } from '#tests/helpers/BaseTest'
14-
import { Queue, QueueProvider, WorkerProvider } from '#src'
15-
import { Test, type Context, BeforeEach, AfterEach, Skip } from '@athenna/test'
14+
import { Queue, WorkerProvider, QueueProvider } from '#src'
15+
import { Test, type Context, BeforeEach, AfterEach, Skip, AfterAll } from '@athenna/test'
1616

1717
export class AwsSqsDriverTest extends BaseTest {
1818
@BeforeEach()
@@ -36,6 +36,25 @@ export class AwsSqsDriverTest extends BaseTest {
3636
Config.clear()
3737
}
3838

39+
@AfterAll()
40+
public async afterAll() {
41+
await Config.loadAll(Path.fixtures('config'))
42+
43+
new QueueProvider().register()
44+
new WorkerProvider().register()
45+
new LoggerProvider().register()
46+
47+
await Queue.connection('aws_sqs').truncate().catch()
48+
49+
await Queue.closeAll()
50+
51+
Queue.worker().close()
52+
53+
ioc.reconstruct()
54+
55+
Config.clear()
56+
}
57+
3958
@Test()
4059
public async shouldBeAbleToConnectToDriver({ assert }: Context) {
4160
Queue.connection('aws_sqs')
@@ -91,9 +110,11 @@ export class AwsSqsDriverTest extends BaseTest {
91110

92111
await queue.add({ hello: 'world' })
93112

94-
const isEmpty = await queue.isEmpty()
113+
const job = await queue.pop()
95114

96-
assert.isFalse(isEmpty)
115+
assert.containSubset(job, {
116+
data: { hello: 'world' }
117+
})
97118
}
98119

99120
@Test()

tests/unit/drivers/FakeDriverTest.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ export class FakeDriverTest {
2323

2424
@AfterEach()
2525
public async afterEach() {
26+
await Queue.connection('fake').truncate()
2627
await Queue.closeAll()
28+
2729
ioc.reconstruct()
2830

2931
Config.clear()

tests/unit/drivers/MemoryDriverTest.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@ export class MemoryDriverTest {
2323

2424
@AfterEach()
2525
public async afterEach() {
26+
await Queue.connection('memory').truncate()
27+
await Queue.connection('memoryBackoff').truncate()
2628
await Queue.closeAll()
29+
2730
ioc.reconstruct()
2831

2932
Config.clear()

tests/unit/kernels/WorkerKernelTest.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,10 +145,9 @@ export class WorkerKernelTest {
145145

146146
await Sleep.for(20000).milliseconds().wait()
147147

148-
assert.isTrue(constants.RUN_MAP.helloWorker)
148+
assert.isTrue(constants.RUN_MAP.productWorker)
149149
assert.isTrue(constants.RUN_MAP.annotatedWorker)
150150
assert.isTrue(constants.RUN_MAP.decoratedWorker)
151-
assert.isTrue(constants.RUN_MAP.productWorker)
152151
}
153152

154153
@Test()

0 commit comments

Comments
 (0)