Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/copilot-instructions.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ uv sync --extra dev # create .venv and install dev d
.scripts/run.sh integration-test # requires ADLA + az login + .env
.scripts/run.sh lint # ruff check + format --check
.scripts/run.sh fix # ruff auto-fix + format
.scripts/run.sh upload # build and upload the adapter to a storage account

# Single test file or test
uv run pytest tests/unit/test_script_builder.py -v
Expand Down
2 changes: 1 addition & 1 deletion .scripts/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ run_build() {
}

run_upload() {
write_step "upload: Building wheel and uploading to static site"
write_step "upload: Building wheel and uploading to static storage"
run_build
local dist_dir="${PROJECT_DIR}/dist"
local whl
Expand Down
36 changes: 36 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ my_project:
| `job_timeout_seconds` | `36000` | Max seconds to wait for a SCOPE job before timing out |
| `max_files_per_trigger` | `50` | Default max files per SCOPE job (overridable per-model) |
| `max_bytes_per_trigger` | `10737418240000` (10 TB) | Default max estimated bytes per batch (overridable per-model) |
| `max_file_count_per_output_file_set` | `5000` | SCOPE `@@MaxFileCountPerOutputFileSet` (overridable per-model) |
| `cancel_jobs_on_shutdown` | `true` | Cancel in-flight ADLA jobs on SIGINT/SIGTERM |
| `wait_on_cancel_seconds` | `30` | Per-job wait for ADLA terminal state when cancelling on shutdown |
| `http_timeout_seconds` | `30` | HTTP request timeout for ADLA REST API calls |
| `http_retries` | `3` | Number of HTTP retries for transient errors (429, 5xx) |
| `scope_feature_previews` | `"EnableDeltaTableDynamicInsert:on"` | SCOPE feature preview flags (overridable per-model) |
Expand Down Expand Up @@ -386,6 +389,39 @@ Send `SIGTERM` or `SIGINT` (Ctrl+C) to the dbt process. The adapter finishes the

> **Note:** `processing_time` is only supported for `incremental` materializations.

## Graceful shutdown of ADLA jobs

When the dbt process receives `SIGINT` (Ctrl+C) or `SIGTERM`, the adapter:

1. Sets a shared shutdown flag so every in-flight `submit_and_wait` loop self-cancels its own SCOPE job.
2. Snapshots the process-wide registry of in-flight ADLA jobs and fans out parallel `CancelJob` REST calls — one worker per job, bounded at 32 threads.
3. **Waits for each cancelled job to reach a terminal `Ended` state** (typically a few seconds in ADLA) up to `wait_on_cancel_seconds` per job. Since cancels run in parallel, total wall-clock is `~wait_on_cancel_seconds` regardless of job count.

This is on by default. To opt out (e.g. in a CI environment where you'd rather let jobs run to completion):

```yaml
# profiles.yml
outputs:
dev:
type: scope
cancel_jobs_on_shutdown: false
```

To tune how long the adapter blocks waiting for ADLA to confirm each cancel:

```yaml
outputs:
dev:
type: scope
wait_on_cancel_seconds: 60 # default 30
```

**Caveats:**

- `SIGKILL` is uncatchable at the OS level — no Python handler can run. The existing `cancel_orphaned_jobs` cleanup (runs at the start of every new `dbt run` per model) is the safety net for that case: orphaned jobs from previous runs are cancelled before submitting a new one.
- On Windows, `SIGTERM` is not delivered the same way as on POSIX; `SIGINT` (Ctrl+C) works.
- Only jobs submitted by the **current** Python process are tracked — orphans from earlier `dbt run` invocations are handled by `cancel_orphaned_jobs`, not by this shutdown hook.

## Contributing

See [CONTRIBUTING.md](CONTRIBUTING.md).
2 changes: 2 additions & 0 deletions dbt/adapters/scope/_file_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

# Well-known lock file for Azure CLI token serialization
AZ_CLI_TOKEN_LOCK = str(Path(tempfile.gettempdir()) / "dbt-scope-az-cli-token")
# Well-known lock file for custom (e.g. Fabric notebook / SNI) token credentials
FABRIC_TOKEN_LOCK = str(Path(tempfile.gettempdir()) / "dbt-scope-fabric-token")

# Default timeout for acquiring the lock (seconds). With several xdist workers
# all racing for the Azure CLI token at startup, contention is high.
Expand Down
171 changes: 150 additions & 21 deletions dbt/adapters/scope/adls_gen1_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,84 @@

from __future__ import annotations

import inspect
import logging
import re
import threading
import time
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from dataclasses import dataclass, field, replace
from datetime import datetime, timezone

import requests
from azure.core.credentials import TokenCredential
from azure.datalake.store import core as adls_core
from azure.identity import AzureCliCredential
from azure.identity import CredentialUnavailableError
from dbt.adapters.events.logging import AdapterLogger

from dbt.adapters.scope._file_lock import AZ_CLI_TOKEN_LOCK, FileLock
from dbt.adapters.scope.delta_lake import RetryPolicy
from dbt.adapters.scope.message_retry import MessageRetryPolicy, retry_on_message

log = AdapterLogger("scope")


_LEGACY_GEN1_SCOPE = "https://datalake.azure.net//.default"


class _LegacyDataLakeCredentialAdapter:
"""Bridge a modern ``azure.core.credentials.TokenCredential`` to the legacy
``azure.datalake.store.lib.DataLakeCredential`` ``signed_session()`` API.

Fabric notebook runtimes ship ``azure-datalake-store`` 0.0.5x preinstalled.
That version's ``DatalakeRESTInterface.__init__`` silently drops the
modern ``token_credential=`` kwarg and falls back to MSAL device-code
interactive auth — a hard-failure on any headless surface. This adapter
keeps the bundled wheel honest by exposing ``signed_session()`` on top
of our non-interactive credential, refreshing the bearer token a few
minutes before expiry. The 5-minute skew matches the legacy SDK's own
100-second slop window with extra headroom for long-running directory
walks.
"""

_REFRESH_LEAD_SECONDS = 300

def __init__(self, credential: TokenCredential, *, scope: str = _LEGACY_GEN1_SCOPE) -> None:
self._credential = credential
self._scope = scope
self._lock = threading.Lock()
self._access_token: str | None = None
self._expires_on: int = 0

def _refresh(self) -> None:
token = self._credential.get_token(self._scope)
self._access_token = token.token
self._expires_on = int(token.expires_on)

def signed_session(self) -> requests.Session:
with self._lock:
now = int(time.time())
if not self._access_token or now > self._expires_on - self._REFRESH_LEAD_SECONDS:
self._refresh()
bearer = self._access_token
session = requests.Session()
session.headers["Authorization"] = f"Bearer {bearer}"
return session

def refresh_token(self, authority: str | None = None) -> None:
with self._lock:
self._refresh()


def _legacy_gen1_sdk_in_use() -> bool:
"""Return True when the running ``AzureDLFileSystem.__init__`` predates the
1.x ``token_credential=`` kwarg and therefore needs the legacy adapter."""
try:
params = inspect.signature(adls_core.AzureDLFileSystem.__init__).parameters
except (TypeError, ValueError):
return False
return "token_credential" not in params


class _SuppressFileNotFound(logging.Filter):
"""Reject Azure SDK log records that report a 404 / FileNotFoundError.

Expand Down Expand Up @@ -80,14 +142,24 @@ def _list_one_dir(
fs: adls_core.AzureDLFileSystem,
dir_path: str,
depth: int,
*,
message_retry_policy: MessageRetryPolicy | None = None,
) -> tuple[list[dict], list[dict], str, int, float]:
"""List a single directory. Returns (files, subdirs, path, depth, elapsed_ms)."""
t0 = time.monotonic()
policy = message_retry_policy or MessageRetryPolicy.disabled()
try:
entries = fs.ls(dir_path, detail=True)
entries = retry_on_message(
lambda: fs.ls(dir_path, detail=True),
policy=policy,
label=f"gen1.ls {dir_path}",
)
except FileNotFoundError:
log.debug(f"Path not found (skipping): {dir_path}")
return [], [], dir_path, depth, (time.monotonic() - t0) * 1000
except CredentialUnavailableError:
log.error(f"_list_directory: credential acquisition exhausted for {dir_path}")
raise
except Exception:
log.warning(f"Failed to list {dir_path} (skipping)")
return [], [], dir_path, depth, (time.monotonic() - t0) * 1000
Expand All @@ -105,23 +177,50 @@ def __init__(
self,
account: str,
*,
lock_file: str = AZ_CLI_TOKEN_LOCK,
credential: TokenCredential | None = None,
retry_policy: RetryPolicy | None = None,
message_retry_policy: MessageRetryPolicy | None = None,
) -> None:
self._account = account
self._lock_file = lock_file
self._credential = credential
self._retry_policy = retry_policy
self._message_retry_policy = message_retry_policy or MessageRetryPolicy.disabled()
self._fs: adls_core.AzureDLFileSystem | None = None
self._file_cache: dict[tuple[str, str | None], list[FileInfo]] = {}
self._enrichment_cache: dict[str, tuple[int, tuple[str, ...]]] = {}

def _retry(self, op, *, label: str):
return retry_on_message(op, policy=self._message_retry_policy, label=label)

def _get_fs(self) -> adls_core.AzureDLFileSystem:
"""Lazily initialize the ADLS Gen1 filesystem client."""
"""Lazily initialize the ADLS Gen1 filesystem client.

On Fabric notebook runtimes the preinstalled ``azure-datalake-store``
is the 0.0.5x line, whose ``DatalakeRESTInterface.__init__`` silently
ignores ``token_credential=`` and falls back to MSAL device-code
interactive auth. We detect that signature mismatch and route
through :class:`_LegacyDataLakeCredentialAdapter` instead.
"""
if self._fs is None:
with FileLock(self._lock_file):
credential = AzureCliCredential()
self._fs = adls_core.AzureDLFileSystem(
token_credential=credential,
store_name=self._account,
)
if self._credential is None:
raise RuntimeError(
"AdlsGen1Client requires an explicit ``credential``; "
"callers should pass ``credential=build_credential(creds)``."
)
if _legacy_gen1_sdk_in_use():
log.debug(
"AdlsGen1Client: legacy azure-datalake-store detected — "
"wrapping credential in _LegacyDataLakeCredentialAdapter"
)
self._fs = adls_core.AzureDLFileSystem(
token=_LegacyDataLakeCredentialAdapter(self._credential),
store_name=self._account,
)
else:
self._fs = adls_core.AzureDLFileSystem(
token_credential=self._credential,
store_name=self._account,
)
return self._fs

def list_files(
Expand Down Expand Up @@ -162,14 +261,20 @@ def list_files(

walk_start = time.monotonic()
if recursive:
raw_entries = self._walk(fs, root, max_workers)
raw_entries = self._walk(fs, root, max_workers, self._message_retry_policy)
else:
t0 = time.monotonic()
try:
raw_entries = fs.ls(root, detail=True)
raw_entries = self._retry(
lambda: fs.ls(root, detail=True),
label=f"gen1.list_files {root}",
)
except FileNotFoundError:
log.debug(f"Path not found: {root}")
return []
except CredentialUnavailableError:
log.error(f"list_files: credential acquisition exhausted for {root}")
raise
except Exception:
log.warning(f"Failed to list {root}")
return []
Expand Down Expand Up @@ -221,6 +326,7 @@ def _walk(
fs: adls_core.AzureDLFileSystem,
root: str,
max_workers: int,
message_retry_policy: MessageRetryPolicy | None = None,
) -> list[dict]:
"""Walk directories in parallel, logging per-directory progress."""
all_files: list[dict] = []
Expand All @@ -229,7 +335,9 @@ def _walk(
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures: dict[Future, tuple[str, int]] = {}

f = executor.submit(_list_one_dir, fs, root, 0)
f = executor.submit(
_list_one_dir, fs, root, 0, message_retry_policy=message_retry_policy
)
futures[f] = (root, 0)

while futures:
Expand All @@ -239,6 +347,10 @@ def _walk(
futures.pop(completed)
try:
files, dirs, dir_path, depth, elapsed_ms = completed.result()
except CredentialUnavailableError:
for pending in futures:
pending.cancel()
raise
except Exception:
dirs_done += 1
continue
Expand All @@ -255,7 +367,13 @@ def _walk(
all_files.extend(files)

for d in sorted(dirs, key=lambda e: e.get("name", "")):
new_f = executor.submit(_list_one_dir, fs, d["name"], depth + 1)
new_f = executor.submit(
_list_one_dir,
fs,
d["name"],
depth + 1,
message_retry_policy=message_retry_policy,
)
futures[new_f] = (d["name"], depth + 1)

if futures:
Expand Down Expand Up @@ -344,6 +462,9 @@ def enrich_with_estimates(self, files: list[FileInfo]) -> list[FileInfo]:
contributing_files=contrib_tuple,
)
)
except CredentialUnavailableError:
log.error(f"enrich_with_estimates: credential acquisition exhausted for {f.path}")
raise
except Exception:
log.warning(f"Failed to estimate bytes for {f.path} — using file length")
self._enrichment_cache[f.path] = (f.length, ())
Expand All @@ -356,20 +477,22 @@ def enrich_with_estimates(self, files: list[FileInfo]) -> list[FileInfo]:
)
return enriched

@staticmethod
def _directory_exists(path: str, fs: adls_core.AzureDLFileSystem) -> bool:
def _directory_exists(self, path: str, fs: adls_core.AzureDLFileSystem) -> bool:
"""Check if a directory exists on ADLS Gen1."""
try:
info = fs.info(path)
info = self._retry(lambda: fs.info(path), label=f"gen1.info {path}")
return info.get("type") == "DIRECTORY"
except FileNotFoundError:
return False
except CredentialUnavailableError:
log.error(f"_directory_exists: credential acquisition exhausted for {path}")
raise
except Exception:
log.debug(f"_directory_exists: error checking {path} — assuming not exists")
return False

@staticmethod
def _list_directory_files(
self,
dir_path: str,
fs: adls_core.AzureDLFileSystem,
) -> list[dict]:
Expand All @@ -380,9 +503,15 @@ def _list_directory_files(
while dirs_to_visit:
current = dirs_to_visit.pop()
try:
entries = fs.ls(current, detail=True)
entries = self._retry(
lambda c=current: fs.ls(c, detail=True),
label=f"gen1.ls {current}",
)
except FileNotFoundError:
continue
except CredentialUnavailableError:
log.error(f"_list_directory_files: credential acquisition exhausted for {current}")
raise
except Exception:
log.debug(f"_list_directory_files: failed to list {current} — skipping")
continue
Expand Down
Loading
Loading