# diff --git a/ats/ready_queue.py b/ats/ready_queue.py new file mode 100644 index 0000000..c1d7267 --- /dev/null +++ b/ats/ready_queue.py @@ -0,0 +1,339 @@
"""Cached ready-queue helpers for ATS schedulers.

``ReadyWorkSet`` tracks work that is structurally ready for scheduler
consideration. It keeps FIFO scheduler order within integer-priority buckets,
prefers the highest runnable priority, and restores deferred candidates when a
machine policy cannot run them yet.

Items are usually ``ats.tests.AtsTest`` instances, but the class only requires
test-like objects with stable serial identifiers. The lookup callbacks must
continue to return the same serial, order, and priority values for an item
while that item remains queued. By default, an item must have:

* ``serialNumber``: integer-like unique id for this scheduler;
* ``priority``: optional integer-like scheduling priority. A missing or
  ``None`` priority is treated as ``0``.

Schedulers supply callbacks for lookup, ordering, readiness, and machine
admission. ``ReadyWorkSet`` deliberately does not know what CREATED, waits,
directory blocks, retries, or machine resources mean.
"""
from collections import defaultdict
import heapq


class ReadyWorkSet:
    """Maintain priority-binned heaps of structurally ready scheduler work.

    The work set stores heap entries as ``(order, serial)`` tuples rather than
    storing item objects directly. This keeps heap ordering stable and lets a
    scheduler's readiness predicate see the current live object each time a
    candidate is considered.
    """

    def __init__(
        self,
        item_lookup,
        order_lookup,
        priority_lookup=None,
        serial_lookup=None,
    ):
        """Create an empty ready work set.

        Args:
            item_lookup (callable): Function with signature
                ``item_lookup(serial: int) -> object | None``. It maps a
                serial id from a heap entry back to the live scheduler item.
            order_lookup (callable): Function with signature
                ``order_lookup(item: object) -> int``. Lower values run first
                inside one priority bucket, and the result must stay stable
                for as long as an item remains queued.
            priority_lookup (callable, optional): Function with signature
                ``priority_lookup(item: object) -> int``. Higher values are
                considered first. The default uses ``item.priority`` and
                treats a missing or ``None`` priority as ``0``. The result
                must stay stable for as long as an item remains queued.
            serial_lookup (callable, optional): Function with signature
                ``serial_lookup(item: object) -> int``. It returns the stable
                unique id for an item. The default uses ``item.serialNumber``.
                The result must stay stable for as long as an item remains
                queued.
        """
        self._item_lookup = item_lookup
        self._order_lookup = order_lookup
        # Fall back to attribute-based defaults so a work set can be built
        # from only ``item_lookup`` and ``order_lookup``.
        self._priority_lookup = priority_lookup or self.default_priority
        self._serial_lookup = serial_lookup or self.default_serial
        self.reset()

    def reset(self):
        """Clear all ready buckets and membership tracking.

        Returns:
            None.
        """
        # priority -> heap of (order, serial) entries.
        self._ready_heaps = defaultdict(list)
        # Serials that currently have an entry in one of the heaps.
        self._ready_serials = set()

    @staticmethod
    def default_serial(item):
        """Return an ATS test-like item's stable serial identifier.

        Args:
            item (object): Test-like object with integer-like
                ``item.serialNumber``.

        Returns:
            int: The item's stable serial identifier.
        """
        return item.serialNumber

    @staticmethod
    def default_priority(item):
        """Return an ATS test-like item's scheduling priority.

        Args:
            item (object): Test-like object with an optional integer-like
                ``item.priority`` attribute.

        Returns:
            int: ``item.priority``, or ``0`` when the attribute is missing or
            ``None``.
        """
        priority = getattr(item, "priority", None)
        return 0 if priority is None else priority

    def priority_for(self, item):
        """Map ``item`` to an integer priority.

        Args:
            item (object): Scheduler item accepted by this work set's
                ``priority_lookup`` callback.

        Returns:
            int: Priority for ``item``. Higher values are considered first.
        """
        return int(self._priority_lookup(item))

    def order_of(self, item):
        """Return the stable scheduler order used inside ready priorities.

        Args:
            item (object): Scheduler item accepted by this work set's
                ``order_lookup`` callback.

        Returns:
            int: Stable order key. Lower values are selected first inside the
            same priority.
        """
        return self._order_lookup(item)

    def enqueue_if_ready(self, item, ready_predicate):
        """Add ``item`` to the ready set if the supplied predicate accepts it.

        Args:
            item (object | None): Scheduler item to enqueue. ``None`` is
                ignored.
            ready_predicate (callable): Function with signature
                ``ready_predicate(item: object) -> bool``. It should return
                ``True`` only when the item is structurally ready for scheduler
                consideration.

        Returns:
            bool: ``True`` if a new heap entry was added, otherwise ``False``.
            Duplicate serial ids are ignored.
        """
        if item is None or not ready_predicate(item):
            return False
        serial = self._serial_lookup(item)
        if serial in self._ready_serials:
            return False
        # Build the key and priority before touching the defaultdict so a
        # failing callback cannot leave an empty bucket behind.
        entry = (self._order_lookup(item), serial)
        priority = self.priority_for(item)
        heapq.heappush(self._ready_heaps[priority], entry)
        self._ready_serials.add(serial)
        return True

    def pop_next(self, ready_predicate, can_run, blocked_predicate=None):
        """Pop the highest-priority runnable item while restoring deferred candidates.

        Args:
            ready_predicate (callable): Function with signature
                ``ready_predicate(item: object) -> bool``. It is rechecked for
                each candidate so stale heap entries can be discarded.
            can_run (callable): Function with signature
                ``can_run(item: object) -> bool``. This is normally
                ``machine.canRunNow`` and is the final machine-admission check.
            blocked_predicate (callable, optional): Function with signature
                ``blocked_predicate(item: object) -> bool``. When it returns
                ``True``, the candidate is left queued and the returned blocked
                flag is set. This is for scheduler-owned temporary blockers
                that are not represented by ``ready_predicate``.

        Returns:
            tuple: ``(item, blocked)`` where ``item`` is the selected scheduler
            item or ``None``. ``blocked`` is ``True`` if at least one otherwise
            ready candidate was retained by ``blocked_predicate``.

        Notes:
            Candidates that still satisfy ``ready_predicate`` but fail
            ``can_run`` are restored to their original priorities before
            return. Candidates that no longer satisfy ``ready_predicate`` are
            discarded as stale entries.
        """
        blocked = False
        # priority -> entries popped during this call that must go back.
        deferred = defaultdict(list)
        # ``sorted`` snapshots the keys, so restoring entries later cannot
        # disturb this iteration.
        for priority in sorted(self._ready_heaps, reverse=True):
            heap = self._ready_heaps[priority]
            while heap:
                entry = heapq.heappop(heap)
                serial = entry[1]
                self._ready_serials.discard(serial)
                candidate = self._item_lookup(serial)
                if candidate is None or not ready_predicate(candidate):
                    # Stale entry: drop it for good.
                    continue
                if blocked_predicate is not None and blocked_predicate(candidate):
                    blocked = True
                    deferred[priority].append(entry)
                    continue
                if can_run(candidate):
                    self._restore_deferred(deferred)
                    return candidate, blocked
                deferred[priority].append(entry)

        self._restore_deferred(deferred)
        return None, blocked

    def remove(self, item):
        """Remove ``item`` from its ready priority if it is still queued.

        Args:
            item (object | None): Scheduler item to remove. ``None`` is
                ignored.

        Returns:
            bool: ``True`` if the exact heap entry was found and removed,
            otherwise ``False``.
        """
        if item is None:
            return False
        serial = self._serial_lookup(item)
        heap = self._ready_heaps.get(self.priority_for(item))
        removed = False
        if heap:
            try:
                heap.remove(self._heap_key(item))
            except ValueError:
                pass
            else:
                # ``list.remove`` breaks the heap invariant; rebuild it.
                heapq.heapify(heap)
                removed = True
        # Membership is cleared whether or not a heap entry was found.
        self._ready_serials.discard(serial)
        return removed

    def has_candidates(self):
        """Return whether any ready priority still contains heap entries.

        Returns:
            bool: ``True`` if any priority contains queued heap entries. This is
            a cheap structural check; entries may still be stale.
        """
        return any(self._ready_heaps.values())

    def ready_count(self, ready_predicate):
        """Count currently queued items that still satisfy ``ready_predicate``.

        Args:
            ready_predicate (callable): Function with signature
                ``ready_predicate(item: object) -> bool``.

        Returns:
            int: Number of queued serial ids whose live item exists and still
            satisfies ``ready_predicate``.
        """
        count = 0
        for serial in self._ready_serials:
            candidate = self._item_lookup(serial)
            if candidate is not None and ready_predicate(candidate):
                count += 1
        return count

    def live_ready_count(self):
        """Return the cached ready-set size.

        Returns:
            int: Number of serial ids currently tracked as queued. This is
            cheap and may include stale entries that will be discarded by a
            later ``pop_next`` call.
        """
        return len(self._ready_serials)

    def priority_counts(self, predicate=None):
        """Return queued item counts by priority.

        Args:
            predicate (callable, optional): Function with signature
                ``predicate(item: object) -> bool``. If provided, only live
                items accepted by this predicate are counted.

        Returns:
            dict: Mapping ``priority: int`` to ``count: int`` for queued live
            items.
        """
        counts = defaultdict(int)
        for serial in self._ready_serials:
            item = self._item_lookup(serial)
            if item is None:
                continue
            if predicate is not None and not predicate(item):
                continue
            counts[self.priority_for(item)] += 1
        return dict(counts)

    def priorities(self):
        """Return a snapshot of priority keys currently known to the work set.

        Returns:
            list: Priority integers. Empty priorities may be present if
            they previously held work.
        """
        return list(self._ready_heaps)

    def candidates_for_priority(self, priority, ready_predicate, candidate_predicate=None, limit=None):
        """Return queued candidates from one priority without mutating the work set.

        Args:
            priority (int): Priority to inspect.
            ready_predicate (callable): Function with signature
                ``ready_predicate(item: object) -> bool``. Stale or no-longer
                ready candidates are skipped in the returned list, but not
                removed from the heap.
            candidate_predicate (callable, optional): Additional filter with
                signature ``candidate_predicate(item: object) -> bool``.
            limit (int, optional): Maximum number of candidates to return. If
                ``None``, all matching candidates in the priority are returned.
                A limit of zero or less returns an empty list.

        Returns:
            list: Live scheduler items from ``priority`` in stable in-priority
            order.
        """
        candidates = []
        if limit is not None and limit <= 0:
            return candidates
        # ``.get`` avoids creating an empty bucket in the defaultdict;
        # ``sorted`` works on a copy, so the heap itself is left untouched.
        for _order_index, serial in sorted(self._ready_heaps.get(priority, ())):
            candidate = self._item_lookup(serial)
            if candidate is None or not ready_predicate(candidate):
                continue
            if candidate_predicate is not None and not candidate_predicate(candidate):
                continue
            candidates.append(candidate)
            if limit is not None and len(candidates) >= limit:
                break
        return candidates

    def _heap_key(self, item):
        """Build the stored heap key for ``item``.

        Args:
            item (object): Scheduler item accepted by ``order_lookup`` and
                ``serial_lookup``.

        Returns:
            tuple: ``(order: int, serial: int)``.
        """
        return (self._order_lookup(item), self._serial_lookup(item))

    def _restore_deferred(self, deferred):
        """Restore deferred heap entries after a pop attempt.

        Args:
            deferred (dict): Mapping ``priority: int`` to lists of heap-key
                tuples ``(order: int, serial: int)``.

        Returns:
            None.
        """
        for priority, entries in deferred.items():
            heap = self._ready_heaps[priority]
            for entry in entries:
                heapq.heappush(heap, entry)
                # The serial is the last element of the heap key.
                self._ready_serials.add(entry[-1])

