Skip to content

Commit 77cdbe8

Browse files
committed
add: migration file to normalize python env payloads
1 parent 1097c24 commit 77cdbe8

1 file changed

Lines changed: 123 additions & 0 deletions

File tree

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
"""Re-normalize python_env payloads using ast.unparse after dropping astor.

SQLMesh previously used the third-party `astor` library to serialize Python
function source code (`normalize_source`). That library has been replaced with
the stdlib `ast.unparse`, which produces subtly different text for the same
AST (e.g. `lambda : x` → `lambda: x`, condensed multi-line signatures, etc.).

Because `python_env` payloads are included in each snapshot's `data_hash`,
any model that contains Python code (Python models, SQL models with Python
macros/signals) would otherwise appear as *Directly Modified* after the upgrade,
potentially triggering a full backfill.

This migration re-normalizes every stored `Executable` payload of
`kind == "definition"` via `ast.unparse(ast.parse(payload))`. The
subsequent `_migrate_rows` pass then recomputes fingerprints from the updated
payloads so that they match what the current code produces when loading models
from disk. The migrated snapshots are flagged `migrated = True`, so no
unexpected backfills are scheduled.
"""
20+
21+
import ast
22+
import json
23+
24+
from sqlglot import exp
25+
26+
from sqlmesh.utils.migration import index_text_type, blob_text_type
27+
28+
29+
def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
    """No-op hook: this migration performs no schema changes.

    All arguments are accepted and ignored.
    """
    return None
31+
32+
33+
def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
    """Rewrite stored ``definition`` payloads into their ``ast.unparse`` form.

    Reads every row of the ``_snapshots`` table (prefixed with ``schema`` when
    one is given), round-trips the payload of each ``python_env`` entry whose
    ``kind`` is ``"definition"`` through ``ast.unparse(ast.parse(...))`` and,
    only if at least one payload text changed, deletes all rows from the table
    and re-inserts the complete set.

    Args:
        engine_adapter: Object providing ``fetchall``, ``delete_from`` and
            ``insert_append``; its ``dialect`` is passed to
            ``index_text_type`` and ``blob_text_type``.
        schema: Optional schema name used to qualify the table name.
        **kwargs: Accepted and ignored.
    """
    import pandas as pd

    table = f"{schema}._snapshots" if schema else "_snapshots"

    text_index = index_text_type(engine_adapter.dialect)
    text_blob = blob_text_type(engine_adapter.dialect)

    # Single source of truth for the column order: it drives the SELECT list,
    # the keys of every re-inserted row, and the target column types.
    column_type_names = {
        "name": text_index,
        "identifier": text_index,
        "version": text_index,
        "snapshot": text_blob,
        "kind_name": text_index,
        "updated_ts": "bigint",
        "unpaused_ts": "bigint",
        "ttl_ms": "bigint",
        "unrestorable": "boolean",
        "forward_only": "boolean",
        "dev_version": text_index,
        "fingerprint": text_blob,
    }
    columns = tuple(column_type_names)

    rows = []
    any_changed = False

    query = exp.select(*columns).from_(table)
    for values in engine_adapter.fetchall(query, quote_identifiers=True):
        record = dict(zip(columns, values))

        snapshot_doc = json.loads(record["snapshot"])
        env = snapshot_doc["node"].get("python_env") or {}
        for entry in env.values():
            if entry.get("kind") == "definition":
                source = entry["payload"]
                unparsed = ast.unparse(ast.parse(source)).strip()
                if unparsed != source:
                    entry["payload"] = unparsed
                    any_changed = True

        # Every row is re-serialized, changed or not, because the table is
        # rewritten wholesale below.
        record["snapshot"] = json.dumps(snapshot_doc)
        rows.append(record)

    # Leave the table untouched unless some payload text actually changed.
    if not (any_changed and rows):
        return

    engine_adapter.delete_from(table, "TRUE")

    engine_adapter.insert_append(
        table,
        pd.DataFrame(rows),
        target_columns_to_types={
            column: exp.DataType.build(type_name)
            for column, type_name in column_type_names.items()
        },
    )

0 commit comments

Comments
 (0)