Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkgs/edge-worker/project.json
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,13 @@
"parallel": false
}
},
"test:node": {
"executor": "nx:run-commands",
"options": {
"command": "pnpm vitest run tests/unit/platform/ProcessPlatformAdapter.test.ts",
"cwd": "pkgs/edge-worker"
}
},
"test:integration": {
"dependsOn": ["db:ensure", "^build"],
"executor": "nx:run-commands",
Expand Down
166 changes: 166 additions & 0 deletions pkgs/edge-worker/src/platform/ProcessPlatformAdapter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
import type postgres from 'postgres';
import type { SupabaseClient } from '@supabase/supabase-js';
import type { SupabaseResources } from '@pgflow/dsl/supabase';
import type { CreateWorkerFn, Logger, PlatformAdapter } from './types.js';
import type { Worker } from '../core/Worker.js';
import { createServiceSupabaseClient } from '../core/supabase-utils.js';
import { Queries } from '../core/Queries.js';
import { isLocalSupabaseEnv } from '../shared/localDetection.js';
import { createLoggingFactory } from './logging.js';
import { resolveConnectionString, resolveSqlConnection } from './resolveConnection.js';
import { getProcessDeps, type ProcessDeps, type ProcessSignal } from './processDeps.js';

interface ProcessEnv extends Record<string, string | undefined> {
SUPABASE_URL: string;
SUPABASE_SERVICE_ROLE_KEY: string;
WORKER_NAME?: string;
DATABASE_URL?: string;
EDGE_WORKER_DB_URL?: string;
EDGE_WORKER_LOG_LEVEL?: string;
}

type ProcessAdapterOptions = {
sql?: postgres.Sql;
connectionString?: string;
maxPgConnections?: number;
};

export class ProcessPlatformAdapter implements PlatformAdapter<SupabaseResources> {
private readonly deps: ProcessDeps;
private readonly logger: Logger;
private readonly loggingFactory: ReturnType<typeof createLoggingFactory>;
private readonly abortController = new AbortController();
private readonly validatedEnv: ProcessEnv;
private readonly _connectionString: string | undefined;
private readonly _platformResources: SupabaseResources;
private readonly ownsSql: boolean;
private readonly queries: Queries;
private worker: Worker | null = null;
private workerId: string | null = null;
private shutdownStarted = false;

constructor(
options?: ProcessAdapterOptions,
deps: ProcessDeps = getProcessDeps()
) {
this.deps = deps;
this.assertProcessEnv(deps.env);
this.validatedEnv = deps.env;
this._connectionString = resolveConnectionString(this.validatedEnv, {
hasSql: !!options?.sql,
connectionString: options?.connectionString,
});
this.ownsSql = !options?.sql;
this.loggingFactory = createLoggingFactory(this.validatedEnv);
this.logger = this.loggingFactory.createLogger('ProcessPlatformAdapter');
this._platformResources = {
sql: resolveSqlConnection(this.validatedEnv, options),
supabase: createServiceSupabaseClient(this.validatedEnv),
};
this.queries = new Queries(this._platformResources.sql);
}

async startWorker(createWorkerFn: CreateWorkerFn): Promise<void> {
const workerName = this.validatedEnv.WORKER_NAME || 'pgflow-worker';
const workerId = this.deps.randomUUID();

this.workerId = workerId;
this.loggingFactory.setWorkerId(workerId);
this.loggingFactory.setWorkerName(workerName);
this.registerSignalHandlers();

this.worker = createWorkerFn(this.loggingFactory.createLogger);
await this.worker.startOnlyOnce({
edgeFunctionName: workerName,
workerId,
startMode: 'process',
});
}

async stopWorker(): Promise<void> {
this.requestShutdown();

try {
if (this.worker) {
await this.worker.stop();
}
if (this.workerId) {
await this.queries.markWorkerStopped(this.workerId);
}
} finally {
if (this.ownsSql) {
await this._platformResources.sql.end();
}
}
}
Comment on lines +80 to +95
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Race condition: stopWorker() can be called concurrently

If stopWorker() is called manually and then a signal arrives before it completes, handleSignal() will call stopWorker() again since shutdownStarted is only set in handleSignal(), not in stopWorker(). This causes:

  1. worker.stop() called twice (may not be idempotent)
  2. markWorkerStopped() called twice (could fail or create duplicate entries)
  3. sql.end() potentially called twice (will error on second call)

Fix: Add a guard in stopWorker() or set the shutdown flag there as well:

async stopWorker(): Promise<void> {
  if (this.shutdownStarted) {
    return;
  }
  this.shutdownStarted = true;
  this.requestShutdown();
  // ... rest of implementation
}
Suggested change
async stopWorker(): Promise<void> {
this.requestShutdown();
try {
if (this.worker) {
await this.worker.stop();
}
if (this.workerId) {
await this.queries.markWorkerStopped(this.workerId);
}
} finally {
if (this.ownsSql) {
await this._platformResources.sql.end();
}
}
}
async stopWorker(): Promise<void> {
if (this.shutdownStarted) {
return;
}
this.shutdownStarted = true;
this.requestShutdown();
try {
if (this.worker) {
await this.worker.stop();
}
if (this.workerId) {
await this.queries.markWorkerStopped(this.workerId);
}
} finally {
if (this.ownsSql) {
await this._platformResources.sql.end();
}
}
}

Spotted by Graphite

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.


requestShutdown(): void {
this.abortController.abort();
}

createLogger(module: string): Logger {
return this.loggingFactory.createLogger(module);
}

get connectionString(): string | undefined {
return this._connectionString;
}

get env(): Record<string, string | undefined> {
return this.validatedEnv;
}

get shutdownSignal(): AbortSignal {
return this.abortController.signal;
}

get platformResources(): SupabaseResources {
return this._platformResources;
}

get isLocalEnvironment(): boolean {
return isLocalSupabaseEnv(this.validatedEnv);
}

get sql(): postgres.Sql {
return this._platformResources.sql;
}

get supabase(): SupabaseClient {
return this._platformResources.supabase;
}

private registerSignalHandlers(): void {
for (const signal of ['SIGTERM', 'SIGINT', 'SIGQUIT'] satisfies ProcessSignal[]) {
this.deps.onSignal(signal, () => this.handleSignal());
}
}

private async handleSignal(): Promise<void> {
if (this.shutdownStarted) {
this.deps.exit(1);
}

this.shutdownStarted = true;

try {
await this.stopWorker();
} catch (error) {
this.logger.error('Process worker shutdown failed', error);
this.deps.setExitCode(1);
this.deps.exit(1);
}

this.deps.setExitCode(0);
this.deps.exit(0);
}

private assertProcessEnv(env: Record<string, string | undefined>): asserts env is ProcessEnv {
const required = ['SUPABASE_URL', 'SUPABASE_SERVICE_ROLE_KEY'];
const missing = required.filter((key) => !env[key]);

if (missing.length > 0) {
throw new Error(`Missing required environment variables: ${missing.join(', ')}`);
}
}
}
11 changes: 9 additions & 2 deletions pkgs/edge-worker/src/platform/createAdapter.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { PlatformAdapter } from './types.js';
import { ProcessPlatformAdapter } from './ProcessPlatformAdapter.js';
import { SupabasePlatformAdapter } from './SupabasePlatformAdapter.js';
import type { SupabaseResources } from '@pgflow/dsl/supabase';
import type postgres from 'postgres';
Expand All @@ -21,11 +22,17 @@ export function createAdapter(options?: AdapterOptions): PlatformAdapter<Supabas
return adapter;
}

