Skip to content

[FEATURE](pyspark) PySpark validation generated from the Pydantic schema#518

Draft
Seth Fitzsimmons (sethfitz) wants to merge 8 commits into
mainfrom
pyspark-codegen
Draft

[FEATURE](pyspark) PySpark validation generated from the Pydantic schema#518
Seth Fitzsimmons (sethfitz) wants to merge 8 commits into
mainfrom
pyspark-codegen

Conversation

@sethfitz
Copy link
Copy Markdown
Collaborator

@sethfitz Seth Fitzsimmons (sethfitz) commented May 11, 2026

Replacement PR description:


Summary

Adds a new runtime package (overture-schema-pyspark) plus a new output target in overture-schema-codegen that emits PySpark validation expressions and conformance tests from the same Pydantic models that define the schema. Ships an overture-validate CLI (to be merged with the overture-schema CLI later; that will be an opportunity to improve its ergonomics) for running the validation against Parquet on disk or in S3.

PySpark plugs in as a peer of the existing Markdown output target: same FeatureSpec extraction, same four-layer architecture (Discovery -> Extraction -> Output Layout -> Rendering), new pipeline module. See packages/overture-schema-codegen/docs/design.md for the full picture; the "PySpark Pipeline" section there covers the new stages in detail.

What's in the PR

packages/overture-schema-pyspark/ -- runtime. Public API in validate.py (validate_feature, explain_errors), schema comparison in schema_check.py, dataclasses in check.py, the overture-validate CLI in cli.py, and shared expression building blocks in expressions/{constraint_expressions,column_patterns,_schema_structs}.py. The per-feature expression modules under expressions/generated/overture/schema/<theme>/<feature>.py and per-feature conformance tests under tests/generated/overture/schema/<theme>/test_<feature>.py are emitted by codegen and confined to a generated/ boundary that make generate-pyspark wipes and recreates. _registry.py walks that tree at import time and exposes REGISTRY: dict[str, FeatureValidation] keyed by each module's ENTRY_POINT value (e.g. "overture.schema.transportation:Segment"), plus a parallel PARTITION_MAP derived from each module's PARTITIONS dict for partitioned features.

This has been tested to work with Spark versions 3.4.0 - 4.1.1 (and the lowest-direct CI check will verify that it continues to work with the lowest declared PySpark version, which is currently 3.4).

Adam Lastowka (@Rachmanin0xFF) the public API changed since the previous version you tested (in an attempt to simplify how it gets integrated with the CDP). If you point me to its current state, I can propose an update that uses the new API.

packages/overture-schema-codegen/src/overture/schema/codegen/pyspark/ -- new output target. Pipeline stages:

FeatureSpec (from extraction)
    |
constraint_dispatch.py    constraints -> ExpressionDescriptor / ModelConstraintDescriptor
    |
check_builder.py          FieldShape tree -> Check / ModelCheck IR
                          (check_ir.py; resolves array nesting, variant gating via
                          FieldPath = ScalarPath | ArrayPath and Guard sum types)
schema_builder.py         FieldShape tree -> SchemaField list (StructType source)
test_data/                FeatureSpec -> BASE_ROW_SPARSE / BASE_ROW_POPULATED, scaffold,
                          invalid_value
    |
renderer.py               Check IR -> per-feature expression module
test_renderer.py          Check IR -> per-feature conformance test module
    |
pipeline.py               orchestrates, returns GeneratedModule list

make generate-pyspark wipes both generated/ trees and recreates them; make check gates on regeneration being current.

What's covered

