From 8e4705a35e46b3172dd54bae3ea731cd5a0df590 Mon Sep 17 00:00:00 2001 From: Josh Ventura Date: Thu, 9 Apr 2026 14:54:20 -0400 Subject: [PATCH 1/6] Keep track of nested queries stack-style and emit `parent_query_id` --- docker/init/00-schema.sql | 2 + migrations/001_add_parent_query_id.sql | 12 ++ src/export/stats_exporter.cc | 2 + src/hooks/hooks.cc | 209 +++++++++++-------------- src/queue/event.h | 6 +- 5 files changed, 109 insertions(+), 122 deletions(-) create mode 100644 migrations/001_add_parent_query_id.sql diff --git a/docker/init/00-schema.sql b/docker/init/00-schema.sql index 4b15ef2..c353915 100644 --- a/docker/init/00-schema.sql +++ b/docker/init/00-schema.sql @@ -53,6 +53,8 @@ CREATE TABLE pg_stat_ch.events_raw query_id Int64 COMMENT '64-bit hash identifying normalized queries. Queries differing only in constants share the same query_id. Use for aggregating statistics across similar queries.', + parent_query_id UInt64 COMMENT 'query_id of the calling query (e.g. the plpgsql function that issued this SPI statement). 0 for top-level queries. Use WHERE parent_query_id = 0 to restrict aggregations to top-level queries and avoid double-counting CPU and duration.', + cmd_type LowCardinality(String) COMMENT 'Command type: SELECT, INSERT, UPDATE, DELETE, MERGE, UTILITY, or UNKNOWN. Use for workload characterization (read-heavy vs write-heavy).', rows UInt64 COMMENT 'Rows returned (SELECT) or affected (INSERT/UPDATE/DELETE). HIGH: large result sets or bulk operations. LOW: point queries. Watch for unexpected HIGH values indicating missing WHERE clauses.', diff --git a/migrations/001_add_parent_query_id.sql b/migrations/001_add_parent_query_id.sql new file mode 100644 index 0000000..3b49bf7 --- /dev/null +++ b/migrations/001_add_parent_query_id.sql @@ -0,0 +1,12 @@ +-- Migration: add parent_query_id column +-- +-- Introduced in pg_stat_ch 0.4.x. Each event now carries the query_id of its +-- calling query (e.g. 
the plpgsql function that issued an SPI statement). +-- Top-level queries emit 0. Use WHERE parent_query_id = 0 in aggregations to +-- avoid double-counting CPU and duration across nested calls. +-- +-- Run against your ClickHouse instance before upgrading the extension: +-- clickhouse-client < migrations/001_add_parent_query_id.sql + +ALTER TABLE pg_stat_ch.events_raw + ADD COLUMN IF NOT EXISTS parent_query_id UInt64 DEFAULT 0; diff --git a/src/export/stats_exporter.cc b/src/export/stats_exporter.cc index e11af99..d21c795 100644 --- a/src/export/stats_exporter.cc +++ b/src/export/stats_exporter.cc @@ -93,6 +93,7 @@ void ExportEventStats(const std::vector& events, StatsExporter* expor auto col_username = exporter->DbUserColumn(); auto col_pid = exporter->RecordInt32("pid"); auto col_query_id = exporter->RecordInt64("query_id"); + auto col_parent_query_id = exporter->RecordUInt64("parent_query_id"); auto col_cmd_type = exporter->DbOperationColumn(); auto col_rows = exporter->MetricUInt64("rows"); auto col_query = exporter->DbQueryTextColumn(); @@ -148,6 +149,7 @@ void ExportEventStats(const std::vector& events, StatsExporter* expor col_username->Append(std::string(ev.username, ev.username_len)); col_pid->Append(ev.pid); col_query_id->Append(static_cast(ev.queryid)); + col_parent_query_id->Append(ev.parent_query_id); col_cmd_type->Append(CmdTypeToString(ev.cmd_type)); col_rows->Append(ev.rows); diff --git a/src/hooks/hooks.cc b/src/hooks/hooks.cc index c76bfc8..0d3b909 100644 --- a/src/hooks/hooks.cc +++ b/src/hooks/hooks.cc @@ -2,6 +2,7 @@ #include #include +#include extern "C" { #include "postgres.h" @@ -48,21 +49,21 @@ static ExecutorEnd_hook_type prev_executor_end = nullptr; static ProcessUtility_hook_type prev_process_utility = nullptr; static emit_log_hook_type prev_emit_log_hook = nullptr; -// Track nesting level to identify top-level queries -static int nesting_level = 0; - -// CPU time tracking via getrusage -static struct rusage rusage_start; +// Per-query 
frame pushed at ExecutorStart / PschProcessUtility and popped at +// ExecutorEnd / end of PschProcessUtility. The stack naturally tracks nesting +// depth and provides each query with its own rusage baseline and start time, +// fixing the prior single-static design that produced overlapping CPU deltas +// for nested SPI calls. +struct PschQueryFrame { + struct rusage rusage_start; + TimestampTz query_start_ts; + uint64 queryid; // used as parent_query_id by the next inner query +}; +static std::vector query_stack; // Deadlock prevention for emit_log_hook static bool disable_error_capture = false; -// Track whether the current query started at top level -static bool current_query_is_top_level = false; - -// Track query start time for duration calculation -static TimestampTz query_start_ts = 0; - // System initialization flag - set after hooks are installed and shmem is ready static bool system_init = false; @@ -326,14 +327,14 @@ static void InitEventPartial(PschEvent* event) { event->query[0] = '\0'; } -static void InitBaseEvent(PschEvent* event, TimestampTz ts_start, bool top_level, +static void InitBaseEvent(PschEvent* event, TimestampTz ts_start, uint64 parent_query_id, PschCmdType cmd_type) { InitEventPartial(event); event->ts_start = ts_start; event->dbid = MyDatabaseId; event->userid = GetUserId(); event->pid = MyProcPid; - event->top_level = top_level; + event->parent_query_id = parent_query_id; event->cmd_type = cmd_type; ResolveNames(event); } @@ -454,10 +455,10 @@ static void CopyParallelWorkerInfo([[maybe_unused]] PschEvent* event, #endif } -static void BuildEventFromQueryDesc(QueryDesc* query_desc, PschEvent* event, int64 cpu_user_us, - int64 cpu_sys_us) { - InitBaseEvent(event, query_start_ts, current_query_is_top_level, - ConvertCmdType(query_desc->operation)); +static void BuildEventFromQueryDesc(QueryDesc* query_desc, PschEvent* event, + TimestampTz query_start_ts, int64 cpu_user_us, + int64 cpu_sys_us, uint64 parent_query_id) { + 
InitBaseEvent(event, query_start_ts, parent_query_id, ConvertCmdType(query_desc->operation)); event->queryid = query_desc->plannedstmt->queryId; event->rows = query_desc->estate->es_processed; event->cpu_user_time_us = cpu_user_us; @@ -542,17 +543,15 @@ static void PschExecutorStart(QueryDesc* query_desc, int eflags) { return; } - // Record if this is a top-level query (before nesting_level changes in Run) - if (nesting_level == 0) { - current_query_is_top_level = true; - query_start_ts = GetCurrentTimestamp(); - // Capture CPU time baseline for top-level queries - if (psch_enabled) { - getrusage(RUSAGE_SELF, &rusage_start); - } - } else { - current_query_is_top_level = false; - } + // Push a per-query frame. Each frame captures its own rusage baseline so + // that nested SPI calls each get an accurate, non-overlapping CPU delta. + // The stack depth implicitly tracks nesting; the frame below us on the stack + // supplies the parent_query_id for this query's event. + PschQueryFrame frame; + frame.queryid = query_desc->plannedstmt->queryId; + frame.query_start_ts = GetCurrentTimestamp(); + getrusage(RUSAGE_SELF, &frame.rusage_start); + query_stack.push_back(frame); if (prev_executor_start != nullptr) { prev_executor_start(query_desc, eflags); @@ -575,75 +574,52 @@ static void PschExecutorStart(QueryDesc* query_desc, int eflags) { #if PG_VERSION_NUM >= 180000 static void PschExecutorRun(QueryDesc* query_desc, ScanDirection direction, uint64 count) { + if (prev_executor_run != nullptr) { + prev_executor_run(query_desc, direction, count); + } else { + standard_ExecutorRun(query_desc, direction, count); + } +} #else static void PschExecutorRun(QueryDesc* query_desc, ScanDirection direction, uint64 count, bool execute_once) { -#endif - if (IsParallelWorker()) { -#if PG_VERSION_NUM >= 180000 - if (prev_executor_run != nullptr) { - prev_executor_run(query_desc, direction, count); - } else { - standard_ExecutorRun(query_desc, direction, count); - } -#else - if 
(prev_executor_run != nullptr) { - prev_executor_run(query_desc, direction, count, execute_once); - } else { - standard_ExecutorRun(query_desc, direction, count, execute_once); - } -#endif - return; + if (prev_executor_run != nullptr) { + prev_executor_run(query_desc, direction, count, execute_once); + } else { + standard_ExecutorRun(query_desc, direction, count, execute_once); } - - nesting_level++; - PG_TRY(); - { -#if PG_VERSION_NUM >= 180000 - if (prev_executor_run != nullptr) { - prev_executor_run(query_desc, direction, count); - } else { - standard_ExecutorRun(query_desc, direction, count); - } -#else - if (prev_executor_run != nullptr) { - prev_executor_run(query_desc, direction, count, execute_once); - } else { - standard_ExecutorRun(query_desc, direction, count, execute_once); - } +} #endif + +static void PschExecutorFinish(QueryDesc* query_desc) { + if (prev_executor_finish != nullptr) { + prev_executor_finish(query_desc); + } else { + standard_ExecutorFinish(query_desc); } - PG_FINALLY(); - { nesting_level--; } - PG_END_TRY(); } -static void PschExecutorFinish(QueryDesc* query_desc) { +static void PschExecutorEnd(QueryDesc* query_desc) { + // Parallel workers never push a frame — handle them separately. if (IsParallelWorker()) { - if (prev_executor_finish != nullptr) { - prev_executor_finish(query_desc); + ForgetNormalizedStatement(query_desc->sourceText, query_desc->plannedstmt->stmt_location, + query_desc->plannedstmt->stmt_len); + if (prev_executor_end != nullptr) { + prev_executor_end(query_desc); } else { - standard_ExecutorFinish(query_desc); + standard_ExecutorEnd(query_desc); } return; } - nesting_level++; - PG_TRY(); - { - if (prev_executor_finish != nullptr) { - prev_executor_finish(query_desc); - } else { - standard_ExecutorFinish(query_desc); - } + // Pop the frame pushed in ExecutorStart. 
+ PschQueryFrame frame = {}; + if (!query_stack.empty()) { + frame = query_stack.back(); + query_stack.pop_back(); } - PG_FINALLY(); - { nesting_level--; } - PG_END_TRY(); -} -static void PschExecutorEnd(QueryDesc* query_desc) { - if (!psch_enabled || IsParallelWorker() || query_desc->plannedstmt->queryId == UINT64CONST(0)) { + if (!psch_enabled || query_desc->plannedstmt->queryId == UINT64CONST(0)) { ForgetNormalizedStatement(query_desc->sourceText, query_desc->plannedstmt->stmt_location, query_desc->plannedstmt->stmt_len); if (prev_executor_end != nullptr) { @@ -658,17 +634,20 @@ static void PschExecutorEnd(QueryDesc* query_desc) { InstrEndLoop(query_desc->totaltime); } - // Compute CPU time delta from getrusage int64 cpu_user_us = 0; int64 cpu_sys_us = 0; struct rusage rusage_end; if (getrusage(RUSAGE_SELF, &rusage_end) == 0) { - cpu_user_us = TimeDiffMicrosec(rusage_end.ru_utime, rusage_start.ru_utime); - cpu_sys_us = TimeDiffMicrosec(rusage_end.ru_stime, rusage_start.ru_stime); + cpu_user_us = TimeDiffMicrosec(rusage_end.ru_utime, frame.rusage_start.ru_utime); + cpu_sys_us = TimeDiffMicrosec(rusage_end.ru_stime, frame.rusage_start.ru_stime); } + // The frame below us on the stack is our caller; its queryid is our parent. + uint64 parent_query_id = query_stack.empty() ? 
0 : query_stack.back().queryid; + PschEvent event; - BuildEventFromQueryDesc(query_desc, &event, cpu_user_us, cpu_sys_us); + BuildEventFromQueryDesc(query_desc, &event, frame.query_start_ts, cpu_user_us, cpu_sys_us, + parent_query_id); PschEnqueueEvent(&event); if (prev_executor_end != nullptr) { @@ -681,9 +660,9 @@ static void PschExecutorEnd(QueryDesc* query_desc) { // Build a PschEvent for utility statements (no QueryDesc available) static void BuildEventForUtility(PschEvent* event, const char* queryString, TimestampTz start_ts, int stmt_location, int stmt_len, uint64 duration_us, - bool is_top_level, uint64 rows, BufferUsage* bufusage, + uint64 parent_query_id, uint64 rows, BufferUsage* bufusage, WalUsage* walusage, int64 cpu_user_us, int64 cpu_sys_us) { - InitBaseEvent(event, start_ts, is_top_level, PSCH_CMD_UTILITY); + InitBaseEvent(event, start_ts, parent_query_id, PSCH_CMD_UTILITY); event->duration_us = duration_us; event->rows = rows; event->cpu_user_time_us = cpu_user_us; @@ -746,21 +725,6 @@ static uint64 GetUtilityRowCount(QueryCompletion* qc) { } } -static void ExecuteUtilityWithNesting(PlannedStmt* pstmt, const char* queryString, -#if PG_VERSION_NUM >= 140000 - bool readOnlyTree, -#endif - ProcessUtilityContext context, ParamListInfo params, - QueryEnvironment* queryEnv, DestReceiver* dest, - QueryCompletion* qc) { - nesting_level++; - PG_TRY(); - { CALL_PROCESS_UTILITY(); } - PG_FINALLY(); - { nesting_level--; } - PG_END_TRY(); -} - // ProcessUtility hook - captures DDL and utility statements #if PG_VERSION_NUM >= 140000 static void PschProcessUtility(PlannedStmt* pstmt, const char* queryString, bool readOnlyTree, @@ -781,28 +745,34 @@ static void PschProcessUtility(PlannedStmt* pstmt, const char* queryString, return; } - // Capture state before execution - bool is_top_level = (nesting_level == 0); - TimestampTz start_ts = GetCurrentTimestamp(); + // Capture parent before pushing our own frame, then push so that any + // executor hooks fired from 
within this utility (e.g. CREATE TABLE AS SELECT) + // see us on the stack and correctly link themselves as our children. + uint64 parent_query_id = query_stack.empty() ? 0 : query_stack.back().queryid; + + PschQueryFrame frame; + frame.queryid = 0; // utility statements have no queryId + frame.query_start_ts = GetCurrentTimestamp(); + getrusage(RUSAGE_SELF, &frame.rusage_start); + query_stack.push_back(frame); + int stmt_location = pstmt->stmt_location; int stmt_len = pstmt->stmt_len; BufferUsage bufusage_start = pgBufferUsage; WalUsage walusage_start = pgWalUsage; - struct rusage rusage_util_start; - getrusage(RUSAGE_SELF, &rusage_util_start); instr_time start_time; INSTR_TIME_SET_CURRENT(start_time); -#if PG_VERSION_NUM >= 140000 - ExecuteUtilityWithNesting(pstmt, queryString, readOnlyTree, context, params, queryEnv, dest, qc); -#else - ExecuteUtilityWithNesting(pstmt, queryString, context, params, queryEnv, dest, qc); -#endif + CALL_PROCESS_UTILITY(); instr_time duration; INSTR_TIME_SET_CURRENT(duration); INSTR_TIME_SUBTRACT(duration, start_time); + // Pop our frame and compute CPU delta. 
+ PschQueryFrame popped = query_stack.back(); + query_stack.pop_back(); + BufferUsage bufusage_delta; WalUsage walusage_delta; MemSet(&bufusage_delta, 0, sizeof(BufferUsage)); @@ -812,15 +782,15 @@ static void PschProcessUtility(PlannedStmt* pstmt, const char* queryString, int64 cpu_user_us = 0; int64 cpu_sys_us = 0; - struct rusage rusage_util_end; - if (getrusage(RUSAGE_SELF, &rusage_util_end) == 0) { - cpu_user_us = TimeDiffMicrosec(rusage_util_end.ru_utime, rusage_util_start.ru_utime); - cpu_sys_us = TimeDiffMicrosec(rusage_util_end.ru_stime, rusage_util_start.ru_stime); + struct rusage rusage_end; + if (getrusage(RUSAGE_SELF, &rusage_end) == 0) { + cpu_user_us = TimeDiffMicrosec(rusage_end.ru_utime, popped.rusage_start.ru_utime); + cpu_sys_us = TimeDiffMicrosec(rusage_end.ru_stime, popped.rusage_start.ru_stime); } PschEvent event; - BuildEventForUtility(&event, queryString, start_ts, stmt_location, stmt_len, - INSTR_TIME_GET_MICROSEC(duration), is_top_level, GetUtilityRowCount(qc), + BuildEventForUtility(&event, queryString, popped.query_start_ts, stmt_location, stmt_len, + INSTR_TIME_GET_MICROSEC(duration), parent_query_id, GetUtilityRowCount(qc), &bufusage_delta, &walusage_delta, cpu_user_us, cpu_sys_us); PschEnqueueEvent(&event); } @@ -867,7 +837,8 @@ static bool ShouldCaptureLog(ErrorData* edata) { // message, SQLSTATE, and client/session metadata. static void CaptureLogEvent(ErrorData* edata) { PschEvent event; - InitBaseEvent(&event, GetCurrentTimestamp(), (nesting_level == 0), PSCH_CMD_UNKNOWN); + uint64 parent_query_id = query_stack.empty() ? 
0 : query_stack.back().queryid; + InitBaseEvent(&event, GetCurrentTimestamp(), parent_query_id, PSCH_CMD_UNKNOWN); UnpackSqlState(edata->sqlerrcode, event.err_sqlstate); event.err_elevel = static_cast(edata->elevel); diff --git a/src/queue/event.h b/src/queue/event.h index 7289836..a7b7c32 100644 --- a/src/queue/event.h +++ b/src/queue/event.h @@ -82,9 +82,9 @@ struct PschEvent { char username[64]; // User name (NAMEDATALEN=64, resolved at capture) uint8 username_len; // Actual length int32 pid; // Backend process ID - uint64 queryid; // Query ID (from pg_stat_statements) - bool top_level; // True if this is a top-level query - PschCmdType cmd_type; // Command type (SELECT, UPDATE, etc.) + uint64 queryid; // Query ID (from pg_stat_statements) + uint64 parent_query_id; // Query ID of the calling query (0 if top-level) + PschCmdType cmd_type; // Command type (SELECT, UPDATE, etc.) // Results uint64 rows; // Number of rows affected/returned From 0f03866abeb386265fa37e914f5b437fbe1ecc8e Mon Sep 17 00:00:00 2001 From: Josh Ventura Date: Thu, 9 Apr 2026 16:52:40 -0400 Subject: [PATCH 2/6] fix: replace std::vector query stack with fixed-size array to avoid OOM std::vector::push_back throws std::bad_alloc on memory exhaustion, which is incompatible with PostgreSQL's longjmp-based error handling. Replace with a fixed-size array and a separate depth counter so push/pop balance is always maintained. Queries exceeding the cap emit zero CPU rather than garbage or a crash. 
Co-Authored-By: Claude Sonnet 4.6 --- src/hooks/hooks.cc | 53 ++++++++++++++++++++++++++++++++++------------ 1 file changed, 39 insertions(+), 14 deletions(-) diff --git a/src/hooks/hooks.cc b/src/hooks/hooks.cc index 0d3b909..66a1cd2 100644 --- a/src/hooks/hooks.cc +++ b/src/hooks/hooks.cc @@ -2,7 +2,6 @@ #include #include -#include extern "C" { #include "postgres.h" @@ -54,12 +53,19 @@ static emit_log_hook_type prev_emit_log_hook = nullptr; // depth and provides each query with its own rusage baseline and start time, // fixing the prior single-static design that produced overlapping CPU deltas // for nested SPI calls. +// +// Fixed-size to avoid heap allocation: std::vector::push_back throws +// std::bad_alloc on OOM, which is incompatible with PostgreSQL's longjmp-based +// error handling. Real-world nesting depth is small; 64 levels is a generous +// ceiling even for deeply recursive plpgsql. struct PschQueryFrame { struct rusage rusage_start; TimestampTz query_start_ts; uint64 queryid; // used as parent_query_id by the next inner query }; -static std::vector query_stack; +constexpr int kMaxQueryNestingDepth = 64; +static std::array query_stack; +static int query_stack_depth = 0; // Deadlock prevention for emit_log_hook static bool disable_error_capture = false; @@ -551,7 +557,10 @@ static void PschExecutorStart(QueryDesc* query_desc, int eflags) { frame.queryid = query_desc->plannedstmt->queryId; frame.query_start_ts = GetCurrentTimestamp(); getrusage(RUSAGE_SELF, &frame.rusage_start); - query_stack.push_back(frame); + if (query_stack_depth < kMaxQueryNestingDepth) { + query_stack[query_stack_depth] = frame; + } + query_stack_depth++; if (prev_executor_start != nullptr) { prev_executor_start(query_desc, eflags); @@ -612,11 +621,14 @@ static void PschExecutorEnd(QueryDesc* query_desc) { return; } - // Pop the frame pushed in ExecutorStart. + // Pop the frame pushed in ExecutorStart. 
If depth exceeded the array cap, + // the frame is zeroed — the event is still emitted but with zero CPU. PschQueryFrame frame = {}; - if (!query_stack.empty()) { - frame = query_stack.back(); - query_stack.pop_back(); + if (query_stack_depth > 0) { + query_stack_depth--; + if (query_stack_depth < kMaxQueryNestingDepth) { + frame = query_stack[query_stack_depth]; + } } if (!psch_enabled || query_desc->plannedstmt->queryId == UINT64CONST(0)) { @@ -643,7 +655,9 @@ static void PschExecutorEnd(QueryDesc* query_desc) { } // The frame below us on the stack is our caller; its queryid is our parent. - uint64 parent_query_id = query_stack.empty() ? 0 : query_stack.back().queryid; + int parent_idx = query_stack_depth - 1; + uint64 parent_query_id = + (parent_idx >= 0 && parent_idx < kMaxQueryNestingDepth) ? query_stack[parent_idx].queryid : 0; PschEvent event; BuildEventFromQueryDesc(query_desc, &event, frame.query_start_ts, cpu_user_us, cpu_sys_us, @@ -748,13 +762,18 @@ static void PschProcessUtility(PlannedStmt* pstmt, const char* queryString, // Capture parent before pushing our own frame, then push so that any // executor hooks fired from within this utility (e.g. CREATE TABLE AS SELECT) // see us on the stack and correctly link themselves as our children. - uint64 parent_query_id = query_stack.empty() ? 0 : query_stack.back().queryid; + int parent_idx = query_stack_depth - 1; + uint64 parent_query_id = + (parent_idx >= 0 && parent_idx < kMaxQueryNestingDepth) ? 
query_stack[parent_idx].queryid : 0; PschQueryFrame frame; frame.queryid = 0; // utility statements have no queryId frame.query_start_ts = GetCurrentTimestamp(); getrusage(RUSAGE_SELF, &frame.rusage_start); - query_stack.push_back(frame); + if (query_stack_depth < kMaxQueryNestingDepth) { + query_stack[query_stack_depth] = frame; + } + query_stack_depth++; int stmt_location = pstmt->stmt_location; int stmt_len = pstmt->stmt_len; @@ -769,9 +788,15 @@ static void PschProcessUtility(PlannedStmt* pstmt, const char* queryString, INSTR_TIME_SET_CURRENT(duration); INSTR_TIME_SUBTRACT(duration, start_time); - // Pop our frame and compute CPU delta. - PschQueryFrame popped = query_stack.back(); - query_stack.pop_back(); + // Pop our frame and compute CPU delta. Zeroed if depth exceeded the cap. + PschQueryFrame popped = {}; + if (query_stack_depth > 0) { + query_stack_depth--; + if (query_stack_depth < kMaxQueryNestingDepth) { + popped = query_stack[query_stack_depth]; + } + } + BufferUsage bufusage_delta; WalUsage walusage_delta; @@ -837,7 +862,7 @@ static bool ShouldCaptureLog(ErrorData* edata) { // message, SQLSTATE, and client/session metadata. static void CaptureLogEvent(ErrorData* edata) { PschEvent event; - uint64 parent_query_id = query_stack.empty() ? 0 : query_stack.back().queryid; + uint64 parent_query_id = query_stack_depth > 0 ? 
query_stack[query_stack_depth - 1].queryid : 0; InitBaseEvent(&event, GetCurrentTimestamp(), parent_query_id, PSCH_CMD_UNKNOWN); UnpackSqlState(edata->sqlerrcode, event.err_sqlstate); From a31691e2c8c03de3974944ca5ac7662c06937836 Mon Sep 17 00:00:00 2001 From: Josh Ventura Date: Thu, 9 Apr 2026 23:01:34 -0400 Subject: [PATCH 3/6] More bots makes the batter better --- src/hooks/hooks.cc | 5 ++--- src/queue/event.h | 20 ++++++++++---------- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/src/hooks/hooks.cc b/src/hooks/hooks.cc index 66a1cd2..f02fd24 100644 --- a/src/hooks/hooks.cc +++ b/src/hooks/hooks.cc @@ -462,8 +462,8 @@ static void CopyParallelWorkerInfo([[maybe_unused]] PschEvent* event, } static void BuildEventFromQueryDesc(QueryDesc* query_desc, PschEvent* event, - TimestampTz query_start_ts, int64 cpu_user_us, - int64 cpu_sys_us, uint64 parent_query_id) { + TimestampTz query_start_ts, int64 cpu_user_us, int64 cpu_sys_us, + uint64 parent_query_id) { InitBaseEvent(event, query_start_ts, parent_query_id, ConvertCmdType(query_desc->operation)); event->queryid = query_desc->plannedstmt->queryId; event->rows = query_desc->estate->es_processed; @@ -797,7 +797,6 @@ static void PschProcessUtility(PlannedStmt* pstmt, const char* queryString, } } - BufferUsage bufusage_delta; WalUsage walusage_delta; MemSet(&bufusage_delta, 0, sizeof(BufferUsage)); diff --git a/src/queue/event.h b/src/queue/event.h index a7b7c32..c8c3bcc 100644 --- a/src/queue/event.h +++ b/src/queue/event.h @@ -75,16 +75,16 @@ struct PschEvent { uint64 duration_us; // Execution duration in microseconds // Identity - Oid dbid; // Database OID - Oid userid; // User OID - char datname[64]; // Database name (NAMEDATALEN=64, resolved at capture) - uint8 datname_len; // Actual length - char username[64]; // User name (NAMEDATALEN=64, resolved at capture) - uint8 username_len; // Actual length - int32 pid; // Backend process ID - uint64 queryid; // Query ID (from pg_stat_statements) - uint64 
parent_query_id; // Query ID of the calling query (0 if top-level) - PschCmdType cmd_type; // Command type (SELECT, UPDATE, etc.) + Oid dbid; // Database OID + Oid userid; // User OID + char datname[64]; // Database name (NAMEDATALEN=64, resolved at capture) + uint8 datname_len; // Actual length + char username[64]; // User name (NAMEDATALEN=64, resolved at capture) + uint8 username_len; // Actual length + int32 pid; // Backend process ID + uint64 queryid; // Query ID (from pg_stat_statements) + uint64 parent_query_id; // Query ID of the calling query (0 if top-level) + PschCmdType cmd_type; // Command type (SELECT, UPDATE, etc.) // Results uint64 rows; // Number of rows affected/returned From 7baa9e68637df48cdc1e63421baf751b69c14c97 Mon Sep 17 00:00:00 2001 From: Josh Ventura Date: Thu, 9 Apr 2026 23:17:03 -0400 Subject: [PATCH 4/6] refactor: eliminate local frame copies in query stack push/pop Push sites now write directly into query_stack[depth] instead of populating a local PschQueryFrame and assigning it to the array slot. Pop sites now use a const pointer into the array (nullptr when depth exceeded the cap) instead of copying out a zeroed PschQueryFrame. CPU and start-time fields are only read through the pointer when non-null, preserving the zero-CPU / zero-ts behavior for the overflow case without the unnecessary struct copy. Co-Authored-By: Claude Sonnet 4.6 --- src/hooks/hooks.cc | 52 +++++++++++++++++++++------------------------- 1 file changed, 24 insertions(+), 28 deletions(-) diff --git a/src/hooks/hooks.cc b/src/hooks/hooks.cc index f02fd24..c6152a7 100644 --- a/src/hooks/hooks.cc +++ b/src/hooks/hooks.cc @@ -553,12 +553,10 @@ static void PschExecutorStart(QueryDesc* query_desc, int eflags) { // that nested SPI calls each get an accurate, non-overlapping CPU delta. // The stack depth implicitly tracks nesting; the frame below us on the stack // supplies the parent_query_id for this query's event. 
- PschQueryFrame frame; - frame.queryid = query_desc->plannedstmt->queryId; - frame.query_start_ts = GetCurrentTimestamp(); - getrusage(RUSAGE_SELF, &frame.rusage_start); if (query_stack_depth < kMaxQueryNestingDepth) { - query_stack[query_stack_depth] = frame; + query_stack[query_stack_depth].queryid = query_desc->plannedstmt->queryId; + query_stack[query_stack_depth].query_start_ts = GetCurrentTimestamp(); + getrusage(RUSAGE_SELF, &query_stack[query_stack_depth].rusage_start); } query_stack_depth++; @@ -622,13 +620,12 @@ static void PschExecutorEnd(QueryDesc* query_desc) { } // Pop the frame pushed in ExecutorStart. If depth exceeded the array cap, - // the frame is zeroed — the event is still emitted but with zero CPU. - PschQueryFrame frame = {}; + // frame is null — the event is still emitted but with zero CPU. + const PschQueryFrame* frame = nullptr; if (query_stack_depth > 0) { query_stack_depth--; - if (query_stack_depth < kMaxQueryNestingDepth) { - frame = query_stack[query_stack_depth]; - } + if (query_stack_depth < kMaxQueryNestingDepth) + frame = &query_stack[query_stack_depth]; } if (!psch_enabled || query_desc->plannedstmt->queryId == UINT64CONST(0)) { @@ -649,9 +646,9 @@ static void PschExecutorEnd(QueryDesc* query_desc) { int64 cpu_user_us = 0; int64 cpu_sys_us = 0; struct rusage rusage_end; - if (getrusage(RUSAGE_SELF, &rusage_end) == 0) { - cpu_user_us = TimeDiffMicrosec(rusage_end.ru_utime, frame.rusage_start.ru_utime); - cpu_sys_us = TimeDiffMicrosec(rusage_end.ru_stime, frame.rusage_start.ru_stime); + if (getrusage(RUSAGE_SELF, &rusage_end) == 0 && frame != nullptr) { + cpu_user_us = TimeDiffMicrosec(rusage_end.ru_utime, frame->rusage_start.ru_utime); + cpu_sys_us = TimeDiffMicrosec(rusage_end.ru_stime, frame->rusage_start.ru_stime); } // The frame below us on the stack is our caller; its queryid is our parent. @@ -660,7 +657,8 @@ static void PschExecutorEnd(QueryDesc* query_desc) { (parent_idx >= 0 && parent_idx < kMaxQueryNestingDepth) ? 
query_stack[parent_idx].queryid : 0; PschEvent event; - BuildEventFromQueryDesc(query_desc, &event, frame.query_start_ts, cpu_user_us, cpu_sys_us, + BuildEventFromQueryDesc(query_desc, &event, frame ? frame->query_start_ts : 0, cpu_user_us, + cpu_sys_us, parent_query_id); PschEnqueueEvent(&event); @@ -766,12 +764,10 @@ static void PschProcessUtility(PlannedStmt* pstmt, const char* queryString, uint64 parent_query_id = (parent_idx >= 0 && parent_idx < kMaxQueryNestingDepth) ? query_stack[parent_idx].queryid : 0; - PschQueryFrame frame; - frame.queryid = 0; // utility statements have no queryId - frame.query_start_ts = GetCurrentTimestamp(); - getrusage(RUSAGE_SELF, &frame.rusage_start); if (query_stack_depth < kMaxQueryNestingDepth) { - query_stack[query_stack_depth] = frame; + query_stack[query_stack_depth].queryid = 0; // utility statements have no queryId + query_stack[query_stack_depth].query_start_ts = GetCurrentTimestamp(); + getrusage(RUSAGE_SELF, &query_stack[query_stack_depth].rusage_start); } query_stack_depth++; @@ -788,13 +784,12 @@ static void PschProcessUtility(PlannedStmt* pstmt, const char* queryString, INSTR_TIME_SET_CURRENT(duration); INSTR_TIME_SUBTRACT(duration, start_time); - // Pop our frame and compute CPU delta. Zeroed if depth exceeded the cap. - PschQueryFrame popped = {}; + // Pop our frame and compute CPU delta. Null if depth exceeded the cap. 
+ const PschQueryFrame* popped = nullptr; if (query_stack_depth > 0) { query_stack_depth--; - if (query_stack_depth < kMaxQueryNestingDepth) { - popped = query_stack[query_stack_depth]; - } + if (query_stack_depth < kMaxQueryNestingDepth) + popped = &query_stack[query_stack_depth]; } BufferUsage bufusage_delta; @@ -807,13 +802,14 @@ static void PschProcessUtility(PlannedStmt* pstmt, const char* queryString, int64 cpu_user_us = 0; int64 cpu_sys_us = 0; struct rusage rusage_end; - if (getrusage(RUSAGE_SELF, &rusage_end) == 0) { - cpu_user_us = TimeDiffMicrosec(rusage_end.ru_utime, popped.rusage_start.ru_utime); - cpu_sys_us = TimeDiffMicrosec(rusage_end.ru_stime, popped.rusage_start.ru_stime); + if (getrusage(RUSAGE_SELF, &rusage_end) == 0 && popped != nullptr) { + cpu_user_us = TimeDiffMicrosec(rusage_end.ru_utime, popped->rusage_start.ru_utime); + cpu_sys_us = TimeDiffMicrosec(rusage_end.ru_stime, popped->rusage_start.ru_stime); } PschEvent event; - BuildEventForUtility(&event, queryString, popped.query_start_ts, stmt_location, stmt_len, + BuildEventForUtility(&event, queryString, popped ? popped->query_start_ts : 0, stmt_location, + stmt_len, INSTR_TIME_GET_MICROSEC(duration), parent_query_id, GetUtilityRowCount(qc), &bufusage_delta, &walusage_delta, cpu_user_us, cpu_sys_us); PschEnqueueEvent(&event); From 89c7140cb3a80b367e6403f3ab9577f88986eeb3 Mon Sep 17 00:00:00 2001 From: Josh Ventura Date: Fri, 10 Apr 2026 10:47:44 -0400 Subject: [PATCH 5/6] Cast parent_query_id to int64_t before appending Just for consistency with the other cast... I don't know why that's being done in the first place... 
--- src/export/stats_exporter.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/export/stats_exporter.cc b/src/export/stats_exporter.cc index d21c795..b961fc7 100644 --- a/src/export/stats_exporter.cc +++ b/src/export/stats_exporter.cc @@ -149,7 +149,7 @@ void ExportEventStats(const std::vector& events, StatsExporter* expor col_username->Append(std::string(ev.username, ev.username_len)); col_pid->Append(ev.pid); col_query_id->Append(static_cast<int64_t>(ev.queryid)); - col_parent_query_id->Append(ev.parent_query_id); + col_parent_query_id->Append(static_cast<int64_t>(ev.parent_query_id)); col_cmd_type->Append(CmdTypeToString(ev.cmd_type)); col_rows->Append(ev.rows); From a0ab116244b84735313f7b0210655cd1227673cc Mon Sep 17 00:00:00 2001 From: Josh Ventura Date: Fri, 10 Apr 2026 14:40:39 -0400 Subject: [PATCH 6/6] refactor: extract query stack helpers and store CPU delta in frame - Add QueryStackPush/QueryStackPop helpers to consolidate the push/pop logic (including rusage capture and CPU delta computation) that was duplicated across ExecutorStart, ExecutorEnd, and PschProcessUtility - Add cpu_user_us/cpu_sys_us to PschQueryFrame; QueryStackPop computes the delta in-place, eliminating ComputeCpuDelta and its out-params - Pass frame pointer directly into BuildEventFromQueryDesc and BuildEventForUtility instead of unpacking start_ts + CPU ints - Reduce kMaxQueryNestingDepth from 64 to 8 Co-Authored-By: Claude Sonnet 4.6 --- src/hooks/hooks.cc | 120 ++++++++++++++++++++++----------------------- 1 file changed, 58 insertions(+), 62 deletions(-) diff --git a/src/hooks/hooks.cc b/src/hooks/hooks.cc index c6152a7..4151792 100644 --- a/src/hooks/hooks.cc +++ b/src/hooks/hooks.cc @@ -61,9 +61,11 @@ static emit_log_hook_type prev_emit_log_hook = nullptr; struct PschQueryFrame { struct rusage rusage_start; TimestampTz query_start_ts; - uint64 queryid; // used as parent_query_id by the next inner query + uint64 queryid; // used as parent_query_id by the next inner query +
int64 cpu_user_us; // set at pop time + int64 cpu_sys_us; // set at pop time }; -constexpr int kMaxQueryNestingDepth = 64; +constexpr int kMaxQueryNestingDepth = 8; static std::array query_stack; static int query_stack_depth = 0; @@ -461,14 +463,47 @@ static void CopyParallelWorkerInfo([[maybe_unused]] PschEvent* event, #endif } +// Push a frame onto the query stack, capturing timestamp and rusage baseline. +// If depth exceeds the cap the frame is not written, but depth is still incremented +// so the corresponding pop stays balanced. +static void QueryStackPush(uint64 queryid) { + if (query_stack_depth < kMaxQueryNestingDepth) { + query_stack[query_stack_depth].queryid = queryid; + query_stack[query_stack_depth].query_start_ts = GetCurrentTimestamp(); + getrusage(RUSAGE_SELF, &query_stack[query_stack_depth].rusage_start); + } + query_stack_depth++; +} + +// Pop a frame from the query stack and compute its CPU delta in place. +// Returns a pointer into the array, or null if the stack was empty or depth +// had exceeded the cap (no frame was written). cpu_user_us/cpu_sys_us are +// zeroed on failure or null frame. +static PschQueryFrame* QueryStackPop() { + if (query_stack_depth == 0) + return nullptr; + query_stack_depth--; + if (query_stack_depth >= kMaxQueryNestingDepth) + return nullptr; + PschQueryFrame* frame = &query_stack[query_stack_depth]; + frame->cpu_user_us = 0; + frame->cpu_sys_us = 0; + struct rusage rusage_end; + if (getrusage(RUSAGE_SELF, &rusage_end) == 0) { + frame->cpu_user_us = TimeDiffMicrosec(rusage_end.ru_utime, frame->rusage_start.ru_utime); + frame->cpu_sys_us = TimeDiffMicrosec(rusage_end.ru_stime, frame->rusage_start.ru_stime); + } + return frame; +} + static void BuildEventFromQueryDesc(QueryDesc* query_desc, PschEvent* event, - TimestampTz query_start_ts, int64 cpu_user_us, int64 cpu_sys_us, - uint64 parent_query_id) { + const PschQueryFrame* frame, uint64 parent_query_id) { + TimestampTz query_start_ts = frame ? 
frame->query_start_ts : 0; InitBaseEvent(event, query_start_ts, parent_query_id, ConvertCmdType(query_desc->operation)); event->queryid = query_desc->plannedstmt->queryId; event->rows = query_desc->estate->es_processed; - event->cpu_user_time_us = cpu_user_us; - event->cpu_sys_time_us = cpu_sys_us; + event->cpu_user_time_us = frame ? frame->cpu_user_us : 0; + event->cpu_sys_time_us = frame ? frame->cpu_sys_us : 0; // Instrumentation data (duration, buffer, WAL) if (query_desc->totaltime != nullptr) { @@ -553,12 +588,7 @@ static void PschExecutorStart(QueryDesc* query_desc, int eflags) { // that nested SPI calls each get an accurate, non-overlapping CPU delta. // The stack depth implicitly tracks nesting; the frame below us on the stack // supplies the parent_query_id for this query's event. - if (query_stack_depth < kMaxQueryNestingDepth) { - query_stack[query_stack_depth].queryid = query_desc->plannedstmt->queryId; - query_stack[query_stack_depth].query_start_ts = GetCurrentTimestamp(); - getrusage(RUSAGE_SELF, &query_stack[query_stack_depth].rusage_start); - } - query_stack_depth++; + QueryStackPush(query_desc->plannedstmt->queryId); if (prev_executor_start != nullptr) { prev_executor_start(query_desc, eflags); @@ -619,14 +649,9 @@ static void PschExecutorEnd(QueryDesc* query_desc) { return; } - // Pop the frame pushed in ExecutorStart. If depth exceeded the array cap, - // frame is null — the event is still emitted but with zero CPU. - const PschQueryFrame* frame = nullptr; - if (query_stack_depth > 0) { - query_stack_depth--; - if (query_stack_depth < kMaxQueryNestingDepth) - frame = &query_stack[query_stack_depth]; - } + // Pop the frame pushed in ExecutorStart. Null if depth exceeded the cap — + // the event is still emitted but with zero CPU/timestamp. 
+ const PschQueryFrame* frame = QueryStackPop(); if (!psch_enabled || query_desc->plannedstmt->queryId == UINT64CONST(0)) { ForgetNormalizedStatement(query_desc->sourceText, query_desc->plannedstmt->stmt_location, @@ -643,23 +668,13 @@ static void PschExecutorEnd(QueryDesc* query_desc) { InstrEndLoop(query_desc->totaltime); } - int64 cpu_user_us = 0; - int64 cpu_sys_us = 0; - struct rusage rusage_end; - if (getrusage(RUSAGE_SELF, &rusage_end) == 0 && frame != nullptr) { - cpu_user_us = TimeDiffMicrosec(rusage_end.ru_utime, frame->rusage_start.ru_utime); - cpu_sys_us = TimeDiffMicrosec(rusage_end.ru_stime, frame->rusage_start.ru_stime); - } - // The frame below us on the stack is our caller; its queryid is our parent. int parent_idx = query_stack_depth - 1; uint64 parent_query_id = (parent_idx >= 0 && parent_idx < kMaxQueryNestingDepth) ? query_stack[parent_idx].queryid : 0; PschEvent event; - BuildEventFromQueryDesc(query_desc, &event, frame ? frame->query_start_ts : 0, cpu_user_us, - cpu_sys_us, - parent_query_id); + BuildEventFromQueryDesc(query_desc, &event, frame, parent_query_id); PschEnqueueEvent(&event); if (prev_executor_end != nullptr) { @@ -670,15 +685,15 @@ static void PschExecutorEnd(QueryDesc* query_desc) { } // Build a PschEvent for utility statements (no QueryDesc available) -static void BuildEventForUtility(PschEvent* event, const char* queryString, TimestampTz start_ts, - int stmt_location, int stmt_len, uint64 duration_us, - uint64 parent_query_id, uint64 rows, BufferUsage* bufusage, - WalUsage* walusage, int64 cpu_user_us, int64 cpu_sys_us) { - InitBaseEvent(event, start_ts, parent_query_id, PSCH_CMD_UTILITY); +static void BuildEventForUtility(PschEvent* event, const char* queryString, + const PschQueryFrame* frame, int stmt_location, int stmt_len, + uint64 duration_us, uint64 parent_query_id, uint64 rows, + BufferUsage* bufusage, WalUsage* walusage) { + InitBaseEvent(event, frame ? 
frame->query_start_ts : 0, parent_query_id, PSCH_CMD_UTILITY); event->duration_us = duration_us; event->rows = rows; - event->cpu_user_time_us = cpu_user_us; - event->cpu_sys_time_us = cpu_sys_us; + event->cpu_user_time_us = frame ? frame->cpu_user_us : 0; + event->cpu_sys_time_us = frame ? frame->cpu_sys_us : 0; CopyBufferUsage(event, bufusage); CopyIoTiming(event, bufusage); @@ -764,12 +779,7 @@ static void PschProcessUtility(PlannedStmt* pstmt, const char* queryString, uint64 parent_query_id = (parent_idx >= 0 && parent_idx < kMaxQueryNestingDepth) ? query_stack[parent_idx].queryid : 0; - if (query_stack_depth < kMaxQueryNestingDepth) { - query_stack[query_stack_depth].queryid = 0; // utility statements have no queryId - query_stack[query_stack_depth].query_start_ts = GetCurrentTimestamp(); - getrusage(RUSAGE_SELF, &query_stack[query_stack_depth].rusage_start); - } - query_stack_depth++; + QueryStackPush(0); // utility statements have no queryId int stmt_location = pstmt->stmt_location; int stmt_len = pstmt->stmt_len; @@ -784,13 +794,8 @@ static void PschProcessUtility(PlannedStmt* pstmt, const char* queryString, INSTR_TIME_SET_CURRENT(duration); INSTR_TIME_SUBTRACT(duration, start_time); - // Pop our frame and compute CPU delta. Null if depth exceeded the cap. - const PschQueryFrame* popped = nullptr; - if (query_stack_depth > 0) { - query_stack_depth--; - if (query_stack_depth < kMaxQueryNestingDepth) - popped = &query_stack[query_stack_depth]; - } + // Pop our frame. Null if depth exceeded the cap. 
+ const PschQueryFrame* popped = QueryStackPop(); BufferUsage bufusage_delta; WalUsage walusage_delta; @@ -799,19 +804,10 @@ static void PschProcessUtility(PlannedStmt* pstmt, const char* queryString, BufferUsageAccumDiff(&bufusage_delta, &pgBufferUsage, &bufusage_start); WalUsageAccumDiff(&walusage_delta, &pgWalUsage, &walusage_start); - int64 cpu_user_us = 0; - int64 cpu_sys_us = 0; - struct rusage rusage_end; - if (getrusage(RUSAGE_SELF, &rusage_end) == 0 && popped != nullptr) { - cpu_user_us = TimeDiffMicrosec(rusage_end.ru_utime, popped->rusage_start.ru_utime); - cpu_sys_us = TimeDiffMicrosec(rusage_end.ru_stime, popped->rusage_start.ru_stime); - } - PschEvent event; - BuildEventForUtility(&event, queryString, popped ? popped->query_start_ts : 0, stmt_location, - stmt_len, + BuildEventForUtility(&event, queryString, popped, stmt_location, stmt_len, INSTR_TIME_GET_MICROSEC(duration), parent_query_id, GetUtilityRowCount(qc), - &bufusage_delta, &walusage_delta, cpu_user_us, cpu_sys_us); + &bufusage_delta, &walusage_delta); PschEnqueueEvent(&event); }