Merged
1 change: 1 addition & 0 deletions .devcontainer/devcontainer.json
@@ -12,6 +12,7 @@
"RSAPI_ALTPORT": "TRUE",
"RME_DATA_BUCKET": "riverscapes-athena",
"RME_ATHENA_OUTPUT_BUCKET": "riverscapes-athena-output",
"AWS_DEFAULT_REGION": "us-west-2",
// mod_spatialite installed via apt (libsqlite3-mod-spatialite)
"SPATIALITE_LIB": "/usr/lib/x86_64-linux-gnu/mod_spatialite.so",
"DATA_ROOT": "/workspaces/data"
16 changes: 15 additions & 1 deletion .vscode/launch.json
@@ -497,7 +497,7 @@
// },
// This is the new version that pulls aux from Google Postgres
{
"name": "Python: huc10_athena",
"name": "Deprecated Python: huc10_athena",
"type": "debugpy",
"request": "launch",
"program": "${workspaceFolder}/pipelines/rscontext_to_athena/huc10_athena.py",
@@ -516,6 +516,20 @@
// "--delete"
],
},
+    {
+      "name": "🆕 Scrape rs_context to Athena",
+      "type": "debugpy",
+      "request": "launch",
+      "program": "${workspaceFolder}/pipelines/rscontext_to_athena/rscontext_to_athena.py",
+      "console": "integratedTerminal",
+      "env": {
+        "PYTHONPATH": "${workspaceFolder}"
+      },
+      "args": [
+        "${input:environment}",
+        "${env:DATA_ROOT}/huc10_athena",
+      ],
+    },
{
"name": "Add ChaMP Aux Measurements to Topo - SQLite",
"type": "debugpy",
68 changes: 32 additions & 36 deletions README.md
@@ -151,93 +151,89 @@ We've loaded it as a requirement in the toml: `https://github.com/Riverscapes/R
Partition hierarchy (three levels):

1. `authority` – repository root name (e.g. `data-exchange-scripts`). Derived automatically from the git repo folder name.
-2. `authority_name` – the tool / package authority publishing the layer definitions (from JSON).
+2. `tool_schema_name` – the tool / package name publishing the layer definitions (from JSON).
3. `tool_schema_version` – semantic version of the tool's layer definition schema (from JSON).
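The `authority` derivation from the git repo folder name can be sketched roughly like this (`derive_authority` is a hypothetical helper, not the actual export script):

```python
from pathlib import Path


def derive_authority(repo_root: str) -> str:
    """Derive the `authority` partition value from the git repo folder name."""
    return Path(repo_root).name


# The repo root path here is illustrative.
print(derive_authority("/workspaces/data-exchange-scripts"))  # -> data-exchange-scripts
```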

Output pattern:

```text
-dist/index.json
-dist/metadata/
-authority=<repo>/authority_name=<authority>/tool_schema_version=<version>/layer_metadata.parquet
+index.json
+authority=<repo>/tool_schema_name=<name>/tool_schema_version=<version>/layer_metadata.parquet
```
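Assembling the Hive-style `key=value` folder path from the three partition values can be sketched as follows (illustrative only; the real exporter may build paths differently):

```python
def partition_path(authority: str, tool_schema_name: str, tool_schema_version: str) -> str:
    """Build the Hive-style partition path for a layer_metadata.parquet file."""
    return (
        f"authority={authority}/"
        f"tool_schema_name={tool_schema_name}/"
        f"tool_schema_version={tool_schema_version}/"
        "layer_metadata.parquet"
    )


# Example values taken from the partition hierarchy described above.
print(partition_path("data-exchange-scripts", "rme_to_athena", "1.0.0"))
```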

Default behavior:

- Output format: Parquet (use `--format csv` for CSV).
-- Partition columns (`authority`, `authority_name`, `tool_schema_version`) are NOT inside the Parquet/CSV files unless `--include-partition-cols` is passed.
-- A `commit_sha` (current HEAD) is written into every row and stored again in `index.json` with a run timestamp.
-- Schema validation is enforced; any validation error causes a loud failure (non-zero exit code). An `index.json` with `status: validation_failed` and the collected `validation_errors` is still written for diagnostics, but no partition files are produced.
+- Partition columns (`authority`, `tool_schema_name`, `tool_schema_version`) are always written inside the Parquet/CSV files (the Athena table is not partitioned).
+- The `dist/` directory is cleaned before each run.
+- A `commit_sha` (current HEAD) is written into every row and stored again in `dist/index.json` with a run timestamp.
+- Schema validation is enforced; any validation error causes a loud failure (non-zero exit code). A `dist/index.json` with `status: validation_failed` and the collected `validation_errors` is still written for diagnostics, but no partition files are produced.
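Based on the description above, the `index.json` might look roughly like this (the field names below are assumptions, not the exporter's actual schema):

```python
import json
from datetime import datetime, timezone

# Hypothetical structure inferred from the bullet list above; the real
# exporter may use different field names.
index = {
    "commit_sha": "0123abc",  # current HEAD, also stamped into every row
    "run_timestamp": datetime.now(timezone.utc).isoformat(),
    "status": "ok",           # or "validation_failed" on schema errors
    "validation_errors": [],  # populated when validation fails
}
print(json.dumps(index, indent=2))
```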

Run locally:

```bash
-python scripts/metadata/export_layer_definitions_for_s3.py
+uv run export-layer-definitions-for-s3 --root .
```

Optional flags:

```bash
-python scripts/metadata/export_layer_definitions_for_s3.py --format csv               # CSV instead of Parquet
-python scripts/metadata/export_layer_definitions_for_s3.py --include-partition-cols   # Embed partition columns in each file
+uv run export-layer-definitions-for-s3 --root . --format csv   # CSV instead of Parquet
+uv run export-layer-definitions-for-s3 --root . --dry-run      # Validate only, no files written
```

-### Athena External Table
+Validate only (no output files):
+
+```bash
+uv run validate-metadata --root .
+```

-We publish to: `s3://riverscapes-athena/metadata/layer_definitions/`
+### Athena External Table

-This gets turned into the athena table default.layer_definitions.
+We publish to: `s3://riverscapes-athena/riverscapes_metadata/layer_definitions_raw/X.X/`, where `X.X` is the Riverscapes metadata schema version number.

-Add new partitions (after upload):
+Although the folder structure is set up as if for partitions, the Athena table is not partitioned.

-```sql
--- auto-discover
-MSCK REPAIR TABLE layer_definitions;
--- OR manual:
-ALTER TABLE layer_definitions
-ADD IF NOT EXISTS PARTITION (
-  authority='data-exchange-scripts',
-  authority_name='rme_to_athena',
-  tool_schema_version='1.0.0'
-)
-LOCATION 's3://riverscapes-athena/metadata/layer_definitions/authority=data-exchange-scripts/authority_name=rme_to_athena/tool_schema_version=1.0.0/';
-```
+This gets turned into the Athena table `default.layer_definitions` and the view `layer_definitions_latest`.
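The DDL for `layer_definitions_latest` is not shown in this PR, but a plausible shape (assuming the view simply picks the highest `tool_schema_version` per tool, and that version strings compare correctly lexically) would be:

```sql
-- Hypothetical sketch only; the actual view definition may differ.
CREATE OR REPLACE VIEW layer_definitions_latest AS
SELECT *
FROM default.layer_definitions d
WHERE d.tool_schema_version = (
    SELECT max(tool_schema_version)
    FROM default.layer_definitions
    WHERE tool_schema_name = d.tool_schema_name
);
```

Note the lexical `max()` would mis-order versions like `10.0.0` vs `9.0.0`; a real view may parse the semver components.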

### GitHub Actions Workflow

Workflow file: `.github/workflows/metadata-catalog.yml`

-Steps performed:
+The workflow has two jobs:

+**`validate`** (runs on all branches and PRs):
+1. Checkout code.
+2. Install Python 3.12 + `uv sync`.
+3. Run `export-layer-definitions-for-s3 --root .` → validates and produces partitioned Parquet under `dist/`.
+4. Upload `dist/` as a build artifact.
+
+**`build-and-publish`** (runs on `main` branch only, after `validate`):
1. Checkout code.
-2. Assume AWS IAM role via OIDC (secret `METADATA_PUBLISH_ROLE_ARN`).
-3. Install dependencies (Python 3.12 + `uv sync`).
-4. Run flatten script -> partitioned Parquet.
-5. Sync `dist/metadata` to S3 bucket prefix.
-6. Run `MSCK REPAIR TABLE` to load partitions.
-7. Perform sample queries (partition listing / row count).
+2. Configure AWS credentials via OIDC (secret `METADATA_PUBLISH_ROLE_ARN`).
+3. Install Python 3.12 + `uv sync`.
+4. Download the `dist/` artifact from the `validate` job.
+5. Run `publish-metadata-to-s3 --root .` → uploads Parquet files to S3 and runs a row-count verification query on Athena.

### IAM Role (Least Privilege Summary)

The role must allow:

-- S3 List/Get/Put/Delete under `metadata/layer_definitions/` and query result prefix.
+- S3 List/Get/Put/Delete under `riverscapes_metadata/layer_definitions_raw/` and query result prefix.
- Athena: StartQueryExecution, GetQueryExecution, GetQueryResults.
- Glue: Get/Create/Update table & partitions for the database/table.
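A policy along these lines would satisfy the list above. This is an illustrative sketch only: bucket names, prefixes, and the decision to leave Athena/Glue resources as `*` are assumptions, and a production role should scope resources tightly.

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "ListMetadataBucket",
      "Effect": "Allow",
      "Action": ["s3:ListBucket"],
      "Resource": "arn:aws:s3:::riverscapes-athena"
    },
    {
      "Sid": "ReadWriteMetadataObjects",
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
      "Resource": "arn:aws:s3:::riverscapes-athena/riverscapes_metadata/layer_definitions_raw/*"
    },
    {
      "Sid": "AthenaQueries",
      "Effect": "Allow",
      "Action": ["athena:StartQueryExecution", "athena:GetQueryExecution", "athena:GetQueryResults"],
      "Resource": "*"
    },
    {
      "Sid": "GlueCatalog",
      "Effect": "Allow",
      "Action": ["glue:GetTable", "glue:CreateTable", "glue:UpdateTable", "glue:GetPartitions", "glue:CreatePartition", "glue:BatchCreatePartition"],
      "Resource": "*"
    }
  ]
}
```

A statement for the Athena query-result prefix (List/Get/Put on the output bucket) would also be needed, mirroring the first two statements.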

### Future Enhancements

- Validate layer schemas (dtype whitelist, required fields & semantic checks).
- Explicit partition adds instead of MSCK for faster updates.
- Historical snapshots (extra partition like `snapshot_date`).
- Glue Catalog integration (automated table & partition registration without MSCK).
- Data quality profile summary (row counts, distinct key coverage) in `index.json`.

## Troubleshooting Metadata

| Symptom | Likely Cause | Fix |
|---------|--------------|-----|
| Empty Athena table | Partitions not loaded | Run `MSCK REPAIR TABLE` or add partitions manually |
| Wrong data types | Created table before column rename | Drop & recreate external table with new DDL |
| Missing new version | Workflow didn’t run or lacked perms | Check Actions logs & IAM role policies |
| Zero rows for authority | Upload sync failed | Inspect S3 prefix & re-run workflow |
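When chasing the "zero rows" symptoms above, a quick distribution query can narrow things down (table and column names are assumed from the sections above; adjust to your environment):

```sql
-- Hypothetical diagnostic query against the raw table.
SELECT authority, tool_schema_name, tool_schema_version, count(*) AS n_rows
FROM default.layer_definitions
GROUP BY 1, 2, 3
ORDER BY 1, 2, 3;
```

An authority that appears in S3 but not in this result usually points at a failed sync or a schema mismatch in that partition's files.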
7 changes: 4 additions & 3 deletions pipelines/rme_to_athena/rme_to_athena_parquet.py
@@ -38,6 +38,7 @@
DATA_BUCKET_ENV_VAR = "RME_DATA_BUCKET"
OUTPUT_BUCKET_ENV_VAR = "RME_ATHENA_OUTPUT_BUCKET"
DEFAULT_DATA_BUCKET = "riverscapes-athena"
+BASE_S3_KEY = "data_exchange/rs_metric_engine2"

DATA_BUCKET = os.getenv(DATA_BUCKET_ENV_VAR, DEFAULT_DATA_BUCKET)
ATHENA_OUTPUT_BUCKET = os.getenv(OUTPUT_BUCKET_ENV_VAR, DATA_BUCKET) # fallback to data bucket if not set
@@ -57,7 +58,7 @@
huc_projects_scraped as
(select substr(huc12, 1, 10) as huc10,
raw_rme_pq2.rme_date_created_ts
-    from raw_rme_pq2)
+    from rs_raw.raw_rs_metric_engine2 raw_rme_pq2)
select distinct project_id, huc, created_on, rme_date_created_ts
from huc_projects_dex dex
left join huc_projects_scraped scr on dex.huc = scr.huc10
@@ -314,7 +315,7 @@ def scrape_rme(
rme_pq_filepath = huc_dir / f'rme_{project.huc}.parquet'
data_gdf.to_parquet(rme_pq_filepath)
# do not use os.path.join because this is aws os, not system os
-            s3_key = f'data_exchange/riverscape_metrics/{rme_pq_filepath.name}'
+            s3_key = f'{BASE_S3_KEY}/{rme_pq_filepath.name}'
upload_to_s3(rme_pq_filepath, data_bucket, s3_key)

if delete_downloads_when_done:
Expand All @@ -323,7 +324,7 @@ def scrape_rme(
prg.update(count)
except Exception as e:
log.error(f'Error scraping HUC {project.huc}: {e}')
-        raise
+        # raise
prg.finish()


1 change: 1 addition & 0 deletions pipelines/rscontext_to_athena/huc10_athena.py
@@ -1,5 +1,6 @@
"""
Scrape HUC10 information and load it to Athena.
+DEPRECATED - USE rscontext_to_athena instead.

Philip Bailey
5 Oct 2025