From 70af06a6c551ddd794454ee01e4369fe5941ac52 Mon Sep 17 00:00:00 2001 From: Jakub Guzik Date: Thu, 21 May 2026 10:07:35 +0200 Subject: [PATCH] Fix dispatcher reliability: capability parsing, crash-on-prom-failure, validation, and persistence - Unify capability extraction into a single exported ExtractCapabilities function in pkg/dispatcher. - Change logrus.Fatal to logrus.Error+return when GetJobVolumes fails. A transient Prometheus timeout no longer kills the entire dispatcher process. - Fix Validate() off-by-one: len(matches) > 1 should be > 0, so a single duplicated job name across groups is now caught. Also call Validate() from LoadConfig() so it runs in production, not only tests. - Skip Regenerate and downstream side-effects when dispatchJobs returns nil (no build farm clusters configured) instead of wiping all in-memory state. This prevents the HTTP API from either serving stale assignments to removed clusters or returning 404 for every job. - Persist delta dispatch results to the gob file so assignments survive process restarts. - Make gob writes atomic via temp file + rename to prevent corruption on crash mid-write. 
Signed-off-by: Jakub Guzik --- cmd/prow-job-dispatcher/main.go | 33 +++++++++++++----------------- pkg/dispatcher/config.go | 13 +++++++++--- pkg/dispatcher/gob.go | 36 ++++++++++++++++++++------------- 3 files changed, 46 insertions(+), 36 deletions(-) diff --git a/cmd/prow-job-dispatcher/main.go b/cmd/prow-job-dispatcher/main.go index e6429c8b314..2e74b9af03d 100644 --- a/cmd/prow-job-dispatcher/main.go +++ b/cmd/prow-job-dispatcher/main.go @@ -269,19 +269,6 @@ func (cv *clusterVolume) findClusterForJobConfig(cloudProvider string, jc *prowc return cluster, utilerrors.NewAggregate(errs) } -func extractCapabilities(labels map[string]string) []string { - var capabilities []string - prefix := "capability/" - - for key, value := range labels { - if strings.HasPrefix(key, prefix) { - capabilities = append(capabilities, value) - } - } - - return capabilities -} - func isBlockedClusterRelocationException(jobName string) bool { for _, re := range blockedClusterRelocationJobExceptions { if re.MatchString(jobName) { @@ -312,7 +299,7 @@ func findClusterAssigmentsForJobs(jc *prowconfig.JobConfig, path string, config blockedForJob := blockedClustersForJob(jobBase.Name, string(determinedCluster), blocked) c := dispatcher.DetermineTargetCluster(cluster, string(determinedCluster), string(config.Default), canBeRelocated, blockedForJob) - pjs[jobBase.Name] = dispatcher.ProwJobData{Cluster: c, Capabilities: extractCapabilities(jobBase.Labels)} + pjs[jobBase.Name] = dispatcher.ProwJobData{Cluster: c, Capabilities: dispatcher.ExtractCapabilities(jobBase.Labels)} logrus.WithField("job", jobBase.Name).WithField("cluster", c).Info("found cluster for job") return nil } @@ -320,7 +307,7 @@ func findClusterAssigmentsForJobs(jc *prowconfig.JobConfig, path string, config var errs []error for k := range jc.PresubmitsStatic { for _, job := range jc.PresubmitsStatic[k] { - if _, ok := pjs[job.Name]; !ok || !slices.Equal(pjs[job.Name].Capabilities, extractCapabilities(job.Labels)) { + if _, ok := 
pjs[job.Name]; !ok || !slices.Equal(pjs[job.Name].Capabilities, dispatcher.ExtractCapabilities(job.Labels)) { if err := getClusterForMissingJob(mostUsedCluster, job.JobBase, pjs); err != nil { errs = append(errs, err) } @@ -330,7 +317,7 @@ func findClusterAssigmentsForJobs(jc *prowconfig.JobConfig, path string, config for k := range jc.PostsubmitsStatic { for _, job := range jc.PostsubmitsStatic[k] { - if _, ok := pjs[job.Name]; !ok || !slices.Equal(pjs[job.Name].Capabilities, extractCapabilities(job.Labels)) { + if _, ok := pjs[job.Name]; !ok || !slices.Equal(pjs[job.Name].Capabilities, dispatcher.ExtractCapabilities(job.Labels)) { if err := getClusterForMissingJob(mostUsedCluster, job.JobBase, pjs); err != nil { errs = append(errs, err) } @@ -338,7 +325,7 @@ func findClusterAssigmentsForJobs(jc *prowconfig.JobConfig, path string, config } } for _, job := range jc.Periodics { - if _, ok := pjs[job.Name]; !ok || !slices.Equal(pjs[job.Name].Capabilities, extractCapabilities(job.Labels)) { + if _, ok := pjs[job.Name]; !ok || !slices.Equal(pjs[job.Name].Capabilities, dispatcher.ExtractCapabilities(job.Labels)) { if err := getClusterForMissingJob(mostUsedCluster, job.JobBase, pjs); err != nil { errs = append(errs, err) } @@ -357,7 +344,7 @@ func (cv *clusterVolume) addToVolume(cluster string, jobBase prowconfig.JobBase, blockedForJob := blockedClustersForJob(jobBase.Name, string(determinedCluster), cv.blocked) c := dispatcher.DetermineTargetCluster(cluster, string(determinedCluster), string(config.Default), canBeRelocated, blockedForJob) - cv.pjs[jobBase.Name] = dispatcher.ProwJobData{Cluster: c, Capabilities: extractCapabilities(jobBase.Labels)} + cv.pjs[jobBase.Name] = dispatcher.ProwJobData{Cluster: c, Capabilities: dispatcher.ExtractCapabilities(jobBase.Labels)} if determinedCloudProvider := config.IsInBuildFarm(api.Cluster(c)); determinedCloudProvider != "" { cv.clusterVolumeMap[string(determinedCloudProvider)][c] = 
cv.clusterVolumeMap[string(determinedCloudProvider)][c] + jobVolumes[jobBase.Name] return nil @@ -757,6 +744,9 @@ func main() { return } prowjobs.Regenerate(pjs) + if err := dispatcher.WriteGob(o.jobsStoragePath, pjs); err != nil { + logrus.WithError(err).Error("continuing on cache memory, error writing Gob file after delta dispatch") + } } dispatchWrapper = func(forceDispatch bool) { @@ -789,7 +779,8 @@ func main() { jobVolumes, err := promVolumes.GetJobVolumes() if err != nil { - logrus.WithError(err).Fatal("failed to get job volumes") + logrus.WithError(err).Error("failed to get job volumes") + return } addEnabledClusters(config, enabled, @@ -805,6 +796,10 @@ func main() { logrus.WithError(err).Error("failed to dispatch") return } + if pjs == nil { + logrus.Warn("no build farm clusters configured, skipping regeneration") + return + } prowjobs.Regenerate(pjs) ecd.Reset(clustersFromConfig.UnsortedList()) diff --git a/pkg/dispatcher/config.go b/pkg/dispatcher/config.go index 4d784700077..a0187499783 100644 --- a/pkg/dispatcher/config.go +++ b/pkg/dispatcher/config.go @@ -118,13 +118,17 @@ func DetermineCloud(jobBase prowconfig.JobBase) string { return "" } -func extractRequiredCapabilities(labels map[string]string) []string { +// ExtractCapabilities returns a sorted list of capability values from labels of +// the form "capability/<name>: <name>" (key suffix must equal value). +// The result is sorted to ensure deterministic comparison with slices.Equal. 
+func ExtractCapabilities(labels map[string]string) []string { var capabilities []string for key, value := range labels { if strings.HasPrefix(key, "capability/") && strings.TrimPrefix(key, "capability/") == value { capabilities = append(capabilities, value) } } + sort.Strings(capabilities) return capabilities } @@ -172,7 +176,7 @@ func (config *Config) DetermineClusterForJob(jobBase prowconfig.JobBase, path st return api.Cluster(cluster), false, nil } - requiredCapabilities := extractRequiredCapabilities(jobBase.Labels) + requiredCapabilities := ExtractCapabilities(jobBase.Labels) if len(requiredCapabilities) > 0 { matchingClusters := []string{} matchingClustersByProvider := map[string][]string{} @@ -361,6 +365,9 @@ func LoadConfig(configPath string) (*Config, error) { if len(errs) > 0 { return nil, utilerrors.NewAggregate(errs) } + if err := config.Validate(); err != nil { + return nil, err + } return config, nil } @@ -383,7 +390,7 @@ func (config *Config) Validate() error { } // sort for tests sort.Strings(matches) - if len(matches) > 1 { + if len(matches) > 0 { return fmt.Errorf("there are job names occurring more than once: %s", matches) } return nil diff --git a/pkg/dispatcher/gob.go b/pkg/dispatcher/gob.go index c7548f36db9..d4272d9af03 100644 --- a/pkg/dispatcher/gob.go +++ b/pkg/dispatcher/gob.go @@ -2,7 +2,9 @@ package dispatcher import ( "encoding/gob" + "fmt" "os" + "path/filepath" ) func ReadGob(filename string, data interface{}) error { @@ -13,26 +15,32 @@ func ReadGob(filename string, data interface{}) error { defer file.Close() decoder := gob.NewDecoder(file) - err = decoder.Decode(data) - if err != nil { - return err - } - - return nil + return decoder.Decode(data) } +// WriteGob atomically writes data to filename by writing to a temporary file +// in the same directory and then renaming it. This prevents corruption if the +// process is interrupted mid-write. 
func WriteGob(filename string, data interface{}) error { - file, err := os.Create(filename) + dir := filepath.Dir(filename) + tmp, err := os.CreateTemp(dir, filepath.Base(filename)+".tmp.*") if err != nil { - return err + return fmt.Errorf("failed to create temp file: %w", err) } - defer file.Close() + tmpName := tmp.Name() - encoder := gob.NewEncoder(file) - err = encoder.Encode(data) - if err != nil { - return err + if err := gob.NewEncoder(tmp).Encode(data); err != nil { + tmp.Close() + os.Remove(tmpName) + return fmt.Errorf("failed to encode gob: %w", err) + } + if err := tmp.Close(); err != nil { + os.Remove(tmpName) + return fmt.Errorf("failed to close temp file: %w", err) + } + if err := os.Rename(tmpName, filename); err != nil { + os.Remove(tmpName) + return fmt.Errorf("failed to rename temp file: %w", err) } - return nil }