Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .devcontainer/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ RUN groupadd -g ${USER_GID} ${USERNAME} && \

# This branch of flux-sched has the bindings we need!
RUN ln -s /usr/local/go/bin/go /usr/bin/go && \
git clone https://github.com/flux-framework/flux-sched /opt/flux-sched && \
export FLUX_SCHED_VERSION=0.53.0 && \
git clone -b implement-reapi-cli-update-allocate https://github.com/vsoch/flux-sched /opt/flux-sched && \
# git clone https://github.com/flux-framework/flux-sched /opt/flux-sched && \
cd /opt/flux-sched && \
export WITH_GO=yes && \
./configure --prefix=/usr && \
Expand Down
4 changes: 3 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ RUN wget -q https://go.dev/dl/go1.26.0.linux-amd64.tar.gz \
ENV PATH=$PATH:/usr/local/go/bin

# flux-sched (Fluxion) with the Go reapi bindings -> /usr; build tree at /opt/flux-sched
RUN git clone https://github.com/flux-framework/flux-sched /opt/flux-sched \
#RUN git clone https://github.com/flux-framework/flux-sched /opt/flux-sched \
RUN git clone -b implement-reapi-cli-update-allocate https://github.com/vsoch/flux-sched /opt/flux-sched \
&& export FLUX_SCHED_VERSION=0.53.0 \
&& cd /opt/flux-sched && export WITH_GO=yes && ./configure --prefix=/usr \
&& mkdir build && cd build && cmake ../ && cd ../ && make -j"$(nproc)" && make install
ENV FLUX_SCHED_ROOT=/opt/flux-sched
Expand Down
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ test-graph: ## Matcher tests (needs flux-sched)
CGO_ENABLED=1 CGO_CFLAGS="$(CGO_CFLAGS)" CGO_LDFLAGS="$(CGO_LDFLAGS)" \
go test ./pkg/graph/...

.PHONY: test-restore
test-restore: ## Restart-recovery probe: replay an allocation into a fresh graph (needs flux-sched)
	CGO_ENABLED=1 CGO_CFLAGS="$(CGO_CFLAGS)" CGO_LDFLAGS="$(CGO_LDFLAGS)" \
		go run ./cmd/recovery-probe -graph ./examples/test/cluster.jgf -spec ./examples/test/jobspec-cpu.yaml

.PHONY: image
image: ## Build the scheduler container image
docker build -t $(IMG) .
Expand Down
28 changes: 17 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,17 +111,11 @@ kubectl apply -f deploy/fluence.yaml

This installs the scheduler, its RBAC, and the mutating webhook. Pods opt in with
`schedulerName: fluence`; a multi-pod gang adds a `scheduling.k8s.io/pod-group`
label (a single pod is treated as a group of one and needs no label).

## Testing

### 1. Classical (a pod group)

The base install is enough. Schedule a gang:
label (a single pod is treated as a group of one and needs no label). Test with a pod group:

```bash
kubectl apply -f examples/podgroup.yaml
kubectl get pods -o wide
kubectl get pods -o wide
kubectl get events --field-selector reason=Scheduled
kubectl get podgroups.scheduling.k8s.io
```
Expand All @@ -130,7 +124,7 @@ NAME POLICY WORKLOAD STATUS AGE
training Gang <none> Scheduled 15s
```

And cleanup.
And a quick cleanup.

```bash
kubectl patch podgroup training -n default --type=merge -p '{"metadata":{"finalizers":null}}'
Expand Down Expand Up @@ -197,6 +191,7 @@ kubectl logs sampler -f
{"results": [{"data": {"c": {"samples": ["0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x1", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x1", "0x0", "0x0", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x0", "0x1", "0x0", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0"], "num_bits": 1}}, "metadata": {"circuit_metadata": {}}}], "metadata": {"execution": {"execution_spans": [[{"date": "2026-06-06T19:04:43.221657"}, {"date": 
"2026-06-06T19:04:44.372421"}, {"0": [[256], [0, 1], [0, 256]]}]]}, "version": 2}}
2026/06/06 19:04:50 done: 2070 bytes from ibm_marrakesh
```

Boum! You will see in the fluence logs that when the pod completes, the fluxion job is cancelled, freeing the resources.

```bash
Expand All @@ -212,12 +207,23 @@ sampler 0/1 Completed 0 24s
### A note on deletion

