From f16982531eca9e154b8cfd3e14994cc9d77d3a25 Mon Sep 17 00:00:00 2001 From: vsoch Date: Sat, 6 Jun 2026 12:09:58 -0700 Subject: [PATCH] feat: add deviceplugin, webhook This adds full support for scheduling and running the quantum workload. We do this by allowing addition of custom resource types to fluxion, and in this case, the qpu. These are countable resources that can be thus returned "YES" by a node-level device plugin. I am choosing this approach over DRA for the time being because I want the user to be able to define fluxion type resources in the resource spec and not have to define DeviceClass or ResourceClaim that I still find annoying and clunky. To handle the backend, we add an envar in a webhook before pod creation to expect the envar via an annotation, and that is added in prebind. This was just tested with a quantum pod and vanilla podgroup and works great! Super cool. Signed-off-by: vsoch --- Dockerfile | 14 +- Makefile | 7 +- README.md | 221 +++++++++++++++++++++++--------- cmd/deviceplugin/main.go | 83 ++++++++++++ cmd/webhook/main.go | 82 ++++++++++++ config/quantum-backends.yaml | 12 -- deploy/fluence-resources.yaml | 77 +++++++++++ deploy/fluence.yaml | 107 ++++++++++++++-- deploy/kind-config.yaml | 18 ++- examples/podgroup.yaml | 14 +- examples/quantum-pod.yaml | 30 +++++ go.mod | 7 +- go.sum | 2 - pkg/cluster/cluster.go | 38 +++++- pkg/deviceplugin/plugin.go | 172 +++++++++++++++++++++++++ pkg/fluence/fluence.go | 125 ++++++++++++++---- pkg/placement/placement.go | 116 +++++++++++------ pkg/placement/placement_test.go | 114 ++++++++++++---- pkg/webhook/webhook.go | 201 +++++++++++++++++++++++++++++ pkg/webhook/webhook_test.go | 56 ++++++++ 20 files changed, 1290 insertions(+), 206 deletions(-) create mode 100644 cmd/deviceplugin/main.go create mode 100644 cmd/webhook/main.go delete mode 100644 config/quantum-backends.yaml create mode 100644 deploy/fluence-resources.yaml create mode 100644 examples/quantum-pod.yaml create mode 100644 
pkg/deviceplugin/plugin.go create mode 100644 pkg/webhook/webhook.go create mode 100644 pkg/webhook/webhook_test.go diff --git a/Dockerfile b/Dockerfile index cce7802..4dba6f6 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,11 +1,7 @@ +# Mr. Fluence! # Multi-stage build for the fluence scheduler. -# # The scheduler binary cgo-links flux-sched (Fluxion) for resource matching. -# It does NOT depend on QRMI — quantum job submission is a separate workload -# (github.com/converged-computing/qrmi-sampler). So this image needs only -# flux-sched, no Rust/QRMI. Mirrors the .devcontainer build. -# ---------- builder ---------- FROM fluxrm/flux-core:noble AS builder USER root @@ -37,7 +33,9 @@ COPY . . RUN CGO_ENABLED=1 \ CGO_CFLAGS="-I/opt/flux-sched" \ CGO_LDFLAGS="-L/opt/flux-sched/resource -L/opt/flux-sched/resource/libjobspec -L/opt/flux-sched/resource/reapi/bindings -lresource -ljobspec_conv -lreapi_cli -lflux-idset -lstdc++ -lczmq -ljansson -lhwloc -lboost_system -lflux-hostlist -lboost_graph -lyaml-cpp" \ - go build -ldflags '-w' -o /bin/fluence ./cmd/fluence + go build -ldflags '-w' -o /bin/fluence ./cmd/fluence && \ + CGO_ENABLED=0 go build -ldflags '-w' -o /bin/fluence-deviceplugin ./cmd/deviceplugin && \ + CGO_ENABLED=0 go build -ldflags '-w' -o /bin/fluence-webhook ./cmd/webhook FROM fluxrm/flux-core:noble AS runtime @@ -55,4 +53,6 @@ COPY --from=builder /usr/lib/libjobspec_conv.so* /usr/lib/ RUN ldconfig COPY --from=builder /bin/fluence /bin/fluence -ENTRYPOINT ["/bin/fluence"] \ No newline at end of file +COPY --from=builder /bin/fluence-deviceplugin /bin/fluence-deviceplugin +COPY --from=builder /bin/fluence-webhook /bin/fluence-webhook +ENTRYPOINT ["/bin/fluence"] diff --git a/Makefile b/Makefile index 2fde646..5d9328a 100644 --- a/Makefile +++ b/Makefile @@ -18,13 +18,16 @@ CGO_LDFLAGS = -L$(FLUX_SCHED_ROOT)/resource \ -lflux-hostlist -lboost_graph -lyaml-cpp .PHONY: build -build: ## Build the fluence scheduler binary (needs flux-sched) +build: ## Build all 
binaries (scheduler needs flux-sched; helpers are pure Go) CGO_ENABLED=1 CGO_CFLAGS="$(CGO_CFLAGS)" CGO_LDFLAGS="$(CGO_LDFLAGS)" \ go build -o bin/fluence ./cmd/fluence + CGO_ENABLED=0 go build -o bin/fluence-deviceplugin ./cmd/deviceplugin + CGO_ENABLED=0 go build -o bin/fluence-webhook ./cmd/webhook .PHONY: test test: ## Pure-Go unit tests (no flux, no k8s scheduler libs, no cluster) - go test ./pkg/jgf/... ./pkg/cluster/... ./pkg/jobspec/... ./pkg/placement/... ./pkg/quantum/... + go test ./pkg/jgf/... ./pkg/cluster/... ./pkg/jobspec/... ./pkg/placement/... \ + ./pkg/quantum/... ./pkg/webhook/... ./pkg/deviceplugin/... .PHONY: test-graph test-graph: ## Matcher tests (needs flux-sched) diff --git a/README.md b/README.md index 8b7ca43..eafb62f 100644 --- a/README.md +++ b/README.md @@ -4,119 +4,222 @@ A Kubernetes scheduler plugin that places **pod groups** (and individual pods) by matching them against a [Fluxion](https://github.com/flux-framework/flux-sched) -(flux-sched) resource graph built from the live cluster. +(flux-sched) resource graph built from the live cluster. This is an update from [flux-k8s](https://github.com/flux-framework/flux-k8s) that uses the native PodGroup and optionally allows for scheduling -against **quantum resources** modeled in the same graph. I am also improving -the design by not requiring a sidecar for fluence - the plugin is built as one -container. +against arbitrary resources such as **quantum resources** modeled in the same graph. +I am also improving the design by not requiring a sidecar for fluence, and not +requiring the `kubernetes-sigs/scheduler-plugins` dependency. We use native Gang +scheduling provided by Kubernetes. For quantum resource modeling, we start from the prototype proven out in -[fluxion-quantum](https://github.com/converged-computing/fluxion-quantum). 
-This design is an improvement upon the initial fluence because we drop -the `kubernetes-sigs/scheduler-plugins` dependency and use Kubernetes -**native gang scheduling** (the `PodGroup` API, `scheduling.k8s.io/v1alpha2`, -alpha in 1.35/1.36). +[fluxion-quantum](https://github.com/converged-computing/fluxion-quantum). ## How it works +### Gang Scheduling + Gang semantics (all-or-nothing) come from the native `PodGroup` API. Fluence is responsible only for **placement**: 1. **Discover** — on startup fluence lists cluster nodes and turns their cpu/memory/gpu capacity into a Fluxion JGF resource graph - (`pkg/cluster` + `pkg/jgf`). Quantum backends from a config file are injected - as `qpu` vertices under a `qgateway` (`AddQuantum`). + (`pkg/cluster` + `pkg/jgf`). If a resources config is provided (via + `FLUENCE_RESOURCES`), its entries (e.g. quantum backends) are injected as + `qpu`/`qubit` vertices. With no config the graph is classical-only. 2. **Match** — when the first pod of a group hits `PreFilter`, fluence builds a - Fluxion jobspec for the whole gang (`pkg/fluence.JobspecForGroup`), asks the + Fluxion jobspec for the whole gang (`pkg/placement.JobspecForGroup`), asks the matcher to allocate (`pkg/graph.FluxionGraph.MatchAllocateSpec`), and parses - the allocation into node names (`PlacementFromAllocation`). -3. **Place** — `Filter` then permits each pod only on its allocated node. - -For a **quantum** pod (one that requests `quantum.flux-framework.org/qpu`), the -match allocates a `qpu` vertex instead of cores; the allocated backend name -(e.g. `ibm_fez`) is what the workload submits to via -[qrmi-go](https://github.com/converged-computing/qrmi-go) (job mode on the IBM -open plan — see fluxion-quantum for that story). - -``` -nodes (kubectl get nodes) ─┐ - ├─► JGF resource graph ─► Fluxion match ─► node + backend placement -quantum-backends.yaml ─────┘ + the allocation into node and backend names (`PlacementFromAllocation`). +3. 
**Place** — `Filter` permits each pod only on its allocated node. (A + quantum-only pod allocates a `qpu` but no node — the backend is a remote API + any node can reach — so fluence imposes no node constraint in that case.) +4. **Hand off** — for a quantum pod, `PreBind` records the allocated backend on + the pod as the `fluence.flux-framework.org/backend` annotation. The mutating + webhook (installed with the base) injects a downward-API env so the container + reads it as `QRMI_BACKEND` with no boilerplate in the manifest. + +### Design Choices + +While quantum resources are the first target, we should be able to support +any arbitrary resource in the graph. I decided that a pod can request a graph resource generically, +e.g., `fluxion.flux-framework.org/<type>` (like `.../qpu: "1"`), and that becomes a jobspec count +of `<type>`. To support this, we deploy a **device plugin** that can advertise these virtual +types on every node. We need to do this because of the in-tree `NodeResourcesFit` plugin. +If we do not have the device plugin, this check will not be satisfied. Note that +this device plugin will return True for any resources it sees added to the Fluxion resource graph, +but is not actually involved with scheduling. Fluxion does the real matching. + +```console +nodes (kubectl get nodes) ──┐ + ├─► JGF resource graph ─► Fluxion match ─► node + backend placement +fluence-resources ConfigMap ┘ ``` +I am also choosing to keep credentials and qrmi interactions on the level of the application. +I am not comfortable with the design of an operator holding any kind of credential or being +responsible for managing calls with qrmi in a multi-tenant environment. Finally, since +there are (and will continue to be) a lot of environment variables that I do not want +to make the user define, we have a webhook to handle this. The webhook injects an environment +variable that reads its value from a pod annotation, and a PreBind call sets that annotation.
+ ## Build -The scheduler binary links flux-sched (the matcher) and, for quantum, QRMI: +The scheduler binary links flux-sched (the matcher). It does **not** link QRMI — +quantum job submission lives in a separate workload container +([qrmi-sampler](https://github.com/converged-computing/qrmi-sampler)), not here. ```bash -# If you want to debug inside the .devcontainer, use this one -make build # needs flux-sched at /opt/flux-sched and QRMI at /usr/local +# Inside the .devcontainer (flux-sched at /opt/flux-sched): +# builds bin/fluence (cgo+flux) + bin/fluence-deviceplugin + bin/fluence-webhook +make build +make test -# If you want to test outside (and build the docker image, this one) +# Or build the container image (all three binaries): make image ``` -Pure-Go pieces (graph builder, discovery, jobspec, placement) need neither and -are covered by: +## Deploy + +Create a development cluster on a Kubernetes release that supports native gang +scheduling, with the feature gates enabled: ```bash -make test +kind create cluster --image kindest/node:v1.36.1 --config deploy/kind-config.yaml ``` -## Deploy +(See [installing kind](https://kind.sigs.k8s.io/docs/user/quick-start#installing-from-release-binaries).) +The kind config turns on the `GangScheduling` and `GenericWorkload` feature gates +and the `scheduling.k8s.io/v1alpha2` API group on the apiserver and scheduler. In +the future these will likely be enabled by default. -Here is how I am creating a development cluster with a release of Kubernetes that will support -what we need: +Load the image (built above) into the cluster: ```bash -kind create cluster --image kindest/node:v1.36.1 --config deploy/kind-config.yaml +kind load docker-image ghcr.io/converged-computing/fluence:latest ``` -And if you [need to install kind](https://kind.sigs.k8s.io/docs/user/quick-start#installing-from-release-binaries). +### 1. 
Gang Scheduling +Install the **base** scheduler (this is all you need for classical scheduling — +no device plugin, no quantum): ```bash -# This creates the quantum backends yaml graph -kubectl create configmap fluence-quantum-backends --from-file=quantum-backends.yaml=config/quantum-backends.yaml -n kube-system +kubectl apply -f deploy/fluence.yaml +``` -# load docker image -kind load docker-image ghcr.io/converged-computing/fluence +This installs the scheduler, its RBAC, and the mutating webhook. Pods opt in with +`schedulerName: fluence`; a multi-pod gang adds a `scheduling.k8s.io/pod-group` +label (a single pod is treated as a group of one and needs no label). -kubectl apply -f deploy/fluence.yaml # RBAC + scheduler in kube-system -kubectl apply -f examples/podgroup.yaml # a gang scheduled by fluence -``` +## Testing + +### 1. Classical (a pod group) -This works by enabling the native gang feature on the cluster (kube-scheduler / API server), meaning -the `GangScheduling` and `GenericWorkload` feature gates and the `scheduling.k8s.io/v1alpha2` API group. -In the future these will likely be enabled by default. +The base install is enough. Schedule a gang: -Pods opt in with `schedulerName: fluence` and a `scheduling.k8s.io/pod-group` label; group size can be set explicitly with -`fluence.flux-framework.org/group-size`. +```bash +kubectl apply -f examples/podgroup.yaml +kubectl get pods -o wide +kubectl get events --field-selector reason=Scheduled +kubectl get podgroups.scheduling.k8s.io +``` +```console +NAME POLICY WORKLOAD STATUS AGE +training Gang Scheduled 15s +``` -Note that when you are developing / debugging a group deletion can hang because of finalizers. I do: +And cleanup. ```bash kubectl patch podgroup training -n default --type=merge -p '{"metadata":{"finalizers":null}}' +kubectl delete -f examples/podgroup.yaml ``` -## Quantum +### 2. 
Quantum -We can bing fluence up with quantum resources by pointing `FLUENCE_QUANTUM_CONFIG` at a backends file (see `config/quantum-backends.yaml`). -Those backends become schedulable `qpu` vertices; a pod requesting `quantum.flux-framework.org/qpu` will be matched to one, and the allocated backend is handed to the workload. +Quantum needs the resources add-on, which supplies the `fluence-resources` +ConfigMap (the single source of truth for which backends exist) **and** the +device plugin that advertises them: + +```bash +kubectl apply -f deploy/fluence-resources.yaml +# The scheduler reads its resources config at startup, so restart it to pick up +# the quantum vertices: +kubectl rollout restart deployment/fluence -n kube-system +``` + +Confirm the device plugin advertised the resources on the nodes: + +```bash +kubectl get nodes -o jsonpath='{range .items[*]}{.metadata.name}{"\t"}{.status.allocatable}{"\n"}{end}' \ + | grep fluxion.flux-framework.org +``` +```console +kind-control-plane {"cpu":"16","ephemeral-storage":"982292956Ki","fluxion.flux-framework.org/qpu":"1k","fluxion.flux-framework.org/qubit":"1k","hugepages-1Gi":"0","hugepages-2Mi":"0","memory":"61400748Ki","pods":"110"} +kind-worker {"cpu":"16","ephemeral-storage":"982292956Ki","fluxion.flux-framework.org/qpu":"1k","fluxion.flux-framework.org/qubit":"1k","hugepages-1Gi":"0","hugepages-2Mi":"0","memory":"61400748Ki","pods":"110"} +kind-worker2 {"cpu":"16","ephemeral-storage":"982292956Ki","fluxion.flux-framework.org/qpu":"1k","fluxion.flux-framework.org/qubit":"1k","hugepages-1Gi":"0","hugepages-2Mi":"0","memory":"61400748Ki","pods":"110"} +``` + +Create the IBM credentials the **workload** uses to submit (in the namespace +where the workload runs — the scheduler itself never needs them): + +```bash +# If you don't have this yet +curl -fsSL https://clis.cloud.ibm.com/install/linux | sudo sh +ibmcloud login --apikey <your-ibm-cloud-api-key> +# 12 for us-east +``` +```bash +export IBM_CLOUD_TOKEN=<your-ibm-cloud-token> +export 
IBM_CLOUD_CRN=$(ibmcloud resource service-instances --service-name quantum-computing --output json | jq -r '.[] | {name: .name, crn: .crn}' | jq -r .crn) +``` + +```bash +kubectl create secret generic ibm-quantum -n default --from-literal=token="$IBM_CLOUD_TOKEN" --from-literal=crn="$IBM_CLOUD_CRN" +``` + +Run a single quantum pod. It just requests `fluxion.flux-framework.org/qpu` — no +group, and no hard-coded backend (the webhook + PreBind supply `QRMI_BACKEND`): + +```bash +kubectl apply -f examples/quantum-pod.yaml +kubectl get pod sampler -o wide + +# fluence's chosen backend, injected as an environment variable: +kubectl get pod sampler -o jsonpath='{.metadata.annotations.fluence\.flux-framework\.org/backend}{"\n"}' +kubectl logs sampler +``` +```console +kubectl logs sampler -f +2026/06/06 19:04:38 submitting sampler job to ibm_marrakesh +{"results": [{"data": {"c": {"samples": ["0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x1", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x1", "0x0", "0x0", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x1", "0x0", 
"0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x0", "0x1", "0x0", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0"], "num_bits": 1}}, "metadata": {"circuit_metadata": {}}}], "metadata": {"execution": {"execution_spans": [[{"date": "2026-06-06T19:04:43.221657"}, {"date": "2026-06-06T19:04:44.372421"}, {"0": [[256], [0, 1], [0, 256]]}]]}, "version": 2}} +2026/06/06 19:04:50 done: 2070 bytes from ibm_marrakesh +``` +Boum! + +### A note on deletion + +When developing/debugging, a PodGroup (or its pods) can hang on delete because of +finalizers (the workload controller may not be running). Clear them with: + +```bash +kubectl patch podgroup training -n default --type=merge -p '{"metadata":{"finalizers":null}}' +``` -**under development** I am still thinking about how to make this request. -V +Importantly, submission is **not** done by the scheduler — the workload container holds the +user's credentials and submits via qrmi-go (job mode on the IBM open plan; see +fluxion-quantum for that story). Fluence only schedules and hands off the backend. +When we actually have control of local quantum devices this will be different. ## License HPCIC DevTools is distributed under the terms of the MIT license. All new contributions must be made under this license. 
-See [LICENSE](https://github.com/converged-computing/cloud-select/blob/main/LICENSE), -[COPYRIGHT](https://github.com/converged-computing/cloud-select/blob/main/COPYRIGHT), and -[NOTICE](https://github.com/converged-computing/cloud-select/blob/main/NOTICE) for details. +See [LICENSE](LICENSE), [COPYRIGHT](COPYRIGHT), and [NOTICE](NOTICE) for details. -SPDX-License-Identifier: (MIT) +SPDX-License-Identifier: MIT -LLNL-CODE- 842614 +LLNL-CODE-842614 diff --git a/cmd/deviceplugin/main.go b/cmd/deviceplugin/main.go new file mode 100644 index 0000000..8e88e6b --- /dev/null +++ b/cmd/deviceplugin/main.go @@ -0,0 +1,83 @@ +// Command fluence-deviceplugin advertises exotic Fluxion resource types as +// counted extended resources on the node it runs on (one per type, e.g. +// fluxion.flux-framework.org/qpu and .../qubit). Deploy it as a DaemonSet +// alongside the fluence scheduler. +// +// The set of types is derived from the SAME resources config the scheduler uses +// to build its graph (FLUENCE_RESOURCES), so the advertised resources and the +// graph's resource types come from one source and cannot drift. If +// FLUENCE_RESOURCES is unset or the file is absent, nothing is advertised — the +// node stays classical-only. +// +// A quantum backend is a remote API reachable from any node, not a node-local +// device, so each type is advertised at a large per-node ceiling. That count is +// only a local admission gate (so NodeResourcesFit is satisfied); the real gates +// are Fluxion (which backend, is one available) and the user's API limit. 
+// +// FLUENCE_RESOURCES path to the shared resources config +// (default /etc/fluence/resources.yaml) +// FLUENCE_RESOURCE_CAPACITY per-node ceiling for each type (default 1000) +package main + +import ( + "context" + "log" + "os" + "os/signal" + "strconv" + "sync" + "syscall" + + "github.com/converged-computing/fluence/pkg/cluster" + "github.com/converged-computing/fluence/pkg/deviceplugin" +) + +func main() { + cfgPath := os.Getenv("FLUENCE_RESOURCES") + if cfgPath == "" { + cfgPath = "/etc/fluence/resources.yaml" + } + + var names []string + if data, err := os.ReadFile(cfgPath); err == nil { + qc, perr := cluster.LoadQuantumConfig(data) + if perr != nil { + log.Fatalf("parse resources config %s: %v", cfgPath, perr) + } + names = cluster.FluxionResourceNames(qc.Backends) + log.Printf("derived %d resource(s) from %s: %v", len(names), cfgPath, names) + } else { + log.Printf("no resources config at %s (%v); advertising nothing", cfgPath, err) + } + + capacity := 1000 + if v := os.Getenv("FLUENCE_RESOURCE_CAPACITY"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 { + capacity = n + } + } + + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + if len(names) == 0 { + log.Print("no exotic resources to advertise; idling") + <-ctx.Done() + return + } + + var wg sync.WaitGroup + for _, name := range names { + wg.Add(1) + go func(name string) { + defer wg.Done() + p := deviceplugin.New(name, capacity) + log.Printf("advertising %s capacity=%d", name, capacity) + if err := p.Run(ctx); err != nil { + log.Printf("device plugin %s: %v", name, err) + stop() // bring the process down so the DaemonSet restarts it + } + }(name) + } + wg.Wait() +} diff --git a/cmd/webhook/main.go b/cmd/webhook/main.go new file mode 100644 index 0000000..bc5f816 --- /dev/null +++ b/cmd/webhook/main.go @@ -0,0 +1,82 @@ +// Command fluence-webhook runs fluence's mutating admission webhook. 
At startup +// it generates a self-signed CA + serving certificate, patches its +// MutatingWebhookConfiguration's caBundle so the apiserver trusts it, then +// serves the /mutate endpoint over HTTPS. No cert-manager or committed keys. +// +// WEBHOOK_SERVICE Service name (default fluence-webhook) +// WEBHOOK_NAMESPACE Service namespace (default kube-system) +// WEBHOOK_CONFIG MutatingWebhookConfiguration name (default fluence-webhook) +// WEBHOOK_ADDR listen address (default :8443) +package main + +import ( + "context" + "crypto/tls" + "log" + "net/http" + "os" + "time" + + "github.com/converged-computing/fluence/pkg/webhook" + + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +func env(key, def string) string { + if v := os.Getenv(key); v != "" { + return v + } + return def +} + +func main() { + svc := env("WEBHOOK_SERVICE", "fluence-webhook") + ns := env("WEBHOOK_NAMESPACE", "kube-system") + cfgName := env("WEBHOOK_CONFIG", "fluence-webhook") + addr := env("WEBHOOK_ADDR", ":8443") + + dnsNames := []string{ + svc + "." + ns + ".svc", + svc + "." 
+ ns + ".svc.cluster.local", + } + caPEM, certPEM, keyPEM, err := webhook.GenerateCerts(dnsNames) + if err != nil { + log.Fatalf("generate certs: %v", err) + } + cert, err := tls.X509KeyPair(certPEM, keyPEM) + if err != nil { + log.Fatalf("load serving cert: %v", err) + } + + cfg, err := rest.InClusterConfig() + if err != nil { + log.Fatalf("in-cluster config: %v", err) + } + client, err := kubernetes.NewForConfig(cfg) + if err != nil { + log.Fatalf("client: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + if err := webhook.EnsureCABundle(ctx, client, cfgName, caPEM); err != nil { + cancel() + log.Fatalf("patch caBundle on %s: %v", cfgName, err) + } + cancel() + log.Printf("patched caBundle on MutatingWebhookConfiguration %q", cfgName) + + mux := http.NewServeMux() + mux.HandleFunc("/mutate", webhook.Handler) + mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) }) + + srv := &http.Server{ + Addr: addr, + Handler: mux, + TLSConfig: &tls.Config{Certificates: []tls.Certificate{cert}}, + } + log.Printf("serving webhook on %s", addr) + if err := srv.ListenAndServeTLS("", ""); err != nil { + log.Fatalf("serve: %v", err) + } +} diff --git a/config/quantum-backends.yaml b/config/quantum-backends.yaml deleted file mode 100644 index 1dcfc14..0000000 --- a/config/quantum-backends.yaml +++ /dev/null @@ -1,12 +0,0 @@ -# Virtual quantum resources to inject into the cluster resource graph. -# Point FLUENCE_QUANTUM_CONFIG at this file to bring up fluence with quantum. -# Each backend becomes a `qpu` vertex (name = QRMI backend id) under a qgateway. 
-backends: - - name: ibm_fez - num_qubits: 156 - vendor: ibm - qrmi_type: qiskit-runtime-service - - name: ibm_marrakesh - num_qubits: 156 - vendor: ibm - qrmi_type: qiskit-runtime-service diff --git a/deploy/fluence-resources.yaml b/deploy/fluence-resources.yaml new file mode 100644 index 0000000..83a992a --- /dev/null +++ b/deploy/fluence-resources.yaml @@ -0,0 +1,77 @@ +# Resources add-on for fluence. Turns on fluence-managed resources by supplying +# (1) the resources config and (2) the device plugin that advertises them. +# Quantum backends are just the example payload here; any resource type fluence +# can model goes in the same ConfigMap. Apply AFTER deploy/fluence.yaml: +# +# kubectl apply -f deploy/fluence.yaml # base scheduler (no devices) +# kubectl apply -f deploy/fluence-resources.yaml # + resources config + device plugin +# kubectl rollout restart deployment/fluence -n kube-system # scheduler re-reads resources +# +# The base scheduler already mounts the `fluence-resources` ConfigMap optionally +# and reads FLUENCE_RESOURCES, so this add-on is purely additive — no edits to +# the base Deployment. + +# Resources config: the SINGLE source of truth for the resource types fluence +# injects/advertises. The scheduler builds qpu/qubit graph vertices from it; the +# device plugin derives which extended resources to advertise from the SAME +# document (same rule), so the two cannot drift. +apiVersion: v1 +kind: ConfigMap +metadata: + name: fluence-resources + namespace: kube-system +data: + resources.yaml: | + backends: + - name: ibm_fez + num_qubits: 156 + vendor: ibm + qrmi_type: qiskit-runtime-service + - name: ibm_marrakesh + num_qubits: 156 + vendor: ibm + qrmi_type: qiskit-runtime-service +--- +# Device plugin: advertises the exotic Fluxion resource types (derived from the +# resources config above) on every node, so pods can request them via resources +# and NodeResourcesFit is satisfied. 
+apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: fluence-deviceplugin + namespace: kube-system + labels: {app: fluence-deviceplugin} +spec: + selector: + matchLabels: {app: fluence-deviceplugin} + template: + metadata: + labels: {app: fluence-deviceplugin} + spec: + priorityClassName: system-node-critical + tolerations: + - operator: Exists # run on every node, including tainted/control-plane + containers: + - name: deviceplugin + image: ghcr.io/converged-computing/fluence:latest + imagePullPolicy: Never + command: ["/bin/fluence-deviceplugin"] + env: + - name: FLUENCE_RESOURCES + value: /etc/fluence/resources.yaml + - name: FLUENCE_RESOURCE_CAPACITY + value: "1000" + securityContext: + privileged: true + volumeMounts: + - name: device-plugin + mountPath: /var/lib/kubelet/device-plugins + - name: resources + mountPath: /etc/fluence + volumes: + - name: device-plugin + hostPath: + path: /var/lib/kubelet/device-plugins + - name: resources + configMap: + name: fluence-resources diff --git a/deploy/fluence.yaml b/deploy/fluence.yaml index 89b17a5..a16dcda 100644 --- a/deploy/fluence.yaml +++ b/deploy/fluence.yaml @@ -62,6 +62,16 @@ rules: - apiGroups: ["coordination.k8s.io"] resources: ["leases"] verbs: ["create", "get", "update", "list", "watch"] + # PreBind stamps the allocated backend onto the pod as an annotation; the + # built-in system:kube-scheduler role only allows patching pods/status, not + # the pod object, so grant it here. + - apiGroups: [""] + resources: ["pods"] + verbs: ["get", "list", "watch", "patch", "update"] + # The webhook self-manages its TLS by patching its own config's caBundle. 
+ - apiGroups: ["admissionregistration.k8s.io"] + resources: ["mutatingwebhookconfigurations"] + verbs: ["get", "list", "watch", "patch"] --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding @@ -90,9 +100,11 @@ data: profiles: - schedulerName: fluence plugins: - preFilter: - enabled: [{name: Fluence}] - filter: + # multiPoint wires Fluence into every extension point its Go type + # implements: PreFilter, Filter, and PreBind (which stamps the backend + # annotation). Listing points individually risks omitting one — that is + # exactly what left PreBind unwired and the backend annotation unset. + multiPoint: enabled: [{name: Fluence}] --- apiVersion: apps/v1 @@ -113,19 +125,21 @@ spec: containers: - name: fluence image: ghcr.io/converged-computing/fluence:latest - # For development only imagePullPolicy: Never command: - /bin/fluence - --config=/etc/fluence/scheduler-config.yaml + # fluence is its own scheduler binary, so it needs the gang gates set + # here (the cluster-level kube-scheduler gates don't apply to it). + # Without these its PodGroup/GangScheduling plugin is inactive, pods + # schedule with no gang semantics, and PodGroup status stays Pending. + - --feature-gates=GenericWorkload=true,GangScheduling=true - --v=4 env: - - name: FLUENCE_QUANTUM_CONFIG - value: /etc/fluence/quantum-backends.yaml - - name: IBM_CLOUD_TOKEN - valueFrom: {secretKeyRef: {name: ibm-quantum, key: token, optional: true}} - - name: IBM_CLOUD_CRN - valueFrom: {secretKeyRef: {name: ibm-quantum, key: crn, optional: true}} + # Path to the resources config (e.g. quantum backends). Unset/empty + # file -> classical-only graph. Supplied by the quantum add-on. 
+ - name: FLUENCE_RESOURCES + value: /etc/fluence/resources.yaml volumeMounts: - name: config mountPath: /etc/fluence @@ -134,4 +148,75 @@ spec: projected: sources: - configMap: {name: fluence-scheduler-config} - - configMap: {name: fluence-quantum-backends, optional: true} \ No newline at end of file + - configMap: {name: fluence-resources, optional: true} +--- +# Mutating webhook: injects scheduler-chosen values into pods at creation time +# (currently a downward-API QRMI_BACKEND env for quantum pods). It self-manages +# TLS — generates a CA + serving cert at startup and patches the caBundle below — +# so no cert-manager and no committed keys. failurePolicy Ignore keeps a webhook +# outage from blocking pod creation cluster-wide. +apiVersion: apps/v1 +kind: Deployment +metadata: + name: fluence-webhook + namespace: kube-system + labels: {app: fluence-webhook} +spec: + replicas: 1 + selector: + matchLabels: {app: fluence-webhook} + template: + metadata: + labels: {app: fluence-webhook} + spec: + serviceAccountName: fluence + containers: + - name: webhook + image: ghcr.io/converged-computing/fluence:latest + imagePullPolicy: Never + command: ["/bin/fluence-webhook"] + ports: + - containerPort: 8443 + readinessProbe: + httpGet: {path: /healthz, port: 8443, scheme: HTTPS} + initialDelaySeconds: 2 +--- +apiVersion: v1 +kind: Service +metadata: + name: fluence-webhook + namespace: kube-system +spec: + selector: {app: fluence-webhook} + ports: + - port: 443 + targetPort: 8443 +--- +apiVersion: admissionregistration.k8s.io/v1 +kind: MutatingWebhookConfiguration +metadata: + name: fluence-webhook +webhooks: + - name: pods.fluence.flux-framework.org + admissionReviewVersions: ["v1"] + sideEffects: None + failurePolicy: Ignore # never block pod creation if the webhook is down + # caBundle is filled in at runtime by the webhook patching this object. 
+ clientConfig: + service: + name: fluence-webhook + namespace: kube-system + path: /mutate + port: 443 + rules: + - apiGroups: [""] + apiVersions: ["v1"] + operations: ["CREATE"] + resources: ["pods"] + scope: Namespaced + # Don't intercept system pods (and avoid bootstrap coupling). + namespaceSelector: + matchExpressions: + - key: kubernetes.io/metadata.name + operator: NotIn + values: ["kube-system"] \ No newline at end of file diff --git a/deploy/kind-config.yaml b/deploy/kind-config.yaml index 811b40e..1ef46da 100644 --- a/deploy/kind-config.yaml +++ b/deploy/kind-config.yaml @@ -1,8 +1,10 @@ -# kind create cluster --image kindest/node:v1.36.1 --config deploy/kind-config.yaml +# kind create cluster --image kindest/node:v1.36.1 --config deploy/kind-config.yaml # # Enables the alpha Workload / PodGroup gang-scheduling feature (off by default # in 1.36). Gates are set per-component because GangScheduling is scheduler-only -# (setting it on the apiserver would be rejected as an unknown gate). +# (setting it on the apiserver would be rejected as an unknown gate). extraArgs +# uses the kubeadm v1beta4 list form ({name,value}); the old map form is silently +# mishandled on 1.36 and the gates never apply. kind: Cluster apiVersion: kind.x-k8s.io/v1alpha4 nodes: @@ -12,11 +14,13 @@ nodes: kind: ClusterConfiguration apiServer: extraArgs: - # Turn on the alpha API group + the API itself.
- runtime-config: "scheduling.k8s.io/v1alpha2=true" - feature-gates: "GenericWorkload=true" + - name: runtime-config + value: "scheduling.k8s.io/v1alpha2=true" + - name: feature-gates + value: "GenericWorkload=true" scheduler: extraArgs: - feature-gates: "GenericWorkload=true,GangScheduling=true" + - name: feature-gates + value: "GenericWorkload=true,GangScheduling=true" + - role: worker - role: worker - - role: worker \ No newline at end of file diff --git a/examples/podgroup.yaml b/examples/podgroup.yaml index a59e995..068e56c 100644 --- a/examples/podgroup.yaml +++ b/examples/podgroup.yaml @@ -1,10 +1,10 @@ -# Native gang scheduling (k8s >= 1.35, GangScheduling/GenericWorkload gates on). -# Fluence does placement; the PodGroup gives all-or-nothing semantics. +# Native gang scheduling (k8s >= 1.36, GangScheduling/GenericWorkload gates on). +# Fluence does placement; the PodGroup gives all-or-nothing semantics. Pods link +# to the PodGroup via the first-class field spec.schedulingGroup.podGroupName. apiVersion: scheduling.k8s.io/v1alpha2 kind: PodGroup metadata: name: training - namespace: default spec: schedulingPolicy: gang: @@ -14,7 +14,6 @@ apiVersion: apps/v1 kind: Deployment metadata: name: training - namespace: default spec: replicas: 2 selector: @@ -23,14 +22,15 @@ spec: metadata: labels: app: training - scheduling.k8s.io/pod-group: training spec: schedulerName: fluence + schedulingGroup: + podGroupName: training containers: - name: worker image: busybox command: ["sleep", "3600"] resources: requests: - cpu: "2" - memory: 8Gi + cpu: "4" + memory: 8Gi \ No newline at end of file diff --git a/examples/quantum-pod.yaml b/examples/quantum-pod.yaml new file mode 100644 index 0000000..a619df9 --- /dev/null +++ b/examples/quantum-pod.yaml @@ -0,0 +1,30 @@ +# A quantum workload scheduled by fluence. 
The pod REQUESTS a quantum backend +# via resources (the fluence device plugin advertises fluxion.flux-framework.org/qpu +# on every node, so NodeResourcesFit is satisfied). Fluence's PreFilter matches +# the request against the resource graph and picks a backend, the webhook injects +# QRMI_BACKEND (the allocated backend) automatically, and note we can add other +# envars here in the future. I chose a webhook because I think this is going to +# be a requirement, and the pod is immutable after creation. +# Then the container submits via qrmi-go (the separate qrmi-sampler image). +# the credentials live on the level of the application container NOT in +# a shared space. +apiVersion: v1 +kind: Pod +metadata: + name: sampler +spec: + schedulerName: fluence + restartPolicy: Never + containers: + - name: sampler + image: ghcr.io/converged-computing/qrmi-sampler:latest + env: + - name: IBM_CLOUD_TOKEN + valueFrom: {secretKeyRef: {name: ibm-quantum, key: token}} + - name: IBM_CLOUD_CRN + valueFrom: {secretKeyRef: {name: ibm-quantum, key: crn}} + resources: + requests: + fluxion.flux-framework.org/qpu: "1" + limits: + fluxion.flux-framework.org/qpu: "1" \ No newline at end of file diff --git a/go.mod b/go.mod index df802a2..7f712a1 100644 --- a/go.mod +++ b/go.mod @@ -3,11 +3,13 @@ module github.com/converged-computing/fluence go 1.26.0 require ( - github.com/converged-computing/qrmi-go v0.0.0-20260605012309-c5c8239ecbba github.com/flux-framework/flux-sched/resource/reapi/bindings/go v0.0.0-20260526195258-f0e815f1f354 + google.golang.org/grpc v1.79.3 k8s.io/api v0.36.0 k8s.io/apimachinery v0.36.0 k8s.io/component-base v0.36.0 + k8s.io/kube-scheduler v0.36.0 + k8s.io/kubelet v0.36.0 k8s.io/kubernetes v1.36.0 sigs.k8s.io/yaml v1.6.0 ) @@ -91,7 +93,6 @@ require ( golang.org/x/time v0.14.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect 
- google.golang.org/grpc v1.79.3 // indirect google.golang.org/protobuf v1.36.12-0.20260120151049-f2248ac996af // indirect gopkg.in/evanphx/json-patch.v4 v4.13.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect @@ -108,8 +109,6 @@ require ( k8s.io/klog/v2 v2.140.0 // indirect k8s.io/kms v0.36.0 // indirect k8s.io/kube-openapi v0.0.0-20260317180543-43fb72c5454a // indirect - k8s.io/kube-scheduler v0.36.0 // indirect - k8s.io/kubelet v0.36.0 // indirect k8s.io/streaming v0.36.0 // indirect k8s.io/utils v0.0.0-20260210185600-b8788abfbbc2 // indirect sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.34.0 // indirect diff --git a/go.sum b/go.sum index d7b875c..a67f060 100644 --- a/go.sum +++ b/go.sum @@ -14,8 +14,6 @@ github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1x github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/converged-computing/qrmi-go v0.0.0-20260605012309-c5c8239ecbba h1:hRm9gcU/geUqAl8OgYSxF4pO/oqanKxxpLZJEzSwzcE= -github.com/converged-computing/qrmi-go v0.0.0-20260605012309-c5c8239ecbba/go.mod h1:BvdrzMeplw5UgflH0s/VpyuN0eUxXMKDzf5DAC4oQhk= github.com/coreos/go-semver v0.3.1 h1:yi21YpKnrx1gt5R+la8n5WgS0kCrsPp33dmEyHReZr4= github.com/coreos/go-semver v0.3.1/go.mod h1:irMmmIw/7yzSRPWryHsK7EYSg09caPQL03VsM8rvUec= github.com/coreos/go-systemd/v22 v22.7.0 h1:LAEzFkke61DFROc7zNLX/WA2i5J8gYqe0rSj9KI28KA= diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index db383f1..6bb572a 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -6,8 +6,10 @@ package cluster import ( "fmt" + "sort" "github.com/converged-computing/fluence/pkg/jgf" + "github.com/converged-computing/fluence/pkg/placement" corev1 "k8s.io/api/core/v1" "sigs.k8s.io/yaml" ) @@ -94,6 +96,31 @@ func BuildGraph(nodes 
[]corev1.Node, opts Options) ([]byte, error) { return b.JSON() } +// FluxionResourceNames returns the distinct extended-resource names a device +// plugin should advertise for a set of quantum backends. It uses the SAME +// type-derivation rule as AddQuantum — each backend is a `qpu`, and a backend +// with num_qubits > 0 contributes `qubit` — so the device plugin's advertised +// resources and the graph's resource types are derived from one config and +// cannot drift. Names are prefixed with placement.FluxionResourcePrefix so they +// match what the scheduler strips off a pod request. +func FluxionResourceNames(backends []QuantumBackend) []string { + types := map[string]bool{} + if len(backends) > 0 { + types["qpu"] = true + } + for _, b := range backends { + if b.NumQubits > 0 { + types["qubit"] = true + } + } + names := make([]string, 0, len(types)) + for t := range types { + names = append(names, placement.FluxionResourcePrefix+t) + } + sort.Strings(names) + return names +} + // AddQuantum injects a qgateway under the cluster with one qpu vertex per // backend. Exposed so a graph built elsewhere can be augmented the same way. func AddQuantum(b *jgf.Builder, cluster *jgf.Vertex, backends []QuantumBackend) { @@ -108,11 +135,18 @@ func AddQuantum(b *jgf.Builder, cluster *jgf.Vertex, backends []QuantumBackend) if be.Vendor != "" { props["vendor"] = be.Vendor } - b.AddChild(gw, "qpu", "qpu", jgf.Options{ + qpu := b.AddChild(gw, "qpu", "qpu", jgf.Options{ Name: be.Name, Exclusive: true, Properties: props, }) + // Model qubits as a counted child so a request for N qubits matches a + // backend with at least that many (Fluxion count matching is >=). This + // is how the numeric "at least N qubits" ask is expressed without a + // numeric constraint (RFC 31 properties are boolean tags, not >=). 
+ if be.NumQubits > 0 { + b.AddChild(qpu, "qubit", "qubit", jgf.Options{Size: int64(be.NumQubits)}) + } } } @@ -144,4 +178,4 @@ func orDefault(s, def string) string { return def } return s -} +} \ No newline at end of file diff --git a/pkg/deviceplugin/plugin.go b/pkg/deviceplugin/plugin.go new file mode 100644 index 0000000..cc98012 --- /dev/null +++ b/pkg/deviceplugin/plugin.go @@ -0,0 +1,172 @@ +package deviceplugin + +// Package deviceplugin implements a Kubernetes device plugin that advertises +// "quantum reachability" as a counted extended resource on every node. +// +// A quantum backend is not hardware attached to a node — it is a remote API any +// node can call (subject to the user's access). So the plugin advertises a large +// per-node ceiling of a single counted resource (fluxion.flux-framework.org/qpu): +// this is what lets a pod write `resources.requests: {fluxion.flux-framework.org/qpu: "1"}` +// and have the in-tree NodeResourcesFit plugin be satisfied (no wrapper needed). +// +// The count is a local admission gate only. Whether a backend is actually +// available and which one is matched is decided by Fluxion in the scheduler, and +// the real per-user limit lives on the IBM API — neither of which is node-local, +// which is exactly why the ceiling is large rather than a true quota. + +import ( + "context" + "fmt" + "net" + "os" + "path/filepath" + "strings" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" +) + +// Plugin is a device-plugin server for a single counted resource. +type Plugin struct { + pluginapi.UnimplementedDevicePluginServer + + resourceName string + capacity int + socket string + + server *grpc.Server + devices []*pluginapi.Device + stop chan struct{} +} + +// New builds a plugin advertising `capacity` units of resourceName.
The socket +// and device IDs are derived from the resource name so multiple plugins can run +// in one process without colliding. +func New(resourceName string, capacity int) *Plugin { + tag := sanitize(resourceName) + devs := make([]*pluginapi.Device, 0, capacity) + for i := 0; i < capacity; i++ { + devs = append(devs, &pluginapi.Device{ + ID: fmt.Sprintf("%s-%d", tag, i), + Health: pluginapi.Healthy, + }) + } + sock := filepath.Join(pluginapi.DevicePluginPath, tag+".sock") + return &Plugin{ + resourceName: resourceName, + capacity: capacity, + socket: sock, + devices: devs, + stop: make(chan struct{}), + } +} + +// sanitize turns a resource name into a filesystem/identifier-safe tag, e.g. +// "fluxion.flux-framework.org/qpu" -> "fluxion-flux-framework-org-qpu". +func sanitize(name string) string { + repl := func(r rune) rune { + if (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || (r >= '0' && r <= '9') { + return r + } + return '-' + } + return strings.Map(repl, name) +} + +// Run serves the plugin and registers it with the kubelet, blocking until the +// context is cancelled. 
+func (p *Plugin) Run(ctx context.Context) error { + if err := p.serve(); err != nil { + return err + } + defer p.server.Stop() + + if err := p.register(ctx); err != nil { + return fmt.Errorf("register with kubelet: %w", err) + } + + <-ctx.Done() + close(p.stop) + return nil +} + +func (p *Plugin) serve() error { + if err := os.Remove(p.socket); err != nil && !os.IsNotExist(err) { + return err + } + lis, err := net.Listen("unix", p.socket) + if err != nil { + return fmt.Errorf("listen on %s: %w", p.socket, err) + } + p.server = grpc.NewServer() + pluginapi.RegisterDevicePluginServer(p.server, p) + go func() { _ = p.server.Serve(lis) }() + return nil +} + +func (p *Plugin) register(ctx context.Context) error { + conn, err := grpc.NewClient( + "unix://"+pluginapi.KubeletSocket, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + return err + } + defer conn.Close() + + client := pluginapi.NewRegistrationClient(conn) + _, err = client.Register(ctx, &pluginapi.RegisterRequest{ + Version: pluginapi.Version, + Endpoint: filepath.Base(p.socket), + ResourceName: p.resourceName, + }) + return err +} + +// GetDevicePluginOptions: no pre-start hook needed. +func (p *Plugin) GetDevicePluginOptions(context.Context, *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) { + return &pluginapi.DevicePluginOptions{PreStartRequired: false}, nil +} + +// ListAndWatch streams the (static) device list to the kubelet. +func (p *Plugin) ListAndWatch(_ *pluginapi.Empty, stream pluginapi.DevicePlugin_ListAndWatchServer) error { + if err := stream.Send(&pluginapi.ListAndWatchResponse{Devices: p.devices}); err != nil { + return err + } + // Static list: keep the stream open, re-sending periodically until shutdown. 
+ ticker := time.NewTicker(60 * time.Second) + defer ticker.Stop() + for { + select { + case <-p.stop: + return nil + case <-ticker.C: + if err := stream.Send(&pluginapi.ListAndWatchResponse{Devices: p.devices}); err != nil { + return err + } + } + } +} + +// Allocate is a no-op: a quantum "device" is just a reachability token, so no +// env vars, mounts, or device nodes are injected. The workload gets its backend +// from the scheduler and its credentials from a Secret. +func (p *Plugin) Allocate(_ context.Context, req *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) { + resp := &pluginapi.AllocateResponse{} + for range req.ContainerRequests { + resp.ContainerResponses = append(resp.ContainerResponses, &pluginapi.ContainerAllocateResponse{}) + } + return resp, nil +} + +// GetPreferredAllocation: no preference. +func (p *Plugin) GetPreferredAllocation(context.Context, *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) { + return &pluginapi.PreferredAllocationResponse{}, nil +} + +// PreStartContainer: nothing to do. +func (p *Plugin) PreStartContainer(context.Context, *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) { + return &pluginapi.PreStartContainerResponse{}, nil +} diff --git a/pkg/fluence/fluence.go b/pkg/fluence/fluence.go index a615839..82f5943 100644 --- a/pkg/fluence/fluence.go +++ b/pkg/fluence/fluence.go @@ -14,6 +14,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" fwk "k8s.io/kube-scheduler/framework" ) @@ -37,13 +38,15 @@ type Fluence struct { matcher *graph.FluxionGraph mu sync.Mutex - // placement maps a pod-group key to the nodes chosen for the group. - placement map[string][]string + // placement maps a pod-group key to the placement chosen for the group + // (nodes + allocated backend). 
+ placement map[string]placement.Placement } var ( _ fwk.PreFilterPlugin = (*Fluence)(nil) _ fwk.FilterPlugin = (*Fluence)(nil) + _ fwk.PreBindPlugin = (*Fluence)(nil) ) // New builds the plugin: discover cluster nodes, optionally inject quantum @@ -51,7 +54,8 @@ var ( // // Configuration (for now via env; can move to plugin args): // -// FLUENCE_QUANTUM_CONFIG path to a YAML/JSON list of quantum backends +// FLUENCE_RESOURCES path to a YAML/JSON resources config (e.g. quantum +// backends). Unset = classical-only graph. // FLUENCE_MATCH_POLICY Fluxion match policy (default "first") func New(ctx context.Context, _ runtime.Object, h fwk.Handle) (fwk.Plugin, error) { // List nodes via the API. The scheduler's shared snapshot is empty at @@ -64,19 +68,25 @@ func New(ctx context.Context, _ runtime.Object, h fwk.Handle) (fwk.Plugin, error return nil, fmt.Errorf("list nodes: %w", err) } - // Classical compute always comes from the cluster nodes. Quantum resources - // are added only when a backends config is provided. + // Classical compute always comes from the cluster nodes. Quantum/other + // resources are added only when a resources config is present. FLUENCE_RESOURCES + // is set on the base scheduler but the file only exists once the resources + // add-on is applied, so a missing file is normal (classical-only), not fatal. opts := cluster.Options{} - if path := os.Getenv("FLUENCE_QUANTUM_CONFIG"); path != "" { + if path := os.Getenv("FLUENCE_RESOURCES"); path != "" { raw, err := os.ReadFile(path) - if err != nil { - return nil, fmt.Errorf("read quantum config: %w", err) + switch { + case err == nil: + qc, err := cluster.LoadQuantumConfig(raw) + if err != nil { + return nil, err + } + opts.Quantum = qc.Backends + case os.IsNotExist(err): + // No resources config mounted -> classical-only graph. 
+ default: + return nil, fmt.Errorf("read resources config %s: %w", path, err) } - qc, err := cluster.LoadQuantumConfig(raw) - if err != nil { - return nil, err - } - opts.Quantum = qc.Backends } jgfBytes, err := cluster.BuildGraph(nodeList.Items, opts) @@ -100,7 +110,7 @@ func New(ctx context.Context, _ runtime.Object, h fwk.Handle) (fwk.Plugin, error return &Fluence{ handle: h, matcher: matcher, - placement: map[string][]string{}, + placement: map[string]placement.Placement{}, }, nil } @@ -147,16 +157,21 @@ func (f *Fluence) PreFilter( if err != nil { return nil, fwk.AsStatus(err) } - if len(place.Nodes) == 0 { - return nil, fwk.NewStatus(fwk.Unschedulable, "fluxion returned no node placement") + if len(place.Nodes) == 0 && place.Backend == "" { + return nil, fwk.NewStatus(fwk.Unschedulable, "fluxion returned no allocation") } + // Note: a quantum-only allocation has a Backend but no Nodes (a qpu vertex + // lives under the qgateway, not under a compute node). That is valid — the + // backend is a remote API reachable from any node — so we do not require a + // node here; Filter imposes no node constraint in that case. f.mu.Lock() - f.placement[group] = place.Nodes + f.placement[group] = place f.mu.Unlock() - // place.Backend (quantum) would be recorded on the pod(s) here so the - // workload knows which QRMI backend to submit to (e.g. via annotation/env). + // The allocated backend is recorded onto each pod in PreBind (container env + // is immutable post-creation, but annotations can be patched); the + // webhook-injected downward-API env then surfaces it as QRMI_BACKEND. 
return nil, fwk.NewStatus(fwk.Success) } @@ -173,9 +188,16 @@ func (f *Fluence) Filter( group := groupKey(pod) f.mu.Lock() - nodes := f.placement[group] + nodes := f.placement[group].Nodes f.mu.Unlock() + // A quantum-only allocation pins no node (the backend is a remote API any + // node can reach), so impose no constraint; the qpu device plugin already + // gates which nodes can admit the pod. + if len(nodes) == 0 { + return fwk.NewStatus(fwk.Success) + } + for _, n := range nodes { if n == nodeInfo.Node().Name { return fwk.NewStatus(fwk.Success) @@ -184,29 +206,78 @@ func (f *Fluence) Filter( return fwk.NewStatus(fwk.Unschedulable, "node not in fluxion allocation for this group") } -// groupPods returns the pods belonging to the same group as pod, by label. +// PreBindPreFlight runs before PreBind. It returns Success when this plugin has +// a backend to stamp on the pod (a quantum group), and Skip otherwise so the +// framework doesn't call PreBind needlessly. It is lightweight: it only reads +// the cached group placement, no API calls. +func (f *Fluence) PreBindPreFlight( + ctx context.Context, + state fwk.CycleState, + pod *corev1.Pod, + nodeName string, +) (*fwk.PreBindPreFlightResult, *fwk.Status) { + f.mu.Lock() + backend := f.placement[groupKey(pod)].Backend + f.mu.Unlock() + if backend == "" { + return nil, fwk.NewStatus(fwk.Skip) + } + return nil, fwk.NewStatus(fwk.Success) +} + +// PreBind writes the backend Fluxion allocated for this pod's group onto the pod +// as the annotation placement.BackendAnnotation. The mutating webhook has +// already wired a downward-API env (QRMI_BACKEND) that reads this annotation, so +// the container sees the backend as an ordinary env var. Container env cannot be +// patched after creation, which is why the value travels via an annotation. 
+func (f *Fluence) PreBind( + ctx context.Context, + state fwk.CycleState, + pod *corev1.Pod, + nodeName string, +) *fwk.Status { + f.mu.Lock() + backend := f.placement[groupKey(pod)].Backend + f.mu.Unlock() + if backend == "" { + return fwk.NewStatus(fwk.Success) // nothing to do; PreBindPreFlight skips these + } + + patch := fmt.Sprintf(`{"metadata":{"annotations":{%q:%q}}}`, placement.BackendAnnotation, backend) + _, err := f.handle.ClientSet().CoreV1().Pods(pod.Namespace).Patch( + ctx, pod.Name, types.MergePatchType, []byte(patch), metav1.PatchOptions{}) + if err != nil { + return fwk.AsStatus(fmt.Errorf("stamp backend annotation: %w", err)) + } + return fwk.NewStatus(fwk.Success) +} + +// groupPods returns the pods belonging to the same native PodGroup as pod +// (spec.schedulingGroup.podGroupName). That field is not label-selectable, so we +// list the namespace and filter in code. A pod with no scheduling group is its +// own group of one. func (f *Fluence) groupPods(pod *corev1.Pod) ([]corev1.Pod, error) { - group := pod.Labels[placement.PodGroupLabel] + group := placement.PodGroupName(pod) if group == "" { - // Singleton pod: treat it as its own group of one. return []corev1.Pod{*pod}, nil } - sel := labels.SelectorFromSet(labels.Set{placement.PodGroupLabel: group}) list, err := f.handle.SharedInformerFactory().Core().V1().Pods().Lister(). - Pods(pod.Namespace).List(sel) + Pods(pod.Namespace).List(labels.Everything()) if err != nil { return nil, err } out := make([]corev1.Pod, 0, len(list)) for _, p := range list { - out = append(out, *p) + if placement.PodGroupName(p) == group { + out = append(out, *p) + } } return out, nil } // groupKey is the cache key for a pod's group (namespace-scoped). 
func groupKey(pod *corev1.Pod) string { - if g := pod.Labels[placement.PodGroupLabel]; g != "" { + if g := placement.PodGroupName(pod); g != "" { return pod.Namespace + "/" + g } return pod.Namespace + "/" + pod.Name diff --git a/pkg/placement/placement.go b/pkg/placement/placement.go index e106540..2b65aad 100644 --- a/pkg/placement/placement.go +++ b/pkg/placement/placement.go @@ -2,6 +2,8 @@ package placement import ( "fmt" + "sort" + "strings" "github.com/converged-computing/fluence/pkg/jobspec" "github.com/converged-computing/fluence/pkg/quantum" @@ -9,65 +11,97 @@ import ( ) const ( - // QuantumResource is the extended resource a pod requests to be placed on a - // quantum backend (a qpu vertex) instead of classical compute. - QuantumResource corev1.ResourceName = "quantum.flux-framework.org/qpu" + // FluxionResourcePrefix marks an extended resource whose suffix is a Fluxion + // graph type. A request for fluxion.flux-framework.org/<type> is translated + // generically into a jobspec count of <type> — no per-type code. Anything the + // graph models as a count (qpu, qubit, ...) is requestable this way. + FluxionResourcePrefix = "fluxion.flux-framework.org/" - // PodGroupLabel and PodGroupSizeLabel mirror the native PodGroup wiring; a - // pod carries its group name and the group's total size so the scheduler can - // match the whole gang at once. - PodGroupLabel = "scheduling.k8s.io/pod-group" - PodGroupSizeLabel = "fluence.flux-framework.org/group-size" + // BackendAnnotation is where the scheduler records the Fluxion-allocated + // backend for a pod. The mutating webhook wires a downward-API env + // (QRMI_BACKEND) that reads this annotation. + BackendAnnotation = "fluence.flux-framework.org/backend" ) -// podRes is the classical/quantum resource ask distilled from a pod.
-type podRes struct { - cpu int - gpu int - quantum bool +// PodGroupName returns the native (Kubernetes 1.36) scheduling-group name a pod +// belongs to, from spec.schedulingGroup.podGroupName, or "" if the pod is not +// part of a group. This is the first-class field that links a Pod to its +// PodGroup object; the pre-1.36 label/annotation pattern is gone. +func PodGroupName(pod *corev1.Pod) string { + if sg := pod.Spec.SchedulingGroup; sg != nil && sg.PodGroupName != nil { + return *sg.PodGroupName + } + return "" } -// podResources sums container requests into whole cores/gpus and detects a -// quantum request. -func podResources(p *corev1.Pod) podRes { - var r podRes +// podResources distills a pod's container requests into Fluxion resource counts +// keyed by Fluxion graph type (e.g. "core", "gpu", "qpu", "qubit"). +// +// Kubernetes names its native resources (cpu, memory, nvidia.com/gpu), so those +// get a small fixed mapping to graph types. Every resource named +// fluxion.flux-framework.org/<type> is passed through generically as <type>, +// with no knowledge of what the type means — if the graph has it as a count, +// Fluxion will verify it.
+func podResources(p *corev1.Pod) map[string]int { + counts := map[string]int{} for i := range p.Spec.Containers { - req := p.Spec.Containers[i].Resources.Requests - if q, ok := req[corev1.ResourceCPU]; ok { - r.cpu += int(q.Value()) // Value() rounds millicores up to whole cores - } - if q, ok := req["nvidia.com/gpu"]; ok { - r.gpu += int(q.Value()) - } - if _, ok := req[QuantumResource]; ok { - r.quantum = true + for name, q := range p.Spec.Containers[i].Resources.Requests { + switch { + case name == corev1.ResourceCPU: + counts["core"] += int(q.Value()) // rounds millicores up to whole cores + case name == corev1.ResourceMemory: + counts["memory"] += int(q.Value() / (1000 * 1000)) // bytes -> MB + case name == "nvidia.com/gpu": + counts["gpu"] += int(q.Value()) + case strings.HasPrefix(string(name), FluxionResourcePrefix): + t := strings.TrimPrefix(string(name), FluxionResourcePrefix) + counts[t] += int(q.Value()) + } } } - if !r.quantum && r.cpu == 0 { - r.cpu = 1 // every classical pod needs at least one core to match + // A pod that requested no exotic (non-classical) resource still needs at + // least one core to land on. + if !hasExotic(counts) && counts["core"] == 0 { + counts["core"] = 1 + } + return counts +} + +// hasExotic reports whether counts contains any non-classical type (i.e. one +// that came through the Fluxion prefix, like qpu/qubit). +func hasExotic(counts map[string]int) bool { + for t := range counts { + switch t { + case "core", "memory", "gpu": + default: + return true + } } - return r + return false } // JobspecForGroup builds a Fluxion jobspec for a whole pod group: a slot per pod -// (count = group size), each holding the per-pod resources. The group is -// assumed homogeneous (same shape per pod), which is the common case for a gang; -// heterogeneous groups are a TODO. +// (count = group size), each holding the per-pod resources as `with` entries — +// one per requested Fluxion type. A hybrid pod (e.g. 
cores + a qpu) produces a +// slot with both, so classical and quantum are requested together. The group is +// assumed homogeneous (same shape per pod); heterogeneous groups are a TODO. func JobspecForGroup(groupName string, pods []corev1.Pod) (*jobspec.Jobspec, error) { if len(pods) == 0 { return nil, fmt.Errorf("pod group %q has no pods", groupName) } - r := podResources(&pods[0]) + counts := podResources(&pods[0]) + + // Deterministic order for stable jobspecs/tests. + types := make([]string, 0, len(counts)) + for t := range counts { + types = append(types, t) + } + sort.Strings(types) var with []jobspec.Resource - if r.quantum { - with = []jobspec.Resource{{Type: "qpu", Count: 1}} - } else { - if r.cpu > 0 { - with = append(with, jobspec.Resource{Type: "core", Count: r.cpu}) - } - if r.gpu > 0 { - with = append(with, jobspec.Resource{Type: "gpu", Count: r.gpu}) + for _, t := range types { + if counts[t] > 0 { + with = append(with, jobspec.Resource{Type: t, Count: counts[t]}) } } diff --git a/pkg/placement/placement_test.go b/pkg/placement/placement_test.go index 518904d..2def510 100644 --- a/pkg/placement/placement_test.go +++ b/pkg/placement/placement_test.go @@ -1,55 +1,99 @@ package placement import ( - "strings" "testing" + "github.com/converged-computing/fluence/pkg/jobspec" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func cpuPod(name string, cpu, gpu int64) corev1.Pod { - req := corev1.ResourceList{corev1.ResourceCPU: *resource.NewQuantity(cpu, resource.DecimalSI)} - if gpu > 0 { - req["nvidia.com/gpu"] = *resource.NewQuantity(gpu, resource.DecimalSI) - } +func podWith(name string, req corev1.ResourceList) corev1.Pod { return corev1.Pod{ ObjectMeta: metav1.ObjectMeta{Name: name}, Spec: corev1.PodSpec{Containers: []corev1.Container{{Resources: corev1.ResourceRequirements{Requests: req}}}}, } } -func TestJobspecForGroupClassical(t *testing.T) { - pods := []corev1.Pod{cpuPod("p0", 4, 
1), cpuPod("p1", 4, 1), cpuPod("p2", 4, 1)} +func qty(n int64) resource.Quantity { return *resource.NewQuantity(n, resource.DecimalSI) } + +// withType returns the count for a given Fluxion type in the slot's `with`. +func withType(js *jobspec.Jobspec, t string) (int, bool) { + for _, w := range js.Resources[0].With { + if w.Type == t { + return w.Count, true + } + } + return 0, false +} + +func TestClassical(t *testing.T) { + pods := []corev1.Pod{ + podWith("p0", corev1.ResourceList{corev1.ResourceCPU: qty(4), "nvidia.com/gpu": qty(1)}), + podWith("p1", corev1.ResourceList{corev1.ResourceCPU: qty(4), "nvidia.com/gpu": qty(1)}), + } js, err := JobspecForGroup("grp", pods) if err != nil { t.Fatal(err) } - if js.Resources[0].Count != 3 { - t.Fatalf("slot count = %d, want 3", js.Resources[0].Count) + if js.Resources[0].Count != 2 { + t.Fatalf("slot count = %d, want 2", js.Resources[0].Count) + } + if c, _ := withType(js, "core"); c != 4 { + t.Errorf("core = %d, want 4", c) + } + if c, _ := withType(js, "gpu"); c != 1 { + t.Errorf("gpu = %d, want 1", c) + } + if _, ok := withType(js, "qpu"); ok { + t.Error("classical pod should not request qpu") + } +} + +func TestGenericQuantumCount(t *testing.T) { + // fluxion.flux-framework.org/qpu: 1 -> a qpu count, with no per-type code. 
+ p := podWith("q", corev1.ResourceList{FluxionResourcePrefix + "qpu": qty(1)}) + js, err := JobspecForGroup("qgrp", []corev1.Pod{p}) + if err != nil { + t.Fatal(err) + } + if c, ok := withType(js, "qpu"); !ok || c != 1 { + t.Fatalf("qpu = %d (ok=%v), want 1", c, ok) } - y, _ := js.YAML() - if !strings.Contains(y, "core") || !strings.Contains(y, "gpu") { - t.Fatalf("missing core/gpu:\n%s", y) + // no classical core forced on an exotic-only request + if _, ok := withType(js, "core"); ok { + t.Error("quantum-only pod should not be forced to request a core") } } -func TestJobspecForGroupQuantum(t *testing.T) { - q := corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{Name: "q0"}, - Spec: corev1.PodSpec{Containers: []corev1.Container{{ - Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{ - QuantumResource: *resource.NewQuantity(1, resource.DecimalSI), - }}, - }}}, - } - js, err := JobspecForGroup("qgrp", []corev1.Pod{q}) +func TestGenericQubitCount(t *testing.T) { + // "at least 156 qubits" expressed as a count (Fluxion count match is >=). + p := podWith("q", corev1.ResourceList{FluxionResourcePrefix + "qubit": qty(156)}) + js, err := JobspecForGroup("qubits", []corev1.Pod{p}) if err != nil { t.Fatal(err) } - if js.Resources[0].With[0].Type != "qpu" { - t.Fatalf("want qpu, got %+v", js.Resources[0].With) + if c, ok := withType(js, "qubit"); !ok || c != 156 { + t.Fatalf("qubit = %d (ok=%v), want 156", c, ok) + } +} + +func TestHybrid(t *testing.T) { + // cores AND a qpu in the same pod -> both appear in the slot. 
+ p := podWith("h", corev1.ResourceList{ + corev1.ResourceCPU: qty(2), + FluxionResourcePrefix + "qpu": qty(1), + }) + js, err := JobspecForGroup("hyb", []corev1.Pod{p}) + if err != nil { + t.Fatal(err) + } + if c, _ := withType(js, "core"); c != 2 { + t.Errorf("core = %d, want 2", c) + } + if c, _ := withType(js, "qpu"); c != 1 { + t.Errorf("qpu = %d, want 1", c) } } @@ -70,3 +114,23 @@ func TestPlacementFromAllocation(t *testing.T) { t.Fatalf("backend = %q", p.Backend) } } + +func TestPlacementQuantumOnly(t *testing.T) { + // A pure-quantum allocation has a qpu (under qgateway) but NO node vertex. + // Nodes must be empty and Backend set — fluence then imposes no node constraint. + alloc := `{"graph":{"nodes":[ + {"metadata":{"type":"cluster","name":"kind"}}, + {"metadata":{"type":"qgateway","name":"qgateway0"}}, + {"metadata":{"type":"qpu","name":"ibm_marrakesh"}}, + {"metadata":{"type":"qubit","name":"qubit0"}}]}}` + p, err := PlacementFromAllocation(alloc) + if err != nil { + t.Fatal(err) + } + if len(p.Nodes) != 0 { + t.Fatalf("quantum-only allocation should have no nodes, got %v", p.Nodes) + } + if p.Backend != "ibm_marrakesh" { + t.Fatalf("backend = %q, want ibm_marrakesh", p.Backend) + } +} diff --git a/pkg/webhook/webhook.go b/pkg/webhook/webhook.go new file mode 100644 index 0000000..08e8364 --- /dev/null +++ b/pkg/webhook/webhook.go @@ -0,0 +1,201 @@ +// Package webhook is fluence's mutating admission webhook. Its job is to make +// scheduler-chosen values reach a pod's containers without the user wiring +// anything. Container env is immutable after a pod is created, so the scheduler +// cannot write it directly; instead this webhook injects, at pod-creation time, +// a downward-API env that reads an annotation the scheduler fills in later +// (during PreBind). The user writes a plain pod; the plumbing is automatic. 
+// +// Current rule: for a pod scheduled by fluence whose container requests a +// fluxion.flux-framework.org/* resource, inject QRMI_BACKEND sourced from the +// fluence backend annotation. New mutation rules can be added in Mutate. +// +// The webhook also manages its own TLS: it generates a self-signed CA + serving +// certificate at startup and patches its MutatingWebhookConfiguration's caBundle, +// so the install needs no cert-manager and no committed keys. +package webhook + +import ( + "context" + "crypto/rand" + "crypto/rsa" + "crypto/x509" + "crypto/x509/pkix" + "encoding/base64" + "encoding/json" + "encoding/pem" + "fmt" + "io" + "math/big" + "net/http" + "strings" + "time" + + "github.com/converged-computing/fluence/pkg/placement" + + admissionv1 "k8s.io/api/admission/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" +) + +// SchedulerName is the scheduler whose pods this webhook mutates. +const SchedulerName = "fluence" + +// jsonPatchOp is a single RFC 6902 JSON Patch operation. +type jsonPatchOp struct { + Op string `json:"op"` + Path string `json:"path"` + Value any `json:"value,omitempty"` +} + +// backendEnv is the downward-API env injected into quantum containers. To the +// app it is an ordinary env var; its value comes from the fluence backend +// annotation, which the scheduler sets in PreBind. +func backendEnv() corev1.EnvVar { + return corev1.EnvVar{ + Name: "QRMI_BACKEND", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: fmt.Sprintf("metadata.annotations['%s']", placement.BackendAnnotation), + }, + }, + } +} + +// Mutate returns the JSON Patch operations for a pod, or nil if nothing applies. 
+func Mutate(pod *corev1.Pod) []jsonPatchOp { + if pod.Spec.SchedulerName != SchedulerName { + return nil + } + var ops []jsonPatchOp + for i, c := range pod.Spec.Containers { + if !requestsFluxionResource(c) || hasEnv(c, "QRMI_BACKEND") { + continue + } + if len(c.Env) == 0 { + ops = append(ops, jsonPatchOp{ + Op: "add", + Path: fmt.Sprintf("/spec/containers/%d/env", i), + Value: []corev1.EnvVar{backendEnv()}, + }) + } else { + ops = append(ops, jsonPatchOp{ + Op: "add", + Path: fmt.Sprintf("/spec/containers/%d/env/-", i), + Value: backendEnv(), + }) + } + } + return ops +} + +func requestsFluxionResource(c corev1.Container) bool { + for name := range c.Resources.Requests { + if strings.HasPrefix(string(name), placement.FluxionResourcePrefix) { + return true + } + } + return false +} + +func hasEnv(c corev1.Container, name string) bool { + for _, e := range c.Env { + if e.Name == name { + return true + } + } + return false +} + +// Handler is the /mutate endpoint. It always admits the pod (failure to mutate +// must not block creation); it only adds a patch when Mutate returns one. 
+func Handler(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + var review admissionv1.AdmissionReview + if err := json.Unmarshal(body, &review); err != nil || review.Request == nil { + http.Error(w, "bad admission review", http.StatusBadRequest) + return + } + + resp := &admissionv1.AdmissionResponse{UID: review.Request.UID, Allowed: true} + var pod corev1.Pod + if err := json.Unmarshal(review.Request.Object.Raw, &pod); err == nil { + if ops := Mutate(&pod); len(ops) > 0 { + if patch, err := json.Marshal(ops); err == nil { + pt := admissionv1.PatchTypeJSONPatch + resp.Patch = patch + resp.PatchType = &pt + } + } + } + + out := admissionv1.AdmissionReview{TypeMeta: review.TypeMeta, Response: resp} + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(out) +} + +// GenerateCerts returns a self-signed CA (PEM) and a serving cert+key (PEM) valid +// for the given DNS names. The CA PEM is what the apiserver must trust (caBundle). 
//
// dnsNames must contain at least one name: the first becomes the serving
// certificate's CommonName and every entry is added as a SAN. An empty list is
// reported as an error instead of panicking on dnsNames[0]. Both certificates
// are backdated one hour and expire ten years from now. On error all three
// PEM results are nil.
func GenerateCerts(dnsNames []string) (caPEM, certPEM, keyPEM []byte, err error) {
	if len(dnsNames) == 0 {
		return nil, nil, nil, fmt.Errorf("generating webhook certs: at least one DNS name is required")
	}

	// Compute the validity window once so the CA and the leaf agree exactly.
	now := time.Now()
	notBefore := now.Add(-time.Hour)
	notAfter := now.AddDate(10, 0, 0)

	caKey, err := rsa.GenerateKey(rand.Reader, 2048)
	if err != nil {
		return nil, nil, nil, fmt.Errorf("generating CA key: %w", err)
	}
	caTmpl := &x509.Certificate{
		SerialNumber:          big.NewInt(1),
		Subject:               pkix.Name{CommonName: "fluence-webhook-ca"},
		NotBefore:             notBefore,
		NotAfter:              notAfter,
		IsCA:                  true,
		KeyUsage:              x509.KeyUsageCertSign | x509.KeyUsageDigitalSignature,
		BasicConstraintsValid: true,
	}
	// Self-signed: the template is both subject and issuer.
	caDER, err := x509.CreateCertificate(rand.Reader, caTmpl, caTmpl, &caKey.PublicKey, caKey)
	if err != nil {
		return nil, nil, nil, fmt.Errorf("creating CA certificate: %w", err)
	}
	// Re-parse the CA so the leaf is issued against the certificate as encoded.
	caCert, err := x509.ParseCertificate(caDER)
	if err != nil {
		return nil, nil, nil, fmt.Errorf("parsing CA certificate: %w", err)
	}

	leafKey, err := rsa.GenerateKey(rand.Reader, 2048)
	if err != nil {
		return nil, nil, nil, fmt.Errorf("generating serving key: %w", err)
	}
	leafTmpl := &x509.Certificate{
		SerialNumber: big.NewInt(2),
		Subject:      pkix.Name{CommonName: dnsNames[0]},
		NotBefore:    notBefore,
		NotAfter:     notAfter,
		KeyUsage:     x509.KeyUsageDigitalSignature | x509.KeyUsageKeyEncipherment,
		ExtKeyUsage:  []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
		DNSNames:     dnsNames,
	}
	leafDER, err := x509.CreateCertificate(rand.Reader, leafTmpl, caCert, &leafKey.PublicKey, caKey)
	if err != nil {
		return nil, nil, nil, fmt.Errorf("creating serving certificate: %w", err)
	}

	caPEM = pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: caDER})
	certPEM = pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: leafDER})
	keyPEM = pem.EncodeToMemory(&pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(leafKey)})
	return caPEM, certPEM, keyPEM, nil
}

// EnsureCABundle patches the named MutatingWebhookConfiguration so its first
// webhook trusts caPEM.
+func EnsureCABundle(ctx context.Context, client kubernetes.Interface, configName string, caPEM []byte) error { + patch := fmt.Sprintf( + `[{"op":"replace","path":"/webhooks/0/clientConfig/caBundle","value":%q}]`, + base64.StdEncoding.EncodeToString(caPEM), + ) + _, err := client.AdmissionregistrationV1().MutatingWebhookConfigurations().Patch( + ctx, configName, types.JSONPatchType, []byte(patch), metav1.PatchOptions{}) + return err +} diff --git a/pkg/webhook/webhook_test.go b/pkg/webhook/webhook_test.go new file mode 100644 index 0000000..c5a164c --- /dev/null +++ b/pkg/webhook/webhook_test.go @@ -0,0 +1,56 @@ +package webhook + +import ( + "testing" + + "github.com/converged-computing/fluence/pkg/placement" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" +) + +func qpuPod(scheduler string, withEnv bool) *corev1.Pod { + c := corev1.Container{ + Name: "app", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + placement.FluxionResourcePrefix + "qpu": *resource.NewQuantity(1, resource.DecimalSI), + }, + }, + } + if withEnv { + c.Env = []corev1.EnvVar{{Name: "QRMI_BACKEND", Value: "preset"}} + } + return &corev1.Pod{Spec: corev1.PodSpec{SchedulerName: scheduler, Containers: []corev1.Container{c}}} +} + +func TestMutateInjectsBackendEnv(t *testing.T) { + ops := Mutate(qpuPod("fluence", false)) + if len(ops) != 1 { + t.Fatalf("want 1 op, got %d", len(ops)) + } + if ops[0].Path != "/spec/containers/0/env" { + t.Errorf("path = %q", ops[0].Path) + } +} + +func TestMutateSkipsOtherScheduler(t *testing.T) { + if ops := Mutate(qpuPod("default-scheduler", false)); ops != nil { + t.Fatalf("non-fluence pod should not be mutated, got %v", ops) + } +} + +func TestMutateRespectsExistingEnv(t *testing.T) { + if ops := Mutate(qpuPod("fluence", true)); ops != nil { + t.Fatalf("should not override an existing QRMI_BACKEND, got %v", ops) + } +} + +func TestMutateSkipsNonQuantum(t *testing.T) { + p := &corev1.Pod{Spec: corev1.PodSpec{ + 
SchedulerName: "fluence", + Containers: []corev1.Container{{Name: "c", Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{corev1.ResourceCPU: *resource.NewQuantity(1, resource.DecimalSI)}}}}, + }} + if ops := Mutate(p); ops != nil { + t.Fatalf("classical pod should not be mutated, got %v", ops) + } +}