# diff --git a/docs/source/ats.rst b/docs/source/ats.rst index 609acb5..a147406 100755 --- a/docs/source/ats.rst +++ b/docs/source/ats.rst @@ -703,16 +703,13 @@ to understand what influenced the scheduling: * Test attribute runOrder, an integer indicating the order of test launch. ..
note:: - Important: by default two tests will not be run in the same directory at the same time. - -This is a modestly conservative scheme to avoid common resource conflicts when testing -one file with different parameters. + Important: by default two non-independent tests will not be run in the same + directory at the same time. See + :ref:`directory blocking <directory_blocking>` for the full rule. -If you know a test does not have such a problem, you can give it the option -``independent = True``. Note that the ``group`` command makes the default value of -``independent`` False for all members of the group, overriding anything except an actual -option in the test statement. Thus if you do not want this behavior for the group -you must use independent = True as an argument in your group command. +This is a modestly conservative scheme to avoid common resource conflicts when +testing one file with different parameters. If you know a test does not have +such a problem, you can give it the option ``independent = True``. The standard scheduler sorts the groups by the highest priority test in the group. In effect, every member of a group behaves as if it has the priority of the highest-priority test in the @@ -988,14 +985,26 @@ keep .. _directory_blocking: independent - If independent is True, the user is certifying that there is no obstacle to - this test executing at the same time as any other test. Otherwise, by default - tests are assumed to conflict with others in the same directory, because - they might write files there with the same names as those read or written by - other tests. If two tests conflict, they are never run at the same time. - Judicious use of independent = True will increase ATS's throughput. - We suggest that while a stick(independent=True) may be appropriate, - in some test files, to glue this definition may be reckless. + If ``independent`` is True, the user is certifying that there is no obstacle + to this test executing at the same time as any other test.
Otherwise, by + default tests are assumed to conflict with other non-independent tests in the + same directory, because they might write files there with the same names as + those read or written by other tests. + + ATS implements this with a directory block. A test's block is normally its + execution directory. When a non-independent test starts, its group owns that + directory block. Another non-independent test or group using the same block + cannot start until the owning group's non-independent members have finished. + Other members of the owning group may continue to run under that block. + + The ``group`` command makes the default value of ``independent`` False for + all members of the group, overriding anything except an explicit option in + the test statement. If you do not want this behavior for the group, pass + ``independent=True`` to ``group``. + + Judicious use of ``independent=True`` will increase ATS's throughput. Use + it only when tests in the same directory do not share output files, generated + inputs, cleanup patterns, or other directory-local state. priority By default the priority of a test is np + the sum of the priorities of diff --git a/docs/source/scheduler_extensions.rst b/docs/source/scheduler_extensions.rst new file mode 100644 index 0000000..5aa7d15 --- /dev/null +++ b/docs/source/scheduler_extensions.rst @@ -0,0 +1,290 @@ +==================================== + Scheduler Extension Design Guide +==================================== + +This guide describes the reusable ATS ready-queue support used by larger or +more specialized schedulers. The examples are intentionally domain-neutral. +Application drivers can use these APIs to add scheduling policy without +teaching ATS about application-specific status files, reports, or test +metadata. 
+ +Core Responsibilities +===================== + +ATS owns the generic execution model: + +* tests are collected into ATS ``AtsTest`` objects; +* dependencies are represented with ``waitUntil`` and ``dependents``; +* the active scheduler decides which created tests should be offered to the + machine; +* the machine owns launch, running-test tracking, completion detection, and + final test-end notification. + +Application drivers own policy: + +* what "structurally ready" means for their tests; +* how tests should be bucketed for resource-aware scheduling; +* how retry and group-output behavior interacts with cached readiness. + +That split keeps ATS reusable. A driver can use ATS queues while keeping +domain-specific data structures outside the ATS package. + +Scheduler Flow +============== + +The normal interactive flow is: + +1. ``manager.collectTests()`` executes input files and defines tests. +2. The scheduler prioritizes and loads the collected interactive tests. +3. ``scheduler.step()`` starts runnable tests while the machine has capacity. +4. ``machine.checkRunning()`` detects completed tests. +5. ``machine.testEnded(test, status)`` records final machine-level state. +6. The scheduler is notified and can unblock dependent work. + +A custom scheduler should preserve two invariants: + +* scheduler operations and machine operations run on the main ATS thread unless + the driver has explicitly built a safe handoff; +* resource policy remains authoritative in the machine. A scheduler may cache + readiness, but it should still call ``machine.canRunNow(test)`` or + ``machine.startRun(test)`` before consuming resources. + +ReadyWorkSet +============ + +``ats.ready_queue.ReadyWorkSet`` stores work that is structurally ready. It +groups work into scheduler-defined priority buckets and returns the highest-priority +runnable candidate first. + +The class is intentionally policy-light. 
The owner supplies: + +* ``item_lookup(serial)`` to map stable ids back to live test objects; +* ``order_lookup(item)`` to preserve scheduler order inside buckets; +* ``priority_lookup(item)`` to define the priority of a test; +* a ``ready_predicate(item)`` each time candidates are enqueued or popped; +* a ``can_run(item)`` predicate, normally ``machine.canRunNow``. + +``ReadyWorkSet`` does not decide what "ready" means. A scheduler commonly +defines readiness as: created, dependencies complete, not directory-blocked, +and still relevant to the current run. + +Directory Blocks +================ + +The user-facing directory-blocking model is documented in +:ref:`directory blocking <directory_blocking>`. Scheduler extensions only need +to preserve the scheduler responsibilities that follow from that model. + +Schedulers that cache readiness need to handle block changes incrementally: + +* index tests by ``test.block`` when tests are loaded; +* reject candidates whose block is currently owned by another group; +* after a test ends, remove its group's block if the group is no longer active; +* when a block clears, reconsider only tests indexed under that block. + +Beginning Tutorial: A Cached Ready Scheduler +============================================ + +A scheduler can use ``ReadyWorkSet`` to avoid rescanning every created test on +every pass. This example is intentionally small, but it includes the parts +needed for a copied scheduler to stay correct: initial indexing, direct +dependent updates, directory-block updates, and launch-failure recovery. + +:: + + from collections import defaultdict + + from ats.atsut import CREATED, RUNNING + from ats.ready_queue import ReadyWorkSet + + class ReadyScheduler: + def __init__(self): + # ``ReadyWorkSet`` stores only compact heap entries, so the + # scheduler must provide a way to map a serial number back to the + # live ATS test object. + self.tests_by_serial = {} + + # Stable order is separate from serial number.
Serial numbers are + # usually creation order, but a scheduler may sort groups or apply + # priority before loading tests. Keep the order you want preserved + # inside each priority bucket. + self.order = {} + + # Cache the number of unfinished wait dependencies for each test. + # This turns "did this parent unblock anything?" into updates of the + # completed test's direct dependents instead of a full-suite scan. + self.remaining_waits = {} + + # ATS directory blocks prevent two non-independent groups from + # running in the same directory. When a block clears, only tests in + # that directory need to be reconsidered. + self.tests_by_block = defaultdict(list) + self.blocks = {} + + # The ready set decides queue mechanics: priority buckets, stable + # in-bucket ordering, stale-entry cleanup, and temporary deferral of + # candidates that are ready but cannot pass machine policy yet. + self.ready = ReadyWorkSet( + item_lookup=self.tests_by_serial.get, + order_lookup=lambda test: self.order[test.serialNumber], + ) + + def add_tests(self, tests): + # Load tests in the scheduler order you want to preserve. A real + # scheduler usually calls this after ATS has built and sorted groups. + tests = list(tests) + next_order = len(self.order) + for offset, test in enumerate(tests): + self.tests_by_serial[test.serialNumber] = test + self.order[test.serialNumber] = next_order + offset + + # Count only unfinished parents. Completed parents should not + # keep a test out of the initial ready set. + self.remaining_waits[test.serialNumber] = sum( + 1 for parent in test.waitUntil + if parent.status in (CREATED, RUNNING) + ) + + # Record block membership once so clearing a directory block can + # revisit only affected tests. + if getattr(test, "block", None): + self.tests_by_block[test.block].append(test) + + for test in tests: + # ``enqueue_if_ready`` calls back into scheduler policy. The + # ready set never decides what CREATED, waits, or blocks mean. 
+ self.ready.enqueue_if_ready(test, self.is_ready) + + def is_ready(self, test): + # "Ready" here means structurally ready for scheduler consideration. + # It does not mean resources are currently available; the machine + # still decides that later through ``canRunNow`` / ``startRun``. + if test is None: + return False + if test.status is not CREATED: + return False + if self.remaining_waits.get(test.serialNumber, 0) != 0: + return False + + # The cached counter is the fast path. This direct check is a + # defensive guard for schedulers that can mutate waitUntil after + # initial indexing or after retry/reset handling. + if any(parent.status in (CREATED, RUNNING) + for parent in test.waitUntil): + return False + return not self.is_blocked(test) + + def is_blocked(self, test): + # Directory blocks are owned by the group that is currently running + # in that directory. Tests from that same group may continue; tests + # from other groups must wait. + block = getattr(test, "block", None) + if not block: + return False + owner = self.blocks.get(block) + return owner is not None and owner != test.group.number + + def add_block(self, test): + # Match ATS's default block policy: if any non-independent member of + # a group has a block directory, the whole group owns that block + # while one of its blocking members is CREATED or RUNNING. + group = test.group + if getattr(group, "isBlocking", False): + return + for member in group: + if getattr(member, "independent", False): + continue + block = getattr(member, "block", None) + if block: + group.isBlocking = True + self.blocks[block] = group.number + + def remove_block(self, test): + # A group keeps its directory block until all non-independent group + # members have left CREATED/RUNNING. When that happens, sibling + # tests from other groups in the same directory may become ready. 
+ group = test.group + if not getattr(group, "isBlocking", False): + return + for member in group: + if getattr(member, "independent", False): + continue + if member.status in (CREATED, RUNNING): + return + group.isBlocking = False + for member in group: + if getattr(member, "independent", False): + continue + block = getattr(member, "block", None) + if block: + self.blocks.pop(block, None) + + def test_ended(self, test): + # Completion can create new ready work in two local ways: + # 1. direct dependents may have one fewer unfinished parent; + # 2. the completed test's directory block may have cleared. + block = getattr(test, "block", None) + self.remove_block(test) + + for dependent in getattr(test, "dependents", []): + # Some ATS bookkeeping can list broad dependents. Only update + # tests that were actually waiting on this completed parent. + if test not in getattr(dependent, "waitUntil", []): + continue + serial = dependent.serialNumber + remaining = self.remaining_waits.get(serial, 0) + if remaining > 0: + self.remaining_waits[serial] = remaining - 1 + + # This is cheap even if the dependent is not ready yet. The + # scheduler predicate filters out tests with other unfinished + # parents or active blocks. + self.ready.enqueue_if_ready(dependent, self.is_ready) + + if block and block not in self.blocks: + # Avoid rescanning all CREATED tests just because one directory + # block cleared. Only tests from that block can be affected. + for candidate in self.tests_by_block.get(block, []): + self.ready.enqueue_if_ready(candidate, self.is_ready) + + def next_ready(self, machine): + # ``pop_next`` picks the highest-priority bucket, then checks + # machine policy. Candidates that are still structurally ready but + # cannot run now are restored so a later scheduler pass can + # reconsider them. 
+ test, _blocked = self.ready.pop_next( + self.is_ready, + machine.canRunNow, + ) + return test + + def start_available(self, machine): + # Keep launching until the ready set has no machine-runnable work or + # the machine is full. Real schedulers usually also log each launch. + while True: + test = self.next_ready(machine) + if test is None: + return + self.add_block(test) + if not machine.startRun(test): + # If the machine refuses a launch after selection, undo the + # scheduler-side block and requeue the test if it is still + # structurally ready. + self.remove_block(test) + self.ready.enqueue_if_ready(test, self.is_ready) + return + +Production schedulers also need logging, retry behavior, group-output handling, +periodic reports, and a cheap "work remains" check. The example shows the +division of labor: the ready set stores candidates; the scheduler owns +dependency and block policy; the machine owns resource admission. + +Design Checklist +================ + +When adding a scheduler extension: + +* keep application-specific state outside ATS; +* call machine resource checks before launch; +* preserve ordering only where required by correctness; +* test completion bursts, dependency chains, and retry/reset behavior. 
# diff --git a/test/test_ready_queue_example.py b/test/test_ready_queue_example.py new file mode 100644 index 0000000..a250a6a --- /dev/null +++ b/test/test_ready_queue_example.py @@ -0,0 +1,221 @@
"""Tests for the cached ready-scheduler example built on ``ReadyWorkSet``."""
from collections import defaultdict
import types
import unittest

