Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions integration/rust/tests/sqlx/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,71 @@ async fn test_params() {
handle2.await.unwrap();
}

#[tokio::test]
async fn test_set_config_search_path_replayed() {
let mut conn1 = PgConnection::connect("postgres://pgdog:pgdog@127.0.0.1:6432/pgdog")
.await
.unwrap();

let mut conn2 = PgConnection::connect("postgres://pgdog:pgdog@127.0.0.1:6432/pgdog")
.await
.unwrap();

let row = conn1
.fetch_one("SELECT pg_catalog.set_config('search_path', 'pg_catalog', false)")
.await
.unwrap();
assert_eq!(row.get::<String, usize>(0), "pg_catalog");

for _ in 0..25 {
// Conn 2 takes the connection conn1 just used.
conn2.execute("BEGIN").await.unwrap();

// Conn 1 is forced to get a new one, which should now be synchronized
// with the search_path recorded from set_config.
let row = conn1.fetch_one("SHOW search_path").await.unwrap();
assert_eq!(row.get::<String, usize>(0), "pg_catalog");

conn2.execute("COMMIT").await.unwrap();
}
}

#[tokio::test]
async fn test_set_config_search_path_rollback() {
let mut conn1 = PgConnection::connect("postgres://pgdog:pgdog@127.0.0.1:6432/pgdog")
.await
.unwrap();

let mut conn2 = PgConnection::connect("postgres://pgdog:pgdog@127.0.0.1:6432/pgdog")
.await
.unwrap();

conn1.execute("SET search_path TO 'public'").await.unwrap();
conn1.execute("BEGIN").await.unwrap();
let row = conn1
.fetch_one("SELECT pg_catalog.set_config('search_path', 'pg_catalog', false)")
.await
.unwrap();
assert_eq!(row.get::<String, usize>(0), "pg_catalog");

let row = conn1.fetch_one("SHOW search_path").await.unwrap();
assert_eq!(row.get::<String, usize>(0), "pg_catalog");

conn1.execute("ROLLBACK").await.unwrap();

for _ in 0..25 {
// Conn 2 takes the connection conn1 just used.
conn2.execute("BEGIN").await.unwrap();

// Conn 1 is forced to get a new one, which should be synchronized
// with the pre-transaction search_path after rollback.
let row = conn1.fetch_one("SHOW search_path").await.unwrap();
assert_eq!(row.get::<String, usize>(0), "public");

conn2.execute("COMMIT").await.unwrap();
}
}

