Skip to content

Commit 69d7c28

Browse files
committed
feat(handler): extends common exception handler
1 parent 3622a51 commit 69d7c28

11 files changed

Lines changed: 2009 additions & 1600 deletions

.github/workflows/cd.yml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ on:
88
jobs:
99
build:
1010
runs-on: ubuntu-latest
11+
permissions:
12+
contents: write
13+
id-token: write
1114
steps:
1215
- name: Checkout
1316
uses: actions/checkout@v2
@@ -32,8 +35,7 @@ jobs:
3235
id: release
3336

3437
- name: Publish to NPM Registry
35-
run: cd build && npm publish --access public
38+
run: cd build && npm publish --provenance --access public
3639
if: steps.release.outputs.released == 'true'
3740
env:
38-
NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }}
3941
name: Deploy

package-lock.json

Lines changed: 1736 additions & 1443 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@athenna/queue",
3-
"version": "5.18.0",
3+
"version": "5.19.0",
44
"description": "The Athenna queue handler.",
55
"license": "MIT",
66
"author": "João Lenon <lenon@athenna.io>",
@@ -55,20 +55,20 @@
5555
"#tests": "./tests/index.js"
5656
},
5757
"dependencies": {
58-
"@aws-sdk/client-sqs": "^3.859.0"
58+
"@aws-sdk/client-sqs": "^3.1009.0"
5959
},
6060
"devDependencies": {
61-
"@athenna/artisan": "^5.7.0",
62-
"@athenna/common": "^5.14.0",
63-
"@athenna/config": "^5.4.0",
64-
"@athenna/database": "^5.27.0",
61+
"@athenna/artisan": "^5.11.0",
62+
"@athenna/common": "^5.30.0",
63+
"@athenna/config": "^5.6.0",
64+
"@athenna/database": "^5.39.0",
6565
"@athenna/ioc": "^5.2.0",
66-
"@athenna/logger": "^5.7.0",
67-
"@athenna/test": "^5.5.0",
66+
"@athenna/logger": "^5.14.0",
67+
"@athenna/test": "^5.6.0",
6868
"@athenna/tsconfig": "^5.0.0",
6969
"@athenna/view": "^5.4.0",
70-
"@typescript-eslint/eslint-plugin": "^8.38.0",
71-
"@typescript-eslint/parser": "^8.38.0",
70+
"@typescript-eslint/eslint-plugin": "^8.57.0",
71+
"@typescript-eslint/parser": "^8.57.0",
7272
"better-sqlite3": "^10.1.0",
7373
"commitizen": "^4.3.1",
7474
"cz-conventional-changelog": "^3.3.0",

src/drivers/AwsSqsDriver.ts

Lines changed: 12 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@ import {
1717
ChangeMessageVisibilityCommand
1818
} from '@aws-sdk/client-sqs'
1919

20-
import { Log } from '@athenna/logger'
2120
import { createHash } from 'node:crypto'
2221
import { Driver } from '#src/drivers/Driver'
2322
import { Is, Options, Uuid } from '@athenna/common'
2423
import type { ConnectionOptions } from '#src/types'
2524
import { ConnectionFactory } from '#src/factories/ConnectionFactory'
25+
import { AwsSqsDriverExceptionHandler } from '#src/handlers/AwsSqsDriverExceptionHandler'
2626
import { NotFifoSqsQueueTypeException } from '#src/exceptions/NotFifoSqsQueueTypeException'
2727

2828
export class AwsSqsDriver extends Driver<SQSClient> {
@@ -39,7 +39,7 @@ export class AwsSqsDriver extends Driver<SQSClient> {
3939
/**
4040
* Convert milliseconds to seconds.
4141
*/
42-
private msToS(v: number) {
42+
public msToS(v: number) {
4343
const s = Math.ceil(v / 1000)
4444
return Math.max(0, Math.min(43200, s))
4545
}
@@ -430,57 +430,21 @@ export class AwsSqsDriver extends Driver<SQSClient> {
430430
this.msToS(this.noAckDelayMs + requeueJitterMs)
431431
)
432432
}
433-
} catch (err) {
434-
stopHeartbeat()
435-
436-
const receiveCount = Number(
437-
job.metadata.Attributes?.ApproximateReceiveCount ?? '1'
438-
)
439-
const attempts = Math.max(this.attempts - receiveCount, 0)
440-
const shouldRetry = attempts > 0
441-
442-
if (Config.is('worker.logger.prettifyException')) {
443-
Log.channelOrVanilla('exception').error(
444-
await err.toAthennaException().prettify()
445-
)
446-
} else {
447-
Log.channelOrVanilla('exception').error({
448-
msg: `failed to process job: ${err.message}`,
449-
queue: this.queueName,
450-
deadletter: this.deadletter,
451-
name: err.name,
452-
code: err.code,
453-
help: err.help,
454-
details: err.details,
455-
metadata: err.metadata,
456-
stack: err.stack,
457-
job
458-
})
459-
}
460-
461-
if (shouldRetry) {
462-
const delay = this.calculateBackoffDelay(job.attempts)
463-
464-
await this.changeJobVisibility(
465-
job.id,
466-
this.msToS(delay + requeueJitterMs)
467-
)
468-
469-
return
470-
}
471-
472-
if (this.deadletter) {
473-
await this.sendJobToDLQ(job)
474-
}
475-
476-
await this.ack(job.id)
433+
} catch (error) {
434+
await new AwsSqsDriverExceptionHandler().handle({
435+
job,
436+
error,
437+
driver: this,
438+
stopHeartbeat,
439+
requeueJitterMs
440+
})
477441
}
478442
}
479443

480444
/**
481445
* Send a job to the deadletter quue.
482446
*/
483-
private async sendJobToDLQ(job: any) {
447+
public async sendJobToDLQ(job: any) {
484448
if (Is.Object(job.data)) {
485449
job.data = JSON.stringify(job.data)
486450
}
@@ -503,7 +467,7 @@ export class AwsSqsDriver extends Driver<SQSClient> {
503467
/**
504468
* Change the job visibility values in SQS.
505469
*/
506-
private async changeJobVisibility(id: string, seconds: number) {
470+
public async changeJobVisibility(id: string, seconds: number) {
507471
const cmd = new ChangeMessageVisibilityCommand({
508472
QueueUrl: this.queueName,
509473
ReceiptHandle: id,

src/drivers/DatabaseDriver.ts

Lines changed: 9 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@
77
* file that was distributed with this source code.
88
*/
99

10-
import { Log } from '@athenna/logger'
1110
import { Config } from '@athenna/config'
1211
import { Driver } from '#src/drivers/Driver'
1312
import { Is, Options } from '@athenna/common'
1413
import type { ConnectionOptions } from '#src/types'
1514
import type { DatabaseImpl } from '@athenna/database'
1615
import { ConnectionFactory } from '#src/factories/ConnectionFactory'
16+
import { DatabaseDriverExceptionHandler } from '#src/handlers/DatabaseDriverExceptionHandler'
1717

1818
export class DatabaseDriver extends Driver<DatabaseImpl> {
1919
/**
@@ -229,12 +229,7 @@ export class DatabaseDriver extends Driver<DatabaseImpl> {
229229
* ```
230230
*/
231231
public async length() {
232-
const count = await this.client
233-
.table(this.table)
234-
.where('queue', this.queueName)
235-
.count()
236-
237-
return parseInt(count)
232+
return this.client.table(this.table).where('queue', this.queueName).count()
238233
}
239234

240235
/**
@@ -356,53 +351,13 @@ export class DatabaseDriver extends Driver<DatabaseImpl> {
356351
reservedUntil: job.reservedUntil
357352
})
358353
}
359-
} catch (err) {
360-
const shouldRetry = job.attempts > 0
361-
362-
if (Config.is('worker.logger.prettifyException')) {
363-
Log.channelOrVanilla('exception').error(
364-
await err.toAthennaException().prettify()
365-
)
366-
} else {
367-
Log.channelOrVanilla('exception').error({
368-
msg: `failed to process job: ${err.message}`,
369-
queue: this.queueName,
370-
deadletter: this.deadletter,
371-
name: err.name,
372-
code: err.code,
373-
help: err.help,
374-
details: err.details,
375-
metadata: err.metadata,
376-
stack: err.stack,
377-
job
378-
})
379-
}
380-
381-
if (!shouldRetry) {
382-
await this.ack(job.id)
383-
384-
if (this.deadletter) {
385-
await this.client.table(this.table).create({
386-
...job,
387-
queue: this.deadletter,
388-
reservedUntil: null,
389-
attempts: 0
390-
})
391-
}
392-
393-
return
394-
}
395-
396-
await this.client
397-
.table(this.table)
398-
.where('id', job.id)
399-
.update({
400-
reservedUntil: null,
401-
availableAt:
402-
Date.now() +
403-
this.calculateBackoffDelay(job.attempts) +
404-
requeueJitterMs
405-
})
354+
} catch (error) {
355+
await new DatabaseDriverExceptionHandler().handle({
356+
job,
357+
error,
358+
driver: this,
359+
requeueJitterMs
360+
})
406361
}
407362
}
408363
}

src/drivers/FakeDriver.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,14 @@ export class FakeDriver {
216216
*/
217217
public static async ack() {}
218218

219+
/**
220+
* Calculate the heartbeat delay. Used to define if job is still
221+
* running.
222+
*/
223+
public static calculateHeartbeatDelay() {
224+
return 0
225+
}
226+
219227
/**
220228
* Process the next job of the queue with a handler.
221229
*

src/drivers/MemoryDriver.ts

Lines changed: 8 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@
77
* file that was distributed with this source code.
88
*/
99

10-
import { Log } from '@athenna/logger'
1110
import { Driver } from '#src/drivers/Driver'
1211
import { Options, Uuid } from '@athenna/common'
1312
import type { ConnectionOptions } from '#src/types'
1413
import { ConnectionFactory } from '#src/factories/ConnectionFactory'
14+
import { MemoryDriverExceptionHandler } from '#src/handlers/MemoryDriverExceptionHandler'
1515

1616
export class MemoryDriver extends Driver {
1717
/**
@@ -280,47 +280,13 @@ export class MemoryDriver extends Driver {
280280
job.reservedUntil = null
281281
job.availableAt = Date.now() + this.noAckDelayMs + requeueJitterMs
282282
}
283-
} catch (err) {
284-
const shouldRetry = job.attempts > 0
285-
286-
job.reservedUntil = null
287-
288-
if (Config.is('worker.logger.prettifyException')) {
289-
Log.channelOrVanilla('exception').error(
290-
await err.toAthennaException().prettify()
291-
)
292-
} else {
293-
Log.channelOrVanilla('exception').error({
294-
msg: `failed to process job: ${err.message}`,
295-
queue: this.queueName,
296-
deadletter: this.deadletter,
297-
name: err.name,
298-
code: err.code,
299-
help: err.help,
300-
details: err.details,
301-
metadata: err.metadata,
302-
stack: err.stack,
303-
job
304-
})
305-
}
306-
307-
if (shouldRetry) {
308-
job.availableAt =
309-
Date.now() +
310-
this.calculateBackoffDelay(job.attempts) +
311-
requeueJitterMs
312-
313-
return
314-
}
315-
316-
await this.ack(job.id)
317-
318-
if (this.deadletter) {
319-
this.client.queues[this.deadletter].push({
320-
...job,
321-
attempts: 0
322-
})
323-
}
283+
} catch (error) {
284+
await new MemoryDriverExceptionHandler().handle({
285+
job,
286+
error,
287+
driver: this,
288+
requeueJitterMs
289+
})
324290
}
325291
}
326292
}

0 commit comments

Comments
 (0)