When developing/debugging, a PodGroup (or its pods) can hang on delete because of
finalizers (the workload controller may not be running). Clear them with:
finalizers (the workload controller may not be running). Our plugin is designed to handle this,
and normally it just takes a little time to finish. If you are impatient, you can do:

```bash
kubectl patch podgroup training -n default --type=merge -p '{"metadata":{"finalizers":null}}'
```

### Restoring State

The plugin is designed to come up after a restart and restore its state: it reads the existing groups
and repopulates the graph. To make that possible, each group is annotated with the resources ("R") that fluxion returns on a match;
after a restart, the graph is re-populated from those annotations. Here is how to test that:

```bash
make test-restore
```

Importantly, submission is **not** done by the scheduler — the workload container holds the
user's credentials and submits via qrmi-go (job mode on the IBM open plan; see
fluxion-quantum for that story). Fluence only schedules and hands off the backend.
Expand All @@ -232,4 +238,4 @@ See [LICENSE](LICENSE), [COPYRIGHT](COPYRIGHT), and [NOTICE](NOTICE) for details

SPDX-License-Identifier: MIT

LLNL-CODE-842614
LLNL-CODE-842614
76 changes: 76 additions & 0 deletions cmd/recovery-probe/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Command recovery-probe verifies the restart-recovery primitive against a real
// flux-sched. It match-allocates a jobspec in one graph (capturing the rv1 R +
// jobid), then builds a SECOND, fresh graph from the same cluster file — exactly
// what a scheduler restart does, a graph with no allocation history — and
// replays R verbatim with UpdateAllocate under the same jobid. Finally it
// re-matches against the fresh graph; for an exclusive, singular resource a
// refusal proves the replay re-held it.
//
// Run in the devcontainer (links flux-sched via cgo). Throwaway diagnostic.
//
// go run ./cmd/recovery-probe -graph <cluster.jgf> -spec <jobspec.yaml>
package main

import (
"flag"
"fmt"
"os"

"github.com/converged-computing/fluence/pkg/graph"
)

// newGraph creates a FluxionGraph whose match format is "rv1" and initializes
// it from graphFile. The match policy is read from the FLUENCE_MATCH_POLICY
// environment variable (empty when unset); the final Init argument is passed as
// an empty string.
func newGraph(graphFile string) *graph.FluxionGraph {
	policy := os.Getenv("FLUENCE_MATCH_POLICY")
	fluxion := &graph.FluxionGraph{MatchFormat: "rv1"}
	fluxion.Init(graphFile, policy, "")
	return fluxion
}

