Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions pkgs/client/__tests__/unit/PgflowClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,31 @@ describe('PgflowClient', () => {
expect(result).toBeNull();
});

test('getRun returns null without logging when no run data is returned', async () => {
const { client, mocks } = createMockClient();
const consoleErrorSpy = vi
.spyOn(console, 'error')
.mockImplementation(() => {
// Suppress the log while asserting it is not called.
});

mockRpcCall(mocks, { data: null, error: null });

const pgflowClient = new PgflowClient(client, {
realtimeStabilizationDelayMs: 0,
schedule: createSyncSchedule(),
});

try {
const result = await pgflowClient.getRun('nonexistent-id');

expect(result).toBeNull();
expect(consoleErrorSpy).not.toHaveBeenCalled();
} finally {
consoleErrorSpy.mockRestore();
}
});

test('emits events through callbacks', async () => {
const { client, mocks } = createMockClient();

Expand Down
7 changes: 7 additions & 0 deletions pkgs/client/src/lib/PgflowClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,13 @@ export class PgflowClient<TFlow extends AnyFlow = AnyFlow> implements IFlowClien

return flowRun;
} catch (error) {
if (
error instanceof Error &&
error.message === `No data returned for run ${run_id}`
) {
return null;
}

console.error('Error getting run:', error);
// Re-throw if it's a validation error
if (error instanceof Error && (error.message.includes('Invalid run data') || error.message.includes('Invalid step data'))) {
Expand Down
5 changes: 5 additions & 0 deletions pkgs/core/schemas/0056_table_worker_functions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

create table if not exists pgflow.worker_functions (
function_name text not null primary key,
start_mode text not null default 'http',
constraint worker_functions_start_mode_check check (start_mode in ('http', 'process')),
enabled boolean not null default true,
debounce interval not null default '6 seconds'
check (debounce >= '1 second'),
Expand All @@ -17,6 +19,9 @@ comment on table pgflow.worker_functions is
comment on column pgflow.worker_functions.function_name is
'Name of the Supabase Edge Function';

comment on column pgflow.worker_functions.start_mode is
'How this worker function is started: http workers are pinged by ensure_workers(), process workers self-start';

comment on column pgflow.worker_functions.enabled is
'Whether ensure_workers() should ping this function';

Expand Down
10 changes: 6 additions & 4 deletions pkgs/core/schemas/0057_function_track_worker_function.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@
-- Registers an edge function for monitoring by ensure_workers() cron

create or replace function pgflow.track_worker_function(
function_name text
function_name text,
start_mode text default 'http'
) returns void
language sql
as $$
insert into pgflow.worker_functions (function_name, updated_at)
values (track_worker_function.function_name, clock_timestamp())
insert into pgflow.worker_functions (function_name, start_mode, updated_at)
values (track_worker_function.function_name, track_worker_function.start_mode, clock_timestamp())
on conflict (function_name)
do update set
start_mode = excluded.start_mode,
updated_at = clock_timestamp();
$$;

comment on function pgflow.track_worker_function(text) is
comment on function pgflow.track_worker_function(text, text) is
'Registers an edge function for monitoring. Called by workers on startup.';
32 changes: 17 additions & 15 deletions pkgs/core/schemas/0059_function_ensure_workers.sql
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,20 @@ as $$
end as base_url
),

-- Only HTTP-started worker functions are supervised via HTTP pings.
http_worker_functions as (
select wf.function_name, wf.debounce, wf.last_invoked_at
from pgflow.worker_functions as wf
where wf.enabled = true
and wf.start_mode = 'http'
),

-- Find functions that pass the debounce check
debounce_passed as (
select wf.function_name, wf.debounce
from pgflow.worker_functions as wf
where wf.enabled = true
and (
wf.last_invoked_at is null
or wf.last_invoked_at < now() - wf.debounce
)
from http_worker_functions as wf
where wf.last_invoked_at is null
or wf.last_invoked_at < now() - wf.debounce
),

-- Find functions that have at least one alive worker
Expand All @@ -56,15 +61,12 @@ as $$
-- Production mode: only functions that pass debounce AND have no alive workers
functions_to_invoke as (
select wf.function_name
from pgflow.worker_functions as wf
where wf.enabled = true
and (
pgflow.is_local() = true -- Local: all enabled functions
or (
-- Production: debounce + no alive workers
wf.function_name in (select dp.function_name from debounce_passed as dp)
and wf.function_name not in (select faw.function_name from functions_with_alive_workers as faw)
)
from http_worker_functions as wf
where pgflow.is_local() = true -- Local: all enabled HTTP functions
or (
-- Production: debounce + no alive workers
wf.function_name in (select dp.function_name from debounce_passed as dp)
and wf.function_name not in (select faw.function_name from functions_with_alive_workers as faw)
)
),

Expand Down
5 changes: 4 additions & 1 deletion pkgs/core/src/database-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ export type Database = {
enabled: boolean
function_name: string
last_invoked_at: string | null
start_mode: string
updated_at: string
}
Insert: {
Expand All @@ -359,6 +360,7 @@ export type Database = {
enabled?: boolean
function_name: string
last_invoked_at?: string | null
start_mode?: string
updated_at?: string
}
Update: {
Expand All @@ -367,6 +369,7 @@ export type Database = {
enabled?: boolean
function_name?: string
last_invoked_at?: string | null
start_mode?: string
updated_at?: string
}
Relationships: []
Expand Down Expand Up @@ -656,7 +659,7 @@ export type Database = {
}
}
track_worker_function: {
Args: { function_name: string }
Args: { function_name: string; start_mode?: string }
Returns: undefined
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
-- Modify "worker_functions" table
ALTER TABLE "pgflow"."worker_functions" ADD CONSTRAINT "worker_functions_start_mode_check" CHECK (start_mode = ANY (ARRAY['http'::text, 'process'::text])), ADD COLUMN "start_mode" text NOT NULL DEFAULT 'http';
-- Set comment to column: "start_mode" on table: "worker_functions"
COMMENT ON COLUMN "pgflow"."worker_functions"."start_mode" IS 'How this worker function is started: http workers are pinged by ensure_workers(), process workers self-start';
-- Modify "ensure_workers" function
CREATE OR REPLACE FUNCTION "pgflow"."ensure_workers" () RETURNS TABLE ("function_name" text, "invoked" boolean, "request_id" bigint) LANGUAGE sql AS $$
with
-- Detect environment
env as (
select pgflow.is_local() as is_local
),

-- Get credentials: Local mode uses hardcoded URL, production uses vault secrets
-- Empty strings are treated as NULL using nullif()
-- pgflow_auth_secret takes priority over supabase_service_role_key for production auth
credentials as (
select
case
when (select is_local from env) then null
else coalesce(
nullif((select decrypted_secret from vault.decrypted_secrets where name = 'pgflow_auth_secret'), ''),
nullif((select decrypted_secret from vault.decrypted_secrets where name = 'supabase_service_role_key'), '')
)
end as service_role_key,
case
when (select is_local from env) then 'http://kong:8000/functions/v1'
else (select 'https://' || nullif(decrypted_secret, '') || '.supabase.co/functions/v1' from vault.decrypted_secrets where name = 'supabase_project_id')
end as base_url
),

-- Only HTTP-started worker functions are supervised via HTTP pings.
http_worker_functions as (
select wf.function_name, wf.debounce, wf.last_invoked_at
from pgflow.worker_functions as wf
where wf.enabled = true
and wf.start_mode = 'http'
),

-- Find functions that pass the debounce check
debounce_passed as (
select wf.function_name, wf.debounce
from http_worker_functions as wf
where wf.last_invoked_at is null
or wf.last_invoked_at < now() - wf.debounce
),

-- Find functions that have at least one alive worker
functions_with_alive_workers as (
select distinct w.function_name
from pgflow.workers as w
inner join debounce_passed as dp on w.function_name = dp.function_name
where w.stopped_at is null
and w.deprecated_at is null
and w.last_heartbeat_at > now() - dp.debounce
),

-- Determine which functions should be invoked
-- Local mode: all enabled functions (bypass debounce AND alive workers check)
-- Production mode: only functions that pass debounce AND have no alive workers
functions_to_invoke as (
select wf.function_name
from http_worker_functions as wf
where pgflow.is_local() = true -- Local: all enabled HTTP functions
or (
-- Production: debounce + no alive workers
wf.function_name in (select dp.function_name from debounce_passed as dp)
and wf.function_name not in (select faw.function_name from functions_with_alive_workers as faw)
)
),

-- Make HTTP requests and capture request_ids
http_requests as (
select
fti.function_name,
net.http_post(
url => c.base_url || '/' || fti.function_name,
headers => case
when e.is_local then '{}'::jsonb
else jsonb_build_object(
'Content-Type', 'application/json',
'Authorization', 'Bearer ' || c.service_role_key
)
end,
body => '{}'::jsonb
) as request_id
from functions_to_invoke as fti
cross join credentials as c
cross join env as e
where c.base_url is not null
and (e.is_local or c.service_role_key is not null)
),

-- Update last_invoked_at for invoked functions
updated as (
update pgflow.worker_functions as wf
set last_invoked_at = clock_timestamp()
from http_requests as hr
where wf.function_name = hr.function_name
returning wf.function_name
)

select u.function_name, true as invoked, hr.request_id
from updated as u
inner join http_requests as hr on u.function_name = hr.function_name
$$;
-- Drop "track_worker_function" function
DROP FUNCTION "pgflow"."track_worker_function" (text);
-- Create "track_worker_function" function
CREATE FUNCTION "pgflow"."track_worker_function" ("function_name" text, "start_mode" text DEFAULT 'http') RETURNS void LANGUAGE sql AS $$
insert into pgflow.worker_functions (function_name, start_mode, updated_at)
values (track_worker_function.function_name, track_worker_function.start_mode, clock_timestamp())
on conflict (function_name)
do update set
start_mode = excluded.start_mode,
updated_at = clock_timestamp();
$$;
-- Set comment to function: "track_worker_function"
COMMENT ON FUNCTION "pgflow"."track_worker_function" (text, text) IS 'Registers an edge function for monitoring. Called by workers on startup.';
3 changes: 2 additions & 1 deletion pkgs/core/supabase/migrations/atlas.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
h1:jDc+2bvTL4ZYqATAMfBXbTYtMlx8RPvDUvRJjrP537w=
h1:ugGS1dSXEdS8KOgQWhOdxWCovACZT4bTwWa9HVb7F70=
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
Expand All @@ -19,3 +19,4 @@ h1:jDc+2bvTL4ZYqATAMfBXbTYtMlx8RPvDUvRJjrP537w=
20260120205547_pgflow_requeue_stalled_tasks.sql h1:4wCBBvjtETCgJf1eXmlH5wCTKDUhiLi0uzsFG1V528E=
20260124113408_pgflow_auth_secret_support.sql h1:i/s1JkBqRElN6FOYFQviJt685W08SuSo30aP25lNlLc=
20260214181656_pgflow_step_conditions.sql h1:rHQnXCeZ/QGxPlChdTMxumtsTtYHr1ej183Dd+auw34=
20260607175525_pgflow_worker_start_mode.sql h1:PFAfoGaHe5stKF7YAFg6AqBxmRisqDvV60vVpnnVdBE=
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
begin;
select plan(3);

select pgflow_tests.reset_db();

select pgflow.track_worker_function('http-worker', 'http');
select pgflow.track_worker_function('process-worker', 'process');

select is(
(select count(*)::int from pgflow.ensure_workers() where function_name = 'process-worker'),
0,
'ensure_workers skips process workers in production mode'
);

select ok(
(select count(*)::int from pgflow.ensure_workers() where function_name = 'http-worker') >= 0,
'ensure_workers still evaluates http workers'
);

select set_config('app.settings.is_local', 'true', true);

select is(
(select count(*)::int from pgflow.ensure_workers() where function_name = 'process-worker'),
0,
'ensure_workers skips process workers in local mode'
);

select * from finish();
rollback;
38 changes: 38 additions & 0 deletions pkgs/core/supabase/tests/track_worker_function/start_mode.test.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
begin;
select plan(4);

select pgflow_tests.reset_db();

select pgflow.track_worker_function('default-http-worker');

select is(
(select start_mode from pgflow.worker_functions where function_name = 'default-http-worker'),
'http',
'track_worker_function defaults start_mode to http'
);

select pgflow.track_worker_function('process-worker', 'process');

select is(
(select start_mode from pgflow.worker_functions where function_name = 'process-worker'),
'process',
'track_worker_function stores explicit process start_mode'
);

select pgflow.track_worker_function('mode-update-worker', 'http');
select pgflow.track_worker_function('mode-update-worker', 'process');

select is(
(select start_mode from pgflow.worker_functions where function_name = 'mode-update-worker'),
'process',
'track_worker_function updates start_mode on conflict'
);

select throws_ok(
$$ select pgflow.track_worker_function('invalid-worker', 'invalid') $$,
'new row for relation "worker_functions" violates check constraint "worker_functions_start_mode_check"',
'track_worker_function rejects unsupported start_mode values'
);

select * from finish();
rollback;
6 changes: 3 additions & 3 deletions pkgs/edge-worker/src/core/Queries.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type postgres from 'postgres';
import type { WorkerRow } from './types.js';
import type { WorkerRow, WorkerStartMode } from './types.js';
import type { FlowShape, Json } from '@pgflow/dsl';

export type EnsureFlowCompiledStatus = 'compiled' | 'verified' | 'recompiled' | 'mismatch';
Expand Down Expand Up @@ -81,9 +81,9 @@ export class Queries {
* Called by workers on startup. Sets last_invoked_at to prevent cron from
* pinging during startup (debounce).
*/
async trackWorkerFunction(functionName: string): Promise<void> {
async trackWorkerFunction(functionName: string, startMode: WorkerStartMode = 'http'): Promise<void> {
await this.sql`
SELECT pgflow.track_worker_function(${functionName})
SELECT pgflow.track_worker_function(${functionName}, ${startMode})
`;
}

Expand Down
3 changes: 2 additions & 1 deletion pkgs/edge-worker/src/core/WorkerLifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ export class WorkerLifecycle<IMessage extends Json> implements ILifecycle {
this.workerState.transitionTo(States.Starting);

// Register this edge function for monitoring by ensure_workers() cron.
await this.queries.trackWorkerFunction(workerBootstrap.edgeFunctionName);
const startMode = workerBootstrap.startMode ?? 'http';
await this.queries.trackWorkerFunction(workerBootstrap.edgeFunctionName, startMode);

this.logger.debug(`Ensuring queue '${this.queue.queueName}' exists...`);
await this.queue.safeCreate();
Expand Down
Loading
Loading