diff --git a/docker/init/00-schema.sql b/docker/init/00-schema.sql index 4b15ef2..c353915 100644 --- a/docker/init/00-schema.sql +++ b/docker/init/00-schema.sql @@ -53,6 +53,8 @@ CREATE TABLE pg_stat_ch.events_raw query_id Int64 COMMENT '64-bit hash identifying normalized queries. Queries differing only in constants share the same query_id. Use for aggregating statistics across similar queries.', + parent_query_id Int64 COMMENT 'query_id of the calling query (e.g. the plpgsql function that issued this SPI statement). 0 for top-level queries. Use WHERE parent_query_id = 0 to restrict aggregations to top-level queries and avoid double-counting CPU and duration.', + cmd_type LowCardinality(String) COMMENT 'Command type: SELECT, INSERT, UPDATE, DELETE, MERGE, UTILITY, or UNKNOWN. Use for workload characterization (read-heavy vs write-heavy).', rows UInt64 COMMENT 'Rows returned (SELECT) or affected (INSERT/UPDATE/DELETE). HIGH: large result sets or bulk operations. LOW: point queries. Watch for unexpected HIGH values indicating missing WHERE clauses.', diff --git a/migrations/001_add_parent_query_id.sql b/migrations/001_add_parent_query_id.sql new file mode 100644 index 0000000..3b49bf7 --- /dev/null +++ b/migrations/001_add_parent_query_id.sql @@ -0,0 +1,12 @@ +-- Migration: add parent_query_id column +-- +-- Introduced in pg_stat_ch 0.4.x. Each event now carries the query_id of its +-- calling query (e.g. the plpgsql function that issued an SPI statement). +-- Top-level queries emit 0. Use WHERE parent_query_id = 0 in aggregations to +-- avoid double-counting CPU and duration across nested calls. 
+-- +-- Run against your ClickHouse instance before upgrading the extension: +-- clickhouse-client < migrations/001_add_parent_query_id.sql + +ALTER TABLE pg_stat_ch.events_raw + ADD COLUMN IF NOT EXISTS parent_query_id Int64 DEFAULT 0; diff --git a/src/export/stats_exporter.cc b/src/export/stats_exporter.cc index e11af99..b961fc7 100644 --- a/src/export/stats_exporter.cc +++ b/src/export/stats_exporter.cc @@ -93,6 +93,7 @@ void ExportEventStats(const std::vector& events, StatsExporter* expor auto col_username = exporter->DbUserColumn(); auto col_pid = exporter->RecordInt32("pid"); auto col_query_id = exporter->RecordInt64("query_id"); + auto col_parent_query_id = exporter->RecordInt64("parent_query_id"); // signed like query_id so the two columns compare equal auto col_cmd_type = exporter->DbOperationColumn(); auto col_rows = exporter->MetricUInt64("rows"); auto col_query = exporter->DbQueryTextColumn(); @@ -148,6 +149,7 @@ void ExportEventStats(const std::vector& events, StatsExporter* expor col_username->Append(std::string(ev.username, ev.username_len)); col_pid->Append(ev.pid); col_query_id->Append(static_cast(ev.queryid)); + col_parent_query_id->Append(static_cast<int64_t>(ev.parent_query_id)); col_cmd_type->Append(CmdTypeToString(ev.cmd_type)); col_rows->Append(ev.rows); diff --git a/src/hooks/hooks.cc b/src/hooks/hooks.cc index c76bfc8..4151792 100644 --- a/src/hooks/hooks.cc +++ b/src/hooks/hooks.cc @@ -48,21 +48,30 @@ static ExecutorEnd_hook_type prev_executor_end = nullptr; static ProcessUtility_hook_type prev_process_utility = nullptr; static emit_log_hook_type prev_emit_log_hook = nullptr; -// Track nesting level to identify top-level queries -static int nesting_level = 0; - -// CPU time tracking via getrusage -static struct rusage rusage_start; +// Per-query frame pushed at ExecutorStart / PschProcessUtility and popped at +// ExecutorEnd / end of PschProcessUtility. 
The stack naturally tracks nesting +// depth and provides each query with its own rusage baseline and start time, +// fixing the prior single-static design that produced overlapping CPU deltas +// for nested SPI calls. +// +// Fixed-size to avoid heap allocation: std::vector::push_back throws +// std::bad_alloc on OOM, which is incompatible with PostgreSQL's longjmp-based +// error handling. Real-world nesting depth is small; 64 levels is a generous +// ceiling even for deeply recursive plpgsql. +struct PschQueryFrame { + struct rusage rusage_start; + TimestampTz query_start_ts; + uint64 queryid; // used as parent_query_id by the next inner query + int64 cpu_user_us; // set at pop time + int64 cpu_sys_us; // set at pop time +}; +constexpr int kMaxQueryNestingDepth = 64; +static std::array<PschQueryFrame, kMaxQueryNestingDepth> query_stack; +static int query_stack_depth = 0; // Deadlock prevention for emit_log_hook static bool disable_error_capture = false; -// Track whether the current query started at top level -static bool current_query_is_top_level = false; - -// Track query start time for duration calculation -static TimestampTz query_start_ts = 0; - // System initialization flag - set after hooks are installed and shmem is ready static bool system_init = false; @@ -326,14 +335,14 @@ static void InitEventPartial(PschEvent* event) { event->query[0] = '\0'; } -static void InitBaseEvent(PschEvent* event, TimestampTz ts_start, bool top_level, +static void InitBaseEvent(PschEvent* event, TimestampTz ts_start, uint64 parent_query_id, PschCmdType cmd_type) { InitEventPartial(event); event->ts_start = ts_start; event->dbid = MyDatabaseId; event->userid = GetUserId(); event->pid = MyProcPid; - event->top_level = top_level; + event->parent_query_id = parent_query_id; event->cmd_type = cmd_type; ResolveNames(event); } @@ -454,14 +463,47 @@ static void CopyParallelWorkerInfo([[maybe_unused]] PschEvent* event, #endif } -static void BuildEventFromQueryDesc(QueryDesc* query_desc, PschEvent* event, int64 
cpu_user_us, - int64 cpu_sys_us) { - InitBaseEvent(event, query_start_ts, current_query_is_top_level, - ConvertCmdType(query_desc->operation)); +// Push a frame onto the query stack, capturing timestamp and rusage baseline. +// If depth exceeds the cap the frame is not written, but depth is still incremented +// so the corresponding pop stays balanced. +static void QueryStackPush(uint64 queryid) { + if (query_stack_depth < kMaxQueryNestingDepth) { + query_stack[query_stack_depth].queryid = queryid; + query_stack[query_stack_depth].query_start_ts = GetCurrentTimestamp(); + getrusage(RUSAGE_SELF, &query_stack[query_stack_depth].rusage_start); + } + query_stack_depth++; +} + +// Pop a frame from the query stack and compute its CPU delta in place. +// Returns a pointer into the array, or null if the stack was empty or depth +// had exceeded the cap (no frame was written). cpu_user_us/cpu_sys_us are +// zeroed on failure or null frame. +static PschQueryFrame* QueryStackPop() { + if (query_stack_depth == 0) + return nullptr; + query_stack_depth--; + if (query_stack_depth >= kMaxQueryNestingDepth) + return nullptr; + PschQueryFrame* frame = &query_stack[query_stack_depth]; + frame->cpu_user_us = 0; + frame->cpu_sys_us = 0; + struct rusage rusage_end; + if (getrusage(RUSAGE_SELF, &rusage_end) == 0) { + frame->cpu_user_us = TimeDiffMicrosec(rusage_end.ru_utime, frame->rusage_start.ru_utime); + frame->cpu_sys_us = TimeDiffMicrosec(rusage_end.ru_stime, frame->rusage_start.ru_stime); + } + return frame; +} + +static void BuildEventFromQueryDesc(QueryDesc* query_desc, PschEvent* event, + const PschQueryFrame* frame, uint64 parent_query_id) { + TimestampTz query_start_ts = frame ? 
frame->query_start_ts : 0; + InitBaseEvent(event, query_start_ts, parent_query_id, ConvertCmdType(query_desc->operation)); event->queryid = query_desc->plannedstmt->queryId; event->rows = query_desc->estate->es_processed; - event->cpu_user_time_us = cpu_user_us; - event->cpu_sys_time_us = cpu_sys_us; + event->cpu_user_time_us = frame ? frame->cpu_user_us : 0; + event->cpu_sys_time_us = frame ? frame->cpu_sys_us : 0; // Instrumentation data (duration, buffer, WAL) if (query_desc->totaltime != nullptr) { @@ -542,17 +584,11 @@ static void PschExecutorStart(QueryDesc* query_desc, int eflags) { return; } - // Record if this is a top-level query (before nesting_level changes in Run) - if (nesting_level == 0) { - current_query_is_top_level = true; - query_start_ts = GetCurrentTimestamp(); - // Capture CPU time baseline for top-level queries - if (psch_enabled) { - getrusage(RUSAGE_SELF, &rusage_start); - } - } else { - current_query_is_top_level = false; - } + // Push a per-query frame. Each frame captures its own rusage baseline so + // that nested SPI calls each get an accurate, non-overlapping CPU delta. + // The stack depth implicitly tracks nesting; the frame below us on the stack + // supplies the parent_query_id for this query's event. 
+ QueryStackPush(query_desc->plannedstmt->queryId); if (prev_executor_start != nullptr) { prev_executor_start(query_desc, eflags); @@ -575,75 +611,49 @@ static void PschExecutorStart(QueryDesc* query_desc, int eflags) { #if PG_VERSION_NUM >= 180000 static void PschExecutorRun(QueryDesc* query_desc, ScanDirection direction, uint64 count) { + if (prev_executor_run != nullptr) { + prev_executor_run(query_desc, direction, count); + } else { + standard_ExecutorRun(query_desc, direction, count); + } +} #else static void PschExecutorRun(QueryDesc* query_desc, ScanDirection direction, uint64 count, bool execute_once) { -#endif - if (IsParallelWorker()) { -#if PG_VERSION_NUM >= 180000 - if (prev_executor_run != nullptr) { - prev_executor_run(query_desc, direction, count); - } else { - standard_ExecutorRun(query_desc, direction, count); - } -#else - if (prev_executor_run != nullptr) { - prev_executor_run(query_desc, direction, count, execute_once); - } else { - standard_ExecutorRun(query_desc, direction, count, execute_once); - } -#endif - return; + if (prev_executor_run != nullptr) { + prev_executor_run(query_desc, direction, count, execute_once); + } else { + standard_ExecutorRun(query_desc, direction, count, execute_once); } - - nesting_level++; - PG_TRY(); - { -#if PG_VERSION_NUM >= 180000 - if (prev_executor_run != nullptr) { - prev_executor_run(query_desc, direction, count); - } else { - standard_ExecutorRun(query_desc, direction, count); - } -#else - if (prev_executor_run != nullptr) { - prev_executor_run(query_desc, direction, count, execute_once); - } else { - standard_ExecutorRun(query_desc, direction, count, execute_once); - } +} #endif + +static void PschExecutorFinish(QueryDesc* query_desc) { + if (prev_executor_finish != nullptr) { + prev_executor_finish(query_desc); + } else { + standard_ExecutorFinish(query_desc); } - PG_FINALLY(); - { nesting_level--; } - PG_END_TRY(); } -static void PschExecutorFinish(QueryDesc* query_desc) { +static void 
PschExecutorEnd(QueryDesc* query_desc) { + // Parallel workers never push a frame — handle them separately. if (IsParallelWorker()) { - if (prev_executor_finish != nullptr) { - prev_executor_finish(query_desc); + ForgetNormalizedStatement(query_desc->sourceText, query_desc->plannedstmt->stmt_location, + query_desc->plannedstmt->stmt_len); + if (prev_executor_end != nullptr) { + prev_executor_end(query_desc); } else { - standard_ExecutorFinish(query_desc); + standard_ExecutorEnd(query_desc); } return; } - nesting_level++; - PG_TRY(); - { - if (prev_executor_finish != nullptr) { - prev_executor_finish(query_desc); - } else { - standard_ExecutorFinish(query_desc); - } - } - PG_FINALLY(); - { nesting_level--; } - PG_END_TRY(); -} + // Pop the frame pushed in ExecutorStart. Null if depth exceeded the cap — + // the event is still emitted but with zero CPU/timestamp. + const PschQueryFrame* frame = QueryStackPop(); -static void PschExecutorEnd(QueryDesc* query_desc) { - if (!psch_enabled || IsParallelWorker() || query_desc->plannedstmt->queryId == UINT64CONST(0)) { + if (!psch_enabled || query_desc->plannedstmt->queryId == UINT64CONST(0)) { ForgetNormalizedStatement(query_desc->sourceText, query_desc->plannedstmt->stmt_location, query_desc->plannedstmt->stmt_len); if (prev_executor_end != nullptr) { @@ -658,17 +668,13 @@ static void PschExecutorEnd(QueryDesc* query_desc) { InstrEndLoop(query_desc->totaltime); } - // Compute CPU time delta from getrusage - int64 cpu_user_us = 0; - int64 cpu_sys_us = 0; - struct rusage rusage_end; - if (getrusage(RUSAGE_SELF, &rusage_end) == 0) { - cpu_user_us = TimeDiffMicrosec(rusage_end.ru_utime, rusage_start.ru_utime); - cpu_sys_us = TimeDiffMicrosec(rusage_end.ru_stime, rusage_start.ru_stime); - } + // The frame below us on the stack is our caller; its queryid is our parent. + int parent_idx = query_stack_depth - 1; + uint64 parent_query_id = + (parent_idx >= 0 && parent_idx < kMaxQueryNestingDepth) ? 
query_stack[parent_idx].queryid : 0; PschEvent event; - BuildEventFromQueryDesc(query_desc, &event, cpu_user_us, cpu_sys_us); + BuildEventFromQueryDesc(query_desc, &event, frame, parent_query_id); PschEnqueueEvent(&event); if (prev_executor_end != nullptr) { @@ -679,15 +685,15 @@ static void PschExecutorEnd(QueryDesc* query_desc) { } // Build a PschEvent for utility statements (no QueryDesc available) -static void BuildEventForUtility(PschEvent* event, const char* queryString, TimestampTz start_ts, - int stmt_location, int stmt_len, uint64 duration_us, - bool is_top_level, uint64 rows, BufferUsage* bufusage, - WalUsage* walusage, int64 cpu_user_us, int64 cpu_sys_us) { - InitBaseEvent(event, start_ts, is_top_level, PSCH_CMD_UTILITY); +static void BuildEventForUtility(PschEvent* event, const char* queryString, + const PschQueryFrame* frame, int stmt_location, int stmt_len, + uint64 duration_us, uint64 parent_query_id, uint64 rows, + BufferUsage* bufusage, WalUsage* walusage) { + InitBaseEvent(event, frame ? frame->query_start_ts : 0, parent_query_id, PSCH_CMD_UTILITY); event->duration_us = duration_us; event->rows = rows; - event->cpu_user_time_us = cpu_user_us; - event->cpu_sys_time_us = cpu_sys_us; + event->cpu_user_time_us = frame ? frame->cpu_user_us : 0; + event->cpu_sys_time_us = frame ? 
frame->cpu_sys_us : 0; CopyBufferUsage(event, bufusage); CopyIoTiming(event, bufusage); @@ -746,21 +752,6 @@ static uint64 GetUtilityRowCount(QueryCompletion* qc) { } } -static void ExecuteUtilityWithNesting(PlannedStmt* pstmt, const char* queryString, -#if PG_VERSION_NUM >= 140000 - bool readOnlyTree, -#endif - ProcessUtilityContext context, ParamListInfo params, - QueryEnvironment* queryEnv, DestReceiver* dest, - QueryCompletion* qc) { - nesting_level++; - PG_TRY(); - { CALL_PROCESS_UTILITY(); } - PG_FINALLY(); - { nesting_level--; } - PG_END_TRY(); -} - // ProcessUtility hook - captures DDL and utility statements #if PG_VERSION_NUM >= 140000 static void PschProcessUtility(PlannedStmt* pstmt, const char* queryString, bool readOnlyTree, @@ -781,28 +772,31 @@ static void PschProcessUtility(PlannedStmt* pstmt, const char* queryString, return; } - // Capture state before execution - bool is_top_level = (nesting_level == 0); - TimestampTz start_ts = GetCurrentTimestamp(); + // Capture parent before pushing our own frame, then push so that any + // executor hooks fired from within this utility (e.g. CREATE TABLE AS SELECT) + // see us on the stack and correctly link themselves as our children. + int parent_idx = query_stack_depth - 1; + uint64 parent_query_id = + (parent_idx >= 0 && parent_idx < kMaxQueryNestingDepth) ? 
query_stack[parent_idx].queryid : 0; + + QueryStackPush(0); // utility statements have no queryId + int stmt_location = pstmt->stmt_location; int stmt_len = pstmt->stmt_len; BufferUsage bufusage_start = pgBufferUsage; WalUsage walusage_start = pgWalUsage; - struct rusage rusage_util_start; - getrusage(RUSAGE_SELF, &rusage_util_start); instr_time start_time; INSTR_TIME_SET_CURRENT(start_time); -#if PG_VERSION_NUM >= 140000 - ExecuteUtilityWithNesting(pstmt, queryString, readOnlyTree, context, params, queryEnv, dest, qc); -#else - ExecuteUtilityWithNesting(pstmt, queryString, context, params, queryEnv, dest, qc); -#endif + PG_TRY(); { CALL_PROCESS_UTILITY(); } PG_CATCH(); { query_stack_depth = parent_idx + 1; PG_RE_THROW(); } PG_END_TRY(); // on ERROR (longjmp) restore the pre-push depth so our frame and any inner frames are not leaked instr_time duration; INSTR_TIME_SET_CURRENT(duration); INSTR_TIME_SUBTRACT(duration, start_time); + // Pop our frame. Null if depth exceeded the cap. + const PschQueryFrame* popped = QueryStackPop(); + BufferUsage bufusage_delta; WalUsage walusage_delta; MemSet(&bufusage_delta, 0, sizeof(BufferUsage)); @@ -810,18 +804,10 @@ static void PschProcessUtility(PlannedStmt* pstmt, const char* queryString, BufferUsageAccumDiff(&bufusage_delta, &pgBufferUsage, &bufusage_start); WalUsageAccumDiff(&walusage_delta, &pgWalUsage, &walusage_start); - int64 cpu_user_us = 0; - int64 cpu_sys_us = 0; - struct rusage rusage_util_end; - if (getrusage(RUSAGE_SELF, &rusage_util_end) == 0) { - cpu_user_us = TimeDiffMicrosec(rusage_util_end.ru_utime, rusage_util_start.ru_utime); - cpu_sys_us = TimeDiffMicrosec(rusage_util_end.ru_stime, rusage_util_start.ru_stime); - } - PschEvent event; - BuildEventForUtility(&event, queryString, start_ts, stmt_location, stmt_len, - INSTR_TIME_GET_MICROSEC(duration), is_top_level, GetUtilityRowCount(qc), - &bufusage_delta, &walusage_delta, cpu_user_us, cpu_sys_us); + BuildEventForUtility(&event, queryString, popped, stmt_location, stmt_len, + INSTR_TIME_GET_MICROSEC(duration), parent_query_id, GetUtilityRowCount(qc), + &bufusage_delta, &walusage_delta); PschEnqueueEvent(&event); } @@ -867,7 +853,8 @@ static bool 
ShouldCaptureLog(ErrorData* edata) { // message, SQLSTATE, and client/session metadata. static void CaptureLogEvent(ErrorData* edata) { PschEvent event; - InitBaseEvent(&event, GetCurrentTimestamp(), (nesting_level == 0), PSCH_CMD_UNKNOWN); + uint64 parent_query_id = (query_stack_depth > 0 && query_stack_depth <= kMaxQueryNestingDepth) ? query_stack[query_stack_depth - 1].queryid : 0; // depth keeps counting past the cap, so bound the index before reading + InitBaseEvent(&event, GetCurrentTimestamp(), parent_query_id, PSCH_CMD_UNKNOWN); UnpackSqlState(edata->sqlerrcode, event.err_sqlstate); event.err_elevel = static_cast(edata->elevel); diff --git a/src/queue/event.h b/src/queue/event.h index 7289836..c8c3bcc 100644 --- a/src/queue/event.h +++ b/src/queue/event.h @@ -75,16 +75,16 @@ struct PschEvent { uint64 duration_us; // Execution duration in microseconds // Identity - Oid dbid; // Database OID - Oid userid; // User OID - char datname[64]; // Database name (NAMEDATALEN=64, resolved at capture) - uint8 datname_len; // Actual length - char username[64]; // User name (NAMEDATALEN=64, resolved at capture) - uint8 username_len; // Actual length - int32 pid; // Backend process ID - uint64 queryid; // Query ID (from pg_stat_statements) - bool top_level; // True if this is a top-level query - PschCmdType cmd_type; // Command type (SELECT, UPDATE, etc.) + Oid dbid; // Database OID + Oid userid; // User OID + char datname[64]; // Database name (NAMEDATALEN=64, resolved at capture) + uint8 datname_len; // Actual length + char username[64]; // User name (NAMEDATALEN=64, resolved at capture) + uint8 username_len; // Actual length + int32 pid; // Backend process ID + uint64 queryid; // Query ID (from pg_stat_statements) + uint64 parent_query_id; // Query ID of the calling query (0 if top-level) + PschCmdType cmd_type; // Command type (SELECT, UPDATE, etc.) // Results uint64 rows; // Number of rows affected/returned