// main drives the four-step recovery probe:
//
//  1. match-allocate the jobspec in a first graph and keep the returned jobid
//     and allocation R;
//  2. build a second graph from the same cluster file;
//  3. replay R under the same jobid into the second graph with UpdateAllocate;
//  4. match-allocate the same jobspec against the second graph and report
//     whether it was refused.
//
// Exit status is 2 when a flag is missing and 1 when step 1 or step 3 fails.
func main() {
	var graphPath, specPath string
	flag.StringVar(&graphPath, "graph", "", "cluster resource graph (JGF)")
	flag.StringVar(&specPath, "spec", "", "jobspec to allocate (YAML or JSON)")
	flag.Parse()
	if graphPath == "" || specPath == "" {
		fmt.Fprintln(os.Stderr, "usage: recovery-probe -graph <jgf> -spec <jobspec>")
		os.Exit(2)
	}

	// Step 1: the "pre-restart" scheduler makes the allocation we will replay.
	fmt.Println("\n=== 1. instance #1: MatchAllocate (capture rv1 R + jobid) ===")
	before := newGraph(graphPath)
	first, err := before.MatchAllocate(specPath)
	if err != nil {
		fmt.Println("initial match failed (need a satisfiable spec):", err)
		os.Exit(1)
	}
	fmt.Printf("captured jobid=%d, R=%d bytes (rv1)\n", first.Number, len(first.Allocation))

	// Step 2: the "post-restart" scheduler. The job is never cancelled in the
	// first graph; the second graph is simply built anew from the same file.
	fmt.Println("\n=== 2. instance #2: fresh graph (simulate restart) ===")
	after := newGraph(graphPath)

	// Step 3: replay the captured R, unchanged, under the captured jobid.
	fmt.Println("\n=== 3. instance #2: UpdateAllocate (replay R) ===")
	err = after.UpdateAllocate(first.Number, first.Allocation)
	if err != nil {
		fmt.Println("UpdateAllocate failed:", err)
		for _, hint := range []string{
			">>> If this still fails on a FRESH graph, update_allocate likely",
			">>> requires the jobid to already exist (an update, not a create).",
			">>> Fallback: MatchAllocate an equivalent spec to mint a jobid, then",
			">>> UpdateAllocate that jobid to the persisted R (the real assignment).",
		} {
			fmt.Println(hint)
		}
		os.Exit(1)
	}

	// Step 4: match the same spec again on the second graph. Any error here is
	// reported as a refusal; a success is left for the operator to interpret.
	fmt.Println("\n=== 4. instance #2: re-MatchAllocate (refusal = replay held it) ===")
	second, err := after.MatchAllocate(specPath)
	if err == nil {
		fmt.Printf("second match SUCCEEDED (jobid=%d) — interpret with your graph in mind:\n", second.Number)
		fmt.Println(" - spare capacity for this spec exists -> expected, not a failure;")
		fmt.Println(" - resource is exclusive & singular -> replay did NOT consume it, investigate.")
		return
	}
	fmt.Println("PASS: second match refused — the replayed allocation is holding the resource.")
}
23 changes: 23 additions & 0 deletions examples/test/cluster.jgf
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"graph": {
"nodes": [
{"id": "0", "metadata": {"type": "cluster", "basename": "tiny", "name": "tiny0", "id": 0, "uniq_id": 0, "rank": -1, "exclusive": false, "unit": "", "size": 1, "paths": {"containment": "/tiny0"}}},
{"id": "1", "metadata": {"type": "rack", "basename": "rack", "name": "rack0", "id": 0, "uniq_id": 1, "rank": -1, "exclusive": false, "unit": "", "size": 1, "paths": {"containment": "/tiny0/rack0"}}},
{"id": "2", "metadata": {"type": "node", "basename": "node", "name": "node0", "id": 0, "uniq_id": 2, "rank": 0, "exclusive": false, "unit": "", "size": 1, "paths": {"containment": "/tiny0/rack0/node0"}}},
{"id": "3", "metadata": {"type": "socket", "basename": "socket", "name": "socket0", "id": 0, "uniq_id": 3, "rank": -1, "exclusive": false, "unit": "", "size": 1, "paths": {"containment": "/tiny0/rack0/node0/socket0"}}},
{"id": "4", "metadata": {"type": "core", "basename": "core", "name": "core0", "id": 0, "uniq_id": 4, "rank": -1, "exclusive": false, "unit": "", "size": 1, "paths": {"containment": "/tiny0/rack0/node0/socket0/core0"}}},
{"id": "5", "metadata": {"type": "core", "basename": "core", "name": "core1", "id": 1, "uniq_id": 5, "rank": -1, "exclusive": false, "unit": "", "size": 1, "paths": {"containment": "/tiny0/rack0/node0/socket0/core1"}}},
{"id": "6", "metadata": {"type": "qgateway", "basename": "qgateway", "name": "qgateway0", "id": 0, "uniq_id": 6, "rank": -1, "exclusive": false, "unit": "", "size": 1, "vendor": "ibm", "paths": {"containment": "/tiny0/qgateway0"}}},
{"id": "7", "metadata": {"type": "qpu", "basename": "qpu", "name": "ibm_test", "id": 0, "uniq_id": 7, "rank": -1, "exclusive": true, "unit": "", "size": 1, "vendor": "ibm", "qrmi_type": "qiskit-runtime-service", "num_qubits": 156, "topology": "heavy-hex", "paths": {"containment": "/tiny0/qgateway0/ibm_test"}}}
],
"edges": [
{"source": "0", "target": "1", "metadata": {"subsystem": "containment"}},
{"source": "1", "target": "2", "metadata": {"subsystem": "containment"}},
{"source": "2", "target": "3", "metadata": {"subsystem": "containment"}},
{"source": "3", "target": "4", "metadata": {"subsystem": "containment"}},
{"source": "3", "target": "5", "metadata": {"subsystem": "containment"}},
{"source": "0", "target": "6", "metadata": {"subsystem": "containment"}},
{"source": "6", "target": "7", "metadata": {"subsystem": "containment"}}
]
}
}
19 changes: 19 additions & 0 deletions examples/test/jobspec-cpu.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Classical spec (2 cores). Cores aren't exclusive and there are 2 of them, so
# this exercises the round-trip but step 4 may re-match against spare capacity;
# use jobspec-qpu.yaml for the unambiguous exclusive PASS signal.
version: 9999
resources:
- type: slot
count: 1
label: default
with:
- type: core
count: 2
attributes:
system:
duration: 3600
tasks:
- command: [""]
slot: default
count:
per_slot: 1
18 changes: 18 additions & 0 deletions examples/test/jobspec-qpu.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# One slot holding one exclusive qpu — the clean case for the probe: the graph
# has exactly one exclusive qpu, so after replay the re-match MUST be refused.
version: 9999
resources:
- type: slot
count: 1
label: default
with:
- type: qpu
count: 1
attributes:
system:
duration: 3600
tasks:
- command: [""]
slot: default
count:
per_slot: 1
6 changes: 5 additions & 1 deletion pkg/fluence/fluence.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,11 @@ func New(ctx context.Context, _ runtime.Object, h fwk.Handle) (fwk.Plugin, error
}
_ = tmp.Close()

