Skip to content
Open
22 changes: 14 additions & 8 deletions docs/reference/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -294,11 +294,15 @@ Usage: sqlmesh invalidate [OPTIONS] ENVIRONMENT
of the janitor process.

Options:
-s, --sync Wait for the environment to be deleted before returning. If not
specified, the environment will be deleted asynchronously by the
janitor process. This option requires a connection to the data
warehouse.
--help Show this message and exit.
-s, --sync Wait for the environment to be deleted before returning.
If not specified, the environment will be deleted
asynchronously by the janitor process. This option
requires a connection to the data warehouse.
--cleanup-snapshots
After invalidating, synchronously delete unreferenced
physical snapshot tables formerly referenced by this
environment.
--help Show this message and exit.
```

## janitor
Expand All @@ -313,14 +317,16 @@ Usage: sqlmesh janitor [OPTIONS]
Options:
--ignore-ttl Cleanup snapshots that are not referenced in any
environment, regardless of when they're set to expire. Has
no effect when --environment is specified.
When --environment is specified, cleanup is scoped to
snapshots formerly referenced by that environment.
--force-delete Delete expired environment and snapshot state records even
when the physical table or view drops fail. Any objects
that could not be dropped become orphaned and must be
removed manually.
-e, --environment TEXT
Scope cleanup to a single expired environment. Global
snapshot and interval compaction are skipped.
Scope cleanup to a single expired environment. With
--ignore-ttl, snapshot cleanup is scoped to snapshots
formerly referenced by this environment.
--help Show this message and exit.
```

Expand Down
9 changes: 7 additions & 2 deletions sqlmesh/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,11 @@ def run(ctx: click.Context, environment: t.Optional[str] = None, **kwargs: t.Any
is_flag=True,
help="Wait for the environment to be deleted before returning. If not specified, the environment will be deleted asynchronously by the janitor process. This option requires a connection to the data warehouse.",
)
@click.option(
"--cleanup-snapshots",
is_flag=True,
help="After invalidating, synchronously delete unreferenced physical snapshot tables formerly referenced by this environment.",
)
@click.pass_context
@error_handler
@cli_analytics
Expand All @@ -633,7 +638,7 @@ def invalidate(ctx: click.Context, environment: str, **kwargs: t.Any) -> None:
@click.option(
"--ignore-ttl",
is_flag=True,
help="Cleanup snapshots that are not referenced in any environment, regardless of when they're set to expire. Has no effect when --environment is specified.",
help="Cleanup snapshots that are not referenced in any environment, regardless of when they're set to expire. When --environment is specified, cleanup is scoped to snapshots formerly referenced by that environment.",
)
@click.option(
"--force-delete",
Expand All @@ -645,7 +650,7 @@ def invalidate(ctx: click.Context, environment: str, **kwargs: t.Any) -> None:
"--environment",
"-e",
default=None,
help="Scope cleanup to a single expired environment. Global snapshot and interval compaction are skipped.",
help="Scope cleanup to a single expired environment. With --ignore-ttl, snapshot cleanup is scoped to snapshots formerly referenced by this environment.",
)
@click.pass_context
@error_handler
Expand Down
48 changes: 45 additions & 3 deletions sqlmesh/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
Snapshot,
SnapshotEvaluator,
SnapshotFingerprint,
SnapshotId,
missing_intervals,
to_table_mapping,
)
Expand All @@ -108,7 +109,10 @@
StateReader,
StateSync,
)
from sqlmesh.core.janitor import cleanup_expired_views, delete_expired_snapshots
from sqlmesh.core.janitor import (
cleanup_expired_views,
delete_expired_snapshots,
)
from sqlmesh.core.table_diff import TableDiff
from sqlmesh.core.test import (
ModelTextTestResult,
Expand Down Expand Up @@ -1847,17 +1851,24 @@ def apply(
)

@python_api_analytics
def invalidate_environment(self, name: str, sync: bool = False) -> None:
def invalidate_environment(
self, name: str, sync: bool = False, cleanup_snapshots: bool = False
) -> None:
"""Invalidates the target environment by setting its expiration timestamp to now.

Args:
name: The name of the environment to invalidate.
sync: If True, the call blocks until the environment is deleted. Otherwise, the environment will
be deleted asynchronously by the janitor process.
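cleanup_snapshots: If True, immediately deletes unreferenced physical snapshot tables that were
formerly referenced by this environment. In this mode the janitor is run for this environment
right away, so both the environment cleanup and the snapshot cleanup happen synchronously,
regardless of the value of sync.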
"""
name = Environment.sanitize_name(name)
self.state_sync.invalidate_environment(name)
if sync:
if cleanup_snapshots:
self._run_janitor(ignore_ttl=True, environment=name)
self.console.log_success(f"Environment '{name}' deleted.")
elif sync:
self._cleanup_environments(name=name)
self.console.log_success(f"Environment '{name}' deleted.")
else:
Expand Down Expand Up @@ -2993,6 +3004,16 @@ def _run_janitor(
current_ts = now_timestamp()
failures: t.List[str] = []

target_snapshot_ids: t.Set[SnapshotId] = set()
if environment is not None and ignore_ttl:
expired_environments = self.state_sync.get_expired_environments(
current_ts=current_ts, name=environment
)
if expired_environments:
expired_env = self.state_reader.get_environment(expired_environments[0].name)
if expired_env:
target_snapshot_ids = {s.snapshot_id for s in expired_env.snapshots}

# Clean up expired environments by removing their views and schemas
failures.extend(
self._cleanup_environments(
Expand All @@ -3013,6 +3034,27 @@ def _run_janitor(
)
)
self.state_sync.compact_intervals()
elif (
ignore_ttl
and target_snapshot_ids
and not self.state_reader.get_environment(environment)
):
self.console.log_warning(
"Scoped snapshot cleanup will permanently delete unreferenced physical snapshot "
f"tables formerly referenced by environment '{environment}'."
)
failures.extend(
delete_expired_snapshots(
self.state_sync,
self.snapshot_evaluator,
current_ts=current_ts,
ignore_ttl=ignore_ttl,
force_delete=force_delete,
console=self.console,
batch_size=self.config.janitor.expired_snapshots_batch_size,
target_snapshot_ids=target_snapshot_ids,
)
)

if failures:
failure_string = "\n - ".join(failures)
Expand Down
5 changes: 4 additions & 1 deletion sqlmesh/core/janitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from sqlmesh.core.console import Console
from sqlmesh.core.dialect import schema_
from sqlmesh.core.environment import Environment
from sqlmesh.core.snapshot import SnapshotEvaluator
from sqlmesh.core.snapshot import SnapshotEvaluator, SnapshotIdLike
from sqlmesh.core.state_sync import StateSync
from sqlmesh.core.state_sync.common import (
logger,
Expand Down Expand Up @@ -127,6 +127,7 @@ def delete_expired_snapshots(
ignore_ttl: bool = False,
force_delete: bool = False,
batch_size: t.Optional[int] = None,
target_snapshot_ids: t.Optional[t.Collection[SnapshotIdLike]] = None,
console: t.Optional[Console] = None,
) -> t.List[str]:
"""Delete all expired snapshots in batches.
Expand All @@ -153,6 +154,7 @@ def delete_expired_snapshots(
current_ts=current_ts,
ignore_ttl=ignore_ttl,
batch_size=batch_size,
target_snapshot_ids=target_snapshot_ids,
):
end_info = (
f"updated_ts={batch.batch_range.end.updated_ts}"
Expand Down Expand Up @@ -184,6 +186,7 @@ def delete_expired_snapshots(
end=batch.batch_range.end,
),
ignore_ttl=ignore_ttl,
target_snapshot_ids=target_snapshot_ids,
)
logger.info("Cleaned up expired snapshots batch")
num_expired_snapshots += len(batch.expired_snapshot_ids)
Expand Down
6 changes: 6 additions & 0 deletions sqlmesh/core/state_sync/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,13 +308,16 @@ def get_expired_snapshots(
batch_range: ExpiredBatchRange,
current_ts: t.Optional[int] = None,
ignore_ttl: bool = False,
target_snapshot_ids: t.Optional[t.Collection[SnapshotIdLike]] = None,
) -> t.Optional[ExpiredSnapshotBatch]:
"""Returns a single batch of expired snapshots ordered by (updated_ts, name, identifier).
Args:
current_ts: Timestamp used to evaluate expiration.
ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).
batch_range: The range of the batch to fetch.
target_snapshot_ids: If provided, only consider snapshots with these IDs. Useful for
scoped cleanup after environment invalidation.
Returns:
A batch describing expired snapshots or None if no snapshots are pending cleanup.
Expand Down Expand Up @@ -368,6 +371,7 @@ def delete_expired_snapshots(
batch_range: ExpiredBatchRange,
ignore_ttl: bool = False,
current_ts: t.Optional[int] = None,
target_snapshot_ids: t.Optional[t.Collection[SnapshotIdLike]] = None,
) -> None:
"""Removes expired snapshots.
Expand All @@ -379,6 +383,8 @@ def delete_expired_snapshots(
ignore_ttl: Ignore the TTL on the snapshot when considering it expired. This has the effect of deleting
all snapshots that are not referenced in any environment.
current_ts: Timestamp used to evaluate expiration.
target_snapshot_ids: If provided, only delete snapshots with these IDs. Useful for
scoped cleanup after environment invalidation.
"""

@abc.abstractmethod
Expand Down
2 changes: 2 additions & 0 deletions sqlmesh/core/state_sync/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,14 @@ def delete_expired_snapshots(
batch_range: ExpiredBatchRange,
ignore_ttl: bool = False,
current_ts: t.Optional[int] = None,
target_snapshot_ids: t.Optional[t.Collection[SnapshotIdLike]] = None,
) -> None:
self.snapshot_cache.clear()
self.state_sync.delete_expired_snapshots(
batch_range=batch_range,
ignore_ttl=ignore_ttl,
current_ts=current_ts,
target_snapshot_ids=target_snapshot_ids,
)

def add_snapshots_intervals(self, snapshots_intervals: t.Sequence[SnapshotIntervals]) -> None:
Expand Down
3 changes: 3 additions & 0 deletions sqlmesh/core/state_sync/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from sqlmesh.core.snapshot import (
Snapshot,
SnapshotId,
SnapshotIdLike,
SnapshotTableCleanupTask,
SnapshotTableInfo,
)
Expand Down Expand Up @@ -288,6 +289,7 @@ def iter_expired_snapshot_batches(
current_ts: int,
ignore_ttl: bool = False,
batch_size: t.Optional[int] = None,
target_snapshot_ids: t.Optional[t.Collection[SnapshotIdLike]] = None,
) -> t.Iterator[ExpiredSnapshotBatch]:
"""Yields expired snapshot batches.
Expand All @@ -306,6 +308,7 @@ def iter_expired_snapshot_batches(
current_ts=current_ts,
ignore_ttl=ignore_ttl,
batch_range=batch_range,
target_snapshot_ids=target_snapshot_ids,
)

if batch is None:
Expand Down
4 changes: 4 additions & 0 deletions sqlmesh/core/state_sync/db/facade.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,13 +267,15 @@ def get_expired_snapshots(
batch_range: ExpiredBatchRange,
current_ts: t.Optional[int] = None,
ignore_ttl: bool = False,
target_snapshot_ids: t.Optional[t.Collection[SnapshotIdLike]] = None,
) -> t.Optional[ExpiredSnapshotBatch]:
current_ts = current_ts or now_timestamp()
return self.snapshot_state.get_expired_snapshots(
environments=self.environment_state.get_environments(),
current_ts=current_ts,
ignore_ttl=ignore_ttl,
batch_range=batch_range,
target_snapshot_ids=target_snapshot_ids,
)

def get_expired_environments(
Expand All @@ -287,11 +289,13 @@ def delete_expired_snapshots(
batch_range: ExpiredBatchRange,
ignore_ttl: bool = False,
current_ts: t.Optional[int] = None,
target_snapshot_ids: t.Optional[t.Collection[SnapshotIdLike]] = None,
) -> None:
batch = self.get_expired_snapshots(
ignore_ttl=ignore_ttl,
current_ts=current_ts,
batch_range=batch_range,
target_snapshot_ids=target_snapshot_ids,
)
if batch and batch.expired_snapshot_ids:
self.snapshot_state.delete_snapshots(batch.expired_snapshot_ids)
Expand Down
11 changes: 11 additions & 0 deletions sqlmesh/core/state_sync/db/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ def get_expired_snapshots(
current_ts: int,
ignore_ttl: bool,
batch_range: ExpiredBatchRange,
target_snapshot_ids: t.Optional[t.Collection[SnapshotIdLike]] = None,
) -> t.Optional[ExpiredSnapshotBatch]:
expired_query = exp.select("name", "identifier", "version", "updated_ts").from_(
self.snapshots_table
Expand All @@ -180,6 +181,16 @@ def get_expired_snapshots(
(exp.column("updated_ts") + exp.column("ttl_ms")) <= current_ts
)

if target_snapshot_ids is not None:
target_conditions = list(
snapshot_id_filter(
self.engine_adapter,
target_snapshot_ids,
batch_size=self.SNAPSHOT_BATCH_SIZE,
)
)
expired_query = expired_query.where(exp.or_(*target_conditions))

expired_query = expired_query.where(batch_range.where_filter)

promoted_snapshot_ids = {
Expand Down
Loading
Loading