diff --git a/pgdog/src/backend/databases.rs b/pgdog/src/backend/databases.rs index f3a840480..c77b0a032 100644 --- a/pgdog/src/backend/databases.rs +++ b/pgdog/src/backend/databases.rs @@ -22,7 +22,10 @@ use crate::frontend::router::sharding::Mapping; use crate::frontend::router::sharding::mapping::mapping_valid; use crate::{ backend::pool::PoolConfig, - config::{ConfigAndUsers, ManualQuery, Role, User as ConfigUser, config, load, set}, + config::{ + ConfigAndUsers, ManualQuery, Role, ShardedMapping, ShardedTable, User as ConfigUser, + config, load, set, + }, net::{messages::FrontendPid, tls}, }; @@ -487,6 +490,41 @@ impl Databases { } } +fn resolve_table_mappings( + tables: &mut [ShardedTable], + mappings: &HashMap<(String, String, Option), Vec>, +) { + for table in tables { + let found = mappings + .get(&( + table.database.clone(), + table.column.clone(), + table.name.clone(), + )) + .or_else(|| { + table.name.as_ref().and_then(|_| { + // if the table is specified for `sharded_tables`, try to apply the default + // mapping that is not tied to specific table + mappings.get(&(table.database.clone(), table.column.clone(), None)) + }) + }); + + if let Some(found) = found { + table.mapping = Mapping::new(found); + + if let Some(ref mapping) = table.mapping + && !mapping_valid(mapping) + { + warn!( + "sharded table name=\"{}\", column=\"{}\" has overlapping ranges", + table.name.as_deref().unwrap_or_default(), + table.column + ); + } + } + } +} + fn new_pool(user: &crate::config::User, config: &crate::config::Config) -> Option<(User, Cluster)> { let sharded_tables = config.sharded_tables(); let omnisharded_tables = config.omnisharded_tables(); @@ -528,27 +566,7 @@ fn new_pool(user: &crate::config::User, config: &crate::config::Config) -> Optio .cloned() .unwrap_or_default(); - for sharded_table in &mut sharded_tables { - let mappings = sharded_mappings.get(&( - sharded_table.database.clone(), - sharded_table.column.clone(), - sharded_table.name.clone(), - )); - - if let Some(mappings) = mappings { - sharded_table.mapping = Mapping::new(mappings); - - if let Some(ref mapping) = sharded_table.mapping - && !mapping_valid(mapping) - { - warn!( - "sharded table name=\"{}\", column=\"{}\" has overlapping ranges", - sharded_table.name.as_ref().unwrap_or(&String::from("")), - sharded_table.column - ); - } - } - } + resolve_table_mappings(&mut sharded_tables, &sharded_mappings); let omnisharded_tables = omnisharded_tables .get(&user.database) diff --git a/pgdog/src/frontend/router/sharding/test/mod.rs b/pgdog/src/frontend/router/sharding/test/mod.rs index da1eadb37..ffdb6cdb6 100644 --- a/pgdog/src/frontend/router/sharding/test/mod.rs +++ b/pgdog/src/frontend/router/sharding/test/mod.rs @@ -2,9 +2,10 @@ use std::{collections::HashSet, str::from_utf8}; use rand::seq::SliceRandom; +use crate::config::{ShardedMapping, ShardedMappingKind, ShardedTable}; use crate::{ backend::server::test::test_server, - config::{FlexibleType, ShardedMapping, ShardedMappingKind}, + config::FlexibleType, net::{Bind, DataRow, Execute, FromBytes, Parse, Protocol, Query, Sync, bind::Parameter}, }; @@ -503,3 +504,281 @@ async fn test_shard_by_list_with_default() { server.execute("ROLLBACK").await.unwrap(); } + +fn make_sharding_schema( + sharded_tables: Vec, + sharded_mappings: Vec, +) -> ShardingSchema { + use crate::backend::databases::from_config; + use crate::config::{Config, ConfigAndUsers, Database, Role, User, Users}; + use std::path::PathBuf; + + let config = Config { + databases: vec![ + Database { + name: "db".into(), + host: "localhost".into(), + port: 5432, + role: Role::Primary, + shard: 0, + ..Default::default() + }, + Database { + name: "db".into(), + host: "localhost".into(), + port: 5432, + role: Role::Primary, + shard: 1, + ..Default::default() + }, + ], + sharded_tables, + sharded_mappings, + ..Default::default() + }; + let users = Users { + users: vec![User { + name: "user".into(), + database: "db".into(), + ..Default::default() + }], + ..Default::default() + }; + let databases = from_config(&ConfigAndUsers { + config, + users, + config_path: PathBuf::new(), + users_path: PathBuf::new(), + }); + databases.cluster(("user", "db")).unwrap().sharding_schema() +} + +fn route(schema: &ShardingSchema, table_name: &str, value: i64) -> Shard { + use crate::frontend::router::parser::Column; + + let col = Column { + name: "id", + table: Some(table_name), + schema: None, + }; + + let t = schema + .tables() + .get_table(col) + .expect("table/column not found in schema"); + + ContextBuilder::new(t) + .data(value) + .shards(schema.shards) + .build() + .unwrap() + .apply() + .unwrap() +} + +#[test] +fn test_same_tables_specific_and_mapping_specific() { + let schema = make_sharding_schema( + vec![ShardedTable { + database: "db".into(), + name: Some("users".into()), + column: "id".into(), + data_type: DataType::Bigint, + ..Default::default() + }], + vec![ + ShardedMapping { + database: "db".into(), + column: "id".into(), + table: Some("users".into()), + kind: ShardedMappingKind::List, + values: [1, 2, 3].into_iter().map(FlexibleType::Integer).collect(), + shard: 0, + ..Default::default() + }, + ShardedMapping { + database: "db".into(), + column: "id".into(), + table: Some("users".into()), + kind: ShardedMappingKind::List, + values: [4, 5, 6].into_iter().map(FlexibleType::Integer).collect(), + shard: 1, + ..Default::default() + }, + ], + ); + + for v in [1, 2, 3] { + assert_eq!( + route(&schema, "users", v), + Shard::Direct(0), + "id={v} should go to shard 0" + ); + } + for v in [4, 5, 6] { + assert_eq!( + route(&schema, "users", v), + Shard::Direct(1), + "id={v} should go to shard 1" + ); + } +} + +#[test] +fn test_different_tables_specific_and_mapping_specific() { + let schema = make_sharding_schema( + vec![ShardedTable { + database: "db".into(), + name: Some("users".into()), + column: "id".into(), + data_type: DataType::Bigint, + ..Default::default() + }], + vec![ShardedMapping { + database: "db".into(), + column: "id".into(), + table: Some("orders".into()), + kind: ShardedMappingKind::List, + values: [1, 2, 3].into_iter().map(FlexibleType::Integer).collect(), + shard: 0, + ..Default::default() + }], + ); + + assert_eq!(route(&schema, "users", 1), Shard::Direct(0)); + assert_eq!(route(&schema, "users", 3), Shard::Direct(1)); +} + +#[test] +fn test_tables_all_and_mapping_all() { + let schema = make_sharding_schema( + vec![ShardedTable { + database: "db".into(), + column: "id".into(), + data_type: DataType::Bigint, + ..Default::default() + }], + vec![ + ShardedMapping { + database: "db".into(), + column: "id".into(), + kind: ShardedMappingKind::List, + values: [1, 2, 3].into_iter().map(FlexibleType::Integer).collect(), + shard: 0, + ..Default::default() + }, + ShardedMapping { + database: "db".into(), + column: "id".into(), + kind: ShardedMappingKind::List, + values: [4, 5, 6].into_iter().map(FlexibleType::Integer).collect(), + shard: 1, + ..Default::default() + }, + ], + ); + + for i in [1, 2, 3] { + assert_eq!(route(&schema, "users", i), Shard::Direct(0)); + assert_eq!(route(&schema, "orders", i), Shard::Direct(0)); + } + + for i in [4, 5, 6] { + assert_eq!(route(&schema, "users", i), Shard::Direct(1)); + assert_eq!(route(&schema, "orders", i), Shard::Direct(1)); + } +} + +#[test] +fn test_tables_specific_and_mapping_all() { + let schema = make_sharding_schema( + vec![ShardedTable { + database: "db".into(), + name: Some("users".into()), + column: "id".into(), + data_type: DataType::Bigint, + ..Default::default() + }], + vec![ + ShardedMapping { + database: "db".into(), + column: "id".into(), + kind: ShardedMappingKind::List, + values: [1, 2, 3].into_iter().map(FlexibleType::Integer).collect(), + shard: 0, + ..Default::default() + }, + ShardedMapping { + database: "db".into(), + column: "id".into(), + kind: ShardedMappingKind::List, + values: [4, 5, 6].into_iter().map(FlexibleType::Integer).collect(), + shard: 1, + ..Default::default() + }, + ], + ); + + for v in [1, 2, 3] { + assert_eq!( + route(&schema, "users", v), + Shard::Direct(0), + "id={v} should go to shard 0" + ); + } + for v in [4, 5, 6] { + assert_eq!( + route(&schema, "users", v), + Shard::Direct(1), + "id={v} should go to shard 1" + ); + } +} + +#[test] +#[ignore = "Mapping cannot be specified without sharded table"] +fn test_tables_all_and_mapping_specific() { + let schema = make_sharding_schema( + vec![ShardedTable { + database: "db".into(), + column: "id".into(), + data_type: DataType::Bigint, + ..Default::default() + }], + vec![ + ShardedMapping { + database: "db".into(), + table: Some("users".into()), + column: "id".into(), + kind: ShardedMappingKind::List, + values: [1, 2, 3].into_iter().map(FlexibleType::Integer).collect(), + shard: 0, + ..Default::default() + }, + ShardedMapping { + database: "db".into(), + table: Some("users".into()), + column: "id".into(), + kind: ShardedMappingKind::List, + values: [4, 5, 6].into_iter().map(FlexibleType::Integer).collect(), + shard: 1, + ..Default::default() + }, + ], + ); + + for v in [1, 2, 3] { + assert_eq!( + route(&schema, "users", v), + Shard::Direct(0), + "id={v} should go to shard 0" + ); + } + for v in [4, 5, 6] { + assert_eq!( + route(&schema, "users", v), + Shard::Direct(1), + "id={v} should go to shard 1" + ); + } +}