Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions pgdog/src/frontend/client/query_engine/multi_step/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ use crate::{
frontend::{
ClientRequest, Command, Router, RouterContext,
client::query_engine::{QueryEngine, QueryEngineContext},
router::{
Route,
parser::route::{Shard, ShardWithPriority},
},
},
net::Protocol,
};
Expand Down Expand Up @@ -30,6 +34,27 @@ impl<'a> InsertMulti<'a> {
}
}

/// If every split routes to the same `Shard::Direct(n)`, return that shard
/// number. Returns `None` when the splits span multiple shards or contain
/// any non-direct routing.
fn uniform_shard(&self) -> Option<usize> {
let first = match self.requests.first()?.route.as_ref()?.shard() {
Shard::Direct(n) => *n,
_ => return None,
};

self.requests
.iter()
.skip(1)
.all(|req| {
matches!(
req.route.as_ref().map(|r| r.shard()),
Some(Shard::Direct(n)) if *n == first
)
})
.then_some(first)
}

/// Execute the multi-shard INSERT.
pub(crate) async fn execute(
&'a mut self,
Expand All @@ -53,6 +78,27 @@ impl<'a> InsertMulti<'a> {
}
}

// All tuples map to the same shard: send the original multi-row INSERT
// as a single statement, skipping the multi-step path entirely.
if let Some(shard_n) = self.uniform_shard() {
context.client_request.route = Some(Route::write(ShardWithPriority::new_table(
Shard::Direct(shard_n),
)));
self.engine
.backend
.handle_client_request(
context.client_request,
&mut self.engine.router,
self.engine.streaming,
)
.await?;
while self.engine.backend.has_more_messages() {
let message = self.engine.read_server_message().await?;
self.engine.process_server_message(context, message).await?;
}
return Ok(false);
}

if !self.engine.backend.is_multishard() {
return Err(Error::MultiShardRequired);
}
Expand Down
61 changes: 61 additions & 0 deletions pgdog/src/frontend/client/query_engine/multi_step/test/insert.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use crate::{
frontend::{
ClientRequest,
client::{query_engine::QueryEngineContext, test::TestClient},
},
net::{Parameters, Query},
};

#[tokio::test]
async fn test_same_shard_insert_uses_direct_route() {
crate::logger();

let mut client = TestClient::new_rewrites(Parameters::default()).await;
let id1 = client.random_id_for_shard(0);
let id2 = client.random_id_for_shard(0);

client.client.client_request = ClientRequest::from(vec![
Query::new(format!(
"INSERT INTO sharded (id, value) VALUES ({}, 'a'), ({}, 'b')",
id1, id2
))
.into(),
]);

let mut context = QueryEngineContext::new(&mut client.client);
client.engine.parse_and_rewrite(&mut context).await.unwrap();
client.engine.route_query(&mut context).await.unwrap();
client.engine.execute(&mut context).await.unwrap();

assert!(
context.client_request.route().shard().is_direct(),
"same-shard INSERT should bypass the split and use direct-to-shard routing"
);
}

#[tokio::test]
async fn test_cross_shard_insert_uses_all_shards() {
crate::logger();

let mut client = TestClient::new_rewrites(Parameters::default()).await;
let id0 = client.random_id_for_shard(0);
let id1 = client.random_id_for_shard(1);

client.client.client_request = ClientRequest::from(vec![
Query::new(format!(
"INSERT INTO sharded (id, value) VALUES ({}, 'a'), ({}, 'b')",
id0, id1
))
.into(),
]);

let mut context = QueryEngineContext::new(&mut client.client);
client.engine.parse_and_rewrite(&mut context).await.unwrap();
client.engine.route_query(&mut context).await.unwrap();
client.engine.execute(&mut context).await.unwrap();

assert!(
!context.client_request.route().shard().is_direct(),
"cross-shard INSERT must go through the split path, not the direct route"
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{
net::{Query, ToBytes},
};

pub mod insert;
pub mod prepared;
pub mod simple;
pub mod update;
Expand Down
Loading