Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 14 additions & 19 deletions cmd/prow-job-dispatcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,19 +269,6 @@ func (cv *clusterVolume) findClusterForJobConfig(cloudProvider string, jc *prowc
return cluster, utilerrors.NewAggregate(errs)
}

// extractCapabilities collects the value of every label whose key starts with
// "capability/". Only the key prefix is checked; the key suffix is not compared
// with the value.
//
// The result is sorted: Go randomizes map iteration order, and callers compare
// the returned slice with slices.Equal, so an unsorted result could make two
// identical label sets look different. It returns nil when no label matches.
func extractCapabilities(labels map[string]string) []string {
	const prefix = "capability/"

	var capabilities []string
	for key, value := range labels {
		if strings.HasPrefix(key, prefix) {
			capabilities = append(capabilities, value)
		}
	}

	// Deterministic order regardless of map iteration order.
	slices.Sort(capabilities)
	return capabilities
}

func isBlockedClusterRelocationException(jobName string) bool {
for _, re := range blockedClusterRelocationJobExceptions {
if re.MatchString(jobName) {
Expand Down Expand Up @@ -312,15 +299,15 @@ func findClusterAssigmentsForJobs(jc *prowconfig.JobConfig, path string, config

blockedForJob := blockedClustersForJob(jobBase.Name, string(determinedCluster), blocked)
c := dispatcher.DetermineTargetCluster(cluster, string(determinedCluster), string(config.Default), canBeRelocated, blockedForJob)
pjs[jobBase.Name] = dispatcher.ProwJobData{Cluster: c, Capabilities: extractCapabilities(jobBase.Labels)}
pjs[jobBase.Name] = dispatcher.ProwJobData{Cluster: c, Capabilities: dispatcher.ExtractCapabilities(jobBase.Labels)}
logrus.WithField("job", jobBase.Name).WithField("cluster", c).Info("found cluster for job")
return nil
}

var errs []error
for k := range jc.PresubmitsStatic {
for _, job := range jc.PresubmitsStatic[k] {
if _, ok := pjs[job.Name]; !ok || !slices.Equal(pjs[job.Name].Capabilities, extractCapabilities(job.Labels)) {
if _, ok := pjs[job.Name]; !ok || !slices.Equal(pjs[job.Name].Capabilities, dispatcher.ExtractCapabilities(job.Labels)) {
if err := getClusterForMissingJob(mostUsedCluster, job.JobBase, pjs); err != nil {
errs = append(errs, err)
}
Expand All @@ -330,15 +317,15 @@ func findClusterAssigmentsForJobs(jc *prowconfig.JobConfig, path string, config

for k := range jc.PostsubmitsStatic {
for _, job := range jc.PostsubmitsStatic[k] {
if _, ok := pjs[job.Name]; !ok || !slices.Equal(pjs[job.Name].Capabilities, extractCapabilities(job.Labels)) {
if _, ok := pjs[job.Name]; !ok || !slices.Equal(pjs[job.Name].Capabilities, dispatcher.ExtractCapabilities(job.Labels)) {
if err := getClusterForMissingJob(mostUsedCluster, job.JobBase, pjs); err != nil {
errs = append(errs, err)
}
}
}
}
for _, job := range jc.Periodics {
if _, ok := pjs[job.Name]; !ok || !slices.Equal(pjs[job.Name].Capabilities, extractCapabilities(job.Labels)) {
if _, ok := pjs[job.Name]; !ok || !slices.Equal(pjs[job.Name].Capabilities, dispatcher.ExtractCapabilities(job.Labels)) {
if err := getClusterForMissingJob(mostUsedCluster, job.JobBase, pjs); err != nil {
errs = append(errs, err)
}
Expand All @@ -357,7 +344,7 @@ func (cv *clusterVolume) addToVolume(cluster string, jobBase prowconfig.JobBase,

blockedForJob := blockedClustersForJob(jobBase.Name, string(determinedCluster), cv.blocked)
c := dispatcher.DetermineTargetCluster(cluster, string(determinedCluster), string(config.Default), canBeRelocated, blockedForJob)
cv.pjs[jobBase.Name] = dispatcher.ProwJobData{Cluster: c, Capabilities: extractCapabilities(jobBase.Labels)}
cv.pjs[jobBase.Name] = dispatcher.ProwJobData{Cluster: c, Capabilities: dispatcher.ExtractCapabilities(jobBase.Labels)}
if determinedCloudProvider := config.IsInBuildFarm(api.Cluster(c)); determinedCloudProvider != "" {
cv.clusterVolumeMap[string(determinedCloudProvider)][c] = cv.clusterVolumeMap[string(determinedCloudProvider)][c] + jobVolumes[jobBase.Name]
return nil
Expand Down Expand Up @@ -757,6 +744,9 @@ func main() {
return
}
prowjobs.Regenerate(pjs)
if err := dispatcher.WriteGob(o.jobsStoragePath, pjs); err != nil {
logrus.WithError(err).Error("continuing on cache memory, error writing Gob file after delta dispatch")
}
}

dispatchWrapper = func(forceDispatch bool) {
Expand Down Expand Up @@ -789,7 +779,8 @@ func main() {

jobVolumes, err := promVolumes.GetJobVolumes()
if err != nil {
logrus.WithError(err).Fatal("failed to get job volumes")
logrus.WithError(err).Error("failed to get job volumes")
return
}

addEnabledClusters(config, enabled,
Expand All @@ -805,6 +796,10 @@ func main() {
logrus.WithError(err).Error("failed to dispatch")
return
}
if pjs == nil {
logrus.Warn("no build farm clusters configured, skipping regeneration")
return
}
prowjobs.Regenerate(pjs)

ecd.Reset(clustersFromConfig.UnsortedList())
Expand Down
13 changes: 10 additions & 3 deletions pkg/dispatcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,17 @@ func DetermineCloud(jobBase prowconfig.JobBase) string {
return ""
}

func extractRequiredCapabilities(labels map[string]string) []string {
// ExtractCapabilities gathers the capabilities declared in labels. A label
// counts only when its key is "capability/<name>" and its value is exactly
// <name>; any other label is ignored.
// The returned values are in ascending order so that two calls on equal label
// sets yield slices that slices.Equal reports as equal. The result is nil when
// nothing matches.
func ExtractCapabilities(labels map[string]string) []string {
	const prefix = "capability/"

	var found []string
	for k, v := range labels {
		if !strings.HasPrefix(k, prefix) {
			continue
		}
		// The part of the key after the prefix must repeat the value.
		if k[len(prefix):] != v {
			continue
		}
		found = append(found, v)
	}
	sort.Strings(found)
	return found
}

Expand Down Expand Up @@ -172,7 +176,7 @@ func (config *Config) DetermineClusterForJob(jobBase prowconfig.JobBase, path st
return api.Cluster(cluster), false, nil
}

requiredCapabilities := extractRequiredCapabilities(jobBase.Labels)
requiredCapabilities := ExtractCapabilities(jobBase.Labels)
if len(requiredCapabilities) > 0 {
matchingClusters := []string{}
matchingClustersByProvider := map[string][]string{}
Expand Down Expand Up @@ -361,6 +365,9 @@ func LoadConfig(configPath string) (*Config, error) {
if len(errs) > 0 {
return nil, utilerrors.NewAggregate(errs)
}
if err := config.Validate(); err != nil {
return nil, err
}
return config, nil
}

Expand All @@ -383,7 +390,7 @@ func (config *Config) Validate() error {
}
// sort for tests
sort.Strings(matches)
if len(matches) > 1 {
if len(matches) > 0 {
return fmt.Errorf("there are job names occurring more than once: %s", matches)
}
return nil
Expand Down
36 changes: 22 additions & 14 deletions pkg/dispatcher/gob.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package dispatcher

import (
"encoding/gob"
"fmt"
"os"
"path/filepath"
)

func ReadGob(filename string, data interface{}) error {
Expand All @@ -13,26 +15,32 @@ func ReadGob(filename string, data interface{}) error {
defer file.Close()

decoder := gob.NewDecoder(file)
err = decoder.Decode(data)
if err != nil {
return err
}

return nil
return decoder.Decode(data)
}

// WriteGob gob-encodes data and stores it at filename atomically: the bytes go
// to a temporary file created in the same directory as filename, and that file
// is renamed over filename only after it has been fully written and closed.
// An interrupted or failed write therefore never leaves a truncated file at
// filename.
//
// It returns a wrapped error if the temporary file cannot be created, encoded
// to, closed, or renamed. On every failure path the temporary file is removed
// and filename is left untouched.
func WriteGob(filename string, data interface{}) error {
	// Same directory as the destination so the final rename does not cross
	// filesystems.
	dir := filepath.Dir(filename)
	tmp, err := os.CreateTemp(dir, filepath.Base(filename)+".tmp.*")
	if err != nil {
		return fmt.Errorf("failed to create temp file: %w", err)
	}
	tmpName := tmp.Name()

	if err := gob.NewEncoder(tmp).Encode(data); err != nil {
		// Best-effort cleanup; the encode error is the one worth reporting.
		tmp.Close()
		os.Remove(tmpName)
		return fmt.Errorf("failed to encode gob: %w", err)
	}
	if err := tmp.Close(); err != nil {
		os.Remove(tmpName)
		return fmt.Errorf("failed to close temp file: %w", err)
	}
	if err := os.Rename(tmpName, filename); err != nil {
		os.Remove(tmpName)
		return fmt.Errorf("failed to rename temp file: %w", err)
	}

	return nil
}