diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index 47baac1..0f730db 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -75,7 +75,9 @@ RUN groupadd -g ${USER_GID} ${USERNAME} && \ # This branch of flux-sched has the bindings we need! RUN ln -s /usr/local/go/bin/go /usr/bin/go && \ - git clone https://github.com/flux-framework/flux-sched /opt/flux-sched && \ + export FLUX_SCHED_VERSION=0.53.0 && \ + git clone -b implement-reapi-cli-update-allocate https://github.com/vsoch/flux-sched /opt/flux-sched && \ + # git clone -b https://github.com/flux-framework/flux-sched /opt/flux-sched && \ cd /opt/flux-sched && \ export WITH_GO=yes && \ ./configure --prefix=/usr && \ diff --git a/Dockerfile b/Dockerfile index 4dba6f6..ceed5da 100644 --- a/Dockerfile +++ b/Dockerfile @@ -20,7 +20,9 @@ RUN wget -q https://go.dev/dl/go1.26.0.linux-amd64.tar.gz \ ENV PATH=$PATH:/usr/local/go/bin # flux-sched (Fluxion) with the Go reapi bindings -> /usr; build tree at /opt/flux-sched -RUN git clone https://github.com/flux-framework/flux-sched /opt/flux-sched \ +#RUN git clone https://github.com/flux-framework/flux-sched /opt/flux-sched \ +RUN git clone -b implement-reapi-cli-update-allocate https://github.com/vsoch/flux-sched /opt/flux-sched \ + && export FLUX_SCHED_VERSION=0.53.0 \ && cd /opt/flux-sched && export WITH_GO=yes && ./configure --prefix=/usr \ && mkdir build && cd build && cmake ../ && cd ../ && make -j"$(nproc)" && make install ENV FLUX_SCHED_ROOT=/opt/flux-sched diff --git a/Makefile b/Makefile index 5d9328a..5ea76a7 100644 --- a/Makefile +++ b/Makefile @@ -34,6 +34,11 @@ test-graph: ## Matcher tests (needs flux-sched) CGO_ENABLED=1 CGO_CFLAGS="$(CGO_CFLAGS)" CGO_LDFLAGS="$(CGO_LDFLAGS)" \ go test ./pkg/graph/... +.PHONY: test-restore +test-restore: + CGO_ENABLED=1 CGO_CFLAGS="$(CGO_CFLAGS)" CGO_LDFLAGS="$(CGO_LDFLAGS)" \ + go run ./cmd/recovery-probe -graph ./examples/test/cluster.jgf -spec ./examples/test/jobspec-cpu.yaml + .PHONY: image image: ## Build the scheduler container image docker build -t $(IMG) . diff --git a/README.md b/README.md index ba2c24f..18b8be7 100644 --- a/README.md +++ b/README.md @@ -111,17 +111,11 @@ kubectl apply -f deploy/fluence.yaml This installs the scheduler, its RBAC, and the mutating webhook. Pods opt in with `schedulerName: fluence`; a multi-pod gang adds a `scheduling.k8s.io/pod-group` -label (a single pod is treated as a group of one and needs no label). - -## Testing - -### 1. Classical (a pod group) - -The base install is enough. Schedule a gang: +label (a single pod is treated as a group of one and needs no label). Test with a pod group: ```bash kubectl apply -f examples/podgroup.yaml -kubectl get pods -o wide +kubectl get pods -o wide kubectl get events --field-selector reason=Scheduled kubectl get podgroups.scheduling.k8s.io ``` @@ -130,7 +124,7 @@ NAME POLICY WORKLOAD STATUS AGE training Gang Scheduled 15s ``` -And cleanup. +And a quick cleanup. ```bash kubectl patch podgroup training -n default --type=merge -p '{"metadata":{"finalizers":null}}' @@ -197,6 +191,7 @@ kubectl logs sampler -f {"results": [{"data": {"c": {"samples": ["0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x1", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x1", "0x0", "0x0", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x0", "0x1", "0x0", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0"], "num_bits": 1}}, "metadata": {"circuit_metadata": {}}}], "metadata": {"execution": {"execution_spans": [[{"date": "2026-06-06T19:04:43.221657"}, {"date": "2026-06-06T19:04:44.372421"}, {"0": [[256], [0, 1], [0, 256]]}]]}, "version": 2}} 2026/06/06 19:04:50 done: 2070 bytes from ibm_marrakesh ``` + Boum! You will see in the fluence logs that when the pod completes, the fluxion job is cancelled, freeing the resources. ```bash @@ -212,12 +207,23 @@ sampler 0/1 Completed 0 24s ### A note on deletion When developing/debugging, a PodGroup (or its pods) can hang on delete because of -finalizers (the workload controller may not be running). Clear them with: +finalizers (the workload controller may not be running). Our plugin is designed to handle this, +and normally it just takes a little time to finish. If you are impatient, you can do: ```bash kubectl patch podgroup training -n default --type=merge -p '{"metadata":{"finalizers":null}}' ``` +### Restoring State + +The plugin is designed to be able to come up and restore state, meaning we can read in existing groups +and repopulate the graph. We do that by way of annotating each group with the resources "R" that fluxion returns on a match, +and then in the case of a restart, we re-populate the graph using this metadata. Here is how to test that. + +```bash +make test-restore +``` + Importantly, submission is **not** done by the scheduler — the workload container holds the user's credentials and submits via qrmi-go (job mode on the IBM open plan; see fluxion-quantum for that story). Fluence only schedules and hands off the backend. @@ -232,4 +238,4 @@ See [LICENSE](LICENSE), [COPYRIGHT](COPYRIGHT), and [NOTICE](NOTICE) for details SPDX-License-Identifier: MIT -LLNL-CODE-842614 +LLNL-CODE-842614 \ No newline at end of file diff --git a/cmd/recovery-probe/main.go b/cmd/recovery-probe/main.go new file mode 100644 index 0000000..4d41bf0 --- /dev/null +++ b/cmd/recovery-probe/main.go @@ -0,0 +1,76 @@ +// Command recovery-probe verifies the restart-recovery primitive against a real +// flux-sched. It match-allocates a jobspec in one graph (capturing the rv1 R + +// jobid), then builds a SECOND, fresh graph from the same cluster file — exactly +// what a scheduler restart does, a graph with no allocation history — and +// replays R verbatim with UpdateAllocate under the same jobid. Finally it +// re-matches against the fresh graph; for an exclusive, singular resource a +// refusal proves the replay re-held it. +// +// Run in the devcontainer (links flux-sched via cgo). Throwaway diagnostic. +// +// go run ./cmd/recovery-probe -graph -spec +package main + +import ( + "flag" + "fmt" + "os" + + "github.com/converged-computing/fluence/pkg/graph" +) + +func newGraph(graphFile string) *graph.FluxionGraph { + g := &graph.FluxionGraph{MatchFormat: "rv1"} + g.Init(graphFile, os.Getenv("FLUENCE_MATCH_POLICY"), "") + return g +} + +func main() { + graphFile := flag.String("graph", "", "cluster resource graph (JGF)") + specFile := flag.String("spec", "", "jobspec to allocate (YAML or JSON)") + flag.Parse() + if *graphFile == "" || *specFile == "" { + fmt.Fprintln(os.Stderr, "usage: recovery-probe -graph -spec ") + os.Exit(2) + } + + // --- Graph instance #1: the "pre-restart" scheduler. --- + fmt.Println("\n=== 1. instance #1: MatchAllocate (capture rv1 R + jobid) ===") + pre := newGraph(*graphFile) + req, err := pre.MatchAllocate(*specFile) + if err != nil { + fmt.Println("initial match failed (need a satisfiable spec):", err) + os.Exit(1) + } + jobid, R := req.Number, req.Allocation + fmt.Printf("captured jobid=%d, R=%d bytes (rv1)\n", jobid, len(R)) + + // --- Graph instance #2: the "post-restart" scheduler. Fresh context, fresh + // graph built from the same cluster file, NO allocation history. We never + // cancel jobid in instance #1; instance #2 simply never knew about it. --- + fmt.Println("\n=== 2. instance #2: fresh graph (simulate restart) ===") + post := newGraph(*graphFile) + + // --- Replay the exact R under the same jobid into the fresh graph. --- + fmt.Println("\n=== 3. instance #2: UpdateAllocate (replay R) ===") + if err := post.UpdateAllocate(jobid, R); err != nil { + fmt.Println("UpdateAllocate failed:", err) + fmt.Println(">>> If this still fails on a FRESH graph, update_allocate likely") + fmt.Println(">>> requires the jobid to already exist (an update, not a create).") + fmt.Println(">>> Fallback: MatchAllocate an equivalent spec to mint a jobid, then") + fmt.Println(">>> UpdateAllocate that jobid to the persisted R (the real assignment).") + os.Exit(1) + } + + // --- Prove the replay consumed the resource: re-match the same spec against + // the fresh graph. For an exclusive, singular resource this must be refused. --- + fmt.Println("\n=== 4. instance #2: re-MatchAllocate (refusal = replay held it) ===") + req2, err := post.MatchAllocate(*specFile) + if err != nil { + fmt.Println("PASS: second match refused — the replayed allocation is holding the resource.") + return + } + fmt.Printf("second match SUCCEEDED (jobid=%d) — interpret with your graph in mind:\n", req2.Number) + fmt.Println(" - spare capacity for this spec exists -> expected, not a failure;") + fmt.Println(" - resource is exclusive & singular -> replay did NOT consume it, investigate.") +} diff --git a/examples/test/cluster.jgf b/examples/test/cluster.jgf new file mode 100644 index 0000000..f82b1e9 --- /dev/null +++ b/examples/test/cluster.jgf @@ -0,0 +1,23 @@ +{ + "graph": { + "nodes": [ + {"id": "0", "metadata": {"type": "cluster", "basename": "tiny", "name": "tiny0", "id": 0, "uniq_id": 0, "rank": -1, "exclusive": false, "unit": "", "size": 1, "paths": {"containment": "/tiny0"}}}, + {"id": "1", "metadata": {"type": "rack", "basename": "rack", "name": "rack0", "id": 0, "uniq_id": 1, "rank": -1, "exclusive": false, "unit": "", "size": 1, "paths": {"containment": "/tiny0/rack0"}}}, + {"id": "2", "metadata": {"type": "node", "basename": "node", "name": "node0", "id": 0, "uniq_id": 2, "rank": 0, "exclusive": false, "unit": "", "size": 1, "paths": {"containment": "/tiny0/rack0/node0"}}}, + {"id": "3", "metadata": {"type": "socket", "basename": "socket", "name": "socket0", "id": 0, "uniq_id": 3, "rank": -1, "exclusive": false, "unit": "", "size": 1, "paths": {"containment": "/tiny0/rack0/node0/socket0"}}}, + {"id": "4", "metadata": {"type": "core", "basename": "core", "name": "core0", "id": 0, "uniq_id": 4, "rank": -1, "exclusive": false, "unit": "", "size": 1, "paths": {"containment": "/tiny0/rack0/node0/socket0/core0"}}}, + {"id": "5", "metadata": {"type": "core", "basename": "core", "name": "core1", "id": 1, "uniq_id": 5, "rank": -1, "exclusive": false, "unit": "", "size": 1, "paths": {"containment": "/tiny0/rack0/node0/socket0/core1"}}}, + {"id": "6", "metadata": {"type": "qgateway", "basename": "qgateway", "name": "qgateway0", "id": 0, "uniq_id": 6, "rank": -1, "exclusive": false, "unit": "", "size": 1, "vendor": "ibm", "paths": {"containment": "/tiny0/qgateway0"}}}, + {"id": "7", "metadata": {"type": "qpu", "basename": "qpu", "name": "ibm_test", "id": 0, "uniq_id": 7, "rank": -1, "exclusive": true, "unit": "", "size": 1, "vendor": "ibm", "qrmi_type": "qiskit-runtime-service", "num_qubits": 156, "topology": "heavy-hex", "paths": {"containment": "/tiny0/qgateway0/ibm_test"}}} + ], + "edges": [ + {"source": "0", "target": "1", "metadata": {"subsystem": "containment"}}, + {"source": "1", "target": "2", "metadata": {"subsystem": "containment"}}, + {"source": "2", "target": "3", "metadata": {"subsystem": "containment"}}, + {"source": "3", "target": "4", "metadata": {"subsystem": "containment"}}, + {"source": "3", "target": "5", "metadata": {"subsystem": "containment"}}, + {"source": "0", "target": "6", "metadata": {"subsystem": "containment"}}, + {"source": "6", "target": "7", "metadata": {"subsystem": "containment"}} + ] + } +} diff --git a/examples/test/jobspec-cpu.yaml b/examples/test/jobspec-cpu.yaml new file mode 100644 index 0000000..5677c33 --- /dev/null +++ b/examples/test/jobspec-cpu.yaml @@ -0,0 +1,19 @@ +# Classical spec (2 cores). Cores aren't exclusive and there are 2 of them, so +# this exercises the round-trip but step 4 may re-match against spare capacity; +# use qpu-jobspec.yaml for the unambiguous exclusive PASS signal. +version: 9999 +resources: + - type: slot + count: 1 + label: default + with: + - type: core + count: 2 +attributes: + system: + duration: 3600 +tasks: + - command: [""] + slot: default + count: + per_slot: 1 \ No newline at end of file diff --git a/examples/test/jobspec-qpu.yaml b/examples/test/jobspec-qpu.yaml new file mode 100644 index 0000000..5802579 --- /dev/null +++ b/examples/test/jobspec-qpu.yaml @@ -0,0 +1,18 @@ +# One slot holding one exclusive qpu — the clean case for the probe: the graph +# has exactly one exclusive qpu, so after replay the re-match MUST be refused. +version: 9999 +resources: + - type: slot + count: 1 + label: default + with: + - type: qpu + count: 1 +attributes: + system: + duration: 3600 +tasks: + - command: [""] + slot: default + count: + per_slot: 1 \ No newline at end of file diff --git a/pkg/fluence/fluence.go b/pkg/fluence/fluence.go index 50bd82b..58d069d 100644 --- a/pkg/fluence/fluence.go +++ b/pkg/fluence/fluence.go @@ -117,7 +117,11 @@ func New(ctx context.Context, _ runtime.Object, h fwk.Handle) (fwk.Plugin, error } _ = tmp.Close() - matcher := &graph.FluxionGraph{MatchFormat: "jgf"} + // rv1 (full writer, with the scheduling key) is a superset of jgf: its + // scheduling key is the same JGF vertex subgraph we parse for placement, and + // it carries the execution view flux uses to replay an allocation on restart. + // This is the format we persist and feed back to UpdateAllocate for recovery. + matcher := &graph.FluxionGraph{MatchFormat: "rv1"} matcher.Init(tmp.Name(), os.Getenv("FLUENCE_MATCH_POLICY"), "") f := &Fluence{ diff --git a/pkg/graph/graph.go b/pkg/graph/graph.go index 29490dd..5b7ab10 100644 --- a/pkg/graph/graph.go +++ b/pkg/graph/graph.go @@ -117,6 +117,25 @@ func (f *FluxionGraph) Cancel(jobid uint64) error { return f.cli.Cancel(int64(jobid), true) } +// UpdateAllocate replays a previously returned allocation R into the freshly +// built graph under the given jobid, marking exactly the vertices named in R as +// allocated — no re-matching, no guessing. This is how a scheduler restart +// restores prior state: persist (jobid, R) when the allocation is made, rebuild +// the graph (empty of allocations) on startup, then UpdateAllocate each +// surviving allocation. R must be the rv1 form (with the scheduling key); the +// jobid is preserved, so a later Cancel(jobid) still frees it. +func (f *FluxionGraph) UpdateAllocate(jobid uint64, R string) error { + fmt.Printf(" 🌀 UpdateAllocate (replay) jobid: %d\n", jobid) + at, overhead, rOut, err := f.cli.UpdateAllocate(int(jobid), R) + if err != nil { + return err + } + fmt.Printf(" Time at : %d\n", at) + fmt.Printf(" Overhead : %.6f seconds\n", overhead) + _ = rOut + return nil +} + // Satisfy determines if we can satisfy func (f *FluxionGraph) Satisfy(specFile string) (bool, error) { fmt.Printf(" 🌀 Request: %s\n", specFile) diff --git a/pkg/quantum/allocation.go b/pkg/quantum/allocation.go index 864c79b..bb9bfdd 100644 --- a/pkg/quantum/allocation.go +++ b/pkg/quantum/allocation.go @@ -5,21 +5,50 @@ import ( "fmt" ) -// allocation is the subset of a Fluxion allocation graph we need: the metadata -// type and name of each allocated vertex. +// graphVertices is the subset of a JGF graph we need: the type and name of each +// vertex. It appears at the top level of a "jgf" allocation, and under the +// "scheduling" key of an "rv1" allocation. +type graphVertices struct { + Nodes []struct { + Metadata struct { + Type string `json:"type"` + Name string `json:"name"` + } `json:"metadata"` + } `json:"nodes"` +} + +// allocation captures the allocated vertices from a Fluxion allocation R in +// either match format: +// - "jgf": the vertices are at the top level under "graph". +// - "rv1": the vertices are under "scheduling.graph" (the rv1 scheduling key), +// alongside an "execution" view we don't need for name extraction. +// +// We persist and replay rv1 (it is a superset of jgf and is the format flux +// recommends for failure recovery), but accepting both keeps callers and older +// allocations working. type allocation struct { - Graph struct { - Nodes []struct { - Metadata struct { - Type string `json:"type"` - Name string `json:"name"` - } `json:"metadata"` - } `json:"nodes"` - } `json:"graph"` + Graph graphVertices `json:"graph"` + Scheduling struct { + Graph graphVertices `json:"graph"` + } `json:"scheduling"` +} + +// vertices returns the allocated vertices regardless of match format, preferring +// the rv1 scheduling key and falling back to a top-level jgf graph. +func (a allocation) vertices() []struct { + Metadata struct { + Type string `json:"type"` + Name string `json:"name"` + } `json:"metadata"` +} { + if len(a.Scheduling.Graph.Nodes) > 0 { + return a.Scheduling.Graph.Nodes + } + return a.Graph.Nodes } // BackendFromAllocation returns the name of the first vertex of vertexType -// (e.g. "qpu" or "node") in a Fluxion allocation graph. +// (e.g. "qpu" or "node") in a Fluxion allocation. func BackendFromAllocation(alloc string, vertexType string) (string, error) { names, err := NamesFromAllocation(alloc, vertexType) if err != nil { @@ -32,15 +61,15 @@ func BackendFromAllocation(alloc string, vertexType string) (string, error) { } // NamesFromAllocation returns the names of every vertex of vertexType in a -// Fluxion allocation graph. Used to map an allocation onto cluster node names -// (vertexType "node") for pod placement, or onto quantum backends ("qpu"). +// Fluxion allocation (jgf or rv1). Used to map an allocation onto cluster node +// names (vertexType "node") for pod placement, or onto quantum backends ("qpu"). func NamesFromAllocation(alloc string, vertexType string) ([]string, error) { var a allocation if err := json.Unmarshal([]byte(alloc), &a); err != nil { return nil, fmt.Errorf("parse allocation: %w", err) } var names []string - for _, n := range a.Graph.Nodes { + for _, n := range a.vertices() { if n.Metadata.Type == vertexType { names = append(names, n.Metadata.Name) } diff --git a/pkg/quantum/allocation_test.go b/pkg/quantum/allocation_test.go new file mode 100644 index 0000000..f71abb0 --- /dev/null +++ b/pkg/quantum/allocation_test.go @@ -0,0 +1,39 @@ +package quantum + +import "testing" + +// jgf match format: vertices at top-level graph. +const jgfAlloc = `{"graph":{"nodes":[ + {"metadata":{"type":"node","name":"kind-worker"}}, + {"metadata":{"type":"core","name":"core0"}}, + {"metadata":{"type":"qpu","name":"ibm_marrakesh"}}]}}` + +// rv1 match format: execution view + scheduling key holding the same vertices. +const rv1Alloc = `{"version":1, + "execution":{"R_lite":[{"rank":"0","children":{"core":"0"}}],"nodelist":["kind-worker"]}, + "scheduling":{"graph":{"nodes":[ + {"metadata":{"type":"node","name":"kind-worker"}}, + {"metadata":{"type":"core","name":"core0"}}, + {"metadata":{"type":"qpu","name":"ibm_marrakesh"}}]}}}` + +func TestNamesFromAllocationJGF(t *testing.T) { + nodes, err := NamesFromAllocation(jgfAlloc, "node") + if err != nil || len(nodes) != 1 || nodes[0] != "kind-worker" { + t.Fatalf("jgf node parse: %v %v", nodes, err) + } + be, err := BackendFromAllocation(jgfAlloc, "qpu") + if err != nil || be != "ibm_marrakesh" { + t.Fatalf("jgf qpu parse: %q %v", be, err) + } +} + +func TestNamesFromAllocationRV1(t *testing.T) { + nodes, err := NamesFromAllocation(rv1Alloc, "node") + if err != nil || len(nodes) != 1 || nodes[0] != "kind-worker" { + t.Fatalf("rv1 node parse: %v %v", nodes, err) + } + be, err := BackendFromAllocation(rv1Alloc, "qpu") + if err != nil || be != "ibm_marrakesh" { + t.Fatalf("rv1 qpu parse: %q %v", be, err) + } +}