diff --git a/Cargo.lock b/Cargo.lock index b75da00..9ae2835 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -532,6 +532,8 @@ version = "0.6.1" dependencies = [ "async-trait", "chrono", + "indexmap", + "pg_introspect", "pg_query", "regex", "rusqlite", @@ -1528,6 +1530,19 @@ dependencies = [ "indexmap", ] +[[package]] +name = "pg_introspect" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f7c26d6c97143ecd713d32e06aaadc9aa37b0df5be8cadcb64241a33a824dfb" +dependencies = [ + "indexmap", + "serde", + "sqlx", + "thiserror 2.0.18", + "tracing", +] + [[package]] name = "pg_query" version = "6.1.1" @@ -2791,9 +2806,9 @@ dependencies = [ [[package]] name = "tower-http" -version = "0.6.9" +version = "0.6.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a28f0d049ccfaa566e14e9663d304d8577427b368cb4710a20528690287a738b" +checksum = "68d6fdd9f81c2819c9a8b0e0cd91660e7746a8e6ea2ba7c6b2b057985f6bcb51" dependencies = [ "bitflags", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 65a1012..5f3b066 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ dbg_macro = "deny" [workspace.dependencies] dry_run_core = { path = "crates/dry_run_core" } +pg_introspect = "0.2.0" async-trait = "0.1" chrono = { version = "0.4", features = ["serde"] } clap = { version = "4", features = ["derive", "env"] } diff --git a/crates/dry_run_cli/src/mcp/server.rs b/crates/dry_run_cli/src/mcp/server.rs index c89fa64..956a9d0 100644 --- a/crates/dry_run_cli/src/mcp/server.rs +++ b/crates/dry_run_cli/src/mcp/server.rs @@ -1554,7 +1554,11 @@ impl DryRunServer { annotated.schema.tables.len(), annotated.schema.views.len(), annotated.schema.functions.len(), - if annotated.planner.is_some() { "yes" } else { "no" }, + if annotated.planner.is_some() { + "yes" + } else { + "no" + }, annotated.activity_by_node.len(), ); *self.schema.write().await = Some(annotated); diff --git a/crates/dry_run_core/Cargo.toml b/crates/dry_run_core/Cargo.toml index 6a3330f..44595f6 100644 --- a/crates/dry_run_core/Cargo.toml +++ b/crates/dry_run_core/Cargo.toml @@ -6,6 +6,7 @@ edition.workspace = true [dependencies] async-trait = { workspace = true } chrono = { workspace = true } +pg_introspect = { workspace = true } pg_query = { workspace = true } regex = { workspace = true } rusqlite = { workspace = true } @@ -20,5 +21,6 @@ tracing = { workspace = true } zstd = { workspace = true } [dev-dependencies] +indexmap = "2" tokio = { workspace = true } tempfile = "3" diff --git a/crates/dry_run_core/src/schema/from_pg_introspect.rs b/crates/dry_run_core/src/schema/from_pg_introspect.rs new file mode 100644 index 0000000..2a6f71a --- /dev/null +++ b/crates/dry_run_core/src/schema/from_pg_introspect.rs @@ -0,0 +1,323 @@ +use pg_introspect::{ + Catalog as PgCatalog, CheckConstraint as PgCheck, Column as PgColumn, + CompositeType as PgComposite, DomainType as PgDomain, EnumType as PgEnum, + ExclusionConstraint as PgExclusion, Extension as PgExtension, ForeignKey as PgFk, + Function as PgFunction, GeneratedKind, IdentityKind, Index as PgIndex, + PartitionChild as PgPartChild, PartitionInfo as PgPartInfo, PartitionStrategy as PgPartStrat, + PolicyCommand, PrimaryKey as PgPrimaryKey, RlsPolicy as PgPolicy, Table as PgTable, + Trigger as PgTrigger, UniqueConstraint as PgUnique, View as PgView, ViewKind, + Volatility as PgVol, +}; + +use super::types::*; + +// envelope (pg_version, database, gucs, content_hash, ...) 
is the caller's job
+pub fn catalog_to_snapshot_parts(cat: PgCatalog) -> SnapshotParts {
+    let mut out = SnapshotParts::default();
+
+    for (_, t) in cat.tables {
+        out.tables.push(convert_table(t));
+    }
+    for (_, v) in cat.views {
+        out.views.push(convert_view(v));
+    }
+    for e in cat.enums {
+        out.enums.push(convert_enum(e));
+    }
+    for d in cat.domains {
+        out.domains.push(convert_domain(d));
+    }
+    for c in cat.composites {
+        out.composites.push(convert_composite(c));
+    }
+    for f in cat.functions {
+        out.functions.push(convert_function(f));
+    }
+    for e in cat.extensions {
+        out.extensions.push(convert_extension(e));
+    }
+
+    out
+}
+
+#[derive(Default)]
+pub struct SnapshotParts {
+    pub tables: Vec<Table>,
+    pub enums: Vec<EnumType>,
+    pub domains: Vec<DomainType>,
+    pub composites: Vec<CompositeType>,
+    pub views: Vec<View>,
+    pub functions: Vec<Function>,
+    pub extensions: Vec<Extension>,
+}
+
+fn convert_table(t: PgTable) -> Table {
+    let mut constraints: Vec<Constraint> = Vec::new();
+    if let Some(pk) = t.primary_key {
+        constraints.push(convert_pk(pk));
+    }
+    for fk in t.foreign_keys {
+        constraints.push(convert_fk(fk));
+    }
+    for u in t.unique_constraints {
+        constraints.push(convert_unique(u));
+    }
+    for c in t.check_constraints {
+        constraints.push(convert_check(c));
+    }
+    for x in t.exclusion_constraints {
+        constraints.push(convert_exclusion(x));
+    }
+    // match the old ORDER BY conname so content_hash stays stable
+    constraints.sort_by(|a, b| a.name.cmp(&b.name));
+
+    let mut cols: Vec<Column> = Vec::with_capacity(t.columns.len());
+    for (_, c) in t.columns {
+        cols.push(convert_column(c));
+    }
+
+    Table {
+        oid: t.oid,
+        schema: t.name.schema,
+        name: t.name.name,
+        columns: cols,
+        constraints,
+        indexes: t.indexes.into_iter().map(convert_index).collect(),
+        comment: t.comment,
+        partition_info: t.partition_info.map(convert_partition_info),
+        policies: t.policies.into_iter().map(convert_policy).collect(),
+        triggers: t.triggers.into_iter().map(convert_trigger).collect(),
+        reloptions: t.reloptions,
+        rls_enabled: t.rls_enabled,
+    }
+}
+
+fn convert_column(c: PgColumn) -> Column {
+    // dryrun keeps these as the raw pg_attribute char codes
+    let identity = c.identity.map(|k| match k {
+        IdentityKind::Always => "a",
+        IdentityKind::ByDefault => "d",
+    });
+    let generated = c.generated.map(|g| match g {
+        GeneratedKind::Stored => "s",
+        GeneratedKind::Virtual => "v",
+    });
+
+    Column {
+        name: c.name,
+        ordinal: c.ordinal,
+        type_name: c.type_name,
+        nullable: c.is_nullable,
+        default: c.default,
+        identity: identity.map(String::from),
+        generated: generated.map(String::from),
+        comment: c.comment,
+        statistics_target: c.statistics_target,
+    }
+}
+
+fn convert_pk(pk: PgPrimaryKey) -> Constraint {
+    Constraint {
+        name: pk.name,
+        kind: ConstraintKind::PrimaryKey,
+        columns: pk.columns,
+        definition: Some(pk.definition),
+        fk_table: None,
+        fk_columns: vec![],
+        backing_index: None,
+        comment: None,
+    }
+}
+
+fn convert_fk(fk: PgFk) -> Constraint {
+    let target = format!("{}.{}", fk.references.schema, fk.references.name);
+    Constraint {
+        name: fk.constraint_name,
+        kind: ConstraintKind::ForeignKey,
+        columns: fk.columns,
+        definition: Some(fk.definition),
+        fk_table: Some(target),
+        fk_columns: fk.references_columns,
+        backing_index: None,
+        comment: None,
+    }
+}
+
+fn convert_unique(u: PgUnique) -> Constraint {
+    Constraint {
+        name: u.name,
+        kind: ConstraintKind::Unique,
+        columns: u.columns,
+        definition: Some(u.definition),
+        fk_table: None,
+        fk_columns: vec![],
+        backing_index: Some(u.index_name),
+        comment: None,
+    }
+}
+
+fn convert_check(c: PgCheck) ->
Constraint { + Constraint { + name: c.name, + kind: ConstraintKind::Check, + columns: c.columns, + definition: Some(c.definition), + fk_table: None, + fk_columns: vec![], + backing_index: None, + comment: None, + } +} + +fn convert_exclusion(x: PgExclusion) -> Constraint { + Constraint { + name: x.name, + kind: ConstraintKind::Exclusion, + columns: x.columns, + definition: Some(x.definition), + fk_table: None, + fk_columns: vec![], + backing_index: Some(x.index_name), + comment: None, + } +} + +fn convert_index(i: PgIndex) -> Index { + Index { + name: i.name, + columns: i.columns, + include_columns: i.included_columns, + index_type: i.method, + is_unique: i.is_unique, + is_primary: i.is_primary, + predicate: i.predicate, + definition: i.definition, + is_valid: i.is_valid, + backs_constraint: i.backs_constraint, + } +} + +fn convert_partition_info(p: PgPartInfo) -> PartitionInfo { + PartitionInfo { + strategy: match p.strategy { + PgPartStrat::Range => PartitionStrategy::Range, + PgPartStrat::List => PartitionStrategy::List, + PgPartStrat::Hash => PartitionStrategy::Hash, + }, + key: p.key, + children: p + .children + .into_iter() + .map(convert_partition_child) + .collect(), + } +} + +fn convert_partition_child(c: PgPartChild) -> PartitionChild { + PartitionChild { + schema: c.name.schema, + name: c.name.name, + bound: c.bound, + } +} + +fn convert_policy(p: PgPolicy) -> RlsPolicy { + let cmd = match p.command { + PolicyCommand::All => "ALL", + PolicyCommand::Select => "SELECT", + PolicyCommand::Insert => "INSERT", + PolicyCommand::Update => "UPDATE", + PolicyCommand::Delete => "DELETE", + }; + RlsPolicy { + name: p.name, + command: cmd.to_string(), + permissive: p.permissive, + roles: p.roles, + using_expr: p.using_expr, + with_check_expr: p.with_check_expr, + } +} + +fn convert_trigger(t: PgTrigger) -> Trigger { + // pg_introspect carries timing/events/orientation separately, but dryrun + // only stores the rendered definition. drop the rest for now. 
+    Trigger {
+        name: t.name,
+        definition: t.definition,
+    }
+}
+
+fn convert_view(v: PgView) -> View {
+    View {
+        schema: v.name.schema,
+        name: v.name.name,
+        definition: v.definition,
+        is_materialized: matches!(v.kind, ViewKind::Materialized),
+        comment: v.comment,
+    }
+}
+
+fn convert_enum(e: PgEnum) -> EnumType {
+    EnumType {
+        schema: e.name.schema,
+        name: e.name.name,
+        labels: e.labels,
+    }
+}
+
+fn convert_domain(d: PgDomain) -> DomainType {
+    DomainType {
+        schema: d.name.schema,
+        name: d.name.name,
+        base_type: d.base_type,
+        nullable: d.is_nullable,
+        default: d.default,
+        check_constraints: d.constraints,
+    }
+}
+
+fn convert_composite(c: PgComposite) -> CompositeType {
+    let mut fields: Vec<CompositeField> = Vec::with_capacity(c.attributes.len());
+    for a in c.attributes {
+        fields.push(CompositeField {
+            name: a.name,
+            type_name: a.type_name,
+        });
+    }
+    CompositeType {
+        schema: c.name.schema,
+        name: c.name.name,
+        fields,
+    }
+}
+
+fn convert_function(f: PgFunction) -> Function {
+    let volatility = match f.volatility {
+        PgVol::Immutable => Volatility::Immutable,
+        PgVol::Stable => Volatility::Stable,
+        PgVol::Volatile => Volatility::Volatile,
+    };
+    Function {
+        schema: f.name.schema,
+        name: f.name.name,
+        identity_args: f.identity_arguments,
+        return_type: f.return_type,
+        language: f.language,
+        volatility,
+        security_definer: f.security_definer,
+        comment: f.comment,
+    }
+}
+
+fn convert_extension(e: PgExtension) -> Extension {
+    Extension {
+        name: e.name,
+        version: e.version,
+        schema: e.schema,
+    }
+}
+
+#[cfg(test)]
+#[path = "from_pg_introspect_tests.rs"]
+mod tests;
diff --git a/crates/dry_run_core/src/schema/from_pg_introspect_tests.rs b/crates/dry_run_core/src/schema/from_pg_introspect_tests.rs
new file mode 100644
index 0000000..53552d8
--- /dev/null
+++ b/crates/dry_run_core/src/schema/from_pg_introspect_tests.rs
@@ -0,0 +1,420 @@
+use indexmap::IndexMap;
+use pg_introspect::{
+    Catalog as PgCatalog, CheckConstraint as PgCheck, Column as PgColumn,
+    CompositeAttribute as PgCompAttr, CompositeType as PgComposite, DomainType as PgDomain,
+    EnumType as PgEnum, ExclusionConstraint as PgExclusion, Extension as PgExtension, FkAction,
+    FkMatch, ForeignKey as PgFk, Function as PgFunction, FunctionKind, GeneratedKind, IdentityKind,
+    Index as PgIndex, PartitionChild as PgPartChild, PartitionInfo as PgPartInfo,
+    PartitionStrategy as PgPartStrat, PolicyCommand, PrimaryKey as PgPrimaryKey, QualifiedName,
+    RlsPolicy as PgPolicy, Table as PgTable, Trigger as PgTrigger, TriggerEvent,
+    TriggerOrientation, TriggerTiming, UniqueConstraint as PgUnique, View as PgView, ViewKind,
+    Volatility as PgVol,
+};
+
+use super::super::hash::{HashInput, compute_content_hash};
+use super::super::types::{ConstraintKind, PartitionStrategy, Volatility};
+use super::*;
+
+fn qn(schema: &str, name: &str) -> QualifiedName {
+    QualifiedName {
+        schema: schema.into(),
+        name: name.into(),
+    }
+}
+
+fn col(name: &str, ordinal: i16, type_name: &str) -> PgColumn {
+    PgColumn {
+        name: name.into(),
+        type_name: type_name.into(),
+        ordinal,
+        is_nullable: false,
+        is_primary_key: false,
+        is_foreign_key: false,
+        is_unique: false,
+        identity: None,
+        generated: None,
+        statistics_target: None,
+        default: None,
+        comment: None,
+    }
+}
+
+// ── enum / variant mappings ───────────────────────────────────────────────
+
+#[test]
+fn identity_kind_maps_to_pg_attribute_codes() {
+    let cases: &[(IdentityKind, &str)] =
+        &[(IdentityKind::Always, "a"), (IdentityKind::ByDefault, "d")];
+    for (kind, expected) in cases {
+ let mut c = col("c", 1, "int"); + c.identity = Some(*kind); + assert_eq!(convert_column(c).identity.as_deref(), Some(*expected)); + } +} + +#[test] +fn generated_kind_maps_to_pg_attribute_codes() { + let cases: &[(GeneratedKind, &str)] = + &[(GeneratedKind::Stored, "s"), (GeneratedKind::Virtual, "v")]; + for (kind, expected) in cases { + let mut c = col("c", 1, "int"); + c.generated = Some(*kind); + assert_eq!(convert_column(c).generated.as_deref(), Some(*expected)); + } +} + +#[test] +fn column_without_identity_or_generated_stays_none() { + let c = convert_column(col("c", 1, "int")); + assert!(c.identity.is_none()); + assert!(c.generated.is_none()); +} + +#[test] +fn policy_command_maps_to_uppercase_strings() { + let cases: &[(PolicyCommand, &str)] = &[ + (PolicyCommand::All, "ALL"), + (PolicyCommand::Select, "SELECT"), + (PolicyCommand::Insert, "INSERT"), + (PolicyCommand::Update, "UPDATE"), + (PolicyCommand::Delete, "DELETE"), + ]; + for (cmd, expected) in cases { + let p = PgPolicy { + name: "p".into(), + command: *cmd, + permissive: true, + roles: vec!["public".into()], + using_expr: None, + with_check_expr: None, + }; + assert_eq!(convert_policy(p).command, *expected); + } +} + +#[test] +fn volatility_maps_to_internal_enum() { + let cases: &[(PgVol, Volatility)] = &[ + (PgVol::Immutable, Volatility::Immutable), + (PgVol::Stable, Volatility::Stable), + (PgVol::Volatile, Volatility::Volatile), + ]; + for (pg_vol, expected) in cases { + let f = PgFunction { + name: qn("public", "f"), + kind: FunctionKind::Function, + language: "sql".into(), + volatility: *pg_vol, + security_definer: false, + arguments: String::new(), + identity_arguments: String::new(), + return_type: "int".into(), + comment: None, + }; + assert_eq!(convert_function(f).volatility, *expected); + } +} + +#[test] +fn partition_strategy_maps_to_internal_enum() { + let cases: &[(PgPartStrat, PartitionStrategy)] = &[ + (PgPartStrat::Range, PartitionStrategy::Range), + (PgPartStrat::List, PartitionStrategy::List), + (PgPartStrat::Hash, PartitionStrategy::Hash), + ]; + for (pg_strat, expected) in cases { + let p = PgPartInfo { + strategy: *pg_strat, + key: "k".into(), + children: vec![], + }; + assert_eq!(convert_partition_info(p).strategy, *expected); + } +} + +#[test] +fn view_kind_materialized_sets_flag() { + let mat = PgView { + oid: 1, + name: qn("public", "v"), + kind: ViewKind::Materialized, + columns: IndexMap::new(), + definition: "SELECT 1".into(), + is_updatable: false, + comment: None, + }; + assert!(convert_view(mat).is_materialized); + + let plain = PgView { + oid: 1, + name: qn("public", "v"), + kind: ViewKind::View, + columns: IndexMap::new(), + definition: "SELECT 1".into(), + is_updatable: false, + comment: None, + }; + assert!(!convert_view(plain).is_materialized); +} + +// ── golden fixture catalog ──────────────────────────────────────────────── + +fn fixture_catalog() -> PgCatalog { + let mut columns = IndexMap::new(); + let mut id_col = col("id", 1, "int8"); + id_col.identity = Some(IdentityKind::Always); + columns.insert("id".into(), id_col); + + let mut amount = col("amount", 2, "numeric"); + amount.is_nullable = true; + columns.insert("amount".into(), amount); + + let mut full_name = col("full_name", 3, "text"); + full_name.generated = Some(GeneratedKind::Stored); + full_name.default = Some("''".into()); + columns.insert("full_name".into(), full_name); + + let table = PgTable { + oid: 16384, + name: qn("public", "orders"), + columns, + primary_key: Some(PgPrimaryKey { + name: "orders_pkey".into(), + 
columns: vec!["id".into()], + definition: "PRIMARY KEY (id)".into(), + }), + foreign_keys: vec![PgFk { + constraint_name: "orders_customer_fk".into(), + columns: vec!["customer_id".into()], + references: qn("public", "customers"), + references_columns: vec!["id".into()], + is_validated: true, + is_enforced: true, + is_deferrable: false, + is_deferred: false, + on_update: FkAction::NoAction, + on_delete: FkAction::Cascade, + match_type: FkMatch::Simple, + definition: "FOREIGN KEY (customer_id) REFERENCES public.customers(id) ON DELETE CASCADE".into(), + }], + indexes: vec![PgIndex { + name: "orders_pkey".into(), + columns: vec!["id".into()], + included_columns: vec![], + is_unique: true, + is_primary: true, + is_partial: false, + predicate: None, + method: "btree".into(), + definition: "CREATE UNIQUE INDEX orders_pkey ON public.orders (id)".into(), + is_valid: true, + backs_constraint: true, + }], + unique_constraints: vec![PgUnique { + name: "orders_external_id_key".into(), + columns: vec!["external_id".into()], + index_name: "orders_external_id_key".into(), + is_validated: true, + is_deferrable: false, + is_deferred: false, + nulls_not_distinct: false, + definition: "UNIQUE (external_id)".into(), + }], + exclusion_constraints: vec![PgExclusion { + name: "orders_no_overlap".into(), + columns: vec!["during".into()], + index_name: "orders_no_overlap".into(), + definition: "EXCLUDE USING gist (during WITH &&)".into(), + }], + check_constraints: vec![PgCheck { + name: "orders_amount_check".into(), + definition: "CHECK ((amount > 0))".into(), + columns: vec!["amount".into()], + is_no_inherit: false, + }], + not_null_constraints: vec![], + comment: Some("order rows".into()), + is_partitioned: true, + is_partition: false, + partition_info: Some(PgPartInfo { + strategy: PgPartStrat::Range, + key: "RANGE (created_at)".into(), + children: vec![PgPartChild { + name: qn("public", "orders_2026"), + bound: "FOR VALUES FROM ('2026-01-01') TO ('2027-01-01')".into(), + }], + }), + reloptions: vec!["fillfactor=80".into()], + rls_enabled: true, + policies: vec![PgPolicy { + name: "orders_owner".into(), + command: PolicyCommand::Select, + permissive: true, + roles: vec!["app".into()], + using_expr: Some("(owner = current_user)".into()), + with_check_expr: None, + }], + triggers: vec![PgTrigger { + name: "orders_audit".into(), + timing: TriggerTiming::After, + events: vec![TriggerEvent::Insert], + orientation: TriggerOrientation::Row, + is_constraint: false, + is_enabled: true, + function: qn("public", "audit_log"), + definition: "CREATE TRIGGER orders_audit AFTER INSERT ON public.orders FOR EACH ROW EXECUTE FUNCTION public.audit_log()".into(), + }], + }; + + let mat_view = PgView { + oid: 16500, + name: qn("public", "orders_summary"), + kind: ViewKind::Materialized, + columns: IndexMap::new(), + definition: "SELECT count(*) FROM orders".into(), + is_updatable: false, + comment: None, + }; + + let mut tables = IndexMap::new(); + tables.insert(table.name.clone(), table); + let mut views = IndexMap::new(); + views.insert(mat_view.name.clone(), mat_view); + + PgCatalog { + tables, + views, + partition_roots: Default::default(), + dependencies: vec![], + extensions: vec![PgExtension { + name: "pgcrypto".into(), + schema: "public".into(), + version: "1.3".into(), + }], + functions: vec![PgFunction { + name: qn("public", "audit_log"), + kind: FunctionKind::Function, + language: "plpgsql".into(), + volatility: PgVol::Volatile, + security_definer: true, + arguments: String::new(), + identity_arguments: String::new(), 
+ return_type: "trigger".into(), + comment: Some("audit trigger".into()), + }], + enums: vec![PgEnum { + name: qn("public", "order_status"), + labels: vec!["new".into(), "shipped".into()], + }], + domains: vec![PgDomain { + name: qn("public", "positive_amount"), + base_type: "numeric".into(), + is_nullable: false, + default: None, + constraints: vec!["CHECK (VALUE > 0)".into()], + }], + composites: vec![PgComposite { + name: qn("public", "address"), + attributes: vec![ + PgCompAttr { + name: "street".into(), + type_name: "text".into(), + }, + PgCompAttr { + name: "zip".into(), + type_name: "text".into(), + }, + ], + }], + } +} + +#[test] +fn fixture_catalog_converts_to_expected_snapshot_parts() { + let parts = catalog_to_snapshot_parts(fixture_catalog()); + + assert_eq!(parts.tables.len(), 1); + let t = &parts.tables[0]; + assert_eq!(t.schema, "public"); + assert_eq!(t.name, "orders"); + assert_eq!(t.oid, 16384); + assert_eq!(t.columns.len(), 3); + assert_eq!(t.columns[0].identity.as_deref(), Some("a")); + assert_eq!(t.columns[2].generated.as_deref(), Some("s")); + assert!(t.rls_enabled); + assert_eq!(t.reloptions, vec!["fillfactor=80".to_string()]); + + // PK + FK + unique + check + exclusion, sorted by name (matches old ORDER BY conname) + assert_eq!(t.constraints.len(), 5); + let names: Vec<&str> = t.constraints.iter().map(|c| c.name.as_str()).collect(); + let mut sorted = names.clone(); + sorted.sort(); + assert_eq!(names, sorted, "constraints must be sorted by name"); + + let fk = t + .constraints + .iter() + .find(|c| c.kind == ConstraintKind::ForeignKey) + .expect("fk present"); + assert_eq!(fk.fk_table.as_deref(), Some("public.customers")); + assert_eq!(fk.fk_columns, vec!["id".to_string()]); + + let unique = t + .constraints + .iter() + .find(|c| c.kind == ConstraintKind::Unique) + .expect("unique present"); + assert_eq!( + unique.backing_index.as_deref(), + Some("orders_external_id_key") + ); + + let p = t.partition_info.as_ref().expect("partition info"); + assert_eq!(p.strategy, PartitionStrategy::Range); + assert_eq!(p.children.len(), 1); + assert_eq!(p.children[0].schema, "public"); + assert_eq!(p.children[0].name, "orders_2026"); + + assert_eq!(t.policies.len(), 1); + assert_eq!(t.policies[0].command, "SELECT"); + assert_eq!(t.triggers.len(), 1); + assert_eq!(t.triggers[0].name, "orders_audit"); + + assert_eq!(parts.views.len(), 1); + assert!(parts.views[0].is_materialized); + + assert_eq!(parts.enums.len(), 1); + assert_eq!(parts.enums[0].labels, vec!["new", "shipped"]); + assert_eq!(parts.domains.len(), 1); + assert_eq!(parts.domains[0].check_constraints.len(), 1); + assert_eq!(parts.composites.len(), 1); + assert_eq!(parts.composites[0].fields.len(), 2); + assert_eq!(parts.functions.len(), 1); + assert_eq!(parts.functions[0].volatility, Volatility::Volatile); + assert!(parts.functions[0].security_definer); + assert_eq!(parts.extensions.len(), 1); + assert_eq!(parts.extensions[0].name, "pgcrypto"); +} + +// guards against silent regressions in field ordering, default values, or +// upstream pg_introspect changes that would invalidate snapshots stored in +// users' history.db. update EXPECTED only on intentional snapshot-format changes. 
+#[test] +fn fixture_catalog_content_hash_is_stable() { + let parts = catalog_to_snapshot_parts(fixture_catalog()); + let hash = compute_content_hash(&HashInput { + pg_version: "PostgreSQL 17.0", + tables: &parts.tables, + enums: &parts.enums, + domains: &parts.domains, + composites: &parts.composites, + views: &parts.views, + functions: &parts.functions, + extensions: &parts.extensions, + }); + const EXPECTED: &str = "ef118e31e0004baa508665111e32a9c2da964b60b24a900a6a1c654629d32fd6"; + assert_eq!( + hash, EXPECTED, + "content_hash drifted; if intentional, update EXPECTED" + ); +} diff --git a/crates/dry_run_core/src/schema/introspect/catalog.rs b/crates/dry_run_core/src/schema/introspect/catalog.rs deleted file mode 100644 index cc0800b..0000000 --- a/crates/dry_run_core/src/schema/introspect/catalog.rs +++ /dev/null @@ -1,121 +0,0 @@ -use std::collections::HashMap; - -use sqlx::postgres::PgRow; -use sqlx::{PgPool, Row}; - -use super::super::types::*; -use crate::error::Result; - -pub(super) async fn fetch_enums(pool: &PgPool) -> Result> { - let rows: Vec = sqlx::query( - r#" - SELECT n.nspname AS schema_name, - t.typname AS type_name, - (SELECT array_agg(e.enumlabel ORDER BY e.enumsortorder) - FROM pg_catalog.pg_enum e - WHERE e.enumtypid = t.oid - ) AS labels - FROM pg_catalog.pg_type t - JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace - WHERE t.typtype = 'e' - AND n.nspname NOT IN ('pg_catalog', 'information_schema') - ORDER BY n.nspname, t.typname - "#, - ) - .fetch_all(pool) - .await?; - - Ok(rows - .iter() - .map(|r| EnumType { - schema: r.get("schema_name"), - name: r.get("type_name"), - labels: r - .get::>, _>("labels") - .unwrap_or_default(), - }) - .collect()) -} - -pub(super) async fn fetch_domains(pool: &PgPool) -> Result> { - let rows: Vec = sqlx::query( - r#" - SELECT n.nspname AS schema_name, - t.typname AS type_name, - pg_catalog.format_type(t.typbasetype, t.typtypmod) AS base_type, - t.typnotnull AS notnull, - pg_catalog.pg_get_expr(t.typdefaultbin, 0) AS default_expr, - (SELECT array_agg(pg_catalog.pg_get_constraintdef(con.oid) ORDER BY con.conname) - FROM pg_catalog.pg_constraint con - WHERE con.contypid = t.oid - ) AS check_constraints - FROM pg_catalog.pg_type t - JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace - WHERE t.typtype = 'd' - AND n.nspname NOT IN ('pg_catalog', 'information_schema') - ORDER BY n.nspname, t.typname - "#, - ) - .fetch_all(pool) - .await?; - - Ok(rows - .iter() - .map(|r| DomainType { - schema: r.get("schema_name"), - name: r.get("type_name"), - base_type: r.get("base_type"), - nullable: !r.get::("notnull"), - default: r.get("default_expr"), - check_constraints: r - .get::>, _>("check_constraints") - .unwrap_or_default(), - }) - .collect()) -} - -pub(super) async fn fetch_composites(pool: &PgPool) -> Result> { - let rows: Vec = sqlx::query( - r#" - SELECT n.nspname AS schema_name, - t.typname AS type_name, - a.attname AS field_name, - pg_catalog.format_type(a.atttypid, a.atttypmod) AS field_type - FROM pg_catalog.pg_type t - JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace - JOIN pg_catalog.pg_class c ON c.oid = t.typrelid - JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid - WHERE t.typtype = 'c' - AND c.relkind = 'c' - AND a.attnum > 0 - AND NOT a.attisdropped - AND n.nspname NOT IN ('pg_catalog', 'information_schema') - ORDER BY n.nspname, t.typname, a.attnum - "#, - ) - .fetch_all(pool) - .await?; - - let mut map: HashMap<(String, String), Vec> = HashMap::new(); - for r in &rows { - let key = ( - 
r.get::("schema_name"), - r.get::("type_name"), - ); - map.entry(key).or_default().push(CompositeField { - name: r.get("field_name"), - type_name: r.get("field_type"), - }); - } - - let mut composites: Vec = map - .into_iter() - .map(|((schema, name), fields)| CompositeType { - schema, - name, - fields, - }) - .collect(); - composites.sort_by(|a, b| (&a.schema, &a.name).cmp(&(&b.schema, &b.name))); - Ok(composites) -} diff --git a/crates/dry_run_core/src/schema/introspect/comments.rs b/crates/dry_run_core/src/schema/introspect/comments.rs deleted file mode 100644 index a4be1fb..0000000 --- a/crates/dry_run_core/src/schema/introspect/comments.rs +++ /dev/null @@ -1,62 +0,0 @@ -use sqlx::postgres::PgRow; -use sqlx::{PgPool, Row}; - -use crate::error::Result; - -use super::raw_types::*; - -pub(super) async fn fetch_table_comments(pool: &PgPool) -> Result> { - let rows: Vec = sqlx::query( - r#" - SELECT d.objoid::int4 AS table_oid, - d.description AS comment - FROM pg_catalog.pg_description d - JOIN pg_catalog.pg_class c ON c.oid = d.objoid - JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace - WHERE d.objsubid = 0 - AND c.relkind IN ('r', 'p') - AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast') - AND n.nspname NOT LIKE 'pg_temp_%' - "#, - ) - .fetch_all(pool) - .await?; - - Ok(rows - .iter() - .map(|r| RawTableComment { - table_oid: r.get::("table_oid") as u32, - comment: r.get("comment"), - }) - .collect()) -} - -pub(super) async fn fetch_column_comments(pool: &PgPool) -> Result> { - let rows: Vec = sqlx::query( - r#" - SELECT d.objoid::int4 AS table_oid, - a.attname AS column_name, - d.description AS comment - FROM pg_catalog.pg_description d - JOIN pg_catalog.pg_class c ON c.oid = d.objoid - JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace - JOIN pg_catalog.pg_attribute a - ON a.attrelid = d.objoid AND a.attnum = d.objsubid - WHERE d.objsubid > 0 - AND c.relkind IN ('r', 'p') - AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast') - AND n.nspname NOT LIKE 'pg_temp_%' - "#, - ) - .fetch_all(pool) - .await?; - - Ok(rows - .iter() - .map(|r| RawColumnComment { - table_oid: r.get::("table_oid") as u32, - column_name: r.get("column_name"), - comment: r.get("comment"), - }) - .collect()) -} diff --git a/crates/dry_run_core/src/schema/introspect/indexes.rs b/crates/dry_run_core/src/schema/introspect/indexes.rs deleted file mode 100644 index 1188e93..0000000 --- a/crates/dry_run_core/src/schema/introspect/indexes.rs +++ /dev/null @@ -1,78 +0,0 @@ -use sqlx::postgres::PgRow; -use sqlx::{PgPool, Row}; - -use crate::error::Result; - -use super::raw_types::*; - -pub(super) async fn fetch_indexes(pool: &PgPool) -> Result> { - let rows: Vec = sqlx::query( - r#" - SELECT i.indrelid::int4 AS table_oid, - ci.relname AS index_name, - am.amname AS index_type, - i.indisunique AS is_unique, - i.indisprimary AS is_primary, - pg_catalog.pg_get_expr(i.indpred, i.indrelid) AS predicate, - pg_catalog.pg_get_indexdef(i.indexrelid) AS definition, - i.indisvalid AS is_valid, - i.indnkeyatts AS n_key_atts, - -- check when index backs a UNIQUE/PK/EXCLUSION constraint - EXISTS ( - SELECT 1 FROM pg_catalog.pg_constraint con - WHERE con.conindid = i.indexrelid - ) AS backs_constraint, - -- All column names (key + include) - (SELECT array_agg(a.attname ORDER BY ord.n) - FROM unnest(i.indkey) WITH ORDINALITY AS ord(attnum, n) - JOIN pg_catalog.pg_attribute a - ON a.attrelid = i.indrelid AND a.attnum = ord.attnum - WHERE ord.attnum > 0 - ) AS all_col_names, - 
array_length(i.indkey, 1) AS total_cols - FROM pg_catalog.pg_index i - JOIN pg_catalog.pg_class ci ON ci.oid = i.indexrelid - JOIN pg_catalog.pg_class ct ON ct.oid = i.indrelid - JOIN pg_catalog.pg_namespace n ON n.oid = ct.relnamespace - JOIN pg_catalog.pg_am am ON am.oid = ci.relam - WHERE n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast') - AND n.nspname NOT LIKE 'pg_temp_%' - AND NOT EXISTS (SELECT 1 FROM pg_inherits inh WHERE inh.inhrelid = i.indexrelid) - ORDER BY i.indrelid, ci.relname - "#, - ) - .fetch_all(pool) - .await?; - - Ok(rows - .iter() - .map(|r| { - let all_cols: Vec = r - .get::>, _>("all_col_names") - .unwrap_or_default(); - let n_key_atts = r.get::("n_key_atts") as usize; - let (key_cols, include_cols) = if n_key_atts > 0 && n_key_atts <= all_cols.len() { - ( - all_cols[..n_key_atts].to_vec(), - all_cols[n_key_atts..].to_vec(), - ) - } else { - (all_cols, vec![]) - }; - - RawIndex { - table_oid: r.get::("table_oid") as u32, - name: r.get("index_name"), - columns: key_cols, - include_columns: include_cols, - index_type: r.get("index_type"), - is_unique: r.get("is_unique"), - is_primary: r.get("is_primary"), - predicate: r.get("predicate"), - definition: r.get("definition"), - is_valid: r.get("is_valid"), - backs_constraint: r.get("backs_constraint"), - } - }) - .collect()) -} diff --git a/crates/dry_run_core/src/schema/introspect/mod.rs b/crates/dry_run_core/src/schema/introspect/mod.rs index 599853c..7a8b44e 100644 --- a/crates/dry_run_core/src/schema/introspect/mod.rs +++ b/crates/dry_run_core/src/schema/introspect/mod.rs @@ -1,22 +1,13 @@ -mod catalog; -mod comments; -mod indexes; -mod objects; -mod partitions; -mod policies; -mod raw_types; mod stats; -mod tables; - -use std::collections::HashMap; use chrono::Utc; +use pg_introspect::IntrospectOptions; +use sha2::{Digest, Sha256}; use sqlx::postgres::PgRow; use sqlx::{PgPool, Row}; use tracing::info; -use sha2::{Digest, Sha256}; - +use super::from_pg_introspect::catalog_to_snapshot_parts; use super::hash::{HashInput, compute_content_hash}; use super::snapshot::*; use super::types::*; @@ -26,71 +17,26 @@ pub async fn introspect_schema(pool: &PgPool) -> Result { let pg_version: String = sqlx::query_scalar("SELECT version()") .fetch_one(pool) .await?; - let database: String = sqlx::query_scalar("SELECT current_database()") .fetch_one(pool) .await?; - // Group 1: table-centric data. Stats now live in PlannerStatsSnapshot / - // ActivityStatsSnapshot; introspect_schema is DDL-only. - let ( - raw_tables, - raw_columns, - raw_constraints, - table_comments, - column_comments, - raw_indexes, - raw_partitions, - raw_partition_children, - raw_policies, - raw_triggers, - ) = tokio::try_join!( - tables::fetch_tables(pool), - tables::fetch_columns(pool), - tables::fetch_constraints(pool), - comments::fetch_table_comments(pool), - comments::fetch_column_comments(pool), - indexes::fetch_indexes(pool), - partitions::fetch_partition_info(pool), - partitions::fetch_partition_children(pool), - policies::fetch_policies(pool), - policies::fetch_triggers(pool), - )?; + let cat = pg_introspect::introspect(pool, &IntrospectOptions::default()) + .await + .map_err(|e| Error::Introspection(format!("pg_introspect: {e}")))?; + let parts = catalog_to_snapshot_parts(cat); - // Group 2: top-level objects. 
-    let (enums, domains, composites, views, functions, extensions, gucs, _is_standby) = tokio::try_join!(
-        catalog::fetch_enums(pool),
-        catalog::fetch_domains(pool),
-        catalog::fetch_composites(pool),
-        objects::fetch_views(pool),
-        objects::fetch_functions(pool),
-        objects::fetch_extensions(pool),
-        objects::fetch_gucs(pool),
-        fetch_is_standby(pool),
-    )?;
-
-    let tables = assemble_tables(
-        raw_tables,
-        raw_columns,
-        raw_constraints,
-        table_comments,
-        column_comments,
-        raw_indexes,
-        raw_partitions,
-        raw_partition_children,
-        raw_policies,
-        raw_triggers,
-    );
+    let gucs = fetch_gucs(pool).await?;
 
     let content_hash = compute_content_hash(&HashInput {
         pg_version: &pg_version,
-        tables: &tables,
-        enums: &enums,
-        domains: &domains,
-        composites: &composites,
-        views: &views,
-        functions: &functions,
-        extensions: &extensions,
+        tables: &parts.tables,
+        enums: &parts.enums,
+        domains: &parts.domains,
+        composites: &parts.composites,
+        views: &parts.views,
+        functions: &parts.functions,
+        extensions: &parts.extensions,
     });
 
     let snapshot = SchemaSnapshot {
@@ -99,13 +45,13 @@ pub async fn introspect_schema(pool: &PgPool) -> Result<SchemaSnapshot> {
         timestamp: Utc::now(),
         content_hash,
         source: None,
-        tables,
-        enums,
-        domains,
-        composites,
-        views,
-        functions,
-        extensions,
+        tables: parts.tables,
+        enums: parts.enums,
+        domains: parts.domains,
+        composites: parts.composites,
+        views: parts.views,
+        functions: parts.functions,
+        extensions: parts.extensions,
         gucs,
     };
 
@@ -124,6 +70,35 @@ pub async fn introspect_schema(pool: &PgPool) -> Result<SchemaSnapshot> {
     Ok(snapshot)
 }
 
+async fn fetch_gucs(pool: &PgPool) -> Result<Vec<GucSetting>> {
+    let rows: Vec<PgRow> = sqlx::query(
+        r#"
+        SELECT name, setting, unit
+        FROM pg_catalog.pg_settings
+        WHERE name IN (
+            'work_mem', 'effective_cache_size', 'random_page_cost',
+            'seq_page_cost', 'effective_io_concurrency', 'shared_buffers',
+            'maintenance_work_mem', 'default_statistics_target',
+            'autovacuum', 'autovacuum_vacuum_threshold',
+            'autovacuum_vacuum_scale_factor', 'autovacuum_analyze_threshold',
+            'autovacuum_analyze_scale_factor'
+        )
+        ORDER BY name
+        "#,
+    )
+    .fetch_all(pool)
+    .await?;
+
+    Ok(rows
+        .iter()
+        .map(|r| GucSetting {
+            name: r.get("name"),
+            setting: r.get("setting"),
+            unit: r.get("unit"),
+        })
+        .collect())
+}
+
 pub async fn fetch_is_standby(pool: &PgPool) -> Result<bool> {
     let row: PgRow = sqlx::query("SELECT pg_catalog.pg_is_in_recovery() AS is_standby")
         .fetch_one(pool)
@@ -131,8 +106,6 @@ pub async fn fetch_is_standby(pool: &PgPool) -> Result<bool> {
     Ok(row.get("is_standby"))
 }
 
-// Snapshot split: planner-only and per-node activity captures
-
 pub async fn introspect_planner_stats(
     pool: &PgPool,
     schema_ref_hash: &str,
@@ -261,179 +234,6 @@ fn hash_payload(value: &T) -> Result<String> {
     Ok(format!("{digest:x}"))
 }
 
-// ---------------------------------------------------------------------------
-// Assembly: merge parts into Table structs
-// ---------------------------------------------------------------------------
-
-use raw_types::*;
-
-#[allow(clippy::too_many_arguments)]
-fn assemble_tables(
-    raw_tables: Vec<RawTable>,
-    raw_columns: Vec<RawColumn>,
-    raw_constraints: Vec<RawConstraint>,
-    table_comments: Vec<RawTableComment>,
-    column_comments: Vec<RawColumnComment>,
-    raw_indexes: Vec<RawIndex>,
-    raw_partitions: Vec<RawPartitionInfo>,
-    raw_partition_children: Vec<RawPartitionChild>,
-    raw_policies: Vec<RawPolicy>,
-    raw_triggers: Vec<RawTrigger>,
-) -> Vec<Table>
{ - // --- Columns --- - let mut columns_by_oid: HashMap> = HashMap::new(); - for rc in raw_columns { - columns_by_oid - .entry(rc.table_oid) - .or_default() - .push(Column { - name: rc.name, - ordinal: rc.ordinal, - type_name: rc.type_name, - nullable: rc.nullable, - default: rc.default, - identity: rc.identity, - generated: rc.generated, - comment: None, - statistics_target: rc.statistics_target, - }); - } - - // --- Constraints --- - let mut constraints_by_oid: HashMap> = HashMap::new(); - for rc in raw_constraints { - let kind = match ConstraintKind::from_pg_contype(&rc.contype) { - Some(k) => k, - None => continue, - }; - constraints_by_oid - .entry(rc.table_oid) - .or_default() - .push(Constraint { - name: rc.name, - kind, - columns: rc.columns, - definition: rc.definition, - fk_table: rc.fk_table, - fk_columns: rc.fk_columns, - backing_index: rc.backing_index, - comment: rc.comment, - }); - } - - // --- Table comments --- - let table_comment_map: HashMap = table_comments - .into_iter() - .map(|tc| (tc.table_oid, tc.comment)) - .collect(); - - // --- Column comments --- - let col_comment_map: HashMap<(u32, String), String> = column_comments - .into_iter() - .map(|cc| ((cc.table_oid, cc.column_name), cc.comment)) - .collect(); - - for (oid, cols) in &mut columns_by_oid { - for col in cols.iter_mut() { - if let Some(comment) = col_comment_map.get(&(*oid, col.name.clone())) { - col.comment = Some(comment.clone()); - } - } - } - - // --- Indexes --- - let mut indexes_by_oid: HashMap> = HashMap::new(); - for ri in raw_indexes { - indexes_by_oid.entry(ri.table_oid).or_default().push(Index { - name: ri.name, - columns: ri.columns, - include_columns: ri.include_columns, - index_type: ri.index_type, - is_unique: ri.is_unique, - is_primary: ri.is_primary, - predicate: ri.predicate, - definition: ri.definition, - is_valid: ri.is_valid, - backs_constraint: ri.backs_constraint, - }); - } - - // --- Partition info --- - let mut children_by_parent: HashMap> = HashMap::new(); - for pc in raw_partition_children { - children_by_parent - .entry(pc.parent_oid) - .or_default() - .push(PartitionChild { - schema: pc.schema, - name: pc.name, - bound: pc.bound, - }); - } - - let partition_info_by_oid: HashMap = raw_partitions - .into_iter() - .filter_map(|rp| { - let strategy = PartitionStrategy::from_pg_partstrat(&rp.strategy)?; - Some(( - rp.table_oid, - PartitionInfo { - strategy, - key: rp.key, - children: children_by_parent.remove(&rp.table_oid).unwrap_or_default(), - }, - )) - }) - .collect(); - - // --- Policies --- - let mut policies_by_oid: HashMap> = HashMap::new(); - for rp in raw_policies { - policies_by_oid - .entry(rp.table_oid) - .or_default() - .push(RlsPolicy { - name: rp.name, - command: rp.command, - permissive: rp.permissive, - roles: rp.roles, - using_expr: rp.using_expr, - with_check_expr: rp.with_check_expr, - }); - } - - // --- Triggers --- - let mut triggers_by_oid: HashMap> = HashMap::new(); - for rt in raw_triggers { - triggers_by_oid - .entry(rt.table_oid) - .or_default() - .push(Trigger { - name: rt.name, - definition: rt.definition, - }); - } - - // --- Assemble --- - raw_tables - .into_iter() - .map(|rt| Table { - oid: rt.oid, - schema: rt.schema, - name: rt.name, - columns: columns_by_oid.remove(&rt.oid).unwrap_or_default(), - constraints: constraints_by_oid.remove(&rt.oid).unwrap_or_default(), - indexes: indexes_by_oid.remove(&rt.oid).unwrap_or_default(), - comment: table_comment_map.get(&rt.oid).cloned(), - partition_info: partition_info_by_oid.get(&rt.oid).cloned(), - 
policies: policies_by_oid.remove(&rt.oid).unwrap_or_default(), - triggers: triggers_by_oid.remove(&rt.oid).unwrap_or_default(), - reloptions: rt.reloptions, - rls_enabled: rt.rls_enabled, - }) - .collect() -} - #[cfg(test)] mod tests { use chrono::TimeZone; diff --git a/crates/dry_run_core/src/schema/introspect/objects.rs b/crates/dry_run_core/src/schema/introspect/objects.rs deleted file mode 100644 index 0a12676..0000000 --- a/crates/dry_run_core/src/schema/introspect/objects.rs +++ /dev/null @@ -1,137 +0,0 @@ -use sqlx::postgres::PgRow; -use sqlx::{PgPool, Row}; - -use super::super::types::*; -use crate::error::Result; - -pub(super) async fn fetch_views(pool: &PgPool) -> Result> { - let rows: Vec = sqlx::query( - r#" - SELECT n.nspname AS schema_name, - c.relname AS view_name, - c.relkind = 'm' AS is_materialized, - pg_catalog.pg_get_viewdef(c.oid, true) AS definition, - d.description AS comment - FROM pg_catalog.pg_class c - JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace - LEFT JOIN pg_catalog.pg_description d - ON d.objoid = c.oid AND d.objsubid = 0 - WHERE c.relkind IN ('v', 'm') - AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast') - AND n.nspname NOT LIKE 'pg_temp_%' - ORDER BY n.nspname, c.relname - "#, - ) - .fetch_all(pool) - .await?; - - Ok(rows - .iter() - .map(|r| View { - schema: r.get("schema_name"), - name: r.get("view_name"), - definition: r.get::, _>("definition").unwrap_or_default(), - is_materialized: r.get("is_materialized"), - comment: r.get("comment"), - }) - .collect()) -} - -pub(super) async fn fetch_functions(pool: &PgPool) -> Result> { - let rows: Vec = sqlx::query( - r#" - SELECT n.nspname AS schema_name, - p.proname AS func_name, - pg_catalog.pg_get_function_identity_arguments(p.oid) AS identity_args, - pg_catalog.pg_get_function_result(p.oid) AS return_type, - l.lanname AS language, - p.provolatile::text AS volatility, - p.prosecdef AS security_definer, - d.description AS comment - FROM pg_catalog.pg_proc p - JOIN pg_catalog.pg_namespace n ON n.oid = p.pronamespace - JOIN pg_catalog.pg_language l ON l.oid = p.prolang - LEFT JOIN pg_catalog.pg_description d - ON d.objoid = p.oid AND d.objsubid = 0 - WHERE p.prokind IN ('f', 'p') - AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast') - AND n.nspname NOT LIKE 'pg_temp_%' - ORDER BY n.nspname, p.proname - "#, - ) - .fetch_all(pool) - .await?; - - Ok(rows - .iter() - .map(|r| { - let vol_str: String = r.get("volatility"); - Function { - schema: r.get("schema_name"), - name: r.get("func_name"), - identity_args: r.get("identity_args"), - return_type: r - .get::, _>("return_type") - .unwrap_or_default(), - language: r.get("language"), - volatility: Volatility::from_pg_provolatile(&vol_str) - .unwrap_or(Volatility::Volatile), - security_definer: r.get("security_definer"), - comment: r.get("comment"), - } - }) - .collect()) -} - -pub(super) async fn fetch_extensions(pool: &PgPool) -> Result> { - let rows: Vec = sqlx::query( - r#" - SELECT e.extname AS ext_name, - e.extversion AS ext_version, - n.nspname AS schema_name - FROM pg_catalog.pg_extension e - JOIN pg_catalog.pg_namespace n ON n.oid = e.extnamespace - ORDER BY e.extname - "#, - ) - .fetch_all(pool) - .await?; - - Ok(rows - .iter() - .map(|r| Extension { - name: r.get("ext_name"), - version: r.get("ext_version"), - schema: r.get("schema_name"), - }) - .collect()) -} - -pub(super) async fn fetch_gucs(pool: &PgPool) -> Result> { - let rows: Vec = sqlx::query( - r#" - SELECT name, setting, unit - FROM 
pg_catalog.pg_settings - WHERE name IN ( - 'work_mem', 'effective_cache_size', 'random_page_cost', - 'seq_page_cost', 'effective_io_concurrency', 'shared_buffers', - 'maintenance_work_mem', 'default_statistics_target', - 'autovacuum', 'autovacuum_vacuum_threshold', - 'autovacuum_vacuum_scale_factor', 'autovacuum_analyze_threshold', - 'autovacuum_analyze_scale_factor' - ) - ORDER BY name - "#, - ) - .fetch_all(pool) - .await?; - - Ok(rows - .iter() - .map(|r| GucSetting { - name: r.get("name"), - setting: r.get("setting"), - unit: r.get("unit"), - }) - .collect()) -} diff --git a/crates/dry_run_core/src/schema/introspect/partitions.rs b/crates/dry_run_core/src/schema/introspect/partitions.rs deleted file mode 100644 index e6e7f64..0000000 --- a/crates/dry_run_core/src/schema/introspect/partitions.rs +++ /dev/null @@ -1,61 +0,0 @@ -use sqlx::postgres::PgRow; -use sqlx::{PgPool, Row}; - -use crate::error::Result; - -use super::raw_types::*; - -pub(super) async fn fetch_partition_info(pool: &PgPool) -> Result> { - let rows: Vec = sqlx::query( - r#" - SELECT pt.partrelid::int4 AS table_oid, - pt.partstrat::text AS strategy, - pg_catalog.pg_get_partkeydef(pt.partrelid) AS part_key - FROM pg_catalog.pg_partitioned_table pt - JOIN pg_catalog.pg_class c ON c.oid = pt.partrelid - JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace - WHERE n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast') - "#, - ) - .fetch_all(pool) - .await?; - - Ok(rows - .iter() - .map(|r| RawPartitionInfo { - table_oid: r.get::("table_oid") as u32, - strategy: r.get("strategy"), - key: r.get("part_key"), - }) - .collect()) -} - -pub(super) async fn fetch_partition_children(pool: &PgPool) -> Result> { - let rows: Vec = sqlx::query( - r#" - SELECT inh.inhparent::int4 AS parent_oid, - n.nspname AS schema_name, - c.relname AS table_name, - pg_catalog.pg_get_expr(c.relpartbound, c.oid) AS bound - FROM pg_catalog.pg_inherits inh - JOIN pg_catalog.pg_class c ON c.oid = inh.inhrelid - JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace - WHERE c.relispartition - AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast') - AND n.nspname NOT LIKE 'pg_temp_%' - ORDER BY inh.inhparent, c.relname - "#, - ) - .fetch_all(pool) - .await?; - - Ok(rows - .iter() - .map(|r| RawPartitionChild { - parent_oid: r.get::("parent_oid") as u32, - schema: r.get("schema_name"), - name: r.get("table_name"), - bound: r.get::, _>("bound").unwrap_or_default(), - }) - .collect()) -} diff --git a/crates/dry_run_core/src/schema/introspect/policies.rs b/crates/dry_run_core/src/schema/introspect/policies.rs deleted file mode 100644 index 760a5b1..0000000 --- a/crates/dry_run_core/src/schema/introspect/policies.rs +++ /dev/null @@ -1,78 +0,0 @@ -use sqlx::postgres::PgRow; -use sqlx::{PgPool, Row}; - -use crate::error::Result; - -use super::raw_types::*; - -pub(super) async fn fetch_policies(pool: &PgPool) -> Result> { - let rows: Vec = sqlx::query( - r#" - SELECT pol.polrelid::int4 AS table_oid, - pol.polname AS policy_name, - CASE pol.polcmd - WHEN 'r' THEN 'SELECT' - WHEN 'a' THEN 'INSERT' - WHEN 'w' THEN 'UPDATE' - WHEN 'd' THEN 'DELETE' - WHEN '*' THEN 'ALL' - ELSE pol.polcmd::text - END AS command, - pol.polpermissive AS permissive, - (SELECT array_agg(r.rolname) - FROM unnest(pol.polroles) AS rid(oid) - JOIN pg_catalog.pg_roles r ON r.oid = rid.oid - ) AS roles, - pg_catalog.pg_get_expr(pol.polqual, pol.polrelid) AS using_expr, - pg_catalog.pg_get_expr(pol.polwithcheck, pol.polrelid) AS with_check_expr - FROM 
pg_catalog.pg_policy pol - JOIN pg_catalog.pg_class c ON c.oid = pol.polrelid - JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace - WHERE n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast') - ORDER BY pol.polrelid, pol.polname - "#, - ) - .fetch_all(pool) - .await?; - - Ok(rows - .iter() - .map(|r| RawPolicy { - table_oid: r.get::("table_oid") as u32, - name: r.get("policy_name"), - command: r.get("command"), - permissive: r.get("permissive"), - roles: r.get::>, _>("roles").unwrap_or_default(), - using_expr: r.get("using_expr"), - with_check_expr: r.get("with_check_expr"), - }) - .collect()) -} - -pub(super) async fn fetch_triggers(pool: &PgPool) -> Result> { - let rows: Vec = sqlx::query( - r#" - SELECT t.tgrelid::int4 AS table_oid, - t.tgname AS trigger_name, - pg_catalog.pg_get_triggerdef(t.oid) AS definition - FROM pg_catalog.pg_trigger t - JOIN pg_catalog.pg_class c ON c.oid = t.tgrelid - JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace - WHERE NOT t.tgisinternal - AND t.tgparentid = 0 - AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast') - ORDER BY t.tgrelid, t.tgname - "#, - ) - .fetch_all(pool) - .await?; - - Ok(rows - .iter() - .map(|r| RawTrigger { - table_oid: r.get::("table_oid") as u32, - name: r.get("trigger_name"), - definition: r.get("definition"), - }) - .collect()) -} diff --git a/crates/dry_run_core/src/schema/introspect/raw_types.rs b/crates/dry_run_core/src/schema/introspect/raw_types.rs deleted file mode 100644 index 51b4a20..0000000 --- a/crates/dry_run_core/src/schema/introspect/raw_types.rs +++ /dev/null @@ -1,85 +0,0 @@ -pub(super) struct RawTable { - pub oid: u32, - pub schema: String, - pub name: String, - pub rls_enabled: bool, - pub reloptions: Vec, -} - -pub(super) struct RawColumn { - pub table_oid: u32, - pub name: String, - pub ordinal: i16, - pub type_name: String, - pub nullable: bool, - pub default: Option, - pub identity: Option, - pub generated: Option, - pub statistics_target: Option, -} - -pub(super) struct RawConstraint { - pub table_oid: u32, - pub name: String, - pub contype: String, - pub columns: Vec, - pub definition: Option, - pub fk_table: Option, - pub fk_columns: Vec, - pub backing_index: Option, - pub comment: Option, -} - -pub(super) struct RawTableComment { - pub table_oid: u32, - pub comment: String, -} - -pub(super) struct RawColumnComment { - pub table_oid: u32, - pub column_name: String, - pub comment: String, -} - -pub(super) struct RawIndex { - pub table_oid: u32, - pub name: String, - pub columns: Vec, - pub include_columns: Vec, - pub index_type: String, - pub is_unique: bool, - pub is_primary: bool, - pub predicate: Option, - pub definition: String, - pub is_valid: bool, - pub backs_constraint: bool, -} - -pub(super) struct RawPartitionInfo { - pub table_oid: u32, - pub strategy: String, - pub key: String, -} - -pub(super) struct RawPartitionChild { - pub parent_oid: u32, - pub schema: String, - pub name: String, - pub bound: String, -} - -pub(super) struct RawPolicy { - pub table_oid: u32, - pub name: String, - pub command: String, - pub permissive: bool, - pub roles: Vec, - pub using_expr: Option, - pub with_check_expr: Option, -} - -pub(super) struct RawTrigger { - pub table_oid: u32, - pub name: String, - pub definition: String, -} diff --git a/crates/dry_run_core/src/schema/introspect/tables.rs b/crates/dry_run_core/src/schema/introspect/tables.rs deleted file mode 100644 index 80e1f2a..0000000 --- a/crates/dry_run_core/src/schema/introspect/tables.rs +++ /dev/null @@ -1,150 
+0,0 @@ -use sqlx::postgres::PgRow; -use sqlx::{PgPool, Row}; - -use crate::error::Result; - -use super::raw_types::*; - -pub(super) async fn fetch_tables(pool: &PgPool) -> Result> { - let rows: Vec = sqlx::query( - r#" - SELECT c.oid::int4 AS oid, - n.nspname AS schema_name, - c.relname AS table_name, - c.relrowsecurity AS rls_enabled, - COALESCE(c.reloptions, '{}') AS reloptions - FROM pg_catalog.pg_class c - JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace - WHERE c.relkind IN ('r', 'p') - AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast') - AND n.nspname NOT LIKE 'pg_temp_%' - ORDER BY n.nspname, c.relname - "#, - ) - .fetch_all(pool) - .await?; - - Ok(rows - .iter() - .map(|r| RawTable { - oid: r.get::("oid") as u32, - schema: r.get("schema_name"), - name: r.get("table_name"), - rls_enabled: r.get("rls_enabled"), - reloptions: r.get::, _>("reloptions"), - }) - .collect()) -} - -pub(super) async fn fetch_columns(pool: &PgPool) -> Result> { - let rows: Vec = sqlx::query( - r#" - SELECT a.attrelid::int4 AS table_oid, - a.attname AS column_name, - a.attnum AS ordinal, - pg_catalog.format_type(a.atttypid, a.atttypmod) AS type_name, - NOT a.attnotnull AS nullable, - pg_catalog.pg_get_expr(d.adbin, d.adrelid) AS default_expr, - CASE a.attidentity - WHEN 'a' THEN 'always' - WHEN 'd' THEN 'by_default' - ELSE NULL - END AS identity, - NULLIF(a.attstattarget, -1)::int2 AS statistics_target, - CASE a.attgenerated - WHEN 's' THEN 'stored' - ELSE NULL - END AS generated - FROM pg_catalog.pg_attribute a - JOIN pg_catalog.pg_class c ON c.oid = a.attrelid - JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace - LEFT JOIN pg_catalog.pg_attrdef d ON d.adrelid = a.attrelid AND d.adnum = a.attnum - WHERE a.attnum > 0 - AND NOT a.attisdropped - AND c.relkind IN ('r', 'p') - AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast') - AND n.nspname NOT LIKE 'pg_temp_%' - ORDER BY a.attrelid, a.attnum - "#, - ) - .fetch_all(pool) - .await?; - - Ok(rows - .iter() - .map(|r| RawColumn { - table_oid: r.get::("table_oid") as u32, - name: r.get("column_name"), - ordinal: r.get("ordinal"), - type_name: r.get("type_name"), - nullable: r.get("nullable"), - default: r.get("default_expr"), - identity: r.get("identity"), - generated: r.get("generated"), - statistics_target: r.get("statistics_target"), - }) - .collect()) -} - -pub(super) async fn fetch_constraints(pool: &PgPool) -> Result> { - let rows: Vec = sqlx::query( - r#" - SELECT con.conrelid::int4 AS table_oid, - con.conname AS constraint_name, - con.contype::text AS contype, - pg_catalog.pg_get_constraintdef(con.oid) AS definition, - (SELECT array_agg(a.attname ORDER BY ord.n) - FROM unnest(con.conkey) WITH ORDINALITY AS ord(attnum, n) - JOIN pg_catalog.pg_attribute a - ON a.attrelid = con.conrelid AND a.attnum = ord.attnum - ) AS col_names, - CASE WHEN con.contype = 'f' THEN - (SELECT n2.nspname || '.' 
|| c2.relname - FROM pg_catalog.pg_class c2 - JOIN pg_catalog.pg_namespace n2 ON n2.oid = c2.relnamespace - WHERE c2.oid = con.confrelid) - END AS fk_table, - CASE WHEN con.contype = 'f' THEN - (SELECT array_agg(a.attname ORDER BY ord.n) - FROM unnest(con.confkey) WITH ORDINALITY AS ord(attnum, n) - JOIN pg_catalog.pg_attribute a - ON a.attrelid = con.confrelid AND a.attnum = ord.attnum - ) - END AS fk_col_names, - ci.relname::text AS backing_index, - d.description AS comment - FROM pg_catalog.pg_constraint con - JOIN pg_catalog.pg_class c ON c.oid = con.conrelid - JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace - LEFT JOIN pg_catalog.pg_class ci - ON ci.oid = con.conindid - LEFT JOIN pg_catalog.pg_description d - ON d.objoid = con.oid AND d.objsubid = 0 - WHERE n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast') - AND n.nspname NOT LIKE 'pg_temp_%' - AND con.conislocal - ORDER BY con.conrelid, con.conname - "#, - ) - .fetch_all(pool) - .await?; - - Ok(rows - .iter() - .map(|r| RawConstraint { - table_oid: r.get::("table_oid") as u32, - name: r.get("constraint_name"), - contype: r.get("contype"), - columns: r - .get::>, _>("col_names") - .unwrap_or_default(), - definition: r.get("definition"), - fk_table: r.get("fk_table"), - fk_columns: r - .get::>, _>("fk_col_names") - .unwrap_or_default(), - backing_index: r.get("backing_index"), - comment: r.get("comment"), - }) - .collect()) -} diff --git a/crates/dry_run_core/src/schema/mod.rs b/crates/dry_run_core/src/schema/mod.rs index 0d6366a..9c17165 100644 --- a/crates/dry_run_core/src/schema/mod.rs +++ b/crates/dry_run_core/src/schema/mod.rs @@ -1,4 +1,5 @@ pub mod bloat; +mod from_pg_introspect; mod hash; mod introspect; pub mod profile;