Skip to content

Commit 5d70aa4

Browse files
committed
fix(janitor): scope snapshot cleanup for invalidated envs
Signed-off-by: mday-io <mdaytn@gmail.com>
1 parent 117e6f1 commit 5d70aa4

7 files changed

Lines changed: 218 additions & 125 deletions

File tree

docs/reference/cli.md

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -294,11 +294,15 @@ Usage: sqlmesh invalidate [OPTIONS] ENVIRONMENT
294294
of the janitor process.
295295
296296
Options:
297-
-s, --sync Wait for the environment to be deleted before returning. If not
298-
specified, the environment will be deleted asynchronously by the
299-
janitor process. This option requires a connection to the data
300-
warehouse.
301-
--help Show this message and exit.
297+
-s, --sync Wait for the environment to be deleted before returning.
298+
If not specified, the environment will be deleted
299+
asynchronously by the janitor process. This option
300+
requires a connection to the data warehouse.
301+
--cleanup-snapshots
302+
After invalidating, synchronously delete unreferenced
303+
physical snapshot tables formerly referenced by this
304+
environment.
305+
--help Show this message and exit.
302306
```
303307

304308
## janitor
@@ -313,14 +317,16 @@ Usage: sqlmesh janitor [OPTIONS]
313317
Options:
314318
--ignore-ttl Cleanup snapshots that are not referenced in any
315319
environment, regardless of when they're set to expire. Has
316-
no effect when --environment is specified.
320+
When --environment is specified, cleanup is scoped to
321+
snapshots formerly referenced by that environment.
317322
--force-delete Delete expired environment and snapshot state records even
318323
when the physical table or view drops fail. Any objects
319324
that could not be dropped become orphaned and must be
320325
removed manually.
321326
-e, --environment TEXT
322-
Scope cleanup to a single expired environment. Global
323-
snapshot and interval compaction are skipped.
327+
Scope cleanup to a single expired environment. With
328+
--ignore-ttl, snapshot cleanup is scoped to snapshots
329+
formerly referenced by this environment.
324330
--help Show this message and exit.
325331
```
326332

