diff --git a/api/v1alpha1/committed_resource_types.go b/api/v1alpha1/committed_resource_types.go index 44eab1f0e..2443dce80 100644 --- a/api/v1alpha1/committed_resource_types.go +++ b/api/v1alpha1/committed_resource_types.go @@ -193,6 +193,23 @@ type CommittedResource struct { Status CommittedResourceStatus `json:"status,omitempty,omitzero"` } +// IsActive reports whether the commitment spec has active Reservation slots +// (state is confirmed or guaranteed). +func (s *CommittedResourceSpec) IsActive() bool { + return s.State == CommitmentStatusConfirmed || s.State == CommitmentStatusGuaranteed +} + +// IsActive reports whether the commitment has active Reservation slots +// (state is confirmed or guaranteed). +func (c *CommittedResource) IsActive() bool { + return c.Spec.IsActive() +} + +// MatchesGroup reports whether the commitment targets the given project and flavor group. +func (c *CommittedResource) MatchesGroup(projectID, flavorGroup string) bool { + return c.Spec.ProjectID == projectID && c.Spec.FlavorGroupName == flavorGroup +} + // +kubebuilder:object:root=true // CommittedResourceList contains a list of CommittedResource diff --git a/api/v1alpha1/committed_resource_types_test.go b/api/v1alpha1/committed_resource_types_test.go new file mode 100644 index 000000000..2571a9a66 --- /dev/null +++ b/api/v1alpha1/committed_resource_types_test.go @@ -0,0 +1,70 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package v1alpha1 + +import "testing" + +func TestCommittedResourceSpec_IsActive(t *testing.T) { + tests := []struct { + state CommitmentStatus + want bool + }{ + {CommitmentStatusConfirmed, true}, + {CommitmentStatusGuaranteed, true}, + {CommitmentStatusPlanned, false}, + {CommitmentStatusPending, false}, + {CommitmentStatusSuperseded, false}, + {CommitmentStatusExpired, false}, + } + for _, tt := range tests { + spec := CommittedResourceSpec{State: tt.state} + if got := spec.IsActive(); got != tt.want { + t.Errorf("IsActive() with state %q 
= %v, want %v", tt.state, got, tt.want) + } + } +} + +func TestCommittedResource_IsActive(t *testing.T) { + tests := []struct { + state CommitmentStatus + want bool + }{ + {CommitmentStatusConfirmed, true}, + {CommitmentStatusGuaranteed, true}, + {CommitmentStatusPlanned, false}, + {CommitmentStatusPending, false}, + {CommitmentStatusSuperseded, false}, + {CommitmentStatusExpired, false}, + } + for _, tt := range tests { + cr := CommittedResource{Spec: CommittedResourceSpec{State: tt.state}} + if got := cr.IsActive(); got != tt.want { + t.Errorf("IsActive() with state %q = %v, want %v", tt.state, got, tt.want) + } + } +} + +func TestCommittedResource_MatchesGroup(t *testing.T) { + cr := CommittedResource{ + Spec: CommittedResourceSpec{ + ProjectID: "proj-1", + FlavorGroupName: "kvm_v2", + }, + } + tests := []struct { + project string + group string + want bool + }{ + {"proj-1", "kvm_v2", true}, + {"proj-X", "kvm_v2", false}, + {"proj-1", "kvm_v3", false}, + {"proj-X", "kvm_v3", false}, + } + for _, tt := range tests { + if got := cr.MatchesGroup(tt.project, tt.group); got != tt.want { + t.Errorf("MatchesGroup(%q, %q) = %v, want %v", tt.project, tt.group, got, tt.want) + } + } +} diff --git a/api/v1alpha1/reservation_types.go b/api/v1alpha1/reservation_types.go index 988d4b97d..4ec3ee9c5 100644 --- a/api/v1alpha1/reservation_types.go +++ b/api/v1alpha1/reservation_types.go @@ -265,6 +265,11 @@ type ReservationList struct { Items []Reservation `json:"items"` } +// MatchesGroup reports whether the reservation targets the given project and resource group. +func (s *CommittedResourceReservationSpec) MatchesGroup(projectID, resourceGroup string) bool { + return s.ProjectID == projectID && s.ResourceGroup == resourceGroup +} + // IsReady returns true if the reservation has the Ready condition set to True. 
func (r *Reservation) IsReady() bool { return meta.IsStatusConditionTrue(r.Status.Conditions, ReservationConditionReady) diff --git a/api/v1alpha1/reservation_types_test.go b/api/v1alpha1/reservation_types_test.go new file mode 100644 index 000000000..63019d56e --- /dev/null +++ b/api/v1alpha1/reservation_types_test.go @@ -0,0 +1,28 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package v1alpha1 + +import "testing" + +func TestCommittedResourceReservationSpec_MatchesGroup(t *testing.T) { + spec := CommittedResourceReservationSpec{ + ProjectID: "proj-1", + ResourceGroup: "kvm_v2", + } + tests := []struct { + project string + group string + want bool + }{ + {"proj-1", "kvm_v2", true}, + {"proj-X", "kvm_v2", false}, + {"proj-1", "kvm_v3", false}, + {"proj-X", "kvm_v3", false}, + } + for _, tt := range tests { + if got := spec.MatchesGroup(tt.project, tt.group); got != tt.want { + t.Errorf("MatchesGroup(%q, %q) = %v, want %v", tt.project, tt.group, got, tt.want) + } + } +} diff --git a/cmd/manager/main.go b/cmd/manager/main.go index c5d5fa323..0c4e82505 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -395,10 +395,16 @@ func main() { commitmentsAPI.Init(mux, metrics.Registry, ctrl.Log.WithName("commitments-api")) if slices.Contains(mainConfig.EnabledControllers, "nova-pipeline-controllers") { + featureGates := conf.GetConfigOrDie[conf.FeatureGates]() // Filter-weigher pipeline controller setup. filterWeigherController := &nova.FilterWeigherPipelineController{ - Monitor: filterWeigherPipelineMonitor, + Monitor: filterWeigherPipelineMonitor, + FeatureGates: featureGates, + NoHostFoundCounter: nova.NewNoHostFoundCounter(), + PlacementCounter: nova.NewPlacementCounter(), } + metrics.Registry.MustRegister(filterWeigherController.NoHostFoundCounter) + metrics.Registry.MustRegister(filterWeigherController.PlacementCounter) // Inferred through the base controller. 
filterWeigherController.Client = multiclusterClient if err := filterWeigherController.SetupWithManager(mgr, multiclusterClient); err != nil { diff --git a/helm/bundles/cortex-nova/values.yaml b/helm/bundles/cortex-nova/values.yaml index 22dfb6cc8..bc5451e96 100644 --- a/helm/bundles/cortex-nova/values.yaml +++ b/helm/bundles/cortex-nova/values.yaml @@ -136,15 +136,21 @@ cortex-scheduling-controllers: - failover-reservations-controller - quota-controller - capacity-controller + enabledTasks: + - nova-history-cleanup-task + - commitments-sync-task # required for committed resources + # Feature gates control optional capabilities within running components. + featureGates: + # Enables VM allocation write-back into Reservation slots and no-host-found + # classification by committed resource coverage. Enable only on deployments + # that use committed resources. Requires also enabling of CR controllers and tasks + committedResourceTracking: false # Pipeline used for the empty-state capacity probe (ignores allocations and reservations). capacityTotalPipeline: "kvm-report-capacity" # Pipeline used for the current-state capacity probe (considers current VM allocations). capacityPlaceablePipeline: "kvm-general-purpose-load-balancing" # How often the capacity controller re-runs its scheduler probes. capacityReconcileInterval: 5m - enabledTasks: - - nova-history-cleanup-task - - commitments-sync-task # If true, the external scheduler API will limit the list of hosts in its # response to those included in the scheduling request. 
novaLimitHostsToRequest: true diff --git a/internal/scheduling/nova/cr_allocation.go b/internal/scheduling/nova/cr_allocation.go new file mode 100644 index 000000000..5c3ffb2a5 --- /dev/null +++ b/internal/scheduling/nova/cr_allocation.go @@ -0,0 +1,243 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package nova + +import ( + "context" + "errors" + "fmt" + + api "github.com/cobaltcore-dev/cortex/api/external/nova" + "github.com/cobaltcore-dev/cortex/api/v1alpha1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/util/retry" + + "github.com/cobaltcore-dev/cortex/internal/knowledge/extractor/plugins/compute" + "github.com/cobaltcore-dev/cortex/internal/scheduling/reservations" + hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// recordCRAllocation writes the placed VM UUID into the matching Reservation +// Spec.CommittedResourceReservation.Allocations after a successful Nova placement. +// Best-effort: any failure is logged but never propagated to the caller. 
+func (c *FilterWeigherPipelineController) recordCRAllocation(ctx context.Context, decision *v1alpha1.Decision, request api.ExternalSchedulerRequest) { + log := ctrl.LoggerFrom(ctx) + + instanceUUID := request.Spec.Data.InstanceUUID + projectID := request.Context.ProjectID + flavorName := request.Spec.Data.Flavor.Data.Name + selectedHost := *decision.Status.Result.TargetHost + intent := string(decision.Spec.Intent) + + flavorGroupName, flavorInGroup, err := c.resolveFlavorGroup(ctx, flavorName) + if err != nil { + if errors.Is(err, errFlavorNotInGroup) { + log.V(1).Info("CR allocation: flavor not in any group, PAYG placement", "flavor", flavorName) + } else { + log.Error(err, "CR allocation: failed to resolve flavor group", + "flavor", flavorName, "instanceUUID", instanceUUID) + if c.PlacementCounter != nil { + c.PlacementCounter.WithLabelValues("unknown", intent, "error").Inc() + } + } + return + } + + evaluator, err := BuildCRSlotEvaluator(ctx, c.Client) + if err != nil { + log.Error(err, "CR allocation: failed to build CR slot evaluator", "instanceUUID", instanceUUID) + return + } + + var crList v1alpha1.CommittedResourceList + if err := c.List(ctx, &crList); err != nil { + log.Error(err, "CR allocation: failed to list committed resources", "instanceUUID", instanceUUID) + return + } + var activeCRs []v1alpha1.CommittedResource + for _, cr := range crList.Items { + if !cr.MatchesGroup(projectID, flavorGroupName) || !cr.IsActive() { + continue + } + activeCRs = append(activeCRs, cr) + } + + candidateHosts := decision.Status.Result.OrderedHosts + crOutcome := classifyCRPlacement(evaluator, activeCRs, candidateHosts, projectID, flavorGroupName) + + log.V(1).Info("CR allocation: placement classified", + "cr_outcome", crOutcome, + "instanceUUID", instanceUUID, + "host", selectedHost, + "projectID", projectID, + "flavorGroup", flavorGroupName, + ) + if c.PlacementCounter != nil { + c.PlacementCounter.WithLabelValues(flavorGroupName, intent, crOutcome).Inc() + } + + if 
crOutcome != "slot_used" { + return + } + + slotsOnTarget := evaluator.SlotsForHost(selectedHost, projectID, flavorGroupName) + + // Idempotency: if this VM UUID is already recorded in any slot, the work is done. + for _, slot := range slotsOnTarget { + if _, exists := slot.Spec.CommittedResourceReservation.Allocations[instanceUUID]; exists { + log.Info("CR allocation: VM UUID already in reservation, skipping", + "instanceUUID", instanceUUID, "reservation", slot.Name) + return + } + } + + vmMemoryBytes := int64(flavorInGroup.MemoryMB) * 1024 * 1024 //nolint:gosec // flavor memory bounded by specs + vmCPUs := int64(flavorInGroup.VCPUs) //nolint:gosec // VCPUs bounded by specs + + slotName := pickReservationSlot(slotsOnTarget, vmMemoryBytes) + if slotName == "" { + log.V(1).Info("CR allocation: slot_used but target host has no slot with remaining capacity", + "instanceUUID", instanceUUID, "host", selectedHost) + return + } + + log.Info("CR allocation: writing VM UUID into reservation", + "instanceUUID", instanceUUID, "reservation", slotName, + "projectID", projectID, "flavorGroup", flavorGroupName, "host", selectedHost) + + vmResources := map[hv1.ResourceName]resource.Quantity{ + hv1.ResourceMemory: *resource.NewQuantity(vmMemoryBytes, resource.BinarySI), + hv1.ResourceCPU: *resource.NewQuantity(vmCPUs, resource.DecimalSI), + } + if retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { + latest := &v1alpha1.Reservation{} + if err := c.Get(ctx, client.ObjectKey{Name: slotName}, latest); err != nil { + return err + } + if latest.Spec.CommittedResourceReservation == nil { + return fmt.Errorf("reservation %s lost CommittedResourceReservation spec", slotName) + } + base := latest.DeepCopy() + if latest.Spec.CommittedResourceReservation.Allocations == nil { + latest.Spec.CommittedResourceReservation.Allocations = make(map[string]v1alpha1.CommittedResourceAllocation) + } + latest.Spec.CommittedResourceReservation.Allocations[instanceUUID] = 
v1alpha1.CommittedResourceAllocation{ + CreationTimestamp: metav1.Now(), + Resources: vmResources, + } + return c.Patch(ctx, latest, client.MergeFrom(base)) + }); retryErr != nil { + log.Error(retryErr, "CR allocation: failed to patch reservation", + "reservation", slotName, "instanceUUID", instanceUUID) + return + } + + log.Info("CR allocation: done", "instanceUUID", instanceUUID, "reservation", slotName) +} + +// classifyCRPlacement determines the CR slot outcome for a successful placement: +// - no_cr: no active CR or CR capacity fully exhausted (H1) +// - slot_missed: CR has remaining capacity but no candidate host has a slot with remaining > 0 (H2) +// - slot_used: CR has remaining capacity and at least one candidate host has a slot with remaining > 0 (H3) +func classifyCRPlacement( + evaluator *CRSlotEvaluator, + activeCRs []v1alpha1.CommittedResource, + candidateHosts []string, + projectID, flavorGroupName string, +) string { + + if len(activeCRs) == 0 { + return "no_cr" + } + totalCapacity := resource.Quantity{} + totalUsed := resource.Quantity{} + for _, cr := range activeCRs { + totalCapacity.Add(cr.Spec.Amount) + if used, ok := cr.Status.UsedResources["memory"]; ok { + totalUsed.Add(used) + } + } + if totalUsed.Cmp(totalCapacity) >= 0 { + return "no_cr" + } + for _, host := range candidateHosts { + for _, slot := range evaluator.SlotsForHost(host, projectID, flavorGroupName) { + if reservationRemainingMemory(slot) > 0 { + return "slot_used" + } + } + } + return "slot_missed" +} + +// pickReservationSlot selects the best reservation slot for a new VM. +// A slot is usable if it has any remaining memory (overfill is allowed: the VM +// may exceed the slot's remaining capacity, with the overflow covered by the +// host's free capacity which the pipeline already verified). +// Selection: maximise coverage (min(remMem, vmMemoryBytes)), tiebreak by +// smallest remaining memory (tightest fit), then reservation name. 
+// Returns the slot name, or "" if no slot has any remaining memory. +func pickReservationSlot(candidates []v1alpha1.Reservation, vmMemoryBytes int64) string { + bestName := "" + var bestCoverage, bestRemMem int64 + + for _, res := range candidates { + remMem := reservationRemainingMemory(res) + if remMem <= 0 { + continue + } + coverage := min(remMem, vmMemoryBytes) + + if bestName == "" || + coverage > bestCoverage || + (coverage == bestCoverage && remMem < bestRemMem) || + (coverage == bestCoverage && remMem == bestRemMem && res.Name < bestName) { + bestName = res.Name + bestCoverage = coverage + bestRemMem = remMem + } + } + + return bestName +} + +// reservationRemainingMemory returns how many bytes of memory remain +// unallocated in a reservation slot. Returns 0 if the slot is full or nil. +func reservationRemainingMemory(res v1alpha1.Reservation) int64 { + cr := res.Spec.CommittedResourceReservation + if cr == nil { + return 0 + } + totalMemQ := res.Spec.Resources[hv1.ResourceMemory] + var usedMem int64 + for _, alloc := range cr.Allocations { + allocMemQ := alloc.Resources[hv1.ResourceMemory] + usedMem += allocMemQ.Value() + } + return max(totalMemQ.Value()-usedMem, 0) +} + +// errFlavorNotInGroup is returned by resolveFlavorGroup when the flavor is not +// part of any configured flavor group (PAYG placement). Callers should +// distinguish this from real lookup errors. +var errFlavorNotInGroup = errors.New("flavor not in any group") + +// resolveFlavorGroup looks up which flavor group the given flavor belongs to. +// Returns errFlavorNotInGroup (PAYG) if the flavor is not in any group. +// Returns a different error for transient failures (Knowledge CRD unavailable, etc). 
+func (c *FilterWeigherPipelineController) resolveFlavorGroup(ctx context.Context, flavorName string) (string, *compute.FlavorInGroup, error) { + fgClient := reservations.FlavorGroupKnowledgeClient{Client: c.Client} + flavorGroups, err := fgClient.GetAllFlavorGroups(ctx, nil) + if err != nil { + return "", nil, err + } + groupName, flavor, err := reservations.FindFlavorInGroups(flavorName, flavorGroups) + if err != nil { + return "", nil, errFlavorNotInGroup + } + return groupName, flavor, nil +} diff --git a/internal/scheduling/nova/cr_evaluator.go b/internal/scheduling/nova/cr_evaluator.go new file mode 100644 index 000000000..81f61ffc6 --- /dev/null +++ b/internal/scheduling/nova/cr_evaluator.go @@ -0,0 +1,109 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package nova + +import ( + "context" + + "github.com/cobaltcore-dev/cortex/api/v1alpha1" + hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1" + "k8s.io/apimachinery/pkg/api/resource" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// CRSlotEvaluator provides post-pipeline evaluation of CR reservation slots. +// Build once per request from HV and Reservation CRDs; query without further reads. +type CRSlotEvaluator struct { + // hvFreeMemory is (EffectiveCapacity - Allocation).memory per host, before reservation blocks. + hvFreeMemory map[string]int64 + // reservationsByHost maps host name to its ready CR reservation slots. + reservationsByHost map[string][]v1alpha1.Reservation +} + +// BuildCRSlotEvaluator lists HV CRDs and CR Reservation CRDs once and returns an evaluator +// that can answer slot-usability queries without further K8s reads. 
+func BuildCRSlotEvaluator(ctx context.Context, c client.Client) (*CRSlotEvaluator, error) { + eval := &CRSlotEvaluator{ + hvFreeMemory: make(map[string]int64), + reservationsByHost: make(map[string][]v1alpha1.Reservation), + } + + var hvList hv1.HypervisorList + if err := c.List(ctx, &hvList); err != nil { + return nil, err + } + for _, hv := range hvList.Items { + var capacityMap map[hv1.ResourceName]resource.Quantity + if hv.Status.EffectiveCapacity != nil { + capacityMap = hv.Status.EffectiveCapacity + } else { + capacityMap = hv.Status.Capacity + } + effectiveMemQ := capacityMap[hv1.ResourceMemory] + allocMemQ := hv.Status.Allocation[hv1.ResourceMemory] + effectiveMem := effectiveMemQ.Value() + allocMem := allocMemQ.Value() + eval.hvFreeMemory[hv.Name] = max(effectiveMem-allocMem, 0) + } + + var resList v1alpha1.ReservationList + if err := c.List(ctx, &resList, + client.MatchingLabels{v1alpha1.LabelReservationType: v1alpha1.ReservationTypeLabelCommittedResource}, + ); err != nil { + return nil, err + } + for _, res := range resList.Items { + if !res.IsReady() { + continue + } + if res.Spec.CommittedResourceReservation == nil { + continue + } + host := res.Spec.TargetHost + eval.reservationsByHost[host] = append(eval.reservationsByHost[host], res) + } + + return eval, nil +} + +// SlotsForHost returns all CR reservation slots on hostName matching projectID + flavorGroup. 
+func (e *CRSlotEvaluator) SlotsForHost(hostName, projectID, flavorGroup string) []v1alpha1.Reservation { + var result []v1alpha1.Reservation + for _, res := range e.reservationsByHost[hostName] { + cr := res.Spec.CommittedResourceReservation + if cr.MatchesGroup(projectID, flavorGroup) { + result = append(result, res) + } + } + return result +} + +// HasUsableSlot reports whether hostName has at least one CR slot that can accommodate +// a VM of vmMemBytes under the overfill model: +// +// slot.remaining + host.base_free >= vmMemBytes +// +// where host.base_free = hvFreeMemory[host] - sum(all reservation blocks on host). +// On happy-path candidates the pipeline already guarantees host capacity, so this +// simplifies to slot.remaining > 0 — but the full formula is evaluated regardless. +func (e *CRSlotEvaluator) HasUsableSlot(hostName, projectID, flavorGroup string, vmMemBytes int64) bool { + var allBlocks int64 + for _, res := range e.reservationsByHost[hostName] { + blockQ := res.Spec.Resources[hv1.ResourceMemory] + allBlocks += blockQ.Value() + } + hvFree := e.hvFreeMemory[hostName] + + for _, slot := range e.SlotsForHost(hostName, projectID, flavorGroup) { + slotRemaining := reservationRemainingMemory(slot) + if slotRemaining <= 0 { + continue + } + // host.base_free + slot.remaining = hvFree - allBlocks + slotRemaining + if hvFree-allBlocks+slotRemaining >= vmMemBytes { + return true + } + } + return false +} diff --git a/internal/scheduling/nova/cr_metrics.go b/internal/scheduling/nova/cr_metrics.go new file mode 100644 index 000000000..22f6eb34a --- /dev/null +++ b/internal/scheduling/nova/cr_metrics.go @@ -0,0 +1,133 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package nova + +import ( + "context" + "errors" + + api "github.com/cobaltcore-dev/cortex/api/external/nova" + "github.com/cobaltcore-dev/cortex/api/v1alpha1" + "github.com/prometheus/client_golang/prometheus" + "k8s.io/apimachinery/pkg/api/resource" + + ctrl 
"sigs.k8s.io/controller-runtime" +) + +// NewNoHostFoundCounter creates the Prometheus counter for no-host-found classification. +// Register it with the metrics registry before assigning it to the controller. +func NewNoHostFoundCounter() *prometheus.CounterVec { + return prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_nova_no_host_found_total", + Help: "Nova no-host-found results classified by committed resource coverage (no_cr/cr_exhausted/slot_exhausted/slot_blocked/error).", + }, []string{"cr_slot", "flavor_group", "intent"}) +} + +// NewPlacementCounter creates the Prometheus counter for successful Nova placements. +// Labels: flavor_group, intent, cr_slot (no_cr/slot_missed/slot_used/error). PAYG placements +// (flavor not in any group) are not counted — they return before reaching this counter. +// cr_slot=error is emitted when the flavor group lookup fails due to a K8s error. +// Register it with the metrics registry before assigning it to the controller. +func NewPlacementCounter() *prometheus.CounterVec { + return prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_nova_placement_total", + Help: "Successful Nova placements by committed resource slot outcome.", + }, []string{"flavor_group", "intent", "cr_slot"}) +} + +// classifyNoHostFound determines why no host was found for a nova placement request, +// in terms of committed resource coverage: +// +// - no_cr: project has no active CommittedResources for the flavor group (NH1) +// - cr_exhausted: CommittedResources exist but are fully occupied (NH2) +// - slot_exhausted: CR has remaining capacity but no input host has a usable slot (NH3) +// - slot_blocked: a usable slot exists but scheduling constraints excluded all such hosts (NH4) +func classifyNoHostFound( + activeCRs []v1alpha1.CommittedResource, + evaluator *CRSlotEvaluator, + inputHosts []string, + projectID, flavorGroupName string, + vmMemBytes int64, +) string { + + if len(activeCRs) == 0 { + return "no_cr" + } + 
+ totalCapacity := resource.Quantity{} + totalUsed := resource.Quantity{} + for _, cr := range activeCRs { + totalCapacity.Add(cr.Spec.Amount) + if used, ok := cr.Status.UsedResources["memory"]; ok { + totalUsed.Add(used) + } + } + if totalUsed.Cmp(totalCapacity) >= 0 { + return "cr_exhausted" + } + + for _, host := range inputHosts { + if evaluator.HasUsableSlot(host, projectID, flavorGroupName, vmMemBytes) { + return "slot_blocked" + } + } + return "slot_exhausted" +} + +// logNoHostFound classifies a no-host-found result and emits a log line and metric. +func (c *FilterWeigherPipelineController) logNoHostFound(ctx context.Context, decision *v1alpha1.Decision, request api.ExternalSchedulerRequest) { + log := ctrl.LoggerFrom(ctx) + + projectID := request.Context.ProjectID + flavorName := request.Spec.Data.Flavor.Data.Name + instanceUUID := request.Spec.Data.InstanceUUID + intent := decision.Spec.Intent + + flavorGroupName, flavorInGroup, err := c.resolveFlavorGroup(ctx, flavorName) + if err != nil { + if errors.Is(err, errFlavorNotInGroup) { + return // PAYG: flavor has no group, no metric + } + log.Error(err, "no-host-found: failed to resolve flavor group", + "instanceUUID", instanceUUID, "flavor", flavorName) + if c.NoHostFoundCounter != nil { + c.NoHostFoundCounter.WithLabelValues("error", "unknown", string(intent)).Inc() + } + return + } + + vmMemBytes := int64(flavorInGroup.MemoryMB) * 1024 * 1024 //nolint:gosec // flavor memory bounded by specs + + var crList v1alpha1.CommittedResourceList + if err := c.List(ctx, &crList); err != nil { + log.Error(err, "no-host-found: failed to list committed resources", "instanceUUID", instanceUUID) + return + } + var activeCRs []v1alpha1.CommittedResource + for _, cr := range crList.Items { + if !cr.MatchesGroup(projectID, flavorGroupName) || !cr.IsActive() { + continue + } + activeCRs = append(activeCRs, cr) + } + + evaluator, err := BuildCRSlotEvaluator(ctx, c.Client) + if err != nil { + log.Error(err, "no-host-found: 
failed to build CR slot evaluator", "instanceUUID", instanceUUID) + return + } + + noHostFoundCase := classifyNoHostFound(activeCRs, evaluator, request.GetHosts(), projectID, flavorGroupName, vmMemBytes) + + log.Info("no-host-found classified", + "cr_slot", noHostFoundCase, + "instanceUUID", instanceUUID, + "projectID", projectID, + "flavorGroup", flavorGroupName, + "intent", intent, + ) + if c.NoHostFoundCounter != nil { + c.NoHostFoundCounter.WithLabelValues(noHostFoundCase, flavorGroupName, string(intent)).Inc() + } +} diff --git a/internal/scheduling/nova/cr_metrics_test.go b/internal/scheduling/nova/cr_metrics_test.go new file mode 100644 index 000000000..e934ef5d4 --- /dev/null +++ b/internal/scheduling/nova/cr_metrics_test.go @@ -0,0 +1,612 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package nova + +import ( + "context" + "encoding/json" + "fmt" + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + api "github.com/cobaltcore-dev/cortex/api/external/nova" + "github.com/cobaltcore-dev/cortex/api/v1alpha1" + "github.com/cobaltcore-dev/cortex/internal/knowledge/extractor/plugins/compute" + "github.com/cobaltcore-dev/cortex/internal/scheduling/lib" + "github.com/cobaltcore-dev/cortex/pkg/conf" + hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1" +) + +// makeCR builds a CommittedResource for testing. 
+func makeCR(state v1alpha1.CommitmentStatus, amountMiB, usedMiB int64) v1alpha1.CommittedResource { + cr := v1alpha1.CommittedResource{ + Spec: v1alpha1.CommittedResourceSpec{ + State: state, + Amount: *resource.NewQuantity(amountMiB*1024*1024, resource.BinarySI), + }, + } + if usedMiB > 0 { + cr.Status.UsedResources = map[string]resource.Quantity{ + "memory": *resource.NewQuantity(usedMiB*1024*1024, resource.BinarySI), + } + } + return cr +} + +// makeSlot builds a Reservation slot for testing. +func makeSlot(projectID, flavorGroup string, totalMemMiB, allocatedMemMiB int64) v1alpha1.Reservation { + res := v1alpha1.Reservation{ + ObjectMeta: metav1.ObjectMeta{Name: "slot-" + flavorGroup}, + Spec: v1alpha1.ReservationSpec{ + Resources: map[hv1.ResourceName]resource.Quantity{ + hv1.ResourceMemory: *resource.NewQuantity(totalMemMiB*1024*1024, resource.BinarySI), + }, + CommittedResourceReservation: &v1alpha1.CommittedResourceReservationSpec{ + ProjectID: projectID, + ResourceGroup: flavorGroup, + }, + }, + } + if allocatedMemMiB > 0 { + res.Spec.CommittedResourceReservation.Allocations = map[string]v1alpha1.CommittedResourceAllocation{ + "some-vm": { + Resources: map[hv1.ResourceName]resource.Quantity{ + hv1.ResourceMemory: *resource.NewQuantity(allocatedMemMiB*1024*1024, resource.BinarySI), + }, + }, + } + } + return res +} + +func TestClassifyNoHostFound(t *testing.T) { + const ( + proj = "project-1" + group = "kvm_v2_hana_s" + ) + const MiB = int64(1024 * 1024) + + // emptyEval has no hosts or slots. + emptyEval := &CRSlotEvaluator{ + hvFreeMemory: map[string]int64{}, + reservationsByHost: map[string][]v1alpha1.Reservation{}, + } + + // evalWithSlot builds an evaluator with a single slot on "host-1". 
+ evalWithSlot := func(totalMiB, allocMiB int64) *CRSlotEvaluator { + slot := makeSlot(proj, group, totalMiB, allocMiB) + return &CRSlotEvaluator{ + hvFreeMemory: map[string]int64{"host-1": 16384 * MiB}, + reservationsByHost: map[string][]v1alpha1.Reservation{ + "host-1": {slot}, + }, + } + } + + tests := []struct { + name string + activeCRs []v1alpha1.CommittedResource + eval *CRSlotEvaluator + inputHosts []string + vmMemBytes int64 + expectedCase string + }{ + { + name: "no_cr: no active CRs for project+flavor group", + activeCRs: nil, + eval: emptyEval, + inputHosts: nil, + vmMemBytes: 4096 * MiB, + expectedCase: "no_cr", + }, + { + name: "cr_exhausted: CRs fully occupied (used == capacity)", + activeCRs: []v1alpha1.CommittedResource{ + makeCR(v1alpha1.CommitmentStatusConfirmed, 8192, 8192), + }, + eval: emptyEval, + inputHosts: []string{"host-1"}, + vmMemBytes: 4096 * MiB, + expectedCase: "cr_exhausted", + }, + { + name: "cr_exhausted: CRs fully occupied (used > capacity)", + activeCRs: []v1alpha1.CommittedResource{ + makeCR(v1alpha1.CommitmentStatusConfirmed, 8192, 10000), + }, + eval: emptyEval, + inputHosts: []string{"host-1"}, + vmMemBytes: 4096 * MiB, + expectedCase: "cr_exhausted", + }, + { + name: "cr_exhausted: multiple CRs, total used >= total capacity", + activeCRs: []v1alpha1.CommittedResource{ + makeCR(v1alpha1.CommitmentStatusConfirmed, 4096, 4096), + makeCR(v1alpha1.CommitmentStatusGuaranteed, 4096, 4096), + }, + eval: emptyEval, + inputHosts: []string{"host-1"}, + vmMemBytes: 4096 * MiB, + expectedCase: "cr_exhausted", + }, + { + name: "slot_exhausted: CRs have capacity but slot fully allocated", + activeCRs: []v1alpha1.CommittedResource{ + makeCR(v1alpha1.CommitmentStatusConfirmed, 8192, 4096), + }, + eval: evalWithSlot(8192, 8192), // slotRemaining=0 → skipped + inputHosts: []string{"host-1"}, + vmMemBytes: 4096 * MiB, + expectedCase: "slot_exhausted", + }, + { + name: "slot_exhausted: CRs have capacity, no slots at all", + activeCRs: 
[]v1alpha1.CommittedResource{ + makeCR(v1alpha1.CommitmentStatusConfirmed, 8192, 0), + }, + eval: emptyEval, + inputHosts: []string{"host-1"}, + vmMemBytes: 4096 * MiB, + expectedCase: "slot_exhausted", + }, + { + name: "slot_blocked: free slot exists on input host", + activeCRs: []v1alpha1.CommittedResource{ + makeCR(v1alpha1.CommitmentStatusConfirmed, 8192, 4096), + }, + eval: evalWithSlot(8192, 4096), // 4096 MiB remaining; 16384-8192+4096=12288 >= 4096 + inputHosts: []string{"host-1"}, + vmMemBytes: 4096 * MiB, + expectedCase: "slot_blocked", + }, + { + name: "slot_blocked: overfill — slot smaller than VM is still usable", + activeCRs: []v1alpha1.CommittedResource{ + makeCR(v1alpha1.CommitmentStatusConfirmed, 8192, 4096), + }, + eval: evalWithSlot(8192, 6144), // 2048 MiB remaining; 16384-8192+2048=10240 >= 4096 + inputHosts: []string{"host-1"}, + vmMemBytes: 4096 * MiB, + expectedCase: "slot_blocked", + }, + { + name: "slot_exhausted: slots for other project ignored", + activeCRs: []v1alpha1.CommittedResource{ + makeCR(v1alpha1.CommitmentStatusConfirmed, 8192, 0), + }, + eval: &CRSlotEvaluator{ + hvFreeMemory: map[string]int64{"host-1": 16384 * MiB}, + reservationsByHost: map[string][]v1alpha1.Reservation{ + "host-1": {makeSlot("other-project", group, 8192, 0)}, + }, + }, + inputHosts: []string{"host-1"}, + vmMemBytes: 4096 * MiB, + expectedCase: "slot_exhausted", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := classifyNoHostFound(tt.activeCRs, tt.eval, tt.inputHosts, proj, group, tt.vmMemBytes) + if got != tt.expectedCase { + t.Errorf("classifyNoHostFound() = %q, want %q", got, tt.expectedCase) + } + }) + } +} + +func TestReservationRemainingMemory(t *testing.T) { + tests := []struct { + name string + totalMemMiB int64 + usedMemMiB int64 + wantBytes int64 + }{ + {"empty slot", 8192, 0, 8192 * 1024 * 1024}, + {"partially used", 8192, 4096, 4096 * 1024 * 1024}, + {"fully used", 8192, 8192, 0}, + {"over-allocated (clamped 
to zero)", 4096, 8192, 0}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + res := makeSlot("proj", "group", tt.totalMemMiB, tt.usedMemMiB) + got := reservationRemainingMemory(res) + if got != tt.wantBytes { + t.Errorf("reservationRemainingMemory() = %d, want %d", got, tt.wantBytes) + } + }) + } +} + +func TestLogNoHostFound(t *testing.T) { + scheme := runtime.NewScheme() + if err := v1alpha1.AddToScheme(scheme); err != nil { + t.Fatalf("AddToScheme v1alpha1: %v", err) + } + if err := hv1.AddToScheme(scheme); err != nil { + t.Fatalf("AddToScheme hv1: %v", err) + } + + const ( + projectID = "project-1" + flavorName = "m1.large" + flavorGroup = "m1" + instanceID = "vm-uuid-1" + ) + + ratio := uint64(2048) + fg := compute.FlavorGroupFeature{ + Name: flavorGroup, + Flavors: []compute.FlavorInGroup{{Name: flavorName, VCPUs: 2, MemoryMB: 4096}}, + LargestFlavor: compute.FlavorInGroup{Name: flavorName, VCPUs: 2, MemoryMB: 4096}, + SmallestFlavor: compute.FlavorInGroup{Name: flavorName, VCPUs: 2, MemoryMB: 4096}, + RamCoreRatio: &ratio, + } + + flavorKnowledge := func() *v1alpha1.Knowledge { + raw, err := v1alpha1.BoxFeatureList([]compute.FlavorGroupFeature{fg}) + if err != nil { + t.Fatalf("BoxFeatureList: %v", err) + } + return &v1alpha1.Knowledge{ + ObjectMeta: metav1.ObjectMeta{Name: "flavor-groups"}, + Status: v1alpha1.KnowledgeStatus{ + Raw: raw, + Conditions: []metav1.Condition{{ + Type: v1alpha1.KnowledgeConditionReady, + Status: metav1.ConditionTrue, + Reason: "Ready", + LastTransitionTime: metav1.Now(), + }}, + }, + } + } + + makeCRObject := func(state v1alpha1.CommitmentStatus, amountMiB, usedMiB int64) *v1alpha1.CommittedResource { + cr := &v1alpha1.CommittedResource{ + ObjectMeta: metav1.ObjectMeta{Name: "cr-1"}, + Spec: v1alpha1.CommittedResourceSpec{ + ProjectID: projectID, + FlavorGroupName: flavorGroup, + State: state, + Amount: *resource.NewQuantity(amountMiB*1024*1024, resource.BinarySI), + }, + } + if usedMiB > 0 { + 
cr.Status.UsedResources = map[string]resource.Quantity{ + "memory": *resource.NewQuantity(usedMiB*1024*1024, resource.BinarySI), + } + } + return cr + } + + makeReservationSlot := func(name string, totalMemMiB, allocatedMemMiB int64) *v1alpha1.Reservation { + res := &v1alpha1.Reservation{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{ + v1alpha1.LabelReservationType: v1alpha1.ReservationTypeLabelCommittedResource, + }, + }, + Spec: v1alpha1.ReservationSpec{ + Resources: map[hv1.ResourceName]resource.Quantity{ + hv1.ResourceMemory: *resource.NewQuantity(totalMemMiB*1024*1024, resource.BinarySI), + }, + CommittedResourceReservation: &v1alpha1.CommittedResourceReservationSpec{ + ProjectID: projectID, + ResourceGroup: flavorGroup, + }, + }, + } + if allocatedMemMiB > 0 { + res.Spec.CommittedResourceReservation.Allocations = map[string]v1alpha1.CommittedResourceAllocation{ + "some-vm": { + Resources: map[hv1.ResourceName]resource.Quantity{ + hv1.ResourceMemory: *resource.NewQuantity(allocatedMemMiB*1024*1024, resource.BinarySI), + }, + }, + } + } + return res + } + + makeReadyReservationSlot := func(name, targetHost string, totalMemMiB, allocatedMemMiB int64) *v1alpha1.Reservation { + res := makeReservationSlot(name, totalMemMiB, allocatedMemMiB) + res.Spec.TargetHost = targetHost + res.Status.Conditions = []metav1.Condition{{ + Type: v1alpha1.ReservationConditionReady, + Status: metav1.ConditionTrue, + Reason: "Ready", + LastTransitionTime: metav1.Now(), + }} + return res + } + + makeHV := func(name string, capacityMiB int64) *hv1.Hypervisor { + return &hv1.Hypervisor{ + ObjectMeta: metav1.ObjectMeta{Name: name}, + Status: hv1.HypervisorStatus{ + EffectiveCapacity: map[hv1.ResourceName]resource.Quantity{ + hv1.ResourceMemory: *resource.NewQuantity(capacityMiB*1024*1024, resource.BinarySI), + }, + Allocation: map[hv1.ResourceName]resource.Quantity{ + hv1.ResourceMemory: *resource.NewQuantity(0, resource.BinarySI), + }, + }, + } + } + + tests 
:= []struct { + name string + objects []client.Object + requestHosts []api.ExternalSchedulerHost + payg bool + expectedCase string // "" means no counter increment expected + expectedFlavorGroup string // defaults to flavorGroup if empty + }{ + { + name: "no_cr: no active CRs", + objects: []client.Object{flavorKnowledge()}, + expectedCase: "no_cr", + }, + { + name: "cr_exhausted: CRs fully occupied", + objects: []client.Object{ + flavorKnowledge(), + makeCRObject(v1alpha1.CommitmentStatusConfirmed, 8192, 8192), + }, + expectedCase: "cr_exhausted", + }, + { + name: "slot_exhausted: slot exists but fully allocated", + objects: []client.Object{ + flavorKnowledge(), + makeCRObject(v1alpha1.CommitmentStatusConfirmed, 8192, 4096), + makeHV("host-1", 16384), + makeReadyReservationSlot("slot-full", "host-1", 8192, 8192), + }, + requestHosts: []api.ExternalSchedulerHost{{ComputeHost: "host-1"}}, + expectedCase: "slot_exhausted", + }, + { + name: "slot_blocked: free slot on candidate host", + objects: []client.Object{ + flavorKnowledge(), + makeCRObject(v1alpha1.CommitmentStatusConfirmed, 8192, 4096), + makeHV("host-1", 16384), + makeReadyReservationSlot("slot-free", "host-1", 8192, 4096), + }, + requestHosts: []api.ExternalSchedulerHost{{ComputeHost: "host-1"}}, + expectedCase: "slot_blocked", + }, + { + name: "no_cr: inactive CR (pending state) is filtered by IsActive()", + objects: []client.Object{ + flavorKnowledge(), + makeCRObject(v1alpha1.CommitmentStatusPending, 8192, 0), + }, + expectedCase: "no_cr", + }, + { + name: "no_cr: CR for wrong project is filtered by MatchesGroup()", + objects: []client.Object{ + flavorKnowledge(), + func() *v1alpha1.CommittedResource { + cr := makeCRObject(v1alpha1.CommitmentStatusConfirmed, 8192, 0) + cr.Spec.ProjectID = "other-project" + return cr + }(), + }, + expectedCase: "no_cr", + }, + { + name: "PAYG: flavor not in any group", + objects: []client.Object{flavorKnowledge()}, + payg: true, + expectedCase: "", + }, + { + name: "error: 
knowledge CRD unavailable", + objects: []client.Object{ + makeCRObject(v1alpha1.CommitmentStatusConfirmed, 8192, 0), + }, + expectedCase: "error", + expectedFlavorGroup: "unknown", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(tt.objects...). + Build() + + counter := NewNoHostFoundCounter() + reg := prometheus.NewRegistry() + reg.MustRegister(counter) + + controller := &FilterWeigherPipelineController{ + BasePipelineController: lib.BasePipelineController[lib.FilterWeigherPipeline[api.ExternalSchedulerRequest]]{ + Client: fakeClient, + }, + NoHostFoundCounter: counter, + } + + requestFlavorName := flavorName + if tt.payg { + requestFlavorName = "unknown-flavor" + } + request := api.ExternalSchedulerRequest{ + Spec: api.NovaObject[api.NovaSpec]{ + Data: api.NovaSpec{ + InstanceUUID: instanceID, + Flavor: api.NovaObject[api.NovaFlavor]{ + Data: api.NovaFlavor{Name: requestFlavorName}, + }, + }, + }, + Context: api.NovaRequestContext{ProjectID: projectID}, + Hosts: tt.requestHosts, + } + decision := &v1alpha1.Decision{ + Spec: v1alpha1.DecisionSpec{Intent: api.CreateIntent}, + } + + controller.logNoHostFound(context.Background(), decision, request) + + if tt.expectedCase == "" { + total := testutil.ToFloat64(counter.WithLabelValues("no_cr", flavorGroup, string(api.CreateIntent))) + + testutil.ToFloat64(counter.WithLabelValues("cr_exhausted", flavorGroup, string(api.CreateIntent))) + + testutil.ToFloat64(counter.WithLabelValues("slot_exhausted", flavorGroup, string(api.CreateIntent))) + + testutil.ToFloat64(counter.WithLabelValues("slot_blocked", flavorGroup, string(api.CreateIntent))) + + testutil.ToFloat64(counter.WithLabelValues("error", "unknown", string(api.CreateIntent))) + if total != 0 { + t.Errorf("expected no counter increment, got total %.0f", total) + } + } else { + expectedFG := tt.expectedFlavorGroup + if expectedFG == "" { + expectedFG = 
flavorGroup + } + got := testutil.ToFloat64(counter.WithLabelValues(tt.expectedCase, expectedFG, string(api.CreateIntent))) + if got != 1 { + t.Errorf("counter[case=%q, flavorGroup=%q, intent=%q] = %.0f, want 1", + tt.expectedCase, expectedFG, string(api.CreateIntent), got) + } + } + }) + } +} + +func TestFeatureGate_CommittedResourceTracking(t *testing.T) { + const ( + projectID = "project-1" + flavorName = "m1.large" + flavorGroup = "m1" + ) + + scheme := runtime.NewScheme() + if err := v1alpha1.AddToScheme(scheme); err != nil { + t.Fatalf("AddToScheme v1alpha1: %v", err) + } + if err := hv1.AddToScheme(scheme); err != nil { + t.Fatalf("AddToScheme hv1: %v", err) + } + ratio := uint64(2048) + fg := compute.FlavorGroupFeature{ + Name: flavorGroup, + Flavors: []compute.FlavorInGroup{{Name: flavorName, VCPUs: 2, MemoryMB: 4096}}, + LargestFlavor: compute.FlavorInGroup{Name: flavorName, VCPUs: 2, MemoryMB: 4096}, + SmallestFlavor: compute.FlavorInGroup{Name: flavorName, VCPUs: 2, MemoryMB: 4096}, + RamCoreRatio: &ratio, + } + raw, err := v1alpha1.BoxFeatureList([]compute.FlavorGroupFeature{fg}) + if err != nil { + t.Fatalf("BoxFeatureList: %v", err) + } + flavorKnowledge := &v1alpha1.Knowledge{ + ObjectMeta: metav1.ObjectMeta{Name: "flavor-groups"}, + Status: v1alpha1.KnowledgeStatus{ + Raw: raw, + Conditions: []metav1.Condition{{ + Type: v1alpha1.KnowledgeConditionReady, + Status: metav1.ConditionTrue, + Reason: "Ready", + LastTransitionTime: metav1.Now(), + }}, + }, + } + + // Zero hosts → pipeline returns no TargetHost → triggers logNoHostFound path. 
+ novaRequest := api.ExternalSchedulerRequest{ + Spec: api.NovaObject[api.NovaSpec]{ + Data: api.NovaSpec{ + InstanceUUID: "test-instance", + Flavor: api.NovaObject[api.NovaFlavor]{Data: api.NovaFlavor{Name: flavorName}}, + }, + }, + Context: api.NovaRequestContext{ProjectID: projectID}, + Hosts: []api.ExternalSchedulerHost{}, + Pipeline: "test-pipeline", + } + novaRaw, err := json.Marshal(novaRequest) + if err != nil { + t.Fatalf("json.Marshal: %v", err) + } + + pipelineConf := v1alpha1.Pipeline{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pipeline"}, + Spec: v1alpha1.PipelineSpec{ + Type: v1alpha1.PipelineTypeFilterWeigher, + SchedulingDomain: v1alpha1.SchedulingDomainNova, + CreateHistory: true, + Filters: []v1alpha1.FilterSpec{}, + Weighers: []v1alpha1.WeigherSpec{}, + }, + } + + for _, enabled := range []bool{false, true} { + t.Run(fmt.Sprintf("CommittedResourceTracking=%v", enabled), func(t *testing.T) { + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(flavorKnowledge). + WithStatusSubresource(&v1alpha1.Decision{}, &v1alpha1.History{}). 
+ Build() + + counter := NewNoHostFoundCounter() + reg := prometheus.NewRegistry() + reg.MustRegister(counter) + + controller := &FilterWeigherPipelineController{ + BasePipelineController: lib.BasePipelineController[lib.FilterWeigherPipeline[api.ExternalSchedulerRequest]]{ + Client: fakeClient, + Pipelines: make(map[string]lib.FilterWeigherPipeline[api.ExternalSchedulerRequest]), + PipelineConfigs: make(map[string]v1alpha1.Pipeline), + HistoryManager: lib.HistoryClient{Client: fakeClient}, + }, + FeatureGates: conf.FeatureGates{CommittedResourceTracking: enabled}, + NoHostFoundCounter: counter, + } + controller.PipelineConfigs["test-pipeline"] = pipelineConf + initResult := controller.InitPipeline(context.Background(), pipelineConf) + if len(initResult.FilterErrors) > 0 || len(initResult.WeigherErrors) > 0 { + t.Fatalf("pipeline init errors: filters=%v weighers=%v", initResult.FilterErrors, initResult.WeigherErrors) + } + controller.Pipelines["test-pipeline"] = initResult.Pipeline + + decision := &v1alpha1.Decision{ + ObjectMeta: metav1.ObjectMeta{Name: "test-decision", Namespace: "default"}, + Spec: v1alpha1.DecisionSpec{ + SchedulingDomain: v1alpha1.SchedulingDomainNova, + PipelineRef: corev1.ObjectReference{Name: "test-pipeline"}, + NovaRaw: &runtime.RawExtension{Raw: novaRaw}, + }, + } + + if err := controller.ProcessNewDecisionFromAPI(context.Background(), decision); err != nil { + t.Fatalf("ProcessNewDecisionFromAPI: %v", err) + } + + count := testutil.CollectAndCount(counter) + if enabled && count == 0 { + t.Error("expected counter increment with CommittedResourceTracking=true, got 0") + } + if !enabled && count != 0 { + t.Errorf("expected no counter increment with CommittedResourceTracking=false, got %d", count) + } + }) + } +} diff --git a/internal/scheduling/nova/filter_weigher_pipeline_controller.go b/internal/scheduling/nova/filter_weigher_pipeline_controller.go index 279ac1c3e..7fc695d66 100644 --- 
a/internal/scheduling/nova/filter_weigher_pipeline_controller.go +++ b/internal/scheduling/nova/filter_weigher_pipeline_controller.go @@ -13,12 +13,14 @@ import ( api "github.com/cobaltcore-dev/cortex/api/external/nova" "github.com/cobaltcore-dev/cortex/api/v1alpha1" + "github.com/prometheus/client_golang/prometheus" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/cobaltcore-dev/cortex/internal/scheduling/lib" "github.com/cobaltcore-dev/cortex/internal/scheduling/nova/plugins/filters" "github.com/cobaltcore-dev/cortex/internal/scheduling/nova/plugins/weighers" + "github.com/cobaltcore-dev/cortex/pkg/conf" "github.com/cobaltcore-dev/cortex/pkg/multicluster" hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -41,10 +43,17 @@ type FilterWeigherPipelineController struct { // Mutex to only allow one process at a time processMu sync.Mutex + // FeatureGates holds feature flags for this controller. + FeatureGates conf.FeatureGates // Monitor to pass down to all pipelines. Monitor lib.FilterWeigherPipelineMonitor // Candidate gatherer to get all placement candidates if needed. gatherer CandidateGatherer + + // NoHostFoundCounter counts no-host-found results by CR slot outcome (no_cr/cr_exhausted/slot_exhausted/slot_blocked/error). + NoHostFoundCounter *prometheus.CounterVec + // PlacementCounter counts successful placements by CR slot outcome. + PlacementCounter *prometheus.CounterVec } // The type of pipeline this controller manages. @@ -62,7 +71,7 @@ func (c *FilterWeigherPipelineController) Reconcile(ctx context.Context, req ctr return ctrl.Result{}, client.IgnoreNotFound(err) } old := decision.DeepCopy() - if err := c.process(ctx, decision); err != nil { + if _, err := c.process(ctx, decision); err != nil { return ctrl.Result{}, err } patch := client.MergeFrom(old) @@ -81,7 +90,8 @@ func (c *FilterWeigherPipelineController) ProcessNewDecisionFromAPI(ctx context. 
if !ok { return fmt.Errorf("pipeline %s not configured", decision.Spec.PipelineRef.Name) } - err := c.process(ctx, decision) + + request, err := c.process(ctx, decision) if err != nil { meta.SetStatusCondition(&decision.Status.Conditions, metav1.Condition{ Type: v1alpha1.DecisionConditionReady, @@ -98,25 +108,38 @@ func (c *FilterWeigherPipelineController) ProcessNewDecisionFromAPI(ctx context. }) } if pipelineConf.Spec.CreateHistory { - c.upsertHistory(ctx, decision, err) + c.upsertHistory(ctx, decision, request, err) + if err == nil && decision.Status.Result != nil && request != nil && c.FeatureGates.CommittedResourceTracking { + if decision.Status.Result.TargetHost != nil && isUserVMPlacement(decision.Spec.Intent) { + c.recordCRAllocation(ctx, decision, *request) + } + if decision.Status.Result.TargetHost == nil { + c.logNoHostFound(ctx, decision, *request) + } + } } return err } -func (c *FilterWeigherPipelineController) upsertHistory(ctx context.Context, decision *v1alpha1.Decision, pipelineErr error) { +// isUserVMPlacement returns true for intents that represent actual VM +// placements from Nova. Returns false for Cortex-internal synthetic requests +// (failover and CR reservation scheduling), which must not update allocations. 
+func isUserVMPlacement(intent v1alpha1.SchedulingIntent) bool { + switch intent { + case api.ReserveForCommittedResourceIntent, api.ReserveForFailoverIntent: + return false + default: + return true + } +} + +func (c *FilterWeigherPipelineController) upsertHistory(ctx context.Context, decision *v1alpha1.Decision, request *api.ExternalSchedulerRequest, pipelineErr error) { log := ctrl.LoggerFrom(ctx) var az *string - - if decision.Spec.NovaRaw != nil { - var request api.ExternalSchedulerRequest - err := json.Unmarshal(decision.Spec.NovaRaw.Raw, &request) - if err != nil { - log.Error(err, "failed to unmarshal novaRaw for history, using defaults") - } else { - azStr := request.Spec.Data.AvailabilityZone - az = &azStr - } + if request != nil { + azStr := request.Spec.Data.AvailabilityZone + az = &azStr } if upsertErr := c.HistoryManager.CreateOrUpdateHistory(ctx, decision, az, pipelineErr); upsertErr != nil { @@ -124,23 +147,23 @@ func (c *FilterWeigherPipelineController) upsertHistory(ctx context.Context, dec } } -func (c *FilterWeigherPipelineController) process(ctx context.Context, decision *v1alpha1.Decision) error { +func (c *FilterWeigherPipelineController) process(ctx context.Context, decision *v1alpha1.Decision) (*api.ExternalSchedulerRequest, error) { log := ctrl.LoggerFrom(ctx) startedAt := time.Now() // So we can measure sync duration. 
pipeline, ok := c.Pipelines[decision.Spec.PipelineRef.Name] if !ok { log.Error(nil, "pipeline not found or not ready", "pipelineName", decision.Spec.PipelineRef.Name) - return errors.New("pipeline not found or not ready") + return nil, errors.New("pipeline not found or not ready") } if decision.Spec.NovaRaw == nil { log.Error(nil, "skipping decision, no novaRaw spec defined") - return errors.New("no novaRaw spec defined") + return nil, errors.New("no novaRaw spec defined") } var request api.ExternalSchedulerRequest if err := json.Unmarshal(decision.Spec.NovaRaw.Raw, &request); err != nil { log.Error(err, "failed to unmarshal novaRaw spec") - return err + return nil, err } if intent, err := request.GetIntent(); err != nil { @@ -155,13 +178,13 @@ func (c *FilterWeigherPipelineController) process(ctx context.Context, decision pipelineConf, ok := c.PipelineConfigs[decision.Spec.PipelineRef.Name] if !ok { log.Error(nil, "pipeline config not found", "pipelineName", decision.Spec.PipelineRef.Name) - return errors.New("pipeline config not found") + return &request, errors.New("pipeline config not found") } if pipelineConf.Spec.IgnorePreselection { log.Info("gathering all placement candidates before filtering") if err := c.gatherer.MutateWithAllCandidates(ctx, &request); err != nil { log.Error(err, "failed to gather all placement candidates") - return err + return &request, err } log.Info("gathered all placement candidates", "numHosts", len(request.Hosts)) } @@ -169,7 +192,7 @@ func (c *FilterWeigherPipelineController) process(ctx context.Context, decision result, err := pipeline.Run(request) if err != nil { log.Error(err, "failed to run pipeline") - return err + return &request, err } decision.Status.Result = &result meta.SetStatusCondition(&decision.Status.Conditions, metav1.Condition{ @@ -179,7 +202,7 @@ func (c *FilterWeigherPipelineController) process(ctx context.Context, decision Message: "pipeline run succeeded", }) log.Info("decision processed successfully", 
"duration", time.Since(startedAt)) - return nil + return &request, nil } // The base controller will delegate the pipeline creation down to this method. @@ -252,6 +275,11 @@ func (c *FilterWeigherPipelineController) SetupWithManager(mgr manager.Manager, if err != nil { return err } + // Watch committed resource changes so the no-host-found classifier can read them. + bldr, err = bldr.WatchesMulticluster(&v1alpha1.CommittedResource{}, handler.Funcs{}) + if err != nil { + return err + } // Watch decision changes across all clusters. bldr, err = bldr.WatchesMulticluster( &v1alpha1.Decision{}, diff --git a/internal/scheduling/nova/filter_weigher_pipeline_controller_test.go b/internal/scheduling/nova/filter_weigher_pipeline_controller_test.go index 752725df8..2c2e85414 100644 --- a/internal/scheduling/nova/filter_weigher_pipeline_controller_test.go +++ b/internal/scheduling/nova/filter_weigher_pipeline_controller_test.go @@ -11,7 +11,10 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -21,8 +24,9 @@ import ( api "github.com/cobaltcore-dev/cortex/api/external/nova" "github.com/cobaltcore-dev/cortex/api/v1alpha1" - + "github.com/cobaltcore-dev/cortex/internal/knowledge/extractor/plugins/compute" "github.com/cobaltcore-dev/cortex/internal/scheduling/lib" + hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1" ) // mockCandidateGatherer implements CandidateGatherer for testing @@ -926,5 +930,434 @@ func TestFilterWeigherPipelineController_IgnorePreselection(t *testing.T) { } } +func TestIsUserVMPlacement(t *testing.T) { + tests := []struct { + intent v1alpha1.SchedulingIntent + expected bool + }{ + {api.CreateIntent, true}, + {api.LiveMigrationIntent, true}, + {api.EvacuateIntent, true}, + 
{api.RebuildIntent, true}, + {api.ResizeIntent, true}, + {api.ReserveForCommittedResourceIntent, false}, + {api.ReserveForFailoverIntent, false}, + {v1alpha1.SchedulingIntentUnknown, true}, + } + for _, tt := range tests { + if got := isUserVMPlacement(tt.intent); got != tt.expected { + t.Errorf("isUserVMPlacement(%q) = %v, want %v", tt.intent, got, tt.expected) + } + } +} + +func TestPickReservationSlot(t *testing.T) { + // vmMemBytes for a 4096 MiB flavor. + const vmMemBytes = int64(4096) * 1024 * 1024 + + makeSlot := func(name string, totalMemMiB, totalCPU, usedMemMiB, usedCPU int64) v1alpha1.Reservation { + var allocs map[string]v1alpha1.CommittedResourceAllocation + if usedMemMiB > 0 || usedCPU > 0 { + allocs = map[string]v1alpha1.CommittedResourceAllocation{ + "vm-existing": { + Resources: map[hv1.ResourceName]resource.Quantity{ + hv1.ResourceMemory: *resource.NewQuantity(usedMemMiB*1024*1024, resource.BinarySI), + hv1.ResourceCPU: *resource.NewQuantity(usedCPU, resource.DecimalSI), + }, + }, + } + } + return v1alpha1.Reservation{ + ObjectMeta: metav1.ObjectMeta{Name: name}, + Spec: v1alpha1.ReservationSpec{ + Resources: map[hv1.ResourceName]resource.Quantity{ + hv1.ResourceMemory: *resource.NewQuantity(totalMemMiB*1024*1024, resource.BinarySI), + hv1.ResourceCPU: *resource.NewQuantity(totalCPU, resource.DecimalSI), + }, + CommittedResourceReservation: &v1alpha1.CommittedResourceReservationSpec{ + Allocations: allocs, + }, + }, + } + } + + tests := []struct { + name string + candidates []v1alpha1.Reservation + want string + }{ + { + name: "no candidates", + candidates: nil, + want: "", + }, + { + name: "single slot fits fully", + candidates: []v1alpha1.Reservation{makeSlot("a", 8192, 8, 0, 0)}, + want: "a", + }, + { + name: "slot with zero remaining memory excluded", + candidates: []v1alpha1.Reservation{makeSlot("a", 8192, 8, 8192, 0)}, + want: "", + }, + { + name: "picks slot with least remaining memory when both cover fully", + candidates: 
[]v1alpha1.Reservation{ + makeSlot("large", 8192, 8, 0, 0), // 8192 MiB remaining + makeSlot("small", 6144, 8, 0, 0), // 6144 MiB remaining + }, + want: "small", + }, + { + name: "name tiebreak when coverage and remaining memory are equal", + candidates: []v1alpha1.Reservation{ + makeSlot("slot-b", 6144, 4, 0, 0), + makeSlot("slot-a", 6144, 4, 0, 0), + }, + want: "slot-a", + }, + { + name: "missing resource keys treated as zero remaining", + candidates: []v1alpha1.Reservation{ + { + ObjectMeta: metav1.ObjectMeta{Name: "empty-res"}, + Spec: v1alpha1.ReservationSpec{ + CommittedResourceReservation: &v1alpha1.CommittedResourceReservationSpec{}, + }, + }, + }, + want: "", + }, + { + name: "partially used slot still usable", + candidates: []v1alpha1.Reservation{makeSlot("partial", 8192, 8, 2048, 2)}, // 6144 MiB remaining + want: "partial", + }, + { + name: "overfill: slot smaller than VM is usable", + candidates: []v1alpha1.Reservation{makeSlot("a", 4096, 8, 2048, 0)}, // 2048 MiB remaining < vmMemBytes + want: "a", + }, + { + name: "overfill: full coverage preferred over partial", + candidates: []v1alpha1.Reservation{ + makeSlot("partial", 4096, 8, 2048, 0), // 2048 MiB remaining, coverage=2048 + makeSlot("full", 6144, 8, 0, 0), // 6144 MiB remaining, coverage=4096 (full) + }, + want: "full", + }, + { + name: "overfill: highest partial coverage preferred", + candidates: []v1alpha1.Reservation{ + makeSlot("low", 4096, 8, 2048, 0), // 2048 MiB remaining, coverage=2048 + makeSlot("high", 4096, 8, 1024, 0), // 3072 MiB remaining, coverage=3072 + }, + want: "high", + }, + { + name: "CPU exhausted slot is still usable (overfill applies to CPU too)", + candidates: []v1alpha1.Reservation{makeSlot("cpu-full", 8192, 2, 0, 2)}, // remCPU=0 but remMem>0 + want: "cpu-full", + }, + { + name: "name tiebreak when memory equal (CPU no longer a tiebreak criterion)", + candidates: []v1alpha1.Reservation{ + makeSlot("more-cpu", 6144, 8, 0, 0), + makeSlot("less-cpu", 6144, 4, 0, 0), + }, 
+ want: "less-cpu", // wins by name, not by CPU + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := pickReservationSlot(tt.candidates, vmMemBytes) + if got != tt.want { + t.Errorf("pickReservationSlot() = %q, want %q", got, tt.want) + } + }) + } +} + +func TestRecordCRAllocation(t *testing.T) { + scheme := runtime.NewScheme() + if err := v1alpha1.AddToScheme(scheme); err != nil { + t.Fatalf("AddToScheme v1alpha1: %v", err) + } + if err := hv1.AddToScheme(scheme); err != nil { + t.Fatalf("AddToScheme hv1: %v", err) + } + + const ( + instanceUUID = "vm-uuid-1" + projectID = "project-1" + flavorName = "m1.large" + flavorGroup = "m1" + selectedHost = "compute-1" + ) + + ratio := uint64(2048) + fg := compute.FlavorGroupFeature{ + Name: flavorGroup, + Flavors: []compute.FlavorInGroup{{Name: flavorName, VCPUs: 2, MemoryMB: 4096}}, + LargestFlavor: compute.FlavorInGroup{Name: flavorName, VCPUs: 2, MemoryMB: 4096}, + SmallestFlavor: compute.FlavorInGroup{Name: flavorName, VCPUs: 2, MemoryMB: 4096}, + RamCoreRatio: &ratio, + } + + flavorKnowledge := func() *v1alpha1.Knowledge { + raw, err := v1alpha1.BoxFeatureList([]compute.FlavorGroupFeature{fg}) + if err != nil { + t.Fatalf("BoxFeatureList: %v", err) + } + return &v1alpha1.Knowledge{ + ObjectMeta: metav1.ObjectMeta{Name: "flavor-groups"}, + Status: v1alpha1.KnowledgeStatus{ + Raw: raw, + Conditions: []metav1.Condition{{ + Type: v1alpha1.KnowledgeConditionReady, + Status: metav1.ConditionTrue, + Reason: "Ready", + LastTransitionTime: metav1.Now(), + }}, + }, + } + } + + makeReservation := func(name string, memMiB, cpus int64, proj, group, host string, allocs map[string]v1alpha1.CommittedResourceAllocation) *v1alpha1.Reservation { + return &v1alpha1.Reservation{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{ + v1alpha1.LabelReservationType: v1alpha1.ReservationTypeLabelCommittedResource, + }, + }, + Spec: v1alpha1.ReservationSpec{ + Type: 
v1alpha1.ReservationTypeCommittedResource, + TargetHost: host, + Resources: map[hv1.ResourceName]resource.Quantity{ + hv1.ResourceMemory: *resource.NewQuantity(memMiB*1024*1024, resource.BinarySI), + hv1.ResourceCPU: *resource.NewQuantity(cpus, resource.DecimalSI), + }, + CommittedResourceReservation: &v1alpha1.CommittedResourceReservationSpec{ + ProjectID: proj, + ResourceGroup: group, + Allocations: allocs, + }, + }, + } + } + + makeRequest := func(uuid, proj, flavor string) api.ExternalSchedulerRequest { + return api.ExternalSchedulerRequest{ + Spec: api.NovaObject[api.NovaSpec]{ + Data: api.NovaSpec{ + InstanceUUID: uuid, + Flavor: api.NovaObject[api.NovaFlavor]{ + Data: api.NovaFlavor{Name: flavor, MemoryMB: 4096, VCPUs: 2}, + }, + }, + }, + Context: api.NovaRequestContext{ProjectID: proj}, + } + } + + makeDecision := func(host string, candidates ...string) *v1alpha1.Decision { + h := host + return &v1alpha1.Decision{ + Status: v1alpha1.DecisionStatus{ + Result: &v1alpha1.DecisionResult{ + TargetHost: &h, + OrderedHosts: candidates, + }, + }, + } + } + + // makeCRObject creates a CommittedResource for the test project+flavorGroup with remaining capacity. + makeCRObject := func() *v1alpha1.CommittedResource { + return &v1alpha1.CommittedResource{ + ObjectMeta: metav1.ObjectMeta{Name: "cr-1"}, + Spec: v1alpha1.CommittedResourceSpec{ + ProjectID: projectID, + FlavorGroupName: flavorGroup, + State: v1alpha1.CommitmentStatusConfirmed, + Amount: *resource.NewQuantity(int64(8192)*1024*1024, resource.BinarySI), + }, + } + } + + // setReady marks a reservation as Ready so BuildCRSlotEvaluator indexes it. 
+ setReady := func(res *v1alpha1.Reservation) *v1alpha1.Reservation { + res.Status.Conditions = []metav1.Condition{{ + Type: v1alpha1.ReservationConditionReady, + Status: metav1.ConditionTrue, + Reason: "Ready", + LastTransitionTime: metav1.Now(), + }} + return res + } + + vmAlloc := func() map[string]v1alpha1.CommittedResourceAllocation { + return map[string]v1alpha1.CommittedResourceAllocation{ + instanceUUID: { + CreationTimestamp: metav1.Now(), + Resources: map[hv1.ResourceName]resource.Quantity{ + hv1.ResourceMemory: *resource.NewQuantity(int64(4096)*1024*1024, resource.BinarySI), + hv1.ResourceCPU: *resource.NewQuantity(2, resource.DecimalSI), + }, + }, + } + } + + tests := []struct { + name string + objects []client.Object + request api.ExternalSchedulerRequest + decision *v1alpha1.Decision + checkSlot string + expectAllocation bool + expectedCRSlot string // if non-empty, asserts PlacementCounter cr_slot label + }{ + { + name: "writes allocation into matching reservation", + objects: []client.Object{ + flavorKnowledge(), + makeCRObject(), + setReady(makeReservation("slot-1", 8192, 8, projectID, flavorGroup, selectedHost, nil)), + }, + request: makeRequest(instanceUUID, projectID, flavorName), + decision: makeDecision(selectedHost, selectedHost), + checkSlot: "slot-1", + expectAllocation: true, + }, + { + name: "idempotent: UUID already in allocations", + objects: []client.Object{ + flavorKnowledge(), + makeCRObject(), + setReady(makeReservation("slot-1", 8192, 8, projectID, flavorGroup, selectedHost, vmAlloc())), + }, + request: makeRequest(instanceUUID, projectID, flavorName), + decision: makeDecision(selectedHost, selectedHost), + checkSlot: "slot-1", + expectAllocation: true, + }, + { + name: "PAYG: flavor not in any group", + objects: []client.Object{ + flavorKnowledge(), + setReady(makeReservation("slot-1", 8192, 8, projectID, flavorGroup, selectedHost, nil)), + }, + request: makeRequest(instanceUUID, projectID, "unknown-flavor"), + decision: 
makeDecision(selectedHost, selectedHost), + checkSlot: "slot-1", + expectAllocation: false, + }, + { + name: "no matching reservation: host mismatch", + objects: []client.Object{ + flavorKnowledge(), + makeCRObject(), + setReady(makeReservation("slot-1", 8192, 8, projectID, flavorGroup, "other-host", nil)), + }, + request: makeRequest(instanceUUID, projectID, flavorName), + decision: makeDecision(selectedHost, selectedHost), + checkSlot: "slot-1", + expectAllocation: false, + }, + { + name: "no slot fits: all capacity used", + objects: []client.Object{ + flavorKnowledge(), + makeCRObject(), + setReady(makeReservation("slot-full", 4096, 2, projectID, flavorGroup, selectedHost, + map[string]v1alpha1.CommittedResourceAllocation{ + "other-vm": { + Resources: map[hv1.ResourceName]resource.Quantity{ + hv1.ResourceMemory: *resource.NewQuantity(int64(4096)*1024*1024, resource.BinarySI), + hv1.ResourceCPU: *resource.NewQuantity(2, resource.DecimalSI), + }, + }, + })), + }, + request: makeRequest(instanceUUID, projectID, flavorName), + decision: makeDecision(selectedHost, selectedHost), + checkSlot: "slot-full", + expectAllocation: false, + }, + { + name: "inactive CR (pending state) is filtered by IsActive(): no allocation", + objects: []client.Object{ + flavorKnowledge(), + func() *v1alpha1.CommittedResource { + cr := makeCRObject() + cr.Spec.State = v1alpha1.CommitmentStatusPending + return cr + }(), + setReady(makeReservation("slot-1", 8192, 8, projectID, flavorGroup, selectedHost, nil)), + }, + request: makeRequest(instanceUUID, projectID, flavorName), + decision: makeDecision(selectedHost, selectedHost), + checkSlot: "slot-1", + expectAllocation: false, + }, + { + name: "no knowledge CRD: logs error, no allocation", + objects: []client.Object{ + makeReservation("slot-1", 8192, 8, projectID, flavorGroup, selectedHost, nil), + }, + request: makeRequest(instanceUUID, projectID, flavorName), + decision: makeDecision(selectedHost, selectedHost), + checkSlot: "slot-1", + 
expectAllocation: false, + expectedCRSlot: "error", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(tt.objects...). + Build() + + counter := NewPlacementCounter() + reg := prometheus.NewRegistry() + reg.MustRegister(counter) + + controller := &FilterWeigherPipelineController{ + BasePipelineController: lib.BasePipelineController[lib.FilterWeigherPipeline[api.ExternalSchedulerRequest]]{ + Client: fakeClient, + }, + PlacementCounter: counter, + } + + controller.recordCRAllocation(context.Background(), tt.decision, tt.request) + + var res v1alpha1.Reservation + if err := fakeClient.Get(context.Background(), client.ObjectKey{Name: tt.checkSlot}, &res); err != nil { + t.Fatalf("Get reservation %q: %v", tt.checkSlot, err) + } + _, hasAlloc := res.Spec.CommittedResourceReservation.Allocations[instanceUUID] + if tt.expectAllocation && !hasAlloc { + t.Errorf("expected allocation for UUID %q but none found", instanceUUID) + } + if !tt.expectAllocation && hasAlloc { + t.Errorf("expected no allocation for UUID %q but one was written", instanceUUID) + } + if tt.expectedCRSlot != "" { + intent := string(tt.decision.Spec.Intent) + got := testutil.ToFloat64(counter.WithLabelValues("unknown", intent, tt.expectedCRSlot)) + if got != 1 { + t.Errorf("PlacementCounter[flavor_group=unknown, intent=%q, cr_slot=%q] = %.0f, want 1", + intent, tt.expectedCRSlot, got) + } + } + }) + } +} + // Error variable for testing var errGathererFailed = errors.New("gatherer failed") diff --git a/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity.go b/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity.go index b97d3e0e5..e72575c57 100644 --- a/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity.go +++ b/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity.go @@ -139,8 +139,7 @@ func (s *FilterHasEnoughCapacity) Run(traceLog 
*slog.Logger, request api.Externa // For committed resource reservations: unlock resources only if: // 1. Project ID matches // 2. ResourceGroup matches the flavor's hw_version - reservation.Spec.CommittedResourceReservation.ProjectID == request.Spec.Data.ProjectID && - reservation.Spec.CommittedResourceReservation.ResourceGroup == request.Spec.Data.Flavor.Data.ExtraSpecs["hw_version"]: + reservation.Spec.CommittedResourceReservation.MatchesGroup(request.Spec.Data.ProjectID, request.Spec.Data.Flavor.Data.ExtraSpecs["hw_version"]): traceLog.Info("unlocking resources reserved by matching committed resource reservation with allocation", "reservation", reservation.Name, "instanceUUID", request.Spec.Data.InstanceUUID, diff --git a/internal/scheduling/nova/plugins/filters/filter_quota_enforcement.go b/internal/scheduling/nova/plugins/filters/filter_quota_enforcement.go index 18286fce6..13fcc1fc7 100644 --- a/internal/scheduling/nova/plugins/filters/filter_quota_enforcement.go +++ b/internal/scheduling/nova/plugins/filters/filter_quota_enforcement.go @@ -149,7 +149,7 @@ func (s *FilterQuotaEnforcement) Run(traceLog *slog.Logger, request api.External if spec.ResourceType != v1alpha1.CommittedResourceTypeMemory { continue } - if spec.State != v1alpha1.CommitmentStatusConfirmed && spec.State != v1alpha1.CommitmentStatusGuaranteed { + if !spec.IsActive() { continue } now := time.Now() diff --git a/internal/scheduling/reservations/capacity/controller.go b/internal/scheduling/reservations/capacity/controller.go index da4ce5b58..4672a2cdd 100644 --- a/internal/scheduling/reservations/capacity/controller.go +++ b/internal/scheduling/reservations/capacity/controller.go @@ -337,7 +337,7 @@ func (c *Controller) sumCommittedCapacity(ctx context.Context, groupName, az str if cr.Spec.ResourceType != v1alpha1.CommittedResourceTypeMemory { continue } - if cr.Spec.State != v1alpha1.CommitmentStatusGuaranteed && cr.Spec.State != v1alpha1.CommitmentStatusConfirmed { + if !cr.IsActive() { 
continue } amount := cr.Spec.Amount diff --git a/internal/scheduling/reservations/commitments/committed_resource_controller.go b/internal/scheduling/reservations/commitments/committed_resource_controller.go index 4659b83c5..53811854a 100644 --- a/internal/scheduling/reservations/commitments/committed_resource_controller.go +++ b/internal/scheduling/reservations/commitments/committed_resource_controller.go @@ -257,7 +257,7 @@ func (r *CommittedResourceController) reconcileCoresHeadroom(ctx context.Context if other.Spec.FlavorGroupName != cr.Spec.FlavorGroupName || other.Spec.AvailabilityZone != cr.Spec.AvailabilityZone { continue } - if other.Spec.State != v1alpha1.CommitmentStatusGuaranteed && other.Spec.State != v1alpha1.CommitmentStatusConfirmed { + if !other.IsActive() { continue } if other.Status.AcceptedSpec == nil { diff --git a/internal/scheduling/reservations/commitments/usage.go b/internal/scheduling/reservations/commitments/usage.go index a6d360bd1..acd6aeccb 100644 --- a/internal/scheduling/reservations/commitments/usage.go +++ b/internal/scheduling/reservations/commitments/usage.go @@ -224,7 +224,7 @@ func buildCommitmentCapacityMap( } // Use AcceptedSpec.State so sibling CRs whose spec is mid-transition (e.g. syncer just // wrote expired before the CR controller accepted it) don't lose capacity prematurely. 
- if cr.Status.AcceptedSpec.State != v1alpha1.CommitmentStatusConfirmed && cr.Status.AcceptedSpec.State != v1alpha1.CommitmentStatusGuaranteed { + if !cr.Status.AcceptedSpec.IsActive() { continue } diff --git a/internal/scheduling/reservations/commitments/usage_reconciler.go b/internal/scheduling/reservations/commitments/usage_reconciler.go index 8a0f072ba..085045c19 100644 --- a/internal/scheduling/reservations/commitments/usage_reconciler.go +++ b/internal/scheduling/reservations/commitments/usage_reconciler.go @@ -49,7 +49,7 @@ func (r *UsageReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl log := ctrl.LoggerFrom(ctx).WithValues("component", "usage-reconciler", "committedResource", req.Name) // Only active commitments have assigned VMs. Clear stale usage status if present. - if cr.Spec.State != v1alpha1.CommitmentStatusConfirmed && cr.Spec.State != v1alpha1.CommitmentStatusGuaranteed { + if !cr.IsActive() { log.Info("skipping: commitment state is not active", "state", cr.Spec.State) if len(cr.Status.AssignedInstances) > 0 || len(cr.Status.UsedResources) > 0 { old := cr.DeepCopy() diff --git a/internal/scheduling/reservations/commitments/usage_reconciler_monitor.go b/internal/scheduling/reservations/commitments/usage_reconciler_monitor.go index fe64ac823..2adb25a44 100644 --- a/internal/scheduling/reservations/commitments/usage_reconciler_monitor.go +++ b/internal/scheduling/reservations/commitments/usage_reconciler_monitor.go @@ -18,17 +18,17 @@ type UsageReconcilerMonitor struct { func NewUsageReconcilerMonitor() UsageReconcilerMonitor { m := UsageReconcilerMonitor{ reconcileDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: "cortex_cr_usage_reconcile_duration_seconds", + Name: "cortex_committed_resource_usage_reconcile_duration_seconds", Help: "Duration of committed resource usage reconcile runs in seconds.", Buckets: []float64{0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30}, }, []string{"result"}), statusAge: 
prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: "cortex_cr_usage_status_age_seconds", + Name: "cortex_committed_resource_usage_status_age_seconds", Help: "Age of CommittedResource usage status at reconcile time, in seconds. Distribution across all active commitments shows freshness spread.", Buckets: []float64{30, 60, 120, 300, 600, 900, 1800}, }), assignedInstances: prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Name: "cortex_cr_usage_assigned_vms_total", + Name: "cortex_committed_resource_usage_assigned_vms_total", Help: "Number of VMs currently assigned to committed resources for a project.", }, []string{"project_id"}), } diff --git a/pkg/conf/features.go b/pkg/conf/features.go new file mode 100644 index 000000000..1955c87ba --- /dev/null +++ b/pkg/conf/features.go @@ -0,0 +1,19 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package conf + +// FeatureGates holds boolean toggles for features that are in active rollout +// or that are only relevant for certain deployment types. Set via conf.json +// under the "featureGates" key. All flags default to false (disabled) when omitted. Both fields are tagged omitempty, so a false flag is likewise left out when the struct is encoded to JSON. +type FeatureGates struct { + // CommittedResourceTracking enables committed resource integration: writing + // placed VM UUIDs back into Reservation slots and classifying no-host-found + // results by committed resource coverage. Enable only on deployments that + // use committed resources. + CommittedResourceTracking bool `json:"committedResourceTracking,omitempty"` + // PessimisticBlocking enables blocking all candidate hosts at scheduling time + // to prevent concurrent placements from violating capacity constraints. Enable + // only on deployments that support pessimistic blocking. + PessimisticBlocking bool `json:"pessimisticBlocking,omitempty"` +}