@@ -2,7 +2,7 @@ import Knex from 'knex'
22import { test } from '@japa/runner'
33import { Redis } from 'ioredis'
44import { MemoryAdapter } from './_mocks/memory_adapter.js'
5- import { RedisAdapter } from '../src/drivers/redis_adapter.js'
5+ import { redis , RedisAdapter } from '../src/drivers/redis_adapter.js'
66import { KnexAdapter } from '../src/drivers/knex_adapter.js'
77import { QueueSchemaService } from '../src/services/queue_schema.js'
88import { registerDriverTestSuite } from './_utils/register_driver_test_suite.js'
@@ -115,6 +115,68 @@ test.group('Adapter | Redis', (group) => {
115115 'deleteSchedule should be emitted in a single write window to avoid partial state'
116116 )
117117 } )
118+
119+ test ( 'completeJob should not delete a newer TTL dedup lock when Redis keyPrefix is disabled' , async ( {
120+ assert,
121+ } ) => {
122+ const redisOptions = {
123+ host : process . env . REDIS_HOST || 'localhost' ,
124+ port : Number . parseInt ( process . env . REDIS_PORT || '6379' , 10 ) ,
125+ db : 15 ,
126+ keyPrefix : '' ,
127+ }
128+ const inspectorConnection = new Redis ( redisOptions )
129+ const adapter = redis ( redisOptions ) ( )
130+ const queue = 'raw-ttl-clean-queue'
131+ const dedupId = 'TestJob::raw-ttl-clean-1'
132+ const dedupKey = `jobs::${ queue } ::dedup::${ dedupId } `
133+
134+ await connection . flushdb ( )
135+
136+ try {
137+ await adapter . pushOn ( queue , {
138+ id : 'raw-ttl-clean-uuid-1' ,
139+ name : 'TestJob' ,
140+ payload : { n : 1 } ,
141+ attempts : 0 ,
142+ dedup : { id : dedupId , ttl : 80 } ,
143+ } )
144+
145+ const first = await adapter . popFrom ( queue )
146+ assert . equal ( first ! . id , 'raw-ttl-clean-uuid-1' )
147+
148+ await new Promise ( ( r ) => setTimeout ( r , 150 ) )
149+
150+ const second = await adapter . pushOn ( queue , {
151+ id : 'raw-ttl-clean-uuid-2' ,
152+ name : 'TestJob' ,
153+ payload : { n : 2 } ,
154+ attempts : 0 ,
155+ dedup : { id : dedupId , ttl : 10_000 } ,
156+ } )
157+ assert . equal ( second && typeof second === 'object' && second . outcome , 'added' )
158+ assert . equal ( await inspectorConnection . get ( dedupKey ) , 'raw-ttl-clean-uuid-2' )
159+
160+ await adapter . completeJob ( first ! . id , queue , true )
161+
162+ assert . equal ( await inspectorConnection . get ( dedupKey ) , 'raw-ttl-clean-uuid-2' )
163+
164+ const third = await adapter . pushOn ( queue , {
165+ id : 'raw-ttl-clean-uuid-3' ,
166+ name : 'TestJob' ,
167+ payload : { n : 3 } ,
168+ attempts : 0 ,
169+ dedup : { id : dedupId , ttl : 10_000 } ,
170+ } )
171+
172+ assert . equal ( third && typeof third === 'object' && third . outcome , 'skipped' )
173+ assert . equal ( third && typeof third === 'object' && third . jobId , 'raw-ttl-clean-uuid-2' )
174+ } finally {
175+ await connection . flushdb ( )
176+ await adapter . destroy ( )
177+ await inspectorConnection . quit ( )
178+ }
179+ } )
118180} )
119181
120182test . group ( 'Adapter | Knex (SQLite)' , ( group ) => {
@@ -321,4 +383,104 @@ test.group('Adapter | Knex (PostgreSQL)', (group) => {
321383 `Expected a single schedule DELETE query, got ${ scheduleDeleteQueries . length } `
322384 )
323385 } )
386+
387+ test ( 'concurrent dedup pushes should not both insert when no existing row is lockable' , async ( {
388+ assert,
389+ } ) => {
390+ const dedupId = 'TestJob::pg-concurrent-missing-row'
391+ const barrierFunction = 'queue_jobs_test_dedup_insert_barrier'
392+ const barrierTrigger = 'queue_jobs_test_dedup_insert_barrier_trigger'
393+
394+ await connection . raw ( `
395+ CREATE OR REPLACE FUNCTION ${ barrierFunction } ()
396+ RETURNS trigger AS $$
397+ DECLARE
398+ attempts integer := 0;
399+ BEGIN
400+ IF NEW.dedup_id = '${ dedupId } ' THEN
401+ IF pg_try_advisory_lock(90312001) THEN
402+ LOOP
403+ EXIT WHEN NOT pg_try_advisory_lock(90312002);
404+ PERFORM pg_advisory_unlock(90312002);
405+ attempts := attempts + 1;
406+ IF attempts > 1000 THEN
407+ RAISE EXCEPTION 'timed out waiting for concurrent insert';
408+ END IF;
409+ PERFORM pg_sleep(0.01);
410+ END LOOP;
411+ ELSE
412+ PERFORM pg_advisory_lock(90312002);
413+ END IF;
414+ END IF;
415+
416+ RETURN NEW;
417+ END;
418+ $$ LANGUAGE plpgsql
419+ ` )
420+
421+ await connection . raw ( `
422+ CREATE TRIGGER ${ barrierTrigger }
423+ BEFORE INSERT ON ${ tableName }
424+ FOR EACH ROW
425+ EXECUTE FUNCTION ${ barrierFunction } ()
426+ ` )
427+
428+ const createConnection = ( ) =>
429+ Knex ( {
430+ client : 'pg' ,
431+ connection : {
432+ host : process . env . PG_HOST || 'localhost' ,
433+ port : Number . parseInt ( process . env . PG_PORT || '5432' , 10 ) ,
434+ user : process . env . PG_USER || 'postgres' ,
435+ password : process . env . PG_PASSWORD || 'postgres' ,
436+ database : process . env . PG_DATABASE || 'queue_test' ,
437+ } ,
438+ pool : { min : 1 , max : 1 } ,
439+ } )
440+
441+ const connectionA = createConnection ( )
442+ const connectionB = createConnection ( )
443+ const adapterA = new KnexAdapter ( { connection : connectionA , tableName, schedulesTableName } )
444+ const adapterB = new KnexAdapter ( { connection : connectionB , tableName, schedulesTableName } )
445+
446+ try {
447+ const results = await Promise . all ( [
448+ adapterA . pushOn ( 'pg-dedup-race-queue' , {
449+ id : 'pg-dedup-race-uuid-1' ,
450+ name : 'TestJob' ,
451+ payload : { n : 1 } ,
452+ attempts : 0 ,
453+ dedup : { id : dedupId } ,
454+ } ) ,
455+ adapterB . pushOn ( 'pg-dedup-race-queue' , {
456+ id : 'pg-dedup-race-uuid-2' ,
457+ name : 'TestJob' ,
458+ payload : { n : 2 } ,
459+ attempts : 0 ,
460+ dedup : { id : dedupId } ,
461+ } ) ,
462+ ] )
463+
464+ const outcomes = results . map ( ( result ) =>
465+ result && typeof result === 'object' ? result . outcome : undefined
466+ )
467+ assert . equal ( outcomes . filter ( ( outcome ) => outcome === 'added' ) . length , 1 )
468+ assert . equal ( outcomes . filter ( ( outcome ) => outcome === 'skipped' ) . length , 1 )
469+
470+ const count = await connection ( tableName )
471+ . where ( 'queue' , 'pg-dedup-race-queue' )
472+ . where ( 'dedup_id' , dedupId )
473+ . count < { total : string } [ ] > ( '* as total' )
474+ . first ( )
475+
476+ assert . equal ( Number ( count ?. total ) , 1 )
477+ } finally {
478+ await adapterA . destroy ( )
479+ await adapterB . destroy ( )
480+ await connectionA . destroy ( )
481+ await connectionB . destroy ( )
482+ await connection . raw ( `DROP TRIGGER IF EXISTS ${ barrierTrigger } ON ${ tableName } ` )
483+ await connection . raw ( `DROP FUNCTION IF EXISTS ${ barrierFunction } ()` )
484+ }
485+ } )
324486} )
0 commit comments