From b000a62d755c997141176a503f43abb07df0f2ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vladimir=20Vargas=20Calder=C3=B3n?= Date: Wed, 1 Apr 2026 09:54:59 -0700 Subject: [PATCH] Add example and tests for fully yielded Zephyr subgraph search Improve documentation of parameters and type hinting Correct relative import in init. Warn the user that the source has been updated when an incomplete source was provided. Clarify that the zephyr_quotient_embedding_search is a greedy method and has limitations in the docstrings. Co-authored-by: Copilot Correct imports Co-authored-by: Copilot Improve print statements so that the user knows more about what goes on during the example. Co-authored-by: Copilot Remove Embedding and EmbeddingChain types for clarity. Use absolute import instead of relative. Simplify embedding is not None check. Remove type hints from docstrings when signature already contains type hints Replace ZephyrNode with tuple[int, int, int, int, int] everywhere Add release note Merge some checks when validating search parameters and modify corresponding tests Write returns of functions' docstrings without mentioning the names of the returned variables. Change name of iterator variables to show which coordinates are being iterated on. Add code directive to minorminer routines in docstrings Change note directive. Add inline comments to clarify what certain instructions do. Replace _ensure_* function names to _normalize_* Remove mention to example and tests Remove "Systems Inc." from header. 
Co-authored-by: Theodor Isacsson --- .../embedding_methods/__init__.py | 15 + .../zephyr_quotient_embedding_search.py | 933 ++++++++++++++++++ examples/fully_yielded_zephyr_subgraph.py | 158 +++ ...ent-embedding-search-b19297aca3ea3be2.yaml | 6 + tests/test_zephyr_quotient_search.py | 431 ++++++++ 5 files changed, 1543 insertions(+) create mode 100644 dwave/experimental/embedding_methods/__init__.py create mode 100644 dwave/experimental/embedding_methods/zephyr_quotient_embedding_search.py create mode 100644 examples/fully_yielded_zephyr_subgraph.py create mode 100644 releasenotes/notes/add-zephyr-quotient-embedding-search-b19297aca3ea3be2.yaml create mode 100644 tests/test_zephyr_quotient_search.py diff --git a/dwave/experimental/embedding_methods/__init__.py b/dwave/experimental/embedding_methods/__init__.py new file mode 100644 index 0000000..2fe666d --- /dev/null +++ b/dwave/experimental/embedding_methods/__init__.py @@ -0,0 +1,15 @@ +# Copyright 2026 D-Wave +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from dwave.experimental.embedding_methods.zephyr_quotient_embedding_search import * diff --git a/dwave/experimental/embedding_methods/zephyr_quotient_embedding_search.py b/dwave/experimental/embedding_methods/zephyr_quotient_embedding_search.py new file mode 100644 index 0000000..cdf3d63 --- /dev/null +++ b/dwave/experimental/embedding_methods/zephyr_quotient_embedding_search.py @@ -0,0 +1,933 @@ +# Copyright 2026 D-Wave +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import itertools +import warnings +from collections import namedtuple +from typing import Callable, Literal, get_args + +import networkx as nx +import numpy as np +from dwave.embedding import verify_embedding +from dwave_networkx import zephyr_coordinates, zephyr_graph + +__all__ = ["zephyr_quotient_search"] + +YieldType = Literal["node", "edge", "rail-edge"] +QuotientSearchType = Literal["by_quotient_rail", "by_quotient_node", "by_rail_then_node"] + +ZephyrSearchMetadata = namedtuple( + "ZephyrSearchMetadata", ["max_num_yielded", "starting_num_yielded", "final_num_yielded"] +) + + +def _validate_graph_inputs(source: nx.Graph, target: nx.Graph) -> None: + """Validate that source and target are Zephyr NetworkX graphs. + + Both source and target graphs must be networkx graph instances with a 'family' metadata key + set to 'zephyr'. Each graph must also contain 'rows', 'tile' and 'labels' metadata keys. + + Args: + source: Source Zephyr graph. + target: Target Zephyr graph. 
+ + Raises: + TypeError: If inputs are not NetworkX graphs. + ValueError: If either graph is not a Zephyr family graph or is missing 'rows'/'tile' + metadata. + """ + if not isinstance(source, nx.Graph) or not isinstance(target, nx.Graph): + raise TypeError("source and target must both be networkx.Graph instances") + + if source.graph.get("family") != "zephyr": + raise ValueError("source graph should be a zephyr family graph") + if target.graph.get("family") != "zephyr": + raise ValueError("target graph should be a zephyr family graph") + + for graph_name, graph in zip(("source", "target"), (source, target)): + for key in ("rows", "tile", "labels"): + if key not in graph.graph: + raise ValueError(f"{graph_name} graph is missing required '{key}' metadata") + + +def _extract_graph_properties(source: nx.Graph, target: nx.Graph) -> tuple[int, int, int]: + """Extract and validate Zephyr graph properties, returning ``(rows, tile count, and target + tile count)``. + + Each graph must contain required metadata fields: 'rows' (number of rows) and 'tile' + (tile count). All metadata values must be positive integers. The source and target graphs must + have matching row counts. The target tile count must be greater than or equal to the source tile + count to accommodate the embedding. + + Args: + source: Source Zephyr graph. + target: Target Zephyr graph. + + Returns: + Source Zephyr rows, source tile count and target tile count. + + Raises: + TypeError: If metadata values are not integers. + KeyError: If rows or tile metadata is missing from either graph. + ValueError: If rows or tile metadata are not compatible, i.e., not the same in the case of + rows, or if the target tile count is less than the source tile count. 
+ """ + m = source.graph["rows"] + tp = source.graph["tile"] + t = target.graph["tile"] + + for v, name in zip((m, tp, t), ("rows", "source tile", "target tile")): + if not isinstance(v, int): + raise TypeError(f"graph '{name}' metadata must be an integer") + if v <= 0: + raise ValueError(f"graph '{name}' metadata must be positive") + if target.graph["rows"] != m: + raise ValueError("source and target must have the same number of rows") + if t < tp: + raise ValueError("target tile count must be >= source tile count") + + return m, tp, t + + +def _validate_search_parameters( + quotient_search: str, + yield_type: str, + embedding: dict[tuple[int, int, int, int, int], tuple[tuple[int, int, int, int, int], ...]] | None = None, +) -> None: + """Validate high-level search parameters. + + ``quotient_search`` must be one of ``'by_quotient_rail'``, ``'by_quotient_node'``, or + ``'by_rail_then_node'``; ``yield_type`` must be one of ``'node'``, ``'edge'``, or + ``'rail-edge'``; and ``embedding`` must be ``None`` or a ``dict`` representing a + one-to-one chain mapping of Zephyr coordinate nodes, where each node is a 5-tuple + ``(u, w, k, j, z)`` and each value is a tuple of one target node. + + Args: + quotient_search: Search mode. + yield_type: Optimization objective. + embedding: Optional initial one-to-one chain mapping in 5-tuple coordinate format, + where each node is ``(u, w, k, j, z)``. If None, no validation of the embedding is + performed. + + Raises: + ValueError: If ``quotient_search`` or ``yield_type`` is invalid, if ``embedding`` + contains duplicate target nodes (i.e. is not one-to-one), or if embedding nodes/chains + are not in 5-tuple key and singleton-chain tuple format. + TypeError: If ``embedding`` is provided but is not a dictionary. + """ + valid_ksearch = get_args(QuotientSearchType) + valid_yield_type = get_args(YieldType) + + if quotient_search not in valid_ksearch: + raise ValueError( + f"quotient_search must be one of {sorted(valid_ksearch)}. 
Got " f"'{quotient_search}'" + ) + if yield_type not in valid_yield_type: + raise ValueError( + f"yield_type must be one of {sorted(valid_yield_type)}. Got '{yield_type}'" + ) + if embedding is not None: + if not isinstance(embedding, dict): + raise TypeError(f"embedding must be a dictionary when provided. Got {type(embedding)}") + # Validate chain format: keys are nodes, values are tuples of nodes + for key, value in embedding.items(): + if not isinstance(key, tuple) or len(key) != 5: + raise ValueError( + f"embedding keys must be 5-tuples representing Zephyr coordinates. " + f"Got key {key} of type {type(key)}" + + (f" with length {len(key)}" if isinstance(key, tuple) else "") + ) + if not isinstance(value, tuple) or len(value) != 1: + raise ValueError( + f"embedding values must be singleton tuples representing node chains. " + f"Got value {value} of type {type(value)}" + + (f" with length {len(value)}" if isinstance(value, tuple) else "") + + f" for key {key}" + ) + for i, node in enumerate(value): + if not isinstance(node, tuple) or len(node) != 5: + raise ValueError( + f"embedding chains must contain 5-tuples. Got {node} " + f"(length {len(node) if isinstance(node, tuple) else 'N/A'}) " + f"at position {i} in chain for key {key}" + ) + # Check one-to-one constraint: flatten all chains and ensure no duplicates + all_target_nodes = [] + for chain in embedding.values(): + all_target_nodes.extend(chain) + if len(all_target_nodes) != len(set(all_target_nodes)): + raise ValueError( + "embedding must be a one-to-one mapping: duplicate target nodes detected across " + "chains. " + ) + + +def _normalize_coordinate_source( + source: nx.Graph, + m: int, + tp: int, +) -> tuple[nx.Graph, set[tuple[int, int, int, int, int]], Callable[[tuple[int, int, int, int, int]], int | tuple[int, int, int, int, int]]]: + """Normalise the source graph to coordinate labels. 
+ + This function ensures the rest of the search code can operate on a + coordinate-labelled representation of the graphs, regardless of the input node-labelling + convention. The quotient search internally assumes Zephyr coordinates of the + form ``(u, w, k, j, z)``, where each such 5-tuple identifies one Zephyr node. + + Args: + source: Source Zephyr graph, either linear or coordinate labelled. + m: Number of rows (must be consistent with ``source``). + tp: Source tile count (must be consistent with ``source``). + + Returns: + coordinate-labelled (5-tuple) source Zephyr graph, the full canonical coordinate node set + implied by ``m`` and ``tp``, and a callable that maps coordinate nodes back to the original + source labelling space + + Raises: + ValueError: If source labels are unsupported. + """ + source_nodes: set[tuple[int, int, int, int, int]] = { + (u, w, k, j, z) + for u in range(2) + for w in range(2 * m + 1) + for k in range(tp) + for j in range(2) + for z in range(m) + } + + # If the labels are linear integers, convert to coordinate labels and define a function to + # convert back. + if source.graph["labels"] == "int": + coords = zephyr_coordinates(m, tp) + to_tuple = coords.linear_to_zephyr + _source = zephyr_graph( + m, + tp, + coordinates=True, + node_list=source_nodes, + edge_list=[(to_tuple(n1), to_tuple(n2)) for n1, n2 in source.edges()], + ) + + def to_source_linear(n: tuple[int, int, int, int, int]) -> int: + return coords.zephyr_to_linear(n) + + return _source, source_nodes, to_source_linear + + # IF labels are not linear nor coordinate, we raise an error. + if source.graph["labels"] != "coordinate": + raise ValueError("source graph has unknown labelling scheme") + + _source = source.copy() + for n in source_nodes: + if not _source.has_node(n): + warnings.warn( + f"Source graph is missing expected node {n}. 
We are manually adding it to the " + "graph, along with any other missing nodes.", UserWarning + ) + _source.add_nodes_from(source_nodes) + + # If the labels are coordinate. Then we just return the graph as is and the identity function + # for to_source: + + def to_source(n: tuple[int, int, int, int, int]) -> tuple[int, int, int, int, int]: + return n + + return _source, source_nodes, to_source + + +def _normalize_coordinate_target( + target: nx.Graph, + m: int, + t: int, +) -> tuple[nx.Graph, Callable[[tuple[int, int, int, int, int]], int | tuple[int, int, int, int, int]]]: + """Return a coordinate-labelled target graph and conversion callable. + + This helper normalises ``target`` to coordinate labels and returns a callable that maps + candidate nodes into the target's original label space. + + Similar to ``_normalize_coordinate_source``, but it does not return the full canonical node set + because the search only checks node presence in the target rather than iterating over all + nodes, and the target may be defective and missing some nodes. + + Args: + target: Target Zephyr graph, either linear or coordinate labelled. + m: Number of rows (must be consistent with ``target``). + t: Target tile count (must be consistent with ``target``). + + Returns: + Target subgraph relabelled into coordinates, and a callable that maps coordinate nodes back + to the original target labelling space. + + Raises: + ValueError: If target labels are unsupported. 
+ """ + if target.graph["labels"] == "int": + coords = zephyr_coordinates(m, t) + to_tuple = coords.linear_to_zephyr + _target = zephyr_graph( + m, + t, + coordinates=True, + node_list=[to_tuple(n) for n in target.nodes()], + edge_list=[(to_tuple(n1), to_tuple(n2)) for n1, n2 in target.edges()], + ) + + def to_target_linear(n: tuple[int, int, int, int, int]) -> int: + return coords.zephyr_to_linear(n) + + return _target, to_target_linear + + if target.graph["labels"] != "coordinate": + raise ValueError("target graph has unknown labelling scheme") + + def to_target(n: tuple[int, int, int, int, int]) -> tuple[int, int, int, int, int]: + return n + + return target, to_target + + +def _boundary_proposals( + u: int, + w: int, + tp: int, + t: int, + embedding: dict[tuple[int, int, int, int, int], tuple[int, int, int, int, int]], + j: int = 0, + z: int = 0, +) -> set[tuple[int, int, int, int, int]]: + r"""Generate candidate targets for boundary expansion. + + For a fixed quotient index ``(u, w, j, z)``, this function proposes all target ``k`` locations + in that rail, then removes the entries already occupied by the currently mapped source + :math:`k \in \{0, \dots, tp-1\}`. + + Args: + u: Zephyr orientation. + w: Zephyr column index. + tp: Source tile count. + embedding: Current one-to-one proposal mapping. + j: Intra-cell orientation index. Default is 0. + z: Row index. Default is 0. + + Returns: + Available target coordinate nodes, each represented as the 5-tuple + ``(u, w, k, j, z)``, with fixed ``(u, w, j, z)``. 
+ """ + all_target_coordinates = {(u, w, k, j, z) for k in range(t)} + used_coordinates = { + embedding[(u, w, k, j, z)] for k in range(tp) if (u, w, k, j, z) in embedding + } + return all_target_coordinates.difference(used_coordinates) + + +def _node_search( + source: nx.Graph, + target: nx.Graph, + embedding: dict[tuple[int, int, int, int, int], tuple[int, int, int, int, int]], + *, + expand_boundary_search: bool = True, + ksymmetric: bool = False, + yield_type: YieldType = "edge", +) -> dict[tuple[int, int, int, int, int], tuple[int, int, int, int, int]]: + r"""Greedy node-level quotient search over Zephyr coordinates. + + The source and target are viewed in quotient blocks indexed by :math:`(u, w, j, z)`, each + containing :math:`tp` source nodes. For each block, we propose target nodes with the same + :math:`(u, w, j, z)` and varying target :math:`k`, optionally augmented with boundary proposals. + + The scoring objective is: + + .. math:: + + \operatorname{score}(p) = + \begin{cases} + \sum\limits_{n \in B} \mathbf{1}[p_n \in V(T)] & \text{node yield}\\ + \sum\limits_{(n,m) \in E(S_B, S_\text{fixed})} + \mathbf{1}[(p_n, \phi(m)) \in E(T)] & \text{edge yield} + \end{cases} + + For a fixed quotient index :math:`q = (u, w, j, z)`, define the source block :math:`B_q` as + + .. math:: + + B_q = \{(u, w, k, j, z) : k \in \{0, \dots, tp-1\}\}. + + A proposal :math:`p` is an assignment on that block, :math:`p: B_q \to V(T)`, and can be + viewed as a length-``tp`` vector :math:`(p_0, \dots, p_{tp-1})` where :math:`p_k` is the + proposed target node for source node :math:`(u, w, k, j, z)`. + + Here :math:`T` is the target graph, :math:`V(T)` is its node set, and :math:`E(T)` is its edge + set. Let :math:`S` be the source graph and define the already-fixed outside set + + .. math:: + + F_q = \{m \in V(S) \setminus B_q : m \in \operatorname{dom}(\phi)\}, + + where :math:`\phi` is the current embedding. Then + + .. 
math:: + + E(S_B, S_\text{fixed}) + := \{(n,m) \in E(S) : n \in B_q,\ m \in F_q\}, + + i.e., the source edges that cross from the current block to already-fixed source nodes outside + the block. + + In other words, node yield counts how many proposed nodes :math:`p_n` are present in + :math:`V(T)`; while edge yield counts how many source edges crossing from the current block to + already-fixed nodes are preserved as target edges :math:`(p_n, \phi(m)) \in E(T)`. + + Yield types in this node-level search are interpreted as follows: ``"node"`` maximises target + node presence for each proposed block; ``"edge"`` maximises preserved cross-block + source-to-fixed edge connectivity; and ``"rail-edge"`` follows the same node-level scoring as + ``"edge"`` in this function (the distinction between ``"edge"`` and ``"rail-edge"`` is made + in rail-level search). + + Args: + source: Coordinate-labeled source Zephyr graph. Each coordinate node is a + 5-tuple ``(u, w, k, j, z)``. + target: Coordinate-labeled target Zephyr graph. Each coordinate node is a + 5-tuple ``(u, w, k, j, z)``. + embedding: Current mapping, updated in-place. + expand_boundary_search: If ``True``, augment boundary columns using the adjacent + internal column. Defaults to ``True``. + ksymmetric: If ``True``, assume the order of source ``k`` indices is interchangeable + for scoring and use top-``tp`` selection. Defaults to ``False``. + yield_type: ``"node"``, ``"edge"``, or ``"rail-edge"``. Defaults to ``"edge"``. + + Returns: + Updated embedding. + + Raises: + ValueError: If graph geometry metadata is inconsistent. 
+ """ + m = source.graph["rows"] + tp = source.graph["tile"] + t = target.graph["tile"] + if m != target.graph["rows"]: + raise ValueError("source and target rows must match for node search") + + if expand_boundary_search: + # Visit interior columns first so boundary expansion can reuse already-assigned assignments: + uwjz_iterator = itertools.product( + range(2), + list(range(1, 2 * m)) + [0, 2 * m], + range(2), + range(m), + ) + ksymmetric_original = ksymmetric + else: + uwjz_iterator = itertools.product(range(2), range(2 * m + 1), range(2), range(m)) + + for u, w, j, z in uwjz_iterator: + # Base proposals preserve (u, w, j, z) and search only over target k-indices: + proposals = [(u, w, k, j, z) for k in range(t)] + + if expand_boundary_search: + if w == 0: + ksymmetric = False + # borrow candidates from adjacent internal column + proposals += list(_boundary_proposals(u, 1, tp, t, embedding, j, z)) + elif w == 2 * m: + ksymmetric = False + proposals += list(_boundary_proposals(u, 2 * m - 1, tp, t, embedding, j, z)) + else: + ksymmetric = ksymmetric_original + + if ksymmetric or yield_type != "edge": + if yield_type == "node": + # symmetry doesn't matter: just count how many proposed nodes are present in the + # target: + counts = [int(target.has_node(n_t)) for n_t in proposals] + else: + # Count preserved edges from already-mapped neighboring source nodes into each + # proposed target node. + source_neighbours = source.neighbors((u, w, 0, j, z)) + counts = [ + sum( + int(target.has_edge(embedding[n_s], n_t)) + for n_s in source_neighbours + if n_s in embedding + ) + for n_t in proposals + ] + # performance: this is faster than selected = proposals[np.argsort()]... 
+ top_indices = np.argpartition(np.asarray(counts), -tp)[-tp:] + selected = [proposals[idx] for idx in top_indices] + else: + # Nodes with different k indices in the source block are not interchangeable, so we + # evaluate all permutations of the proposals: + permutation_scores = { + proposal_perm: sum( + int(target.has_edge(embedding[n], proposal_perm[k])) + for k in range(tp) + for n in source.neighbors((u, w, k, j, z)) + if n in embedding + ) + for proposal_perm in itertools.permutations(proposals, tp) + } + selected_key = max(permutation_scores, key=lambda k: permutation_scores[k]) + selected = list(selected_key) + + embedding.update({(u, w, k, j, z): proposal for k, proposal in zip(range(tp), selected)}) + + return embedding + + +def _rail_search( + source: nx.Graph, + target: nx.Graph, + embedding: dict[tuple[int, int, int, int, int], tuple[int, int, int, int, int]], + *, + expand_boundary_search: bool = True, + ksymmetric: bool = False, + yield_type: YieldType = "edge", +) -> dict[tuple[int, int, int, int, int], tuple[int, int, int, int, int]]: + r"""Greedy rail-level quotient search over Zephyr rails. + + A Zephyr rail is indexed by :math:`(u, w, k)` and contains nodes + :math:`(u, w, k, j, z)` for :math:`j \in \{0,1\}` and :math:`z \in \{0,\dots,m-1\}`. + + For fixed orientation and column :math:`(u, w)`, define the source rail family + + .. math:: + + \mathcal{R}^{S}_{u,w} := \{(u, w, k_s) : k_s \in \{0, \dots, t_p-1\}\}. + + The search chooses :math:`t_p` target rails for each family :math:`\mathcal{R}^{S}_{u,w}` + from candidate rails optionally augmented at boundaries (:math:`w=0` and :math:`w=2m`) using + adjacent interior columns. + + Let the target rail indexed by :math:`(u, w_t, k_t)` be + + .. math:: + + R^{T}_{u,w_t,k_t} := + \{(u, w_t, k_t, j, z) : j \in \{0,1\},\ z \in \{0,\dots,m-1\}\}. 
+ + We can define its objective for ``yield_type='edge'`` as the number of edges preserved within + that rail, i.e., the number of edges in the target subgraph induced by the proposed rail, or + equivalently the number of edges in the source rail (which is fixed) that are preserved by the + proposal: + + .. math:: + + Q(u,w_t,k_t) := |E(T[R^{T}_{u,w_t,k_t}])|, + + or, for ``yield_type='node'``, the number of present target nodes in that rail. Here :math:`T` + is the target graph and :math:`E(T[R])` is the edge set of the target subgraph induced by node + set :math:`R`. + For ``yield_type='edge'``, each proposal also gets an external connectivity term counting + preserved edges from already-embedded neighbouring source nodes into the proposed target rail. + + .. math:: + + \operatorname{score}(u,w_t,k_t) + = Q(u,w_t,k_t) + + \sum \mathbf{1}[\text{external source edge maps to a target edge}]. + + Depending on ``ksymmetric``, the algorithm either selects the top :math:`t_p` rail proposals by + score (treating source :math:`k` order as interchangeable), or evaluates permutations assigning + proposal rails to source indices :math:`k_s \in \{0,\dots,t_p-1\}`. + + Yield types in this rail-level search are interpreted as follows: ``"node"`` scores each + proposal rail by the number of present target nodes in that rail. ``"edge"`` prefers rails + that both have many internal rail edges and connect well to already-embedded neighbouring + rails. ``"rail-edge"`` focuses first on how good the rail itself is, measured by the number of + target edges inside that rail; when permutations are evaluated, it also includes the same + already-embedded neighbour consistency term as ``"edge"``. + + Example: suppose two candidate target rails have the same internal rail structure, but one of + them has more edges to neighbouring rails that are already fixed in the embedding. 
Then + ``"edge"`` prefers that better-connected rail, while ``"rail-edge"`` treats the two rails as + equivalent in the top-rail selection path because it only compares their internal rail + structure there. + + Selected rails are then expanded back to node assignments for all :math:`(j,z)` in + each source rail. + + Args: + source: Coordinate-labeled source Zephyr graph. Each coordinate node is a + 5-tuple ``(u, w, k, j, z)``. + target: Coordinate-labeled target Zephyr graph. Each coordinate node is a + 5-tuple ``(u, w, k, j, z)``. + embedding: Current mapping, updated in-place. + expand_boundary_search: If ``True``, include adjacent-column rail proposals when + :math:`w` is at a boundary. Defaults to ``True``. + ksymmetric: If ``True``, treat source :math:`k` order as interchangeable when scoring + rails. Defaults to ``False``. + yield_type: ``"node"``, ``"edge"``, or ``"rail-edge"``. Defaults to ``"edge"``. + + Returns: + Updated embedding. + + Raises: + ValueError: If duplicate target assignments are produced. + """ + m = source.graph["rows"] + tp = source.graph["tile"] + t = target.graph["tile"] + + if yield_type == "node": + rail_score = { + (u, w, k): sum(target.has_node((u, w, k, j, z)) for j in range(2) for z in range(m)) + for u in range(2) + for w in range(2 * m + 1) + for k in range(t) + } + else: + # Precompute per-rail edge number for fast proposal scoring. 
+ rail_score = { + (u, w, k): target.subgraph( + {(u, w, k, j, z) for j in range(2) for z in range(m)} + ).number_of_edges() + for u in range(2) + for w in range(2 * m + 1) + for k in range(t) + } + + # when optimising for edges, we consider all edges that do not share the same orientation + source_external_edges = ( + source.edge_subgraph({e for e in source.edges() if e[0][0] != e[1][0]}) + if "edge" in yield_type + else None + ) + + if expand_boundary_search: + uw_iterator = itertools.product(range(2), list(range(1, 2 * m)) + [0, 2 * m]) + ksymmetric_original = ksymmetric + else: + uw_iterator = itertools.product(range(2), range(2 * m + 1)) + + for u, w in uw_iterator: + # rail proposals preserve orientation in the target graph and only move in (w, k) quotient + # graph. + proposals = [(w, k) for k in range(t)] + + if expand_boundary_search: + if w == 0: + # b[1:3] is taken because those are the w and k indices + proposals += [b[1:3] for b in _boundary_proposals(u, 1, tp, t, embedding)] + ksymmetric = False + elif w == 2 * m: + proposals += [b[1:3] for b in _boundary_proposals(u, 2 * m - 1, tp, t, embedding)] + ksymmetric = False + else: + ksymmetric = ksymmetric_original + + if ksymmetric or yield_type == "node": + if yield_type in ("node", "rail-edge"): + counts = [rail_score[(u, w_t, k_t)] for w_t, k_t in proposals] + else: + # the other only possibility is that yield_type == "edge". The following check is + # just to avoid linter complaint about source_external_edges being possibly None. + if source_external_edges is None: + raise ValueError("internal error: missing external edge subgraph") + counts = [ + rail_score[(u, w_t, k_t)] + + sum( + int(target.has_edge(embedding[n_s], (u, w_t, k_t, j, z))) + for j in range(2) + for z in range(m) + # n_s will be nodes in the source graph with a different orientation + # to the current rail, that are neighbours of nodes in the current rail. 
+ # Note that we pick k=0 because ksymmetric means that all k indices in the + # source rail are interchangeable, so we can just look at one of them. + for n_s in source_external_edges.neighbors((u, w, 0, j, z)) + if n_s in embedding + ) + for w_t, k_t in proposals + ] + + p_indices = np.argpartition(np.asarray(counts), -tp)[-tp:] + # Apply chosen rails to all nodes in the quotient rail block. + embedding.update( + { + (u, w, k, j, z): (u,) + proposals[p_indices[k]] + (j, z) + for k in range(tp) + for j in range(2) + for z in range(m) + } + ) + else: + # this path is activated when ksymmetric is False and yield_type is either "edge" or + # "rail-edge". + if source_external_edges is None: + raise ValueError("internal error: missing external edge subgraph") + permutation_scores = { + proposal_perm: sum(rail_score[(u,) + proposal] for proposal in proposal_perm) + + sum( + int(target.has_edge(embedding[n_s], (u,) + proposal + (j, z))) + for k_s, proposal in enumerate(proposal_perm) + for j in range(2) + for z in range(m) + for n_s in source_external_edges.neighbors((u, w, k_s, j, z)) + if n_s in embedding + ) + for proposal_perm in itertools.permutations(proposals, tp) + } + selected = max(permutation_scores, key=lambda k: permutation_scores[k]) + embedding.update( + { + (u, w, k, j, z): (u,) + selected[k] + (j, z) + for k in range(tp) + for j in range(2) + for z in range(m) + } + ) + + if len(set(embedding.values())) != len(embedding): + raise ValueError("Duplicate target coordinates detected in embedding") + + return embedding + + +def zephyr_quotient_search( + source: nx.Graph, + target: nx.Graph, + *, + quotient_search: QuotientSearchType = "by_quotient_rail", + embedding: dict[tuple[int, int, int, int, int], tuple[tuple[int, int, int, int, int], ...]] | None = None, + expand_boundary_search: bool = True, + ksymmetric: bool = False, + yield_type: YieldType = "edge", +) -> tuple[dict[tuple[int, int, int, int, int], tuple[tuple[int, int, int, int, int], ...]], 
ZephyrSearchMetadata]: + r"""Compute a high-yield Zephyr-to-Zephyr embedding. + + This routine starts from a source Zephyr graph with ``m`` rows and ``tp`` tiles, + and maps it into a target Zephyr graph with the same ``m`` rows and ``t >= tp`` + tiles. It is designed for defective targets where a direct identity map may lose + nodes or edges. Since a greedy method is used for embedding search, it is possible it fails to + find a 1:1 embedding where one is viable. A complete method such as + :code:``minorminer.subgraph.find_subgraph`` may be more appropriate in a scenario such as this, + especially with customization of parameters to the target families. Similarly, when defect rates + are high direct use of :code:``minorminer.find_embedding`` may be a more efficient strategy. + + The search is organized around the **quotient graph** of the Zephyr topology, formed by + contracting fine-grained coordinate indices so that each equivalence class maps to a single + quotient node. Two coarsenings are used: + + - **Quotient node** block :math:`(u, w, j, z)`: groups the ``tp`` source nodes that share + orientation ``u``, column ``w``, intra-cell index ``j``, and row ``z`` but differ in + tile index :math:`k \in \{0, \dots, tp-1\}`. + - **Quotient rail** block :math:`(u, w)`: groups all :math:`2 m \cdot tp` nodes that share + orientation ``u`` and column ``w`` (i.e. a whole Zephyr rail family) before any + :math:`(k, j, z)` variation. + + The function can be used in (1) node-level mode (``quotient_search='by_quotient_node'``), where + each quotient node block :math:`(u,w,j,z)` is optimized by choosing target candidates with the + same :math:`(u,w,j,z)` and selecting the highest-yield proposals; (2) rail-level mode + (``quotient_search='by_quotient_rail'``): optimise each quotient rail block :math:`(u,w,:)` by + selecting rails :math:`(u,w_t,k_t)` that maximise yield.; and (3) hybrid mode + (``quotient_search='by_rail_then_node'``): rail search followed by node refinement. 
+ + When ``expand_boundary_search=True``, boundary columns ``w=0`` and ``w=2m`` are augmented using + proposals drawn from adjacent internal columns. Whenever this behaviour is activated, nodes from + the internal columns are assigned first, so that the unassigned nodes in the internal columns + adjacent to the boundaries can be considered as proposals when optimising the boundary columns. + + Yield types control what the greedy search tries to preserve. ``"node"`` tries to place as + many source nodes as possible onto target nodes that actually exist. ``"edge"`` tries to + preserve source edges throughout the search. ``"rail-edge"`` is a mixed strategy: during rail + search it first prefers rails that are internally well-formed, and if a node-refinement phase + runs afterward it switches to ordinary edge-preservation scoring. The final yield for both + ``"edge"`` and ``"rail-edge"`` is reported as a number of preserved source edges. + + Args: + source: Zephyr source graph (linear or coordinate labels). + target: Zephyr target graph (linear or coordinate labels). + quotient_search: Search strategy. One of ``'by_quotient_rail'``, + ``'by_quotient_node'``, or ``'by_rail_then_node'``. See full docstrings for a + description of these. Defaults to ``'by_quotient_rail'``. + embedding: Optional initial one-to-one chain mapping. If omitted, + the identity on source coordinate indices is used (wrapped in singleton chains). + Defaults to ``None``. This must be a chain mapping where each source node maps to + a tuple of one or more target nodes (e.g., ``{source_node: (target_node,)}`` for + singleton chains). In coordinate form, each node is the 5-tuple + ``(u, w, k, j, z)``. + expand_boundary_search: Enable additional boundary proposals. Defaults to ``True``. + ksymmetric: Assume source ``k`` ordering can be treated symmetrically during greedy + selection when valid. Defaults to ``False``. + yield_type: Optimization objective: ``'node'``, ``'edge'``, or ``'rail-edge'``. 
+ See full docstrings for a description of these. Defaults to ``'edge'``. + + Returns: + A pruned one-to-one chain embedding of the form ``source_node -> (target_node,)`` (singleton + chains) that contains only mappings whose target node exists in the target, and a + :class:`ZephyrSearchMetadata` namedtuple with fields ``max_num_yielded``, + ``starting_num_yielded``, and ``final_num_yielded``. + + .. note:: + If you want to embed a Zephyr graph with parameter ``mp`` < ``m``, where ``m`` is the row + count of the target, you can use + ``minorminer.utils.parallel_embeddings.find_sublattice_embeddings`` to locate a compatible + ``mp``-row sublattice first, then pass that induced subgraph as the target. + + .. code-block:: python + + import networkx as nx + import dwave_networkx as dnx + from minorminer.utils.parallel_embeddings import find_sublattice_embeddings + + # Build an mp-row Zephyr tile and locate it in the original target. + tile = dnx.zephyr_graph(mp, target.graph["tile"], coordinates=True) + tile_embs = find_sublattice_embeddings( + S=tile, + T=target, + max_num_emb=1, + one_to_iterable=False, + ) + + if tile_embs: + tile_to_target = tile_embs[0] # pick the first one + mp_nodes = set(tile_to_target.values()) + target_mp = target.subgraph(mp_nodes).copy() + + # Relabel to canonical mp coordinates expected by source/target metadata. + target_to_tile = {tgt: tile_n for tile_n, tgt in tile_to_target.items()} + target_mp = nx.relabel_nodes(target_mp, target_to_tile, copy=True) + target_mp.graph.update(family="zephyr", rows=mp, tile=target.graph["tile"], + labels="coordinate") + + emb_mp, metadata = zephyr_quotient_search(source, target_mp) + + # Map the final embedding back to the original target labels. 
+ emb_in_original_target = { + s: tuple(tile_to_target[v] for v in chain) + for s, chain in emb_mp.items() + } + + If you want to refine a non-full-yield result with an external solver, run + :func:`zephyr_quotient_search` first and only call the refinement routine when + ``metadata.final_num_yielded < metadata.max_num_yielded``. + + .. code-block:: python + + emb, metadata = zephyr_quotient_search(source, target, yield_type="edge") + if metadata.final_num_yielded < metadata.max_num_yielded: + import minorminer + + initial_chains = {s: chain for s, chain in emb.items() if chain[0] in target} + refined = minorminer.find_embedding( + S=source, + T=target, + initial_chains=initial_chains, + timeout=5, # or whatever you want + ) + """ + + _validate_graph_inputs(source, target) + m, tp, t = _extract_graph_properties(source, target) + _validate_search_parameters(quotient_search, yield_type, embedding) + + # Make sure source and target are in coordinate form (5-tuples) + _source, source_nodes, to_source = _normalize_coordinate_source(source, m, tp) + _target, to_target = _normalize_coordinate_target(target, m, t) + target_nodeset = set(_target.nodes()) + + if embedding is None: + # Start with the identity mapping + working_embedding = {n: n for n in source_nodes} + else: + # Convert chain format to internal single-node format + working_embedding = {k: v[0] for k, v in embedding.items()} + + if yield_type == "node": + max_num_yielded = source.number_of_nodes() + num_yielded = sum( + _target.has_node(working_embedding[n]) + for n in _source.nodes() + if n in working_embedding + ) + else: + max_num_yielded = source.number_of_edges() + num_yielded = sum( + _target.has_edge(working_embedding[n1], working_embedding[n2]) + for n1, n2 in _source.edges() + if n1 in working_embedding and n2 in working_embedding + ) + + full_yield = max_num_yielded == num_yielded + starting_yield = num_yielded + + if not full_yield: + supplement = quotient_search == "by_rail_then_node" + + if 
quotient_search == "by_quotient_rail" or supplement: + working_embedding = _rail_search( + source=_source, + target=_target, + embedding=working_embedding, + # if quotient_search is by_rail_then_node, we expand boundary search only in the + # node search, and disable it in the rail search: + expand_boundary_search=((not supplement) and expand_boundary_search), + ksymmetric=ksymmetric, + yield_type=yield_type, + ) + if supplement: + working_embedding = _node_search( + source=_source, + target=_target, + embedding=working_embedding, + expand_boundary_search=expand_boundary_search, + ksymmetric=False, + yield_type=yield_type, + ) + elif quotient_search == "by_quotient_node": + working_embedding = _node_search( + source=_source, + target=_target, + embedding=working_embedding, + expand_boundary_search=expand_boundary_search, + ksymmetric=ksymmetric, + yield_type=yield_type, + ) + + if yield_type == "node": + num_yielded = sum( + _target.has_node(working_embedding[n]) + for n in _source.nodes() + if n in working_embedding + ) + else: + num_yielded = sum( + _target.has_edge(working_embedding[n1], working_embedding[n2]) + for n1, n2 in _source.edges() + if n1 in working_embedding and n2 in working_embedding + ) + full_yield = max_num_yielded == num_yielded + + if num_yielded < starting_yield: + raise ValueError("Greedy quotient search reduced the objective value") + + # If there are unfeasible mappings to target nodes, the final working_embedding might contain + # entries that map to non-existent target nodes. 
We prune those out before returning the final + # embedding: + pruned_embedding = { + to_source(k): to_target(v) for k, v in working_embedding.items() if v in target_nodeset + } + + # Convert to chain format for return value + pruned_embedding = {k: (v,) for k, v in pruned_embedding.items()} + + if full_yield and yield_type != "node": + verify_embedding(emb=pruned_embedding, source=source, target=target) + + metadata = ZephyrSearchMetadata( + max_num_yielded=max_num_yielded, + starting_num_yielded=starting_yield, + final_num_yielded=num_yielded, + ) + return pruned_embedding, metadata diff --git a/examples/fully_yielded_zephyr_subgraph.py b/examples/fully_yielded_zephyr_subgraph.py new file mode 100644 index 0000000..d527405 --- /dev/null +++ b/examples/fully_yielded_zephyr_subgraph.py @@ -0,0 +1,158 @@ +# Copyright 2026 D-Wave +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import dwave_networkx as dnx +import networkx as nx +import numpy as np +from minorminer import find_embedding +from minorminer.utils.parallel_embeddings import find_sublattice_embeddings + +from dwave.experimental.embedding_methods import zephyr_quotient_search + +seed = 12345 +rng = np.random.default_rng(seed) + +print( + "This example demonstrates how to use zephyr_quotient_search to find a full-yield embedding of " + "a smaller Zephyr graph into a larger, defective Zephyr graph. 
Since zephyr_quotient_search " + " finds embeddings for source and target graphs with the same number of rows, this example " + "shows how to use find_sublattice_embeddings to first identify a complete sublattice in the " + "defective target that matches the smaller source graph's parameters, and then run " + "zephyr_quotient_search on that sublattice. " +) + +tile = dnx.zephyr_graph(6, 4, coordinates=True) +target = dnx.zephyr_graph(12, 4, coordinates=True) +print( + "Step 1: Build two Zephyr graphs.\nThe smaller graph is the m=6, t=4 tile we want to recover " + f"({tile.number_of_nodes()} nodes, {tile.number_of_edges()} edges), and the larger graph is the" + " m=12, t=4 target that will later be damaged " + f"({target.number_of_nodes()} nodes, {target.number_of_edges()} edges)." +) + +# first, identify one complete m=6, t=4 sublattice in the pristine target. +reference_embeddings = find_sublattice_embeddings( + S=tile, + T=target, + max_num_emb=1, + one_to_iterable=False, + seed=seed, +) + +print( + "Step 2: In the defect-free target, search for one complete copy of the smaller graph. " + f"The search found {len(reference_embeddings)} candidate sublattices. We will protect the " + "first one so we know the damaged target still contains a valid solution." +) + + +# now, remove 10% random nodes from outside the sublattice that was found before +protected_nodes = set(reference_embeddings[0].values()) +num_remove = int(0.1 * target.number_of_nodes()) +removable_nodes = [n for n in target.nodes() if n not in protected_nodes] +removed_idx = rng.choice(len(removable_nodes), size=num_remove, replace=False) +removed_nodes = [removable_nodes[i] for i in removed_idx] +target.remove_nodes_from(removed_nodes) + +print( + "Step 3: Created a defective target by randomly removing qubits outside the protected " + f"sublattice. 
We keep {len(protected_nodes)} nodes untouched, remove {len(removed_nodes)} " + f"nodes, and end up with a damaged target containing {target.number_of_nodes()} nodes and " + f"{target.number_of_edges()} edges." +) + +# this finishes up creating our "defective" target graph, which, by construction, still contains at +# least one complete m=6, t=4 sublattice, but is now missing 10% of the nodes outside that +# sublattice. + +# our example actually starts here. we start from this defective target graph, so we need to +# discover a complete m=6, t=4 sublattice in the defective target. +tile_embeddings = find_sublattice_embeddings( + S=tile, + T=target, + max_num_emb=1, + one_to_iterable=False, + seed=seed, +) +tile_embedding = tile_embeddings[0] # pick the first embedding. + +print( + "Step 4: Starting only from the defective target, search again for a complete m=6, t=4 " + f"sublattice. The algorithm found {len(tile_embeddings)} valid sublattice(s); this example " + "continues with the first one." +) + +# Relabel to canonical m=6 coordinates before zephyr_quotient_search. +sublattice_nodes = set(tile_embedding.values()) +target_sub = target.subgraph(sublattice_nodes).copy() +inv_map = {target_node: tile_node for tile_node, target_node in tile_embedding.items()} +target_sub = nx.relabel_nodes(target_sub, inv_map, copy=True) +target_sub.graph.update(family="zephyr", rows=6, tile=4, labels="coordinate") + +print( + "Step 5: Relabel the recovered sublattice into canonical m=6 coordinates so quotient search can" + f" work on a standard Zephyr graph. The relabeled subgraph has {target_sub.number_of_nodes()} " + f"nodes and {target_sub.number_of_edges()} edges." +) + +# embed source zephyr(mp=6, tp=2) into the found complete m=6, t=4 sublattice. +source = dnx.zephyr_graph(6, 2, coordinates=True) +print( + "Step 6: Build the source graph we actually want to place into that recovered sublattice. 
" + f"Here the source is a Zephyr m=6, t=2 graph with {source.number_of_nodes()} nodes and " + f"{source.number_of_edges()} edges." +) + +emb, metadata = zephyr_quotient_search(source, target_sub, yield_type="edge") + +print( + "Step 7: Run zephyr_quotient_search on the canonical sublattice. It successfully placed " + f"{metadata.final_num_yielded} of {metadata.max_num_yielded} source edges." +) + +# If not full-yield, refine with minorminer.find_embedding. +best_embedding = emb +if metadata.final_num_yielded < metadata.max_num_yielded: + print( + "Step 8: The quotient search did not reach full yield, so we pass its chains to minorminer " + "as an initial guess and try to refine the embedding." + ) + refined = find_embedding( + S=source, + T=target_sub, + initial_chains=emb, + timeout=50, + ) + if refined: + best_embedding = refined + print( + "The refinement returned an embedding, improving the result to " + f"{len(best_embedding)} mapped source nodes." + ) + else: + print( + "The refinement step did not return a better embedding, " + "so the script keeps the quotient-search result." + ) +else: + print( + "Step 8: The quotient search already achieved full yield, so no refinement step is needed." + ) + +# map back to original target labels, which can be used as the effective embedding for the source +# into the original target. 
def generate_faulty_zephyr_graph(
    m: int, t: int, proportion: float, uniform_proportion: float, seed: int | None = None
) -> nx.Graph:
    """Create a Zephyr graph with simulated hardware faults.

    Nodes are deleted in two phases, where ``N`` is the node count of the pristine graph:

    1. ``round(proportion * uniform_proportion * N)`` nodes are chosen uniformly at random
       (without replacement) and removed.
    2. ``round(proportion * (1 - uniform_proportion) * N)`` additional nodes are removed
       iteratively, one node at a time.

    During phase 2, each surviving node ``v`` carries
    ``r(v) = sum(dist(v, d) for d in D)``, where ``D`` is the current set of deleted nodes and
    ``dist`` is the shortest-path distance in the original (unfaulted) graph. The next deleted
    node is sampled with probability proportional to ``1 / r(v)``. After each deletion the
    distances from the newly deleted node are added to ``r``, so the probabilities are
    re-evaluated at every iteration.

    A deleted node that cannot reach ``v`` contributes nothing to ``r(v)``. A node with
    ``r(v) == 0`` (no recorded distance to any deleted node) gets zero weight and is never
    selected.

    Phase 2 needs at least one phase-1 fault to measure distances from, so it is skipped when
    phase 1 removed no nodes. It also stops early when no nodes remain or when no remaining
    node has positive weight; in those cases fewer than ``proportion * N`` nodes are removed.

    Args:
        m: Zephyr row count.
        t: Zephyr tile count.
        proportion: Total fraction of nodes to remove, in ``(0, 1)``.
        uniform_proportion: Fraction of removed nodes that are chosen uniformly (the
            complementary fraction is chosen by distance-based sampling).
        seed: RNG seed for reproducibility. Defaults to ``None``.

    Returns:
        Copy of the full Zephyr graph with the faulty nodes removed. The copy is produced with
        ``Graph.copy()`` from the graph returned by ``zephyr_graph``.
    """
    rng = np.random.default_rng(seed)
    full_graph = zephyr_graph(m, t, coordinates=True)
    all_nodes = list(full_graph.nodes())
    num_nodes = len(all_nodes)

    # Phase 1: uniform random deletion.
    n_uniform = round(proportion * uniform_proportion * num_nodes)
    uniform_indices = rng.choice(num_nodes, size=n_uniform, replace=False)
    deleted = {all_nodes[i] for i in uniform_indices}

    # Phase 2: iterative distance-based deletion with dynamic updates.
    n_distance = round(proportion * (1 - uniform_proportion) * num_nodes)

    if n_distance > 0 and deleted:
        # cumulative_dist[v] stores sum(dist(v, d) for d in the current deleted set).
        cumulative_dist = dict.fromkeys(all_nodes, 0.0)

        def _accumulate_distances_from(origin) -> None:
            """Add the shortest-path distances measured from ``origin`` to the running sums."""
            distances = nx.single_source_shortest_path_length(full_graph, origin)
            for node, dist in distances.items():
                cumulative_dist[node] += dist

        # Snapshot the phase-1 faults; ``deleted`` grows inside the sampling loop below.
        for seed_fault in tuple(deleted):
            _accumulate_distances_from(seed_fault)

        for _ in range(n_distance):
            # Keep the original node ordering so that seeded runs stay reproducible.
            remaining = [node for node in all_nodes if node not in deleted]
            if not remaining:
                break

            weights = np.array(
                [
                    (1.0 / cumulative_dist[node]) if cumulative_dist[node] > 0 else 0.0
                    for node in remaining
                ]
            )
            total_weight = float(weights.sum())
            if total_weight <= 0.0:
                # No eligible candidate: normalising would divide by zero and hand NaN
                # probabilities to ``rng.choice``.
                break

            probs = weights / total_weight
            chosen_index = rng.choice(len(remaining), size=1, p=probs)[0]
            chosen_node = remaining[chosen_index]
            deleted.add(chosen_node)
            _accumulate_distances_from(chosen_node)

    faulty_graph = full_graph.copy()
    faulty_graph.remove_nodes_from(deleted)
    return faulty_graph
["node", "edge", "rail-edge"] + _BY_STRATEGIES = ["by_quotient_rail", "by_quotient_node", "by_rail_then_node"] + + @classmethod + def setUpClass(cls): + cls.source = zephyr_graph(cls._SOURCE_M, cls._SOURCE_TP, coordinates=True) + cls.target = generate_faulty_zephyr_graph( + cls._TARGET_M, + cls._TARGET_T, + proportion=cls._PROPORTION, + uniform_proportion=cls._UNIFORM_PROPORTION, + seed=cls._SEED, + ) + # Make sure that the target is a connected graph: + if not nx.is_connected(cls.target): + raise ValueError("Generated target graph is not connected; adjust parameters or seed.") + + def _assert_search_improves_yield( + self, yield_type, quotient_search, expand_boundary_search, ksymmetric, + ): + sub_emb, metadata = zephyr_quotient_search( + self.source, + self.target, + yield_type=yield_type, + quotient_search=quotient_search, + expand_boundary_search=expand_boundary_search, + ksymmetric=ksymmetric, + ) + + self.assertIsInstance(metadata, ZephyrSearchMetadata) + self.assertGreaterEqual( + metadata.final_num_yielded, + metadata.starting_num_yielded, + msg=( + f"Yield decreased from {metadata.starting_num_yielded} to " + f"{metadata.final_num_yielded} with yield_type={yield_type}, " + f"quotient_search={quotient_search}, " + f"expand={expand_boundary_search}, ksymmetric={ksymmetric}" + ), + ) + # this should be impossible, but just double checking: + self.assertLessEqual(metadata.final_num_yielded, metadata.max_num_yielded) + + target_nodes = set(self.target.nodes()) + # check the nodes the source was embedded onto are actually in the target + # Flatten the chain tuples to check if all target nodes are in the target graph + all_target_nodes = {node for chain in sub_emb.values() for node in chain} + self.assertTrue(all_target_nodes.issubset(target_nodes)) + # check the nodes in the subgraph embedding are actually in the source + self.assertTrue(set(sub_emb.keys()).issubset(set(self.source.nodes()))) + + def test_search_yields_improvement(self): + for quotient_search, 
class TestMetadataConsistency(unittest.TestCase):
    """Check that the fields of the returned ZephyrSearchMetadata agree with each other."""

    @classmethod
    def setUpClass(cls):
        # Source: m=6, t=2 Zephyr graph; target: m=6, t=4 Zephyr graph with simulated faults.
        cls.source = zephyr_graph(6, 2, coordinates=True)
        cls.target = generate_faulty_zephyr_graph(
            6, 4, proportion=0.10, uniform_proportion=0.10, seed=7795
        )

    def test_metadata_ordering(self):
        """max >= final >= starting >= 0 for all yield types."""
        for objective in ("node", "edge", "rail-edge"):
            with self.subTest(yield_type=objective):
                result = zephyr_quotient_search(self.source, self.target, yield_type=objective)
                metadata = result[1]

                # Every counter is non-negative.
                for counter in (
                    metadata.max_num_yielded,
                    metadata.starting_num_yielded,
                    metadata.final_num_yielded,
                ):
                    self.assertGreaterEqual(counter, 0)

                # The counters are ordered: starting <= final <= max.
                self.assertGreaterEqual(metadata.max_num_yielded, metadata.final_num_yielded)
                self.assertGreaterEqual(
                    metadata.final_num_yielded, metadata.starting_num_yielded
                )

    def test_full_target_gives_full_yield(self):
        """A perfect target should achieve full yield immediately (starting == final == max)."""
        pristine_target = zephyr_graph(6, 4, coordinates=True)
        for objective in ("node", "edge"):
            with self.subTest(yield_type=objective):
                result = zephyr_quotient_search(
                    self.source, pristine_target, yield_type=objective
                )
                metadata = result[1]
                self.assertEqual(metadata.starting_num_yielded, metadata.max_num_yielded)
                self.assertEqual(metadata.final_num_yielded, metadata.max_num_yielded)

    def test_return_is_two_tuple(self):
        """The search result unpacks into an embedding dict and a metadata record."""
        embedding, metadata = zephyr_quotient_search(self.source, self.target)
        self.assertIsInstance(embedding, dict)
        self.assertIsInstance(metadata, ZephyrSearchMetadata)
test_source_or_target_missing_labels_metadata_raises_value_error(self): + graph_no_labels = self.source.copy() + del graph_no_labels.graph["labels"] + with self.assertRaisesRegex(ValueError, r"source graph is missing required 'labels'"): + zephyr_quotient_search(graph_no_labels, self.target) + with self.assertRaisesRegex(ValueError, r"target graph is missing required 'labels'"): + zephyr_quotient_search(self.source, graph_no_labels) + + def test_incompatible_m_raises_value_error(self): + target_diff_m = zephyr_graph(5, 4, coordinates=True) + with self.assertRaisesRegex( + ValueError, r"source and target must have the same number of rows" + ): + zephyr_quotient_search(self.source, target_diff_m) + + def test_target_tile_less_than_source_tile_raises_value_error(self): + small_tile_target = self.target.copy() + small_tile_target.graph["tile"] = 1 # less than source tp=2 + with self.assertRaisesRegex( + ValueError, r"target tile count must be >= source tile count" + ): + zephyr_quotient_search(self.source, small_tile_target) + + def test_non_integer_rows_metadata_raises_type_error(self): + bad_source = self.source.copy() + bad_source.graph["rows"] = "six" + with self.assertRaisesRegex(TypeError, r"graph 'rows' metadata must be an integer"): + zephyr_quotient_search(bad_source, self.target) + + def test_non_positive_rows_metadata_raises_value_error(self): + bad_source = self.source.copy() + bad_source.graph["rows"] = 0 + with self.assertRaisesRegex(ValueError, r"graph 'rows' metadata must be positive"): + zephyr_quotient_search(bad_source, self.target) + + +class TestSearchParameterValidation(unittest.TestCase): + """Tests for TypeError / ValueError raised by _validate_search_parameters.""" + + def setUp(self): + self.source = zephyr_graph(6, 2, coordinates=True) + self.target = zephyr_graph(6, 4, coordinates=True) + + def test_invalid_quotient_search_raises_value_error(self): + with self.assertRaisesRegex(ValueError, r"quotient_search must be one of"): + 
zephyr_quotient_search( + self.source, self.target, quotient_search="unknown_strategy" # type: ignore + ) + + def test_invalid_yield_type_raises_value_error(self): + with self.assertRaisesRegex(ValueError, r"yield_type must be one of"): + zephyr_quotient_search( + self.source, self.target, yield_type="invalid" # type: ignore + ) + + def test_non_dict_embedding_raises_type_error(self): + with self.assertRaisesRegex( + TypeError, r"embedding must be a dictionary when provided" + ): + zephyr_quotient_search( + self.source, self.target, embedding=[1, 2, 3] # type: ignore + ) + + def test_embedding_with_non_tuple_keys_raises_value_error(self): + """Embedding keys must be 5-tuples, not other types.""" + bad_embedding = {"not_a_tuple": ((0, 0, 0, 0, 0),)} # type: ignore + with self.assertRaisesRegex( + ValueError, r"embedding keys must be 5-tuples representing Zephyr coordinates" + ): + zephyr_quotient_search( + self.source, self.target, embedding=bad_embedding # type: ignore + ) + + def test_embedding_with_wrong_length_tuple_keys_raises_value_error(self): + """Embedding keys must be exactly 5-tuples.""" + bad_embedding = {(0, 0, 0, 0): ((0, 0, 0, 0, 0),)} # 4-tuple key instead of 5-tuple + with self.assertRaisesRegex( + ValueError, r"embedding keys must be 5-tuples representing Zephyr coordinates" + ): + zephyr_quotient_search( + self.source, self.target, embedding=bad_embedding # type: ignore + ) + + def test_embedding_with_non_tuple_values_raises_value_error(self): + """Embedding values must be singleton tuples (chain format), not lists.""" + bad_embedding = {(0, 0, 0, 0, 0): [(0, 0, 0, 0, 0)]} # List, not tuple + with self.assertRaisesRegex( + ValueError, r"embedding values must be singleton tuples representing node chains" + ): + zephyr_quotient_search( + self.source, self.target, embedding=bad_embedding # type: ignore + ) + + def test_embedding_with_empty_chain_raises_value_error(self): + """Embedding chains must contain exactly one target node.""" + bad_embedding = 
class TestLabelingSchemeErrors(unittest.TestCase):
    """Tests for the ValueError raised when a graph declares an unknown labelling scheme.

    NOTE(review): the error presumably comes from ``_normalize_coordinate_source`` /
    ``_normalize_coordinate_target`` (the helpers formerly named ``_ensure_*``), since those
    are the label-normalisation calls made by ``zephyr_quotient_search`` -- confirm.
    """

    def _assert_unknown_labels_rejected(self, corrupt_source: bool):
        """Build a valid source/target pair, corrupt one ``labels`` entry, expect rejection.

        Args:
            corrupt_source: When ``True`` the source graph receives the unknown label;
                otherwise the target graph does.
        """
        source = zephyr_graph(6, 2, coordinates=True)
        target = zephyr_graph(6, 4, coordinates=True)
        corrupted = source if corrupt_source else target
        corrupted.graph["labels"] = "custom_scheme"
        with self.assertRaisesRegex(ValueError, r"unknown labelling scheme"):
            zephyr_quotient_search(source, target)

    def test_unknown_source_labels_raises_value_error(self):
        self._assert_unknown_labels_rejected(corrupt_source=True)

    def test_unknown_target_labels_raises_value_error(self):
        self._assert_unknown_labels_rejected(corrupt_source=False)