Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
313 changes: 224 additions & 89 deletions .github/workflows/collectivex-sweep.yml

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions experimental/CollectiveX/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
__pycache__/
*.pyc
results/
unsupported/
.shards/
.cx_workloads/
/matrix_full.json

# Local plans and infrastructure inventory.
goal.md
notes.md
configs/platforms.yaml
private-infra.md
101 changes: 101 additions & 0 deletions experimental/CollectiveX/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# CollectiveX

CollectiveX is an experimental MoE expert-parallel communication benchmark. It measures dispatch,
combine, and paired roundtrip latency across EP libraries and accelerator systems.

> Publication hold: historical schema 3-5 data is diagnostic. No current dataset is approved for
> rankings, recommendations, or regression baselines.

## v1 Execution Profile

Every scheduled case is BF16, normal mode, `layout-and-dispatch-v1`, backend-tuned resources, packed
placement, and `fixed-512-v1` sampling: 64 trials x 8 timed iterations with 32 synchronized full
roundtrip warmups before each measured component at every trial/point. Roundtrip is measured first,
and every backend uses the same phase-specific conditioning ramp and ascending point order. Routing is limited
to uniform and one Zipf sensitivity; EPLB is measured only
as the Zipf remedy. A stdlib integer counter produces byte-identical routing and gate weights.

The current matrix has 38 runnable allocation cells across H100, H200, B200, B300, GB200, GB300,
MI325X, and MI355X. It requests 360 cases / 840 token points: 228 runnable cases / 532 points and
132 explicit unsupported cases / 308 points. `sweep_matrix.py` materializes every token ladder and
rejects missing, stale, malformed, or altered shard controls. Workflow shards are emitted round-robin
by SKU so the bounded GHA matrix can use every available runner pool from its first scheduling cycle.

| Backend | Current scope |
|---|---|
| DeepEP V1 | Image-pinned `deep_ep.Buffer`: upstream v1.2.1 on x86 and the image's GB fork on arm64 |
| DeepEP V2 | PR #605 `ElasticBuffer` at an exact commit; isolated torch 2.10/NCCL 2.30.4 runtime |
| DeepEP Hybrid | Pinned `HybridEPBuffer` revision; NVLink/MNNVL scale-up domain |
| UCCL | Pinned 0.1.1 wheel and wrapper on Hopper; Blackwell is explicitly unsupported |
| NCCL/RCCL A2A | Portable rank-deduplicated payload plus expert/routing-metadata reference |
| MoRI | MI325X AsyncLL transport and MI355X intranode transport |

FlashInfer is outside v1 because its exercised EP path failed intermittently at runtime. It is not
misreported as a platform capability limitation and can return after a stable pinned path is proven.