Every constraint Pydantic enforces today is dispatched to a PySpark expression, which allows Spark to push down predicates and take advantage of Catalyst to avoid unnecessary deserialization:

  • Field constraints: Ge / Gt / Le / Lt / Interval, ArrayMinLen / ArrayMaxLen / ScalarMinLen / ScalarMaxLen (typed length constraints split by attachment layer), StrippedConstraint, PatternConstraint, UniqueItemsConstraint, GeometryTypeConstraint, JsonPointerConstraint.
  • NewType overrides: LinearlyReferencedRange (length / bounds / order).
  • Base-type overrides: HttpUrl (format + length), EmailStr, BBox (completeness, lat ordering, lat range).
  • Model constraints: RequireAnyOfConstraint, RadioGroupConstraint, RequireIfConstraint, ForbidIfConstraint, MinFieldsSetConstraint. MinFieldsSetConstraint mirrors Pydantic's model_fields_set semantics: required fields are always set (the constructor enforces them) and count toward the threshold alongside any explicitly-set optional fields. NoExtraFieldsConstraint is intentionally skipped, as its behavior is implicit when using DataFrames.

Known semantic gaps / differences

Divergences from Pydantic:

  • UniqueItemsConstraint uses Spark's array_distinct, which compares whole elements with structural equality on raw stored values. Pydantic compares normalized Python objects -- e.g., list[HttpUrl] is compared after URL normalization (such as ensuring that URLs like http://example.com contain a trailing /: http://example.com/). The PySpark check catches exact duplicates, not duplicates after normalization.
  • require_any_of checks isNotNull as a proxy for Pydantic's model_fields_set. Parquet has no equivalent of "explicitly provided" because of the tabular nature of DataFrames; isNotNull is stricter (it rejects fields explicitly set to null).
  • BBox's PySpark expressions include checking latitude ordering and range. Longitude is skipped due to undefined behavior around how we handle features that cross the antimeridian.

CLI

$ overture-validate --help
Usage: overture-validate [OPTIONS] FEATURE_TYPE PATH

  Validate Overture data at PATH and write annotated Parquet.

Options:
  -o, --output TEXT            Output path for validated Parquet.
  --head INTEGER               Error rows to display.  [default: 20]
  --conf TEXT                  Spark config key=value pairs.
  --count-only                 Report error count only; skip explain/unpivot.
  --skip-schema-check          Warn on schema mismatches instead of aborting.
  --skip-columns TEXT          Columns declared absent from data; skips their
                               checks.
  --ignore-extra-columns TEXT  Extra data columns to ignore in schema
                               comparison.
  --suppress TEXT              Suppress checks: FIELD (all checks) or
                               FIELD:CHECK (specific).
  --help                       Show this message and exit.

Output is one row per violation: feature ID, theme/type, failing field, check name, message, offending value.

Testing

overture-schema-codegen generates conformance tests alongside the PySpark expressions as a sanity-check by evaluating expected-good and expected-bad rows within a single DataFrame (setup/teardown overhead for DataFrame-per-test-case was much too high). Each feature gets two baseline rows -- BASE_ROW_SPARSE (required fields only) and BASE_ROW_POPULATED (every optional field filled in) -- and every scenario is exercised against both, so checks that depend on optional structure being present are covered as well as the minimal-row case.

make check generates tests and runs the full suite over everything, which use PySpark and increase test runtime (again; they run in just over a minute for me). testmon was introduced to improve the developer experience by skipping tests that weren't affected by recent edits.

Beyond the unit and conformance tests:

# Local Parquet
overture-validate segment samples/segment.parquet --count-only

# Real release prefix (expect bbox Float-vs-Double mismatch -> use --skip-schema-check)
# partitions (theme=places, type=place) will automatically be detected
# note that the s3a protocol must be used when AWS-provided Hadoop filesystem JARs are not available
overture-validate place s3a://overturemaps-us-west-2/release/<release>/ --skip-schema-check --head 50

Notes for review

  • This PR is intentionally large because the generated tree is large. The interesting surface is smaller: pyspark/{constraint_dispatch,check_builder,check_ir,schema_builder,renderer,test_renderer,pipeline}.py plus the runtime in overture-schema-pyspark/src/overture/schema/pyspark/. Everything under generated/ is regenerable output -- review the codegen, but skim the output to understand the shape of what's produced.
  • Bundles supporting changes (testmon for incremental test runs, VehicleSelectorBase extraction, Java 17 CI pin) rather than splitting them into separate PRs.
  • This is a draft; expect force-pushes as cleanup continues, although the bulk of my review/refactoring is complete.

