Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/prerequisites.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ This page describes the system prerequisites needed to run SQLMesh and provides

## SQLMesh prerequisites

You'll need Python 3.8 or higher to use SQLMesh. You can check your python version by running the following command:
You'll need Python 3.9 or higher to use SQLMesh. You can check your python version by running the following command:
```bash
python3 --version
```
Expand Down
2 changes: 0 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ authors = [{ name = "SQLMesh Contributors" }]
license = { file = "LICENSE" }
requires-python = ">= 3.9"
dependencies = [
"astor",
"click",
"croniter",
"duckdb>=0.10.0,!=0.10.3",
Expand Down Expand Up @@ -202,7 +201,6 @@ disable_error_code = "annotation-unchecked"
[[tool.mypy.overrides]]
module = [
"api.*",
"astor.*",
"IPython.*",
"hyperscript.*",
"py.*",
Expand Down
3 changes: 1 addition & 2 deletions sqlmesh/core/model/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import typing as t
from pathlib import Path

from astor import to_source
from difflib import get_close_matches
from sqlglot import exp
from sqlglot.helper import ensure_list
Expand Down Expand Up @@ -387,7 +386,7 @@ def get_first_arg(keyword_arg_name: str) -> t.Any:
)

try:
expression = to_source(first_arg)
expression = ast.unparse(t.cast(ast.expr, first_arg))
return eval(expression, env, local_env)
except Exception:
if strict_resolution:
Expand Down
123 changes: 123 additions & 0 deletions sqlmesh/migrations/v0102_normalize_python_env_payloads.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
"""Re-normalize python_env payloads using ast.unparse after dropping astor.

SQLMesh previously used the third-party `astor` library to serialise Python
function source code (`normalize_source`). That library has been replaced with
the stdlib `ast.unparse`, which produces subtly different text for the same
AST (e.g. `lambda : x` → `lambda: x`, condensed multi-line signatures, etc.).

Because `python_env` payloads are included in each snapshot's `data_hash`,
any model that contains Python code (Python models, SQL models with Python
macros/signals) would otherwise appear as *Directly Modified* after the upgrade,
potentially triggering a full backfill.

This migration re-normalises every stored `Executable` payload of
`kind == "definition"` via `ast.unparse(ast.parse(payload))`. The
subsequent `_migrate_rows` pass then recomputes fingerprints from the updated
payloads so that they match what the current code produces when loading models
from disk. The migrated snapshots are flagged `migrated = True`, so no
unexpected backfills are scheduled.
"""

import ast
import json

from sqlglot import exp

from sqlmesh.utils.migration import index_text_type, blob_text_type


def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore
pass


def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore
import pandas as pd

snapshots_table = "_snapshots"
if schema:
snapshots_table = f"{schema}.{snapshots_table}"

index_type = index_text_type(engine_adapter.dialect)
blob_type = blob_text_type(engine_adapter.dialect)

new_snapshots = []
migration_needed = False

for (
name,
identifier,
version,
snapshot,
kind_name,
updated_ts,
unpaused_ts,
ttl_ms,
unrestorable,
forward_only,
dev_version,
fingerprint,
) in engine_adapter.fetchall(
exp.select(
"name",
"identifier",
"version",
"snapshot",
"kind_name",
"updated_ts",
"unpaused_ts",
"ttl_ms",
"unrestorable",
"forward_only",
"dev_version",
"fingerprint",
).from_(snapshots_table),
quote_identifiers=True,
):
parsed_snapshot = json.loads(snapshot)
python_env = parsed_snapshot["node"].get("python_env") or {}
for executable in python_env.values():
if executable.get("kind") != "definition":
continue
new_payload = ast.unparse(ast.parse(executable["payload"])).strip()
if new_payload != executable["payload"]:
executable["payload"] = new_payload
migration_needed = True

new_snapshots.append(
{
"name": name,
"identifier": identifier,
"version": version,
"snapshot": json.dumps(parsed_snapshot),
"kind_name": kind_name,
"updated_ts": updated_ts,
"unpaused_ts": unpaused_ts,
"ttl_ms": ttl_ms,
"unrestorable": unrestorable,
"forward_only": forward_only,
"dev_version": dev_version,
"fingerprint": fingerprint,
}
)

if migration_needed and new_snapshots:
engine_adapter.delete_from(snapshots_table, "TRUE")

engine_adapter.insert_append(
snapshots_table,
pd.DataFrame(new_snapshots),
target_columns_to_types={
"name": exp.DataType.build(index_type),
"identifier": exp.DataType.build(index_type),
"version": exp.DataType.build(index_type),
"snapshot": exp.DataType.build(blob_type),
"kind_name": exp.DataType.build(index_type),
"updated_ts": exp.DataType.build("bigint"),
"unpaused_ts": exp.DataType.build("bigint"),
"ttl_ms": exp.DataType.build("bigint"),
"unrestorable": exp.DataType.build("boolean"),
"forward_only": exp.DataType.build("boolean"),
"dev_version": exp.DataType.build(index_type),
"fingerprint": exp.DataType.build(blob_type),
},
)
11 changes: 7 additions & 4 deletions sqlmesh/utils/metaprogramming.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
from numbers import Number
from pathlib import Path

from astor import to_source

from sqlmesh.core import constants as c
from sqlmesh.utils import format_exception, unique
from sqlmesh.utils.errors import SQLMeshError
Expand Down Expand Up @@ -267,14 +265,19 @@ def normalize_source(obj: t.Any) -> str:

# remove docstrings
body = node.body
if body and isinstance(body[0], ast.Expr) and isinstance(body[0].value, ast.Str):
if (
body
and isinstance(body[0], ast.Expr)
and isinstance(body[0].value, ast.Constant)
and isinstance(body[0].value.value, str)
):
node.body = body[1:]

# remove function return type annotation
if isinstance(node, ast.FunctionDef):
node.returns = None

return to_source(root_node).strip()
return ast.unparse(root_node).strip()


def build_env(
Expand Down
12 changes: 6 additions & 6 deletions tests/utils/test_metaprogramming.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import ast
import typing as t
from contextlib import contextmanager
from dataclasses import dataclass
Expand Down Expand Up @@ -50,7 +51,7 @@ def test_print_exception(mocker: MockerFixture):
except Exception as ex:
print_exception(ex, test_env, out_mock)

expected_message = r""" File ".*?.tests.utils.test_metaprogramming\.py", line 49, in test_print_exception
expected_message = r""" File ".*?.tests.utils.test_metaprogramming\.py", line 50, in test_print_exception
eval\("test_fun\(\)", env\).*

File '/test/path.py' \(or imported file\), line 2, in test_fun
Expand Down Expand Up @@ -220,8 +221,7 @@ def closure() -> int:
def test_normalize_source() -> None:
assert (
normalize_source(main_func)
== """def main_func(y: int, foo=exp.true(), *, bar=expressions.Literal.number(1) + 2
):
== """def main_func(y: int, foo=exp.true(), *, bar=expressions.Literal.number(1) + 2):
sqlglot.parse_one('1')
MyClass(47)
DataClass(x=y)
Expand Down Expand Up @@ -271,8 +271,7 @@ def test_serialize_env() -> None:
name="main_func",
alias="MAIN",
path="test_metaprogramming.py",
payload="""def main_func(y: int, foo=exp.true(), *, bar=expressions.Literal.number(1) + 2
):
payload="""def main_func(y: int, foo=exp.true(), *, bar=expressions.Literal.number(1) + 2):
sqlglot.parse_one('1')
MyClass(47)
DataClass(x=y)
Expand Down Expand Up @@ -370,7 +369,8 @@ def sample_context_manager():
"my_lambda": Executable(
name="my_lambda",
path="test_metaprogramming.py",
payload="my_lambda = lambda : print('z')",
# Match normalize_source output across Python versions
payload=ast.unparse(ast.parse("my_lambda = lambda: print('z')")).strip(),
),
"normalize_model_name": Executable(
payload="from sqlmesh.core.dialect import normalize_model_name",
Expand Down
Loading