// For now, only support Deno
// Later add NodeAdapter, BrowserAdapter, etc.
if (isProcessEnvironment()) {
return new ProcessPlatformAdapter(options);
}

throw new Error('Unsupported environment');
}

function isDenoEnvironment(): boolean {
return typeof Deno !== 'undefined';
}

function isProcessEnvironment(): boolean {
return typeof (globalThis as { process?: unknown }).process !== 'undefined';
}
2 changes: 2 additions & 0 deletions pkgs/edge-worker/src/platform/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ export * from './types.js';
export { createAdapter } from './createAdapter.js';
export { SupabasePlatformAdapter } from './SupabasePlatformAdapter.js';
export type { SupabasePlatformDeps } from './deps.js';
export { ProcessPlatformAdapter } from './ProcessPlatformAdapter.js';
export type { ProcessDeps, ProcessSignal } from './processDeps.js';
39 changes: 39 additions & 0 deletions pkgs/edge-worker/src/platform/processDeps.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
export type ProcessSignal = 'SIGTERM' | 'SIGINT' | 'SIGQUIT';

export type ProcessDeps = {
env: Record<string, string | undefined>;
onSignal: (signal: ProcessSignal, handler: () => void | Promise<void>) => void;
exit: (code: number) => never;
setExitCode: (code: number) => void;
randomUUID: () => string;
};

type ProcessLike = {
env?: Record<string, string | undefined>;
on?: (signal: ProcessSignal, handler: () => void | Promise<void>) => void;
exit?: (code: number) => never;
exitCode?: number;
};

type CryptoLike = {
randomUUID?: () => string;
};