matcher := &graph.FluxionGraph{MatchFormat: "jgf"}
// rv1 (full writer, with the scheduling key) is a superset of jgf: its
// scheduling key is the same JGF vertex subgraph we parse for placement, and
// it carries the execution view flux uses to replay an allocation on restart.
// This is the format we persist and feed back to UpdateAllocate for recovery.
matcher := &graph.FluxionGraph{MatchFormat: "rv1"}
matcher.Init(tmp.Name(), os.Getenv("FLUENCE_MATCH_POLICY"), "")

f := &Fluence{
Expand Down
19 changes: 19 additions & 0 deletions pkg/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,25 @@ func (f *FluxionGraph) Cancel(jobid uint64) error {
return f.cli.Cancel(int64(jobid), true)
}

// UpdateAllocate replays a previously returned allocation R into the freshly
// built graph under the given jobid, marking exactly the vertices named in R as
// allocated — no re-matching, no guessing. This is how a scheduler restart
// restores prior state: persist (jobid, R) when the allocation is made, rebuild
// the graph (empty of allocations) on startup, then UpdateAllocate each
// surviving allocation. R must be the rv1 form (with the scheduling key); the
// jobid is preserved, so a later Cancel(jobid) still frees it.
//
// On success the "time at" and overhead values reported by the client are
// printed and nil is returned. On failure the client's error is returned
// wrapped with the jobid, so errors.Is / errors.As still see the original.
func (f *FluxionGraph) UpdateAllocate(jobid uint64, R string) error {
	fmt.Printf(" 🌀 UpdateAllocate (replay) jobid: %d\n", jobid)
	// The third result is intentionally discarded (presumably the R echoed
	// back by the client — the caller already holds the R it passed in).
	at, overhead, _, err := f.cli.UpdateAllocate(int(jobid), R)
	if err != nil {
		return fmt.Errorf("update allocate jobid %d: %w", jobid, err)
	}
	fmt.Printf(" Time at : %d\n", at)
	fmt.Printf(" Overhead : %.6f seconds\n", overhead)
	return nil
}

// Satisfy determines if we can satisfy
func (f *FluxionGraph) Satisfy(specFile string) (bool, error) {
fmt.Printf(" 🌀 Request: %s\n", specFile)
Expand Down
57 changes: 43 additions & 14 deletions pkg/quantum/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,50 @@ import (
"fmt"
)

// allocation is the subset of a Fluxion allocation graph we need: the metadata
// type and name of each allocated vertex.
// graphVertices is the subset of a JGF graph we need: the type and name of each
// vertex. It appears at the top level of a "jgf" allocation, and under the
// "scheduling" key of an "rv1" allocation.
//
// NOTE: allocation.vertices writes out this same anonymous element type in its
// return signature, so the two declarations must be kept identical (field
// names, types and struct tags) or that method stops compiling.
type graphVertices struct {
	// Nodes lists the vertices of the graph (JSON key "nodes").
	Nodes []struct {
		// Metadata holds the per-vertex attributes; only type and name are decoded.
		Metadata struct {
			Type string `json:"type"` // vertex type, e.g. "qpu" or "node"
			Name string `json:"name"` // vertex name reported back to callers
		} `json:"metadata"`
	} `json:"nodes"`
}

// allocation captures the allocated vertices from a Fluxion allocation R in
// either match format:
//   - "jgf": the vertices are at the top level under "graph".
//   - "rv1": the vertices are under "scheduling.graph" (the rv1 scheduling key),
//     alongside an "execution" view we don't need for name extraction.
//
// We persist and replay rv1 (it is a superset of jgf and is the format flux
// recommends for failure recovery), but accepting both keeps callers and older
// allocations working.
type allocation struct {
	// Graph is the top-level "graph" object of a jgf allocation. It is declared
	// exactly once: a second field with the same name would not compile.
	Graph graphVertices `json:"graph"`
	// Scheduling is the "scheduling" object of an rv1 allocation; only its
	// nested "graph" is decoded.
	Scheduling struct {
		Graph graphVertices `json:"graph"`
	} `json:"scheduling"`
}

// vertices picks the vertex list to use for an allocation in either match
// format: the nodes under the rv1 scheduling key when that list is non-empty,
// otherwise the nodes of the top-level jgf graph (which may itself be empty).
func (a allocation) vertices() []struct {
	Metadata struct {
		Type string `json:"type"`
		Name string `json:"name"`
	} `json:"metadata"`
} {
	nodes := a.Scheduling.Graph.Nodes
	if len(nodes) == 0 {
		nodes = a.Graph.Nodes
	}
	return nodes
}

// BackendFromAllocation returns the name of the first vertex of vertexType
// (e.g. "qpu" or "node") in a Fluxion allocation graph.
// (e.g. "qpu" or "node") in a Fluxion allocation.
func BackendFromAllocation(alloc string, vertexType string) (string, error) {
names, err := NamesFromAllocation(alloc, vertexType)
if err != nil {
Expand All @@ -32,15 +61,15 @@ func BackendFromAllocation(alloc string, vertexType string) (string, error) {
}

// NamesFromAllocation returns the names of every vertex of vertexType in a
// Fluxion allocation graph. Used to map an allocation onto cluster node names
// (vertexType "node") for pod placement, or onto quantum backends ("qpu").
// Fluxion allocation (jgf or rv1). Used to map an allocation onto cluster node
// names (vertexType "node") for pod placement, or onto quantum backends ("qpu").
func NamesFromAllocation(alloc string, vertexType string) ([]string, error) {
var a allocation
if err := json.Unmarshal([]byte(alloc), &a); err != nil {
return nil, fmt.Errorf("parse allocation: %w", err)
}
var names []string
for _, n := range a.Graph.Nodes {
for _, n := range a.vertices() {
if n.Metadata.Type == vertexType {
names = append(names, n.Metadata.Name)
}
Expand Down
Loading
Loading