Closes #517.

@github-actions
Copy link
Copy Markdown

github-actions Bot commented May 11, 2026

🗺️ Schema reference docs preview is live!

🌍 Preview https://staging.overturemaps.org/schema/pr/518/schema/index.html
🕐 Updated May 20, 2026 21:19 UTC
📝 Commit 4b55cc0
🔧 env SCHEMA_PREVIEW true

Note

♻️ This preview updates automatically with each push to this PR.

pytest-testmon tracks which tests cover which source files and skips
unaffected tests on subsequent runs. Activated via a TESTMON Makefile
variable so the default `make check` uses incremental selection while
`make check TESTMON=` runs the full suite.

Lock the dependency in the dev group, gitignore the local cache file,
and thread $(TESTMON) through the test, test-all, and test-only
targets.

Signed-off-by: Seth Fitzsimmons <seth@mojodna.net>
Pull the shared `dimension` and `comparison` fields of the five vehicle
selector subtypes into a `VehicleSelectorBase` parent, and thread
`discriminator="dimension"` through the `VehicleSelector` annotated
union.

The discriminator turns the union into a Pydantic discriminated union,
so it serializes as JSON Schema's `oneOf` + `discriminator` rather than
`anyOf`. Regenerated segment_baseline_schema.json captures the new
shape.

This is a prerequisite for downstream tooling that walks discriminated
unions structurally (e.g. PySpark codegen for segment's nested vehicle
scoping).

Signed-off-by: Seth Fitzsimmons <seth@mojodna.net>
Replace the Tonga-based Division/DivisionArea/DivisionBoundary
fixtures with Kauaʻi County samples that exercise admin_level,
capital_division_ids, wikidata, and source license alongside the
existing fields.

Replace the Tonga-based Connector/Segment fixtures with a Vermooten
Street junction in Pretoria that exercises access_restrictions with
when.vehicle, speed_limits with when.heading, routes with ref,
road_surface, and multi-source attribution.

Reformat the TOML with 4-space indents and sorted keys to match
sibling theme packages.

Signed-off-by: Seth Fitzsimmons <seth@mojodna.net>
Introduce overture-schema-pyspark, a runtime PySpark validation
package whose per-feature expression modules and conformance tests
are generated from the same Pydantic models that define the schema,
along with an `overture-validate` CLI.

Runtime (overture-schema-pyspark/src/overture/schema/pyspark/):

- check.py — Check, CheckShape, FeatureValidation dataclasses.
- schema_check.py — write-first comparison of Spark schemas against
  an expected StructType, with structural type matching and
  SchemaMismatch reporting.
- validate.py — public API: validate_feature(), evaluate_checks(),
  explain_errors(). The explain stage UNPIVOTs per-row check results
  into one row per violation, preserving all input columns for
  downstream join-back.
- cli.py — `overture-validate <parquet-or-directory>` runs the
  validation pipeline against a path of GeoParquet files. Output is
  one row per violation: feature ID, theme/type, failing field,
  check name, offending value. Single-pass evaluation keeps memory
  bounded for arbitrarily large inputs.
- expressions/ — shared runtime utilities (constraint_expressions,
  column_patterns, _schema_structs). Per-feature expression modules
  live under expressions/overture/ and are added by the codegen in
  a follow-up commit.
- tests/_support/ — conformance test infrastructure (scenarios,
  harness, helpers, mutations). The harness builds one DataFrame
  per feature, applies all scenarios as deterministic-UUID-tagged
  rows, runs validation once, and indexes violations back to
  scenario IDs — O(checks) rather than O(checks * scenarios).

CLI filtering options:

  --theme <theme>           limit to one theme
  --feature <feature>       limit to one feature type
  --skip-schema-check       run only constraint checks (no schema
                            comparison)
  --count-only              print violation counts per check rather
                            than rows
  --suppress <key>          suppress specific (feature, field, check)
                            triples per a YAML config