export function getProcessDeps(): ProcessDeps {
const processLike = (globalThis as { process?: ProcessLike }).process;
const cryptoLike = globalThis.crypto as CryptoLike | undefined;

if (!processLike?.env || !processLike.on || !processLike.exit || !cryptoLike?.randomUUID) {
throw new Error('Process runtime is not available');
}

return {
env: processLike.env,
onSignal: (signal, handler) => processLike.on?.(signal, handler),
exit: (code) => processLike.exit!(code),
setExitCode: (code) => {
processLike.exitCode = code;
},
randomUUID: () => cryptoLike.randomUUID!(),
};
}
24 changes: 16 additions & 8 deletions pkgs/edge-worker/src/platform/resolveConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export const DOCKER_TRANSACTION_POOLER_URL =
export interface ConnectionEnv extends Record<string, string | undefined> {
SUPABASE_ANON_KEY?: string;
SUPABASE_SERVICE_ROLE_KEY?: string;
DATABASE_URL?: string;
EDGE_WORKER_DB_URL?: string;
}

Expand All @@ -29,7 +30,7 @@ export interface SqlConnectionOptions {

/**
* Resolves the connection string based on priority:
* config.sql -> config.connectionString -> EDGE_WORKER_DB_URL -> local fallback
* config.sql -> config.connectionString -> DATABASE_URL -> EDGE_WORKER_DB_URL -> local fallback
*/
export function resolveConnectionString(
env: ConnectionEnv,
Expand All @@ -42,12 +43,13 @@ export function resolveConnectionString(
isLocal &&
!options?.hasSql &&
!options?.connectionString &&
!env.DATABASE_URL &&
!env.EDGE_WORKER_DB_URL
) {
return DOCKER_TRANSACTION_POOLER_URL;
}

return options?.connectionString || env.EDGE_WORKER_DB_URL;
return options?.connectionString || env.DATABASE_URL || env.EDGE_WORKER_DB_URL;
}

/**
Expand All @@ -60,7 +62,7 @@ export function assertConnectionAvailable(
if (!hasSql && !connectionString) {
throw new Error(
'No database connection available. Provide one of: ' +
'config.sql, config.connectionString, or EDGE_WORKER_DB_URL environment variable.'
'config.sql, config.connectionString, DATABASE_URL, or EDGE_WORKER_DB_URL environment variable.'
);
}
}
Expand All @@ -69,8 +71,9 @@ export function assertConnectionAvailable(
* Resolves and creates the SQL connection based on priority:
* 1. config.sql - User-provided SQL client (highest priority)
* 2. config.connectionString - User-provided connection string
* 3. EDGE_WORKER_DB_URL - Environment variable
* 4. Local Supabase detection + Docker URL (lowest priority)
* 3. DATABASE_URL - Environment variable
* 4. EDGE_WORKER_DB_URL - Environment variable
* 5. Local Supabase detection + Docker URL (lowest priority)
*
* @throws Error if no connection source is available
*/
Expand All @@ -90,18 +93,23 @@ export function resolveSqlConnection(
return postgres(options.connectionString, { prepare: false, max });
}

// 3. EDGE_WORKER_DB_URL
// 3. DATABASE_URL
if (env.DATABASE_URL) {
return postgres(env.DATABASE_URL, { prepare: false, max });
}

// 4. EDGE_WORKER_DB_URL
if (env.EDGE_WORKER_DB_URL) {
return postgres(env.EDGE_WORKER_DB_URL, { prepare: false, max });
}

// 4. Local Supabase detection + docker URL
// 5. Local Supabase detection + docker URL
if (isLocalSupabaseEnv(env)) {
return postgres(DOCKER_TRANSACTION_POOLER_URL, { prepare: false, max });
}

throw new Error(
'No database connection available. Provide one of: ' +
'config.sql, config.connectionString, or EDGE_WORKER_DB_URL environment variable.'
'config.sql, config.connectionString, DATABASE_URL, or EDGE_WORKER_DB_URL environment variable.'
);
}
6 changes: 3 additions & 3 deletions pkgs/edge-worker/src/types/currentPlatform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* made configurable or determined by build-time configuration.
*/

import type { SupabaseResources, SupabaseEnv } from '@pgflow/dsl/supabase';
import type { SupabaseResources } from '@pgflow/dsl/supabase';
import type { BaseContext } from '@pgflow/dsl';

/**
Expand All @@ -18,9 +18,9 @@ export type CurrentPlatformResources = SupabaseResources;
* The environment type for the current platform.
* This is hardcoded to Supabase for MVP but can be made configurable later.
*/
export type CurrentPlatformEnv = SupabaseEnv;
export type CurrentPlatformEnv = Record<string, string | undefined>;

/**
* All resources available to flows (base context + platform resources)
*/
export type AvailableResources = BaseContext<CurrentPlatformEnv> & CurrentPlatformResources;
export type AvailableResources = BaseContext<CurrentPlatformEnv> & CurrentPlatformResources;
Loading
Loading