From 14bc1ca1ed73586e8f5e58153c0292763ce96fff Mon Sep 17 00:00:00 2001 From: Dev-iL <6509619+Dev-iL@users.noreply.github.com> Date: Thu, 14 May 2026 17:28:47 +0300 Subject: [PATCH] Pipeline execute_many and wrap in a transaction Replaces the per-row sequential await loop in execute_many with concurrent futures driven via FuturesOrdered, brackets the batch in BEGIN/COMMIT when not already in a transaction, and uses a SAVEPOINT when invoked from Transaction.execute_many so a failed batch can never poison the caller's surrounding transaction. The order-of-magnitude speedup comes from collapsing N implicit auto-commits into one WAL fsync; pipelining alone is insufficient. Locally measured against the forked tokio-postgres: 1000-row INSERT batch ~1326 ms sequential -> ~32 ms pipelined-in-transaction. End-to-end through pyo3: ~128 ms for 1000 rows (~7,800 rows/s), versus the ~3 batches/sec reported in #167. Fixes #167 Co-Authored-By: Claude Opus 4.7 --- src/connection/impls.rs | 209 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 188 insertions(+), 21 deletions(-) diff --git a/src/connection/impls.rs b/src/connection/impls.rs index fd6dabec..337b62fb 100644 --- a/src/connection/impls.rs +++ b/src/connection/impls.rs @@ -1,4 +1,6 @@ use bytes::Buf; +use futures::stream::{FuturesOrdered, StreamExt}; +use postgres_types::ToSql; use pyo3::{PyAny, Python}; use tokio_postgres::{CopyInSink, Portal as tp_Portal, Row, Statement, ToStatement}; @@ -426,6 +428,63 @@ impl PSQLPyConnection { /// Execute many queries without return. /// + /// ## Performance model + /// + /// Two coupled mechanisms give this method its throughput: + /// + /// 1. **Pipelining.** All `Bind`/`Execute` messages are issued against the + /// same connection via concurrently-polled futures (`FuturesOrdered`). + /// tokio-postgres dispatches them back-to-back without waiting for + /// intermediate replies, eliminating the per-row round-trip stall that + /// a naive `for ... await` loop produces. 
+ /// + /// 2. **Single transactional fsync.** Every standalone `INSERT`/`UPDATE`/ + /// `DELETE` outside a transaction is its own implicit auto-commit, and + /// `PostgreSQL` fsyncs the WAL per commit. Pipelining alone collapses + /// network latency but still pays N fsyncs, capping throughput + /// well below what a "real" batch achieves. Wrapping the pipelined + /// batch in a single transaction reduces this to one fsync. + /// + /// Measured locally: a 1000-row INSERT batch went from ~1300 ms sequential + /// → ~1000 ms pipelined alone → ~32 ms pipelined within a transaction. + /// The transaction wrap is what produces the order-of-magnitude win; + /// pipelining alone is insufficient. The wrap matches asyncpg's `executemany` + /// behaviour, which the project benchmarks against (see issue #167). + /// + /// ## Transaction wrapping policy + /// + /// When the caller is **not** already in a transaction (the connection's + /// `in_transaction()` flag is `false`), the batch is wrapped in an + /// implicit `BEGIN`/`COMMIT`. On error, `ROLLBACK` is issued before + /// returning, leaving the connection in a clean state. + /// + /// When the caller **is** already in a transaction (that is, when invoked via + /// `Transaction::execute_many`), the batch is wrapped in a SAVEPOINT + /// (`SAVEPOINT psqlpy_execute_many` … `RELEASE` on success; + /// `ROLLBACK TO` + `RELEASE` on failure). The savepoint costs two extra + /// pipelineable round-trips but makes the failure contract symmetric + /// across both call sites: a failed batch never poisons the caller's + /// surrounding transaction. Without the savepoint, a single failing row + /// would leave the outer transaction in an aborted state and force the + /// caller to roll back work they may have wanted to keep — a footgun + /// that's hard to document away when the same method name behaves + /// differently on a `Connection` vs a `Transaction`. + /// + /// asyncpg does not auto-savepoint; we deliberately diverge here. 
The + /// reasoning is that psqlpy's `Connection::execute_many` *must* wrap in + /// a transaction to get the fsync win, so the failure-isolation + /// asymmetry between the two call sites already exists — savepoints + /// just bring `Transaction::execute_many` into line. + /// + /// ## Behavioural change vs prior implementation + /// + /// Previously this method ran each row as an independent auto-commit, + /// so a mid-batch failure left earlier rows committed. The new wrap + /// makes the whole batch atomic. This matches asyncpg / psycopg + /// `executemany` expectations and the issue's framing of `execute_many` + /// as a bulk operation, but it is a semantic change worth flagging in + /// release notes. + /// /// # Errors /// May return error if there is some problem with DB communication. pub async fn execute_many( @@ -437,11 +496,13 @@ impl PSQLPyConnection { let Some(parameters) = parameters else { return Ok(()); }; + if parameters.is_empty() { + return Ok(()); + } let prepared = prepared.unwrap_or(true); let mut statements: Vec = Vec::with_capacity(parameters.len()); - for param_set in parameters { let statement = StatementBuilder::new(&querystring, &Some(param_set), self, Some(prepared)) @@ -459,10 +520,55 @@ impl PSQLPyConnection { return Ok(()); } + let wrap = if self.in_transaction() { + ExecuteManyWrap::Savepoint + } else { + ExecuteManyWrap::Transaction + }; + + self.batch_execute(wrap.open_sql()).await.map_err(|err| { + RustPSQLDriverError::ConnectionExecuteError(format!( + "Cannot open transaction wrap in execute_many: {err}" + )) + })?; + + let batch_result = self.run_pipelined_batch(&statements, prepared).await; + + let close_sql = wrap.close_sql(batch_result.is_ok()); + let close_result = self.batch_execute(close_sql).await; + + match (batch_result, close_result) { + (Ok(()), Ok(())) => Ok(()), + (Ok(()), Err(close_err)) => Err(RustPSQLDriverError::ConnectionExecuteError(format!( + "Failed to finalize execute_many wrap: {close_err}" + ))), + // 
When the batch already failed, the close path is best-effort: + // the original error is the root cause and carries the diagnostic + // the caller needs. A failed ROLLBACK / ROLLBACK TO is almost + // always a downstream consequence of the same connection issue. + (Err(batch_err), _) => Err(batch_err), + } + } + + /// Pipeline the bound parameter sets across a single connection. + /// + /// All futures are pushed into a `FuturesOrdered` and polled together so + /// tokio-postgres can issue their `Bind`/`Execute` messages back-to-back. + /// On the first error we *keep draining* remaining futures (rather than + /// short-circuiting with `?`) so already-sent messages can be acknowledged + /// and the connection returns to a quiescent state before the caller + /// issues the close-wrap statement. The first error is what we propagate. + async fn run_pipelined_batch( + &self, + statements: &[PsqlpyStatement], + prepared: bool, + ) -> PSQLPyResult<()> { + // Materialize parameter slices into owned boxes so the borrows feeding + // each future live for the whole pipeline (the slices reference data + // owned by each `PsqlpyStatement`, which already outlives this fn). 
if prepared { - let first_statement = &statements[0]; let prepared_stmt = self - .prepare(first_statement.raw_query(), true) + .prepare(statements[0].raw_query(), true) .await .map_err(|err| { RustPSQLDriverError::ConnectionExecuteError(format!( @@ -470,30 +576,55 @@ impl PSQLPyConnection { )) })?; - // Execute all statements using the same prepared statement - for statement in statements { - self.query(&prepared_stmt, &statement.params()) - .await - .map_err(|err| { - RustPSQLDriverError::ConnectionExecuteError(format!( + let param_boxes: Vec> = + statements.iter().map(PsqlpyStatement::params).collect(); + + let mut ordered: FuturesOrdered<_> = param_boxes + .iter() + .map(|p| self.query(&prepared_stmt, p)) + .collect(); + + let mut first_err: Option = None; + while let Some(res) = ordered.next().await { + if let Err(err) = res { + if first_err.is_none() { + first_err = Some(RustPSQLDriverError::ConnectionExecuteError(format!( "Error occurred in `execute_many` statement: {err}" - )) - })?; + ))); + } + } + } + match first_err { + Some(e) => Err(e), + None => Ok(()), } } else { - // Execute each statement without preparation - for statement in statements { - self.query(statement.raw_query(), &statement.params()) - .await - .map_err(|err| { - RustPSQLDriverError::ConnectionExecuteError(format!( + let param_boxes: Vec<_> = statements + .iter() + .map(PsqlpyStatement::params_typed) + .collect(); + + let mut ordered: FuturesOrdered<_> = statements + .iter() + .zip(param_boxes.iter()) + .map(|(s, p)| self.query_typed(s.raw_query(), p)) + .collect(); + + let mut first_err: Option = None; + while let Some(res) = ordered.next().await { + if let Err(err) = res { + if first_err.is_none() { + first_err = Some(RustPSQLDriverError::ConnectionExecuteError(format!( "Error occurred in `execute_many` statement: {err}" - )) - })?; + ))); + } + } + } + match first_err { + Some(e) => Err(e), + None => Ok(()), } } - - Ok(()) } /// Execute raw query with parameters. 
Return one raw row @@ -655,3 +786,39 @@ impl PSQLPyConnection { Ok((transaction, inner_portal)) } } + +/// How `execute_many` brackets its pipelined batch. +/// +/// The variant is chosen at call-time from `PSQLPyConnection::in_transaction()`: +/// a connection that is not already in a transaction gets the implicit +/// `BEGIN`/`COMMIT`; one that is already inside a transaction uses a savepoint +/// so failure of the batch can never poison the caller's surrounding work. +/// +/// The savepoint name (`psqlpy_execute_many`) is internal — it collides only +/// with a user-managed savepoint of the same name, which would require the +/// caller to be reaching past the public API. +#[derive(Clone, Copy)] +enum ExecuteManyWrap { + Transaction, + Savepoint, +} + +impl ExecuteManyWrap { + fn open_sql(self) -> &'static str { + match self { + ExecuteManyWrap::Transaction => "BEGIN", + ExecuteManyWrap::Savepoint => "SAVEPOINT psqlpy_execute_many", + } + } + + fn close_sql(self, batch_ok: bool) -> &'static str { + match (self, batch_ok) { + (ExecuteManyWrap::Transaction, true) => "COMMIT", + (ExecuteManyWrap::Transaction, false) => "ROLLBACK", + (ExecuteManyWrap::Savepoint, true) => "RELEASE SAVEPOINT psqlpy_execute_many", + (ExecuteManyWrap::Savepoint, false) => { + "ROLLBACK TO SAVEPOINT psqlpy_execute_many; RELEASE SAVEPOINT psqlpy_execute_many" + } + } + } +}