Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,17 @@ func newCRTestClient(scheme *runtime.Scheme, objects ...client.Object) client.Cl
}
return []string{cr.Spec.ProjectID}
}).
WithIndex(&v1alpha1.Reservation{}, idxReservationByAllocationVMUUID, func(obj client.Object) []string {
res, ok := obj.(*v1alpha1.Reservation)
if !ok || res.Spec.CommittedResourceReservation == nil {
return nil
}
uuids := make([]string, 0, len(res.Spec.CommittedResourceReservation.Allocations))
for vmUUID := range res.Spec.CommittedResourceReservation.Allocations {
uuids = append(uuids, vmUUID)
}
return uuids
}).
Build()
}

Expand Down
41 changes: 37 additions & 4 deletions internal/scheduling/reservations/commitments/field_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@ const idxCommittedResourceByUUID = "spec.commitmentUUID"
const idxCommittedResourceByProjectID = "spec.projectID"
const idxReservationByCommitmentUUID = "spec.committedResourceReservation.commitmentUUID"
const idxProjectQuotaByProjectID = "spec.projectID"
const idxReservationByAllocationVMUUID = "spec.committedResourceReservation.allocations"

// once guards ensure each field index is registered exactly once.
// Both CommittedResourceController and UsageReconciler call indexCommittedResourceByUUID;
// the underlying cache returns "indexer conflict" on double registration.
var (
onceIndexCRByUUID sync.Once
onceIndexCRByProjectID sync.Once
onceIndexReservationByUUID sync.Once
onceIndexPQByProjectID sync.Once
onceIndexCRByUUID sync.Once
onceIndexCRByProjectID sync.Once
onceIndexReservationByUUID sync.Once
onceIndexPQByProjectID sync.Once
onceIndexReservationByAllocationVMID sync.Once
)

// indexCommittedResourceByUUID registers the index used by UsageReconciler to look up
Expand Down Expand Up @@ -128,3 +130,34 @@ func indexProjectQuotaByProjectID(ctx context.Context, mcl *multicluster.Client)
})
return err
}

// indexReservationByAllocationVMUUID registers an index over all VM UUIDs present in
// Spec.CommittedResourceReservation.Allocations. This allows the reservation controller
// to efficiently find all other Reservation CRDs carrying a specific VM UUID without
// scanning every reservation in the cluster.
func indexReservationByAllocationVMUUID(ctx context.Context, mcl *multicluster.Client) (err error) {
onceIndexReservationByAllocationVMID.Do(func() {
log := logf.FromContext(ctx)
err = mcl.IndexField(ctx,
&v1alpha1.Reservation{},
&v1alpha1.ReservationList{},
idxReservationByAllocationVMUUID,
func(obj client.Object) []string {
res, ok := obj.(*v1alpha1.Reservation)
if !ok {
log.Error(errors.New("unexpected type"), "expected Reservation", "object", obj)
return nil
}
if res.Spec.CommittedResourceReservation == nil {
return nil
}
uuids := make([]string, 0, len(res.Spec.CommittedResourceReservation.Allocations))
for vmUUID := range res.Spec.CommittedResourceReservation.Allocations {
uuids = append(uuids, vmUUID)
}
return uuids
},
)
})
return err
}
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,17 @@ func newIntgEnv(t *testing.T, initialObjects []client.Object, schedulerFn http.H
}
return []string{cr.Spec.CommitmentUUID}
}).
WithIndex(&v1alpha1.Reservation{}, idxReservationByAllocationVMUUID, func(obj client.Object) []string {
res, ok := obj.(*v1alpha1.Reservation)
if !ok || res.Spec.CommittedResourceReservation == nil {
return nil
}
uuids := make([]string, 0, len(res.Spec.CommittedResourceReservation.Allocations))
for vmUUID := range res.Spec.CommittedResourceReservation.Allocations {
uuids = append(uuids, vmUUID)
}
return uuids
}).
Build()

schedulerSrv := httptest.NewServer(schedulerFn)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,13 @@ func (r *CommitmentReservationController) reconcileAllocations(ctx context.Conte
res.Status.CommittedResourceReservation = &v1alpha1.CommittedResourceReservationStatus{}
}