#[tokio::test]
async fn test_set_param() {
let mut conn1 = PgConnection::connect(
Expand Down
12 changes: 11 additions & 1 deletion pgdog/src/frontend/client/query_engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use crate::{
frontend::{
BufferedQuery, Client, ClientComms, Command, Error, Router, RouterContext, Stats,
client::query_engine::{hooks::QueryEngineHooks, route_query::ClusterCheck},
router::{Route, parser::Shard},
router::{
Route,
parser::{SetParam, Shard},
},
},
net::{ErrorResponse, Message, Parameters},
state::State,
Expand All @@ -30,6 +33,7 @@ mod query_log_stdout;
pub mod rewrite;
pub mod route_query;
pub mod set;
pub mod set_config;
pub mod shard_key_rewrite;
pub mod start_transaction;
#[cfg(test)]
Expand Down Expand Up @@ -58,6 +62,7 @@ pub struct QueryEngine {
two_pc: TwoPc,
notify_buffer: NotifyBuffer,
pending_explain: Option<ExplainResponseState>,
pending_set_config: Option<SetParam>,
hooks: QueryEngineHooks,
advisory_locks: AdvisoryLocks,
}
Expand All @@ -79,6 +84,7 @@ impl QueryEngine {
two_pc: TwoPc::default(),
notify_buffer: NotifyBuffer::default(),
pending_explain: None,
pending_set_config: None,
begin_stmt: None,
router: Router::default(),
advisory_locks: AdvisoryLocks::default(),
Expand Down Expand Up @@ -210,6 +216,10 @@ impl QueryEngine {
context.params.rollback();
}
Command::Query(_) => self.execute(context).await?,
Command::SetConfig { param, .. } => {
let param = param.clone();
self.set_config(context, param).await?;
}
Command::Listen { .. } | Command::Notify { .. } | Command::Unlisten(_)
if self.backend.session_mode() =>
{
Expand Down
2 changes: 2 additions & 0 deletions pgdog/src/frontend/client/query_engine/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,11 @@ impl QueryEngine {

if code == 'C' {
self.emit_explain_rows(context).await?;
self.apply_pending_set_config(context);
}

if code == 'E' {
self.clear_pending_set_config();
if let Some(state) = self.pending_explain.as_mut() {
state.annotated = true;
}
Expand Down
49 changes: 49 additions & 0 deletions pgdog/src/frontend/client/query_engine/set_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use crate::frontend::SetParam;

use super::*;

impl QueryEngine {
pub(crate) async fn set_config(
&mut self,
context: &mut QueryEngineContext<'_>,
param: SetParam,
) -> Result<(), Error> {
self.pending_set_config = Some(param);

match self.execute(context).await {
Ok(()) => Ok(()),
Err(err) => {
self.pending_set_config = None;
Err(err)
}
}
}

pub(super) fn apply_pending_set_config(&mut self, context: &mut QueryEngineContext<'_>) {
let Some(param) = self.pending_set_config.take() else {
return;
};

if param.local {
if context.in_transaction() {
context
.params
.insert_transaction(&param.name, param.value, true);
}
return;
}

if context.in_transaction() {
context
.params
.insert_transaction(&param.name, param.value, false);
} else {
context.params.insert(&param.name, param.value);
self.comms.update_params(context.params);
}
}

pub(super) fn clear_pending_set_config(&mut self) {
self.pending_set_config = None;
}
}
5 changes: 5 additions & 0 deletions pgdog/src/frontend/router/parser/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ pub enum Command {
params: Vec<SetParam>,
route: Route,
},
SetConfig {
param: SetParam,
route: Route,
},
ResetAll,
PreparedStatement(Prepare),
InternalField {
Expand Down Expand Up @@ -67,6 +71,7 @@ impl Command {
match self {
Self::Query(route) => route,
Self::Set { route, .. } => route,
Self::SetConfig { route, .. } => route,
Self::StartTransaction { route, .. } => route,
_ => &DEFAULT_ROUTE,
}
Expand Down
1 change: 1 addition & 0 deletions pgdog/src/frontend/router/parser/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ static WRITE_ONLY: Lazy<HashMap<&'static str, LockingBehavior>> = Lazy::new(|| {
("pg_advisory_unlock", LockingBehavior::Unlock), // TODO: we don't track multiple advisory locks.
("nextval", LockingBehavior::None),
("setval", LockingBehavior::None),
("set_config", LockingBehavior::None),
])
});

Expand Down
4 changes: 3 additions & 1 deletion pgdog/src/frontend/router/parser/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ impl QueryParser {
};

match &mut command {
Command::Query(route) | Command::Set { route, .. } => {
Command::Query(route)
| Command::Set { route, .. }
| Command::SetConfig { route, .. } => {
if route.is_cross_shard() && context.shards == 1 {
context
.shards_calculator
Expand Down
93 changes: 93 additions & 0 deletions pgdog/src/frontend/router/parser/query/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@ impl QueryParser {
stmt: &SelectStmt,
context: &mut QueryParserContext,
) -> Result<Command, Error> {
if cached_ast.parse_result().protobuf.stmts.len() == 1
&& let Some(param) = Self::parse_set_config(stmt)
{
return Ok(Command::SetConfig {
param,
route: Route::write(context.shards_calculator.shard()),
});
}

let cte_writes = Self::cte_writes(stmt);
let has_locking = Self::has_locking_clause(stmt);
let mut overrides = Self::functions(stmt)?;
Expand Down Expand Up @@ -231,6 +240,90 @@ impl QueryParser {
))
}

fn parse_set_config(stmt: &SelectStmt) -> Option<SetParam> {
if !Self::is_plain_function_select(stmt) {
return None;
}

let target = stmt.target_list.first()?;
let func = match &target.node {
Some(NodeEnum::FuncCall(func)) => func,
Some(NodeEnum::ResTarget(res)) => {
let val = res.val.as_ref()?;
match &val.node {
Some(NodeEnum::FuncCall(func)) => func,
_ => return None,
}
}
_ => return None,
};

if !Self::is_set_config_function(&func.funcname) || func.args.len() != 3 {
return None;
}

Some(SetParam {
name: Self::string_const(&func.args[0])?,
value: ParameterValue::String(Self::string_const(&func.args[1])?),
local: Self::bool_const(&func.args[2])?,
Comment on lines +265 to +268

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve multi-entry search_path values

When set_config is used for a multi-entry search_path, e.g. SELECT set_config('search_path', 'public, pg_catalog', false), this stores the whole second argument as a single ParameterValue::String. On the next backend checkout PgDog replays tracked params via SET "search_path" TO {value}, so that value becomes SET "search_path" TO "public, pg_catalog", which PostgreSQL interprets as one quoted schema name rather than the two schemas that were actually set. This breaks session-state replay for clients that use set_config with normal comma-separated search paths.

Useful? React with 👍 / 👎.

reset: false,
})
}

fn is_plain_function_select(stmt: &SelectStmt) -> bool {
stmt.target_list.len() == 1
&& stmt.from_clause.is_empty()
&& stmt.where_clause.is_none()
&& stmt.group_clause.is_empty()
&& stmt.having_clause.is_none()
&& stmt.sort_clause.is_empty()
&& stmt.locking_clause.is_empty()
&& stmt.with_clause.is_none()
&& stmt.limit_count.is_none()
&& stmt.limit_offset.is_none()
&& stmt.values_lists.is_empty()
&& stmt.window_clause.is_empty()
&& stmt.distinct_clause.is_empty()
&& stmt.larg.is_none()
&& stmt.rarg.is_none()
&& stmt.op() == SetOperation::SetopNone
}

fn is_set_config_function(funcname: &[Node]) -> bool {
let parts: Option<Vec<_>> = funcname
.iter()
.map(|node| match &node.node {
Some(NodeEnum::String(string)) => Some(string.sval.as_str()),
_ => None,
})
.collect();

matches!(
parts.as_deref(),
Some(["set_config"] | ["pg_catalog", "set_config"])
)
}

fn string_const(node: &Node) -> Option<std::string::String> {
match &node.node {
Some(NodeEnum::AConst(AConst {
val: Some(Val::Sval(String { sval })),
..
})) => Some(sval.clone()),
_ => None,
}
}

fn bool_const(node: &Node) -> Option<bool> {
match &node.node {
Some(NodeEnum::AConst(AConst {
val: Some(Val::Boolval(Boolean { boolval })),
..
})) => Some(*boolval),
_ => None,
}
}

/// Handle the `ORDER BY` clause of a `SELECT` statement.
///
/// # Arguments
Expand Down
Loading