from ats.atsut import CREATED, PASSED, RUNNING
from ats.ready_queue import ReadyWorkSet


class ReadyScheduler:
    """Minimal scheduler mirroring the docs/source/scheduler_extensions.rst example.

    The only difference is that the priority lookup is passed explicitly, so
    the tests state which attribute drives bucket selection.
    """

    def __init__(self):
        # serial -> live test object, used by the work set to resolve entries.
        self.tests_by_serial = {}
        # serial -> stable load order inside a priority bucket.
        self.order = {}
        # serial -> number of parents that were unfinished when indexed.
        self.remaining_waits = {}
        # block -> tests indexed under that directory block.
        self.tests_by_block = defaultdict(list)
        # block -> number of the group that currently owns it.
        self.blocks = {}
        self.ready = ReadyWorkSet(
            item_lookup=self.tests_by_serial.get,
            order_lookup=lambda test: self.order[test.serialNumber],
            priority_lookup=lambda test: test.priority,
        )

    def add_tests(self, tests):
        """Index ``tests`` in load order and enqueue those already ready."""
        tests = list(tests)
        next_order = len(self.order)
        for offset, test in enumerate(tests):
            self.tests_by_serial[test.serialNumber] = test
            self.order[test.serialNumber] = next_order + offset
            self.remaining_waits[test.serialNumber] = sum(
                1 for parent in test.waitUntil
                if parent.status in (CREATED, RUNNING)
            )
            if getattr(test, "block", None):
                self.tests_by_block[test.block].append(test)

        # Enqueue only after every test is indexed so the lookups succeed.
        for test in tests:
            self.ready.enqueue_if_ready(test, self.is_ready)

    def is_ready(self, test):
        """Return True when ``test`` is created, unblocked, and has no unfinished parents."""
        if test is None:
            return False
        if test.status is not CREATED:
            return False
        if self.remaining_waits.get(test.serialNumber, 0) != 0:
            return False
        if any(parent.status in (CREATED, RUNNING) for parent in test.waitUntil):
            return False
        return not self.is_blocked(test)

    def is_blocked(self, test):
        """Return True when another group owns ``test``'s directory block."""
        block = getattr(test, "block", None)
        if not block:
            return False
        owner = self.blocks.get(block)
        return owner is not None and owner != test.group.number

    def add_block(self, test):
        """Record the blocks of ``test``'s group as owned by that group."""
        group = test.group
        if getattr(group, "isBlocking", False):
            return
        for member in group:
            if getattr(member, "independent", False):
                continue
            block = getattr(member, "block", None)
            if block:
                group.isBlocking = True
                self.blocks[block] = group.number

    def remove_block(self, test):
        """Release the group's blocks once no blocking member is CREATED or RUNNING."""
        group = test.group
        if not getattr(group, "isBlocking", False):
            return
        for member in group:
            if getattr(member, "independent", False):
                continue
            if member.status in (CREATED, RUNNING):
                return
        group.isBlocking = False
        for member in group:
            if getattr(member, "independent", False):
                continue
            block = getattr(member, "block", None)
            if block:
                self.blocks.pop(block, None)

    def test_ended(self, test):
        """Update wait counters and blocks after ``test`` finishes, enqueueing new ready work."""
        block = getattr(test, "block", None)
        self.remove_block(test)

        for dependent in getattr(test, "dependents", []):
            if test not in getattr(dependent, "waitUntil", []):
                continue
            serial = dependent.serialNumber
            remaining = self.remaining_waits.get(serial, 0)
            if remaining > 0:
                self.remaining_waits[serial] = remaining - 1
            self.ready.enqueue_if_ready(dependent, self.is_ready)

        # Revisit only tests indexed under the block that was just released.
        if block and block not in self.blocks:
            for candidate in self.tests_by_block.get(block, []):
                self.ready.enqueue_if_ready(candidate, self.is_ready)

    def next_ready(self, machine):
        """Return the next test that ``machine`` can run now, or ``None``."""
        test, _blocked = self.ready.pop_next(
            self.is_ready,
            machine.canRunNow,
        )
        return test

    def start_available(self, machine):
        """Launch ready tests until none can run or a launch is refused."""
        while True:
            test = self.next_ready(machine)
            if test is None:
                return
            self.add_block(test)
            if not machine.startRun(test):
                # Undo the scheduler-side block and requeue the test.
                self.remove_block(test)
                self.ready.enqueue_if_ready(test, self.is_ready)
                return


