@@ -12,6 +12,7 @@ import (
1212 apierrors "k8s.io/apimachinery/pkg/api/errors"
1313 utilerrors "k8s.io/apimachinery/pkg/util/errors"
1414 "k8s.io/apimachinery/pkg/util/wait"
15+ "k8s.io/klog/v2"
1516
1617 operatorv1 "github.com/openshift/api/operator/v1"
1718 applyoperatorv1 "github.com/openshift/client-go/operator/applyconfigurations/operator/v1"
@@ -20,13 +21,34 @@ import (
2021 "github.com/openshift/library-go/pkg/operator/v1helpers"
2122)
2223
24+ // The following constants are put together so that
25+ // all attempts fit safely into resyncInterval.
26+ const (
27+ resyncInterval = 1 * time .Minute
28+
29+ defaultRequestTimeout = 10 * time .Second
30+ defaultRetryInterval = 5 * time .Second
31+ defaultAttemptCount = 3
32+ )
33+
2334type endpointAccessibleController struct {
2435 controllerInstanceName string
2536 operatorClient v1helpers.OperatorClient
2637 endpointListFn EndpointListFunc
2738 getTLSConfigFn EndpointTLSConfigFunc
2839 availableConditionName string
2940 endpointCheckDisabledFunc EndpointCheckDisabledFunc
41+ // httpClient overrides the default TLS client when set; used in tests.
42+ httpClient * http.Client
43+ // requestTimeout is the per-request context timeout. Zero means no
44+ // timeout; defaults to defaultRequestTimeout when unset.
45+ requestTimeout time.Duration
46+ // retryInterval is the sleep duration between retry attempts. Zero means
47+ // no sleep; defaults to defaultRetryInterval when unset.
48+ retryInterval time.Duration
49+ // attemptCount is the maximum number of fetch+check cycles; defaults to
50+ // defaultAttemptCount when unset.
51+ attemptCount int
3052}
3153
3254type EndpointListFunc func () ([]string , error )
@@ -59,7 +81,7 @@ func NewEndpointAccessibleController(
5981 WithInformers (triggers ... ).
6082 WithInformers (operatorClient .Informer ()).
6183 WithSync (c .sync ).
62- ResyncEvery (wait .Jitter (time . Minute , 1.0 )).
84+ ResyncEvery (wait .Jitter (resyncInterval , 1.0 )).
6385 WithSyncDegradedOnError (operatorClient ).
6486 ToController (
6587 controllerName , // Don't change what is passed here unless you also remove the old FooDegraded condition
@@ -88,59 +110,104 @@ func (c *endpointAccessibleController) sync(ctx context.Context, syncCtx factory
88110 }
89111 }
90112
91- endpoints , err := c .endpointListFn ()
92- if err != nil {
93- if apierrors .IsNotFound (err ) {
94- status := applyoperatorv1 .OperatorStatus ().
95- WithConditions (applyoperatorv1 .OperatorCondition ().
96- WithType (c .availableConditionName ).
97- WithStatus (operatorv1 .ConditionFalse ).
98- WithReason ("ResourceNotFound" ).
99- WithMessage (err .Error ()))
100- return c .operatorClient .ApplyOperatorStatus (ctx , c .controllerInstanceName , status )
113+ client := c .httpClient
114+ if client == nil {
115+ var err error
116+ client , err = c .buildTLSClient ()
117+ if err != nil {
118+ return err
101119 }
102-
103- return err
104120 }
105121
106- client , err := c .buildTLSClient ()
107- if err != nil {
108- return err
122+ // Retry the full fetch+check cycle so that stale pod IPs from a rolling
123+ // upgrade are replaced with fresh ones as soon as the Endpoints object is
124+ // updated between attempts.
125+ var (
126+ endpoints []string
127+ errors []error
128+ )
129+ attempts := c .attemptCount
130+ if attempts <= 0 {
131+ attempts = defaultAttemptCount
132+ }
133+ requestTimeout := c .requestTimeout
134+ if requestTimeout <= 0 {
135+ requestTimeout = defaultRequestTimeout
136+ }
137+ retryInterval := c .retryInterval
138+ if retryInterval <= 0 {
139+ retryInterval = defaultRetryInterval
109140 }
110- // check all the endpoints in parallel. This matters for pods.
111- errCh := make (chan error , len (endpoints ))
112- wg := sync.WaitGroup {}
113- for _ , endpoint := range endpoints {
114- wg .Add (1 )
115- go func (endpoint string ) {
116- defer wg .Done ()
117-
118- reqCtx , cancel := context .WithTimeout (ctx , 10 * time .Second ) // avoid waiting forever
119- defer cancel ()
120- req , err := http .NewRequestWithContext (reqCtx , http .MethodGet , endpoint , nil )
121- if err != nil {
122- errCh <- humanizeError (err )
123- return
141+ for i := range attempts {
142+ // Sleep before the next attempt to give the Endpoints object time to
143+ // be updated (e.g. during a rolling upgrade).
144+ if i > 0 {
145+ select {
146+ case <- time .After (retryInterval ):
147+ case <- ctx .Done ():
148+ return ctx .Err ()
124149 }
150+ }
125151
126- resp , err := client .Do (req )
127- if err != nil {
128- errCh <- humanizeError (err )
129- return
152+ var err error
153+ endpoints , err = c .endpointListFn ()
154+ if err != nil {
155+ if apierrors .IsNotFound (err ) {
156+ status := applyoperatorv1 .OperatorStatus ().
157+ WithConditions (applyoperatorv1 .OperatorCondition ().
158+ WithType (c .availableConditionName ).
159+ WithStatus (operatorv1 .ConditionFalse ).
160+ WithReason ("ResourceNotFound" ).
161+ WithMessage (err .Error ()))
162+ return c .operatorClient .ApplyOperatorStatus (ctx , c .controllerInstanceName , status )
130163 }
131- defer resp .Body .Close ()
132-
133- if resp .StatusCode > 299 || resp .StatusCode < 200 {
134- errCh <- fmt .Errorf ("%q returned %q" , endpoint , resp .Status )
164+ if i == attempts - 1 {
165+ return err
135166 }
136- }(endpoint )
137- }
138- wg .Wait ()
139- close (errCh )
167+ klog .FromContext (ctx ).Error (err , "Failed to list endpoints, retrying..." , "attempt" , i )
168+ continue
169+ }
170+
171+ // Check all the endpoints in parallel. This matters for pods.
172+ errCh := make (chan error , len (endpoints ))
173+ wg := sync.WaitGroup {}
174+ for _ , endpoint := range endpoints {
175+ wg .Add (1 )
176+ go func (endpoint string ) {
177+ defer wg .Done ()
178+
179+ reqCtx , cancel := context .WithTimeout (ctx , requestTimeout )
180+ defer cancel ()
181+
182+ req , err := http .NewRequestWithContext (reqCtx , http .MethodGet , endpoint , nil )
183+ if err != nil {
184+ errCh <- humanizeError (err )
185+ return
186+ }
187+
188+ resp , err := client .Do (req )
189+ if err != nil {
190+ errCh <- humanizeError (err )
191+ return
192+ }
193+ defer resp .Body .Close () //nolint:errcheck
140194
141- var errors []error
142- for err := range errCh {
143- errors = append (errors , err )
195+ if resp .StatusCode > 299 || resp .StatusCode < 200 {
196+ errCh <- fmt .Errorf ("%q returned %q" , endpoint , resp .Status )
197+ }
198+ }(endpoint )
199+ }
200+ wg .Wait ()
201+ close (errCh )
202+
203+ errors = nil
204+ for err := range errCh {
205+ errors = append (errors , err )
206+ }
207+
208+ if len (endpoints ) > 0 && len (errors ) < len (endpoints ) {
209+ break // at least one endpoint responded; no need to retry
210+ }
144211 }
145212
146213 // if at least one endpoint responded, we are available
@@ -189,7 +256,6 @@ func (c *endpointAccessibleController) buildTLSClient() (*http.Client, error) {
189256 transport .TLSClientConfig = tlsConfig
190257 }
191258 return & http.Client {
192- Timeout : 5 * time .Second ,
193259 Transport : transport ,
194260 }, nil
195261}
0 commit comments