diff --git a/pkgs/edge-worker/project.json b/pkgs/edge-worker/project.json index 07a908abd..fdf9ea5d4 100644 --- a/pkgs/edge-worker/project.json +++ b/pkgs/edge-worker/project.json @@ -147,6 +147,13 @@ "parallel": false } }, + "test:node": { + "executor": "nx:run-commands", + "options": { + "command": "pnpm vitest run tests/unit/platform/ProcessPlatformAdapter.test.ts", + "cwd": "pkgs/edge-worker" + } + }, "test:integration": { "dependsOn": ["db:ensure", "^build"], "executor": "nx:run-commands", diff --git a/pkgs/edge-worker/src/platform/ProcessPlatformAdapter.ts b/pkgs/edge-worker/src/platform/ProcessPlatformAdapter.ts new file mode 100644 index 000000000..7749a91e2 --- /dev/null +++ b/pkgs/edge-worker/src/platform/ProcessPlatformAdapter.ts @@ -0,0 +1,166 @@ +import type postgres from 'postgres'; +import type { SupabaseClient } from '@supabase/supabase-js'; +import type { SupabaseResources } from '@pgflow/dsl/supabase'; +import type { CreateWorkerFn, Logger, PlatformAdapter } from './types.js'; +import type { Worker } from '../core/Worker.js'; +import { createServiceSupabaseClient } from '../core/supabase-utils.js'; +import { Queries } from '../core/Queries.js'; +import { isLocalSupabaseEnv } from '../shared/localDetection.js'; +import { createLoggingFactory } from './logging.js'; +import { resolveConnectionString, resolveSqlConnection } from './resolveConnection.js'; +import { getProcessDeps, type ProcessDeps, type ProcessSignal } from './processDeps.js'; + +interface ProcessEnv extends Record { + SUPABASE_URL: string; + SUPABASE_SERVICE_ROLE_KEY: string; + WORKER_NAME?: string; + DATABASE_URL?: string; + EDGE_WORKER_DB_URL?: string; + EDGE_WORKER_LOG_LEVEL?: string; +} + +type ProcessAdapterOptions = { + sql?: postgres.Sql; + connectionString?: string; + maxPgConnections?: number; +}; + +export class ProcessPlatformAdapter implements PlatformAdapter { + private readonly deps: ProcessDeps; + private readonly logger: Logger; + private readonly loggingFactory: ReturnType; + private readonly abortController = new AbortController(); + private readonly validatedEnv: ProcessEnv; + private readonly _connectionString: string | undefined; + private readonly _platformResources: SupabaseResources; + private readonly ownsSql: boolean; + private readonly queries: Queries; + private worker: Worker | null = null; + private workerId: string | null = null; + private shutdownStarted = false; + + constructor( + options?: ProcessAdapterOptions, + deps: ProcessDeps = getProcessDeps() + ) { + this.deps = deps; + this.assertProcessEnv(deps.env); + this.validatedEnv = deps.env; + this._connectionString = resolveConnectionString(this.validatedEnv, { + hasSql: !!options?.sql, + connectionString: options?.connectionString, + }); + this.ownsSql = !options?.sql; + this.loggingFactory = createLoggingFactory(this.validatedEnv); + this.logger = this.loggingFactory.createLogger('ProcessPlatformAdapter'); + this._platformResources = { + sql: resolveSqlConnection(this.validatedEnv, options), + supabase: createServiceSupabaseClient(this.validatedEnv), + }; + this.queries = new Queries(this._platformResources.sql); + } + + async startWorker(createWorkerFn: CreateWorkerFn): Promise { + const workerName = this.validatedEnv.WORKER_NAME || 'pgflow-worker'; + const workerId = this.deps.randomUUID(); + + this.workerId = workerId; + this.loggingFactory.setWorkerId(workerId); + this.loggingFactory.setWorkerName(workerName); + this.registerSignalHandlers(); + + this.worker = createWorkerFn(this.loggingFactory.createLogger); + await this.worker.startOnlyOnce({ + edgeFunctionName: workerName, + workerId, + startMode: 'process', + }); + } + + async stopWorker(): Promise { + this.requestShutdown(); + + try { + if (this.worker) { + await this.worker.stop(); + } + if (this.workerId) { + await this.queries.markWorkerStopped(this.workerId); + } + } finally { + if (this.ownsSql) { + await this._platformResources.sql.end(); + } + } + } + + requestShutdown(): void { + this.abortController.abort(); + } + + createLogger(module: string): Logger { + return this.loggingFactory.createLogger(module); + } + + get connectionString(): string | undefined { + return this._connectionString; + } + + get env(): Record { + return this.validatedEnv; + } + + get shutdownSignal(): AbortSignal { + return this.abortController.signal; + } + + get platformResources(): SupabaseResources { + return this._platformResources; + } + + get isLocalEnvironment(): boolean { + return isLocalSupabaseEnv(this.validatedEnv); + } + + get sql(): postgres.Sql { + return this._platformResources.sql; + } + + get supabase(): SupabaseClient { + return this._platformResources.supabase; + } + + private registerSignalHandlers(): void { + for (const signal of ['SIGTERM', 'SIGINT', 'SIGQUIT'] satisfies ProcessSignal[]) { + this.deps.onSignal(signal, () => this.handleSignal()); + } + } + + private async handleSignal(): Promise { + if (this.shutdownStarted) { + this.deps.exit(1); + } + + this.shutdownStarted = true; + + try { + await this.stopWorker(); + } catch (error) { + this.logger.error('Process worker shutdown failed', error); + this.deps.setExitCode(1); + this.deps.exit(1); + } + + this.deps.setExitCode(0); + this.deps.exit(0); + } + + private assertProcessEnv(env: Record): asserts env is ProcessEnv { + const required = ['SUPABASE_URL', 'SUPABASE_SERVICE_ROLE_KEY']; + const missing = required.filter((key) => !env[key]); + + if (missing.length > 0) { + throw new Error(`Missing required environment variables: ${missing.join(', ')}`); + } + } +} diff --git a/pkgs/edge-worker/src/platform/createAdapter.ts b/pkgs/edge-worker/src/platform/createAdapter.ts index 30741ee0b..185395c81 100644 --- a/pkgs/edge-worker/src/platform/createAdapter.ts +++ b/pkgs/edge-worker/src/platform/createAdapter.ts @@ -1,4 +1,5 @@ import type { PlatformAdapter } from './types.js'; +import { ProcessPlatformAdapter } from './ProcessPlatformAdapter.js'; import { SupabasePlatformAdapter } from './SupabasePlatformAdapter.js'; import type { SupabaseResources } from '@pgflow/dsl/supabase'; import type postgres from 'postgres'; @@ -21,11 +22,17 @@ export function createAdapter(options?: AdapterOptions): PlatformAdapter; + onSignal: (signal: ProcessSignal, handler: () => void | Promise) => void; + exit: (code: number) => never; + setExitCode: (code: number) => void; + randomUUID: () => string; +}; + +type ProcessLike = { + env?: Record; + on?: (signal: ProcessSignal, handler: () => void | Promise) => void; + exit?: (code: number) => never; + exitCode?: number; +}; + +type CryptoLike = { + randomUUID?: () => string; +}; + +export function getProcessDeps(): ProcessDeps { + const processLike = (globalThis as { process?: ProcessLike }).process; + const cryptoLike = globalThis.crypto as CryptoLike | undefined; + + if (!processLike?.env || !processLike.on || !processLike.exit || !cryptoLike?.randomUUID) { + throw new Error('Process runtime is not available'); + } + + return { + env: processLike.env, + onSignal: (signal, handler) => processLike.on?.(signal, handler), + exit: (code) => processLike.exit!(code), + setExitCode: (code) => { + processLike.exitCode = code; + }, + randomUUID: () => cryptoLike.randomUUID!(), + }; +} diff --git a/pkgs/edge-worker/src/platform/resolveConnection.ts b/pkgs/edge-worker/src/platform/resolveConnection.ts index 458f5068d..44b545eed 100644 --- a/pkgs/edge-worker/src/platform/resolveConnection.ts +++ b/pkgs/edge-worker/src/platform/resolveConnection.ts @@ -13,6 +13,7 @@ export const DOCKER_TRANSACTION_POOLER_URL = export interface ConnectionEnv extends Record { SUPABASE_ANON_KEY?: string; SUPABASE_SERVICE_ROLE_KEY?: string; + DATABASE_URL?: string; EDGE_WORKER_DB_URL?: string; } @@ -29,7 +30,7 @@ export interface SqlConnectionOptions { /** * Resolves the connection string based on priority: - * config.sql -> config.connectionString -> EDGE_WORKER_DB_URL -> local fallback + * config.sql -> config.connectionString -> DATABASE_URL -> EDGE_WORKER_DB_URL -> local fallback */ export function resolveConnectionString( env: ConnectionEnv, @@ -42,12 +43,13 @@ export function resolveConnectionString( isLocal && !options?.hasSql && !options?.connectionString && + !env.DATABASE_URL && !env.EDGE_WORKER_DB_URL ) { return DOCKER_TRANSACTION_POOLER_URL; } - return options?.connectionString || env.EDGE_WORKER_DB_URL; + return options?.connectionString || env.DATABASE_URL || env.EDGE_WORKER_DB_URL; } /** @@ -60,7 +62,7 @@ export function assertConnectionAvailable( if (!hasSql && !connectionString) { throw new Error( 'No database connection available. Provide one of: ' + - 'config.sql, config.connectionString, or EDGE_WORKER_DB_URL environment variable.' + 'config.sql, config.connectionString, DATABASE_URL, or EDGE_WORKER_DB_URL environment variable.' ); } } @@ -69,8 +71,9 @@ export function assertConnectionAvailable( * Resolves and creates the SQL connection based on priority: * 1. config.sql - User-provided SQL client (highest priority) * 2. config.connectionString - User-provided connection string - * 3. EDGE_WORKER_DB_URL - Environment variable - * 4. Local Supabase detection + Docker URL (lowest priority) + * 3. DATABASE_URL - Environment variable + * 4. EDGE_WORKER_DB_URL - Environment variable + * 5. Local Supabase detection + Docker URL (lowest priority) * * @throws Error if no connection source is available */ @@ -90,18 +93,23 @@ export function resolveSqlConnection( return postgres(options.connectionString, { prepare: false, max }); } - // 3. EDGE_WORKER_DB_URL + // 3. DATABASE_URL + if (env.DATABASE_URL) { + return postgres(env.DATABASE_URL, { prepare: false, max }); + } + + // 4. EDGE_WORKER_DB_URL if (env.EDGE_WORKER_DB_URL) { return postgres(env.EDGE_WORKER_DB_URL, { prepare: false, max }); } - // 4. Local Supabase detection + docker URL + // 5. Local Supabase detection + docker URL if (isLocalSupabaseEnv(env)) { return postgres(DOCKER_TRANSACTION_POOLER_URL, { prepare: false, max }); } throw new Error( 'No database connection available. Provide one of: ' + - 'config.sql, config.connectionString, or EDGE_WORKER_DB_URL environment variable.' + 'config.sql, config.connectionString, DATABASE_URL, or EDGE_WORKER_DB_URL environment variable.' ); } diff --git a/pkgs/edge-worker/src/types/currentPlatform.ts b/pkgs/edge-worker/src/types/currentPlatform.ts index aaf0125d0..d99913264 100644 --- a/pkgs/edge-worker/src/types/currentPlatform.ts +++ b/pkgs/edge-worker/src/types/currentPlatform.ts @@ -5,7 +5,7 @@ * made configurable or determined by build-time configuration. */ -import type { SupabaseResources, SupabaseEnv } from '@pgflow/dsl/supabase'; +import type { SupabaseResources } from '@pgflow/dsl/supabase'; import type { BaseContext } from '@pgflow/dsl'; /** @@ -18,9 +18,9 @@ export type CurrentPlatformResources = SupabaseResources; * The environment type for the current platform. * This is hardcoded to Supabase for MVP but can be made configurable later. */ -export type CurrentPlatformEnv = SupabaseEnv; +export type CurrentPlatformEnv = Record; /** * All resources available to flows (base context + platform resources) */ -export type AvailableResources = BaseContext & CurrentPlatformResources; \ No newline at end of file +export type AvailableResources = BaseContext & CurrentPlatformResources; diff --git a/pkgs/edge-worker/tests/unit/platform/ProcessPlatformAdapter.test.ts b/pkgs/edge-worker/tests/unit/platform/ProcessPlatformAdapter.test.ts new file mode 100644 index 000000000..39bf73c36 --- /dev/null +++ b/pkgs/edge-worker/tests/unit/platform/ProcessPlatformAdapter.test.ts @@ -0,0 +1,213 @@ +import { + assertEquals, + assertRejects, + assertStringIncludes, + assertThrows, +} from '@std/assert'; +import { ProcessPlatformAdapter } from '../../../src/platform/ProcessPlatformAdapter.ts'; +import type { ProcessDeps, ProcessSignal } from '../../../src/platform/processDeps.ts'; +import type postgres from 'postgres'; + +type SpyFn = ((...args: TArgs) => TResult) & { + calls: TArgs[]; + implementation: (...args: TArgs) => TResult; +}; + +function createSpy( + implementation: (...args: TArgs) => TResult +): SpyFn { + const spy = ((...args: TArgs) => { + spy.calls.push(args); + return spy.implementation(...args); + }) as SpyFn; + spy.calls = []; + spy.implementation = implementation; + return spy; +} + +type SqlStub = postgres.Sql & { + calls: string[]; + end: SpyFn<[], Promise>; +}; + +type WorkerStub = { + startOnlyOnce: SpyFn<[unknown], Promise>; + stop: SpyFn<[], Promise>; +}; + +function createDeps(env: Record = {}) { + const handlers = new Map void | Promise>(); + const exit = createSpy<[number], never>((code) => { + throw new Error(`exit:${code}`); + }); + const deps: ProcessDeps = { + env, + onSignal: (signal, handler) => { + handlers.set(signal, handler); + }, + exit: exit as unknown as (code: number) => never, + setExitCode: createSpy<[number], void>(() => undefined), + randomUUID: createSpy<[], string>(() => '00000000-0000-4000-8000-000000000001'), + }; + + return { deps, handlers, exit }; +} + +function createSqlStub(events?: string[]): SqlStub { + const sql = ((strings: TemplateStringsArray, ...values: unknown[]) => { + if (String.raw({ raw: strings }, ...values.map(String)).includes('pgflow.mark_worker_stopped')) { + events?.push('markWorkerStopped'); + } + sql.calls.push(String.raw({ raw: strings }, ...values.map(String))); + return Promise.resolve([]); + }) as SqlStub; + sql.calls = []; + sql.end = createSpy<[], Promise>(() => Promise.resolve()); + return sql; +} + +function createWorkerStub(): WorkerStub { + return { + startOnlyOnce: createSpy<[unknown], Promise>(() => Promise.resolve()), + stop: createSpy<[], Promise>(() => Promise.resolve()), + }; +} + +function validEnv(overrides: Record = {}) { + return { + SUPABASE_URL: 'https://example.supabase.co', + SUPABASE_SERVICE_ROLE_KEY: 'service-role-key', + DATABASE_URL: 'postgresql://user:pass@localhost:5432/postgres', + ...overrides, + }; +} + +Deno.test('ProcessPlatformAdapter throws when SUPABASE_URL is missing', () => { + const { deps } = createDeps(validEnv({ SUPABASE_URL: undefined })); + + assertThrows(() => new ProcessPlatformAdapter(undefined, deps), Error, 'SUPABASE_URL'); +}); + +Deno.test('ProcessPlatformAdapter throws when SUPABASE_SERVICE_ROLE_KEY is missing', () => { + const { deps } = createDeps(validEnv({ SUPABASE_SERVICE_ROLE_KEY: undefined })); + + assertThrows( + () => new ProcessPlatformAdapter(undefined, deps), + Error, + 'SUPABASE_SERVICE_ROLE_KEY' + ); +}); + +Deno.test('ProcessPlatformAdapter throws when no database source is available', () => { + const { deps } = createDeps(validEnv({ DATABASE_URL: undefined })); + + assertThrows( + () => new ProcessPlatformAdapter(undefined, deps), + Error, + 'No database connection available' + ); +}); + +Deno.test('ProcessPlatformAdapter starts immediately with process start mode and generated worker id', async () => { + const { deps } = createDeps(validEnv({ WORKER_NAME: 'emails' })); + const sql = createSqlStub(); + const worker = createWorkerStub(); + const adapter = new ProcessPlatformAdapter({ sql }, deps); + + await adapter.startWorker(() => worker as never); + + assertEquals(worker.startOnlyOnce.calls, [[{ + edgeFunctionName: 'emails', + workerId: '00000000-0000-4000-8000-000000000001', + startMode: 'process', + }]]); +}); + +Deno.test('ProcessPlatformAdapter defaults WORKER_NAME to pgflow-worker', async () => { + const { deps } = createDeps(validEnv()); + const sql = createSqlStub(); + const worker = createWorkerStub(); + const adapter = new ProcessPlatformAdapter({ sql }, deps); + + await adapter.startWorker(() => worker as never); + + assertEquals( + (worker.startOnlyOnce.calls[0]?.[0] as { edgeFunctionName?: string }).edgeFunctionName, + 'pgflow-worker' + ); +}); + +Deno.test('ProcessPlatformAdapter manual stop drains and keeps caller-provided sql open without exiting', async () => { + const { deps, exit } = createDeps(validEnv()); + const sql = createSqlStub(); + const worker = createWorkerStub(); + const adapter = new ProcessPlatformAdapter({ sql }, deps); + + await adapter.startWorker(() => worker as never); + await adapter.stopWorker(); + + assertEquals(worker.stop.calls.length, 1); + assertStringIncludes(sql.calls.at(-1) ?? '', 'pgflow.mark_worker_stopped'); + assertEquals(sql.end.calls.length, 0); + assertEquals(exit.calls.length, 0); +}); + +Deno.test('ProcessPlatformAdapter manual stop marks stopped after drain', async () => { + const { deps } = createDeps(validEnv()); + const events: string[] = []; + const sql = createSqlStub(events); + const worker = createWorkerStub(); + worker.stop.implementation = () => { + events.push('worker.stop'); + return Promise.resolve(); + }; + const adapter = new ProcessPlatformAdapter({ sql }, deps); + + await adapter.startWorker(() => worker as never); + await adapter.stopWorker(); + + assertEquals(events, ['worker.stop', 'markWorkerStopped']); +}); + +Deno.test('ProcessPlatformAdapter signal-triggered stop exits with zero after drain', async () => { + const { deps, handlers, exit } = createDeps(validEnv()); + const sql = createSqlStub(); + const worker = createWorkerStub(); + const adapter = new ProcessPlatformAdapter({ sql }, deps); + + await adapter.startWorker(() => worker as never); + await assertRejects(async () => await handlers.get('SIGTERM')?.(), Error, 'exit:0'); + + assertEquals(worker.stop.calls.length, 1); + assertEquals((deps.setExitCode as SpyFn<[number], void>).calls, [[0]]); + assertEquals(exit.calls, [[0]]); +}); + +Deno.test('ProcessPlatformAdapter second signal exits immediately with non-zero code', async () => { + const { deps, handlers, exit } = createDeps(validEnv()); + const sql = createSqlStub(); + const worker = createWorkerStub(); + worker.stop.implementation = () => new Promise(() => undefined); + const adapter = new ProcessPlatformAdapter({ sql }, deps); + + await adapter.startWorker(() => worker as never); + const firstShutdown = handlers.get('SIGINT')?.(); + + await assertRejects(async () => await handlers.get('SIGINT')?.(), Error, 'exit:1'); + void firstShutdown; + assertEquals(exit.calls, [[1]]); +}); + +Deno.test('ProcessPlatformAdapter stop failure exits with non-zero code for signal shutdown', async () => { + const { deps, handlers, exit } = createDeps(validEnv()); + const sql = createSqlStub(); + const worker = createWorkerStub(); + worker.stop.implementation = () => Promise.reject(new Error('drain failed')); + const adapter = new ProcessPlatformAdapter({ sql }, deps); + + await adapter.startWorker(() => worker as never); + await assertRejects(async () => await handlers.get('SIGQUIT')?.(), Error, 'exit:1'); + + assertEquals((deps.setExitCode as SpyFn<[number], void>).calls, [[1]]); + assertEquals(exit.calls, [[1]]); +});