Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 188 additions & 21 deletions src/connection/impls.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use bytes::Buf;
use futures::stream::{FuturesOrdered, StreamExt};
use postgres_types::ToSql;
use pyo3::{PyAny, Python};
use tokio_postgres::{CopyInSink, Portal as tp_Portal, Row, Statement, ToStatement};

Expand Down Expand Up @@ -426,6 +428,63 @@ impl PSQLPyConnection {

/// Execute many queries without return.
///
/// ## Performance model
///
/// Two coupled mechanisms give this method its throughput:
///
/// 1. **Pipelining.** All `Bind`/`Execute` messages are issued against the
/// same connection via concurrently-polled futures (`FuturesOrdered`).
/// tokio-postgres dispatches them back-to-back without waiting for
/// intermediate replies, eliminating the per-row round-trip stall that
/// a naive `for ... await` loop produces.
///
/// 2. **Single transactional fsync.** Every standalone `INSERT`/`UPDATE`/
/// `DELETE` outside a transaction is its own implicit auto-commit, and
/// `PostgreSQL` fsyncs the WAL per commit. Pipelining alone collapses
/// network latency but leaves N fsyncs on the table, capping throughput
/// well below what a "real" batch achieves. Wrapping the pipelined
/// batch in a single transaction reduces this to one fsync.
///
/// Locally-measured: 1000-row INSERT batch went from ~1300 ms sequential
/// → ~1000 ms pipelined alone → ~32 ms pipelined within a transaction.
/// The transaction wrap is what produces the order-of-magnitude win;
/// pipelining alone is insufficient. This matches asyncpg's `executemany`
/// behaviour, which the project benchmarks against (see issue #167).
///
/// ## Transaction wrapping policy
///
/// When the caller is **not** already in a transaction (the connection's
/// `in_transaction()` flag is `false`), the batch is wrapped in an
/// implicit `BEGIN`/`COMMIT`. On error, `ROLLBACK` is issued before
/// returning, leaving the connection in a clean state.
///
/// When the caller **is** already in a transaction (this is invoked via
/// `Transaction::execute_many`), the batch is wrapped in a SAVEPOINT
/// (`SAVEPOINT psqlpy_execute_many` … `RELEASE` on success;
/// `ROLLBACK TO` + `RELEASE` on failure). The savepoint costs two extra
/// pipelineable round-trips but makes the failure contract symmetric
/// across both call sites: a failed batch never poisons the caller's
/// surrounding transaction. Without the savepoint, a single failing row
/// would leave the outer transaction in aborted state and force the
/// caller to roll back work they may have wanted to keep — a footgun
/// that's hard to document away when the same method name behaves
/// differently on a `Connection` vs a `Transaction`.
///
/// asyncpg does not auto-savepoint; we deliberately diverge here. The
/// reasoning is that psqlpy's `Connection::execute_many` *must* wrap in
/// a transaction to get the fsync win, so the failure-isolation
/// asymmetry between the two call sites already exists — savepoints
/// just bring `Transaction::execute_many` into line.
///
/// ## Behavioural change vs prior implementation
///
/// Previously this method ran each row as an independent auto-commit,
/// so a mid-batch failure left earlier rows committed. The new wrap
/// makes the whole batch atomic. This matches asyncpg / psycopg
/// `executemany` expectations and the issue's framing of `execute_many`
/// as a bulk operation, but it is a semantic change worth flagging in
/// release notes.
///
/// # Errors
/// May return error if there is some problem with DB communication.
pub async fn execute_many(
Expand All @@ -437,11 +496,13 @@ impl PSQLPyConnection {
let Some(parameters) = parameters else {
return Ok(());
};
if parameters.is_empty() {
return Ok(());
}

let prepared = prepared.unwrap_or(true);

let mut statements: Vec<PsqlpyStatement> = Vec::with_capacity(parameters.len());

for param_set in parameters {
let statement =
StatementBuilder::new(&querystring, &Some(param_set), self, Some(prepared))
Expand All @@ -459,41 +520,111 @@ impl PSQLPyConnection {
return Ok(());
}

let wrap = if self.in_transaction() {
ExecuteManyWrap::Savepoint
} else {
ExecuteManyWrap::Transaction
};

self.batch_execute(wrap.open_sql()).await.map_err(|err| {
RustPSQLDriverError::ConnectionExecuteError(format!(
"Cannot open transaction wrap in execute_many: {err}"
))
})?;

let batch_result = self.run_pipelined_batch(&statements, prepared).await;

let close_sql = wrap.close_sql(batch_result.is_ok());
let close_result = self.batch_execute(close_sql).await;

match (batch_result, close_result) {
(Ok(()), Ok(())) => Ok(()),
(Ok(()), Err(close_err)) => Err(RustPSQLDriverError::ConnectionExecuteError(format!(
"Failed to finalize execute_many wrap: {close_err}"
))),
// When the batch already failed, the close path is best-effort:
// the original error is the root cause and carries the diagnostic
// the caller needs. A failed ROLLBACK / ROLLBACK TO is almost
// always a downstream consequence of the same connection issue.
(Err(batch_err), _) => Err(batch_err),
}
}

/// Pipeline the bound parameter sets across a single connection.
///
/// All futures are pushed into a `FuturesOrdered` and polled together so
/// tokio-postgres can issue their `Bind`/`Execute` messages back-to-back.
/// On the first error we *keep draining* remaining futures (rather than
/// short-circuiting with `?`) so already-sent messages can be acknowledged
/// and the connection returns to a quiescent state before the caller
/// issues the close-wrap statement. The first error is what we propagate.
async fn run_pipelined_batch(
&self,
statements: &[PsqlpyStatement],
prepared: bool,
) -> PSQLPyResult<()> {
// Materialize parameter slices into owned boxes so the borrows feeding
// each future live for the whole pipeline (the slices reference data
// owned by each `PsqlpyStatement`, which already outlives this fn).
if prepared {
let first_statement = &statements[0];
let prepared_stmt = self
.prepare(first_statement.raw_query(), true)
.prepare(statements[0].raw_query(), true)
.await
.map_err(|err| {
RustPSQLDriverError::ConnectionExecuteError(format!(
"Cannot prepare statement in execute_many: {err}"
))
})?;

// Execute all statements using the same prepared statement
for statement in statements {
self.query(&prepared_stmt, &statement.params())
.await
.map_err(|err| {
RustPSQLDriverError::ConnectionExecuteError(format!(
let param_boxes: Vec<Box<[&(dyn ToSql + Sync)]>> =
statements.iter().map(PsqlpyStatement::params).collect();

let mut ordered: FuturesOrdered<_> = param_boxes
.iter()
.map(|p| self.query(&prepared_stmt, p))
.collect();

let mut first_err: Option<RustPSQLDriverError> = None;
while let Some(res) = ordered.next().await {
if let Err(err) = res {
if first_err.is_none() {
first_err = Some(RustPSQLDriverError::ConnectionExecuteError(format!(
"Error occurred in `execute_many` statement: {err}"
))
})?;
)));
}
}
}
match first_err {
Some(e) => Err(e),
None => Ok(()),
}
} else {
// Execute each statement without preparation
for statement in statements {
self.query(statement.raw_query(), &statement.params())
.await
.map_err(|err| {
RustPSQLDriverError::ConnectionExecuteError(format!(
let param_boxes: Vec<_> = statements
.iter()
.map(PsqlpyStatement::params_typed)
.collect();

let mut ordered: FuturesOrdered<_> = statements
.iter()
.zip(param_boxes.iter())
.map(|(s, p)| self.query_typed(s.raw_query(), p))
.collect();

let mut first_err: Option<RustPSQLDriverError> = None;
while let Some(res) = ordered.next().await {
if let Err(err) = res {
if first_err.is_none() {
first_err = Some(RustPSQLDriverError::ConnectionExecuteError(format!(
"Error occurred in `execute_many` statement: {err}"
))
})?;
)));
}
}
}
match first_err {
Some(e) => Err(e),
None => Ok(()),
}
}

Ok(())
}

/// Execute raw query with parameters. Return one raw row
Expand Down Expand Up @@ -655,3 +786,39 @@ impl PSQLPyConnection {
Ok((transaction, inner_portal))
}
}

/// How `execute_many` brackets its pipelined batch.
///
/// The variant is chosen at call-time from `PSQLPyConnection::in_transaction()`:
/// a connection that is not already in a transaction gets the implicit
/// `BEGIN`/`COMMIT`; one that is already inside a transaction uses a savepoint
/// so failure of the batch can never poison the caller's surrounding work.
///
/// The savepoint name (`psqlpy_execute_many`) is internal — it collides only
/// with a user-managed savepoint of the same name, which would require the
/// caller to be reaching past the public API.
#[derive(Clone, Copy)]
enum ExecuteManyWrap {
Transaction,
Savepoint,
}

impl ExecuteManyWrap {
fn open_sql(self) -> &'static str {
match self {
ExecuteManyWrap::Transaction => "BEGIN",
ExecuteManyWrap::Savepoint => "SAVEPOINT psqlpy_execute_many",
}
}

fn close_sql(self, batch_ok: bool) -> &'static str {
match (self, batch_ok) {
(ExecuteManyWrap::Transaction, true) => "COMMIT",
(ExecuteManyWrap::Transaction, false) => "ROLLBACK",
(ExecuteManyWrap::Savepoint, true) => "RELEASE SAVEPOINT psqlpy_execute_many",
(ExecuteManyWrap::Savepoint, false) => {
"ROLLBACK TO SAVEPOINT psqlpy_execute_many; RELEASE SAVEPOINT psqlpy_execute_many"
}
}
}
}
Loading