fix: localize deferred row admission #743

Open
eric-tramel wants to merge 2 commits into NVIDIA-NeMo:main from eric-tramel:etramel/fix/742-localized-deferred-admission-pr

Conversation

@eric-tramel

Contributor

📋 Summary

Fixes adaptive row-group admission so deferred retry work only blocks new row groups when the next row group has no independent resource/column work to expose. This keeps healthy model resources busy while preserving row/frontier guardrails and adds deferred-admission diagnostics.

🔗 Related Issue

Fixes #742

🔄 Changes

  • Localize deferred retry row-group blockers by request resource, scheduler resource, custom-model group, multi-output column flow, and graph dependency.
  • Track the pending admission row group directly so deferred-pressure analysis does not rescan completed row-group prefixes.
  • Add stable deferred-admission diagnostics for blocker scope, counts by column/resource, candidate columns, and independent candidate columns.
  • Add regression coverage for unrelated resources, same-resource blocks, dependency blocks, multi-output sibling blocks, shared scheduler resources across request domains, custom-model fallback localization, row-guard precedence, independent local branches, and async healthy-resource progress during cooldown.

🧪 Testing

  • uv run --group dev pytest -q packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py -k 'adaptive_row_group or deferred_cooldown' — 13 passed
  • uv run --group dev pytest -q packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py — 104 passed
  • Large mock-provider experiment — 2048 row groups, one cooling resource, three healthy resources; healthy_total=300, observed_max_target=101, no deferred_tasks blockers
  • make check-all-fix
  • make test — config 616 passed; engine 2245 passed; interface 955 passed, 1 skipped

✅ Checklist

  • Follows commit message conventions
  • Commits are signed off (DCO)
  • Self-reviewed and review-nuke personas run; environment capped parallel reviewer threads at six, so scope reviewer was run immediately after the six-persona pass
  • Architecture docs updated (N/A — internal scheduler behavior/diagnostics only)

@eric-tramel eric-tramel requested a review from a team as a code owner June 5, 2026 19:44
@github-actions

github-actions Bot commented Jun 5, 2026

Contributor

Code Review: PR #743 — fix: localize deferred row admission

Summary

Reworks adaptive row-group admission so that deferred retry work (e.g. cooling
LLM resources hit by a 429) only blocks the next row group when no independent
work could be exposed by admitting it. Previously, any deferred task forced
an "all stop" on row-group admission via the deferred_tasks reason. The new
logic computes per-row-group whether the deferred set's resource/group/dependency
footprint actually overlaps with what the next row group would dispatch.
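
The overlap check described above can be sketched roughly as follows. This is a simplified illustration, not the scheduler's code: `Task`, `independent_columns`, and `should_block` are hypothetical names, each task is assumed to carry a single resource key, and `upstream` is assumed to hold precomputed transitive dependencies per column.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Task:
    # Hypothetical stand-in: one column and one resource key per task.
    column: str
    resource: str


def independent_columns(
    deferred: list[Task],
    candidates: list[Task],
    upstream: dict[str, set[str]],
) -> list[str]:
    """Return candidate columns that do not overlap the deferred footprint."""
    blocked_resources = {task.resource for task in deferred}
    blocked_columns = {task.column for task in deferred}
    free = []
    for task in candidates:
        if task.resource in blocked_resources:
            continue  # would queue behind the cooling resource
        if task.column in blocked_columns:
            continue  # same column as a deferred task
        if upstream.get(task.column, set()) & blocked_columns:
            continue  # transitively depends on a deferred column
        free.append(task.column)
    return free


def should_block(
    deferred: list[Task],
    candidates: list[Task],
    upstream: dict[str, set[str]],
) -> bool:
    # Block only when deferred work exists and the next row group
    # would expose no independent work.
    return bool(deferred) and not independent_columns(deferred, candidates, upstream)
```

With one deferred task on a cooling resource, a candidate on a different resource with no dependency on the deferred column keeps admission open; if every candidate shares the resource or depends on the deferred column, admission blocks.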

Additionally, the helper _next_unadmitted_row_group_size() is replaced with
_next_unadmitted_row_group(), which reads from a new field
_row_group_admission_pending that _admit_row_groups maintains as it walks
the row-group list. This avoids re-scanning all completed row-group prefixes
on each adaptive tick.

Touches one source file (async_scheduler.py, +215/-36) and one test file
(+492/-1). 11 new tests cover the localization edge cases and an async smoke
test that exercises the keep-healthy-resources-busy path.

Findings

Correctness

  • _admit_row_groups does not reset _row_group_admission_pending between
    the row-guard re-check and _rg_states[rg_id] = ...; that's intentional
    (the row group is still "pending" until admitted). But there's a small
    diagnostic gap: between iterations, _row_group_admission_pending is None,
    so _adaptive_row_group_block_reason() returns "no_pending_row_groups"
    even though the admit task is about to pick up the next row group. This is a
    behavioral change vs. the pre-PR _next_unadmitted_row_group_size(), which
    scanned the full row-group list on demand. In practice this is fine (the
    admit task immediately sets the field on its next iteration), but it's worth
    noting that adaptive decisions and emitted diagnostics during this micro-gap
    will now key off "no pending" rather than the next eligible row group. Not a
    bug, but a behavior delta worth a one-line comment in
    _next_unadmitted_row_group() so future readers don't re-derive this.

  • Cache key for _deferred_admission_cache is order-sensitive
    (tuple(self._deferred)). The salvage path (lines ~1347, ~1453) reassigns
    self._deferred with reordered contents, so a deferred set that's logically
    identical but reordered will miss the cache. Correctness is unaffected — it
    just recomputes — but if you wanted maximum cache hit rate you could use
    frozenset(self._deferred). Tasks are frozen dataclasses so they're
    hashable. Minor nit; current behavior is correct.

  • Cache key does not include _rg_states or _tracker state. The cached
    analysis is keyed only by (deferred_tuple, (row_group, row_group_size)).
    The candidate-task generation in _row_group_admission_candidate_tasks
    depends only on the graph (immutable post-init), so the cache is sound for a
    given pending row group. Confirmed by inspection — no hidden state read.

  • _walk_graph memoizes per-starting-column but doesn't share suffix
    results. For a chain A → B → C, walking from A computes {B, C}, and a
    later walk from B is a fresh walk producing {C}. For typical column
    counts (dozens to low hundreds) this is negligible, but worth mentioning
    if graphs ever scale large. Not a blocker.

Style / project conventions

  • All new code uses absolute imports, modern type syntax, and
    from __future__ import annotations is already present.
  • _DeferredAdmissionAnalysis is a frozen dataclass with three fields and
    no methods, which is appropriate for a value type used only as a return
    value.
  • Naming is consistent with the file's existing patterns (_localized_…,
    _transitive_…, _row_group_admission_…). ✅
  • The new _localized_deferred_admission_keys builds string keys with
    request_resource: / scheduler_resource: / group: namespacing, which is
    clean and avoids collisions across categories.
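
To illustrate why the prefixing matters, here is a minimal sketch of namespaced key construction. Only the three prefixes come from the PR; the function name `locality_keys` and its parameters are hypothetical.

```python
from typing import Optional


def locality_keys(
    request_resource: Optional[str] = None,
    scheduler_resource: Optional[str] = None,
    group: Optional[str] = None,
) -> set[str]:
    """Build category-prefixed keys so equal names in different categories differ."""
    keys = set()
    if request_resource is not None:
        keys.add(f"request_resource:{request_resource}")
    if scheduler_resource is not None:
        keys.add(f"scheduler_resource:{scheduler_resource}")
    if group is not None:
        keys.add(f"group:{group}")
    return keys


# A request resource and a group that happen to share a name do not collide.
print(locality_keys(request_resource="shared") & locality_keys(group="shared"))  # set()
```

Two tasks overlap only when their key sets intersect, so a plain set intersection is enough to decide whether a candidate shares a blocker's scope.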

Performance

  • Memoization of transitive upstream/downstream sets is the right call;
    the analysis is invoked on every adaptive tick.
  • _row_group_admission_candidate_tasks walks the full topological order
    per call but is memoized via the analysis cache when the deferred set
    is unchanged. Should not be a hot-path concern.
  • Diagnostics dict construction in _deferred_admission_diagnostics is
    unconditional even when nothing observes it. It's only called from
    _row_group_admission_diagnostics, which is itself called on event
    emission and from _adaptive_row_group_block_reason indirectly. Fine.

Test coverage

  • 11 new tests covering: unrelated resources don't block; same-resource
    blocks; downstream dependency blocks; multi-output sibling generators
    (shared generator id) propagate the block; shared scheduler resource across
    request domains (chat vs embedding) blocks; custom-model fallback (no
    resource key, only group) localizes; row-guard precedence over deferred;
    independent local branches stay admittable; async healthy-resource progress
    during cooldown.
  • The async test
    test_scheduler_adaptive_row_group_admission_keeps_healthy_resource_exposed_during_deferred_cooldown
    is the most realistic — it actually runs the scheduler with a permanently
    cooling generator and asserts that the healthy generator makes ≥16 calls
    before a 5s timeout. Good integration-style coverage. The cleanup in the
    finally block (cancel pending, gather, then re-cancel run_task with
    swallow) is defensively written and looks correct.
  • Test helpers _build_adaptive_model_resource_scheduler and
    _set_next_row_group_pending appropriately reduce boilerplate. The
    latter does poke at private state (_row_group_admission_pending), which
    is the natural way to exercise _deferred_admission_analysis in
    isolation; that is acceptable for unit tests on internal scheduler
    invariants.
  • Tests directly mutate scheduler._deferred, scheduler._deferred_errors,
    and scheduler._rg_states; these are intentional white-box probes. The
    alternative (running the full scheduler to drive a 429 into the deferred
    list) would be much heavier; the targeted approach is reasonable and
    matches the existing test style in this file.
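
For readers unfamiliar with the cleanup pattern praised in the async test, a generic version looks roughly like this. It is a standalone sketch, not the test from this PR: `healthy_worker` and `wait_for_progress` are hypothetical, and the 16-call threshold and 5 s timeout are borrowed from the description above only for flavor.

```python
import asyncio
import contextlib


async def wait_for_progress(min_calls: int = 16, timeout: float = 5.0) -> int:
    """Run a background worker until it has made enough calls, then clean up."""
    calls = 0

    async def healthy_worker() -> None:
        nonlocal calls
        while True:  # runs until cancelled, like a long-lived scheduler task
            calls += 1
            await asyncio.sleep(0)

    async def until_enough_calls() -> None:
        while calls < min_calls:
            await asyncio.sleep(0)

    run_task = asyncio.create_task(healthy_worker())
    try:
        await asyncio.wait_for(until_enough_calls(), timeout=timeout)
    finally:
        # Cancel the background task and swallow the resulting CancelledError
        # so cleanup does not mask an assertion or timeout from the body.
        run_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await run_task
    return calls


print(asyncio.run(wait_for_progress()) >= 16)  # True
```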

Risks

  • Behavioral change for callers that observed block_reason == "deferred_tasks" as a global signal. Any external diagnostics consumer
    that relied on that reason firing the moment any task was deferred will
    now see it fire less often. The new diagnostics under deferred_admission
    expose the richer state, but downstream dashboards or tests outside this PR
    may need to be aware.
  • _row_group_admission_pending is now load-bearing for the adaptive block
    reason. If a future change adds another caller that admits row groups
    outside _admit_row_groups, that path will need to maintain this field too.
    A short docstring on the field would help guard against this.
  • Backward compatibility with the existing scheduler event schema: the
    new deferred_admission key inside row_group_admission_diagnostics is
    additive — existing consumers won't break, new consumers can opt in.

Suggestions (non-blocking)

  1. Add a one-line docstring or comment on _row_group_admission_pending
    explaining that _admit_row_groups is its sole writer and what None
    means (no row group currently being admitted).
  2. Consider a brief module-level docstring or comment near
    _deferred_admission_analysis explaining the three exclusion rules
    (resource/group key match, transitive dependency on a blocked column,
    transitive reach into a blocked column when not resource-scoped). The
    logic is correct but takes a few reads to follow.
  3. The _localized_deferred_admission_keys function is named "localized" but
    actually returns the set of keys used to localize — naming nit. Maybe
    _admission_locality_keys or _admission_scope_keys. Subjective; skip if
    it would churn callers.

Structural Impact

No pre-computed structural impact analysis was available
(/tmp/structural-impact-743.md not present). Manual assessment:

  • Blast radius: Limited to data_designer.engine.dataset_builders.async_scheduler.
    No public API changes; only adds private methods, one private field, and one
    internal frozen dataclass (_DeferredAdmissionAnalysis). The pre-existing
    helper _next_unadmitted_row_group_size is replaced with
    _next_unadmitted_row_group, but both are private (_-prefixed). Tests in
    the same package are the only direct consumers.
  • Import direction: Imports remain within data_designer.engine
    (scheduling.resources, scheduling.task_admission). No upward imports
    into interface or sideways to other packages. ✅
  • Cross-package dependencies: None added.

Verdict

Approve with minor suggestions. The change is well-scoped, well-tested,
and addresses a real fairness problem (deferred work on one cooling resource
shouldn't stall healthy resources). Correctness analysis holds: the cache
key is sound, the dependency walk is correct, and the row-guard precedence
is preserved (and explicitly tested). The new _row_group_admission_pending
field tightens the diagnostic semantics in a subtle way that's worth a
one-line comment, but doesn't change correctness. No blockers.

@greptile-apps

greptile-apps Bot commented Jun 5, 2026

Contributor

Greptile Summary

This PR fixes adaptive row-group admission so that a deferred retry task (e.g. a rate-limited model) only blocks the next row group when every candidate task in that row group is entangled with the cooling resource — either by sharing the same request/scheduler resource key, or through graph dependency. Previously, any deferred entry unconditionally blocked all further row-group admission, starving healthy model resources.

  • Localized admission analysis (_deferred_admission_analysis): classifies each candidate task for the next row group as blocked or independent based on resource key overlap and transitive graph reachability, using caches for graph traversal and the analysis result itself.
  • _row_group_admission_pending tracking: replaces the old full-scan of _row_groups with a direct pointer to the row group currently being considered for admission, set at the top of each _admit_row_groups iteration and cleared after admission or in the finally block.
  • Deferred-admission diagnostics: a new _deferred_admission_diagnostics() method adds blocker scope, per-resource counts, candidate columns, and independent candidate columns to the row_group_admission_blocked events.
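
The set-then-clear discipline for the pending pointer can be sketched as below. This is a synchronous simplification under stated assumptions: `AdmissionLoop`, `run`, and the `admit` callback are hypothetical, whereas the real `_admit_row_groups` is an async task inside the scheduler.

```python
from typing import Callable, Optional

RowGroup = tuple[int, int]  # (row group id, row group size)


class AdmissionLoop:
    """Hypothetical model of a loop that tracks the row group being admitted."""

    def __init__(self, row_groups: list[RowGroup]) -> None:
        self._row_groups = row_groups
        self.pending: Optional[RowGroup] = None
        self.admitted: list[int] = []

    def run(self, admit: Callable[[RowGroup], None]) -> None:
        try:
            for row_group in self._row_groups:
                self.pending = row_group  # set at the top of each iteration
                admit(row_group)          # may wait or raise in a real scheduler
                self.admitted.append(row_group[0])
                self.pending = None       # cleared once the row group is admitted
        finally:
            self.pending = None           # cleared even if admission fails
```

Readers of `pending` see the current row group while admission is in progress and `None` otherwise, including after an exception, which is the invariant the finally block protects.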

Confidence Score: 5/5

The change is safe to merge; the localization logic is well-tested at unit and integration level and the _row_group_admission_pending invariant is properly guarded by a try/finally cleanup.

The deferred-admission analysis correctly classifies candidates using resource key overlap and transitive graph reachability. Cache invalidation keys on tuple(self._deferred) and the row group identity, which is sufficient. No data-loss, ordering, or correctness issues were found.
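
A small illustration of how a tuple-based key behaves when the deferred list is reordered, compared with a frozenset-based key. `DeferredTask` is a hypothetical stand-in for the scheduler's frozen task dataclass, and the `(row_group, row_group_size)` pair is an example value.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DeferredTask:
    # Hypothetical stand-in; frozen dataclasses are hashable by default.
    row_group: int
    column: str


a = DeferredTask(0, "summary")
b = DeferredTask(0, "score")
pending = (1, 64)  # (row_group, row_group_size)

ordered_key_1 = (tuple([a, b]), pending)
ordered_key_2 = (tuple([b, a]), pending)
unordered_key_1 = (frozenset([a, b]), pending)
unordered_key_2 = (frozenset([b, a]), pending)

print(ordered_key_1 == ordered_key_2)      # False: reordering misses the cache
print(unordered_key_1 == unordered_key_2)  # True: same logical deferred set
```

Either key is safe for correctness, since a miss only triggers recomputation. A frozenset would also collapse duplicate entries, which only matters if the deferred list can hold the same task twice.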

No files require special attention.

Important Files Changed

| Filename | Overview |
| --- | --- |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py | Core scheduler logic: new deferred-admission analysis, pending-row-group tracking, graph traversal caches, and diagnostics; logic is sound and the _row_group_admission_pending invariant is properly guarded. |
| packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py | Comprehensive regression suite: 9 new unit tests and 1 async integration test cover unrelated-resource passthrough, same-resource blocking, dependency blocking, multi-output siblings, shared scheduler resources, custom-model fallback, local branches, row-guard precedence, and live async progress. |

Reviews (3): Last reviewed commit: "Merge branch 'main' into etramel/fix/742..."

andreatgretel previously approved these changes Jun 8, 2026

@andreatgretel andreatgretel left a comment

Contributor

Reviewed #743. No blocking findings.

Validated the GitHub/local diff, ran graphify, ruff on changed Python files, and the full changed scheduler test file (104 passed). Claude cross-review also found no correctness blockers. Only note is to preserve the localized deferred-admission behavior when rebasing with the overlapping scheduler PRs.

Fixes NVIDIA-NeMo#742

Signed-off-by: Eric W. Tramel <1223539+eric-tramel@users.noreply.github.com>
@eric-tramel eric-tramel force-pushed the etramel/fix/742-localized-deferred-admission-pr branch from 0a30df1 to 5a60c2a Compare June 8, 2026 15:44
@eric-tramel eric-tramel requested a review from andreatgretel June 8, 2026 16:30
@eric-tramel

Contributor Author

Updated to resolve merge conflicts.

Development

Successfully merging this pull request may close these issues.

Adaptive row-group admission globally blocks healthy work behind deferred retries

2 participants