// Snapshot existing Status.Allocations before we overwrite it so we can detect
// which VM UUIDs are newly confirmed after the patch.
existingStatusAllocations := make(map[string]string, len(res.Status.CommittedResourceReservation.Allocations))
for k, v := range res.Status.CommittedResourceReservation.Allocations {
existingStatusAllocations[k] = v
}
Comment thread
juliusclausnitzer marked this conversation as resolved.

// Build new Status.Allocations map based on HV CRD state.
newStatusAllocations := make(map[string]string)
// Track allocations to remove from Spec (stale/leaving VMs).
Expand Down Expand Up @@ -469,6 +476,19 @@ func (r *CommitmentReservationController) reconcileAllocations(ctx context.Conte
res.Status.CommittedResourceReservation.Allocations = newStatusAllocations
}

// Proactively remove this VM UUID from all other candidate reservations that still
// carry it in their Spec.Allocations. Only do this for VMs that are newly confirmed
// in this reconcile cycle (present in newStatusAllocations but absent in the snapshot
// taken before any patch) to avoid redundant work on subsequent reconciles.
for vmUUID := range newStatusAllocations {
if _, wasAlreadyConfirmed := existingStatusAllocations[vmUUID]; wasAlreadyConfirmed {
continue
}
if err := r.cleanupCandidateReservations(ctx, res.Name, vmUUID); err != nil {
return nil, fmt.Errorf("failed to cleanup candidate reservations for vm %s: %w", vmUUID, err)
}
}

// Patch Status
patch := client.MergeFrom(old)
if err := r.Status().Patch(ctx, res, patch); err != nil {
Expand All @@ -487,6 +507,41 @@ func (r *CommitmentReservationController) reconcileAllocations(ctx context.Conte
return result, nil
}

// cleanupCandidateReservations removes vmUUID from Spec.Allocations of all Reservation CRDs
// other than the one that just confirmed the VM. This is called once per newly confirmed VM
// so that candidate slots on non-selected hosts are freed immediately rather than waiting
// for those reservations' own grace period or periodic requeue.
func (r *CommitmentReservationController) cleanupCandidateReservations(ctx context.Context, confirmedReservationName, vmUUID string) error {
logger := LoggerFromContext(ctx).WithValues("component", "controller", "vm", vmUUID)

var candidates v1alpha1.ReservationList
if err := r.List(ctx, &candidates, client.MatchingFields{idxReservationByAllocationVMUUID: vmUUID}); err != nil {
return fmt.Errorf("failed to list candidate reservations: %w", err)
}

for i := range candidates.Items {
candidate := &candidates.Items[i]
if candidate.Name == confirmedReservationName {
continue
}
if candidate.Spec.CommittedResourceReservation == nil {
continue
}
if _, ok := candidate.Spec.CommittedResourceReservation.Allocations[vmUUID]; !ok {
continue
}
old := candidate.DeepCopy()
delete(candidate.Spec.CommittedResourceReservation.Allocations, vmUUID)
if err := r.Patch(ctx, candidate, client.MergeFrom(old)); err != nil {
if client.IgnoreNotFound(err) != nil {
return fmt.Errorf("failed to patch candidate reservation %s: %w", candidate.Name, err)
}
}
logger.Info("removed vm from candidate reservation", "reservation", candidate.Name, "host", candidate.Status.Host)
}
return nil
}

// getPipelineForFlavorGroup returns the pipeline name for a given flavor group.
func (r *CommitmentReservationController) getPipelineForFlavorGroup(flavorGroupName string, logger logr.Logger) string {
// Try exact match first (e.g., "2152" -> "kvm-cr-hana")
Expand Down Expand Up @@ -581,6 +636,10 @@ func (r *CommitmentReservationController) SetupWithManager(mgr ctrl.Manager, mcl
return err
}

if err := indexReservationByAllocationVMUUID(context.Background(), mcl); err != nil {
return fmt.Errorf("failed to set up reservation allocation VM UUID index: %w", err)
}

// Use WatchesMulticluster to watch Reservations across all configured clusters
// (home + remotes). This is required because Reservation CRDs may be stored
// in remote clusters, not just the home cluster. Without this, the controller
Expand Down
Loading