From 81a0a9ecc33fe1be7b4a17fd2078d17c575c838b Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Mon, 15 Jun 2026 15:34:17 -0700 Subject: [PATCH 1/5] feat: listen/notify stats --- pgdog/src/admin/mod.rs | 2 + pgdog/src/admin/parser.rs | 10 +++ pgdog/src/admin/show_listeners.rs | 72 ++++++++++++++++ pgdog/src/backend/pool/connection/mod.rs | 4 +- pgdog/src/backend/pool/shard/mod.rs | 10 +-- pgdog/src/backend/pub_sub/client.rs | 13 ++- pgdog/src/backend/pub_sub/listener.rs | 101 ++++++++++++++++++---- pgdog/src/backend/pub_sub/mod.rs | 2 + pgdog/src/backend/pub_sub/stats.rs | 41 +++++++++ pgdog/src/stats/http_server.rs | 9 +- pgdog/src/stats/listeners.rs | 102 +++++++++++++++++++++++ pgdog/src/stats/mod.rs | 2 + pgdog/src/stats/otel_exporter.rs | 4 +- 13 files changed, 340 insertions(+), 32 deletions(-) create mode 100644 pgdog/src/admin/show_listeners.rs create mode 100644 pgdog/src/stats/listeners.rs diff --git a/pgdog/src/admin/mod.rs b/pgdog/src/admin/mod.rs index c1fa866a2..001f98ce2 100644 --- a/pgdog/src/admin/mod.rs +++ b/pgdog/src/admin/mod.rs @@ -29,6 +29,7 @@ pub mod show_client_memory; pub mod show_clients; pub mod show_config; pub mod show_instance_id; +pub mod show_listeners; pub mod show_lists; pub mod show_mirrors; pub mod show_peers; @@ -72,6 +73,7 @@ pub use show_client_memory::*; pub use show_clients::*; pub use show_config::*; pub use show_instance_id::*; +pub use show_listeners::*; pub use show_lists::*; pub use show_mirrors::*; pub use show_peers::*; diff --git a/pgdog/src/admin/parser.rs b/pgdog/src/admin/parser.rs index a26779e7a..692ec874b 100644 --- a/pgdog/src/admin/parser.rs +++ b/pgdog/src/admin/parser.rs @@ -25,6 +25,7 @@ pub enum ParseResult { SetupSchema(SetupSchema), Shutdown(Shutdown), ShowLists(ShowLists), + ShowListeners(ShowListeners), ShowPrepared(ShowPreparedStatements), ShowReplication(ShowReplication), ShowServerMemory(ShowServerMemory), @@ -71,6 +72,7 @@ impl ParseResult { SetupSchema(setup_schema) => setup_schema.execute().await, Shutdown(shutdown) => shutdown.execute().await, ShowLists(show_lists) => show_lists.execute().await, + ShowListeners(show_listeners) => show_listeners.execute().await, ShowPrepared(cmd) => cmd.execute().await, ShowReplication(show_replication) => show_replication.execute().await, ShowServerMemory(show_server_memory) => show_server_memory.execute().await, @@ -117,6 +119,7 @@ impl ParseResult { SetupSchema(setup_schema) => setup_schema.name(), Shutdown(shutdown) => shutdown.name(), ShowLists(show_lists) => show_lists.name(), + ShowListeners(show_listeners) => show_listeners.name(), ShowPrepared(show) => show.name(), ShowReplication(show_replication) => show_replication.name(), ShowServerMemory(show_server_memory) => show_server_memory.name(), @@ -183,6 +186,7 @@ impl Parser { "version" => ParseResult::ShowVersion(ShowVersion::parse(&sql)?), "instance_id" => ParseResult::ShowInstanceId(ShowInstanceId::parse(&sql)?), "lists" => ParseResult::ShowLists(ShowLists::parse(&sql)?), + "listeners" => ParseResult::ShowListeners(ShowListeners::parse(&sql)?), "prepared" => ParseResult::ShowPrepared(ShowPreparedStatements::parse(&sql)?), "replication" => ParseResult::ShowReplication(ShowReplication::parse(&sql)?), "replication_slots" => { @@ -265,6 +269,12 @@ mod tests { assert!(matches!(result, Ok(ParseResult::ShowClientMemory(_)))); } + #[test] + fn parses_show_listeners_command() { + let result = Parser::parse("SHOW LISTENERS;"); + assert!(matches!(result, Ok(ParseResult::ShowListeners(_)))); + } + #[test] fn parses_cutover_command() { let result = Parser::parse("CUTOVER"); diff --git a/pgdog/src/admin/show_listeners.rs b/pgdog/src/admin/show_listeners.rs new file mode 100644 index 000000000..e19f10bf5 --- /dev/null +++ b/pgdog/src/admin/show_listeners.rs @@ -0,0 +1,72 @@ +//! SHOW LISTENERS. + +use crate::backend::pub_sub::listener; + +use super::prelude::*; + +pub struct ShowListeners; + +#[async_trait] +impl Command for ShowListeners { + fn name(&self) -> String { + "SHOW LISTENERS".into() + } + + fn parse(_: &str) -> Result { + Ok(Self) + } + + async fn execute(&self) -> Result, Error> { + let mut channels: Vec<_> = listener::stats().into_iter().collect(); + channels.sort_by(|a, b| a.0.cmp(&b.0)); + + let mut messages = vec![ + RowDescription::new(&[ + Field::text("channel"), + Field::numeric("listeners"), + Field::numeric("received"), + Field::numeric("dropped"), + ]) + .message()?, + ]; + + for (channel, stats) in channels { + let mut data_row = DataRow::new(); + data_row + .add(channel.as_str()) + .add(stats.listeners as i64) + .add(stats.recv as i64) + .add(stats.dropped as i64); + messages.push(data_row.message()?); + } + + Ok(messages) + } +} + +#[cfg(test)] +mod tests { + use crate::net::{FromBytes, RowDescription}; + + use super::*; + + #[tokio::test] + async fn show_listeners_reports_columns() { + let messages = ShowListeners + .execute() + .await + .expect("show listeners should execute"); + + assert_eq!(messages[0].code(), 'T'); + + let row_description = + RowDescription::from_bytes(messages[0].payload()).expect("row description parses"); + let columns: Vec<&str> = row_description + .fields + .iter() + .map(|field| field.name.as_str()) + .collect(); + + assert_eq!(columns, ["channel", "listeners", "received", "dropped"]); + } +} diff --git a/pgdog/src/backend/pool/connection/mod.rs b/pgdog/src/backend/pool/connection/mod.rs index 0fdd87f32..05716c4f5 100644 --- a/pgdog/src/backend/pool/connection/mod.rs +++ b/pgdog/src/backend/pool/connection/mod.rs @@ -256,8 +256,8 @@ impl Connection { }; if let Some(shard) = self.cluster()?.shards().get(num) { - let rx = shard.listen(channel).await?; - self.pub_sub.listen(channel, rx); + let listener = shard.listen(channel).await?; + self.pub_sub.listen(channel, listener); } Ok(()) diff --git a/pgdog/src/backend/pool/shard/mod.rs b/pgdog/src/backend/pool/shard/mod.rs index 3c2cd9e70..58faeeb05 100644 --- a/pgdog/src/backend/pool/shard/mod.rs +++ b/pgdog/src/backend/pool/shard/mod.rs @@ -4,7 +4,7 @@ use arc_swap::ArcSwap; use std::ops::Deref; use std::sync::Arc; use std::time::Duration; -use tokio::sync::{OnceCell, broadcast}; +use tokio::sync::OnceCell; use tokio::{select, spawn, sync::Notify}; use tracing::{debug, info}; @@ -12,9 +12,10 @@ use crate::backend::PubSubListener; use crate::backend::Schema; use crate::backend::databases::User; use crate::backend::pool::lb::ban::Ban; +use crate::backend::pub_sub::listener::Listener; use crate::config::{LoadBalancingStrategy, ReadWriteSplit, Role}; +use crate::net::Parameters; use crate::net::messages::FrontendPid; -use crate::net::{NotificationResponse, Parameters}; use super::{Error, Guard, LoadBalancer, Pool, PoolConfig, Request}; @@ -103,10 +104,7 @@ impl Shard { } /// Listen for notifications on channel. - pub async fn listen( - &self, - channel: &str, - ) -> Result, Error> { + pub async fn listen(&self, channel: &str) -> Result { match self.pub_sub.load_full().deref() { Some(listener) => listener.listen(channel).await, _ => Err(Error::PubSubDisabled), diff --git a/pgdog/src/backend/pub_sub/client.rs b/pgdog/src/backend/pub_sub/client.rs index 48ff49016..548b649f0 100644 --- a/pgdog/src/backend/pub_sub/client.rs +++ b/pgdog/src/backend/pub_sub/client.rs @@ -1,12 +1,8 @@ -use crate::config::config; use crate::net::NotificationResponse; +use crate::{backend::pub_sub::listener::Listener, config::config}; use std::{collections::HashMap, sync::Arc}; -use tokio::sync::{ - Notify, - broadcast::{self, error::RecvError}, - mpsc, -}; +use tokio::sync::{Notify, broadcast::error::RecvError, mpsc}; use tokio::{select, spawn}; #[derive(Debug)] @@ -37,7 +33,7 @@ impl PubSubClient { } /// Listen on a channel. - pub fn listen(&mut self, channel: &str, mut rx: broadcast::Receiver) { + pub fn listen(&mut self, channel: &str, mut rx: Listener) { let shutdown = self.shutdown.clone(); let tx = self.tx.clone(); @@ -61,8 +57,9 @@ impl PubSubClient { if tx.send(message).await.is_err() { return; } + rx.stats().incr_recv(); }, - Err(RecvError::Lagged(_)) => (), + Err(RecvError::Lagged(_)) => rx.stats().incr_dropped(), Err(RecvError::Closed) => return, } } diff --git a/pgdog/src/backend/pub_sub/listener.rs b/pgdog/src/backend/pub_sub/listener.rs index b54f8ff20..de46eef1c 100644 --- a/pgdog/src/backend/pub_sub/listener.rs +++ b/pgdog/src/backend/pub_sub/listener.rs @@ -3,7 +3,12 @@ //! Handles notifications from Postgres and sends them out //! to a broadcast channel. //! -use std::{collections::HashMap, sync::Arc, time::Duration}; +use std::{ + collections::HashMap, + ops::{Deref, DerefMut}, + sync::Arc, + time::Duration, +}; use once_cell::sync::Lazy; use parking_lot::Mutex; @@ -14,6 +19,7 @@ use tokio::{ }; use tracing::{debug, error, info}; +use super::{Stats, StatsSnapshot}; use crate::{ backend::{self, ConnectReason, DisconnectReason, Pool, pool::Error}, config::config, @@ -42,10 +48,66 @@ impl From for ProtocolMessage { } } -type Channels = Arc>>>; +type Channels = Arc>>; static CHANNELS: Lazy = Lazy::new(|| Arc::new(Mutex::new(HashMap::new()))); +/// Get stats for all channels. +pub fn stats() -> HashMap { + CHANNELS + .lock() + .iter() + .map(|(name, channel)| (name.to_string(), channel.stats.get())) + .collect() +} + +#[derive(Debug)] +struct Channel { + tx: broadcast::Sender, + stats: Arc, +} + +#[derive(Debug)] +pub struct Listener { + rx: broadcast::Receiver, + stats: Arc, +} + +impl Listener { + fn new(channel: &Channel) -> Self { + channel.stats.incr_listeners(); + + Self { + rx: channel.tx.subscribe(), + stats: channel.stats.clone(), + } + } + + pub(crate) fn stats(&self) -> &Stats { + &self.stats + } +} + +impl Drop for Listener { + fn drop(&mut self) { + self.stats.decr_listeners(); + } +} + +impl Deref for Listener { + type Target = broadcast::Receiver; + + fn deref(&self) -> &Self::Target { + &self.rx + } +} + +impl DerefMut for Listener { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.rx + } +} + #[derive(Debug)] struct Comms { start: Notify, @@ -124,23 +186,34 @@ impl PubSubListener { } /// Listen on a channel. - pub async fn listen( - &self, - channel: &str, - ) -> Result, Error> { - if let Some(channel) = self.channels.lock().get(channel) { - return Ok(channel.subscribe()); - } + pub async fn listen(&self, channel_name: &str) -> Result { + let listener = { + let mut guard = self.channels.lock(); + + if let Some(channel) = guard.get(channel_name) { + return Ok(Listener::new(channel)); + } - let (tx, rx) = broadcast::channel(config().config.general.pub_sub_channel_size); + let (tx, _) = broadcast::channel(config().config.general.pub_sub_channel_size); + let stats = Arc::new(Stats::default()); + + let channel = Channel { + tx, + stats: stats.clone(), + }; + let listener = Listener::new(&channel); + + guard.insert(channel_name.to_string(), channel); + + listener + }; - self.channels.lock().insert(channel.to_string(), tx); self.tx - .send(Request::Subscribe(channel.to_string())) + .send(Request::Subscribe(channel_name.to_string())) .await .map_err(|_| Error::Offline)?; - Ok(rx) + Ok(listener) } /// Notify a channel with payload. @@ -198,7 +271,7 @@ impl PubSubListener { let notification = NotificationResponse::from_bytes(message.to_bytes())?; let mut unsub = None; if let Some(channel) = channels.lock().get(notification.channel()) { - match channel.send(notification) { + match channel.tx.send(notification) { Ok(_) => (), Err(err) => unsub = Some(err.0.channel().to_string()), } diff --git a/pgdog/src/backend/pub_sub/mod.rs b/pgdog/src/backend/pub_sub/mod.rs index c2010bdc5..87f6ea19e 100644 --- a/pgdog/src/backend/pub_sub/mod.rs +++ b/pgdog/src/backend/pub_sub/mod.rs @@ -2,6 +2,8 @@ pub mod client; pub mod commands; pub mod listener; pub mod notification; +pub mod stats; pub use client::PubSubClient; pub use listener::PubSubListener; +pub use stats::{Stats, StatsSnapshot}; diff --git a/pgdog/src/backend/pub_sub/stats.rs b/pgdog/src/backend/pub_sub/stats.rs index e69de29bb..848000c7b 100644 --- a/pgdog/src/backend/pub_sub/stats.rs +++ b/pgdog/src/backend/pub_sub/stats.rs @@ -0,0 +1,41 @@ +use std::sync::atomic::{AtomicU64, Ordering}; + +#[derive(Debug, Default)] +pub struct Stats { + recv: AtomicU64, + dropped: AtomicU64, + listeners: AtomicU64, +} + +#[derive(Debug, Default, Copy, Clone)] +pub struct StatsSnapshot { + pub(crate) recv: u64, + pub(crate) dropped: u64, + pub(crate) listeners: u64, +} + +impl Stats { + pub(crate) fn incr_recv(&self) { + self.recv.fetch_add(1, Ordering::Relaxed); + } + + pub(crate) fn incr_dropped(&self) { + self.dropped.fetch_add(1, Ordering::Relaxed); + } + + pub(crate) fn incr_listeners(&self) { + self.listeners.fetch_add(1, Ordering::Relaxed); + } + + pub(crate) fn decr_listeners(&self) { + self.listeners.fetch_sub(1, Ordering::Relaxed); + } + + pub(crate) fn get(&self) -> StatsSnapshot { + StatsSnapshot { + recv: self.recv.load(Ordering::Relaxed), + dropped: self.dropped.load(Ordering::Relaxed), + listeners: self.listeners.load(Ordering::Relaxed), + } + } +} diff --git a/pgdog/src/stats/http_server.rs b/pgdog/src/stats/http_server.rs index c2f6c2e6f..b69e9a1ff 100644 --- a/pgdog/src/stats/http_server.rs +++ b/pgdog/src/stats/http_server.rs @@ -10,7 +10,7 @@ use hyper_util::rt::TokioIo; use tokio::net::TcpListener; use tracing::{info, warn}; -use super::{Clients, MirrorStatsMetrics, Pools, QueryCache, TwoPc}; +use super::{Clients, Listeners, MirrorStatsMetrics, Pools, QueryCache, TwoPc}; async fn metrics(_: Request) -> Result>, Infallible> { let clients = Clients::load(); @@ -20,6 +20,11 @@ async fn metrics(_: Request) -> Result = Listeners::load() + .into_iter() + .map(|m| m.to_string()) + .collect(); + let listeners = listeners.join("\n"); let query_cache: Vec<_> = QueryCache::load() .metrics() .into_iter() @@ -33,6 +38,8 @@ async fn metrics(_: Request) -> Result Vec { + let mut stats: Vec<_> = listener::stats().into_iter().collect(); + stats.sort_by(|a, b| a.0.cmp(&b.0)); + + let mut listeners = vec![]; + let mut received = vec![]; + let mut dropped = vec![]; + + for (channel, stats) in stats { + let labels = vec![("channel".into(), channel)]; + + listeners.push(Measurement { + labels: labels.clone(), + measurement: stats.listeners.into(), + }); + received.push(Measurement { + labels: labels.clone(), + measurement: stats.recv.into(), + }); + dropped.push(Measurement { + labels, + measurement: stats.dropped.into(), + }); + } + + vec![ + Metric::new(ListenerMetric { + name: "pub_sub_listeners".into(), + measurements: listeners, + help: "Current number of clients listening on a pub/sub channel.".into(), + metric_type: "gauge".into(), + }), + Metric::new(ListenerMetric { + name: "pub_sub_listener_received".into(), + measurements: received, + help: "Total number of notifications received by pub/sub listeners.".into(), + metric_type: "counter".into(), + }), + Metric::new(ListenerMetric { + name: "pub_sub_listener_dropped".into(), + measurements: dropped, + help: "Total number of notifications dropped by lagging pub/sub listeners.".into(), + metric_type: "counter".into(), + }), + ] + } +} + +struct ListenerMetric { + name: String, + measurements: Vec, + help: String, + metric_type: String, +} + +impl OpenMetric for ListenerMetric { + fn name(&self) -> String { + self.name.clone() + } + + fn measurements(&self) -> Vec { + self.measurements.clone() + } + + fn help(&self) -> Option { + Some(self.help.clone()) + } + + fn metric_type(&self) -> String { + self.metric_type.clone() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn listener_metrics_include_expected_series() { + let metrics = Listeners::load(); + let names: Vec<_> = metrics.iter().map(|metric| metric.name()).collect(); + + assert_eq!( + names, + [ + "pub_sub_listeners", + "pub_sub_listener_received", + "pub_sub_listener_dropped", + ] + ); + assert_eq!(metrics[0].metric_type(), "gauge"); + assert_eq!(metrics[1].metric_type(), "counter"); + assert_eq!(metrics[2].metric_type(), "counter"); + } +} diff --git a/pgdog/src/stats/mod.rs b/pgdog/src/stats/mod.rs index c54afb54b..3a2c5cddc 100644 --- a/pgdog/src/stats/mod.rs +++ b/pgdog/src/stats/mod.rs @@ -7,12 +7,14 @@ pub mod otel; pub mod otel_exporter; pub mod pools; pub use open_metric::*; +pub mod listeners; pub mod logger; pub mod memory; pub mod query_cache; pub mod two_pc; pub use clients::Clients; +pub use listeners::Listeners; pub use logger::Logger as StatsLogger; pub use mirror_stats::MirrorStatsMetrics; pub use pools::{PoolMetric, Pools}; diff --git a/pgdog/src/stats/otel_exporter.rs b/pgdog/src/stats/otel_exporter.rs index 8ce3680b6..45b33e8f9 100644 --- a/pgdog/src/stats/otel_exporter.rs +++ b/pgdog/src/stats/otel_exporter.rs @@ -9,7 +9,7 @@ use tokio::time::sleep; use tracing::{info, warn}; use super::otel; -use super::{Clients, MirrorStatsMetrics, Pools, QueryCache, TwoPc}; +use super::{Clients, Listeners, MirrorStatsMetrics, Pools, QueryCache, TwoPc}; use crate::config::config; /// Maximum number of metrics per OTLP request to stay under endpoint payload limits. @@ -38,12 +38,14 @@ pub async fn run() { let clients = Clients::load(); let pools = Pools::load().into_metrics(); let mirror = MirrorStatsMetrics::load(); + let listeners = Listeners::load(); let query_cache = QueryCache::load().metrics(); let two_pc = TwoPc::load(); let mut all: Vec<&super::Metric> = vec![&clients, &two_pc]; all.extend(pools.iter()); all.extend(mirror.iter()); + all.extend(listeners.iter()); all.extend(query_cache.iter()); // Send batches in parallel to stay under the 512 KB payload limit. From 18a6de39566d70d2ecc22667483eff209eb3eb87 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Mon, 15 Jun 2026 16:25:26 -0700 Subject: [PATCH 2/5] save --- integration/rust/tests/integration/notify.rs | 59 +++++++++++++++++++- 1 file changed, 57 insertions(+), 2 deletions(-) diff --git a/integration/rust/tests/integration/notify.rs b/integration/rust/tests/integration/notify.rs index 338682446..25275ec3b 100644 --- a/integration/rust/tests/integration/notify.rs +++ b/integration/rust/tests/integration/notify.rs @@ -2,8 +2,13 @@ use std::sync::Arc; use std::time::Duration; use parking_lot::Mutex; -use sqlx::{Connection, Executor, PgConnection, postgres::PgListener}; -use tokio::{select, spawn, sync::Barrier, time::timeout}; +use rust::setup::admin_sqlx; +use sqlx::{Connection, Executor, PgConnection, Row, postgres::PgListener}; +use tokio::{ + select, spawn, + sync::Barrier, + time::{sleep, timeout}, +}; #[tokio::test] async fn test_notify() { @@ -72,6 +77,56 @@ async fn test_notify() { } } +#[tokio::test] +async fn test_listener_stats_from_admin_db() { + let channel = "test_listener_stats"; + let mut listener = PgListener::connect("postgres://pgdog:pgdog@127.0.0.1:6432/pgdog") + .await + .unwrap(); + listener.listen(channel).await.unwrap(); + + let mut conn = PgConnection::connect("postgres://pgdog:pgdog@127.0.0.1:6432/pgdog") + .await + .unwrap(); + conn.execute(format!("NOTIFY {channel}, 'stats'").as_str()) + .await + .unwrap(); + + let notification = timeout(Duration::from_secs(5), listener.recv()) + .await + .unwrap() + .unwrap(); + assert_eq!(notification.channel(), channel); + assert_eq!(notification.payload(), "stats"); + + let admin = admin_sqlx().await; + let (listeners, received, dropped) = timeout(Duration::from_secs(5), async { + loop { + let rows = admin.fetch_all("SHOW LISTENERS").await.unwrap(); + if let Some(row) = rows + .iter() + .find(|row| row.get::("channel") == channel) + { + let listeners = row.get::("listeners"); + let received = row.get::("received"); + let dropped = row.get::("dropped"); + + if listeners >= 1 && received >= 1 { + return (listeners, received, dropped); + } + } + + sleep(Duration::from_millis(10)).await; + } + }) + .await + .unwrap(); + + assert_eq!(listeners, 1); + assert_eq!(received, 1); + assert_eq!(dropped, 0); +} + #[tokio::test] async fn test_notify_only_delivered_after_transaction_commit() { let messages = Arc::new(Mutex::new(vec![])); From 0c88db9204a3aa939153467a260415cf4532c7fb Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Mon, 15 Jun 2026 16:44:30 -0700 Subject: [PATCH 3/5] improve test coverage --- pgdog/src/backend/pub_sub/client.rs | 96 ++++++++++++- pgdog/src/backend/pub_sub/listener.rs | 186 +++++++++++++++++++++++++- pgdog/src/backend/pub_sub/mod.rs | 6 + pgdog/src/backend/pub_sub/stats.rs | 25 ++++ 4 files changed, 304 insertions(+), 9 deletions(-) diff --git a/pgdog/src/backend/pub_sub/client.rs b/pgdog/src/backend/pub_sub/client.rs index 548b649f0..b0ed207e2 100644 --- a/pgdog/src/backend/pub_sub/client.rs +++ b/pgdog/src/backend/pub_sub/client.rs @@ -1,5 +1,7 @@ -use crate::net::NotificationResponse; -use crate::{backend::pub_sub::listener::Listener, config::config}; +use crate::{ + backend::pub_sub::{channel_size, listener::Listener}, + net::NotificationResponse, +}; use std::{collections::HashMap, sync::Arc}; use tokio::sync::{Notify, broadcast::error::RecvError, mpsc}; @@ -21,8 +23,7 @@ impl Default for PubSubClient { impl PubSubClient { pub fn new() -> Self { - let size = config().config.general.pub_sub_channel_size; - let (tx, rx) = mpsc::channel(std::cmp::max(1, size)); + let (tx, rx) = mpsc::channel(channel_size()); Self { shutdown: Arc::new(Notify::new()), @@ -83,10 +84,93 @@ impl PubSubClient { #[cfg(test)] mod test { + use std::time::Duration; + + use bytes::BufMut; + use tokio::{task::yield_now, time::timeout}; + + use crate::{ + backend::pub_sub::{StatsSnapshot, listener::test_support::TestChannel}, + net::{FromBytes, Payload}, + }; + use super::*; + fn notification(channel: &str, payload: &str) -> NotificationResponse { + let mut bytes = Payload::named('A'); + bytes.put_i32(1234); + bytes.put_string(channel); + bytes.put_string(payload); + + NotificationResponse::from_bytes(bytes.freeze()).expect("notification") + } + + fn assert_snapshot(snapshot: StatsSnapshot, recv: u64, dropped: u64, listeners: u64) { + assert_eq!(snapshot.recv, recv); + assert_eq!(snapshot.dropped, dropped); + assert_eq!(snapshot.listeners, listeners); + } + + async fn recv_notification(client: &mut PubSubClient) -> NotificationResponse { + timeout(Duration::from_secs(1), client.recv()) + .await + .expect("timed out waiting for notification") + .expect("notification") + } + + async fn wait_for_listener_count(channel: &TestChannel, listeners: u64) { + for _ in 0..10 { + if channel.stats().listeners == listeners { + return; + } + + yield_now().await; + } + + assert_eq!(channel.stats().listeners, listeners); + } + #[test] - fn test_empty_pub_sub_client() { - let _client = PubSubClient::new(); + fn default_constructs_empty_client() { + let client = PubSubClient::default(); + assert!(client.unlisten.is_empty()); + } + + #[tokio::test] + async fn listen_forwards_notifications_to_client() { + let channel = TestChannel::new(); + let mut client = PubSubClient::new(); + + client.listen("events", channel.listener()); + channel + .send(notification("events", "payload")) + .expect("send notification"); + + let message = recv_notification(&mut client).await; + assert_eq!(message.channel(), "events"); + assert_eq!(message.payload(), "payload"); + assert_snapshot(channel.stats(), 1, 0, 1); + assert_eq!(client.unlisten.len(), 1); + } + + #[tokio::test] + async fn unlisten_stops_forwarding_notifications() { + let channel = TestChannel::new(); + let mut client = PubSubClient::new(); + + client.listen("events", channel.listener()); + assert_eq!(client.unlisten.len(), 1); + + client.unlisten("events"); + assert!(client.unlisten.is_empty()); + wait_for_listener_count(&channel, 0).await; + + assert!(channel.send(notification("events", "payload")).is_err()); + assert!( + timeout(Duration::from_millis(50), client.recv()) + .await + .is_err() + ); + assert_snapshot(channel.stats(), 0, 0, 0); } } diff --git a/pgdog/src/backend/pub_sub/listener.rs b/pgdog/src/backend/pub_sub/listener.rs index de46eef1c..05609a90f 100644 --- a/pgdog/src/backend/pub_sub/listener.rs +++ b/pgdog/src/backend/pub_sub/listener.rs @@ -19,7 +19,7 @@ use tokio::{ }; use tracing::{debug, error, info}; -use super::{Stats, StatsSnapshot}; +use super::{Stats, StatsSnapshot, channel_size}; use crate::{ backend::{self, ConnectReason, DisconnectReason, Pool, pool::Error}, config::config, @@ -108,6 +108,45 @@ impl DerefMut for Listener { } } +#[cfg(test)] +pub(crate) mod test_support { + use super::*; + + pub(crate) struct TestChannel { + tx: broadcast::Sender, + stats: Arc, + } + + impl TestChannel { + pub(crate) fn new() -> Self { + let (tx, _) = broadcast::channel(4); + + Self { + tx, + stats: Arc::new(Stats::default()), + } + } + + pub(crate) fn listener(&self) -> Listener { + Listener::new(&Channel { + tx: self.tx.clone(), + stats: self.stats.clone(), + }) + } + + pub(crate) fn send( + &self, + notification: NotificationResponse, + ) -> Result> { + self.tx.send(notification) + } + + pub(crate) fn stats(&self) -> StatsSnapshot { + self.stats.get() + } + } +} + #[derive(Debug)] struct Comms { start: Notify, @@ -127,7 +166,7 @@ pub struct PubSubListener { impl PubSubListener { /// Create new listener on the server connection. pub fn new(pool: &Pool) -> Self { - let (tx, mut rx) = mpsc::channel(config().config.general.pub_sub_channel_size); + let (tx, mut rx) = mpsc::channel(channel_size()); let pool = pool.clone(); let channels = CHANNELS.clone(); @@ -194,7 +233,7 @@ impl PubSubListener { return Ok(Listener::new(channel)); } - let (tx, _) = broadcast::channel(config().config.general.pub_sub_channel_size); + let (tx, _) = broadcast::channel(channel_size()); let stats = Arc::new(Stats::default()); let channel = Channel { @@ -304,3 +343,144 @@ impl PubSubListener { Ok(()) } } + +#[cfg(test)] +mod test { + use std::{collections::HashMap, sync::Arc}; + + use parking_lot::Mutex; + use tokio::sync::{Notify, mpsc}; + + use super::{test_support::TestChannel, *}; + + fn test_pub_sub_listener() -> (PubSubListener, mpsc::Receiver) { + let (tx, rx) = mpsc::channel(4); + + ( + PubSubListener { + id: FrontendPid::new(), + pool: Pool::new_test(), + tx, + channels: Arc::new(Mutex::new(HashMap::new())), + comms: Arc::new(Comms { + start: Notify::new(), + shutdown: Notify::new(), + }), + }, + rx, + ) + } + + fn assert_snapshot(snapshot: StatsSnapshot, recv: u64, dropped: u64, listeners: u64) { + assert_eq!(snapshot.recv, recv); + assert_eq!(snapshot.dropped, dropped); + assert_eq!(snapshot.listeners, listeners); + } + + fn assert_request_query(request: Request, expected: &str) { + let ProtocolMessage::Query(query) = ProtocolMessage::from(request) else { + panic!("request should convert to a query message"); + }; + + assert_eq!(query.query(), expected); + } + + async fn expect_subscribe(rx: &mut mpsc::Receiver, expected: &str) { + let request = rx.recv().await.expect("request"); + + match request { + Request::Subscribe(channel) => assert_eq!(channel, expected), + request => panic!("expected subscribe request, got {request:?}"), + } + } + + async fn expect_notify( + rx: &mut mpsc::Receiver, + expected_channel: &str, + expected_payload: &str, + ) { + let request = rx.recv().await.expect("request"); + + match request { + Request::Notify { channel, payload } => { + assert_eq!(channel, expected_channel); + assert_eq!(payload, expected_payload); + } + request => panic!("expected notify request, got {request:?}"), + } + } + + #[test] + fn requests_convert_to_expected_sql_queries() { + assert_request_query(Request::Subscribe("events".into()), "LISTEN \"events\""); + assert_request_query(Request::Unsubscribe("events".into()), "UNLISTEN \"events\""); + assert_request_query( + Request::Notify { + channel: "events".into(), + payload: "payload".into(), + }, + "NOTIFY \"events\", 'payload'", + ); + } + + #[test] + fn listener_drop_updates_listener_count() { + let channel = TestChannel::new(); + assert_snapshot(channel.stats(), 0, 0, 0); + + let first = channel.listener(); + assert_snapshot(channel.stats(), 0, 0, 1); + + { + let _second = channel.listener(); + assert_snapshot(channel.stats(), 0, 0, 2); + } + + assert_snapshot(channel.stats(), 0, 0, 1); + drop(first); + assert_snapshot(channel.stats(), 0, 0, 0); + } + + #[tokio::test] + async fn listen_creates_channel_once_and_reuses_it() { + let (pub_sub, mut rx) = test_pub_sub_listener(); + + let first = pub_sub.listen("events").await.expect("first listen"); + expect_subscribe(&mut rx, "events").await; + assert_eq!(pub_sub.channels.lock().len(), 1); + assert_snapshot(first.stats().get(), 0, 0, 1); + + let second = pub_sub.listen("events").await.expect("second listen"); + assert!(matches!( + rx.try_recv(), + Err(mpsc::error::TryRecvError::Empty) + )); + assert_eq!(pub_sub.channels.lock().len(), 1); + assert_snapshot(second.stats().get(), 0, 0, 2); + + drop(first); + assert_snapshot(second.stats().get(), 0, 0, 1); + + drop(second); + let stats = pub_sub + .channels + .lock() + .get("events") + .expect("events channel") + .stats + .get(); + assert_snapshot(stats, 0, 0, 0); + } + + #[tokio::test] + async fn notify_queues_notify_request() { + let (pub_sub, mut rx) = test_pub_sub_listener(); + + pub_sub + .notify("events", "payload") + .await + .expect("notify request"); + + expect_notify(&mut rx, "events", "payload").await; + } +} diff --git a/pgdog/src/backend/pub_sub/mod.rs b/pgdog/src/backend/pub_sub/mod.rs index 87f6ea19e..585f8294a 100644 --- a/pgdog/src/backend/pub_sub/mod.rs +++ b/pgdog/src/backend/pub_sub/mod.rs @@ -7,3 +7,9 @@ pub mod stats; pub use client::PubSubClient; pub use listener::PubSubListener; pub use stats::{Stats, StatsSnapshot}; + +use crate::config::config; + +fn channel_size() -> usize { + std::cmp::max(1, config().config.general.pub_sub_channel_size) +} diff --git a/pgdog/src/backend/pub_sub/stats.rs b/pgdog/src/backend/pub_sub/stats.rs index 848000c7b..0126be813 100644 --- a/pgdog/src/backend/pub_sub/stats.rs +++ b/pgdog/src/backend/pub_sub/stats.rs @@ -39,3 +39,28 @@ impl Stats { } } } + +#[cfg(test)] +mod test { + use super::*; + + fn assert_snapshot(snapshot: StatsSnapshot, recv: u64, dropped: u64, listeners: u64) { + assert_eq!(snapshot.recv, recv); + assert_eq!(snapshot.dropped, dropped); + assert_eq!(snapshot.listeners, listeners); + } + + #[test] + fn snapshot_reflects_counter_changes() { + let stats = Stats::default(); + assert_snapshot(stats.get(), 0, 0, 0); + + stats.incr_recv(); + stats.incr_dropped(); + stats.incr_listeners(); + stats.incr_listeners(); + stats.decr_listeners(); + + assert_snapshot(stats.get(), 1, 1, 1); + } +} From a0d32fcbf2e72bc56352a6ba5d6b59fc1ab55deb Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Mon, 15 Jun 2026 16:59:32 -0700 Subject: [PATCH 4/5] test --- integration/rust/tests/integration/notify.rs | 152 +++++++++++-------- 1 file changed, 87 insertions(+), 65 deletions(-) diff --git a/integration/rust/tests/integration/notify.rs b/integration/rust/tests/integration/notify.rs index 25275ec3b..da57f4b46 100644 --- a/integration/rust/tests/integration/notify.rs +++ b/integration/rust/tests/integration/notify.rs @@ -1,9 +1,10 @@ use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use parking_lot::Mutex; use rust::setup::admin_sqlx; -use sqlx::{Connection, Executor, PgConnection, Row, postgres::PgListener}; +use rust_decimal::prelude::ToPrimitive; +use sqlx::{Connection, Executor, PgConnection, Pool, Postgres, Row, postgres::PgListener}; use tokio::{ select, spawn, sync::Barrier, @@ -15,46 +16,47 @@ async fn test_notify() { let messages = Arc::new(Mutex::new(vec![])); let mut tasks = vec![]; let mut listeners = vec![]; - let barrier = Arc::new(Barrier::new(5)); - - for i in 0..5 { + let test_id = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_nanos(); + let channels = (0..5) + .map(|i| format!("test_notify_{test_id}_{i}")) + .collect::>(); + let received_barrier = Arc::new(Barrier::new(channels.len() + 1)); + let release_barrier = Arc::new(Barrier::new(channels.len() + 1)); + + for channel in channels.iter().cloned() { let task_msgs = messages.clone(); let mut listener = PgListener::connect("postgres://pgdog:pgdog@127.0.0.1:6432/pgdog") .await .unwrap(); - listener - .listen(format!("test_notify_{}", i).as_str()) - .await - .unwrap(); + listener.listen(&channel).await.unwrap(); - let barrier = barrier.clone(); + let received_barrier = received_barrier.clone(); + let release_barrier = release_barrier.clone(); listeners.push(spawn(async move { let mut received = 0; - loop { - select! { - msg = listener.recv() => { - let msg = msg.unwrap(); - received += 1; - task_msgs.lock().push(msg); - if received == 10 { - break; - } - } - - } + while received < 10 { + let msg = listener.recv().await.unwrap(); + received += 1; + task_msgs.lock().push(msg); } - barrier.wait().await; + + received_barrier.wait().await; + release_barrier.wait().await; })); } for i in 0..50 { + let channel = channels[i % channels.len()].clone(); let handle = spawn(async move { let mut conn = PgConnection::connect("postgres://pgdog:pgdog@127.0.0.1:6432/pgdog") .await .unwrap(); - conn.execute(format!("NOTIFY test_notify_{}, 'test_notify_{}'", i % 5, i % 5).as_str()) + conn.execute(format!("NOTIFY {channel}, '{channel}'").as_str()) .await .unwrap(); }); @@ -66,65 +68,85 @@ async fn test_notify() { task.await.unwrap(); } - for listener in listeners { - listener.await.unwrap(); - } + received_barrier.wait().await; - assert_eq!(messages.lock().len(), 50); - let messages = messages.lock(); - for message in messages.iter() { - assert_eq!(message.channel(), message.payload()); + { + let messages = messages.lock(); + assert_eq!(messages.len(), 50); + for message in messages.iter() { + assert_eq!(message.channel(), message.payload()); + } } -} -#[tokio::test] -async fn test_listener_stats_from_admin_db() { - let channel = "test_listener_stats"; - let mut listener = PgListener::connect("postgres://pgdog:pgdog@127.0.0.1:6432/pgdog") - .await - .unwrap(); - listener.listen(channel).await.unwrap(); + assert_listener_stats(&channels, 10).await; - let mut conn = PgConnection::connect("postgres://pgdog:pgdog@127.0.0.1:6432/pgdog") - .await - .unwrap(); - conn.execute(format!("NOTIFY {channel}, 'stats'").as_str()) - .await - .unwrap(); + release_barrier.wait().await; + for listener in listeners { + listener.await.unwrap(); + } +} - let notification = timeout(Duration::from_secs(5), listener.recv()) - .await - .unwrap() - .unwrap(); - assert_eq!(notification.channel(), channel); - assert_eq!(notification.payload(), "stats"); +#[derive(Debug)] +struct ListenerStats { + listeners: i64, + received: i64, + dropped: i64, +} +async fn assert_listener_stats(channels: &[String], expected_received: i64) { let admin = admin_sqlx().await; - let (listeners, received, dropped) = timeout(Duration::from_secs(5), async { + let stats = wait_for_listener_stats(&admin, channels, expected_received).await; + + for (channel, stats) in channels.iter().zip(stats) { + assert_eq!(stats.listeners, 1, "{channel} listener count"); + assert_eq!( + stats.received, expected_received, + "{channel} received count" + ); + assert_eq!(stats.dropped, 0, "{channel} dropped count"); + } +} + +async fn wait_for_listener_stats( + admin: &Pool, + channels: &[String], + expected_received: i64, +) -> Vec { + timeout(Duration::from_secs(5), async { loop { let rows = admin.fetch_all("SHOW LISTENERS").await.unwrap(); - if let Some(row) = rows + let stats = channels .iter() - .find(|row| row.get::("channel") == channel) + .filter_map(|channel| { + rows.iter() + .find(|row| row.get::("channel") == *channel) + .map(|row| ListenerStats { + listeners: numeric_column(row, "listeners"), + received: numeric_column(row, "received"), + dropped: numeric_column(row, "dropped"), + }) + }) + .collect::>(); + + if stats.len() == channels.len() + && stats + .iter() + .all(|stats| stats.listeners >= 1 && stats.received >= expected_received) { - let listeners = row.get::("listeners"); - let received = row.get::("received"); - let dropped = row.get::("dropped"); - - if listeners >= 1 && received >= 1 { - return (listeners, received, dropped); - } + return stats; } sleep(Duration::from_millis(10)).await; } }) .await - .unwrap(); + .unwrap() +} - assert_eq!(listeners, 1); - assert_eq!(received, 1); - assert_eq!(dropped, 0); +fn numeric_column(row: &sqlx::postgres::PgRow, column: &str) -> i64 { + row.get::(column) + .to_i64() + .unwrap() } #[tokio::test] From 88058e08bbaf71e726250bfbc1b01fbb37c3ebd1 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Mon, 15 Jun 2026 17:06:34 -0700 Subject: [PATCH 5/5] fmt --- integration/rust/tests/integration/notify.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/integration/rust/tests/integration/notify.rs b/integration/rust/tests/integration/notify.rs index da57f4b46..45a5e0453 100644 --- a/integration/rust/tests/integration/notify.rs +++ b/integration/rust/tests/integration/notify.rs @@ -26,14 +26,14 @@ async fn test_notify() { let received_barrier = Arc::new(Barrier::new(channels.len() + 1)); let release_barrier = Arc::new(Barrier::new(channels.len() + 1)); - for channel in channels.iter().cloned() { + for channel in &channels { let task_msgs = messages.clone(); let mut listener = PgListener::connect("postgres://pgdog:pgdog@127.0.0.1:6432/pgdog") .await .unwrap(); - listener.listen(&channel).await.unwrap(); + listener.listen(channel).await.unwrap(); let received_barrier = received_barrier.clone(); let release_barrier = release_barrier.clone();