DeepEP V2 means the `ElasticBuffer` implementation introduced by
[DeepEP PR #605](https://github.com/deepseek-ai/DeepEP/pull/605), not a newer legacy `Buffer` build.
The adapter builds the exact PR head in an isolated pinned environment and records the API, source,
loaded libraries, and JIT CUBIN identities. NVIDIA SKUs are schedulable from upstream architecture
support; each remains explicitly unvalidated until its GPU outcome passes the native correctness and
publication gates.

Removed v1 axes include cached-layout `[cl]`, runtime-visible `[rv]`, LL, FP8, quantized combine,
extra routing distributions, activation profiles, uneven allocation, placement permutations, model
envelopes, and scaling studies.

## Workflow And Artifacts

`.github/workflows/collectivex-sweep.yml` generates a public-SKU matrix, extracts a strict ignored
`.shards/<id>.json` control, executes one allocation per shard, privacy-checks result JSON, and uploads
raw GitHub artifacts. Raw producers are diagnostic-only; they cannot self-promote evidence.

Development publication uses one self-hosted persistent filesystem. GitHub artifacts are
transient input; Vercel storage, GCP, Neon, managed databases, and managed object stores are out of
scope. `publisher.py` ingests complete downloaded workflow artifacts, verifies or promotes explicit
bundle IDs, and writes the atomic content-addressed layout consumed by the frontend. It never runs on
GPU workers. The store contract and promotion gates are in [docs/methodology.md](docs/methodology.md).

## Runner Configuration

Runner-local Slurm and storage values use a strict per-SKU JSON document at
`$XDG_CONFIG_HOME/inferencex/collectivex.json` or `COLLECTIVEX_OPERATOR_CONFIG`. The mode-0600,
same-owner, non-symlink file is outside the checkout and never uploaded. Unknown runners, fields,
duplicate keys, endpoint literals, unsafe paths, and non-JSON input fail closed; configuration is
never evaluated as shell. GHA passes encrypted `COLLECTIVEX_OPERATOR_CONFIG_V1` content only to the
launcher, which validates it, exports the selected SKU's allowlisted values, and deletes the
temporary copy before allocation. Required JSON fields are:

| SKU | Variables |
|---|---|
| `h100-dgxc`, `b200-dgxc` | `partition`, `account`, `squash_dir` |
| `h200-dgxc` | `partition`, `squash_dir` |
| `b300` | `partition`, `account`, `squash_dir`, `stage_dir` |
| `gb200` | `partition`, `account`, ordered `storage_roots` |
| `gb300` | `partition`, `account`, `squash_dir`, `stage_dir`, `enroot_cache_path` |
| `mi325x`, `mi355x` | `partition`, `squash_dir` |

Before import, each Docker Hub tag is resolved with bounded registry requests and must match its
pinned digest; digest-qualified overrides are rejected. Enroot imports use a fixed filesystem epoch
and a versioned, registry-digest-bound cache key. Every mounted squash is freshly hashed. The
verified registry digest and local squash hash are both recorded. Image-provided DeepEP is checked
against exact wheel and installed-file fingerprints; source-built backends use pinned commits and
runtime-verified GPU targets.
Compute containers receive an explicit environment allowlist. Private host, address, device, NIC,
credential, workspace, and path data stays in encrypted config, ignored operator notes, or bounded
mode-0600 runner logs; it is never uploaded.

## Local Checks

```bash
uv run --with-requirements experimental/CollectiveX/requirements.txt \
python -m unittest discover experimental/CollectiveX/tests -p 'test_*.py'
uv run --with-requirements experimental/CollectiveX/requirements.txt \
python experimental/CollectiveX/sweep_matrix.py --backends all --out /tmp/cx-matrix.json >/dev/null
uv run --with-requirements experimental/CollectiveX/requirements.txt \
python experimental/CollectiveX/publisher.py --store-root "$COLLECTIVEX_STORE_ROOT" verify
bash -n experimental/CollectiveX/runtime/*.sh experimental/CollectiveX/launchers/*.sh
```

Core paths are `capability.py`, `configs/`, `contracts.py`, `schemas/`, `sweep_matrix.py`,
`publisher.py`, `runtime/`, `launchers/`, and `tests/`.
175 changes: 175 additions & 0 deletions experimental/CollectiveX/artifact_safety.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
#!/usr/bin/env python3
"""Fail-closed privacy check for CollectiveX public result documents."""
from __future__ import annotations

import argparse
import ipaddress
import json
import os
import re


SENSITIVE_FIELDS = frozenset({
"environment", "env", "host", "hostname", "uuid", "gpu_uuid", "device_uuid",
"pci_bus_id", "ip_address", "ip_addresses", "master_addr", "ssh", "ssh_target",
"nodelist", "node_list", "nic_guid", "ib_guid", "topology_matrix", "rdma_devices",
"user", "username", "password", "passwd", "secret", "token", "access_token",
"api_token", "auth_token", "api_key", "private_key", "credential", "credentials",
"address", "addresses", "ip", "ips",
})
SENSITIVE_FIELD_SUFFIXES = (
"_host", "_hostname", "_address", "_addresses", "_path", "_paths", "_ip", "_ips",
"_password", "_passwd", "_secret", "_token", "_credential", "_credentials",
)
SENSITIVE_VALUE_PATTERNS = (
("private-path", re.compile(
r"(?<![A-Za-z0-9_.-])/(?:home|mnt|workspace|root|users|tmp|data|it-share|lustre|raid|nvme_home)(?:/|$)",
re.I,
)),
("ipv4-address", re.compile(r"(?<!\d)(?:\d{1,3}\.){3}\d{1,3}(?!\d)")),
("pci-address", re.compile(r"\b[0-9a-f]{4}:[0-9a-f]{2}:[0-9a-f]{2}\.[0-7]\b", re.I)),
("uuid", re.compile(
r"\b(?:GPU-|MIG-)?[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b",
re.I,
)),
("ssh-target", re.compile(r"(?:ssh://|\bssh\s+[^\s/@]+@[^\s/]+)", re.I)),
("host-identifier", re.compile(
r"\b(?:host(?:name)?|master[_-]?(?:addr|address)|node[_-]?list)\s*(?:=|:)\s*[^\s,;]+",
re.I,
)),
("private-hostname", re.compile(
r"\b(?:[a-z0-9-]+\.)+(?:cluster|corp|internal|lan|local)\b|"
r"\b(?:compute|gpu|head|login|node|worker)[-_]?[0-9][a-z0-9_.-]*\b|"
r"\bdgx-[a-z0-9-]+-[0-9]+\b|\bip-(?:[0-9]{1,3}-){3}[0-9]{1,3}\b",
re.I,
)),
("secret-token", re.compile(
r"(?:gh[pousr]_[A-Za-z0-9]{20,}|github_pat_[A-Za-z0-9_]{20,}|"
r"glpat-[A-Za-z0-9_-]{20,}|xox[baprs]-[A-Za-z0-9-]{20,}|"
r"(?:AKIA|ASIA)[0-9A-Z]{16}|AIza[0-9A-Za-z_-]{35}|"
r"(?:sk-(?:proj|svcacct)-[A-Za-z0-9_-]{20,}|sk-[A-Za-z0-9]{32,}|"
r"sk_(?:live|test)_[A-Za-z0-9]{20,}|hf_[A-Za-z0-9]{20,})|"
r"npm_[A-Za-z0-9]{20,}|"
r"pypi-[A-Za-z0-9_-]{20,}|dckr_pat_[A-Za-z0-9_-]{20,}|"
r"Bearer\s+[A-Za-z0-9._~+/-]{16,}|Basic\s+[A-Za-z0-9+/=]{16,}|"
r"eyJ[A-Za-z0-9_-]{8,}\.[A-Za-z0-9_-]{8,}\.[A-Za-z0-9_-]{8,}|"
r"-----BEGIN(?: [A-Z]+)? PRIVATE KEY-----)",
re.I,
)),
("secret-assignment", re.compile(
r"\b(?:api[_-]?key|access[_-]?token|auth[_-]?token|client[_-]?secret|"
r"password|passwd|secret|accountkey)\s*(?:=|:)\s*[\"']?"
r"[A-Za-z0-9+/_=.~-]{8,}",
re.I,
)),
)
IPV6_CANDIDATE = re.compile(
r"(?<![0-9A-Za-z])\[?([0-9A-Fa-f:]{2,}(?:%[0-9A-Za-z_.-]+)?)\]?"
)
CONTEXTUAL_VALUE_RULES = frozenset({"ssh-target", "host-identifier", "private-hostname"})


class ArtifactSafetyError(ValueError):
"""A document contains data that cannot cross the public boundary."""


def _normalized_field(value: object) -> str:
normalized = re.sub(r"(?<!^)(?=[A-Z])", "_", str(value).strip())
return normalized.lower().replace("-", "_")


def _sensitive_value_rule(value: str, *, contextual: bool = True) -> str | None:
matched = next(
(
name for name, pattern in SENSITIVE_VALUE_PATTERNS
if (contextual or name not in CONTEXTUAL_VALUE_RULES) and pattern.search(value)
),
None,
)
if matched:
return matched
for candidate in IPV6_CANDIDATE.findall(value):
try:
address = candidate.split("%", 1)[0]
if ipaddress.ip_address(address).version == 6:
return "ipv6-address"
except ValueError:
continue
return None


def assert_publication_safe(docs: list[dict]) -> None:
"""Reject private infrastructure fields and value shapes."""
def walk(value, doc_index: int, parent_field: str | None = None) -> None:
if isinstance(value, dict):
for key, child in value.items():
field = _normalized_field(key)
if field in SENSITIVE_FIELDS or field.endswith(SENSITIVE_FIELD_SUFFIXES):
raise ArtifactSafetyError(
f"artifact safety: doc[{doc_index}] contains forbidden private field"
)
key_rule = _sensitive_value_rule(str(key))
if key_rule:
raise ArtifactSafetyError(
f"artifact safety: doc[{doc_index}] contains forbidden {key_rule} key"
)
walk(child, doc_index, field)
elif isinstance(value, list):
for child in value:
walk(child, doc_index, parent_field)
elif isinstance(value, str):
rule = _sensitive_value_rule(value, contextual=parent_field != "ref")
if rule:
raise ArtifactSafetyError(
f"artifact safety: doc[{doc_index}] contains forbidden {rule} value"
)

for index, doc in enumerate(docs):
if not isinstance(doc, dict):
raise ArtifactSafetyError(f"artifact safety: doc[{index}] is not a JSON object")
walk(doc, index)


def load_documents(paths: list[str]) -> list[dict]:
docs: list[dict] = []
for path in paths:
if not os.path.isfile(path):
raise ArtifactSafetyError("artifact safety: result file is unavailable")
try:
with open(path) as fh:
if path.endswith(".ndjson"):
for line_number, line in enumerate(fh, 1):
if not line.strip():
continue
try:
docs.append(json.loads(line))
except json.JSONDecodeError as exc:
raise ArtifactSafetyError(
f"artifact safety: malformed NDJSON at input line {line_number}"
) from exc
else:
docs.append(json.load(fh))
except json.JSONDecodeError as exc:
raise ArtifactSafetyError("artifact safety: malformed JSON input") from exc
except (OSError, UnicodeError) as exc:
raise ArtifactSafetyError("artifact safety: result file is unreadable") from exc
if not docs:
raise ArtifactSafetyError("artifact safety: no public result documents found")
return docs


def main() -> int:
parser = argparse.ArgumentParser(description="Check CollectiveX result artifacts for private data")
parser.add_argument("paths", nargs="+")
args = parser.parse_args()
try:
docs = load_documents(args.paths)
assert_publication_safe(docs)
except ArtifactSafetyError as exc:
parser.error(str(exc))
print(f"artifact safety: {len(docs)} public document(s) passed")
return 0


if __name__ == "__main__":
raise SystemExit(main())
Loading