Commit 795cde0

EndpointAccessibleController: Try the check 3 times
This change gets rid of intermittent Available=false conditions, even during upgrades. It does so by retrying the whole fetch-endpoints-and-send-requests loop up to 3 times, with a short sleep between attempts.
1 parent 9a8c53c commit 795cde0

2 files changed: 304 additions & 51 deletions

File tree

pkg/libs/endpointaccessible/endpoint_accessible_controller.go (107 additions & 46 deletions)
@@ -20,13 +20,34 @@ import (
 	"github.com/openshift/library-go/pkg/operator/v1helpers"
 )
 
+// The following constants are put together so that
+// all attempts fit safely into resyncInterval.
+const (
+	resyncInterval = 1 * time.Minute
+
+	defaultRequestTimeout = 10 * time.Second
+	defaultRetryInterval  = 5 * time.Second
+	defaultAttemptCount   = 3
+)
+
 type endpointAccessibleController struct {
 	controllerInstanceName    string
 	operatorClient            v1helpers.OperatorClient
 	endpointListFn            EndpointListFunc
 	getTLSConfigFn            EndpointTLSConfigFunc
 	availableConditionName    string
 	endpointCheckDisabledFunc EndpointCheckDisabledFunc
+	// httpClient overrides the default TLS client when set; used in tests.
+	httpClient *http.Client
+	// requestTimeout is the per-request context timeout; zero or negative
+	// falls back to defaultRequestTimeout.
+	requestTimeout time.Duration
+	// retryInterval is the sleep between retry attempts; zero or negative
+	// falls back to defaultRetryInterval.
+	retryInterval time.Duration
+	// attemptCount is the maximum number of fetch+check cycles; zero or
+	// negative falls back to defaultAttemptCount.
+	attemptCount int
 }
 
 type EndpointListFunc func() ([]string, error)
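
A quick sanity check of the "fit safely into resyncInterval" claim at the defaults: endpoint checks within one attempt run in parallel, so each attempt costs at most one requestTimeout, and a worst-case sync is 3 attempts × 10s + 2 sleeps × 5s = 40s, comfortably inside the 1-minute resyncInterval.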
@@ -59,7 +80,7 @@ func NewEndpointAccessibleController(
 		WithInformers(triggers...).
 		WithInformers(operatorClient.Informer()).
 		WithSync(c.sync).
-		ResyncEvery(wait.Jitter(time.Minute, 1.0)).
+		ResyncEvery(wait.Jitter(resyncInterval, 1.0)).
 		WithSyncDegradedOnError(operatorClient).
 		ToController(
 			controllerName, // Don't change what is passed here unless you also remove the old FooDegraded condition
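
For context: wait.Jitter(d, maxFactor) returns a duration between d and d + maxFactor*d, so with a factor of 1.0 each resync fires somewhere between one and two minutes. Replacing the literal time.Minute with resyncInterval ties that window to the same constant the retry budget above is computed against.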
@@ -88,59 +109,99 @@ func (c *endpointAccessibleController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
 		}
 	}
 
-	endpoints, err := c.endpointListFn()
-	if err != nil {
-		if apierrors.IsNotFound(err) {
-			status := applyoperatorv1.OperatorStatus().
-				WithConditions(applyoperatorv1.OperatorCondition().
-					WithType(c.availableConditionName).
-					WithStatus(operatorv1.ConditionFalse).
-					WithReason("ResourceNotFound").
-					WithMessage(err.Error()))
-			return c.operatorClient.ApplyOperatorStatus(ctx, c.controllerInstanceName, status)
+	client := c.httpClient
+	if client == nil {
+		var err error
+		client, err = c.buildTLSClient()
+		if err != nil {
+			return err
 		}
-
-		return err
 	}
 
-	client, err := c.buildTLSClient()
-	if err != nil {
-		return err
+	// Retry the full fetch+check cycle so that stale pod IPs from a rolling
+	// upgrade are replaced with fresh ones as soon as the Endpoints object is
+	// updated between attempts.
+	var (
+		endpoints []string
+		errors    []error
+	)
+	attempts := c.attemptCount
+	if attempts <= 0 {
+		attempts = defaultAttemptCount
+	}
+	requestTimeout := c.requestTimeout
+	if requestTimeout <= 0 {
+		requestTimeout = defaultRequestTimeout
 	}
-	// check all the endpoints in parallel. This matters for pods.
-	errCh := make(chan error, len(endpoints))
-	wg := sync.WaitGroup{}
-	for _, endpoint := range endpoints {
-		wg.Add(1)
-		go func(endpoint string) {
-			defer wg.Done()
-
-			reqCtx, cancel := context.WithTimeout(ctx, 10*time.Second) // avoid waiting forever
-			defer cancel()
-			req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, endpoint, nil)
-			if err != nil {
-				errCh <- humanizeError(err)
-				return
+	retryInterval := c.retryInterval
+	if retryInterval <= 0 {
+		retryInterval = defaultRetryInterval
+	}
+	for i := range attempts {
+		var err error
+		endpoints, err = c.endpointListFn()
+		if err != nil {
+			if apierrors.IsNotFound(err) {
+				status := applyoperatorv1.OperatorStatus().
+					WithConditions(applyoperatorv1.OperatorCondition().
+						WithType(c.availableConditionName).
+						WithStatus(operatorv1.ConditionFalse).
+						WithReason("ResourceNotFound").
+						WithMessage(err.Error()))
+				return c.operatorClient.ApplyOperatorStatus(ctx, c.controllerInstanceName, status)
 			}
+			return err
+		}
 
-			resp, err := client.Do(req)
-			if err != nil {
-				errCh <- humanizeError(err)
-				return
-			}
-			defer resp.Body.Close()
+		// Check all the endpoints in parallel. This matters for pods.
+		errCh := make(chan error, len(endpoints))
+		wg := sync.WaitGroup{}
+		for _, endpoint := range endpoints {
+			wg.Add(1)
+			go func(endpoint string) {
+				defer wg.Done()
 
-			if resp.StatusCode > 299 || resp.StatusCode < 200 {
-				errCh <- fmt.Errorf("%q returned %q", endpoint, resp.Status)
-			}
-		}(endpoint)
-	}
-	wg.Wait()
-	close(errCh)
+				reqCtx, cancel := context.WithTimeout(ctx, requestTimeout)
+				defer cancel()
+
+				req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, endpoint, nil)
+				if err != nil {
+					errCh <- humanizeError(err)
+					return
+				}
+
+				resp, err := client.Do(req)
+				if err != nil {
+					errCh <- humanizeError(err)
+					return
+				}
+				defer resp.Body.Close() //nolint:errcheck
 
-	var errors []error
-	for err := range errCh {
-		errors = append(errors, err)
+				if resp.StatusCode > 299 || resp.StatusCode < 200 {
+					errCh <- fmt.Errorf("%q returned %q", endpoint, resp.Status)
+				}
+			}(endpoint)
+		}
+		wg.Wait()
+		close(errCh)
+
+		errors = nil
+		for err := range errCh {
+			errors = append(errors, err)
+		}
+
+		if len(endpoints) > 0 && len(errors) < len(endpoints) {
+			break // at least one endpoint responded; no need to retry
+		}
+
+		// Sleep before the next attempt to give the Endpoints object time to
+		// be updated (e.g. during a rolling upgrade). Skip on the last attempt.
+		if i < attempts-1 {
+			select {
+			case <-time.After(retryInterval):
+			case <-ctx.Done():
+			}
+		}
 	}
 
 	// if at least one endpoint responded, we are available
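
For readers who want the new control flow in isolation, here is a minimal, self-contained sketch of the same retry shape. The names (checkOnce, probe, checkWithRetry) are hypothetical, requests are simplified to sequential rather than the controller's parallel goroutines, and this is not the library's actual API:

package retrycheck

import (
	"context"
	"fmt"
	"net/http"
	"time"
)

// checkOnce sends one GET per endpoint and collects the failures.
// (Simplified: sequential, whereas the controller fans out goroutines.)
func checkOnce(ctx context.Context, client *http.Client, endpoints []string, timeout time.Duration) []error {
	var errs []error
	for _, endpoint := range endpoints {
		reqCtx, cancel := context.WithTimeout(ctx, timeout)
		err := probe(reqCtx, client, endpoint)
		cancel()
		if err != nil {
			errs = append(errs, err)
		}
	}
	return errs
}

// probe performs a single GET and treats any non-2xx status as an error.
func probe(ctx context.Context, client *http.Client, endpoint string) error {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return err
	}
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return fmt.Errorf("%q returned %q", endpoint, resp.Status)
	}
	return nil
}

// checkWithRetry re-runs the whole list-and-check cycle, re-fetching the
// endpoint list on every attempt so a refreshed Endpoints object is picked
// up mid-upgrade, and stops early as soon as any endpoint answers.
func checkWithRetry(ctx context.Context, listFn func() ([]string, error), attempts int, timeout, retryInterval time.Duration) error {
	var errs []error
	for i := 0; i < attempts; i++ {
		endpoints, err := listFn()
		if err != nil {
			return err
		}
		errs = checkOnce(ctx, http.DefaultClient, endpoints, timeout)
		if len(endpoints) > 0 && len(errs) < len(endpoints) {
			return nil // at least one endpoint responded
		}
		if i < attempts-1 {
			select { // sleep, but wake early if the caller is shutting down
			case <-time.After(retryInterval):
			case <-ctx.Done():
			}
		}
	}
	return fmt.Errorf("all %d attempts failed: %v", attempts, errs)
}

The key design choice mirrored here is that the endpoint list is re-fetched inside the loop, not once up front; that is what lets a retry pick up fresh pod IPs during a rolling upgrade.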
