diff --git a/Makefile b/Makefile index fdcc2b2..e8ddee2 100644 --- a/Makefile +++ b/Makefile @@ -308,9 +308,10 @@ delete-targetsources-dev-lab: ## Delete the target sources for the development l ##@ Testing Lab .PHONY: run-integration-tests -run-integration-tests: docker-build undeploy-test-cluster deploy-test-cluster install-test-cluster-dependencies load-test-image deploy install-kubectl install-gnmic install-containerlab deploy-test-topology apply-test-resources +run-integration-tests: docker-build undeploy-test-cluster deploy-test-cluster install-test-cluster-dependencies load-test-image deploy install-kubectl install-gnmic install-containerlab deploy-test-topology deploy-test-http-server apply-test-resources kubectl wait --for=condition=Ready cluster --all --timeout=180s kubectl wait --for=condition=Ready pipeline --all --timeout=180s + kubectl wait --for=condition=Ready targetsource --all --timeout=180s kubectl wait --for=jsonpath='{.status.connectionState}'=READY target --all --timeout=180s kubectl get subscriptions -o yaml kubectl get outputs -o yaml diff --git a/api/v1alpha1/targetsource_types.go b/api/v1alpha1/targetsource_types.go index dd6fb59..f9bd0ab 100644 --- a/api/v1alpha1/targetsource_types.go +++ b/api/v1alpha1/targetsource_types.go @@ -17,45 +17,211 @@ limitations under the License. package v1alpha1 import ( + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) // TargetSourceSpec defines the desired state of TargetSource // +kubebuilder:validation:Required type TargetSourceSpec struct { + // Provider defines the source of targets for this TargetSource + // Only one provider can be specified per TargetSource + // +kubebuilder:validation:Required Provider *ProviderSpec `json:"provider"` + // TODO: implement in message processor + // Optional port to use for discovered targets if not specified by the provider + // +kubebuilder:validation:Optional + TargetPort int32 `json:"targetPort,omitempty"` + + // Optional labels to apply to all targets discovered by this TargetSource // +kubebuilder:validation:Optional TargetLabels map[string]string `json:"targetLabels,omitempty"` + // The TargetProfile to use for targets discovered by this TargetSource + // +kubebuilder:validation:Required // +kubebuilder:validation:MinLength=1 TargetProfile string `json:"targetProfile"` } -// +kubebuilder:validation:ExactlyOneOf=http;consul +// ProviderSpec defines the source of targets for a TargetSource +// Only one provider can be specified per TargetSource +// +kubebuilder:validation:ExactlyOneOf=http type ProviderSpec struct { - HTTP *HTTPConfig `json:"http,omitempty"` - Consul *ConsulConfig `json:"consul,omitempty"` + // HTTP defines the configuration for a HTTP provider + HTTP *HTTPConfig `json:"http,omitempty"` } +// HTTPConfig defines the configuration for the HTTP provider +// +kubebuilder:validation:AtLeastOneOf:=url;push type HTTPConfig struct { + // URL of the HTTP endpoint to pull targets from + // If defined, the loader will periodically poll this endpoint for targets + // +kubebuilder:validation:Optional + URL string `json:"url,omitempty"` + + // Optional authorization configuration for accessing the HTTP endpoint + // +kubebuilder:validation:Optional + Authorization *AuthorizationSpec `json:"authorization,omitempty"` + + // Optional interval for polling the HTTP endpoint for targets + // TODO: document about default value + // +kubebuilder:default="6h" + // +kubebuilder:validation:Optional + Interval *metav1.Duration `json:"interval,omitempty"` + + // Optional timeout for HTTP requests to the endpoint + // +kubebuilder:default="10s" + // +kubebuilder:validation:Optional + Timeout *metav1.Duration `json:"timeout,omitempty"` + + // Optional TLS configuration for connecting to the HTTP endpoint + // If it is an HTTP endpoint, this will be ignored + // +kubebuilder:validation:Optional + TLS *ClientTLSConfig `json:"tls,omitempty"` + + // Optional pagination configuration for parsing responses from the HTTP endpoint + // +kubebuilder:validation:Optional + Pagination *PaginationSpec `json:"pagination,omitempty"` + + // Optional mapping configuration for parsing responses from the HTTP endpoint + // +kubebuilder:validation:Optional + ResponseMapping *ResponseMappingSpec `json:"mapping,omitempty"` + + // Optional configuration to enable push + // +kubebuilder:validation:Optional + Push *PushSpec `json:"push,omitempty"` +} + +type ClientTLSConfig struct { + // Skip TLS verification of the Provider's certificate. + // +kubebuilder:default:=false + InsecureSkipVerify bool `json:"insecureSkipVerify,omitempty"` + + // Reference to a ConfigMap containing a bundle of PEM-encoded CAs to use when + // verifying the certificate chain presented by the Provider when using HTTPS. + // Mutually exclusive with CABundle. + // +kubebuilder:validation:Optional + CABundleRef *corev1.ConfigMapKeySelector `json:"caBundleRef,omitempty"` +} + +// AuthorizationSpec defines the configuration for authentication +// +kubebuilder:validation:ExactlyOneOf=basic;token +type AuthorizationSpec struct { + // Basic authentication configuration + Basic *BasicAuthSpec `json:"basic,omitempty"` + // Token-based authentication configuration + Token *TokenAuthSpec `json:"token,omitempty"` +} + +// BasicAuthSpec defines the configuration for basic authentication +type BasicAuthSpec struct { + // Reference to a Secret containing "username" and "password" keys to use for + // basic authentication when connecting to the Provider. + // +kubebuilder:validation:Required + CredentialsSecretRef *corev1.SecretKeySelector `json:"credentialsSecretRef"` +} + +// TokenAuthSpec defines the configuration for token-based authentication +type TokenAuthSpec struct { + // Scheme for the token, e.g. "Bearer" // +kubebuilder:validation:MinLength=1 - URL string `json:"url"` + Scheme string `json:"scheme"` + // Reference to a Secret containing a key with the token value to use for + // authentication when connecting to the Provider. + // Mutually exclusive with Token. + // +kubebuilder:validation:Required + TokenSecretRef *corev1.SecretKeySelector `json:"tokenSecretRef,omitempty"` +} + +// PaginationSpec defines the configuration for paginating through responses from providers +type PaginationSpec struct { + // Field name in the JSON response that contains the next page reference. + // The value can be either: + // - a full URL (used directly for the next request), or + // - a pagination token (appended as a query parameter using this field name as the key). + // + // Must refer to a top-level key in the response object. + // Example: "next" or "nextToken" + NextField string `json:"nextField,omitempty"` +} + +// CEL expressions to extract target fields from the response +// and map them to the corresponding Target fields. +type ResponseMappingSpec struct { + // Field name in the JSON response that contains the list of items (targets). + // If not specified, the entire response is expected to be a list of items. + // All subsequent fields are specified relative to this field + // Example: "results" if the response is of the form {"results": [ ... list of items ... ]} + // +kubebuilder:validation:Optional + TargetsField string `json:"targetsField,omitempty"` + + // CEL expression to extract the target name from the response + // If TargetsField is specified, this should be relative to TargetsField + // +kubebuilder:validation:Optional + Name string `json:"name"` + + // CEL expression to extract the target Address from the response + // If TargetsField is specified, this should be relative to TargetsField + // +kubebuilder:validation:Optional + Address string `json:"address"` + + // CEL expression to extract the target port from the response + // If TargetsField is specified, this should be relative to TargetsField + // +kubebuilder:validation:Optional + Port string `json:"port,omitempty"` + + // CEL expression to extract the target labels from the response + // The extracted labels will be merged with the static TargetLabels defined in the TargetSourceSpec, + // with values from the response taking precedence in case of conflicts. + // +kubebuilder:validation:Optional + Labels map[string]string `json:"labels,omitempty"` + + // CEL expression to extract the target profile from the response + // If TargetsField is specified, this should be relative to TargetsField // +kubebuilder:validation:Optional - AcceptPush bool `json:"acceptPush,omitempty"` + TargetProfile string `json:"targetProfile,omitempty"` } -type ConsulConfig struct { +// PushSpec defines the settings for event-based update mechanism (i.e. webhooks sent from the server) +type PushSpec struct { + // +kubebuilder:default=false + Enabled bool `json:"enabled"` + + // +kubebuilder:validation:Optional + Auth *PushAuthSpec `json:"auth,omitempty"` +} + +// +kubebuilder:validation:ExactlyOneOf:=bearer;signature +type PushAuthSpec struct { + Bearer *PushBearerAuthSpec `json:"bearer,omitempty"` + Signature *PushSignatureAuthSpec `json:"signature,omitempty"` +} + +// +kubebuilder:validation:Required +type PushBearerAuthSpec struct { + TokenSecretRef *corev1.SecretKeySelector `json:"tokenSecretRef,omitempty"` +} + +// +kubebuilder:validation:Required +type PushSignatureAuthSpec struct { + SecretRef *corev1.SecretKeySelector `json:"secretRef"` + + // Header containing the signature // +kubebuilder:validation:MinLength=1 - URL string `json:"url,omitempty"` + Header string `json:"header"` + + // +kubebuilder:default="sha512" + // +kubebuilder:validation:Enum=sha1;sha256;sha512 + Algorithm string `json:"algorithm"` } // TargetSourceStatus defines the observed state of TargetSource type TargetSourceStatus struct { - Status string `json:"status,omitempty"` - ObservedGeneration int64 `json:"observedGeneration"` - TargetsCount int32 `json:"targetsCount,omitempty"` - LastSync metav1.Time `json:"lastSync,omitempty"` + ObservedGeneration int64 `json:"observedGeneration,omitempty"` + TargetsCount int32 `json:"targetsCount,omitempty"` + LastSync metav1.Time `json:"lastSync,omitempty"` + Conditions []metav1.Condition `json:"conditions,omitempty"` } //+kubebuilder:object:root=true diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 61e81fd..e2bab1d 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -46,6 +46,71 @@ func (in *APIConfig) DeepCopy() *APIConfig { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *AuthorizationSpec) DeepCopyInto(out *AuthorizationSpec) { + *out = *in + if in.Basic != nil { + in, out := &in.Basic, &out.Basic + *out = new(BasicAuthSpec) + (*in).DeepCopyInto(*out) + } + if in.Token != nil { + in, out := &in.Token, &out.Token + *out = new(TokenAuthSpec) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AuthorizationSpec. +func (in *AuthorizationSpec) DeepCopy() *AuthorizationSpec { + if in == nil { + return nil + } + out := new(AuthorizationSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *BasicAuthSpec) DeepCopyInto(out *BasicAuthSpec) { + *out = *in + if in.CredentialsSecretRef != nil { + in, out := &in.CredentialsSecretRef, &out.CredentialsSecretRef + *out = new(v1.SecretKeySelector) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BasicAuthSpec. +func (in *BasicAuthSpec) DeepCopy() *BasicAuthSpec { + if in == nil { + return nil + } + out := new(BasicAuthSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ClientTLSConfig) DeepCopyInto(out *ClientTLSConfig) { + *out = *in + if in.CABundleRef != nil { + in, out := &in.CABundleRef, &out.CABundleRef + *out = new(v1.ConfigMapKeySelector) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClientTLSConfig. +func (in *ClientTLSConfig) DeepCopy() *ClientTLSConfig { + if in == nil { + return nil + } + out := new(ClientTLSConfig) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Cluster) DeepCopyInto(out *Cluster) { *out = *in @@ -213,21 +278,6 @@ func (in *ClusterTargetState) DeepCopy() *ClusterTargetState { return out } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *ConsulConfig) DeepCopyInto(out *ConsulConfig) { - *out = *in -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConsulConfig. -func (in *ConsulConfig) DeepCopy() *ConsulConfig { - if in == nil { - return nil - } - out := new(ConsulConfig) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *GRPCKeepAliveConfig) DeepCopyInto(out *GRPCKeepAliveConfig) { *out = *in @@ -273,6 +323,41 @@ func (in *GRPCTunnelConfig) DeepCopy() *GRPCTunnelConfig { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *HTTPConfig) DeepCopyInto(out *HTTPConfig) { *out = *in + if in.Authorization != nil { + in, out := &in.Authorization, &out.Authorization + *out = new(AuthorizationSpec) + (*in).DeepCopyInto(*out) + } + if in.Interval != nil { + in, out := &in.Interval, &out.Interval + *out = new(metav1.Duration) + **out = **in + } + if in.Timeout != nil { + in, out := &in.Timeout, &out.Timeout + *out = new(metav1.Duration) + **out = **in + } + if in.TLS != nil { + in, out := &in.TLS, &out.TLS + *out = new(ClientTLSConfig) + (*in).DeepCopyInto(*out) + } + if in.Pagination != nil { + in, out := &in.Pagination, &out.Pagination + *out = new(PaginationSpec) + **out = **in + } + if in.ResponseMapping != nil { + in, out := &in.ResponseMapping, &out.ResponseMapping + *out = new(ResponseMappingSpec) + (*in).DeepCopyInto(*out) + } + if in.Push != nil { + in, out := &in.Push, &out.Push + *out = new(PushSpec) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new HTTPConfig. @@ -587,6 +672,21 @@ func (in *OutputStatus) DeepCopy() *OutputStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PaginationSpec) DeepCopyInto(out *PaginationSpec) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PaginationSpec. +func (in *PaginationSpec) DeepCopy() *PaginationSpec { + if in == nil { + return nil + } + out := new(PaginationSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Pipeline) DeepCopyInto(out *Pipeline) { *out = *in @@ -824,12 +924,7 @@ func (in *ProviderSpec) DeepCopyInto(out *ProviderSpec) { if in.HTTP != nil { in, out := &in.HTTP, &out.HTTP *out = new(HTTPConfig) - **out = **in - } - if in.Consul != nil { - in, out := &in.Consul, &out.Consul - *out = new(ConsulConfig) - **out = **in + (*in).DeepCopyInto(*out) } } @@ -843,6 +938,113 @@ func (in *ProviderSpec) DeepCopy() *ProviderSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PushAuthSpec) DeepCopyInto(out *PushAuthSpec) { + *out = *in + if in.Bearer != nil { + in, out := &in.Bearer, &out.Bearer + *out = new(PushBearerAuthSpec) + (*in).DeepCopyInto(*out) + } + if in.Signature != nil { + in, out := &in.Signature, &out.Signature + *out = new(PushSignatureAuthSpec) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PushAuthSpec. +func (in *PushAuthSpec) DeepCopy() *PushAuthSpec { + if in == nil { + return nil + } + out := new(PushAuthSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PushBearerAuthSpec) DeepCopyInto(out *PushBearerAuthSpec) { + *out = *in + if in.TokenSecretRef != nil { + in, out := &in.TokenSecretRef, &out.TokenSecretRef + *out = new(v1.SecretKeySelector) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PushBearerAuthSpec. +func (in *PushBearerAuthSpec) DeepCopy() *PushBearerAuthSpec { + if in == nil { + return nil + } + out := new(PushBearerAuthSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PushSignatureAuthSpec) DeepCopyInto(out *PushSignatureAuthSpec) { + *out = *in + if in.SecretRef != nil { + in, out := &in.SecretRef, &out.SecretRef + *out = new(v1.SecretKeySelector) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PushSignatureAuthSpec. +func (in *PushSignatureAuthSpec) DeepCopy() *PushSignatureAuthSpec { + if in == nil { + return nil + } + out := new(PushSignatureAuthSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PushSpec) DeepCopyInto(out *PushSpec) { + *out = *in + if in.Auth != nil { + in, out := &in.Auth, &out.Auth + *out = new(PushAuthSpec) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PushSpec. +func (in *PushSpec) DeepCopy() *PushSpec { + if in == nil { + return nil + } + out := new(PushSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ResponseMappingSpec) DeepCopyInto(out *ResponseMappingSpec) { + *out = *in + if in.Labels != nil { + in, out := &in.Labels, &out.Labels + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ResponseMappingSpec. +func (in *ResponseMappingSpec) DeepCopy() *ResponseMappingSpec { + if in == nil { + return nil + } + out := new(ResponseMappingSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ServiceConfig) DeepCopyInto(out *ServiceConfig) { *out = *in @@ -1315,6 +1517,13 @@ func (in *TargetSourceSpec) DeepCopy() *TargetSourceSpec { func (in *TargetSourceStatus) DeepCopyInto(out *TargetSourceStatus) { *out = *in in.LastSync.DeepCopyInto(&out.LastSync) + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]metav1.Condition, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TargetSourceStatus. @@ -1384,6 +1593,26 @@ func (in *TargetTLSConfig) DeepCopy() *TargetTLSConfig { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TokenAuthSpec) DeepCopyInto(out *TokenAuthSpec) { + *out = *in + if in.TokenSecretRef != nil { + in, out := &in.TokenSecretRef, &out.TokenSecretRef + *out = new(v1.SecretKeySelector) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TokenAuthSpec. +func (in *TokenAuthSpec) DeepCopy() *TokenAuthSpec { + if in == nil { + return nil + } + out := new(TokenAuthSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *TunnelTargetPolicy) DeepCopyInto(out *TunnelTargetPolicy) { *out = *in diff --git a/config/crd/bases/operator.gnmic.dev_targetsources.yaml b/config/crd/bases/operator.gnmic.dev_targetsources.yaml index b8d24e1..fd4a317 100644 --- a/config/crd/bases/operator.gnmic.dev_targetsources.yaml +++ b/config/crd/bases/operator.gnmic.dev_targetsources.yaml @@ -40,33 +40,309 @@ spec: description: TargetSourceSpec defines the desired state of TargetSource properties: provider: + description: |- + Provider defines the source of targets for this TargetSource + Only one provider can be specified per TargetSource properties: - consul: - properties: - url: - minLength: 1 - type: string - type: object http: + description: HTTP defines the configuration for a HTTP provider properties: - acceptPush: - type: boolean + authorization: + description: Optional authorization configuration for accessing + the HTTP endpoint + properties: + basic: + description: Basic authentication configuration + properties: + credentialsSecretRef: + description: |- + Reference to a Secret containing "username" and "password" keys to use for + basic authentication when connecting to the Provider. + properties: + key: + description: The key of the secret to select from. Must + be a valid secret key. + type: string + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. Instances of this type with an empty value here are + almost certainly wrong. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + type: string + optional: + description: Specify whether the Secret or its + key must be defined + type: boolean + required: + - key + type: object + x-kubernetes-map-type: atomic + required: + - credentialsSecretRef + type: object + token: + description: Token-based authentication configuration + properties: + scheme: + description: Scheme for the token, e.g. "Bearer" + minLength: 1 + type: string + tokenSecretRef: + description: |- + Reference to a Secret containing a key with the token value to use for + authentication when connecting to the Provider. + Mutually exclusive with Token. + properties: + key: + description: The key of the secret to select from. Must + be a valid secret key. + type: string + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. Instances of this type with an empty value here are + almost certainly wrong. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + type: string + optional: + description: Specify whether the Secret or its + key must be defined + type: boolean + required: + - key + type: object + x-kubernetes-map-type: atomic + required: + - scheme + - tokenSecretRef + type: object + type: object + x-kubernetes-validations: + - message: exactly one of the fields in [basic token] must + be set + rule: '[has(self.basic),has(self.token)].filter(x,x==true).size() + == 1' + interval: + default: 6h + description: Optional interval for polling the HTTP endpoint + for targets + type: string + mapping: + description: Optional mapping configuration for parsing responses + from the HTTP endpoint + properties: + address: + description: |- + CEL expression to extract the target Address from the response + If TargetsField is specified, this should be relative to TargetsField + type: string + labels: + additionalProperties: + type: string + description: |- + CEL expression to extract the target labels from the response + The extracted labels will be merged with the static TargetLabels defined in the TargetSourceSpec, + with values from the response taking precedence in case of conflicts. + type: object + name: + description: |- + CEL expression to extract the target name from the response + If TargetsField is specified, this should be relative to TargetsField + type: string + port: + description: |- + CEL expression to extract the target port from the response + If TargetsField is specified, this should be relative to TargetsField + type: string + targetProfile: + description: |- + CEL expression to extract the target profile from the response + If TargetsField is specified, this should be relative to TargetsField + type: string + targetsField: + description: |- + Field name in the JSON response that contains the list of items (targets). + If not specified, the entire response is expected to be a list of items. + All subsequent fields are specified relative to this field + Example: "results" if the response is of the form {"results": [ ... list of items ... ]} + type: string + type: object + pagination: + description: Optional pagination configuration for parsing + responses from the HTTP endpoint + properties: + nextField: + description: |- + Field name in the JSON response that contains the next page reference. + The value can be either: + - a full URL (used directly for the next request), or + - a pagination token (appended as a query parameter using this field name as the key). + + Must refer to a top-level key in the response object. + Example: "next" or "nextToken" + type: string + type: object + push: + description: Optional configuration to enable push + properties: + auth: + properties: + bearer: + properties: + tokenSecretRef: + description: SecretKeySelector selects a key of + a Secret. + properties: + key: + description: The key of the secret to select + from. Must be a valid secret key. + type: string + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. Instances of this type with an empty value here are + almost certainly wrong. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + type: string + optional: + description: Specify whether the Secret or + its key must be defined + type: boolean + required: + - key + type: object + x-kubernetes-map-type: atomic + type: object + signature: + properties: + algorithm: + default: sha512 + enum: + - sha1 + - sha256 + - sha512 + type: string + header: + description: Header containing the signature + minLength: 1 + type: string + secretRef: + description: SecretKeySelector selects a key of + a Secret. + properties: + key: + description: The key of the secret to select + from. Must be a valid secret key. + type: string + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. Instances of this type with an empty value here are + almost certainly wrong. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + type: string + optional: + description: Specify whether the Secret or + its key must be defined + type: boolean + required: + - key + type: object + x-kubernetes-map-type: atomic + required: + - algorithm + - header + - secretRef + type: object + type: object + x-kubernetes-validations: + - message: exactly one of the fields in [bearer signature] + must be set + rule: '[has(self.bearer),has(self.signature)].filter(x,x==true).size() + == 1' + enabled: + default: false + type: boolean + required: + - enabled + type: object + timeout: + default: 10s + description: Optional timeout for HTTP requests to the endpoint + type: string + tls: + description: |- + Optional TLS configuration for connecting to the HTTP endpoint + If it is an HTTP endpoint, this will be ignored + properties: + caBundleRef: + description: |- + Reference to a ConfigMap containing a bundle of PEM-encoded CAs to use when + verifying the certificate chain presented by the Provider when using HTTPS. + Mutually exclusive with CABundle. + properties: + key: + description: The key to select. + type: string + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. Instances of this type with an empty value here are + almost certainly wrong. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + type: string + optional: + description: Specify whether the ConfigMap or its + key must be defined + type: boolean + required: + - key + type: object + x-kubernetes-map-type: atomic + insecureSkipVerify: + default: false + description: Skip TLS verification of the Provider's certificate. + type: boolean + type: object url: - minLength: 1 + description: |- + URL of the HTTP endpoint to pull targets from + If defined, the loader will periodically poll this endpoint for targets type: string - required: - - url type: object + x-kubernetes-validations: + - message: at least one of the fields in [url push] must be set + rule: '[has(self.url),has(self.push)].filter(x,x==true).size() + >= 1' type: object x-kubernetes-validations: - - message: exactly one of the fields in [http consul] must be set - rule: '[has(self.http),has(self.consul)].filter(x,x==true).size() - == 1' + - message: exactly one of the fields in [http] must be set + rule: '[has(self.http)].filter(x,x==true).size() == 1' targetLabels: additionalProperties: type: string + description: Optional labels to apply to all targets discovered by + this TargetSource type: object + targetPort: + description: Optional port to use for discovered targets if not specified + by the provider + format: int32 + type: integer targetProfile: + description: The TargetProfile to use for targets discovered by this + TargetSource minLength: 1 type: string required: @@ -76,19 +352,71 @@ spec: status: description: TargetSourceStatus defines the observed state of TargetSource properties: + conditions: + items: + description: Condition contains details for one aspect of the current + state of this API Resource. + properties: + lastTransitionTime: + description: |- + lastTransitionTime is the last time the condition transitioned from one status to another. + This should be when the underlying condition changed. If that is not known, then using the time when the API field changed is acceptable. + format: date-time + type: string + message: + description: |- + message is a human readable message indicating details about the transition. + This may be an empty string. + maxLength: 32768 + type: string + observedGeneration: + description: |- + observedGeneration represents the .metadata.generation that the condition was set based upon. + For instance, if .metadata.generation is currently 12, but the .status.conditions[x].observedGeneration is 9, the condition is out of date + with respect to the current state of the instance. + format: int64 + minimum: 0 + type: integer + reason: + description: |- + reason contains a programmatic identifier indicating the reason for the condition's last transition. + Producers of specific condition types may define expected values and meanings for this field, + and whether the values are considered a guaranteed API. + The value should be a CamelCase string. + This field may not be empty. + maxLength: 1024 + minLength: 1 + pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$ + type: string + status: + description: status of the condition, one of True, False, Unknown. + enum: + - "True" + - "False" + - Unknown + type: string + type: + description: type of condition in CamelCase or in foo.example.com/CamelCase. + maxLength: 316 + pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ + type: string + required: + - lastTransitionTime + - message + - reason + - status + - type + type: object + type: array lastSync: format: date-time type: string observedGeneration: format: int64 type: integer - status: - type: string targetsCount: format: int32 type: integer - required: - - observedGeneration type: object type: object served: true diff --git a/internal/controller/discovery/client.go b/internal/controller/discovery/client.go index e5cc5ea..a9d790f 100644 --- a/internal/controller/discovery/client.go +++ b/internal/controller/discovery/client.go @@ -7,7 +7,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/util/retry" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -70,20 +69,3 @@ func deleteTarget(ctx context.Context, c client.Client, name string, namespace s return err } - -// updateTargetSourceStatus updates the status of the TargetSource Object ts. The only fields updated are targetCount and LastSync, which takes the current timestamp. -func updateTargetSourceStatus(ctx context.Context, c client.Client, ts *gnmicv1alpha1.TargetSource, targetCount int32) error { - err := retry.RetryOnConflict(retry.DefaultRetry, func() error { - latest := &gnmicv1alpha1.TargetSource{} - if err := c.Get(ctx, client.ObjectKeyFromObject(ts), latest); err != nil { - return err - } - - latest.Status.TargetsCount = targetCount - latest.Status.LastSync = metav1.Now() - - return c.Status().Update(ctx, latest) - }) - - return err -} diff --git a/internal/controller/discovery/core/discovery_kubernetes_client.go b/internal/controller/discovery/core/discovery_kubernetes_client.go new file mode 100644 index 0000000..f8a3a33 --- /dev/null +++ b/internal/controller/discovery/core/discovery_kubernetes_client.go @@ -0,0 +1,68 @@ +package core + +import ( + "context" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/util/retry" + "sigs.k8s.io/controller-runtime/pkg/client" + + gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" +) + +// DiscoveryKubernetesClient is a client which fulfills the StatusUpdater interface +type DiscoveryKubernetesClient struct { + client client.Client + scheme *runtime.Scheme + targetSource *gnmicv1alpha1.TargetSource +} + +// Returns an instance of DiscoveryKubernetesClient +func NewDiscoveryKubernetesClient(c client.Client, s *runtime.Scheme, ts *gnmicv1alpha1.TargetSource) *DiscoveryKubernetesClient { + return &DiscoveryKubernetesClient{ + client: c, + scheme: s, + targetSource: ts, + } +} + +// UpdateStatus takes a StatusUpdate holding Conditions and a pointer referencing the TargetsCount. +// If TargetsCount is set, the LastSync time gets set to metav1.Now(). +// Replaces LastTransitionTime of each Condition with metav1.Now(). +func (c *DiscoveryKubernetesClient) UpdateStatus(ctx context.Context, update StatusUpdate) error { + + return c.patchStatus(ctx, func( + ts *gnmicv1alpha1.TargetSource, + ) { + now := metav1.Now() + + // Update status fields: Replace all Conditions and set TargetsCount and LastSync if pointer != nil + for i := range update.Conditions { + update.Conditions[i].LastTransitionTime = now + } + ts.Status.Conditions = update.Conditions + + if update.TargetsCount != nil { + ts.Status.TargetsCount = *update.TargetsCount + ts.Status.LastSync = now + } + }) +} + +// patchStatus is an internal function to update the Kubernetes object +func (c *DiscoveryKubernetesClient) patchStatus(ctx context.Context, mutate func(*gnmicv1alpha1.TargetSource)) error { + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + latest := &gnmicv1alpha1.TargetSource{} + if err := c.client.Get(ctx, client.ObjectKeyFromObject(c.targetSource), latest); err != nil { + return err + } + + patch := client.MergeFrom(latest.DeepCopy()) + mutate(latest) + + return c.client.Status().Patch(ctx, latest, patch) + }) + + return err +} diff --git a/internal/controller/discovery/core/status_updater_interface.go b/internal/controller/discovery/core/status_updater_interface.go new file mode 100644 index 0000000..35a7c3d --- /dev/null +++ b/internal/controller/discovery/core/status_updater_interface.go @@ -0,0 +1,33 @@ +package core + +import ( + "context" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const ( + ConditionTypeReady = "Ready" + ConditionTypeReconciling = "Reconciling" + ConditionTypeDegraded = "Degraded" + ConditionTypeStalled = "Stalled" + + ReasonWaitingForSync Reason = "WaitingForSync" + ReasonSyncStarted Reason = "SyncStarted" + ReasonSyncSucceeded Reason = "SyncSucceeded" + ReasonSyncCompleted Reason = "SyncCompleted" + ReasonSyncWithErrors Reason = "SyncSucceededWithErrors" + ReasonSyncFailed Reason = "SyncFailed" +) + +type Reason string + +type StatusUpdate struct { + Conditions []metav1.Condition + TargetsCount *int32 +} + +// StatusUpdater defines the interface for TargetLoaders and MessageProcessor to update the status of the TargetSource +type StatusUpdater interface { + UpdateStatus(context.Context, StatusUpdate) error +} diff --git a/internal/controller/discovery/core/types.go b/internal/controller/discovery/core/types.go index 5a1c8cf..be14203 100644 --- a/internal/controller/discovery/core/types.go +++ b/internal/controller/discovery/core/types.go @@ -22,6 +22,7 @@ type CommonLoaderConfig struct { TargetsourceNN types.NamespacedName ChunkSize int AcceptPush bool + Client StatusUpdater } // EventAction represents the type of a discovery event @@ -37,9 +38,11 @@ const ( // DiscoveredTarget represents a target discovered from an external source // before it is materialized as a Kubernetes Target CR type DiscoveredTarget struct { - Name string - Address string - Labels map[string]string + Name string + Address string + Port int32 + Labels map[string]string + TargetProfile string } type DiscoveryEvent struct { diff --git a/internal/controller/discovery/loaders.go b/internal/controller/discovery/loaders.go index c888c27..e8061d9 100644 --- a/internal/controller/discovery/loaders.go +++ b/internal/controller/discovery/loaders.go @@ -13,10 +13,10 @@ func NewLoader(cfg *core.CommonLoaderConfig, spec gnmicv1alpha1.TargetSourceSpec switch { case spec.Provider.HTTP != nil: - cfg.AcceptPush = spec.Provider.HTTP.AcceptPush + if spec.Provider.HTTP.Push != nil { + cfg.AcceptPush = spec.Provider.HTTP.Push.Enabled + } return http.New(*cfg), nil - case spec.Provider.Consul != nil: - return nil, fmt.Errorf("Unimplemented targetsource provider, check TargetSource CRD for %s", cfg.TargetsourceNN) default: return nil, fmt.Errorf("unknown targetsource provider, check TargetSource CRD for %s", cfg.TargetsourceNN) } diff --git a/internal/controller/discovery/loaders/http/loader.go b/internal/controller/discovery/loaders/http/loader.go index 5169e59..90cf17d 100644 --- a/internal/controller/discovery/loaders/http/loader.go +++ b/internal/controller/discovery/loaders/http/loader.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/log" "github.com/gnmic/operator/internal/controller/discovery/core" @@ -51,20 +52,36 @@ func (l *Loader) Run(ctx context.Context, out chan<- []core.DiscoveryMessage) er return nil case <-ticker.C: + l.commonCfg.Client.UpdateStatus( + ctx, + core.StatusUpdate{ + Conditions: []metav1.Condition{ + { + Type: core.ConditionTypeReconciling, + Status: metav1.ConditionStatus("True"), + Reason: string(core.ReasonSyncStarted), + Message: "Started fetching targets", + }, + }, + }, + ) + time.Sleep(10 * time.Second) // Switch case + i only needed to test behavior for messages with different values. switch i { case 1: snapshotID := fmt.Sprintf("%s-%s-%s", l.commonCfg.TargetsourceNN.Namespace, l.commonCfg.TargetsourceNN.Name, uuid.NewString()) targets := []core.DiscoveredTarget{ { - Name: "ceos1", - Address: "clab-3-nodes-ceos1:6030", + Name: "spine1", + Address: "clab-t1-spine1", + Port: 57400, Labels: map[string]string{}, }, { Name: "leaf1", - Address: "clab-3-nodes-leaf1:57400", - Labels: map[string]string{"gnmic_operator_target_profile": "default1"}, + Address: "clab-leaf1", + Port: 57400, + Labels: map[string]string{}, }, } @@ -75,14 +92,16 @@ func (l *Loader) Run(ctx context.Context, out chan<- []core.DiscoveryMessage) er snapshotID := fmt.Sprintf("%s-%s-%s", l.commonCfg.TargetsourceNN.Namespace, l.commonCfg.TargetsourceNN.Name, uuid.NewString()) targets := []core.DiscoveredTarget{ { - Name: "ceos1", - Address: "clab-3-nodes-ceos1:6030", - Labels: map[string]string{"gnmic_operator_target_profile": "default1"}, + Name: "spine1", + Address: "clab-t1-spine1", + Port: 57400, + Labels: map[string]string{}, }, { Name: "leaf2", - Address: "clab-3-nodes-leaf2:57400", - Labels: map[string]string{"gnmic_operator_target_profile": "default1"}, + Address: "clab-t1-leaf2", + Port: 57400, + Labels: map[string]string{}, }, } @@ -94,9 +113,22 @@ func (l *Loader) Run(ctx context.Context, out chan<- []core.DiscoveryMessage) er snapshotID := fmt.Sprintf("%s-%s-%s", l.commonCfg.TargetsourceNN.Namespace, l.commonCfg.TargetsourceNN.Name, uuid.NewString()) targets := []core.DiscoveredTarget{ { - Name: "ceos1", - Address: "clab-3-nodes-ceos2:6030", - Labels: map[string]string{"gnmic_operator_target_profile": "default2"}, + Name: "spine1", + Address: "clab-t1-spine1", + Port: 57400, + Labels: map[string]string{}, + }, + { + Name: "leaf1", + Address: "clab-t1-leaf1", + Port: 57400, + Labels: map[string]string{}, + }, + { + Name: "leaf2", + Address: "clab-t1-leaf2", + Port: 57400, + Labels: map[string]string{}, }, } diff --git a/internal/controller/discovery/mapper.go b/internal/controller/discovery/mapper.go index bc42531..4690fd1 100644 --- a/internal/controller/discovery/mapper.go +++ b/internal/controller/discovery/mapper.go @@ -1,6 +1,7 @@ package discovery import ( + "fmt" "maps" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -20,10 +21,19 @@ func generateTargetResource(d core.DiscoveredTarget, ts *gnmicv1alpha1.TargetSou }, } - // Add Address from DiscoveredTarget - t.Spec.Address = d.Address - // Add default Target Profile from the TargetSource Spec TargetProfile - t.Spec.Profile = ts.Spec.TargetProfile + // Add Address + Port from DiscoveredTarget or use TargetSource.spec.targetPort + targetPort := ts.Spec.TargetPort + if d.Port != 0 { + targetPort = d.Port + } + t.Spec.Address = fmt.Sprintf("%s:%d", d.Address, targetPort) + + // Add discovered Target Profile or use TargetSource.spec.targetProfile + targetProfile := ts.Spec.TargetProfile + if d.TargetProfile != "" { + targetProfile = d.TargetProfile + } + t.Spec.Profile = targetProfile // Copy TargetLabels from TargetSource Spec & DiscoveredTarget. Discovered labels take precedence over TargetSource labels. maps.Copy(t.Labels, ts.Spec.TargetLabels) diff --git a/internal/controller/discovery/message_processor.go b/internal/controller/discovery/message_processor.go index f573b1b..4302a90 100644 --- a/internal/controller/discovery/message_processor.go +++ b/internal/controller/discovery/message_processor.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" @@ -31,15 +32,17 @@ type MessageProcessor struct { // Events are deferred while snapshot is in progress deferredEvents []core.DiscoveryEvent targetCount int32 + updater core.StatusUpdater } // NewMessageProcessor wires a MessageProcessor instance -func NewMessageProcessor(c client.Client, s *runtime.Scheme, ts *gnmicv1alpha1.TargetSource, in <-chan []core.DiscoveryMessage) *MessageProcessor { +func NewMessageProcessor(c client.Client, s *runtime.Scheme, ts *gnmicv1alpha1.TargetSource, in <-chan []core.DiscoveryMessage, u core.StatusUpdater) *MessageProcessor { return &MessageProcessor{ client: c, scheme: s, targetSource: ts, in: in, + updater: u, } } @@ -234,11 +237,39 @@ func (m *MessageProcessor) processEvent(ctx context.Context, event core.Discover switch event.Event { case core.EventApply: m.targetCount++ - m.updateStatus(ctx, logger) + m.updater.UpdateStatus( + ctx, + core.StatusUpdate{ + Conditions: []metav1.Condition{ + { + Type: core.ConditionTypeReady, + Status: metav1.ConditionStatus("True"), + Reason: string(core.ReasonSyncSucceeded), + Message: "Successfully synced all targets", + }, + }, + TargetsCount: &m.targetCount, + }, + ) case core.EventDelete: m.targetCount-- - m.updateStatus(ctx, logger) + m.updater.UpdateStatus( + ctx, + core.StatusUpdate{ + Conditions: []metav1.Condition{ + { + Type: core.ConditionTypeReady, + Status: metav1.ConditionStatus("True"), + Reason: string(core.ReasonSyncSucceeded), + Message: "Successfully synced all targets", + }, + }, + TargetsCount: &m.targetCount, + }, + ) } + } else { + // m.updateStatus(ctx, gnmicv1alpha1.SyncStatusError, err) } return err @@ -309,8 +340,32 @@ func (m *MessageProcessor) applySnapshot(ctx context.Context, snapshot *snapshot "numOfDelete", nDelete, ) + errCount := 0 for _, e := range events { - m.applyEvent(ctx, e, logger) + err = m.applyEvent(ctx, e, logger) + if err != nil { + errCount++ + } + } + if errCount != 0 { + // m.updateStatus(ctx, gnmicv1alpha1.SyncStatusSyncedWithErrors, err) + } else { + // Because of idempotency, allTargets = desired state = targets existing in Kubernetes. Overwrites the counter to "reset" it. + m.targetCount = int32(len(allTargets)) + m.updater.UpdateStatus( + ctx, + core.StatusUpdate{ + Conditions: []metav1.Condition{ + { + Type: core.ConditionTypeReady, + Status: metav1.ConditionStatus("True"), + Reason: string(core.ReasonSyncSucceeded), + Message: "Successfully synced all targets", + }, + }, + TargetsCount: &m.targetCount, + }, + ) } // Replay deferred events @@ -325,10 +380,6 @@ func (m *MessageProcessor) applySnapshot(ctx context.Context, snapshot *snapshot } } - // Because of idempotency, allTargets = desired state = targets existing in Kubernetes. Overwrites the counter to "reset" it. - m.targetCount = int32(len(allTargets)) - m.updateStatus(ctx, logger) - m.resetSnapshot() m.deferredEvents = nil return nil @@ -365,16 +416,6 @@ func (m *MessageProcessor) applyEvent(ctx context.Context, event core.DiscoveryE return nil } -func (m *MessageProcessor) updateStatus(ctx context.Context, logger logr.Logger) { - if err := updateTargetSourceStatus(ctx, m.client, m.targetSource, m.targetCount); err != nil { - logger.Error(err, "error updating TargetSource status") - } else { - logger.Info("updated target source status", - "targetCount", m.targetCount, - ) - } -} - func (m *MessageProcessor) resetSnapshot() { m.activeSnapshot = nil } diff --git a/internal/controller/targetsource_controller.go b/internal/controller/targetsource_controller.go index 7f30fc8..ed880f4 100644 --- a/internal/controller/targetsource_controller.go +++ b/internal/controller/targetsource_controller.go @@ -20,8 +20,10 @@ import ( "context" apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" @@ -104,8 +106,8 @@ func (r *TargetSourceReconciler) Reconcile(ctx context.Context, req ctrl.Request return ctrl.Result{}, err } - targetSource.Status.ObservedGeneration = targetSource.Generation - if err := r.Status().Update(ctx, targetSource); err != nil { + // Update TargetSource Status for new generation + if err := r.updateObservedGeneration(ctx, targetSource); err != nil { return ctrl.Result{}, err } @@ -179,9 +181,25 @@ func (r *TargetSourceReconciler) startDiscovery( ) error { targetChannel := make(chan []discoveryTypes.DiscoveryMessage, r.BufferSize) ctx, cancel := context.WithCancel(context.Background()) + + statusUpdater := discoveryTypes.NewDiscoveryKubernetesClient(r.Client, r.Scheme, targetSource) + if err := statusUpdater.UpdateStatus(ctx, discoveryTypes.StatusUpdate{ + Conditions: []metav1.Condition{ + { + Type: discoveryTypes.ConditionTypeReady, + Status: metav1.ConditionFalse, + Reason: string(discoveryTypes.ReasonWaitingForSync), + Message: "Waiting for initial sync", + }, + }, + }); err != nil { + logger.Error(err, "updating targetsource status failed") + } + loaderConfig := discoveryTypes.CommonLoaderConfig{ TargetsourceNN: key, ChunkSize: r.ChunkSize, + Client: statusUpdater, } // Cleanup function to cleanup discovery runtime of targetsource @@ -195,6 +213,7 @@ func (r *TargetSourceReconciler) startDiscovery( r.Scheme, targetSource, targetChannel, + statusUpdater, ) loader, err := discovery.NewLoader(&loaderConfig, targetSource.Spec) if err != nil { @@ -238,6 +257,22 @@ func (r *TargetSourceReconciler) startDiscovery( return nil } +func (r *TargetSourceReconciler) updateObservedGeneration(ctx context.Context, ts *gnmicv1alpha1.TargetSource) error { + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + latest := &gnmicv1alpha1.TargetSource{} + if err := r.Client.Get(ctx, client.ObjectKeyFromObject(ts), latest); err != nil { + return err + } + + patch := client.MergeFrom(latest.DeepCopy()) + latest.Status.ObservedGeneration = ts.Generation + + return r.Client.Status().Patch(ctx, latest, patch) + }) + + return err +} + // SetupWithManager sets up the controller with the Manager. func (r *TargetSourceReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). diff --git a/test.mk b/test.mk index 23c5983..67f1923 100644 --- a/test.mk +++ b/test.mk @@ -153,5 +153,5 @@ apply-test-clusters: ## Apply the test clusters for testing kubectl apply -f test/integration/resources/clusters .PHONY: apply-test-resources -apply-test-resources: apply-test-targets apply-test-subscriptions apply-test-outputs apply-test-pipelines apply-test-clusters +apply-test-resources: apply-test-targets apply-test-targetsources apply-test-subscriptions apply-test-outputs apply-test-pipelines apply-test-clusters diff --git a/test/integration/http/resources/configmap.yaml b/test/integration/http/resources/configmap.yaml index f017566..be7091f 100644 --- a/test/integration/http/resources/configmap.yaml +++ b/test/integration/http/resources/configmap.yaml @@ -6,7 +6,8 @@ data: targets.json: | [ { - "address": "clab-t1-spine1:57400", + "address": "clab-t1-spine1", + "port": 57400, "name": "spine1", "labels": { "vendor": "nokia_srlinux", @@ -14,7 +15,8 @@ data: } }, { - "address": "clab-t1-leaf1:57400", + "address": "clab-t1-leaf1", + "port": 57400, "name": "leaf1", "labels": { "vendor": "nokia_srlinux", @@ -22,7 +24,8 @@ data: } }, { - "address": "clab-t1-leaf2:57400", + "address": "clab-t1-leaf2", + "port": 57400, "name": "leaf2", "labels": { "vendor": "nokia_srlinux",