From f8c55e44bc81561229e02c262e444f2138b50d96 Mon Sep 17 00:00:00 2001 From: "Peter B. Robinson" Date: Mon, 11 May 2026 10:47:08 -0700 Subject: [PATCH 01/15] Introduce ready-work sets to ATS, per processor bucket queues in aleats --- ats/ready_queue.py | 167 +++++++++++++++++++++++++++ docs/source/scheduler_extensions.rst | 129 +++++++++++++++++++++ 2 files changed, 296 insertions(+) create mode 100644 ats/ready_queue.py create mode 100644 docs/source/scheduler_extensions.rst diff --git a/ats/ready_queue.py b/ats/ready_queue.py new file mode 100644 index 0000000..43e3b65 --- /dev/null +++ b/ats/ready_queue.py @@ -0,0 +1,167 @@ +"""Cached ready-queue helpers for ATS schedulers. + +``ReadyWorkSet`` tracks work that is structurally ready but still needs a +machine-specific capacity check before launch. It keeps FIFO scheduler order +within resource buckets, prefers the largest bucket that fits the current +capacity, and restores deferred candidates when a machine policy cannot run +them yet. +""" +from collections import defaultdict +import heapq + + +class ReadyWorkSet: + """Maintain resource-binned heaps of structurally ready scheduler work.""" + + def __init__(self, item_lookup, order_lookup, resource_bucket=None, serial_lookup=None): + self._item_lookup = item_lookup + self._order_lookup = order_lookup + self._resource_bucket = resource_bucket or self.default_resource_bucket + self._serial_lookup = serial_lookup or self.default_serial + self.reset() + + def reset(self): + """Clear all ready buckets and membership tracking.""" + self._ready_heaps = defaultdict(list) + self._ready_serials = set() + + @staticmethod + def default_resource_bucket(item): + """Map an ATS test-like item to its processor-count bucket.""" + return max(1, int(getattr(item, "np", 1))) + + @staticmethod + def default_serial(item): + """Return an ATS test-like item's stable serial identifier.""" + return item.serialNumber + + def bucket_for(self, item): + """Map ``item`` to a positive integer resource bucket.""" + return max(1, int(self._resource_bucket(item))) + + def order_of(self, item): + """Return the stable scheduler order used inside ready buckets.""" + return self._order_lookup(item) + + def enqueue_if_ready(self, item, ready_predicate): + """Add ``item`` to the ready set if the supplied predicate accepts it.""" + if item is None or not ready_predicate(item): + return False + serial = self._serial_lookup(item) + if serial in self._ready_serials: + return False + heapq.heappush(self._ready_heaps[self.bucket_for(item)], self._heap_key(item)) + self._ready_serials.add(serial) + return True + + def pop_next(self, available_slots, ready_predicate, can_run, blocked_predicate=None): + """Pop the largest fitting runnable item while restoring deferred candidates.""" + persistence_blocked = False + deferred = defaultdict(list) + for bucket in sorted(self._ready_heaps.keys(), reverse=True): + if bucket > available_slots: + continue + heap = self._ready_heaps[bucket] + while heap: + order_index, serial = heapq.heappop(heap) + self._ready_serials.discard(serial) + candidate = self._item_lookup(serial) + if candidate is None or not ready_predicate(candidate): + continue + if blocked_predicate is not None and blocked_predicate(candidate): + persistence_blocked = True + deferred[bucket].append((order_index, serial)) + continue + if can_run(candidate): + self._restore_deferred(deferred) + return candidate, persistence_blocked + deferred[bucket].append((order_index, serial)) + + self._restore_deferred(deferred) + return None, persistence_blocked + + def remove(self, item): + """Remove ``item`` from its ready bucket if it is still queued.""" + if item is None: + return False + serial = self._serial_lookup(item) + bucket = self.bucket_for(item) + heap = self._ready_heaps.get(bucket) + if not heap: + self._ready_serials.discard(serial) + return False + key = self._heap_key(item) + try: + heap.remove(key) + except ValueError: + self._ready_serials.discard(serial) + return False + heapq.heapify(heap) + self._ready_serials.discard(serial) + return True + + def has_candidates(self): + """Return whether any ready bucket still contains heap entries.""" + return any(heap for heap in self._ready_heaps.values()) + + def ready_count(self, ready_predicate): + """Count currently queued items that still satisfy ``ready_predicate``.""" + count = 0 + for serial in self._ready_serials: + candidate = self._item_lookup(serial) + if candidate is not None and ready_predicate(candidate): + count += 1 + return count + + def live_ready_count(self): + """Return the cached ready-set size.""" + return len(self._ready_serials) + + def bucket_counts(self, predicate=None): + """Return queued item counts by resource bucket.""" + counts = defaultdict(int) + for serial in self._ready_serials: + item = self._item_lookup(serial) + if item is None: + continue + if predicate is not None and not predicate(item): + continue + counts[self.bucket_for(item)] += 1 + return dict(counts) + + def buckets(self): + """Return a snapshot of bucket keys currently known to the work set.""" + return list(self._ready_heaps.keys()) + + def candidates_for_bucket(self, bucket, ready_predicate, candidate_predicate=None, limit=None): + """Return queued candidates from one bucket without mutating the work set.""" + candidates = [] + for _order_index, serial in sorted(self._ready_heaps.get(bucket, [])): + candidate = self._item_lookup(serial) + if candidate is None or not ready_predicate(candidate): + continue + if candidate_predicate is not None and not candidate_predicate(candidate): + continue + candidates.append(candidate) + if limit is not None and len(candidates) >= limit: + break + return candidates + + def _heap_key(self, item): + return (self._order_lookup(item), self._serial_lookup(item)) + + def _restore_deferred(self, deferred): + for bucket, items in deferred.items(): + for item in items: + heapq.heappush(self._ready_heaps[bucket], item) + self._ready_serials.add(item[-1]) + + +class NoPromotionPolicy: + """Promotion policy that leaves all candidates in normal ready buckets.""" + + def pop_promoted(self, ready_work_set, available_slots, ready_predicate, can_run, blocked_predicate=None): + return None, False + + def snapshot_counts(self, ready_work_set, ready_predicate): + return 0, {} diff --git a/docs/source/scheduler_extensions.rst b/docs/source/scheduler_extensions.rst new file mode 100644 index 0000000..84e0aec --- /dev/null +++ b/docs/source/scheduler_extensions.rst @@ -0,0 +1,129 @@ +==================================== + Scheduler Extension Design Guide +==================================== + +This guide describes the reusable ATS ready-queue support used by larger or +more specialized schedulers. The examples are intentionally domain-neutral. +Application drivers can use these APIs to add scheduling policy without +teaching ATS about application-specific status files, reports, or test +metadata. + +Core Responsibilities +===================== + +ATS owns the generic execution model: + +* tests are collected into ATS ``AtsTest`` objects; +* dependencies are represented with ``waitUntil`` and ``dependents``; +* the active scheduler decides which created tests should be offered to the + machine; +* the machine owns launch, running-test tracking, completion detection, and + final test-end notification. + +Application drivers own policy: + +* what "structurally ready" means for their tests; +* how tests should be bucketed for resource-aware scheduling; +* how retry and group-output behavior interacts with cached readiness. + +That split keeps ATS reusable. A driver can use ATS queues while keeping +domain-specific data structures outside the ATS package. + +Scheduler Flow +============== + +The normal interactive flow is: + +1. ``manager.collectTests()`` executes input files and defines tests. +2. The scheduler prioritizes and loads the collected interactive tests. +3. ``scheduler.step()`` starts runnable tests while the machine has capacity. +4. ``machine.checkRunning()`` detects completed tests. +5. ``machine.testEnded(test, status)`` records final machine-level state. +6. The scheduler is notified and can unblock dependent work. + +A custom scheduler should preserve two invariants: + +* scheduler operations and machine operations run on the main ATS thread unless + the driver has explicitly built a safe handoff; +* resource policy remains authoritative in the machine. A scheduler may cache + readiness, but it should still call ``machine.canRunNow(test)`` or + ``machine.startRun(test)`` before consuming resources. + +ReadyWorkSet +============ + +``ats.ready_queue.ReadyWorkSet`` stores work that is structurally ready but +still needs a machine capacity check. It groups work into resource buckets, +usually processor counts, and returns the largest fitting candidate first. + +The class is intentionally policy-light. The owner supplies: + +* ``item_lookup(serial)`` to map stable ids back to live test objects; +* ``order_lookup(item)`` to preserve scheduler order inside buckets; +* an optional ``resource_bucket(item)`` if ``item.np`` is not the right bucket; +* a ``ready_predicate(item)`` each time candidates are enqueued or popped; +* a ``can_run(item)`` predicate, normally ``machine.canRunNow``. + +``ReadyWorkSet`` does not decide what "ready" means. A scheduler commonly +defines readiness as: created, dependencies complete, not directory-blocked, +and still relevant to the current run. + +Beginning Tutorial: A Cached Ready Scheduler +============================================ + +A scheduler can use ``ReadyWorkSet`` to avoid rescanning every created test on +every pass. + +:: + + from ats.atsut import CREATED, RUNNING + from ats.ready_queue import ReadyWorkSet + + class ReadyScheduler: + def __init__(self): + self.tests_by_serial = {} + self.order = {} + self.ready = ReadyWorkSet( + item_lookup=self.tests_by_serial.get, + order_lookup=lambda test: self.order[test.serialNumber], + ) + + def add_tests(self, tests): + for test in tests: + self.tests_by_serial[test.serialNumber] = test + self.order[test.serialNumber] = len(self.order) + self.ready.enqueue_if_ready(test, self.is_ready) + + def is_ready(self, test): + if test.status is not CREATED: + return False + return not any(parent.status in (CREATED, RUNNING) + for parent in test.waitUntil) + + def start_available(self, machine): + available = machine.remainingCapacity() + while True: + test, blocked = self.ready.pop_next( + available, + self.is_ready, + machine.canRunNow, + ) + if test is None: + return + machine.startRun(test) + available = machine.remainingCapacity() + +Real schedulers also need to update dependent tests when a parent completes and +to handle directory blocks, retries, and logging. The example shows the +division of labor: the ready set stores candidates; the scheduler owns +dependency policy; the machine owns resource admission. + +Design Checklist +================ + +When adding a scheduler extension: + +* keep application-specific state outside ATS; +* call machine resource checks before launch; +* preserve ordering only where required by correctness; +* test completion bursts, dependency chains, and retry/reset behavior. From edea1e9f995261f0908231f57abd6e152814f063 Mon Sep 17 00:00:00 2001 From: "Peter B. Robinson" Date: Mon, 11 May 2026 12:57:05 -0700 Subject: [PATCH 02/15] remove NoPromotionPolicy for now. --- ats/ready_queue.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/ats/ready_queue.py b/ats/ready_queue.py index 43e3b65..b5cf681 100644 --- a/ats/ready_queue.py +++ b/ats/ready_queue.py @@ -155,13 +155,3 @@ def _restore_deferred(self, deferred): for item in items: heapq.heappush(self._ready_heaps[bucket], item) self._ready_serials.add(item[-1]) - - -class NoPromotionPolicy: - """Promotion policy that leaves all candidates in normal ready buckets.""" - - def pop_promoted(self, ready_work_set, available_slots, ready_predicate, can_run, blocked_predicate=None): - return None, False - - def snapshot_counts(self, ready_work_set, ready_predicate): - return 0, {} From 33ed761e6c975ba8877d2fe37ecfffda30112d12 Mon Sep 17 00:00:00 2001 From: "Peter B. Robinson" Date: Mon, 11 May 2026 13:26:13 -0700 Subject: [PATCH 03/15] add more documentation to the example. --- docs/source/scheduler_extensions.rst | 180 ++++++++++++++++++++-- test/test_ready_queue_example.py | 219 +++++++++++++++++++++++++++ 2 files changed, 383 insertions(+), 16 deletions(-) create mode 100644 test/test_ready_queue_example.py diff --git a/docs/source/scheduler_extensions.rst b/docs/source/scheduler_extensions.rst index 84e0aec..d3d9f38 100644 --- a/docs/source/scheduler_extensions.rst +++ b/docs/source/scheduler_extensions.rst @@ -72,51 +72,199 @@ Beginning Tutorial: A Cached Ready Scheduler ============================================ A scheduler can use ``ReadyWorkSet`` to avoid rescanning every created test on -every pass. +every pass. This example is intentionally small, but it includes the parts +needed for a copied scheduler to stay correct: initial indexing, direct +dependent updates, directory-block updates, and launch-failure recovery. :: + from collections import defaultdict + from ats.atsut import CREATED, RUNNING from ats.ready_queue import ReadyWorkSet class ReadyScheduler: def __init__(self): + # ``ReadyWorkSet`` stores only compact heap entries, so the + # scheduler must provide a way to map a serial number back to the + # live ATS test object. self.tests_by_serial = {} + + # Stable order is separate from serial number. Serial numbers are + # usually creation order, but a scheduler may sort groups or apply + # priority before loading tests. Keep the order you want preserved + # inside each resource bucket. self.order = {} + + # Cache the number of unfinished wait dependencies for each test. + # This turns "did this parent unblock anything?" into updates of the + # completed test's direct dependents instead of a full-suite scan. + self.remaining_waits = {} + + # ATS directory blocks prevent two non-independent groups from + # running in the same directory. When a block clears, only tests in + # that directory need to be reconsidered. + self.tests_by_block = defaultdict(list) + self.blocks = {} + + # The ready set decides queue mechanics: resource buckets, stable + # in-bucket ordering, stale-entry cleanup, and temporary deferral of + # candidates that are ready but cannot pass machine policy yet. self.ready = ReadyWorkSet( item_lookup=self.tests_by_serial.get, order_lookup=lambda test: self.order[test.serialNumber], ) def add_tests(self, tests): - for test in tests: + # Load tests in the scheduler order you want to preserve. A real + # scheduler usually calls this after ATS has built and sorted groups. + tests = list(tests) + next_order = len(self.order) + for offset, test in enumerate(tests): self.tests_by_serial[test.serialNumber] = test - self.order[test.serialNumber] = len(self.order) + self.order[test.serialNumber] = next_order + offset + + # Count only unfinished parents. Completed parents should not + # keep a test out of the initial ready set. + self.remaining_waits[test.serialNumber] = sum( + 1 for parent in test.waitUntil + if parent.status in (CREATED, RUNNING) + ) + + # Record block membership once so clearing a directory block can + # revisit only affected tests. + if getattr(test, "block", None): + self.tests_by_block[test.block].append(test) + + for test in tests: + # ``enqueue_if_ready`` calls back into scheduler policy. The + # ready set never decides what CREATED, waits, or blocks mean. self.ready.enqueue_if_ready(test, self.is_ready) def is_ready(self, test): + # "Ready" here means structurally ready for scheduler consideration. + # It does not mean resources are currently available; the machine + # still decides that later through ``canRunNow`` / ``startRun``. + if test is None: + return False if test.status is not CREATED: return False - return not any(parent.status in (CREATED, RUNNING) - for parent in test.waitUntil) + if self.remaining_waits.get(test.serialNumber, 0) != 0: + return False + + # The cached counter is the fast path. This direct check is a + # defensive guard for schedulers that can mutate waitUntil after + # initial indexing or after retry/reset handling. + if any(parent.status in (CREATED, RUNNING) + for parent in test.waitUntil): + return False + return not self.is_blocked(test) + + def is_blocked(self, test): + # Directory blocks are owned by the group that is currently running + # in that directory. Tests from that same group may continue; tests + # from other groups must wait. + block = getattr(test, "block", None) + if not block: + return False + owner = self.blocks.get(block) + return owner is not None and owner != test.group.number + + def add_block(self, test): + # Match ATS's default block policy: if any non-independent member of + # a group has a block directory, the whole group owns that block + # while one of its blocking members is CREATED or RUNNING. + group = test.group + if getattr(group, "isBlocking", False): + return + for member in group: + if getattr(member, "independent", False): + continue + block = getattr(member, "block", None) + if block: + group.isBlocking = True + self.blocks[block] = group.number + + def remove_block(self, test): + # A group keeps its directory block until all non-independent group + # members have left CREATED/RUNNING. When that happens, sibling + # tests from other groups in the same directory may become ready. + group = test.group + if not getattr(group, "isBlocking", False): + return + for member in group: + if getattr(member, "independent", False): + continue + if member.status in (CREATED, RUNNING): + return + group.isBlocking = False + for member in group: + if getattr(member, "independent", False): + continue + block = getattr(member, "block", None) + if block: + self.blocks.pop(block, None) + + def test_ended(self, test): + # Completion can create new ready work in two local ways: + # 1. direct dependents may have one fewer unfinished parent; + # 2. the completed test's directory block may have cleared. + block = getattr(test, "block", None) + self.remove_block(test) + + for dependent in getattr(test, "dependents", []): + # Some ATS bookkeeping can list broad dependents. Only update + # tests that were actually waiting on this completed parent. + if test not in getattr(dependent, "waitUntil", []): + continue + serial = dependent.serialNumber + remaining = self.remaining_waits.get(serial, 0) + if remaining > 0: + self.remaining_waits[serial] = remaining - 1 + + # This is cheap even if the dependent is not ready yet. The + # scheduler predicate filters out tests with other unfinished + # parents or active blocks. + self.ready.enqueue_if_ready(dependent, self.is_ready) + + if block and block not in self.blocks: + # Avoid rescanning all CREATED tests just because one directory + # block cleared. Only tests from that block can be affected. + for candidate in self.tests_by_block.get(block, []): + self.ready.enqueue_if_ready(candidate, self.is_ready) + + def next_ready(self, machine): + # ``pop_next`` picks the largest resource bucket that fits the + # current capacity, then checks machine policy. Candidates that are + # still structurally ready but cannot run now are restored so a later + # scheduler pass can reconsider them. + test, _blocked = self.ready.pop_next( + machine.remainingCapacity(), + self.is_ready, + machine.canRunNow, + ) + return test def start_available(self, machine): - available = machine.remainingCapacity() + # Keep launching until the ready set has no machine-runnable work or + # the machine is full. Real schedulers usually also log each launch. while True: - test, blocked = self.ready.pop_next( - available, - self.is_ready, - machine.canRunNow, - ) + test = self.next_ready(machine) if test is None: return - machine.startRun(test) - available = machine.remainingCapacity() + self.add_block(test) + if not machine.startRun(test): + # If the machine refuses a launch after selection, undo the + # scheduler-side block and requeue the test if it is still + # structurally ready. + self.remove_block(test) + self.ready.enqueue_if_ready(test, self.is_ready) + return -Real schedulers also need to update dependent tests when a parent completes and -to handle directory blocks, retries, and logging. The example shows the +Production schedulers also need logging, retry behavior, group-output handling, +periodic reports, and a cheap "work remains" check. The example shows the division of labor: the ready set stores candidates; the scheduler owns -dependency policy; the machine owns resource admission. +dependency and block policy; the machine owns resource admission. Design Checklist ================ diff --git a/test/test_ready_queue_example.py b/test/test_ready_queue_example.py new file mode 100644 index 0000000..f7bcfef --- /dev/null +++ b/test/test_ready_queue_example.py @@ -0,0 +1,219 @@ +from collections import defaultdict +import types +import unittest + +from ats.atsut import CREATED, PASSED, RUNNING +from ats.ready_queue import ReadyWorkSet + + +class ReadyScheduler: + """Minimal scheduler matching the docs/source/scheduler_extensions.rst example.""" + + def __init__(self): + self.tests_by_serial = {} + self.order = {} + self.remaining_waits = {} + self.tests_by_block = defaultdict(list) + self.blocks = {} + self.ready = ReadyWorkSet( + item_lookup=self.tests_by_serial.get, + order_lookup=lambda test: self.order[test.serialNumber], + ) + + def add_tests(self, tests): + tests = list(tests) + next_order = len(self.order) + for offset, test in enumerate(tests): + self.tests_by_serial[test.serialNumber] = test + self.order[test.serialNumber] = next_order + offset + self.remaining_waits[test.serialNumber] = sum( + 1 for parent in test.waitUntil + if parent.status in (CREATED, RUNNING) + ) + if getattr(test, "block", None): + self.tests_by_block[test.block].append(test) + + for test in tests: + self.ready.enqueue_if_ready(test, self.is_ready) + + def is_ready(self, test): + if test is None: + return False + if test.status is not CREATED: + return False + if self.remaining_waits.get(test.serialNumber, 0) != 0: + return False + if any(parent.status in (CREATED, RUNNING) for parent in test.waitUntil): + return False + return not self.is_blocked(test) + + def is_blocked(self, test): + block = getattr(test, "block", None) + if not block: + return False + owner = self.blocks.get(block) + return owner is not None and owner != test.group.number + + def add_block(self, test): + group = test.group + if getattr(group, "isBlocking", False): + return + for member in group: + if getattr(member, "independent", False): + continue + block = getattr(member, "block", None) + if block: + group.isBlocking = True + self.blocks[block] = group.number + + def remove_block(self, test): + group = test.group + if not getattr(group, "isBlocking", False): + return + for member in group: + if getattr(member, "independent", False): + continue + if member.status in (CREATED, RUNNING): + return + group.isBlocking = False + for member in group: + if getattr(member, "independent", False): + continue + block = getattr(member, "block", None) + if block: + self.blocks.pop(block, None) + + def test_ended(self, test): + block = getattr(test, "block", None) + self.remove_block(test) + + for dependent in getattr(test, "dependents", []): + if test not in getattr(dependent, "waitUntil", []): + continue + serial = dependent.serialNumber + remaining = self.remaining_waits.get(serial, 0) + if remaining > 0: + self.remaining_waits[serial] = remaining - 1 + self.ready.enqueue_if_ready(dependent, self.is_ready) + + if block and block not in self.blocks: + for candidate in self.tests_by_block.get(block, []): + self.ready.enqueue_if_ready(candidate, self.is_ready) + + def next_ready(self, machine): + test, _blocked = self.ready.pop_next( + machine.remainingCapacity(), + self.is_ready, + machine.canRunNow, + ) + return test + + def start_available(self, machine): + while True: + test = self.next_ready(machine) + if test is None: + return + self.add_block(test) + if not machine.startRun(test): + self.remove_block(test) + self.ready.enqueue_if_ready(test, self.is_ready) + return + + +class _Group(list): + def __init__(self, number, tests=()): + super().__init__(tests) + self.number = number + self.isBlocking = False + + +class _Machine: + def __init__(self, capacity, blocked_serials=None): + self.capacity = capacity + self.blocked_serials = set(blocked_serials or []) + self.launched = [] + + def remainingCapacity(self): + return self.capacity + + def canRunNow(self, test): + return test.serialNumber not in self.blocked_serials and test.np <= self.capacity + + def startRun(self, test): + if not self.canRunNow(test): + return False + self.launched.append(test.serialNumber) + self.capacity -= test.np + test.status = RUNNING + return True + + +def _make_test(serial, np=1, block=None, wait_until=None): + return types.SimpleNamespace( + serialNumber=serial, + status=CREATED, + waitUntil=list(wait_until or []), + dependents=[], + block=block, + independent=False, + np=np, + ) + + +class ReadySchedulerExampleTest(unittest.TestCase): + def test_launches_ready_work_and_unblocks_direct_dependents(self): + parent = _make_test(1, np=1, block="case") + child = _make_test(2, np=1, block="case", wait_until=[parent]) + other = _make_test(3, np=1, block="other") + parent.dependents = [child] + + group = _Group(1, [parent, child]) + other_group = _Group(2, [other]) + parent.group = group + child.group = group + other.group = other_group + + scheduler = ReadyScheduler() + scheduler.add_tests([parent, child, other]) + + machine = _Machine(capacity=1) + scheduler.start_available(machine) + self.assertEqual(machine.launched, [1]) + self.assertIs(parent.status, RUNNING) + self.assertIs(child.status, CREATED) + + parent.status = PASSED + machine.capacity = 1 + scheduler.test_ended(parent) + scheduler.start_available(machine) + + self.assertEqual(machine.launched, [1, 2]) + self.assertIs(child.status, RUNNING) + + def test_prefers_largest_fitting_bucket_and_restores_blocked_candidate(self): + small = _make_test(1, np=1, block="small") + large = _make_test(2, np=4, block="large") + small.group = _Group(1, [small]) + large.group = _Group(2, [large]) + + scheduler = ReadyScheduler() + scheduler.add_tests([small, large]) + + machine = _Machine(capacity=4, blocked_serials={2}) + scheduler.start_available(machine) + self.assertEqual(machine.launched, [1]) + self.assertIs(small.status, RUNNING) + self.assertIs(large.status, CREATED) + + small.status = PASSED + machine.capacity = 4 + machine.blocked_serials.clear() + scheduler.test_ended(small) + scheduler.start_available(machine) + + self.assertEqual(machine.launched, [1, 2]) + self.assertIs(large.status, RUNNING) + + +if __name__ == "__main__": + unittest.main() From e55793a6a9e9504b3862d81dd1de033c5736a79b Mon Sep 17 00:00:00 2001 From: "Peter B. Robinson" Date: Mon, 11 May 2026 14:04:32 -0700 Subject: [PATCH 04/15] add more extended documentation to ready_queue.py --- ats/ready_queue.py | 231 ++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 216 insertions(+), 15 deletions(-) diff --git a/ats/ready_queue.py b/ats/ready_queue.py index b5cf681..8a9530a 100644 --- a/ats/ready_queue.py +++ b/ats/ready_queue.py @@ -5,15 +5,49 @@ within resource buckets, prefers the largest bucket that fits the current capacity, and restores deferred candidates when a machine policy cannot run them yet. + +Items are usually ``ats.tests.AtsTest`` instances, but the class only requires +test-like objects with stable serial identifiers. By default, an item must +have: + +* ``serialNumber``: integer-like unique id for this scheduler; +* ``np``: optional integer-like processor count used as the resource bucket. + +Schedulers supply callbacks for lookup, ordering, readiness, and machine +admission. ``ReadyWorkSet`` deliberately does not know what CREATED, waits, +directory blocks, retries, or machine resources mean. """ from collections import defaultdict import heapq class ReadyWorkSet: - """Maintain resource-binned heaps of structurally ready scheduler work.""" + """Maintain resource-binned heaps of structurally ready scheduler work. + + The work set stores heap entries as ``(order, serial)`` tuples rather than + storing item objects directly. This keeps heap ordering stable and lets a + scheduler's readiness predicate see the current live object each time a + candidate is considered. + """ def __init__(self, item_lookup, order_lookup, resource_bucket=None, serial_lookup=None): + """Create an empty ready work set. + + Args: + item_lookup (callable): Function with signature + ``item_lookup(serial: int) -> object | None``. It maps a + serial id from a heap entry back to the live scheduler item. + order_lookup (callable): Function with signature + ``order_lookup(item: object) -> int``. Lower values run first + inside one resource bucket. + resource_bucket (callable, optional): Function with signature + ``resource_bucket(item: object) -> int``. It maps an item to a + positive resource bucket. The default uses ``item.np`` or + ``1`` when ``np`` is missing. + serial_lookup (callable, optional): Function with signature + ``serial_lookup(item: object) -> int``. It returns the stable + unique id for an item. The default uses ``item.serialNumber``. + """ self._item_lookup = item_lookup self._order_lookup = order_lookup self._resource_bucket = resource_bucket or self.default_resource_bucket @@ -21,30 +55,83 @@ def __init__(self, item_lookup, order_lookup, resource_bucket=None, serial_looku self.reset() def reset(self): - """Clear all ready buckets and membership tracking.""" + """Clear all ready buckets and membership tracking. + + Args: + None. + + Returns: + None. + """ self._ready_heaps = defaultdict(list) self._ready_serials = set() @staticmethod def default_resource_bucket(item): - """Map an ATS test-like item to its processor-count bucket.""" + """Map an ATS test-like item to its processor-count bucket. + + Args: + item (object): Test-like object. If present, ``item.np`` must be + integer-like. + + Returns: + int: ``max(1, int(item.np))``, or ``1`` if ``item`` has no ``np``. + """ return max(1, int(getattr(item, "np", 1))) @staticmethod def default_serial(item): - """Return an ATS test-like item's stable serial identifier.""" + """Return an ATS test-like item's stable serial identifier. + + Args: + item (object): Test-like object with integer-like + ``item.serialNumber``. + + Returns: + int: The item's stable serial identifier. + """ return item.serialNumber def bucket_for(self, item): - """Map ``item`` to a positive integer resource bucket.""" + """Map ``item`` to a positive integer resource bucket. + + Args: + item (object): Scheduler item accepted by this work set's + ``resource_bucket`` callback. + + Returns: + int: Positive resource bucket for ``item``. + """ return max(1, int(self._resource_bucket(item))) def order_of(self, item): - """Return the stable scheduler order used inside ready buckets.""" + """Return the stable scheduler order used inside ready buckets. + + Args: + item (object): Scheduler item accepted by this work set's + ``order_lookup`` callback. + + Returns: + int: Stable order key. Lower values are selected first inside the + same resource bucket. + """ return self._order_lookup(item) def enqueue_if_ready(self, item, ready_predicate): - """Add ``item`` to the ready set if the supplied predicate accepts it.""" + """Add ``item`` to the ready set if the supplied predicate accepts it. + + Args: + item (object | None): Scheduler item to enqueue. ``None`` is + ignored. + ready_predicate (callable): Function with signature + ``ready_predicate(item: object) -> bool``. It should return + ``True`` only when the item is structurally ready for scheduler + consideration. + + Returns: + bool: ``True`` if a new heap entry was added, otherwise ``False``. + Duplicate serial ids are ignored. + """ if item is None or not ready_predicate(item): return False serial = self._serial_lookup(item) @@ -55,7 +142,34 @@ def enqueue_if_ready(self, item, ready_predicate): return True def pop_next(self, available_slots, ready_predicate, can_run, blocked_predicate=None): - """Pop the largest fitting runnable item while restoring deferred candidates.""" + """Pop the largest fitting runnable item while restoring deferred candidates. + + Args: + available_slots (int): Current machine capacity. Buckets larger + than this value are skipped. + ready_predicate (callable): Function with signature + ``ready_predicate(item: object) -> bool``. It is rechecked for + each candidate so stale heap entries can be discarded. + can_run (callable): Function with signature + ``can_run(item: object) -> bool``. This is normally + ``machine.canRunNow`` and is the final machine-admission check. + blocked_predicate (callable, optional): Function with signature + ``blocked_predicate(item: object) -> bool``. When it returns + ``True``, the candidate is left queued and the returned blocked + flag is set. This is for scheduler-owned temporary blockers + that are not represented by ``ready_predicate``. + + Returns: + tuple: ``(item, blocked)`` where ``item`` is the selected scheduler + item or ``None``. ``blocked`` is ``True`` if at least one otherwise + ready candidate was retained by ``blocked_predicate``. + + Notes: + Candidates that still satisfy ``ready_predicate`` but fail + ``can_run`` are restored to their original buckets before return. + Candidates that no longer satisfy ``ready_predicate`` are discarded + as stale entries. + """ persistence_blocked = False deferred = defaultdict(list) for bucket in sorted(self._ready_heaps.keys(), reverse=True): @@ -81,7 +195,16 @@ def pop_next(self, available_slots, ready_predicate, can_run, blocked_predicate= return None, persistence_blocked def remove(self, item): - """Remove ``item`` from its ready bucket if it is still queued.""" + """Remove ``item`` from its ready bucket if it is still queued. + + Args: + item (object | None): Scheduler item to remove. ``None`` is + ignored. + + Returns: + bool: ``True`` if the exact heap entry was found and removed, + otherwise ``False``. + """ if item is None: return False serial = self._serial_lookup(item) @@ -101,11 +224,28 @@ def remove(self, item): return True def has_candidates(self): - """Return whether any ready bucket still contains heap entries.""" + """Return whether any ready bucket still contains heap entries. + + Args: + None. + + Returns: + bool: ``True`` if any bucket contains queued heap entries. This is + a cheap structural check; entries may still be stale. + """ return any(heap for heap in self._ready_heaps.values()) def ready_count(self, ready_predicate): - """Count currently queued items that still satisfy ``ready_predicate``.""" + """Count currently queued items that still satisfy ``ready_predicate``. + + Args: + ready_predicate (callable): Function with signature + ``ready_predicate(item: object) -> bool``. + + Returns: + int: Number of queued serial ids whose live item exists and still + satisfies ``ready_predicate``. + """ count = 0 for serial in self._ready_serials: candidate = self._item_lookup(serial) @@ -114,11 +254,30 @@ def ready_count(self, ready_predicate): return count def live_ready_count(self): - """Return the cached ready-set size.""" + """Return the cached ready-set size. + + Args: + None. + + Returns: + int: Number of serial ids currently tracked as queued. This is + cheap and may include stale entries that will be discarded by a + later ``pop_next`` or ``ready_count`` call. + """ return len(self._ready_serials) def bucket_counts(self, predicate=None): - """Return queued item counts by resource bucket.""" + """Return queued item counts by resource bucket. + + Args: + predicate (callable, optional): Function with signature + ``predicate(item: object) -> bool``. If provided, only live + items accepted by this predicate are counted. + + Returns: + dict: Mapping ``bucket: int`` to ``count: int`` for queued live + items. + """ counts = defaultdict(int) for serial in self._ready_serials: item = self._item_lookup(serial) @@ -130,11 +289,35 @@ def bucket_counts(self, predicate=None): return dict(counts) def buckets(self): - """Return a snapshot of bucket keys currently known to the work set.""" + """Return a snapshot of bucket keys currently known to the work set. + + Args: + None. + + Returns: + list: Resource bucket integers. Empty buckets may be present if + they previously held work. + """ return list(self._ready_heaps.keys()) def candidates_for_bucket(self, bucket, ready_predicate, candidate_predicate=None, limit=None): - """Return queued candidates from one bucket without mutating the work set.""" + """Return queued candidates from one bucket without mutating the work set. + + Args: + bucket (int): Resource bucket to inspect. + ready_predicate (callable): Function with signature + ``ready_predicate(item: object) -> bool``. Stale or no-longer + ready candidates are skipped in the returned list, but not + removed from the heap. + candidate_predicate (callable, optional): Additional filter with + signature ``candidate_predicate(item: object) -> bool``. + limit (int, optional): Maximum number of candidates to return. If + ``None``, all matching candidates in the bucket are returned. + + Returns: + list: Live scheduler items from ``bucket`` in stable in-bucket + order. + """ candidates = [] for _order_index, serial in sorted(self._ready_heaps.get(bucket, [])): candidate = self._item_lookup(serial) @@ -148,9 +331,27 @@ def candidates_for_bucket(self, bucket, ready_predicate, candidate_predicate=Non return candidates def _heap_key(self, item): + """Build the stored heap key for ``item``. + + Args: + item (object): Scheduler item accepted by ``order_lookup`` and + ``serial_lookup``. + + Returns: + tuple: ``(order: int, serial: int)``. + """ return (self._order_lookup(item), self._serial_lookup(item)) def _restore_deferred(self, deferred): + """Restore deferred heap entries after a pop attempt. + + Args: + deferred (dict): Mapping ``bucket: int`` to lists of heap-key + tuples ``(order: int, serial: int)``. + + Returns: + None. + """ for bucket, items in deferred.items(): for item in items: heapq.heappush(self._ready_heaps[bucket], item) From 2aa2d111d1677e839bd5e004d9c6e99986dafcc7 Mon Sep 17 00:00:00 2001 From: "Peter B. Robinson" Date: Mon, 11 May 2026 14:10:45 -0700 Subject: [PATCH 05/15] doc cleanup. --- ats/ready_queue.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/ats/ready_queue.py b/ats/ready_queue.py index 8a9530a..b9e23e1 100644 --- a/ats/ready_queue.py +++ b/ats/ready_queue.py @@ -57,9 +57,6 @@ def __init__(self, item_lookup, order_lookup, resource_bucket=None, serial_looku def reset(self): """Clear all ready buckets and membership tracking. - Args: - None. - Returns: None. """ @@ -226,9 +223,6 @@ def remove(self, item): def has_candidates(self): """Return whether any ready bucket still contains heap entries. - Args: - None. - Returns: bool: ``True`` if any bucket contains queued heap entries. This is a cheap structural check; entries may still be stale. @@ -256,9 +250,6 @@ def ready_count(self, ready_predicate): def live_ready_count(self): """Return the cached ready-set size. - Args: - None. - Returns: int: Number of serial ids currently tracked as queued. This is cheap and may include stale entries that will be discarded by a @@ -291,9 +282,6 @@ def bucket_counts(self, predicate=None): def buckets(self): """Return a snapshot of bucket keys currently known to the work set. - Args: - None. - Returns: list: Resource bucket integers. Empty buckets may be present if they previously held work. From 129b7955691be2909101ab8c092f741922fffe9a Mon Sep 17 00:00:00 2001 From: Peter B Robinson Date: Thu, 14 May 2026 13:33:50 -0700 Subject: [PATCH 06/15] Generalize ready queue priorities --- ats/ready_queue.py | 184 ++++++++++++++++++++++--------- test/test_ready_queue_example.py | 33 +++++- 2 files changed, 162 insertions(+), 55 deletions(-) diff --git a/ats/ready_queue.py b/ats/ready_queue.py index b9e23e1..0910f7a 100644 --- a/ats/ready_queue.py +++ b/ats/ready_queue.py @@ -2,16 +2,16 @@ ``ReadyWorkSet`` tracks work that is structurally ready but still needs a machine-specific capacity check before launch. It keeps FIFO scheduler order -within resource buckets, prefers the largest bucket that fits the current -capacity, and restores deferred candidates when a machine policy cannot run -them yet. +within integer-priority buckets, prefers the highest runnable priority, and +restores deferred candidates when a machine policy cannot run them yet. Items are usually ``ats.tests.AtsTest`` instances, but the class only requires test-like objects with stable serial identifiers. By default, an item must have: * ``serialNumber``: integer-like unique id for this scheduler; -* ``np``: optional integer-like processor count used as the resource bucket. +* ``np``: optional integer-like processor count used as the default priority + and capacity requirement. Schedulers supply callbacks for lookup, ordering, readiness, and machine admission. ``ReadyWorkSet`` deliberately does not know what CREATED, waits, @@ -22,7 +22,7 @@ class ReadyWorkSet: - """Maintain resource-binned heaps of structurally ready scheduler work. + """Maintain priority-binned heaps of structurally ready scheduler work. The work set stores heap entries as ``(order, serial)`` tuples rather than storing item objects directly. This keeps heap ordering stable and lets a @@ -30,7 +30,15 @@ class ReadyWorkSet: candidate is considered. """ - def __init__(self, item_lookup, order_lookup, resource_bucket=None, serial_lookup=None): + def __init__( + self, + item_lookup, + order_lookup, + priority_lookup=None, + serial_lookup=None, + resource_bucket=None, + capacity_lookup=None, + ): """Create an empty ready work set. Args: @@ -39,19 +47,29 @@ def __init__(self, item_lookup, order_lookup, resource_bucket=None, serial_looku serial id from a heap entry back to the live scheduler item. order_lookup (callable): Function with signature ``order_lookup(item: object) -> int``. Lower values run first - inside one resource bucket. - resource_bucket (callable, optional): Function with signature - ``resource_bucket(item: object) -> int``. It maps an item to a - positive resource bucket. The default uses ``item.np`` or - ``1`` when ``np`` is missing. + inside one priority bucket. + priority_lookup (callable, optional): Function with signature + ``priority_lookup(item: object) -> int``. Higher values are + considered first. The default uses ``item.np`` or ``1`` when + ``np`` is missing. serial_lookup (callable, optional): Function with signature ``serial_lookup(item: object) -> int``. It returns the stable unique id for an item. The default uses ``item.serialNumber``. + resource_bucket (callable, optional): Backward-compatible alias for + ``priority_lookup``. + capacity_lookup (callable, optional): Function with signature + ``capacity_lookup(item: object) -> int``. It maps an item to + the capacity units compared with ``available_slots`` in + ``pop_next``. The default uses ``item.np`` or ``1`` when + ``np`` is missing. """ self._item_lookup = item_lookup self._order_lookup = order_lookup - self._resource_bucket = resource_bucket or self.default_resource_bucket + if priority_lookup is None and resource_bucket is not None: + priority_lookup = resource_bucket + self._priority_lookup = priority_lookup or self.default_priority self._serial_lookup = serial_lookup or self.default_serial + self._capacity_lookup = capacity_lookup or self.default_capacity self.reset() def reset(self): @@ -64,8 +82,23 @@ def reset(self): self._ready_serials = set() @staticmethod - def default_resource_bucket(item): - """Map an ATS test-like item to its processor-count bucket. + def default_priority(item): + """Map an ATS test-like item to its default integer priority. + + Args: + item (object): Test-like object. If present, ``item.np`` must be + integer-like. + + Returns: + int: ``max(1, int(item.np))``, or ``1`` if ``item`` has no ``np``. + """ + return max(1, int(getattr(item, "np", 1))) + + default_resource_bucket = default_priority + + @staticmethod + def default_capacity(item): + """Map an ATS test-like item to its default capacity requirement. Args: item (object): Test-like object. If present, ``item.np`` must be @@ -89,20 +122,36 @@ def default_serial(item): """ return item.serialNumber + def priority_for(self, item): + """Map ``item`` to an integer priority. + + Args: + item (object): Scheduler item accepted by this work set's + ``priority_lookup`` callback. + + Returns: + int: Priority for ``item``. Higher values are considered first. + """ + return int(self._priority_lookup(item)) + def bucket_for(self, item): - """Map ``item`` to a positive integer resource bucket. + """Backward-compatible alias for :meth:`priority_for`.""" + return self.priority_for(item) + + def capacity_for(self, item): + """Map ``item`` to a capacity requirement. Args: item (object): Scheduler item accepted by this work set's - ``resource_bucket`` callback. + ``capacity_lookup`` callback. Returns: - int: Positive resource bucket for ``item``. + int: Non-negative capacity requirement for ``item``. """ - return max(1, int(self._resource_bucket(item))) + return max(0, int(self._capacity_lookup(item))) def order_of(self, item): - """Return the stable scheduler order used inside ready buckets. + """Return the stable scheduler order used inside ready priorities. Args: item (object): Scheduler item accepted by this work set's @@ -110,7 +159,7 @@ def order_of(self, item): Returns: int: Stable order key. Lower values are selected first inside the - same resource bucket. + same priority. """ return self._order_lookup(item) @@ -134,16 +183,17 @@ def enqueue_if_ready(self, item, ready_predicate): serial = self._serial_lookup(item) if serial in self._ready_serials: return False - heapq.heappush(self._ready_heaps[self.bucket_for(item)], self._heap_key(item)) + heapq.heappush(self._ready_heaps[self.priority_for(item)], self._heap_key(item)) self._ready_serials.add(serial) return True def pop_next(self, available_slots, ready_predicate, can_run, blocked_predicate=None): - """Pop the largest fitting runnable item while restoring deferred candidates. + """Pop the highest-priority runnable item while restoring deferred candidates. Args: - available_slots (int): Current machine capacity. Buckets larger - than this value are skipped. + available_slots (int): Current machine capacity. Candidates whose + ``capacity_lookup`` value exceeds this are left queued; priority + itself does not need to represent capacity. ready_predicate (callable): Function with signature ``ready_predicate(item: object) -> bool``. It is rechecked for each candidate so stale heap entries can be discarded. @@ -162,37 +212,38 @@ def pop_next(self, available_slots, ready_predicate, can_run, blocked_predicate= ready candidate was retained by ``blocked_predicate``. Notes: - Candidates that still satisfy ``ready_predicate`` but fail - ``can_run`` are restored to their original buckets before return. - Candidates that no longer satisfy ``ready_predicate`` are discarded - as stale entries. + Candidates that still satisfy ``ready_predicate`` but fail capacity + or ``can_run`` are restored to their original priorities before + return. Candidates that no longer satisfy ``ready_predicate`` are + discarded as stale entries. """ persistence_blocked = False deferred = defaultdict(list) - for bucket in sorted(self._ready_heaps.keys(), reverse=True): - if bucket > available_slots: - continue - heap = self._ready_heaps[bucket] + for priority in sorted(self._ready_heaps.keys(), reverse=True): + heap = self._ready_heaps[priority] while heap: order_index, serial = heapq.heappop(heap) self._ready_serials.discard(serial) candidate = self._item_lookup(serial) if candidate is None or not ready_predicate(candidate): continue + if available_slots is not None and self.capacity_for(candidate) > available_slots: + deferred[priority].append((order_index, serial)) + continue if blocked_predicate is not None and blocked_predicate(candidate): persistence_blocked = True - deferred[bucket].append((order_index, serial)) + deferred[priority].append((order_index, serial)) continue if can_run(candidate): self._restore_deferred(deferred) return candidate, persistence_blocked - deferred[bucket].append((order_index, serial)) + deferred[priority].append((order_index, serial)) self._restore_deferred(deferred) return None, persistence_blocked def remove(self, item): - """Remove ``item`` from its ready bucket if it is still queued. + """Remove ``item`` from its ready priority if it is still queued. Args: item (object | None): Scheduler item to remove. ``None`` is @@ -205,8 +256,8 @@ def remove(self, item): if item is None: return False serial = self._serial_lookup(item) - bucket = self.bucket_for(item) - heap = self._ready_heaps.get(bucket) + priority = self.priority_for(item) + heap = self._ready_heaps.get(priority) if not heap: self._ready_serials.discard(serial) return False @@ -221,10 +272,10 @@ def remove(self, item): return True def has_candidates(self): - """Return whether any ready bucket still contains heap entries. + """Return whether any ready priority still contains heap entries. Returns: - bool: ``True`` if any bucket contains queued heap entries. This is + bool: ``True`` if any priority contains queued heap entries. This is a cheap structural check; entries may still be stale. """ return any(heap for heap in self._ready_heaps.values()) @@ -258,7 +309,7 @@ def live_ready_count(self): return len(self._ready_serials) def bucket_counts(self, predicate=None): - """Return queued item counts by resource bucket. + """Return queued item counts by priority. Args: predicate (callable, optional): Function with signature @@ -266,7 +317,7 @@ def bucket_counts(self, predicate=None): items accepted by this predicate are counted. Returns: - dict: Mapping ``bucket: int`` to ``count: int`` for queued live + dict: Mapping ``priority: int`` to ``count: int`` for queued live items. """ counts = defaultdict(int) @@ -276,23 +327,46 @@ def bucket_counts(self, predicate=None): continue if predicate is not None and not predicate(item): continue - counts[self.bucket_for(item)] += 1 + counts[self.priority_for(item)] += 1 return dict(counts) + def priority_counts(self, predicate=None): + """Return queued item counts by priority. + + Args: + predicate (callable, optional): Function with signature + ``predicate(item: object) -> bool``. If provided, only live + items accepted by this predicate are counted. + + Returns: + dict: Mapping ``priority: int`` to ``count: int`` for queued live + items. + """ + return self.bucket_counts(predicate) + def buckets(self): - """Return a snapshot of bucket keys currently known to the work set. + """Return a snapshot of priority keys currently known to the work set. Returns: - list: Resource bucket integers. Empty buckets may be present if + list: Priority integers. Empty priorities may be present if they previously held work. """ return list(self._ready_heaps.keys()) - def candidates_for_bucket(self, bucket, ready_predicate, candidate_predicate=None, limit=None): - """Return queued candidates from one bucket without mutating the work set. + def priorities(self): + """Return a snapshot of priority keys currently known to the work set. + + Returns: + list: Priority integers. Empty priorities may be present if they + previously held work. + """ + return self.buckets() + + def candidates_for_priority(self, priority, ready_predicate, candidate_predicate=None, limit=None): + """Return queued candidates from one priority without mutating the work set. Args: - bucket (int): Resource bucket to inspect. + priority (int): Priority to inspect. ready_predicate (callable): Function with signature ``ready_predicate(item: object) -> bool``. Stale or no-longer ready candidates are skipped in the returned list, but not @@ -300,14 +374,14 @@ def candidates_for_bucket(self, bucket, ready_predicate, candidate_predicate=Non candidate_predicate (callable, optional): Additional filter with signature ``candidate_predicate(item: object) -> bool``. limit (int, optional): Maximum number of candidates to return. If - ``None``, all matching candidates in the bucket are returned. + ``None``, all matching candidates in the priority are returned. Returns: - list: Live scheduler items from ``bucket`` in stable in-bucket + list: Live scheduler items from ``priority`` in stable in-priority order. """ candidates = [] - for _order_index, serial in sorted(self._ready_heaps.get(bucket, [])): + for _order_index, serial in sorted(self._ready_heaps.get(priority, [])): candidate = self._item_lookup(serial) if candidate is None or not ready_predicate(candidate): continue @@ -318,6 +392,10 @@ def candidates_for_bucket(self, bucket, ready_predicate, candidate_predicate=Non break return candidates + def candidates_for_bucket(self, bucket, ready_predicate, candidate_predicate=None, limit=None): + """Backward-compatible alias for :meth:`candidates_for_priority`.""" + return self.candidates_for_priority(bucket, ready_predicate, candidate_predicate, limit) + def _heap_key(self, item): """Build the stored heap key for ``item``. @@ -334,13 +412,13 @@ def _restore_deferred(self, deferred): """Restore deferred heap entries after a pop attempt. Args: - deferred (dict): Mapping ``bucket: int`` to lists of heap-key + deferred (dict): Mapping ``priority: int`` to lists of heap-key tuples ``(order: int, serial: int)``. Returns: None. """ - for bucket, items in deferred.items(): + for priority, items in deferred.items(): for item in items: - heapq.heappush(self._ready_heaps[bucket], item) + heapq.heappush(self._ready_heaps[priority], item) self._ready_serials.add(item[-1]) diff --git a/test/test_ready_queue_example.py b/test/test_ready_queue_example.py index f7bcfef..34cc458 100644 --- a/test/test_ready_queue_example.py +++ b/test/test_ready_queue_example.py @@ -148,7 +148,7 @@ def startRun(self, test): return True -def _make_test(serial, np=1, block=None, wait_until=None): +def _make_test(serial, np=1, block=None, wait_until=None, priority=None): return types.SimpleNamespace( serialNumber=serial, status=CREATED, @@ -157,6 +157,7 @@ def _make_test(serial, np=1, block=None, wait_until=None): block=block, independent=False, np=np, + priority=priority if priority is not None else np, ) @@ -190,7 +191,7 @@ def test_launches_ready_work_and_unblocks_direct_dependents(self): self.assertEqual(machine.launched, [1, 2]) self.assertIs(child.status, RUNNING) - def test_prefers_largest_fitting_bucket_and_restores_blocked_candidate(self): + def test_prefers_default_processor_count_priority_and_restores_blocked_candidate(self): small = _make_test(1, np=1, block="small") large = _make_test(2, np=4, block="large") small.group = _Group(1, [small]) @@ -214,6 +215,34 @@ def test_prefers_largest_fitting_bucket_and_restores_blocked_candidate(self): self.assertEqual(machine.launched, [1, 2]) self.assertIs(large.status, RUNNING) + def test_can_prioritize_independently_from_processor_count(self): + low_priority_large = _make_test(1, np=4, priority=10) + high_priority_small = _make_test(2, np=1, priority=20) + tests_by_serial = { + low_priority_large.serialNumber: low_priority_large, + high_priority_small.serialNumber: high_priority_small, + } + order = { + low_priority_large.serialNumber: 0, + high_priority_small.serialNumber: 1, + } + ready = ReadyWorkSet( + item_lookup=tests_by_serial.get, + order_lookup=lambda test: order[test.serialNumber], + priority_lookup=lambda test: test.priority, + ) + ready.enqueue_if_ready(low_priority_large, lambda test: test.status is CREATED) + ready.enqueue_if_ready(high_priority_small, lambda test: test.status is CREATED) + + selected, blocked = ready.pop_next( + available_slots=4, + ready_predicate=lambda test: test.status is CREATED, + can_run=lambda _test: True, + ) + + self.assertIs(selected, high_priority_small) + self.assertFalse(blocked) + if __name__ == "__main__": unittest.main() From b74e8e2c24fa6389b229fc42c471a999fd09264a Mon Sep 17 00:00:00 2001 From: Peter B Robinson Date: Thu, 14 May 2026 14:57:51 -0700 Subject: [PATCH 07/15] Use item priority in ready queue default --- ats/ready_queue.py | 18 ++++++++++-------- test/test_ready_queue_example.py | 3 +-- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/ats/ready_queue.py b/ats/ready_queue.py index 0910f7a..0a29cb9 100644 --- a/ats/ready_queue.py +++ b/ats/ready_queue.py @@ -10,8 +10,9 @@ have: * ``serialNumber``: integer-like unique id for this scheduler; -* ``np``: optional integer-like processor count used as the default priority - and capacity requirement. +* ``priority``: optional integer-like scheduling priority; +* ``np``: optional integer-like processor count used as the default capacity + requirement. Schedulers supply callbacks for lookup, ordering, readiness, and machine admission. ``ReadyWorkSet`` deliberately does not know what CREATED, waits, @@ -50,8 +51,8 @@ def __init__( inside one priority bucket. priority_lookup (callable, optional): Function with signature ``priority_lookup(item: object) -> int``. Higher values are - considered first. The default uses ``item.np`` or ``1`` when - ``np`` is missing. + considered first. The default uses ``item.priority`` or ``1`` + when ``priority`` is missing. serial_lookup (callable, optional): Function with signature ``serial_lookup(item: object) -> int``. It returns the stable unique id for an item. The default uses ``item.serialNumber``. @@ -86,13 +87,14 @@ def default_priority(item): """Map an ATS test-like item to its default integer priority. Args: - item (object): Test-like object. If present, ``item.np`` must be - integer-like. + item (object): Test-like object. If present, ``item.priority`` + must be integer-like. Returns: - int: ``max(1, int(item.np))``, or ``1`` if ``item`` has no ``np``. + int: ``int(item.priority)``, or ``1`` if ``item`` has no + ``priority``. """ - return max(1, int(getattr(item, "np", 1))) + return int(getattr(item, "priority", 1)) default_resource_bucket = default_priority diff --git a/test/test_ready_queue_example.py b/test/test_ready_queue_example.py index 34cc458..c17f20f 100644 --- a/test/test_ready_queue_example.py +++ b/test/test_ready_queue_example.py @@ -215,7 +215,7 @@ def test_prefers_default_processor_count_priority_and_restores_blocked_candidate self.assertEqual(machine.launched, [1, 2]) self.assertIs(large.status, RUNNING) - def test_can_prioritize_independently_from_processor_count(self): + def test_default_priority_uses_item_priority(self): low_priority_large = _make_test(1, np=4, priority=10) high_priority_small = _make_test(2, np=1, priority=20) tests_by_serial = { @@ -229,7 +229,6 @@ def test_can_prioritize_independently_from_processor_count(self): ready = ReadyWorkSet( item_lookup=tests_by_serial.get, order_lookup=lambda test: order[test.serialNumber], - priority_lookup=lambda test: test.priority, ) ready.enqueue_if_ready(low_priority_large, lambda test: test.status is CREATED) ready.enqueue_if_ready(high_priority_small, lambda test: test.status is CREATED) From 8e54e5b6c24f4c76e380b6d65285f593428dd320 Mon Sep 17 00:00:00 2001 From: "Peter B. Robinson" Date: Thu, 14 May 2026 15:41:15 -0700 Subject: [PATCH 08/15] comment cleanup. --- ats/ready_queue.py | 14 ++++----- docs/source/ats.rst | 43 +++++++++++++++++----------- docs/source/scheduler_extensions.rst | 14 +++++++++ 3 files changed, 45 insertions(+), 26 deletions(-) diff --git a/ats/ready_queue.py b/ats/ready_queue.py index 0a29cb9..2f4594f 100644 --- a/ats/ready_queue.py +++ b/ats/ready_queue.py @@ -11,8 +11,6 @@ * ``serialNumber``: integer-like unique id for this scheduler; * ``priority``: optional integer-like scheduling priority; -* ``np``: optional integer-like processor count used as the default capacity - requirement. Schedulers supply callbacks for lookup, ordering, readiness, and machine admission. ``ReadyWorkSet`` deliberately does not know what CREATED, waits, @@ -61,8 +59,7 @@ def __init__( capacity_lookup (callable, optional): Function with signature ``capacity_lookup(item: object) -> int``. It maps an item to the capacity units compared with ``available_slots`` in - ``pop_next``. The default uses ``item.np`` or ``1`` when - ``np`` is missing. + ``pop_next``. """ self._item_lookup = item_lookup self._order_lookup = order_lookup @@ -91,8 +88,7 @@ def default_priority(item): must be integer-like. Returns: - int: ``int(item.priority)``, or ``1`` if ``item`` has no - ``priority``. + int: ``int(item.priority)``, or ``1`` if ``item`` has no ``priority``. """ return int(getattr(item, "priority", 1)) @@ -103,13 +99,13 @@ def default_capacity(item): """Map an ATS test-like item to its default capacity requirement. Args: - item (object): Test-like object. If present, ``item.np`` must be + item (object): Test-like object. If present, ``item.priority`` must be integer-like. Returns: - int: ``max(1, int(item.np))``, or ``1`` if ``item`` has no ``np``. + int: ``max(1, int(item.priority))``, or ``1`` if ``item`` has no ``priority``. """ - return max(1, int(getattr(item, "np", 1))) + return max(1, int(getattr(item, "prioirty", 1))) @staticmethod def default_serial(item): diff --git a/docs/source/ats.rst b/docs/source/ats.rst index 609acb5..a147406 100755 --- a/docs/source/ats.rst +++ b/docs/source/ats.rst @@ -703,16 +703,13 @@ to understand what influenced the scheduling: * Test attribute runOrder, an integer indicating the order of test launch. .. note:: - Important: by default two tests will not be run in the same directory at the same time. - -This is a modestly conservative scheme to avoid common resource conflicts when testing -one file with different parameters. + Important: by default two non-independent tests will not be run in the same + directory at the same time. See + :ref:`directory blocking ` for the full rule. -If you know a test does not have such a problem, you can give it the option -``independent = True``. Note that the ``group`` command makes the default value of -``independent`` False for all members of the group, overriding anything except an actual -option in the test statement. Thus if you do not want this behavior for the group -you must use independent = True as an argument in your group command. +This is a modestly conservative scheme to avoid common resource conflicts when +testing one file with different parameters. If you know a test does not have +such a problem, you can give it the option ``independent = True``. The standard scheduler sorts the groups by the highest priority test in the group. In effect, every member of a group behaves as if it has the priority of the highest-priority test in the @@ -988,14 +985,26 @@ keep .. _directory_blocking: independent - If independent is True, the user is certifying that there is no obstacle to - this test executing at the same time as any other test. Otherwise, by default - tests are assumed to conflict with others in the same directory, because - they might write files there with the same names as those read or written by - other tests. If two tests conflict, they are never run at the same time. - Judicious use of independent = True will increase ATS's throughput. - We suggest that while a stick(independent=True) may be appropriate, - in some test files, to glue this definition may be reckless. + If ``independent`` is True, the user is certifying that there is no obstacle + to this test executing at the same time as any other test. Otherwise, by + default tests are assumed to conflict with other non-independent tests in the + same directory, because they might write files there with the same names as + those read or written by other tests. + + ATS implements this with a directory block. A test's block is normally its + execution directory. When a non-independent test starts, its group owns that + directory block. Another non-independent test or group using the same block + cannot start until the owning group's non-independent members have finished. + Other members of the owning group may continue to run under that block. + + The ``group`` command makes the default value of ``independent`` False for + all members of the group, overriding anything except an explicit option in + the test statement. If you do not want this behavior for the group, pass + ``independent=True`` to ``group``. + + Judicious use of ``independent=True`` will increase ATS's throughput. Use + it only when tests in the same directory do not share output files, generated + inputs, cleanup patterns, or other directory-local state. priority By default the priority of a test is np + the sum of the priorities of diff --git a/docs/source/scheduler_extensions.rst b/docs/source/scheduler_extensions.rst index d3d9f38..ddfa702 100644 --- a/docs/source/scheduler_extensions.rst +++ b/docs/source/scheduler_extensions.rst @@ -68,6 +68,20 @@ The class is intentionally policy-light. The owner supplies: defines readiness as: created, dependencies complete, not directory-blocked, and still relevant to the current run. +Directory Blocks +================ + +The user-facing directory-blocking model is documented in +:ref:`directory blocking `. Scheduler extensions only need +to preserve the scheduler responsibilities that follow from that model. + +Schedulers that cache readiness need to handle block changes incrementally: + +* index tests by ``test.block`` when tests are loaded; +* reject candidates whose block is currently owned by another group; +* after a test ends, remove its group's block if the group is no longer active; +* when a block clears, reconsider only tests indexed under that block. + Beginning Tutorial: A Cached Ready Scheduler ============================================ From 7b75b8fc9da48c78df462b5f946f178a9d0d19af Mon Sep 17 00:00:00 2001 From: "Peter B. Robinson" Date: Mon, 18 May 2026 14:27:37 -0700 Subject: [PATCH 09/15] simplifiy capacity / available slots infrastructure (mosty eliminated) --- ats/ready_queue.py | 93 +++------------------------- docs/source/scheduler_extensions.rst | 17 +++-- test/test_ready_queue_example.py | 2 - 3 files changed, 17 insertions(+), 95 deletions(-) diff --git a/ats/ready_queue.py b/ats/ready_queue.py index 2f4594f..863d082 100644 --- a/ats/ready_queue.py +++ b/ats/ready_queue.py @@ -1,9 +1,9 @@ """Cached ready-queue helpers for ATS schedulers. -``ReadyWorkSet`` tracks work that is structurally ready but still needs a -machine-specific capacity check before launch. It keeps FIFO scheduler order -within integer-priority buckets, prefers the highest runnable priority, and -restores deferred candidates when a machine policy cannot run them yet. +``ReadyWorkSet`` tracks work that is structurally ready for scheduler +consideration. It keeps FIFO scheduler order within integer-priority buckets, +prefers the highest runnable priority, and restores deferred candidates when a +machine policy cannot run them yet. Items are usually ``ats.tests.AtsTest`` instances, but the class only requires test-like objects with stable serial identifiers. By default, an item must @@ -35,8 +35,6 @@ def __init__( order_lookup, priority_lookup=None, serial_lookup=None, - resource_bucket=None, - capacity_lookup=None, ): """Create an empty ready work set. @@ -54,20 +52,11 @@ def __init__( serial_lookup (callable, optional): Function with signature ``serial_lookup(item: object) -> int``. It returns the stable unique id for an item. The default uses ``item.serialNumber``. - resource_bucket (callable, optional): Backward-compatible alias for - ``priority_lookup``. - capacity_lookup (callable, optional): Function with signature - ``capacity_lookup(item: object) -> int``. It maps an item to - the capacity units compared with ``available_slots`` in - ``pop_next``. """ self._item_lookup = item_lookup self._order_lookup = order_lookup - if priority_lookup is None and resource_bucket is not None: - priority_lookup = resource_bucket self._priority_lookup = priority_lookup or self.default_priority self._serial_lookup = serial_lookup or self.default_serial - self._capacity_lookup = capacity_lookup or self.default_capacity self.reset() def reset(self): @@ -92,21 +81,6 @@ def default_priority(item): """ return int(getattr(item, "priority", 1)) - default_resource_bucket = default_priority - - @staticmethod - def default_capacity(item): - """Map an ATS test-like item to its default capacity requirement. - - Args: - item (object): Test-like object. If present, ``item.priority`` must be - integer-like. - - Returns: - int: ``max(1, int(item.priority))``, or ``1`` if ``item`` has no ``priority``. - """ - return max(1, int(getattr(item, "prioirty", 1))) - @staticmethod def default_serial(item): """Return an ATS test-like item's stable serial identifier. @@ -132,22 +106,6 @@ def priority_for(self, item): """ return int(self._priority_lookup(item)) - def bucket_for(self, item): - """Backward-compatible alias for :meth:`priority_for`.""" - return self.priority_for(item) - - def capacity_for(self, item): - """Map ``item`` to a capacity requirement. - - Args: - item (object): Scheduler item accepted by this work set's - ``capacity_lookup`` callback. - - Returns: - int: Non-negative capacity requirement for ``item``. - """ - return max(0, int(self._capacity_lookup(item))) - def order_of(self, item): """Return the stable scheduler order used inside ready priorities. @@ -185,13 +143,10 @@ def enqueue_if_ready(self, item, ready_predicate): self._ready_serials.add(serial) return True - def pop_next(self, available_slots, ready_predicate, can_run, blocked_predicate=None): + def pop_next(self, ready_predicate, can_run, blocked_predicate=None): """Pop the highest-priority runnable item while restoring deferred candidates. Args: - available_slots (int): Current machine capacity. Candidates whose - ``capacity_lookup`` value exceeds this are left queued; priority - itself does not need to represent capacity. ready_predicate (callable): Function with signature ``ready_predicate(item: object) -> bool``. It is rechecked for each candidate so stale heap entries can be discarded. @@ -210,8 +165,8 @@ def pop_next(self, available_slots, ready_predicate, can_run, blocked_predicate= ready candidate was retained by ``blocked_predicate``. Notes: - Candidates that still satisfy ``ready_predicate`` but fail capacity - or ``can_run`` are restored to their original priorities before + Candidates that still satisfy ``ready_predicate`` but fail + ``can_run`` are restored to their original priorities before return. Candidates that no longer satisfy ``ready_predicate`` are discarded as stale entries. """ @@ -225,9 +180,6 @@ def pop_next(self, available_slots, ready_predicate, can_run, blocked_predicate= candidate = self._item_lookup(serial) if candidate is None or not ready_predicate(candidate): continue - if available_slots is not None and self.capacity_for(candidate) > available_slots: - deferred[priority].append((order_index, serial)) - continue if blocked_predicate is not None and blocked_predicate(candidate): persistence_blocked = True deferred[priority].append((order_index, serial)) @@ -306,7 +258,7 @@ def live_ready_count(self): """ return len(self._ready_serials) - def bucket_counts(self, predicate=None): + def priority_counts(self, predicate=None): """Return queued item counts by priority. Args: @@ -328,21 +280,7 @@ def bucket_counts(self, predicate=None): counts[self.priority_for(item)] += 1 return dict(counts) - def priority_counts(self, predicate=None): - """Return queued item counts by priority. - - Args: - predicate (callable, optional): Function with signature - ``predicate(item: object) -> bool``. If provided, only live - items accepted by this predicate are counted. - - Returns: - dict: Mapping ``priority: int`` to ``count: int`` for queued live - items. - """ - return self.bucket_counts(predicate) - - def buckets(self): + def priorities(self): """Return a snapshot of priority keys currently known to the work set. Returns: @@ -351,15 +289,6 @@ def buckets(self): """ return list(self._ready_heaps.keys()) - def priorities(self): - """Return a snapshot of priority keys currently known to the work set. - - Returns: - list: Priority integers. Empty priorities may be present if they - previously held work. - """ - return self.buckets() - def candidates_for_priority(self, priority, ready_predicate, candidate_predicate=None, limit=None): """Return queued candidates from one priority without mutating the work set. @@ -390,10 +319,6 @@ def candidates_for_priority(self, priority, ready_predicate, candidate_predicate break return candidates - def candidates_for_bucket(self, bucket, ready_predicate, candidate_predicate=None, limit=None): - """Backward-compatible alias for :meth:`candidates_for_priority`.""" - return self.candidates_for_priority(bucket, ready_predicate, candidate_predicate, limit) - def _heap_key(self, item): """Build the stored heap key for ``item``. diff --git a/docs/source/scheduler_extensions.rst b/docs/source/scheduler_extensions.rst index ddfa702..06a31c1 100644 --- a/docs/source/scheduler_extensions.rst +++ b/docs/source/scheduler_extensions.rst @@ -52,15 +52,15 @@ A custom scheduler should preserve two invariants: ReadyWorkSet ============ -``ats.ready_queue.ReadyWorkSet`` stores work that is structurally ready but -still needs a machine capacity check. It groups work into resource buckets, -usually processor counts, and returns the largest fitting candidate first. +``ats.ready_queue.ReadyWorkSet`` stores work that is structurally ready. It +groups work into priority buckets, usually processor counts, and returns the +largest runnable candidate first. The class is intentionally policy-light. The owner supplies: * ``item_lookup(serial)`` to map stable ids back to live test objects; * ``order_lookup(item)`` to preserve scheduler order inside buckets; -* an optional ``resource_bucket(item)`` if ``item.np`` is not the right bucket; +* an optional ``priority_lookup(item)`` if ``item.np`` is not the right bucket; * a ``ready_predicate(item)`` each time candidates are enqueued or popped; * a ``can_run(item)`` predicate, normally ``machine.canRunNow``. @@ -248,12 +248,11 @@ dependent updates, directory-block updates, and launch-failure recovery. self.ready.enqueue_if_ready(candidate, self.is_ready) def next_ready(self, machine): - # ``pop_next`` picks the largest resource bucket that fits the - # current capacity, then checks machine policy. Candidates that are - # still structurally ready but cannot run now are restored so a later - # scheduler pass can reconsider them. + # ``pop_next`` picks the largest priority bucket, then checks + # machine policy. Candidates that are still structurally ready but + # cannot run now are restored so a later scheduler pass can + # reconsider them. test, _blocked = self.ready.pop_next( - machine.remainingCapacity(), self.is_ready, machine.canRunNow, ) diff --git a/test/test_ready_queue_example.py b/test/test_ready_queue_example.py index c17f20f..44f6023 100644 --- a/test/test_ready_queue_example.py +++ b/test/test_ready_queue_example.py @@ -102,7 +102,6 @@ def test_ended(self, test): def next_ready(self, machine): test, _blocked = self.ready.pop_next( - machine.remainingCapacity(), self.is_ready, machine.canRunNow, ) @@ -234,7 +233,6 @@ def test_default_priority_uses_item_priority(self): ready.enqueue_if_ready(high_priority_small, lambda test: test.status is CREATED) selected, blocked = ready.pop_next( - available_slots=4, ready_predicate=lambda test: test.status is CREATED, can_run=lambda _test: True, ) From 276e036b6c60c7fc8821eb24b900980fbf41b45b Mon Sep 17 00:00:00 2001 From: "Peter B. Robinson" Date: Mon, 18 May 2026 15:03:54 -0700 Subject: [PATCH 10/15] some test cleanup --- test/test_ready_queue_example.py | 44 +++++++------------------------- 1 file changed, 9 insertions(+), 35 deletions(-) diff --git a/test/test_ready_queue_example.py b/test/test_ready_queue_example.py index 44f6023..820f33f 100644 --- a/test/test_ready_queue_example.py +++ b/test/test_ready_queue_example.py @@ -147,7 +147,7 @@ def startRun(self, test): return True -def _make_test(serial, np=1, block=None, wait_until=None, priority=None): +def _make_test(serial, priority, block=None, wait_until=None); return types.SimpleNamespace( serialNumber=serial, status=CREATED, @@ -155,16 +155,17 @@ def _make_test(serial, np=1, block=None, wait_until=None, priority=None): dependents=[], block=block, independent=False, - np=np, - priority=priority if priority is not None else np, + np=priority, + priority=priority + ) class ReadySchedulerExampleTest(unittest.TestCase): def test_launches_ready_work_and_unblocks_direct_dependents(self): - parent = _make_test(1, np=1, block="case") - child = _make_test(2, np=1, block="case", wait_until=[parent]) - other = _make_test(3, np=1, block="other") + parent = _make_test(1, 1, block="case") + child = _make_test(2, 1, block="case", wait_until=[parent]) + other = _make_test(3, 1, block="other") parent.dependents = [child] group = _Group(1, [parent, child]) @@ -191,8 +192,8 @@ def test_launches_ready_work_and_unblocks_direct_dependents(self): self.assertIs(child.status, RUNNING) def test_prefers_default_processor_count_priority_and_restores_blocked_candidate(self): - small = _make_test(1, np=1, block="small") - large = _make_test(2, np=4, block="large") + small = _make_test(1, 1, block="small") + large = _make_test(2, 4, block="large") small.group = _Group(1, [small]) large.group = _Group(2, [large]) @@ -214,32 +215,5 @@ def test_prefers_default_processor_count_priority_and_restores_blocked_candidate self.assertEqual(machine.launched, [1, 2]) self.assertIs(large.status, RUNNING) - def test_default_priority_uses_item_priority(self): - low_priority_large = _make_test(1, np=4, priority=10) - high_priority_small = _make_test(2, np=1, priority=20) - tests_by_serial = { - low_priority_large.serialNumber: low_priority_large, - high_priority_small.serialNumber: high_priority_small, - } - order = { - low_priority_large.serialNumber: 0, - high_priority_small.serialNumber: 1, - } - ready = ReadyWorkSet( - item_lookup=tests_by_serial.get, - order_lookup=lambda test: order[test.serialNumber], - ) - ready.enqueue_if_ready(low_priority_large, lambda test: test.status is CREATED) - ready.enqueue_if_ready(high_priority_small, lambda test: test.status is CREATED) - - selected, blocked = ready.pop_next( - ready_predicate=lambda test: test.status is CREATED, - can_run=lambda _test: True, - ) - - self.assertIs(selected, high_priority_small) - self.assertFalse(blocked) - - if __name__ == "__main__": unittest.main() From 50a47a29a8ba9b49a8a98348c72ae2211e77ef11 Mon Sep 17 00:00:00 2001 From: Peter B Robinson Date: Mon, 18 May 2026 15:08:38 -0700 Subject: [PATCH 11/15] Fix ready queue example test --- test/test_ready_queue_example.py | 37 ++++++++++++++++++++++++++------ 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/test/test_ready_queue_example.py b/test/test_ready_queue_example.py index 820f33f..acbe353 100644 --- a/test/test_ready_queue_example.py +++ b/test/test_ready_queue_example.py @@ -147,7 +147,7 @@ def startRun(self, test): return True -def _make_test(serial, priority, block=None, wait_until=None); +def _make_test(serial, np=1, block=None, wait_until=None, priority=None): return types.SimpleNamespace( serialNumber=serial, status=CREATED, @@ -155,9 +155,8 @@ def _make_test(serial, priority, block=None, wait_until=None); dependents=[], block=block, independent=False, - np=priority, - priority=priority - + np=np, + priority=priority if priority is not None else np, ) @@ -192,8 +191,8 @@ def test_launches_ready_work_and_unblocks_direct_dependents(self): self.assertIs(child.status, RUNNING) def test_prefers_default_processor_count_priority_and_restores_blocked_candidate(self): - small = _make_test(1, 1, block="small") - large = _make_test(2, 4, block="large") + small = _make_test(1, np=1, block="small") + large = _make_test(2, np=4, block="large") small.group = _Group(1, [small]) large.group = _Group(2, [large]) @@ -215,5 +214,31 @@ def test_prefers_default_processor_count_priority_and_restores_blocked_candidate self.assertEqual(machine.launched, [1, 2]) self.assertIs(large.status, RUNNING) + def test_default_priority_uses_item_priority(self): + low_priority_large = _make_test(1, np=4, priority=10) + high_priority_small = _make_test(2, np=1, priority=20) + tests_by_serial = { + low_priority_large.serialNumber: low_priority_large, + high_priority_small.serialNumber: high_priority_small, + } + order = { + low_priority_large.serialNumber: 0, + high_priority_small.serialNumber: 1, + } + ready = ReadyWorkSet( + item_lookup=tests_by_serial.get, + order_lookup=lambda test: order[test.serialNumber], + ) + ready.enqueue_if_ready(low_priority_large, lambda test: test.status is CREATED) + ready.enqueue_if_ready(high_priority_small, lambda test: test.status is CREATED) + + selected, blocked = ready.pop_next( + ready_predicate=lambda test: test.status is CREATED, + can_run=lambda _test: True, + ) + + self.assertIs(selected, high_priority_small) + self.assertFalse(blocked) + if __name__ == "__main__": unittest.main() From e9013e68af17777f318e74dc61f885d1510e0ba9 Mon Sep 17 00:00:00 2001 From: "Peter B. Robinson" Date: Mon, 18 May 2026 15:12:49 -0700 Subject: [PATCH 12/15] Revert "Fix ready queue example test" This reverts commit b24c45b86f44b78afaf6a82bfc272061889c9b3f. --- test/test_ready_queue_example.py | 37 ++++++-------------------------- 1 file changed, 6 insertions(+), 31 deletions(-) diff --git a/test/test_ready_queue_example.py b/test/test_ready_queue_example.py index acbe353..820f33f 100644 --- a/test/test_ready_queue_example.py +++ b/test/test_ready_queue_example.py @@ -147,7 +147,7 @@ def startRun(self, test): return True -def _make_test(serial, np=1, block=None, wait_until=None, priority=None): +def _make_test(serial, priority, block=None, wait_until=None); return types.SimpleNamespace( serialNumber=serial, status=CREATED, @@ -155,8 +155,9 @@ def _make_test(serial, np=1, block=None, wait_until=None, priority=None): dependents=[], block=block, independent=False, - np=np, - priority=priority if priority is not None else np, + np=priority, + priority=priority + ) @@ -191,8 +192,8 @@ def test_launches_ready_work_and_unblocks_direct_dependents(self): self.assertIs(child.status, RUNNING) def test_prefers_default_processor_count_priority_and_restores_blocked_candidate(self): - small = _make_test(1, np=1, block="small") - large = _make_test(2, np=4, block="large") + small = _make_test(1, 1, block="small") + large = _make_test(2, 4, block="large") small.group = _Group(1, [small]) large.group = _Group(2, [large]) @@ -214,31 +215,5 @@ def test_prefers_default_processor_count_priority_and_restores_blocked_candidate self.assertEqual(machine.launched, [1, 2]) self.assertIs(large.status, RUNNING) - def test_default_priority_uses_item_priority(self): - low_priority_large = _make_test(1, np=4, priority=10) - high_priority_small = _make_test(2, np=1, priority=20) - tests_by_serial = { - low_priority_large.serialNumber: low_priority_large, - high_priority_small.serialNumber: high_priority_small, - } - order = { - low_priority_large.serialNumber: 0, - high_priority_small.serialNumber: 1, - } - ready = ReadyWorkSet( - item_lookup=tests_by_serial.get, - order_lookup=lambda test: order[test.serialNumber], - ) - ready.enqueue_if_ready(low_priority_large, lambda test: test.status is CREATED) - ready.enqueue_if_ready(high_priority_small, lambda test: test.status is CREATED) - - selected, blocked = ready.pop_next( - ready_predicate=lambda test: test.status is CREATED, - can_run=lambda _test: True, - ) - - self.assertIs(selected, high_priority_small) - self.assertFalse(blocked) - if __name__ == "__main__": unittest.main() From 9b44c97f378c2cd30dca422134ee796ef90d0bc1 Mon Sep 17 00:00:00 2001 From: Peter B Robinson Date: Mon, 18 May 2026 15:19:26 -0700 Subject: [PATCH 13/15] Document ready queue invariants --- ats/ready_queue.py | 13 ++++++--- docs/source/scheduler_extensions.rst | 12 ++++----- test/test_ready_queue_example.py | 40 +++++++++++++++++++++++----- 3 files changed, 49 insertions(+), 16 deletions(-) diff --git a/ats/ready_queue.py b/ats/ready_queue.py index 863d082..b0b2bd9 100644 --- a/ats/ready_queue.py +++ b/ats/ready_queue.py @@ -6,8 +6,9 @@ machine policy cannot run them yet. Items are usually ``ats.tests.AtsTest`` instances, but the class only requires -test-like objects with stable serial identifiers. By default, an item must -have: +test-like objects with stable serial identifiers. The lookup callbacks must +continue to return the same serial, order, and priority values for an item +while that item remains queued. By default, an item must have: * ``serialNumber``: integer-like unique id for this scheduler; * ``priority``: optional integer-like scheduling priority; @@ -44,14 +45,18 @@ def __init__( serial id from a heap entry back to the live scheduler item. order_lookup (callable): Function with signature ``order_lookup(item: object) -> int``. Lower values run first - inside one priority bucket. + inside one priority bucket, and the result must stay stable + for as long as an item remains queued. priority_lookup (callable, optional): Function with signature ``priority_lookup(item: object) -> int``. Higher values are considered first. The default uses ``item.priority`` or ``1`` - when ``priority`` is missing. + when ``priority`` is missing. The result must stay stable for + as long as an item remains queued. serial_lookup (callable, optional): Function with signature ``serial_lookup(item: object) -> int``. It returns the stable unique id for an item. The default uses ``item.serialNumber``. + The result must stay stable for as long as an item remains + queued. """ self._item_lookup = item_lookup self._order_lookup = order_lookup diff --git a/docs/source/scheduler_extensions.rst b/docs/source/scheduler_extensions.rst index 06a31c1..4ee417d 100644 --- a/docs/source/scheduler_extensions.rst +++ b/docs/source/scheduler_extensions.rst @@ -53,14 +53,14 @@ ReadyWorkSet ============ ``ats.ready_queue.ReadyWorkSet`` stores work that is structurally ready. It -groups work into priority buckets, usually processor counts, and returns the -largest runnable candidate first. +groups work into scheduler-defined priority buckets and returns the largest +runnable candidate first. The class is intentionally policy-light. The owner supplies: * ``item_lookup(serial)`` to map stable ids back to live test objects; * ``order_lookup(item)`` to preserve scheduler order inside buckets; -* an optional ``priority_lookup(item)`` if ``item.np`` is not the right bucket; +* an optional ``priority_lookup(item)`` if ``item.np`` is not the right priority key; * a ``ready_predicate(item)`` each time candidates are enqueued or popped; * a ``can_run(item)`` predicate, normally ``machine.canRunNow``. @@ -107,7 +107,7 @@ dependent updates, directory-block updates, and launch-failure recovery. # Stable order is separate from serial number. Serial numbers are # usually creation order, but a scheduler may sort groups or apply # priority before loading tests. Keep the order you want preserved - # inside each resource bucket. + # inside each priority bucket. self.order = {} # Cache the number of unfinished wait dependencies for each test. @@ -121,7 +121,7 @@ dependent updates, directory-block updates, and launch-failure recovery. self.tests_by_block = defaultdict(list) self.blocks = {} - # The ready set decides queue mechanics: resource buckets, stable + # The ready set decides queue mechanics: priority buckets, stable # in-bucket ordering, stale-entry cleanup, and temporary deferral of # candidates that are ready but cannot pass machine policy yet. self.ready = ReadyWorkSet( @@ -248,7 +248,7 @@ dependent updates, directory-block updates, and launch-failure recovery. self.ready.enqueue_if_ready(candidate, self.is_ready) def next_ready(self, machine): - # ``pop_next`` picks the largest priority bucket, then checks + # ``pop_next`` picks the highest-priority bucket, then checks # machine policy. Candidates that are still structurally ready but # cannot run now are restored so a later scheduler pass can # reconsider them. diff --git a/test/test_ready_queue_example.py b/test/test_ready_queue_example.py index 820f33f..6600083 100644 --- a/test/test_ready_queue_example.py +++ b/test/test_ready_queue_example.py @@ -147,7 +147,7 @@ def startRun(self, test): return True -def _make_test(serial, priority, block=None, wait_until=None); +def _make_test(serial, np=1, block=None, wait_until=None, priority=None): return types.SimpleNamespace( serialNumber=serial, status=CREATED, @@ -155,14 +155,14 @@ def _make_test(serial, priority, block=None, wait_until=None); dependents=[], block=block, independent=False, - np=priority, - priority=priority - + np=np, + priority=priority if priority is not None else np, ) class ReadySchedulerExampleTest(unittest.TestCase): def test_launches_ready_work_and_unblocks_direct_dependents(self): + """Verify that completion only unblocks direct dependents and relaunches them.""" parent = _make_test(1, 1, block="case") child = _make_test(2, 1, block="case", wait_until=[parent]) other = _make_test(3, 1, block="other") @@ -192,8 +192,9 @@ def test_launches_ready_work_and_unblocks_direct_dependents(self): self.assertIs(child.status, RUNNING) def test_prefers_default_processor_count_priority_and_restores_blocked_candidate(self): - small = _make_test(1, 1, block="small") - large = _make_test(2, 4, block="large") + """Verify that a blocked higher-bucket candidate is restored instead of dropped.""" + small = _make_test(1, np=1, block="small") + large = _make_test(2, np=4, block="large") small.group = _Group(1, [small]) large.group = _Group(2, [large]) @@ -215,5 +216,32 @@ def test_prefers_default_processor_count_priority_and_restores_blocked_candidate self.assertEqual(machine.launched, [1, 2]) self.assertIs(large.status, RUNNING) + def test_default_priority_uses_item_priority(self): + """Verify that ``ReadyWorkSet`` sorts by item priority, not by launch size.""" + low_priority_large = _make_test(1, np=4, priority=10) + high_priority_small = _make_test(2, np=1, priority=20) + tests_by_serial = { + low_priority_large.serialNumber: low_priority_large, + high_priority_small.serialNumber: high_priority_small, + } + order = { + low_priority_large.serialNumber: 0, + high_priority_small.serialNumber: 1, + } + ready = ReadyWorkSet( + item_lookup=tests_by_serial.get, + order_lookup=lambda test: order[test.serialNumber], + ) + ready.enqueue_if_ready(low_priority_large, lambda test: test.status is CREATED) + ready.enqueue_if_ready(high_priority_small, lambda test: test.status is CREATED) + + selected, blocked = ready.pop_next( + ready_predicate=lambda test: test.status is CREATED, + can_run=lambda _test: True, + ) + + self.assertIs(selected, high_priority_small) + self.assertFalse(blocked) + if __name__ == "__main__": unittest.main() From b760155ee4916f362cadf19a59921ef18910d73e Mon Sep 17 00:00:00 2001 From: "Peter B. Robinson" Date: Mon, 18 May 2026 15:31:35 -0700 Subject: [PATCH 14/15] revert codexes stubbornness --- test/test_ready_queue_example.py | 34 ++++---------------------------- 1 file changed, 4 insertions(+), 30 deletions(-) diff --git a/test/test_ready_queue_example.py b/test/test_ready_queue_example.py index 6600083..a250a6a 100644 --- a/test/test_ready_queue_example.py +++ b/test/test_ready_queue_example.py @@ -147,7 +147,7 @@ def startRun(self, test): return True -def _make_test(serial, np=1, block=None, wait_until=None, priority=None): +def _make_test(serial, np=1, block=None, wait_until=None): return types.SimpleNamespace( serialNumber=serial, status=CREATED, @@ -155,8 +155,9 @@ def _make_test(serial, np=1, block=None, wait_until=None, priority=None): dependents=[], block=block, independent=False, - np=np, - priority=priority if priority is not None else np, + """ trieat np as the priority """ + np=priority, + priority=priority ) @@ -216,32 +217,5 @@ def test_prefers_default_processor_count_priority_and_restores_blocked_candidate self.assertEqual(machine.launched, [1, 2]) self.assertIs(large.status, RUNNING) - def test_default_priority_uses_item_priority(self): - """Verify that ``ReadyWorkSet`` sorts by item priority, not by launch size.""" - low_priority_large = _make_test(1, np=4, priority=10) - high_priority_small = _make_test(2, np=1, priority=20) - tests_by_serial = { - low_priority_large.serialNumber: low_priority_large, - high_priority_small.serialNumber: high_priority_small, - } - order = { - low_priority_large.serialNumber: 0, - high_priority_small.serialNumber: 1, - } - ready = ReadyWorkSet( - item_lookup=tests_by_serial.get, - order_lookup=lambda test: order[test.serialNumber], - ) - ready.enqueue_if_ready(low_priority_large, lambda test: test.status is CREATED) - ready.enqueue_if_ready(high_priority_small, lambda test: test.status is CREATED) - - selected, blocked = ready.pop_next( - ready_predicate=lambda test: test.status is CREATED, - can_run=lambda _test: True, - ) - - self.assertIs(selected, high_priority_small) - self.assertFalse(blocked) - if __name__ == "__main__": unittest.main() From 66a60ced0ff617544a73a3f862cc0361045ef5a7 Mon Sep 17 00:00:00 2001 From: "Peter B. Robinson" Date: Mon, 18 May 2026 15:40:14 -0700 Subject: [PATCH 15/15] require priority callable --- ats/ready_queue.py | 19 +++---------------- docs/source/scheduler_extensions.rst | 4 ++-- 2 files changed, 5 insertions(+), 18 deletions(-) diff --git a/ats/ready_queue.py b/ats/ready_queue.py index b0b2bd9..c1d7267 100644 --- a/ats/ready_queue.py +++ b/ats/ready_queue.py @@ -47,10 +47,9 @@ def __init__( ``order_lookup(item: object) -> int``. Lower values run first inside one priority bucket, and the result must stay stable for as long as an item remains queued. - priority_lookup (callable, optional): Function with signature + priority_lookup (callable): Function with signature ``priority_lookup(item: object) -> int``. Higher values are - considered first. The default uses ``item.priority`` or ``1`` - when ``priority`` is missing. The result must stay stable for + considered first. The result must stay stable for as long as an item remains queued. serial_lookup (callable, optional): Function with signature ``serial_lookup(item: object) -> int``. It returns the stable @@ -60,7 +59,7 @@ def __init__( """ self._item_lookup = item_lookup self._order_lookup = order_lookup - self._priority_lookup = priority_lookup or self.default_priority + self._priority_lookup = priority_lookup self._serial_lookup = serial_lookup or self.default_serial self.reset() @@ -73,18 +72,6 @@ def reset(self): self._ready_heaps = defaultdict(list) self._ready_serials = set() - @staticmethod - def default_priority(item): - """Map an ATS test-like item to its default integer priority. - - Args: - item (object): Test-like object. If present, ``item.priority`` - must be integer-like. - - Returns: - int: ``int(item.priority)``, or ``1`` if ``item`` has no ``priority``. - """ - return int(getattr(item, "priority", 1)) @staticmethod def default_serial(item): diff --git a/docs/source/scheduler_extensions.rst b/docs/source/scheduler_extensions.rst index 4ee417d..5aa7d15 100644 --- a/docs/source/scheduler_extensions.rst +++ b/docs/source/scheduler_extensions.rst @@ -53,14 +53,14 @@ ReadyWorkSet ============ ``ats.ready_queue.ReadyWorkSet`` stores work that is structurally ready. It -groups work into scheduler-defined priority buckets and returns the largest +groups work into scheduler-defined priority buckets and returns the highest-priority runnable candidate first. The class is intentionally policy-light. The owner supplies: * ``item_lookup(serial)`` to map stable ids back to live test objects; * ``order_lookup(item)`` to preserve scheduler order inside buckets; -* an optional ``priority_lookup(item)`` if ``item.np`` is not the right priority key; +* ``priority_lookup(item)`` to define the priority of a test. * a ``ready_predicate(item)`` each time candidates are enqueued or popped; * a ``can_run(item)`` predicate, normally ``machine.canRunNow``.