Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 55 additions & 28 deletions src/export/stats_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,18 +162,16 @@ int ExportEventsAsArrowInternal(const std::vector<PschEvent>& events, StatsExpor
// Exception barrier: Arrow + protobuf + ZSTD allocate via ::operator new and
// throw std::bad_alloc on OOM. Catching here keeps the bgworker from unwinding
// across PostgreSQL's PG_TRY/longjmp frames.
int ExportEventsAsArrow(const std::vector<PschEvent>& events, StatsExporter* exporter) {
try {
return ExportEventsAsArrowInternal(events, exporter);
} catch (const std::bad_alloc&) {
LogExporterWarning("Arrow export", "out of memory");
RecordExporterFailure("Arrow export OOM");
return 0;
} catch (const std::exception& e) {
LogExporterWarning("Arrow export exception", e.what());
RecordExporterFailure(e.what());
return 0;
}
int ExportEventsAsArrow(const std::vector<PschEvent>& events, StatsExporter* exporter) try {
return ExportEventsAsArrowInternal(events, exporter);
} catch (const std::bad_alloc&) {
LogExporterWarning("Arrow export", "out of memory");
RecordExporterFailure("Arrow export OOM");
return 0;
} catch (const std::exception& e) {
LogExporterWarning("Arrow export exception", e.what());
RecordExporterFailure(e.what());
return 0;
}

// Build and export stats (records, metrics, ClickHouse rows) from events
Expand Down Expand Up @@ -304,19 +302,17 @@ void ExportEventStatsInternal(const std::vector<PschEvent>& events, StatsExporte
// throw std::bad_alloc. Catching here prevents the throw from crossing the
// bgworker's PG_TRY frame. Returns false on failure so the caller skips
// CommitBatch and avoids flushing partial / stale exporter state.
bool ExportEventStats(const std::vector<PschEvent>& events, StatsExporter* exporter) {
try {
ExportEventStatsInternal(events, exporter);
return true;
} catch (const std::bad_alloc&) {
LogExporterWarning("event stats export", "out of memory");
RecordExporterFailure("event stats OOM");
return false;
} catch (const std::exception& e) {
LogExporterWarning("event stats exception", e.what());
RecordExporterFailure(e.what());
return false;
}
bool ExportEventStats(const std::vector<PschEvent>& events, StatsExporter* exporter) try {
ExportEventStatsInternal(events, exporter);
return true;
} catch (const std::bad_alloc&) {
LogExporterWarning("event stats export", "out of memory");
RecordExporterFailure("event stats OOM");
return false;
} catch (const std::exception& e) {
LogExporterWarning("event stats exception", e.what());
RecordExporterFailure(e.what());
return false;
}

} // namespace
Expand All @@ -342,16 +338,30 @@ void LogNegativeValue(const std::string& column_name, int64_t value) {

extern "C" {

bool PschExporterInit(void) {
// Exception barrier: factory and connection setup allocate via std::make_unique
// and can throw std::bad_alloc. Catching here prevents the throw from crossing
// the bgworker's PG_TRY frame.
bool PschExporterInit(void) try {
if (psch_use_otel) {
g_exporter.exporter = MakeOpenTelemetryExporter();
} else {
g_exporter.exporter = MakeClickHouseExporter();
}
return g_exporter.exporter->EstablishNewConnection();
} catch (const std::bad_alloc&) {
LogExporterWarning("exporter init", "out of memory");
g_exporter.exporter.reset();
return false;
} catch (const std::exception& e) {
LogExporterWarning("exporter init exception", e.what());
g_exporter.exporter.reset();
return false;
}

int PschExportBatch(void) {
// Exception barrier: DequeueEvents reserves a std::vector sized for
// psch_batch_max events and can throw std::bad_alloc. Catching here prevents
// the throw from crossing the bgworker's PG_TRY frame.
int PschExportBatch(void) try {
elog(DEBUG1, "pg_stat_ch: PschExportBatch() called");
StatsExporter* exporter = g_exporter.exporter.get();

Expand Down Expand Up @@ -388,6 +398,14 @@ int PschExportBatch(void) {
}

return exporter->NumExported();
} catch (const std::bad_alloc&) {
LogExporterWarning("export batch", "out of memory");
RecordExporterFailure("export batch OOM");
return 0;
} catch (const std::exception& e) {
LogExporterWarning("export batch exception", e.what());
RecordExporterFailure(e.what());
return 0;
}

void PschResetRetryState(void) {
Expand All @@ -412,8 +430,17 @@ int PschGetConsecutiveFailures(void) {
return g_exporter.exporter->NumConsecutiveFailures();
}

// Exception barrier: exporter destructors (clickhouse-cpp socket close, gRPC
// stub teardown, protobuf arena release) can throw. Catching here prevents the
// throw from crossing the on_proc_exit chain.
void PschExporterShutdown(void) {
g_exporter.exporter.reset();
try {
g_exporter.exporter.reset();
} catch (const std::bad_alloc&) {
LogExporterWarning("exporter shutdown", "out of memory");
} catch (const std::exception& e) {
LogExporterWarning("exporter shutdown exception", e.what());
}
elog(LOG, "pg_stat_ch: statistics exporter shutdown");
}

Expand Down
Loading