sqlmesh/cli/main.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -623,7 +623,7 @@ def run(ctx: click.Context, environment: t.Optional[str] = None, **kwargs: t.Any
623623
@click.option(
624624
"--cleanup-snapshots",
625625
is_flag=True,
626-
help="After invalidating, immediately delete physical snapshot tables that are exclusively owned by this environment (not referenced by any other environment). Cleanup runs synchronously regardless of --sync.",
626+
help="After invalidating, synchronously delete unreferenced physical snapshot tables formerly referenced by this environment.",
627627
)
628628
@click.pass_context
629629
@error_handler
@@ -638,7 +638,7 @@ def invalidate(ctx: click.Context, environment: str, **kwargs: t.Any) -> None:
638638
@click.option(
639639
"--ignore-ttl",
640640
is_flag=True,
641-
help="Cleanup snapshots that are not referenced in any environment, regardless of when they're set to expire. Has no effect when --environment is specified.",
641+
help="Cleanup snapshots that are not referenced in any environment, regardless of when they're set to expire. When --environment is specified, cleanup is scoped to snapshots formerly referenced by that environment.",
642642
)
643643
@click.option(
644644
"--force-delete",
@@ -650,7 +650,7 @@ def invalidate(ctx: click.Context, environment: str, **kwargs: t.Any) -> None:
650650
"--environment",
651651
"-e",
652652
default=None,
653-
help="Scope cleanup to a single expired environment. Global snapshot and interval compaction are skipped.",
653+
help="Scope cleanup to a single expired environment. With --ignore-ttl, snapshot cleanup is scoped to snapshots formerly referenced by this environment.",
654654
)
655655
@click.pass_context
656656
@error_handler

sqlmesh/core/context.py

Lines changed: 35 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,6 @@
112112
from sqlmesh.core.janitor import (
113113
cleanup_expired_views,
114114
delete_expired_snapshots,
115-
delete_snapshots_for_environment,
116115
)
117116
from sqlmesh.core.table_diff import TableDiff
118117
from sqlmesh.core.test import (
@@ -1861,41 +1860,16 @@ def invalidate_environment(
18611860
name: The name of the environment to invalidate.
18621861
sync: If True, the call blocks until the environment is deleted. Otherwise, the environment will
18631862
be deleted asynchronously by the janitor process.
1864-
cleanup_snapshots: If True, immediately deletes physical snapshot tables that are exclusively
1865-
owned by this environment (not referenced by any other environment). Cleanup runs
1866-
synchronously regardless of --sync.
1863+
cleanup_snapshots: If True, immediately deletes unreferenced physical snapshot tables that were
1864+
formerly referenced by this environment. Cleanup runs synchronously regardless of sync.
18671865
"""
18681866
name = Environment.sanitize_name(name)
1869-
sync = sync or cleanup_snapshots
1870-
1871-
target_snapshot_ids: t.Set[SnapshotId] = set()
1872-
if cleanup_snapshots:
1873-
# Capture snapshot IDs before invalidation so we can scope the cleanup afterwards.
1874-
env = self.state_sync.get_environment(name)
1875-
if env is None:
1876-
logger.warning("Environment '%s' does not exist; skipping snapshot cleanup.", name)
1877-
return
1878-
target_snapshot_ids = {s.snapshot_id for s in env.snapshots}
1879-
18801867
self.state_sync.invalidate_environment(name)
1881-
1882-
if sync:
1868+
if cleanup_snapshots:
1869+
self._run_janitor(ignore_ttl=True, environment=name)
1870+
self.console.log_success(f"Environment '{name}' deleted.")
1871+
elif sync:
18831872
self._cleanup_environments(name=name)
1884-
if cleanup_snapshots and target_snapshot_ids:
1885-
failures = delete_snapshots_for_environment(
1886-
self.state_sync,
1887-
self.snapshot_evaluator,
1888-
target_snapshot_ids,
1889-
console=self.console,
1890-
)
1891-
if failures:
1892-
summary = "\n".join(failures)
1893-
if self.config.janitor.warn_on_delete_failure:
1894-
self.console.log_warning(
1895-
f"Snapshot cleanup completed with failures:\n{summary}"
1896-
)
1897-
else:
1898-
raise SQLMeshError(f"Snapshot cleanup completed with failures:\n{summary}")
18991873
self.console.log_success(f"Environment '{name}' deleted.")
19001874
else:
19011875
self.console.log_success(f"Environment '{name}' invalidated.")
@@ -3030,6 +3004,16 @@ def _run_janitor(
30303004
current_ts = now_timestamp()
30313005
failures: t.List[str] = []
30323006

3007+
target_snapshot_ids: t.Set[SnapshotId] = set()
3008+
if environment is not None and ignore_ttl:
3009+
expired_environments = self.state_sync.get_expired_environments(
3010+
current_ts=current_ts, name=environment
3011+
)
3012+
if expired_environments:
3013+
expired_env = self.state_reader.get_environment(expired_environments[0].name)
3014+
if expired_env:
3015+
target_snapshot_ids = {s.snapshot_id for s in expired_env.snapshots}
3016+
30333017
# Clean up expired environments by removing their views and schemas
30343018
failures.extend(
30353019
self._cleanup_environments(
@@ -3050,6 +3034,25 @@ def _run_janitor(
30503034
)
30513035
)
30523036
self.state_sync.compact_intervals()
3037+
elif ignore_ttl and target_snapshot_ids and not self.state_reader.get_environment(
3038+
environment
3039+
):
3040+
self.console.log_warning(
3041+
"Scoped snapshot cleanup will permanently delete unreferenced physical snapshot "
3042+
f"tables formerly referenced by environment '{environment}'."
3043+
)
3044+
failures.extend(
3045+
delete_expired_snapshots(
3046+
self.state_sync,
3047+
self.snapshot_evaluator,
3048+
current_ts=current_ts,
3049+
ignore_ttl=ignore_ttl,
3050+
force_delete=force_delete,
3051+
console=self.console,
3052+
batch_size=self.config.janitor.expired_snapshots_batch_size,
3053+
target_snapshot_ids=target_snapshot_ids,
3054+
)
3055+
)
30533056

30543057
if failures:
30553058
failure_string = "\n - ".join(failures)

sqlmesh/core/janitor.py

Lines changed: 4 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from sqlmesh.core.console import Console
99
from sqlmesh.core.dialect import schema_
1010
from sqlmesh.core.environment import Environment
11-
from sqlmesh.core.snapshot import SnapshotEvaluator, SnapshotId
11+
from sqlmesh.core.snapshot import SnapshotEvaluator, SnapshotIdLike
1212
from sqlmesh.core.state_sync import StateSync
1313
from sqlmesh.core.state_sync.common import (
1414
logger,
@@ -127,6 +127,7 @@ def delete_expired_snapshots(
127127
ignore_ttl: bool = False,
128128
force_delete: bool = False,
129129
batch_size: t.Optional[int] = None,
130+
target_snapshot_ids: t.Optional[t.Collection[SnapshotIdLike]] = None,
130131
console: t.Optional[Console] = None,
131132
) -> t.List[str]:
132133
"""Delete all expired snapshots in batches.
@@ -153,6 +154,7 @@ def delete_expired_snapshots(
153154
current_ts=current_ts,
154155
ignore_ttl=ignore_ttl,
155156
batch_size=batch_size,
157+
target_snapshot_ids=target_snapshot_ids,
156158
):
157159
end_info = (
158160
f"updated_ts={batch.batch_range.end.updated_ts}"
@@ -184,6 +186,7 @@ def delete_expired_snapshots(
184186
end=batch.batch_range.end,
185187
),
186188
ignore_ttl=ignore_ttl,
189+
target_snapshot_ids=target_snapshot_ids,
187190
)
188191
logger.info("Cleaned up expired snapshots batch")
189192
num_expired_snapshots += len(batch.expired_snapshot_ids)
@@ -193,72 +196,3 @@ def delete_expired_snapshots(
193196
failures.append(message)
194197
logger.info("Cleaned up %s expired snapshots", num_expired_snapshots)
195198
return failures
196-
197-
198-
def delete_snapshots_for_environment(
199-
state_sync: StateSync,
200-
snapshot_evaluator: SnapshotEvaluator,
201-
target_snapshot_ids: t.Collection[SnapshotId],
202-
*,
203-
force_delete: bool = False,
204-
console: t.Optional[Console] = None,
205-
) -> t.List[str]:
206-
"""Delete snapshots that are exclusively owned by a specific (now-deleted) environment.
207-
208-
This performs a scoped cleanup: only the provided snapshot IDs are considered for deletion,
209-
and only those that are not referenced by any remaining active environment will be removed.
210-
211-
Args:
212-
state_sync: StateSync instance to query and delete snapshot state from.
213-
snapshot_evaluator: SnapshotEvaluator instance to clean up physical tables.
214-
target_snapshot_ids: The snapshot IDs to consider for deletion (typically from the
215-
environment that was just invalidated/deleted).
216-
force_delete: If True, delete snapshot state records even when physical table cleanup fails.
217-
console: Optional console for reporting progress.
218-
219-
Returns:
220-
List of failure messages encountered during cleanup.
221-
"""
222-
if not target_snapshot_ids:
223-
return []
224-
225-
failures: t.List[str] = []
226-
batch = state_sync.get_expired_snapshots(
227-
ignore_ttl=True,
228-
batch_range=ExpiredBatchRange.all_batch_range(),
229-
target_snapshot_ids=target_snapshot_ids,
230-
)
231-
if batch is None:
232-
return failures
233-
234-
logger.info(
235-
"Cleaning up %s snapshots exclusively owned by invalidated environment",
236-
len(batch.expired_snapshot_ids),
237-
)
238-
239-
cleanup_succeeded = True
240-
if batch.cleanup_tasks:
241-
try:
242-
snapshot_evaluator.cleanup(
243-
target_snapshots=batch.cleanup_tasks,
244-
on_complete=console.update_cleanup_progress if console else None,
245-
)
246-
except Exception as failed_drops:
247-
message = f"Failed to clean up: {failed_drops}"
248-
logger.warning(message)
249-
failures.append(message)
250-
cleanup_succeeded = False
251-
252-
if cleanup_succeeded or force_delete:
253-
try:
254-
state_sync.delete_snapshots(batch.expired_snapshot_ids)
255-
logger.info(
256-
"Cleaned up %s snapshots from invalidated environment",
257-
len(batch.expired_snapshot_ids),
258-
)
259-
except Exception as e:
260-
message = f"Failed to delete snapshot state records: {e}"
261-
logger.warning(message)
262-
failures.append(message)
263-
264-
return failures

sqlmesh/core/state_sync/common.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from sqlmesh.core.snapshot import (
1717
Snapshot,
1818
SnapshotId,
19+
SnapshotIdLike,
1920
SnapshotTableCleanupTask,
2021
SnapshotTableInfo,
2122
)
@@ -288,6 +289,7 @@ def iter_expired_snapshot_batches(
288289
current_ts: int,
289290
ignore_ttl: bool = False,
290291
batch_size: t.Optional[int] = None,
292+
target_snapshot_ids: t.Optional[t.Collection[SnapshotIdLike]] = None,
291293
) -> t.Iterator[ExpiredSnapshotBatch]:
292294
"""Yields expired snapshot batches.
293295
@@ -306,6 +308,7 @@ def iter_expired_snapshot_batches(
306308
current_ts=current_ts,
307309
ignore_ttl=ignore_ttl,
308310
batch_range=batch_range,
311+
target_snapshot_ids=target_snapshot_ids,
309312
)
310313

311314
if batch is None:

tests/core/integration/test_aux_commands.py

Lines changed: 58 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -482,7 +482,7 @@ def test_invalidating_environment(sushi_context: Context):
482482

483483

484484
def test_invalidate_environment_cleanup_snapshots_scoped(tmp_path: Path):
485-
"""Test that --cleanup-snapshots only deletes snapshots exclusively owned by the invalidated env."""
485+
"""Test that --cleanup-snapshots only deletes unreferenced snapshots from the invalidated env."""
486486
models_dir = tmp_path / "models"
487487
models_dir.mkdir()
488488
(models_dir / "model1.sql").write_text("MODEL(name test.model1, kind FULL); SELECT 1 AS col")
@@ -522,8 +522,9 @@ def test_invalidate_environment_cleanup_snapshots_scoped(tmp_path: Path):
522522
assert ctx.state_sync.get_environment("prod") is not None
523523

524524

525-
def test_invalidate_environment_cleanup_snapshots_exclusive(tmp_path: Path):
526-
"""Test that --cleanup-snapshots deletes snapshots exclusively owned by the invalidated env."""
525+
def test_invalidate_environment_cleanup_snapshots_warns_and_drops_physical_tables(
526+
tmp_path: Path, mocker: MockerFixture
527+
):
527528
models_dir = tmp_path / "models"
528529
models_dir.mkdir()
529530
(models_dir / "model1.sql").write_text("MODEL(name test.model1, kind FULL); SELECT 1 AS col")
@@ -533,22 +534,67 @@ def test_invalidate_environment_cleanup_snapshots_exclusive(tmp_path: Path):
533534
config=Config(model_defaults=ModelDefaultsConfig(dialect="duckdb")),
534535
)
535536

536-
# Apply model1 to dev only (not prod). These snapshots will be exclusively owned by dev.
537537
ctx.plan("dev", no_prompts=True, auto_apply=True)
538+
snapshot = ctx.get_snapshot("test.model1")
539+
assert snapshot is not None
540+
snapshot_ids = [snapshot.snapshot_id]
541+
physical_table_names = [
542+
snapshot.table_name(is_deployable=False),
543+
snapshot.table_name(is_deployable=True),
544+
]
545+
assert any(ctx.engine_adapter.table_exists(table_name) for table_name in physical_table_names)
538546

539-
dev_env = ctx.state_sync.get_environment("dev")
540-
assert dev_env is not None
541-
dev_snapshot_ids = {s.snapshot_id for s in dev_env.snapshots}
542-
assert dev_snapshot_ids
547+
warning_mock = mocker.patch.object(ctx.console, "log_warning")
543548

544549
ctx.invalidate_environment("dev", cleanup_snapshots=True)
545550

546-
# The dev environment record should be gone.
551+
warning_mock.assert_any_call(
552+
"Scoped snapshot cleanup will permanently delete unreferenced physical snapshot tables "
553+
"formerly referenced by environment 'dev'."
554+
)
547555
assert ctx.state_sync.get_environment("dev") is None
556+
assert not ctx.state_sync.get_snapshots(snapshot_ids)
557+
assert not any(ctx.engine_adapter.table_exists(table_name) for table_name in physical_table_names)
558+
559+
560+
def test_janitor_environment_ignore_ttl_cleans_only_scoped_snapshots(
561+
tmp_path: Path, mocker: MockerFixture
562+
):
563+
models_dir = tmp_path / "models"
564+
models_dir.mkdir()
565+
model_path = models_dir / "model1.sql"
566+
model_path.write_text("MODEL(name test.model1, kind FULL); SELECT 1 AS col")
548567

549-
# All dev-exclusive snapshots should have been deleted.
550-
remaining_snapshots = ctx.state_sync.get_snapshots(list(dev_snapshot_ids))
551-
assert not remaining_snapshots
568+
ctx = Context(
569+
paths=[tmp_path],
570+
config=Config(model_defaults=ModelDefaultsConfig(dialect="duckdb")),
571+
)
572+
573+
ctx.plan("dev_a", no_prompts=True, auto_apply=True)
574+
dev_a_snapshot = ctx.get_snapshot("test.model1")
575+
assert dev_a_snapshot is not None
576+
577+
model_path.write_text("MODEL(name test.model1, kind FULL); SELECT 2 AS col")
578+
ctx.load()
579+
ctx.plan("dev_b", no_prompts=True, auto_apply=True)
580+
dev_b_snapshot = ctx.get_snapshot("test.model1")
581+
assert dev_b_snapshot is not None
582+
assert dev_a_snapshot.snapshot_id != dev_b_snapshot.snapshot_id
583+
584+
ctx.invalidate_environment("dev_a")
585+
ctx.invalidate_environment("dev_b")
586+
warning_mock = mocker.patch.object(ctx.console, "log_warning")
587+
588+
ctx.run_janitor(ignore_ttl=True, environment="dev_a")
589+
590+
warning_mock.assert_any_call(
591+
"Scoped snapshot cleanup will permanently delete unreferenced physical snapshot tables "
592+
"formerly referenced by environment 'dev_a'."
593+
)
594+
assert ctx.state_sync.get_environment("dev_a") is None
595+
assert ctx.state_sync.get_environment("dev_b") is not None
596+
assert not ctx.state_sync.get_snapshots([dev_a_snapshot.snapshot_id])
597+
assert ctx.state_sync.get_snapshots([dev_b_snapshot.snapshot_id])
552598

553599

554600
@time_machine.travel("2023-01-08 15:00:00 UTC")

0 commit comments

Comments
 (0)