perf: Production readiness with Workload purging, and AuthN injection#32
Merged
Conversation
… JSON `CheckpointManager._write_snapshot_parquet` round-trips records through DuckDB → Python dict → NDJSON → parquet to produce each compaction snapshot. When the prior parquet snapshot already exists, DuckDB infers the `batchProcessingTime` column as `TIMESTAMP` (auto-inferred from ISO strings once enough rows exist), so a subsequent compaction round-trips the column back to Python as `datetime` objects. The next `json.dumps(record)` then raises: TypeError: Object of type datetime is not JSON serializable …which surfaces in dbt as a compilation error and aborts the model run. The bug only triggers from the second compaction onwards and only when the NDJSON sample is large enough for DuckDB to infer `TIMESTAMP` (reproducible at ~20 rows). The production stress run hit this at `write_batch_sources failed for batch 20` after 7h43m. Fix: 1. Add a module-level `_json_default` encoder that converts `datetime` instances to ISO-8601 strings (re-attaching UTC for naive datetimes so the existing timezone-aware contract is preserved on round-trip) and wire it into both `_write_jsonl` and the NDJSON write in `_write_snapshot_parquet`. 2. Pin the snapshot parquet schema by explicitly casting each column in the `CREATE TABLE` so the schema no longer depends on DuckDB's JSON type inference. `batchProcessingTime` is now deterministically `TIMESTAMP`, and the round-trip back to datetime is handled by (1). Regression coverage added to `tests/unit/test_checkpoint_lifecycle.py::test_compaction_handles_timestamp_typed_prior_snapshot`: seeds a `TIMESTAMP`-typed prior snapshot with 22 rows, writes a JSONL diff at batch 15, then triggers compaction at batch 20 and asserts the new snapshot contains records from all three legs (prior snapshot + JSONL diff + current batch) and that its `batchProcessingTime` schema is `TIMESTAMP`. Fails on `main` with the exact production `TypeError`, passes after the fix. Closes #33 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Under high concurrency the ``az`` subprocess that ``AzureCliCredential`` shells out to can time out, surfacing as ``CredentialUnavailableError: Failed to invoke the Azure CLI``. The adapter previously raised on the first occurrence, failing dbt runs after multi-hour stress jobs (PR #32). This change centralises retry handling in ``LockedTokenCredential``: * New ``RetryPolicy`` dataclass — linear backoff, capped delay, no jitter; total attempts == ``max_retries + 1``. * ``RetryPolicy.from_http_retries`` reuses the existing ``http_retries`` profile field so users can tune retries via ``profiles.yml`` (default bumped 3 -> 10 per stress-test spec). * ``LockedTokenCredential.get_token`` catches only ``CredentialUnavailableError`` (transient), releases the ``FileLock`` between attempts so other workers make progress, and sleeps via an injectable ``sleep`` callable for deterministic tests. All credential touchpoints now flow through ``LockedTokenCredential``: * ``ScopeConnectionHandle`` (ADLA query path) * ``AdlsGen1Client`` (source-file discovery) * ``CheckpointManager`` (watermark + snapshot RW) * ``ScopeAdapter.list_relations_without_caching`` Adds defensive ``except CredentialUnavailableError: raise`` ahead of broad ``except Exception`` blocks that previously silently converted exhausted credentials into 'no watermark' / 'no files' / 'no delta table' / 'no columns', any of which can corrupt incremental state. Tests: 17 new unit tests (353 total), all integration tests pass (3/3, 9:58). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Contributor
Author
|
Pushed second commit Verification: 353/353 unit tests, 3/3 integration tests (9:58), lint clean. |
Emit SET @@MaxFileCountPerOutputFileSet = N; in every generated SCOPE
script so the cap on distinct OutputFileSet partition files is
deterministic across cluster defaults. Configurable end-to-end via:
- profiles.yml target.max_file_count_per_output_file_set
- dbt_project.yml models config
- per-model {{ config(max_file_count_per_output_file_set=...) }}
Adapter default is 5000 (matches the conservative cluster ceiling on
Fabric/OneLake clusters seen today). Project consumers needing the
SCOPE compiler upstream default of 100000 set it in profiles.yml.
ScriptBuilder validates the value falls in the compiler's
[1, 1_000_000] range and raises DbtRuntimeError on misconfiguration so
failures surface at compile time rather than after job submission.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…IGTERM
When the dbt process receives SIGINT (Ctrl+C) or SIGTERM, the adapter now:
1. Sets a shared shutdown flag so every in-flight submit_and_wait loop
self-cancels its own SCOPE job.
2. Snapshots the process-wide active-jobs registry and fans out parallel
CancelJob REST calls (one thread per job, bounded at 32).
3. Waits for each cancelled job to reach a terminal Ended state, bounded
by wait_on_cancel_seconds (default 30s, exposed in profiles.yml). Since
cancels run in parallel, total wall-clock is ~wait_on_cancel_seconds
regardless of job count.
Default-on; opt out via 'cancel_jobs_on_shutdown: false' in profiles.yml.
Also flips is_cancelable() to True and wires ScopeConnectionManager.cancel /
cancel_open to delegate to cancel_all_active_jobs for belt-and-suspenders
coverage of dbt's own cancellation pathways. atexit fallback covers paths
that unwind via unhandled exception rather than our signal handler.
Tests: 31 new tests in test_shutdown_cancellation.py + 2 in test_credentials.py.
Total: 399 unit tests pass (368 baseline + 31).
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Mirrors dbt-fabricspark PR #177. Adds three new profiles.yml fields to
ScopeCredentials so the adapter can route every credential acquisition
through a user-supplied azure.core.credentials.TokenCredential:
authentication: token_credential
credential_class: 'my_pkg.MyCredential'
credential_kwargs: { ... arbitrary kwargs ... }
Default remains authentication='cli' (AzureCliCredential wrapped in the
existing LockedTokenCredential file-lock + retry). The lock is now ONLY
applied on the CLI path — non-CLI credentials (SNI / managed identity /
notebookutils) skip the az-subprocess serialization which is meaningless
for them.
New module dbt/adapters/scope/custom_credential.py provides:
- load_custom_credential(dotted, kwargs) — importlib + isinstance check
- Process-wide instance cache keyed by (dotted_path, sorted kwargs)
- Regex validation of the dotted path
A shared build_credential(creds) helper in delta_lake.py is now the
single entry point used by every runtime credential acquisition:
- connections.py: ScopeConnectionHandle ADLA token
- impl.py: list_relations_without_caching ADLS Gen2 listing
- impl.py: _get_gen1_client → AdlsGen1Client (Gen1 token)
- impl.py: _get_checkpoint_manager → CheckpointManager (Gen2 r/w)
AdlsGen1Client and CheckpointManager now accept an optional
`credential` kwarg; production callers always supply build_credential
output. The CLI-default fallback in __init__ is kept only for backward
compatibility with existing test fixtures that don't supply one.
DuckDbDeltaLakeClient no longer auto-wraps in LockedTokenCredential —
callers (specifically get_default_delta_client for integration tests)
now pre-wrap themselves. This preserves test behaviour while letting
a future caller pass a non-CLI credential without an unwanted lock.
Tests: 419/419 passing. Added test_custom_credential.py (11 cases) and
authentication-field validation in test_credentials.py (7 cases).
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…concurrent device-code prompts When dbt-scope runs with multiple threads (e.g. dbt run --threads 4) and 'authentication: token_credential', each worker was independently calling the inner credential (e.g. EntraTokenCredential), which would each walk their own fallback chain. On headless Fabric notebooks this lands on interactive device-code auth — one prompt per thread. Wrap the custom credential in LockedTokenCredential so the cross-process file lock serializes token acquisition. The first thread populates the inner credential's token cache; subsequent threads reuse the cached token without re-entering the fallback chain. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…on code AzureCliCredential is now only constructed inside build_credential() when the user explicitly opts in via authentication='cli'. Every other production code path (AdlsGen1Client._get_fs, CheckpointManager) now requires the caller to pass an explicit credential and raises a clear error if none is provided. Eliminates the silent fallback that was masking misconfigured callers on environments without az login (Fabric notebooks, managed identity hosts) and made device-code prompts surprising to diagnose. - Removed get_default_delta_client() helper; integration conftest constructs the dev-only AzureCliCredential inline. - Tests updated to pass credential=MagicMock() instead of patching AzureCliCredential. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…ential Integration conftest helpers (read_watermark, list_source_files, read_batch_source) were still constructing CheckpointManager() bare, which now raises after the production fallback removal. Wire them through the same dev-only AzureCliCredential as _delta_client(). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
When authentication=token_credential (Fabric notebook + SNI etc), use a separate file lock from the AzureCliCredential one so the log lines and contention surface are unambiguous. - New constant FABRIC_TOKEN_LOCK = /tmp/dbt-scope-fabric-token - build_credential() dispatches: cli -> AZ_CLI_TOKEN_LOCK, token_credential -> FABRIC_TOKEN_LOCK - New unit tests lock the dispatch in Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The Fabric notebook runtime ships azure-datalake-store==0.0.5x preinstalled at /home/trusted-service-user/jupyter-env/.... When dbt-scope passes token_credential=<modern TokenCredential> to AzureDLFileSystem, the legacy SDK signature is def __init__(self, store_name, token=None, **kwargs), so the token_credential kwarg is silently dropped into **kwargs, token stays None, and the SDK falls back to msal.PublicClientApplication.initiate_device_flow emitting 'To sign in, use a web browser to open https://login.microsoft.com/device' once per dbt worker thread. Detect the legacy SDK by inspecting AzureDLFileSystem.__init__ signature. When detected, wrap the modern TokenCredential in _LegacyDataLakeCredentialAdapter which exposes the legacy DataLakeCredential API (signed_session() returning a requests.Session and refresh_token()), then pass it via token= instead of token_credential=. The adapter is thread-safe (legacy SDK's _walk uses 8 worker threads), caches tokens with a 300s refresh-lead window, and uses the canonical legacy scope 'https://datalake.azure.net//.default' (double slash, matches the modern SDK's own default for parity). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Why this change is needed
Closes #33.
The
mon_database_metadataincremental model indbt-ingest-genevafailedduring a long stress run with:
Inside the adapter:
The crash always happens on the second parquet compaction
(
batch_id > 0 and batch_id % source_compaction_interval == 0),i.e. once a previous
.parquetsnapshot already exists under_checkpoint/sources/. With the defaultsource_compaction_interval=10,that's batch 20 (as observed); with
source_compaction_interval=1(theaggressive case), it's batch 2.
Root cause
CheckpointManager._write_snapshot_parquetbuilds the next snapshot by:.parquetsnapshot via DuckDB(
SELECT * FROM read_parquet(...)).list[dict]in Python.nf.write(json.dumps(r) + "\n"), then loading the NDJSON back viaread_json_autoandCOPY ... TO ... (FORMAT PARQUET).batchProcessingTimeis created as an ISO 8601 string in_build_source_records, but DuckDB'sread_json_autoinfers it asTIMESTAMPonce the NDJSON has enough rows of consistent ISO text(reproducible at ~20 rows with
duckdb 1.5.3; the in-test fast-loop casewith ≤6 records per batch stays under the threshold, which is why our
existing multi-compaction tests didn't catch it).
So the first snapshot (batch 10) stores the column as
TIMESTAMP. On thenext compaction (batch 20), DuckDB returns those values as Python
datetime.datetime, andjson.dumps(r)then raisesTypeError: Object of type datetime is not JSON serializable. The outertry/exceptinwrite_batch_sourceslogs the failure and re-raises,which dbt surfaces as a "Compilation Error" on the model after ~7h43m of
wasted SCOPE compute.
How
Two small changes in
dbt/adapters/scope/checkpoint.py:_json_default(o)helper that mapsdatetime → o.isoformat()and re-raisesTypeErrorfor anything else.Naive
datetimeinstances (which is what DuckDB hands back after aTIMESTAMPround-trip) are re-attached to UTC before formatting sothe on-disk ISO string keeps its
+00:00suffix — preserving thetimezone-aware contract that
_build_source_recordsoriginallyestablished. Wired into both
json.dumpscall sites —_write_jsonl(defensive; today's inputs are pure str/int) and the NDJSON write in
_write_snapshot_parquet(where the crash actually happens). Thiscovers any existing customer parquet snapshots whose
batchProcessingTimecolumn is alreadyTIMESTAMP.SELECT *with an explicit, fully-typed projection —path VARCHAR,modificationTime BIGINT,batchId INTEGER,batchProcessingTime TIMESTAMP. This stops the schema from driftingbetween
VARCHARandTIMESTAMPdepending on DuckDB's sample-sizeheuristic, and also defends against future drift on the other three
columns. Every subsequent read then returns
datetimeforbatchProcessingTime, and_json_defaultnormalises it back to anISO string on the next round-trip.
Considerations
customer ADLS accounts (where the column was previously inferred as
TIMESTAMP) are now handled correctly by_json_default.union entirely in DuckDB via
read_parquet UNION ALL read_json_autoto skip the Python round-trip) would be cleaner but is out of scope —
the compaction code path is I/O-bound on ADLS, not on the Python
round-trip, so the perf win at current scale (≤a few hundred records
per snapshot) is negligible.
TIMESTAMP, wecould pin to
VARCHAR— both fix the bug. We pickedTIMESTAMPbecause it's the better long-term schema (and matches what DuckDB
picks at production scale anyway).
Test
tests/unit/test_checkpoint_lifecycle.py::TestCheckpointLifecycle::test_compaction_handles_timestamp_typed_prior_snapshotpre-seeds a
TIMESTAMP-typed parquet snapshot (22 rows) into thein-memory ADLS mock, writes an intermediate JSONL diff at batch 15
(3 rows), then triggers compaction at batch 20 (2 rows) and asserts:
write_batch_sourcesdoes not raise.union — prior snapshot + JSONL diff + current batch
(22 + 3 + 2 = 27 rows, with
batchIdvalues for every leg).batchProcessingTimeis deterministicallyTIMESTAMP.Verified to fail without the fix with the exact production error
(
TypeError: Object of type datetime is not JSON serializable) andpass with it.
.scripts/run.sh unit-test— 336/336 passing..scripts/run.sh lint— clean..scripts/run.sh integration-test— full suite re-run end-to-endagainst ADLA (no regressions observed).
addendum: credential retry resilience (commit 9fc39e8)
Why a second fix in this PR
After the datetime fix landed, the same
dbt-ingest-genevastress runsurfaced a second, independent failure mode in a CI workflow run:
https://github.com/microsoft/dbt-scope/actions/runs/27011393672/job/79716601894
Under high concurrency (the production model runs with
Concurrency: 32 threads), theazsubprocess thatAzureCliCredential.get_tokenshells out to can time out. The adapterpreviously raised on the first occurrence, failing the dbt run after
multi-hour ingestion jobs even though the underlying issue is
transient.
What changed
Centralised retry handling in
LockedTokenCredential:RetryPolicydataclass — linear backoff, capped delay, nojitter. Total attempts ==
max_retries + 1.RetryPolicy.from_http_retries(http_retries)reuses the existinghttp_retriesprofile field so users can tune retries viaprofiles.yml(default bumped 3 → 10 per user spec).LockedTokenCredential.get_tokencatches onlyCredentialUnavailableError(transient subprocess failures);permanent
ClientAuthenticationErroris intentionally not retried.FileLockbetween attempts so other workers continueto make progress.
All credential acquisition sites now flow through
LockedTokenCredentialand honour the policy:ScopeConnectionHandle(ADLA query path)AdlsGen1Client(source-file discovery)CheckpointManager(watermark + snapshot read/write)ScopeAdapter.list_relations_without_cachingDefensive: no more silent swallowing of exhausted credentials
A rubber-duck review surfaced several broad
except Exceptionblocksthat would silently convert exhausted credentials into apparently
benign outcomes — masking auth failures and corrupting incremental
state. Each now re-raises
CredentialUnavailableErrorbefore thebroad handler:
CheckpointManager.read_watermark→ would have returnedNone,silently flipping incremental → full refresh.
AdlsGen1Client._list_one_dir/_walk/list_files(non-recursive)/
_directory_exists/_list_directory_files/enrich_with_estimates→ would have returned empty / partial file lists, advancing the
watermark past unseen files.
DuckDbDeltaLakeClient.table_exists/get_max_partition/get_columns/list_table_paths→ would have looked like"Delta table missing".
ScopeAdapter.list_relations_without_caching(inner + outer except)→ would have returned an empty Delta-table list.
Tests
tests/unit/test_locked_token_credential.py(14 tests): defaults,
from_http_retriesforNone/-1/0/25,succeed-on-first-attempt (no sleep), succeed-after-N-failures,
exhaust-and-re-raise, linear-capped delay sequence
[1,2,3,...,10,10,10], zero-retries == single-attempt,non-
CredentialUnavailableErrorexceptions are NOT retried,claims=kwarg passthrough, lock-released-between-attempts (viaparallel acquisition probe), default policy used when
None.tests/unit/test_adls_gen1_client.py::TestAdlsGen1Client::test_recursive_subdir_credential_exhaustion_propagatestests/unit/test_adls_gen1_client.py::TestAdlsGen1Client::test_non_recursive_credential_exhaustion_propagatestests/unit/test_checkpoint.py::TestCheckpointManagerWatermark::test_read_watermark_propagates_credential_exhaustion.scripts/run.sh unit-test— 353/353 passing..scripts/run.sh lint— clean..scripts/run.sh integration-test— 3/3 passing end-to-endagainst ADLA (9:58).