From a888144f5901a6985c826b7f146d235ae8ba9b08 Mon Sep 17 00:00:00 2001 From: Joe Thom Date: Tue, 5 May 2026 02:45:39 -0700 Subject: [PATCH 1/2] fix(drizzle-kit): limit prepared postgres query concurrency --- drizzle-kit/src/api.ts | 62 +++++++++++++++++++++++---- drizzle-kit/tests/api.test.ts | 80 +++++++++++++++++++++++++++++++++++ 2 files changed, 133 insertions(+), 9 deletions(-) create mode 100644 drizzle-kit/tests/api.test.ts diff --git a/drizzle-kit/src/api.ts b/drizzle-kit/src/api.ts index 70eafa9540..edf18dd518 100644 --- a/drizzle-kit/src/api.ts +++ b/drizzle-kit/src/api.ts @@ -33,11 +33,49 @@ export type DrizzlePgDB = DB & { proxy: Proxy; migrate: (config: string | MigrationConfig) => Promise; }; +export type PreparePgDBOptions = { + queryConcurrency?: number; +}; export type DrizzlePgDBIntrospectSchema = Omit< PgSchemaKit, 'internal' >; +function createConcurrencyLimiter(concurrency?: number) { + if (concurrency === undefined || concurrency < 1) { + return (fn: () => Promise) => fn(); + } + + let activeCount = 0; + const queue: Array<() => void> = []; + + const runNext = () => { + if (activeCount >= concurrency) return; + + const next = queue.shift(); + if (!next) return; + + activeCount += 1; + next(); + }; + + return (fn: () => Promise) => { + return new Promise((resolve, reject) => { + queue.push(() => { + Promise.resolve() + .then(fn) + .then(resolve, reject) + .finally(() => { + activeCount -= 1; + runNext(); + }); + }); + + runNext(); + }); + }; +} + export const introspectPgDB = async ( db: DrizzlePgDB, filters: string[], @@ -86,6 +124,7 @@ export const introspectPgDB = async ( export const preparePgDB = async ( pool: import('pg').Pool | import('pg').PoolClient, + options: PreparePgDBOptions = {}, ): Promise< DrizzlePgDB > => { @@ -119,22 +158,27 @@ export const preparePgDB = async ( const migrateFn = async (config: MigrationConfig) => { return migrate(db, config); }; + const limitQuery = createConcurrencyLimiter(options.queryConcurrency); const query = async (sql: string, params?: any[]) => { - const result = await pool.query({ - text: sql, - values: params ?? [], - types, + const result = await limitQuery(() => { + return pool.query({ + text: sql, + values: params ?? [], + types, + }); }); return result.rows; }; const proxy: Proxy = async (params: ProxyParams) => { - const result = await pool.query({ - text: params.sql, - values: params.params, - ...(params.mode === 'array' && { rowMode: 'array' }), - types, + const result = await limitQuery(() => { + return pool.query({ + text: params.sql, + values: params.params, + ...(params.mode === 'array' && { rowMode: 'array' }), + types, + }); }); return result.rows; }; diff --git a/drizzle-kit/tests/api.test.ts b/drizzle-kit/tests/api.test.ts new file mode 100644 index 0000000000..05f624d74b --- /dev/null +++ b/drizzle-kit/tests/api.test.ts @@ -0,0 +1,80 @@ +import { describe, expect, test, vi } from 'vitest'; +import { preparePgDB } from '../src/api'; + +vi.mock('pg', () => ({ + default: { + types: { + builtins: { + DATE: 1082, + INTERVAL: 1186, + TIMESTAMP: 1114, + TIMESTAMPTZ: 1184, + }, + getTypeParser: vi.fn(() => (value: unknown) => value), + }, + }, +})); + +vi.mock('drizzle-orm/node-postgres', () => ({ + drizzle: vi.fn().mockReturnValue({}), +})); + +vi.mock('drizzle-orm/node-postgres/migrator', () => ({ + migrate: vi.fn(), +})); + +function createObservedPool() { + let activeQueries = 0; + let maxActiveQueries = 0; + + const query = vi.fn(async (input: { text: string }) => { + activeQueries += 1; + maxActiveQueries = Math.max(maxActiveQueries, activeQueries); + + await new Promise((resolve) => setTimeout(resolve, 10)); + + activeQueries -= 1; + return { rows: [input.text] }; + }); + + return { + pool: { query }, + query, + getMaxActiveQueries: () => maxActiveQueries, + }; +} + +describe('preparePgDB', () => { + test('does not limit query concurrency by default', async () => { + const observed = createObservedPool(); + const db = await preparePgDB(observed.pool as any); + + await Promise.all([ + db.query('select 1'), + db.query('select 2'), + db.query('select 3'), + db.query('select 4'), + ]); + + expect(observed.query).toHaveBeenCalledTimes(4); + expect(observed.getMaxActiveQueries()).toBe(4); + }); + + test('limits query and proxy calls with queryConcurrency', async () => { + const observed = createObservedPool(); + const db = await preparePgDB(observed.pool as any, { + queryConcurrency: 2, + }); + + await Promise.all([ + db.query('select 1'), + db.query('select 2'), + db.query('select 3'), + db.proxy({ mode: 'array', params: [], sql: 'select 4' }), + db.proxy({ mode: 'object', params: [], sql: 'select 5' }), + ]); + + expect(observed.query).toHaveBeenCalledTimes(5); + expect(observed.getMaxActiveQueries()).toBe(2); + }); +}); From ef612ff3d84c11c6ef3e51dd25903b5ac160d3d2 Mon Sep 17 00:00:00 2001 From: Joe Thom Date: Tue, 5 May 2026 06:24:55 -0700 Subject: [PATCH 2/2] fix(drizzle-kit): reject invalid query concurrency --- drizzle-kit/src/api.ts | 6 +++++- drizzle-kit/tests/api.test.ts | 14 ++++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/drizzle-kit/src/api.ts b/drizzle-kit/src/api.ts index edf18dd518..70960756a9 100644 --- a/drizzle-kit/src/api.ts +++ b/drizzle-kit/src/api.ts @@ -42,10 +42,14 @@ export type DrizzlePgDBIntrospectSchema = Omit< >; function createConcurrencyLimiter(concurrency?: number) { - if (concurrency === undefined || concurrency < 1) { + if (concurrency === undefined) { return (fn: () => Promise) => fn(); } + if (!Number.isInteger(concurrency) || concurrency < 1) { + throw new RangeError('queryConcurrency must be a positive integer'); + } + let activeCount = 0; const queue: Array<() => void> = []; diff --git a/drizzle-kit/tests/api.test.ts b/drizzle-kit/tests/api.test.ts index 05f624d74b..1afcfca057 100644 --- a/drizzle-kit/tests/api.test.ts +++ b/drizzle-kit/tests/api.test.ts @@ -77,4 +77,18 @@ describe('preparePgDB', () => { expect(observed.query).toHaveBeenCalledTimes(5); expect(observed.getMaxActiveQueries()).toBe(2); }); + + test('rejects invalid queryConcurrency values', async () => { + const observed = createObservedPool(); + + await expect( + preparePgDB(observed.pool as any, { queryConcurrency: 0 }), + ).rejects.toThrow('queryConcurrency must be a positive integer'); + await expect( + preparePgDB(observed.pool as any, { queryConcurrency: -1 }), + ).rejects.toThrow('queryConcurrency must be a positive integer'); + await expect( + preparePgDB(observed.pool as any, { queryConcurrency: 1.5 }), + ).rejects.toThrow('queryConcurrency must be a positive integer'); + }); });