class _Group(list):
    """List of tests carrying a group number and a blocking flag."""

    def __init__(self, number, tests=()):
        super().__init__(tests)
        self.number = number
        self.isBlocking = False


class _Machine:
    """Fake machine with a processor budget and an optional serial deny-list."""

    def __init__(self, capacity, blocked_serials=None):
        self.capacity = capacity
        self.blocked_serials = set(blocked_serials or [])
        self.launched = []

    def remainingCapacity(self):
        return self.capacity

    def canRunNow(self, test):
        return test.serialNumber not in self.blocked_serials and test.np <= self.capacity

    def startRun(self, test):
        if not self.canRunNow(test):
            return False
        self.launched.append(test.serialNumber)
        self.capacity -= test.np
        test.status = RUNNING
        return True


def _make_test(serial, np=1, block=None, wait_until=None):
    """Build a minimal test-like object whose priority equals its ``np``."""
    return types.SimpleNamespace(
        serialNumber=serial,
        status=CREATED,
        waitUntil=list(wait_until or []),
        dependents=[],
        block=block,
        independent=False,
        # Treat np as the priority.
        np=np,
        priority=np,
    )


class ReadySchedulerExampleTest(unittest.TestCase):
    def test_launches_ready_work_and_unblocks_direct_dependents(self):
        """Verify that completion only unblocks direct dependents and relaunches them."""
        parent = _make_test(1, 1, block="case")
        child = _make_test(2, 1, block="case", wait_until=[parent])
        other = _make_test(3, 1, block="other")
        parent.dependents = [child]

        group = _Group(1, [parent, child])
        other_group = _Group(2, [other])
        parent.group = group
        child.group = group
        other.group = other_group

        scheduler = ReadyScheduler()
        scheduler.add_tests([parent, child, other])

        machine = _Machine(capacity=1)
        scheduler.start_available(machine)
        self.assertEqual(machine.launched, [1])
        self.assertIs(parent.status, RUNNING)
        self.assertIs(child.status, CREATED)

        parent.status = PASSED
        machine.capacity = 1
        scheduler.test_ended(parent)
        scheduler.start_available(machine)

        self.assertEqual(machine.launched, [1, 2])
        self.assertIs(child.status, RUNNING)

    def test_prefers_default_processor_count_priority_and_restores_blocked_candidate(self):
        """Verify that a blocked higher-bucket candidate is restored instead of dropped."""
        small = _make_test(1, np=1, block="small")
        large = _make_test(2, np=4, block="large")
        small.group = _Group(1, [small])
        large.group = _Group(2, [large])

        scheduler = ReadyScheduler()
        scheduler.add_tests([small, large])

        machine = _Machine(capacity=4, blocked_serials={2})
        scheduler.start_available(machine)
        self.assertEqual(machine.launched, [1])
        self.assertIs(small.status, RUNNING)
        self.assertIs(large.status, CREATED)

        small.status = PASSED
        machine.capacity = 4
        machine.blocked_serials.clear()
        scheduler.test_ended(small)
        scheduler.start_available(machine)

        self.assertEqual(machine.launched, [1, 2])
        self.assertIs(large.status, RUNNING)


if __name__ == "__main__":
    unittest.main()