Codegen pipeline (overture-schema-codegen/src/.../pyspark/):

    FeatureSpec
        |
    constraint_dispatch.py   map constraints to descriptors
        |
    check_builder.py         walk FieldSpec -> CheckNode IR;
                             resolve array nesting, variant gating
        |
    schema_builder.py        FieldSpec -> SchemaField list
                             (StructType source)
        |
    renderer.py              CheckNode -> per-feature expression
                             module
    test_renderer.py         CheckNode -> per-feature conformance
                             test module
    synthetic.py             FeatureSpec -> BASE_ROW + invalid values
        |
    pipeline.py              orchestrate, return GeneratedModule list

The dispatch tables map every supported constraint (Ge/Gt/Le/Lt/
Interval, MinLen/MaxLen, StrippedConstraint, PatternConstraint,
UniqueItemsConstraint, GeometryTypeConstraint, JsonPointerConstraint,
RequireAnyOfConstraint, RadioGroupConstraint, RequireIfConstraint,
ForbidIfConstraint, MinFieldsSetConstraint), NewType (Country-
CodeAlpha2, LinearlyReferencedRange, RegionCode), and base type
(HttpUrl, EmailStr) to constraint_expressions check functions.

Discriminated unions (segment is the canonical hard case) split
into per-arm test files. The codegen handles arm splitting via
generate_arm_rows in synthetic.py and _filter_field_nodes_for_arm
in test_renderer.py.

The Makefile gains a `generate-pyspark` target and gates `check`
on it so a stale generation surfaces immediately. The CLI is exposed
as a `[project.scripts]` entry point so `overture-validate`
becomes available after `pip install` / `uv sync`.

Signed-off-by: Seth Fitzsimmons <seth@mojodna.net>
Generate PySpark expressions (and tests) for models defined in the
workspace

Signed-off-by: Seth Fitzsimmons <seth@mojodna.net>
PySpark 3.4 (the declared floor) doesn't run on Java 21, the default
JDK on ubuntu-latest runners -- it hits NoSuchMethodException on
java.nio.DirectByteBuffer.<init>(long, int), removed in JDK 21. Pin
the lowest-direct cell to Java 17 so the resolved pyspark==3.4.0 can
actually start. The default cell (which resolves to a current pyspark
4.x) keeps the runner's default Java 21.

Signed-off-by: Seth Fitzsimmons <seth@mojodna.net>
validate_feature built check expressions referencing every column the
schema declares, then evaluated them with an eager df.select. When the
input DataFrame lacked a declared column, Spark's plan analysis raised
an AnalysisException before the caller could inspect the schema
mismatch, so a file missing a required column produced a Java stack
trace instead of the schema-mismatch report the CLI is built to emit.

Columns that compare_schemas reports as absent from the data now have
their checks dropped, the same as --skip-columns columns; referencing
them is what crashes Spark. The mismatch is still recorded in
schema_mismatches, so the CLI reports it and exits cleanly (or, with
--skip-schema-check, validates the columns that are present).

The CLI also prints the --skip-columns invocation for the absent
columns, so the escape hatch is discoverable from the error itself.

Signed-off-by: Seth Fitzsimmons <seth@mojodna.net>
Model-level constraints (require_any_of and the like) generated for a
sub-model reached through an optional field fired even when that field
was null. Pydantic skips a model validator when the optional sub-model
is absent, so the generated PySpark expression produced a false positive
the schema itself never raises.

ModelCheck now carries a gate: the optional-ancestor path that must be
non-null for the constraint to apply. check_builder sets it when the
constrained model is reached via an optional struct field inside an
array; the renderer wraps the constraint in
F.when(<accessor>.isNotNull(), ...).

Regenerated Segment expressions: the speed_limits[].when,
access_restrictions[].when, and prohibited_transitions[].when
require_any_of checks are now skipped when their when sub-model is null.

Signed-off-by: Seth Fitzsimmons <seth@mojodna.net>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Validate Overture data on Spark against the Pydantic schema

1 participant