Skip to content

Commit fb8c119

Browse files
authored
Merge branch 'main' into feat/clustered-by-auto-none
2 parents 2586168 + abac8a7 commit fb8c119

12 files changed

Lines changed: 206 additions & 10 deletions

File tree

docs/concepts/overview.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,4 +68,4 @@ SQLMesh automatically runs audits when you apply a `plan` to an environment, or
6868
## Infrastructure and orchestration
6969
Every company's data infrastructure is different. SQLMesh is flexible with regard to which engines and orchestration frameworks you use — its only requirement is access to the target SQL/analytics engine.
7070

71-
SQLMesh keeps track of model versions and processed data intervals using your existing infrastructure. SQLMesh it automatically creates a `sqlmesh` schema in your data warehouse for its internal metadata.
71+
SQLMesh keeps track of model versions and processed data intervals using your existing infrastructure. It automatically creates a `sqlmesh` schema in your data warehouse for its internal metadata.

docs/guides/configuration.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,16 @@ The examples specify a Snowflake connection whose password is stored in an envir
170170
account: <account>
171171
```
172172

173+
!!! tip "Base64-encoded secrets"
174+
175+
If a secret is distributed base64-encoded in a single environment variable (for example a BigQuery service-account key), pipe the variable through the built-in `b64decode` filter to decode it to text inline:
176+
177+
```yaml
178+
keyfile_json: {{ env_var('BIGQUERY_KEY_B64') | b64decode }}
179+
```
180+
181+
A matching `b64encode` filter is also available. Both return UTF-8 text, so they are intended for string/JSON secrets rather than arbitrary binary data.
182+
173183
=== "Python"
174184

175185
Python accesses environment variables via the `os` library's `environ` dictionary.

sqlmesh/core/dialect.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -804,7 +804,10 @@ def format_model_expressions(
804804
A string representing the formatted model.
805805
"""
806806
if len(expressions) == 1 and is_meta_expression(expressions[0]):
807-
return expressions[0].sql(pretty=True, dialect=dialect)
807+
# Meta expressions (MODEL/AUDIT/METRIC) are SQLMesh DDL, not standard SQL,
808+
# so they must never be transpiled to the target dialect (e.g. tsql would
809+
# rewrite a boolean property like `allow_partials TRUE` to `(1 = 1)`).
810+
return expressions[0].sql(pretty=True, dialect=None)
808811

809812
if rewrite_casts:
810813

@@ -836,7 +839,14 @@ def cast_to_colon(node: exp.Expr) -> exp.Expr:
836839
expressions = new_expressions
837840

838841
return ";\n\n".join(
839-
expression.sql(pretty=True, dialect=dialect, **kwargs) for expression in expressions
842+
# Meta expressions (MODEL/AUDIT/METRIC) are SQLMesh DDL and must stay
843+
# dialect-agnostic; only the actual query/statement expressions transpile.
844+
expression.sql(
845+
pretty=True,
846+
dialect=None if is_meta_expression(expression) else dialect,
847+
**kwargs,
848+
)
849+
for expression in expressions
840850
).strip()
841851

842852

sqlmesh/core/engine_adapter/fabric.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_result
1010
from sqlmesh.core.engine_adapter.mssql import MSSQLEngineAdapter
1111
from sqlmesh.core.engine_adapter.shared import (
12+
CommentCreationTable,
13+
CommentCreationView,
1214
InsertOverwriteStrategy,
1315
)
1416
from sqlmesh.utils.errors import SQLMeshError
@@ -30,6 +32,10 @@ class FabricEngineAdapter(MSSQLEngineAdapter):
3032
SUPPORTS_TRANSACTIONS = False
3133
SUPPORTS_CREATE_DROP_CATALOG = True
3234
INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.DELETE_INSERT
35+
# There is no standard method to handle comments in Fabric for now, so we disable it.
36+
# Otherwise, it would be inherited from MSSQL and would not work.
37+
COMMENT_CREATION_TABLE = CommentCreationTable.UNSUPPORTED
38+
COMMENT_CREATION_VIEW = CommentCreationView.UNSUPPORTED
3339

3440
def __init__(
3541
self, connection_factory_or_pool: t.Union[t.Callable, t.Any], *args: t.Any, **kwargs: t.Any

sqlmesh/integrations/dlt.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ def generate_incremental_model(
208208
FROM
209209
{from_clause}
210210
WHERE
211-
{time_column} BETWEEN @start_ds AND @end_ds
211+
{time_column} BETWEEN @start_ts AND @end_ts
212212
"""
213213

214214

sqlmesh/utils/jinja.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
import base64
34
import importlib
45
import json
56
import re
@@ -28,11 +29,40 @@
2829
SQLMESH_JINJA_PACKAGE = "sqlmesh.utils.jinja"
2930

3031

32+
def b64decode(value: t.Union[str, bytes]) -> str:
33+
"""Decode a base64-encoded value and return it as UTF-8 text.
34+
35+
Intended for base64-encoded string/JSON secrets (for example a service-account
36+
key stored in an environment variable), not arbitrary binary payloads.
37+
"""
38+
decoded = value.encode("utf-8") if isinstance(value, str) else value
39+
return base64.b64decode(decoded).decode("utf-8")
40+
41+
42+
def b64encode(value: t.Union[str, bytes]) -> str:
43+
"""Base64-encode a value and return the encoding as UTF-8 text.
44+
45+
The input is treated as UTF-8 text, mirroring ``b64decode``; it is intended for
46+
string/JSON secrets rather than arbitrary binary payloads.
47+
"""
48+
encoded = value.encode("utf-8") if isinstance(value, str) else value
49+
return base64.b64encode(encoded).decode("utf-8")
50+
51+
52+
def create_builtin_filters() -> t.Dict[str, t.Callable]:
53+
return {
54+
"b64decode": b64decode,
55+
"b64encode": b64encode,
56+
}
57+
58+
3159
def environment(**kwargs: t.Any) -> Environment:
3260
extensions = kwargs.pop("extensions", [])
3361
extensions.append("jinja2.ext.do")
3462
extensions.append("jinja2.ext.loopcontrols")
35-
return Environment(extensions=extensions, **kwargs)
63+
env = Environment(extensions=extensions, **kwargs)
64+
env.filters.update(create_builtin_filters())
65+
return env
3666

3767

3868
ENVIRONMENT = environment()

tests/cli/test_cli.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -921,7 +921,7 @@ def test_dlt_filesystem_pipeline(tmp_path):
921921
FROM
922922
filesystem_pipeline_dataset.equipment as c
923923
WHERE
924-
TO_TIMESTAMP(CAST(c._dlt_load_id AS DOUBLE)) BETWEEN @start_ds AND @end_ds
924+
TO_TIMESTAMP(CAST(c._dlt_load_id AS DOUBLE)) BETWEEN @start_ts AND @end_ts
925925
"""
926926

927927
with open(equipment_model_path) as file:
@@ -1064,7 +1064,7 @@ def test_dlt_pipeline(runner, tmp_path):
10641064
FROM
10651065
sushi_dataset.sushi_types as c
10661066
WHERE
1067-
TO_TIMESTAMP(CAST(c._dlt_load_id AS DOUBLE)) BETWEEN @start_ds AND @end_ds
1067+
TO_TIMESTAMP(CAST(c._dlt_load_id AS DOUBLE)) BETWEEN @start_ts AND @end_ts
10681068
"""
10691069

10701070
dlt_sushi_types_model_path = tmp_path / "models/incremental_sushi_types.sql"
@@ -1095,7 +1095,7 @@ def test_dlt_pipeline(runner, tmp_path):
10951095
FROM
10961096
sushi_dataset._dlt_loads as c
10971097
WHERE
1098-
TO_TIMESTAMP(CAST(c.load_id AS DOUBLE)) BETWEEN @start_ds AND @end_ds
1098+
TO_TIMESTAMP(CAST(c.load_id AS DOUBLE)) BETWEEN @start_ts AND @end_ts
10991099
"""
11001100

11011101
with open(dlt_loads_model_path) as file:
@@ -1122,7 +1122,7 @@ def test_dlt_pipeline(runner, tmp_path):
11221122
ON
11231123
c._dlt_parent_id = p._dlt_id
11241124
WHERE
1125-
TO_TIMESTAMP(CAST(p._dlt_load_id AS DOUBLE)) BETWEEN @start_ds AND @end_ds
1125+
TO_TIMESTAMP(CAST(p._dlt_load_id AS DOUBLE)) BETWEEN @start_ts AND @end_ts
11261126
"""
11271127

11281128
with open(dlt_sushi_fillings_model_path) as file:

tests/core/engine_adapter/test_fabric.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,3 +286,23 @@ def test_merge_exists(
286286
f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INT) AS [id], CAST([ts] AS DATETIME2(6)) AS [ts] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] AND [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts] WHEN NOT MATCHED THEN INSERT ([id], [ts]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts]);",
287287
f"DROP TABLE IF EXISTS [__temp_target_{temp_table_id}];",
288288
]
289+
290+
291+
def test_comments(make_mocked_engine_adapter: t.Callable, mocker: MockerFixture):
292+
adapter = make_mocked_engine_adapter(FabricEngineAdapter)
293+
comment = "\\"
294+
295+
create_table_comment_mock = mocker.patch.object(adapter, "_create_table_comment")
296+
create_column_comments_mock = mocker.patch.object(adapter, "_create_column_comments")
297+
mocker.patch.object(adapter, "_create_table")
298+
299+
adapter.create_table(
300+
"test_table",
301+
{"a": exp.DataType.build("INT"), "b": exp.DataType.build("INT")},
302+
table_description=comment,
303+
column_descriptions={"a": comment},
304+
)
305+
306+
create_table_comment_mock.assert_not_called()
307+
create_column_comments_mock.assert_not_called()
308+
assert to_sql_calls(adapter) == []

tests/core/engine_adapter/test_mssql.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1002,3 +1002,24 @@ def python_scd2_model(context, **kwargs):
10021002
snapshot: Snapshot = make_snapshot(m)
10031003
assert snapshot.node.physical_properties == m.physical_properties
10041004
assert snapshot.node.physical_properties.get("mssql_merge_exists")
1005+
1006+
1007+
def test_comments(make_mocked_engine_adapter: t.Callable, mocker: MockerFixture):
1008+
adapter = make_mocked_engine_adapter(MSSQLEngineAdapter)
1009+
table = exp.to_table("test_table")
1010+
comment = "\\"
1011+
1012+
mocker.patch.object(adapter, "_create_table")
1013+
1014+
adapter.create_table(
1015+
"test_table",
1016+
{"a": exp.DataType.build("INT"), "b": exp.DataType.build("INT")},
1017+
table_description=comment,
1018+
column_descriptions={"a": comment},
1019+
)
1020+
1021+
sql_calls = to_sql_calls(adapter)
1022+
assert sql_calls == [
1023+
adapter._build_create_comment_table_exp(table, comment),
1024+
adapter._build_create_comment_column_exp(table, "a", comment),
1025+
]

tests/core/test_dialect.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,60 @@ def test_format_model_expressions():
230230
1::Int32 AS bar"""
231231
)
232232

233+
x = format_model_expressions(
234+
parse(
235+
"""
236+
MODEL(name a.b, kind FULL, dialect tsql, allow_partials true);
237+
SELECT TRUE AS col, CAST(x AS INT) AS y FROM t
238+
"""
239+
),
240+
dialect="tsql",
241+
)
242+
# The MODEL header is SQLMesh DDL and must not be transpiled: a boolean property
243+
# such as `allow_partials true` must stay `TRUE`, not become tsql's `(1 = 1)`.
244+
# The query body must still transpile to the target dialect.
245+
assert (
246+
x
247+
== """MODEL (
248+
name a.b,
249+
kind FULL,
250+
dialect tsql,
251+
allow_partials TRUE
252+
);
253+
254+
SELECT
255+
1 AS col,
256+
x::INTEGER AS y
257+
FROM t"""
258+
)
259+
260+
x = format_model_expressions(
261+
parse(
262+
"""
263+
AUDIT(name my_audit, dialect tsql, blocking false);
264+
SELECT TRUE AS col, CAST(x AS INT) AS y FROM t WHERE x > 0
265+
"""
266+
),
267+
dialect="tsql",
268+
)
269+
# AUDIT headers are SQLMesh DDL too: a `false` boolean property must stay
270+
# `FALSE`, not become tsql's `(1 = 0)`, while the query body still transpiles.
271+
assert (
272+
x
273+
== """AUDIT (
274+
name my_audit,
275+
dialect tsql,
276+
blocking FALSE
277+
);
278+
279+
SELECT
280+
1 AS col,
281+
x::INTEGER AS y
282+
FROM t
283+
WHERE
284+
x > 0"""
285+
)
286+
233287
x = format_model_expressions(
234288
parse(
235289
"""

0 commit comments

Comments
 (0)