diff --git a/.license-scan-overrides.jsonl b/.license-scan-overrides.jsonl index faa62bcb5..f6e9877b5 100644 --- a/.license-scan-overrides.jsonl +++ b/.license-scan-overrides.jsonl @@ -1,6 +1,7 @@ {"name": "github.com/chzyer/logex", "licenceType": "MIT"} {"name": "github.com/grpc-ecosystem/go-grpc-middleware/v2", "licenceType": "Apache-2.0"} {"name": "github.com/hashicorp/vault/api/auth/approle", "licenceType": "MPL-2.0"} +{"name": "github.com/ironcore-dev/gnmi-test-server", "licenceType": "Apache-2.0"} {"name": "github.com/jpillora/longestcommon", "licenceType": "MIT"} {"name": "github.com/logrusorgru/aurora", "licenceType": "Unlicense"} {"name": "github.com/mattn/go-localereader", "licenceType": "MIT"} diff --git a/Dockerfile b/Dockerfile index 8619d4333..ab5dad76b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -16,6 +16,7 @@ WORKDIR /workspace RUN --mount=type=cache,target=/go/pkg/mod \ --mount=type=bind,source=go.mod,target=go.mod \ --mount=type=bind,source=go.sum,target=go.sum \ + --mount=type=bind,source=test/gnmi,target=test/gnmi \ go mod download -x RUN --mount=type=bind,target=. \ diff --git a/config/develop/gnmi-test-server.yaml b/config/develop/gnmi-test-server.yaml index 789b76681..9f2d4d1c8 100644 --- a/config/develop/gnmi-test-server.yaml +++ b/config/develop/gnmi-test-server.yaml @@ -33,7 +33,7 @@ spec: containers: - name: gnmi-test-server image: ghcr.io/ironcore-dev/gnmi-test-server:latest - imagePullPolicy: IfNotPresent + imagePullPolicy: Never ports: - containerPort: 9339 name: grpc diff --git a/go.mod b/go.mod index 31f543d34..7e41b4f98 100644 --- a/go.mod +++ b/go.mod @@ -129,3 +129,5 @@ require ( sigs.k8s.io/randfill v1.0.0 // indirect sigs.k8s.io/structured-merge-diff/v6 v6.3.2 // indirect ) + +replace github.com/ironcore-dev/gnmi-test-server => ./test/gnmi diff --git a/test/e2e/testutil/doc.go b/test/e2e/testutil/doc.go new file mode 100644 index 000000000..9098b9793 --- /dev/null +++ b/test/e2e/testutil/doc.go @@ -0,0 +1,16 @@ +// SPDX-FileCopyrightText: 2026 SAP SE or an SAP affiliate company and IronCore contributors +// SPDX-License-Identifier: Apache-2.0 + +// Package testutil provides test infrastructure for e2e tests. +// +// It supports two test modes selected by build tags: +// +// - Envtest (build tag: envtest): Uses controller-runtime's envtest.Environment +// with an in-process gNMI test server. Fast (~10s) but doesn't test deployment. +// +// - Cluster (default): Uses a real Kubernetes cluster (typically Kind) with a +// deployed operator and gnmi-test-server pod. Slower (~2-5min) but tests full stack. +// +// The concrete types ClusterEnvironment and EnvtestEnvironment provide the same +// methods, allowing test logic to work with either mode via build tag selection. +package testutil diff --git a/test/e2e/testutil/helpers.go b/test/e2e/testutil/helpers.go new file mode 100644 index 000000000..5d38e28fb --- /dev/null +++ b/test/e2e/testutil/helpers.go @@ -0,0 +1,547 @@ +// SPDX-FileCopyrightText: 2025 SAP SE or an SAP affiliate company and IronCore contributors +// SPDX-License-Identifier: Apache-2.0 + +package testutil + +import ( + "bufio" + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "os" + "os/exec" + "reflect" + "sort" + "strings" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "sigs.k8s.io/yaml" +) + +const ( + prometheusURL = "https://github.com/prometheus-operator/prometheus-operator/releases/download/v0.82.2/bundle.yaml" + certmanagerURL = "https://github.com/cert-manager/cert-manager/releases/download/v1.17.2/cert-manager.yaml" +) + +// warnError writes a warning to the provided writer. +func warnError(w io.Writer, err error) { + _, _ = fmt.Fprintf(w, "warning: %v\n", err) +} + +// Run executes the provided command within this context. +// It writes the command to the provided writer for logging. +func Run(cmd *exec.Cmd, w io.Writer) (string, error) { + dir, err := GetProjectDir() + if err != nil { + return "", fmt.Errorf("failed to get project directory: %w", err) + } + + cmd.Dir = dir + if err = os.Chdir(cmd.Dir); err != nil { + _, _ = fmt.Fprintf(w, "chdir dir: %s\n", err) + } + + command := strings.Join(cmd.Args, " ") + // #nosec G705 + _, _ = fmt.Fprintf(w, "running: %s\n", command) + + output, err := cmd.CombinedOutput() + if err != nil { + return string(output), fmt.Errorf("%s failed with error: (%w) %s", command, err, string(output)) + } + + return string(output), nil +} + +// Apply takes a raw YAML resource and applies it to the cluster by +// creating a temporary file and running 'kubectl apply -f'. +func Apply(ctx context.Context, resource string, w io.Writer) error { + file, err := os.CreateTemp("", "resource-*.yaml") + if err != nil { + return fmt.Errorf("failed to create temp file: %w", err) + } + // #nosec G703 + defer func() { _ = os.Remove(file.Name()) }() + if _, err = file.WriteString(resource); err != nil { + return fmt.Errorf("failed to write to temp file: %w", err) + } + if err = file.Close(); err != nil { + return fmt.Errorf("failed to close temp file: %w", err) + } + // #nosec G204 G702 + cmd := exec.CommandContext(ctx, "kubectl", "apply", "-f", file.Name()) + if _, err = Run(cmd, w); err != nil { + return fmt.Errorf("failed to apply resource: %w", err) + } + return nil +} + +// ExtractResourceIdentifier parses YAML and returns "kind/name" for use with kubectl wait. +// The kind is lowercased to match kubectl's resource type format. +func ExtractResourceIdentifier(resourceYAML string) (string, error) { + var obj unstructured.Unstructured + if err := yaml.Unmarshal([]byte(resourceYAML), &obj); err != nil { + return "", fmt.Errorf("failed to unmarshal YAML: %w", err) + } + + kind := strings.ToLower(obj.GetKind()) + name := obj.GetName() + if kind == "" || name == "" { + return "", errors.New("YAML missing kind or metadata.name") + } + + return kind + "/" + name, nil +} + +// CompareJSON compares two JSON strings and returns an error if they are not equal. +// For comparison, it unmarshals both into interface{} and uses reflect.DeepEqual +// after sorting any arrays and removing empty arrays/objects to ignore ordering +// and cleanup artifacts. +func CompareJSON(got, want string) error { + var gotObj, wantObj any + if err := json.Unmarshal([]byte(got), &gotObj); err != nil { + return fmt.Errorf("failed to unmarshal got JSON: %w", err) + } + if err := json.Unmarshal([]byte(want), &wantObj); err != nil { + return fmt.Errorf("failed to unmarshal want JSON: %w", err) + } + + // Normalize both objects (sort arrays, remove empty containers) + gotObj = normalizeJSON(gotObj) + wantObj = normalizeJSON(wantObj) + + if !reflect.DeepEqual(gotObj, wantObj) { + // For error message, show original compacted JSON (not normalized) + // so empty objects show as {} not null + var gotBuf, wantBuf bytes.Buffer + _ = json.Compact(&gotBuf, []byte(got)) //nolint:errcheck // already parsed successfully above + _ = json.Compact(&wantBuf, []byte(want)) //nolint:errcheck // already parsed successfully above + return fmt.Errorf("JSON mismatch:\ngot: %s\nwant: %s", gotBuf.String(), wantBuf.String()) + } + return nil +} + +// normalizeJSON recursively sorts arrays and removes empty arrays/objects +// to make comparison order-independent and ignore cleanup artifacts. +func normalizeJSON(v any) any { + switch val := v.(type) { + case map[string]any: + result := make(map[string]any) + for k, v := range val { + normalized := normalizeJSON(v) + // Skip empty maps and empty arrays + if !isEmpty(normalized) { + result[k] = normalized + } + } + if len(result) == 0 { + return nil + } + return result + case []any: + var result []any + for _, elem := range val { + normalized := normalizeJSON(elem) + if !isEmpty(normalized) { + result = append(result, normalized) + } + } + if len(result) == 0 { + return nil + } + // Sort the array by JSON representation + sort.Slice(result, func(i, j int) bool { + bi, _ := json.Marshal(result[i]) //nolint:errcheck // sorting comparison, errors treated as equal + bj, _ := json.Marshal(result[j]) //nolint:errcheck // sorting comparison, errors treated as equal + return string(bi) < string(bj) + }) + return result + default: + return v + } +} + +// isEmpty checks if a value is an empty map, empty array, or nil. +func isEmpty(v any) bool { + if v == nil { + return true + } + switch val := v.(type) { + case map[string]any: + return len(val) == 0 + case []any: + return len(val) == 0 + } + return false +} + +// InstallPrometheusOperator installs the prometheus Operator to be used to export the enabled metrics. +func InstallPrometheusOperator(ctx context.Context, w io.Writer) error { + cmd := exec.CommandContext(ctx, "kubectl", "create", "-f", prometheusURL) + _, err := Run(cmd, w) + return err +} + +// UninstallPrometheusOperator uninstalls the prometheus +func UninstallPrometheusOperator(ctx context.Context, w io.Writer) { + cmd := exec.CommandContext(ctx, "kubectl", "delete", "-f", prometheusURL) + if _, err := Run(cmd, w); err != nil { + warnError(w, err) + } +} + +// IsPrometheusCRDsInstalled checks if any Prometheus CRDs are installed +// by verifying the existence of key CRDs related to Prometheus. +func IsPrometheusCRDsInstalled(ctx context.Context, w io.Writer) bool { + // List of common Prometheus CRDs + prometheusCRDs := []string{ + "prometheuses.monitoring.coreos.com", + "prometheusrules.monitoring.coreos.com", + "prometheusagents.monitoring.coreos.com", + } + + cmd := exec.CommandContext(ctx, "kubectl", "get", "crds", "-o", "custom-columns=NAME:.metadata.name") + output, err := Run(cmd, w) + if err != nil { + return false + } + crdList := GetNonEmptyLines(output) + for _, crd := range prometheusCRDs { + for _, line := range crdList { + if strings.Contains(line, crd) { + return true + } + } + } + + return false +} + +// InstallCertManager installs the cert manager bundle. +func InstallCertManager(ctx context.Context, w io.Writer) error { + cmd := exec.CommandContext(ctx, "kubectl", "apply", "-f", certmanagerURL) + if _, err := Run(cmd, w); err != nil { + return err + } + // Wait for cert-manager-webhook to be ready, which can take time if cert-manager + // was re-installed after uninstalling on a cluster. + cmd = exec.CommandContext( + ctx, "kubectl", "wait", "deployment.apps/cert-manager-webhook", + "--for", "condition=Available", + "--namespace", "cert-manager", + "--timeout", "5m", + ) + if _, err := Run(cmd, w); err != nil { + return err + } + + // Wait for webhook to be fully operational (TLS cert ready) + // The deployment being Available doesn't mean the webhook TLS is ready + cmd = exec.CommandContext( + ctx, "kubectl", "wait", "certificate/cert-manager-webhook-ca", + "--for", "condition=Ready", + "--namespace", "cert-manager", + "--timeout", "2m", + ) + _, _ = Run(cmd, w) //nolint:errcheck // cert may not exist in older versions + + // Give the webhook a moment to pick up the cert + time.Sleep(5 * time.Second) + return nil +} + +// UninstallCertManager uninstalls the cert manager +func UninstallCertManager(ctx context.Context, w io.Writer) { + cmd := exec.CommandContext(ctx, "kubectl", "delete", "-f", certmanagerURL) + if _, err := Run(cmd, w); err != nil { + warnError(w, err) + } +} + +// WaitForCertManagerWebhook waits for the cert-manager webhook to be fully operational. +// This should be called before deploying resources that use cert-manager certificates. +func WaitForCertManagerWebhook(ctx context.Context, w io.Writer) error { + // Wait for deployment to be available + cmd := exec.CommandContext( + ctx, "kubectl", "wait", "deployment.apps/cert-manager-webhook", + "--for", "condition=Available", + "--namespace", "cert-manager", + "--timeout", "2m", + ) + if _, err := Run(cmd, w); err != nil { + return err + } + + // Wait for the CA injector to inject the CA bundle into the webhook + cmd = exec.CommandContext( + ctx, "kubectl", "wait", "deployment.apps/cert-manager-cainjector", + "--for", "condition=Available", + "--namespace", "cert-manager", + "--timeout", "2m", + ) + if _, err := Run(cmd, w); err != nil { + return err + } + + // Wait for the cainjector to inject the CA bundle into the webhook configuration + // This is what actually makes the webhook work - the API server needs the CA to verify the webhook's TLS cert + cmd = exec.CommandContext( + ctx, "kubectl", "wait", "validatingwebhookconfiguration/cert-manager-webhook", + "--for", "jsonpath={.webhooks[0].clientConfig.caBundle}", + "--timeout", "2m", + ) + if _, err := Run(cmd, w); err != nil { + return fmt.Errorf("cert-manager webhook CA bundle not injected: %w", err) + } + + return nil +} + +// IsCertManagerCRDsInstalled checks if any Cert Manager CRDs are installed +// by verifying the existence of key CRDs related to Cert Manager. +func IsCertManagerCRDsInstalled(ctx context.Context, w io.Writer) bool { + // List of common Cert Manager CRDs + certManagerCRDs := []string{ + "certificates.cert-manager.io", + "issuers.cert-manager.io", + "clusterissuers.cert-manager.io", + "certificaterequests.cert-manager.io", + "orders.acme.cert-manager.io", + "challenges.acme.cert-manager.io", + } + + // Execute the kubectl command to get all CRDs + cmd := exec.CommandContext(ctx, "kubectl", "get", "crds") + output, err := Run(cmd, w) + if err != nil { + return false + } + + // Check if any of the Cert Manager CRDs are present + crdList := GetNonEmptyLines(output) + for _, crd := range certManagerCRDs { + for _, line := range crdList { + if strings.Contains(line, crd) { + return true + } + } + } + + return false +} + +// LoadImageToKindClusterWithName loads a local docker image to the kind cluster +func LoadImageToKindClusterWithName(ctx context.Context, name string, w io.Writer) error { + cluster := "kind" + if v, ok := os.LookupEnv("KIND_CLUSTER"); ok { + cluster = v + } + // See: https://kind.sigs.k8s.io/docs/user/rootless/#creating-a-kind-cluster-with-rootless-nerdctl + prov, ok := os.LookupEnv("KIND_EXPERIMENTAL_PROVIDER") + if ok && prov != "docker" { + // If kind is configured to not use the docker runtime (e.g. when using podman or nerctl), + // we need to create a temp file to store the image archive and load it as a tarball. + // See: https://github.com/kubernetes-sigs/kind/issues/2760 + file, err := os.CreateTemp("", "operator-image-") + if err != nil { + return fmt.Errorf("failed to create temp file: %w", err) + } + _ = file.Close() + // #nosec G703 + defer func() { _ = os.Remove(file.Name()) }() + + // https://github.com/containerd/nerdctl/blob/main/docs/command-reference.md#whale-nerdctl-save + // https://docs.podman.io/en/v5.3.0/markdown/podman-save.1.html + // #nosec G702 + cmd := exec.CommandContext(ctx, prov, "save", name, "--output", file.Name()) + if _, err = Run(cmd, w); err != nil { + return fmt.Errorf("failed to save image: %w", err) + } + + cmd = exec.CommandContext(ctx, "kind", "load", "image-archive", file.Name(), "--name", cluster) //nolint:gosec + _, err = Run(cmd, w) + return err + } + cmd := exec.CommandContext(ctx, "kind", "load", "docker-image", name, "--name", cluster) + _, err := Run(cmd, w) + return err +} + +// GetNonEmptyLines converts given command output string into individual objects +// according to line breakers, and ignores the empty elements in it. +func GetNonEmptyLines(output string) []string { + var res []string + for element := range strings.SplitSeq(output, "\n") { + if element != "" { + res = append(res, element) + } + } + return res +} + +// GetProjectDir will return the directory where the project is +func GetProjectDir() (string, error) { + wd, err := os.Getwd() + if err != nil { + return wd, err + } + wd = strings.ReplaceAll(wd, "/test/e2e", "") + return wd, nil +} + +// PatchResourceYAML takes a raw YAML resource and patches its namespace and deviceRef. +// This allows txtar test files to have placeholder values that get replaced at runtime. +// It returns the patched YAML string ready for kubectl apply. +func PatchResourceYAML(resourceYAML, namespace, deviceName string) (string, error) { + // Parse YAML into unstructured map + var obj map[string]any + if err := yaml.Unmarshal([]byte(resourceYAML), &obj); err != nil { + return "", fmt.Errorf("failed to unmarshal YAML: %w", err) + } + + // Patch metadata.namespace + metadata, ok := obj["metadata"].(map[string]any) + if !ok { + metadata = make(map[string]any) + obj["metadata"] = metadata + } + metadata["namespace"] = namespace + + // Ensure labels map exists and add e2e test label + labels, ok := metadata["labels"].(map[string]any) + if !ok { + labels = make(map[string]any) + metadata["labels"] = labels + } + labels[E2ETestLabel] = "" + + // Patch spec.deviceRef.name if it exists + if spec, ok := obj["spec"].(map[string]any); ok { + if deviceRef, ok := spec["deviceRef"].(map[string]any); ok { + deviceRef["name"] = deviceName + } + } + + // Patch the device label if it exists + if _, hasDeviceLabel := labels["networking.metal.ironcore.dev/device"]; hasDeviceLabel { + labels["networking.metal.ironcore.dev/device"] = deviceName + } + + // Marshal back to YAML + out, err := yaml.Marshal(obj) + if err != nil { + return "", fmt.Errorf("failed to marshal patched YAML: %w", err) + } + return string(out), nil +} + +// ApplyWithPatch applies a YAML resource after patching its namespace and deviceRef. +// This is the cluster-mode equivalent of envtest's createResourceFromTxtar. +func ApplyWithPatch(ctx context.Context, resourceYAML, namespace, deviceName string, w io.Writer) error { + patched, err := PatchResourceYAML(resourceYAML, namespace, deviceName) + if err != nil { + return err + } + return Apply(ctx, patched, w) +} + +// WaitForCondition waits for a resource to have a condition set to True. +// It tries "Configured" first, falls back to "Ready" if Configured doesn't exist. +// Skips config-only resources that don't have status conditions. +func WaitForCondition(ctx context.Context, resourceName, namespace string, w io.Writer) error { + // Config-only resources don't have status conditions - skip them + // resourceName format is "kind/name" e.g. "bgpconfig/evpn-settings" + kind := strings.Split(resourceName, "/")[0] + switch strings.ToLower(kind) { + case "interfaceconfig", "lldpconfig", "bgpconfig", "nveconfig", "managementaccessconfig": + return nil // No conditions to wait for + } + + // Try Configured first using jsonpath (more reliable than --for condition=X with multiple conditions) + cmd := exec.CommandContext( + ctx, "kubectl", "wait", resourceName, + "--for", `jsonpath={.status.conditions[?(@.type=="Configured")].status}=True`, + "--namespace", namespace, + "--timeout", "10s", + ) + if _, err := Run(cmd, w); err == nil { + return nil + } + + // Fallback to Ready using jsonpath (condition=Ready doesn't work reliably with custom resources) + cmd = exec.CommandContext( + ctx, "kubectl", "wait", resourceName, + "--for", `jsonpath={.status.conditions[?(@.type=="Ready")].status}=True`, + "--namespace", namespace, + "--timeout", "2m", + ) + _, err := Run(cmd, w) + return err +} + +// UncommentCode searches for target in the file and remove the comment prefix +// of the target content. The target content may span multiple lines. +func UncommentCode(filename, target, prefix string) error { + content, err := os.ReadFile(filename) + if err != nil { + return err + } + + before, after, ok := bytes.Cut(content, []byte(target)) + if !ok { + if bytes.Contains(content, []byte(target)[len(prefix):]) { + return nil // already uncommented + } + + return fmt.Errorf("unable to find the code %s to be uncomment", target) + } + + out := new(bytes.Buffer) + if _, err = out.Write(before); err != nil { + return err + } + + scanner := bufio.NewScanner(bytes.NewBufferString(target)) + if !scanner.Scan() { + return nil + } + for { + _, err = out.WriteString(strings.TrimPrefix(scanner.Text(), prefix)) + if err != nil { + return err + } + // Avoid writing a newline in case the previous line was the last in target. + if !scanner.Scan() { + break + } + if _, err = out.WriteString("\n"); err != nil { + return err + } + } + + if _, err = out.Write(after); err != nil { + return err + } + + return os.WriteFile(filename, out.Bytes(), 0o644) +} + +// ExtractConditions extracts status conditions from an unstructured object +// into a typed []metav1.Condition slice for use with apimeta helpers. +func ExtractConditions(obj *unstructured.Unstructured) ([]metav1.Condition, error) { + raw, _, err := unstructured.NestedSlice(obj.Object, "status", "conditions") + if err != nil { + return nil, err + } + data, err := json.Marshal(raw) + if err != nil { + return nil, err + } + var conditions []metav1.Condition + return conditions, json.Unmarshal(data, &conditions) +} diff --git a/test/e2e/testutil/provider.go b/test/e2e/testutil/provider.go new file mode 100644 index 000000000..986747139 --- /dev/null +++ b/test/e2e/testutil/provider.go @@ -0,0 +1,156 @@ +// SPDX-FileCopyrightText: 2026 SAP SE or an SAP affiliate company and IronCore contributors +// SPDX-License-Identifier: Apache-2.0 + +package testutil + +import ( + "context" + "time" + + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/controller-runtime/pkg/client" + + nxv1alpha1 "github.com/ironcore-dev/network-operator/api/cisco/nx/v1alpha1" + "github.com/ironcore-dev/network-operator/api/core/v1alpha1" + "github.com/ironcore-dev/network-operator/internal/provider" + "github.com/ironcore-dev/network-operator/internal/provider/cisco/iosxr" + "github.com/ironcore-dev/network-operator/internal/provider/cisco/nxos" +) + +// E2ETestLabel is the label key applied to resources created by e2e tests for cleanup tracking. +const E2ETestLabel = "networking.metal.ironcore.dev/e2e-test" + +// ProviderType represents the network device provider to test against. +type ProviderType string + +// ProviderFactory creates a new provider instance. +type ProviderFactory = func() provider.Provider + +// Provider names must match the registered provider names in internal/provider/*/provider.go +const ( + ProviderNXOS ProviderType = "cisco-nxos-gnmi" + ProviderIOSXR ProviderType = "cisco-iosxr-gnmi" +) + +// ProviderConfig holds the configuration for a provider test. +type ProviderConfig struct { + Name ProviderType + NewProvider ProviderFactory +} + +// SupportedProviders lists all providers to test. +var SupportedProviders = []ProviderConfig{ + {Name: ProviderNXOS, NewProvider: func() provider.Provider { return nxos.NewProvider() }}, + {Name: ProviderIOSXR, NewProvider: func() provider.Provider { return iosxr.NewProvider() }}, +} + +// CoreResources are the main API resources with finalizers. +// During cleanup, these are deleted FIRST so their finalizers can +// complete while Device and config resources still exist. +var CoreResources = []schema.GroupVersionKind{ + v1alpha1.GroupVersion.WithKind("Interface"), + v1alpha1.GroupVersion.WithKind("VLAN"), + v1alpha1.GroupVersion.WithKind("VRF"), + v1alpha1.GroupVersion.WithKind("NTP"), + v1alpha1.GroupVersion.WithKind("DNS"), + v1alpha1.GroupVersion.WithKind("LLDP"), + v1alpha1.GroupVersion.WithKind("Banner"), + v1alpha1.GroupVersion.WithKind("OSPF"), + v1alpha1.GroupVersion.WithKind("PIM"), + v1alpha1.GroupVersion.WithKind("NetworkVirtualizationEdge"), + v1alpha1.GroupVersion.WithKind("EVPNInstance"), + v1alpha1.GroupVersion.WithKind("RoutingPolicy"), + v1alpha1.GroupVersion.WithKind("PrefixSet"), + v1alpha1.GroupVersion.WithKind("BGP"), + v1alpha1.GroupVersion.WithKind("BGPPeer"), + v1alpha1.GroupVersion.WithKind("Syslog"), + v1alpha1.GroupVersion.WithKind("SNMP"), + v1alpha1.GroupVersion.WithKind("ManagementAccess"), + v1alpha1.GroupVersion.WithKind("AccessControlList"), + v1alpha1.GroupVersion.WithKind("DHCPRelay"), + v1alpha1.GroupVersion.WithKind("ISIS"), +} + +// ConfigResources are provider-specific config resources (e.g., NX-OS configs). +// During cleanup, these are deleted AFTER core resources. +var ConfigResources = []schema.GroupVersionKind{ + nxv1alpha1.GroupVersion.WithKind("InterfaceConfig"), + nxv1alpha1.GroupVersion.WithKind("LLDPConfig"), + nxv1alpha1.GroupVersion.WithKind("BGPConfig"), + nxv1alpha1.GroupVersion.WithKind("VPCDomain"), +} + +// ResourcePluralName returns the plural resource name for a GVK. +// These must match the CRD spec.names.plural values (from `kubectl api-resources`). +// We can't use meta.UnsafeGuessKindToResource because CRDs define their own plurals +// which don't always follow standard Kubernetes pluralization rules. +func ResourcePluralName(gvk schema.GroupVersionKind) string { + plurals := map[string]string{ + "Interface": "interfaces", + "VLAN": "vlans", + "VRF": "vrfs", + "NTP": "ntp", + "DNS": "dns", + "LLDP": "lldps", + "Banner": "banners", + "OSPF": "ospf", + "PIM": "pim", + "NetworkVirtualizationEdge": "networkvirtualizationedges", + "EVPNInstance": "evpninstances", + "InterfaceConfig": "interfaceconfigs", + "LLDPConfig": "lldpconfigs", + "VPCDomain": "vpcdomains", + "BGPConfig": "bgpconfigs", + "RoutingPolicy": "routingpolicies", + "PrefixSet": "prefixsets", + "BGP": "bgp", + "BGPPeer": "bgppeers", + "Syslog": "syslogs", + "SNMP": "snmp", + "ManagementAccess": "managementaccesses", + "AccessControlList": "accesscontrollists", + "DHCPRelay": "dhcprelays", + "ISIS": "isis", + "Device": "devices", + } + if plural, ok := plurals[gvk.Kind]; ok { + return plural + } + // Fallback to standard pluralization + plural, _ := meta.UnsafeGuessKindToResource(gvk) + return plural.Resource +} + +// CreateTestDevice creates a Device pointing to the gNMI server with a generated name. +func CreateTestDevice(ctx context.Context, c client.Client, gnmiAddr, namespace string) (*v1alpha1.Device, error) { + device := &v1alpha1.Device{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "test-device-", + Namespace: namespace, + }, + Spec: v1alpha1.DeviceSpec{ + Endpoint: v1alpha1.Endpoint{ + Address: gnmiAddr, + }, + }, + } + if err := c.Create(ctx, device); err != nil { + return nil, err + } + + // Set the device status to Running so that dependent resources can reconcile + device.Status.Phase = v1alpha1.DevicePhaseRunning + if err := c.Status().Update(ctx, device); err != nil { + return nil, err + } + + return device, nil +} + +// CleanupTimeout is the timeout for cleanup operations. +const CleanupTimeout = 30 * time.Second + +// CleanupInterval is the polling interval for cleanup operations. +const CleanupInterval = 100 * time.Millisecond diff --git a/test/e2e/testutil/timeouts.go b/test/e2e/testutil/timeouts.go new file mode 100644 index 000000000..4c8e8ef31 --- /dev/null +++ b/test/e2e/testutil/timeouts.go @@ -0,0 +1,17 @@ +// SPDX-FileCopyrightText: 2025 SAP SE or an SAP affiliate company and IronCore contributors +// SPDX-License-Identifier: Apache-2.0 + +package testutil + +import "time" + +const ( + // DefaultTimeout is used for standard resource reconciliation + DefaultTimeout = 30 * time.Second + + // LongTimeout is used for operations that may take longer (deployments, pod starts) + LongTimeout = 2 * time.Minute + + // VeryLongTimeout is used for end-to-end scenarios with multiple dependencies + VeryLongTimeout = 5 * time.Minute +) diff --git a/test/gnmi/Dockerfile b/test/gnmi/Dockerfile index d80e16ac4..bdc864cc6 100644 --- a/test/gnmi/Dockerfile +++ b/test/gnmi/Dockerfile @@ -12,17 +12,14 @@ ARG TARGETARCH WORKDIR /workspace -# Install dependencies -RUN --mount=type=cache,target=/go/pkg/mod \ - --mount=type=bind,source=go.mod,target=go.mod \ - --mount=type=bind,source=go.sum,target=go.sum \ - go mod download -x +# Copy source files +COPY go.mod go.sum ./ +RUN go mod download -x + +COPY . . # Build the application into a static executable while removing the symbol table and debugging information -RUN --mount=type=bind,target=. \ - --mount=type=cache,target=/go/pkg/mod \ - --mount=type=cache,target=/root/.cache/go-build \ - CGO_ENABLED=0 GOOS=${TARGETOS} GOARCH=${TARGETARCH} go build -ldflags="-s -w" -o /usr/bin/server ./main.go +RUN CGO_ENABLED=0 GOOS=${TARGETOS} GOARCH=${TARGETARCH} go build -ldflags="-s -w" -o /usr/bin/server ./main.go FROM alpine:${ALPINE_VERSION} @@ -39,5 +36,5 @@ USER 65532:65532 # Switch into workspace WORKDIR / -# Start the server application -CMD ["/server", "--port=9339", "--http-port=8000"] +# Start the server application with NX-OS behavior enabled +CMD ["/server", "--port=9339", "--http-port=8000", "--nxos"] diff --git a/test/gnmi/main.go b/test/gnmi/main.go index ef2018cde..eb5408639 100644 --- a/test/gnmi/main.go +++ b/test/gnmi/main.go @@ -4,362 +4,57 @@ package main import ( - "bytes" "context" - "crypto/tls" - "encoding/json" "flag" - "fmt" - "io" "log" - "net" - "net/http" - "strconv" - "strings" - "sync" - "time" + "os" + "os/signal" + "syscall" - gpb "github.com/openconfig/gnmi/proto/gnmi" - "github.com/tidwall/gjson" - "github.com/tidwall/sjson" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/reflection" - "google.golang.org/grpc/status" - - gtls "github.com/openconfig/gnmi/testing/fake/testing/tls" + "github.com/ironcore-dev/gnmi-test-server/testserver" ) -var _ gpb.GNMIServer = (*Server)(nil) - -// Server implements the GNMI gRPC server -type Server struct { - gpb.UnimplementedGNMIServer - - State *State -} - -func (s *Server) Capabilities(_ context.Context, _ *gpb.CapabilityRequest) (*gpb.CapabilityResponse, error) { - return &gpb.CapabilityResponse{SupportedEncodings: []gpb.Encoding{gpb.Encoding_JSON}}, nil -} - -func (s *Server) Get(_ context.Context, req *gpb.GetRequest) (*gpb.GetResponse, error) { - notifications := make([]*gpb.Notification, 0, len(req.GetPath())) - for _, path := range req.GetPath() { - if len(path.GetElem()) == 0 { - return nil, status.Error(codes.InvalidArgument, "root path is not allowed") - } - log.Printf("Getting path: %v", path) - notifications = append(notifications, &gpb.Notification{ - Timestamp: time.Now().UnixNano(), - Update: []*gpb.Update{ - { - Path: path, - Val: &gpb.TypedValue{ - Value: &gpb.TypedValue_JsonVal{ - JsonVal: s.State.Get(path), - }, - }, - }, - }, - }) - } - return &gpb.GetResponse{ - Notification: notifications, - }, nil -} - -func (s *Server) Set(_ context.Context, req *gpb.SetRequest) (*gpb.SetResponse, error) { - log.Printf("Received Set request: %v", req) - res := make([]*gpb.UpdateResult, 0, len(req.GetDelete())+len(req.GetUpdate())) - for _, del := range req.GetDelete() { - log.Printf("Deleting path: %v", del) - res = append(res, &gpb.UpdateResult{ - Timestamp: time.Now().UnixNano(), - Path: del, - Op: gpb.UpdateResult_DELETE, - }) - s.State.Del(del) - } - for _, replace := range req.GetReplace() { - log.Printf("Replacing path: %v with value: %q", replace.GetPath(), replace.GetVal().GetJsonVal()) - res = append(res, &gpb.UpdateResult{ - Timestamp: time.Now().UnixNano(), - Path: replace.Path, - Op: gpb.UpdateResult_REPLACE, - }) - // Delete the existing value at the path and set the new value. - s.State.Del(replace.GetPath()) - s.State.Set(replace.GetPath(), replace.GetVal().GetJsonVal()) - } - for _, update := range req.GetUpdate() { - log.Printf("Updating path: %v with value: %q", update.GetPath(), update.GetVal().GetJsonVal()) - res = append(res, &gpb.UpdateResult{ - Timestamp: time.Now().UnixNano(), - Path: update.Path, - Op: gpb.UpdateResult_UPDATE, - }) - // The value will automatically be merged into the existing state. - s.State.Set(update.GetPath(), update.GetVal().GetJsonVal()) - } - // TODO: Handle UnionReplace - return &gpb.SetResponse{ - Response: res, - Timestamp: time.Now().UnixNano(), - }, nil -} - -func (s *Server) Subscribe(stream grpc.BidiStreamingServer[gpb.SubscribeRequest, gpb.SubscribeResponse]) error { - req, err := stream.Recv() - switch { - case err == io.EOF: - return nil - case err != nil: - return err - case req.GetSubscribe() == nil: - return status.Errorf(codes.InvalidArgument, "the subscribe request must contain a subscription definition") - } - - switch req.GetRequest().(type) { - case *gpb.SubscribeRequest_Poll: - return status.Errorf(codes.InvalidArgument, "invalid request type: %T", req.GetRequest()) - case *gpb.SubscribeRequest_Subscribe: - } - - switch mode := req.GetSubscribe().GetMode(); mode { - case gpb.SubscriptionList_ONCE: - log.Printf("Received Subscribe request with ONCE mode") - - paths := make([]*gpb.Path, 0, len(req.GetSubscribe().GetSubscription())) - for _, r := range req.GetSubscribe().GetSubscription() { - paths = append(paths, r.GetPath()) - } - - res, err := s.Get(stream.Context(), &gpb.GetRequest{ - Prefix: req.GetSubscribe().GetPrefix(), - Path: paths, - Encoding: req.GetSubscribe().GetEncoding(), - UseModels: req.GetSubscribe().GetUseModels(), - Extension: req.GetExtension(), - }) - if err != nil { - return err - } - - for _, notification := range res.GetNotification() { - if err := stream.Send(&gpb.SubscribeResponse{ - Response: &gpb.SubscribeResponse_Update{ - Update: notification, - }, - }); err != nil { - return status.Errorf(codes.Internal, "failed to send response: %v", err) - } - } - - case gpb.SubscriptionList_STREAM: - return status.Errorf(codes.Unimplemented, "subscribe method Stream not implemented") - case gpb.SubscriptionList_POLL: - return status.Errorf(codes.Unimplemented, "subscribe method Poll not implemented") - default: - return status.Errorf(codes.InvalidArgument, "unknown subscribe request mode: %v", mode) - } - - return nil -} - -// handleState handles HTTP requests to the /v1/state endpoint -func (s *Server) handleState(w http.ResponseWriter, r *http.Request) { - switch r.Method { - case http.MethodGet: - s.State.RLock() - defer s.State.RUnlock() - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - if len(s.State.Buf) == 0 { - w.Write([]byte("{}")) - return - } - var buf bytes.Buffer - if err := json.Compact(&buf, s.State.Buf); err != nil { - log.Printf("Failed to compact JSON: %v", err) - w.WriteHeader(http.StatusInternalServerError) - w.Write([]byte("Internal Server Error")) - return - } - w.Write(buf.Bytes()) - case http.MethodDelete: - s.State.Lock() - defer s.State.Unlock() - s.State.Buf = nil - w.WriteHeader(http.StatusNoContent) - default: - w.WriteHeader(http.StatusMethodNotAllowed) - } -} - -// State represents a JSON body that can be manipulated using [sjson] syntax. -type State struct { - sync.RWMutex - - Buf []byte -} - -func (s State) Get(path *gpb.Path) []byte { - s.RLock() - defer s.RUnlock() - var sb strings.Builder - for _, elem := range path.GetElem() { - if elem.GetName() == "" { - continue - } - if sb.Len() > 0 { - sb.WriteByte('|') - } - sb.WriteString(elem.GetName()) - if len(elem.GetKey()) == 0 { - continue - } - for k, v := range elem.GetKey() { - sb.WriteByte('|') - sb.WriteString(`#(`) - sb.WriteString(k) - sb.WriteString(`=="`) - sb.WriteString(v) - sb.WriteString(`")#`) - } - } - res := gjson.GetBytes(s.Buf, sb.String()) - if !res.Exists() || (res.IsArray() && len(res.Array()) == 0) { - return []byte("null") - } - return []byte(res.Raw) -} - -func (s *State) Set(path *gpb.Path, raw []byte) { - s.Lock() - defer s.Unlock() - var sb strings.Builder - for _, elem := range path.GetElem() { - if elem.GetName() == "" { - continue - } - if sb.Len() > 0 { - sb.WriteByte('.') - } - sb.WriteString(elem.GetName()) - if len(elem.GetKey()) == 0 { - continue - } - var idx int - gjson.GetBytes(s.Buf, sb.String()).ForEach(func(_, r gjson.Result) bool { - for k, v := range elem.GetKey() { - if r.Get(k).String() != v { - idx++ - return true - } - } - return false - }) - sb.WriteByte('.') - sb.WriteString(strconv.Itoa(idx)) - } - s.Buf, _ = sjson.SetRawBytes(s.Buf, sb.String(), raw) //nolint:errcheck -} - -func (s *State) Del(path *gpb.Path) { - s.Lock() - defer s.Unlock() - var sb strings.Builder - for _, elem := range path.GetElem() { - if elem.GetName() == "" { - continue - } - if sb.Len() > 0 { - sb.WriteByte('.') - } - sb.WriteString(elem.GetName()) - if len(elem.GetKey()) == 0 { - continue - } - var ( - idx int - found bool - ) - gjson.GetBytes(s.Buf, sb.String()).ForEach(func(_, r gjson.Result) bool { - for k, v := range elem.GetKey() { - if r.Get(k).String() != v { - idx++ - return true - } - } - found = true - return false - }) - if !found { - return - } - sb.WriteByte('.') - sb.WriteString(strconv.Itoa(idx)) - } - - s.Buf, _ = sjson.DeleteBytes(s.Buf, sb.String()) //nolint:errcheck -} - func main() { // Parse command line flags port := flag.Int("port", 9339, "The gRPC server port") httpPort := flag.Int("http-port", 8000, "The HTTP server port") + nxos := flag.Bool("nxos", false, "Enable NX-OS behavior (strip DME markers)") flag.Parse() - // Create a listener on the specified port - lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port)) - if err != nil { - log.Fatalf("Failed to listen on port %d: %v", *port, err) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Build server options + opts := []testserver.ServerOption{ + testserver.WithGRPCPort(*port), + testserver.WithHTTPPort(*httpPort), + testserver.WithBindAddress("0.0.0.0"), + } + if *nxos { + opts = append(opts, testserver.WithNXOSBehavior()) } - // Create a TLS certificate for gRPC server - // This is a self-signed certificate for testing purposes. - cert, err := gtls.NewCert() + // Start the server using the reusable NewTestServer function + // Bind to 0.0.0.0 to accept connections from other pods in the cluster + server, grpcAddr, httpAddr, err := testserver.NewTestServer(ctx, opts...) if err != nil { - log.Fatalf("Failed to create TLS certificate: %v", err) + log.Fatalf("Failed to start server: %v", err) } - // Create a new gRPC server with TLS - grpcServer := grpc.NewServer(grpc.Creds(credentials.NewTLS(&tls.Config{ - Certificates: []tls.Certificate{cert}, - }))) - - // Create our server implementation - server := &Server{State: &State{}} - - // Register the GNMIService with our server implementation - gpb.RegisterGNMIServer(grpcServer, server) - - // Enable reflection for easier testing with tools like grpcurl - reflection.Register(grpcServer) - - // Setup HTTP server - http.HandleFunc("/v1/state", server.handleState) - httpServer := &http.Server{Addr: fmt.Sprintf(":%d", *httpPort)} - - // Start HTTP server in a goroutine - go func() { - log.Printf("Starting HTTP server on port %d", *httpPort) - log.Printf("HTTP endpoint available at: /v1/state") - if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { - log.Fatalf("Failed to serve HTTP server: %v", err) - } - }() - - log.Printf("Starting gRPC server on port %d", *port) - log.Printf("Server is ready to accept connections...") + log.Printf("gRPC server listening on %s", grpcAddr) + log.Printf("HTTP server listening on %s", httpAddr) + log.Printf("HTTP endpoint available at: /v1/state") log.Printf("Use --port flag to specify a different gRPC port (default: 9339)") log.Printf("Use --http-port flag to specify a different HTTP port (default: 8000)") log.Printf("Available services: GNMI") - // Start serving - if err := grpcServer.Serve(lis); err != nil { - log.Fatalf("Failed to serve gRPC server: %v", err) + // Wait for interrupt signal + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + <-sigCh + + log.Println("Shutting down...") + if err := server.Close(); err != nil { + log.Printf("Error during shutdown: %v", err) } } diff --git a/test/gnmi/testserver/server.go b/test/gnmi/testserver/server.go new file mode 100644 index 000000000..c8a7aa734 --- /dev/null +++ b/test/gnmi/testserver/server.go @@ -0,0 +1,636 @@ +// SPDX-FileCopyrightText: 2026 SAP SE or an SAP affiliate company and IronCore contributors +// SPDX-License-Identifier: Apache-2.0 + +package testserver + +import ( + "bytes" + "context" + "crypto/tls" + "encoding/json" + "fmt" + "io" + "log" + "net" + "net/http" + "strconv" + "strings" + "sync" + "time" + + gpb "github.com/openconfig/gnmi/proto/gnmi" + "github.com/tidwall/gjson" + "github.com/tidwall/sjson" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/reflection" + "google.golang.org/grpc/status" + + gtls "github.com/openconfig/gnmi/testing/fake/testing/tls" +) + +var _ gpb.GNMIServer = (*Server)(nil) + +// Server implements the GNMI gRPC server +type Server struct { + gpb.UnimplementedGNMIServer + + State *State + + grpcServer *grpc.Server + httpServer *http.Server + grpcAddr string + httpAddr string +} + +// ServerOption configures the test server +type ServerOption func(*serverConfig) + +type serverConfig struct { + grpcPort int + httpPort int + bindAddress string + stripDMEMarkers bool + dmeMarkerValue string +} + +// WithGRPCPort sets a specific gRPC port (default: 0 for random) +func WithGRPCPort(port int) ServerOption { + return func(c *serverConfig) { + c.grpcPort = port + } +} + +// WithHTTPPort sets a specific HTTP port (default: 0 for random) +func WithHTTPPort(port int) ServerOption { + return func(c *serverConfig) { + c.httpPort = port + } +} + +// WithBindAddress sets the address to bind to (default: 127.0.0.1). +// Use "0.0.0.0" to listen on all interfaces (required for container/pod deployments). +func WithBindAddress(addr string) ServerOption { + return func(c *serverConfig) { + c.bindAddress = addr + } +} + +// WithNXOSBehavior configures the server to emulate NX-OS device behavior: +// - Strips fields with DME_UNSET_PROPERTY_MARKER value when storing (the marker +// means "unset this field", not "store this literal string") +// - Returns empty TypedValue for non-existent paths (instead of NOT_FOUND error) +func WithNXOSBehavior() ServerOption { + return func(c *serverConfig) { + c.stripDMEMarkers = true + c.dmeMarkerValue = "DME_UNSET_PROPERTY_MARKER" + } +} + +// NewTestServer starts an in-process gNMI + HTTP server. +// By default, it uses random available ports. Use WithGRPCPort/WithHTTPPort to specify ports. +// Returns the server, gRPC address, HTTP address, and any error. +func NewTestServer(ctx context.Context, opts ...ServerOption) (*Server, string, string, error) { + cfg := &serverConfig{ + grpcPort: 0, // Random port by default + httpPort: 0, // Random port by default + bindAddress: "127.0.0.1", // Localhost by default (safe for in-process tests) + } + for _, opt := range opts { + opt(cfg) + } + + // Create a listener on the specified port + grpcLis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", cfg.bindAddress, cfg.grpcPort)) + if err != nil { + return nil, "", "", fmt.Errorf("failed to listen for gRPC: %w", err) + } + + httpLis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", cfg.bindAddress, cfg.httpPort)) + if err != nil { + grpcLis.Close() + return nil, "", "", fmt.Errorf("failed to listen for HTTP: %w", err) + } + + // Create a TLS certificate for gRPC server + cert, err := gtls.NewCert() + if err != nil { + grpcLis.Close() + httpLis.Close() + return nil, "", "", fmt.Errorf("failed to create TLS certificate: %w", err) + } + + // Create a new gRPC server with TLS + grpcServer := grpc.NewServer(grpc.Creds(credentials.NewTLS(&tls.Config{ + Certificates: []tls.Certificate{cert}, + }))) + + // Create our server implementation + server := &Server{ + State: &State{ + stripDMEMarkers: cfg.stripDMEMarkers, + dmeMarkerValue: cfg.dmeMarkerValue, + }, + grpcServer: grpcServer, + grpcAddr: grpcLis.Addr().String(), + httpAddr: httpLis.Addr().String(), + } + + // Register the GNMIService with our server implementation + gpb.RegisterGNMIServer(grpcServer, server) + + // Enable reflection for easier testing + reflection.Register(grpcServer) + + // Setup HTTP server + mux := http.NewServeMux() + mux.HandleFunc("/v1/state", server.handleState) + mux.HandleFunc("/v1/clear", server.handleClear) + server.httpServer = &http.Server{Handler: mux} + + // Start HTTP server in a goroutine + go func() { + log.Printf("Starting HTTP server on %s", server.httpAddr) + if err := server.httpServer.Serve(httpLis); err != nil && err != http.ErrServerClosed { + log.Printf("HTTP server error: %v", err) + } + }() + + // Start gRPC server in a goroutine + go func() { + log.Printf("Starting gRPC server on %s", server.grpcAddr) + if err := grpcServer.Serve(grpcLis); err != nil { + log.Printf("gRPC server error: %v", err) + } + }() + + return server, server.grpcAddr, server.httpAddr, nil +} + +// GRPCAddr returns the gRPC server address +func (s *Server) GRPCAddr() string { + return s.grpcAddr +} + +// HTTPAddr returns the HTTP server address +func (s *Server) HTTPAddr() string { + return s.httpAddr +} + +// GetState returns the current JSON state +func (s *Server) GetState() ([]byte, error) { + s.State.RLock() + defer s.State.RUnlock() + if len(s.State.Buf) == 0 { + return []byte("{}"), nil + } + var buf bytes.Buffer + if err := json.Compact(&buf, s.State.Buf); err != nil { + return nil, fmt.Errorf("failed to compact JSON: %w", err) + } + return buf.Bytes(), nil +} + +// ClearState clears all accumulated state +func (s *Server) ClearState() { + s.State.Lock() + defer s.State.Unlock() + s.State.Buf = nil +} + +// Close gracefully shuts down the server +func (s *Server) Close() error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + var errs []error + if s.httpServer != nil { + if err := s.httpServer.Shutdown(ctx); err != nil { + errs = append(errs, fmt.Errorf("HTTP shutdown: %w", err)) + } + } + if s.grpcServer != nil { + s.grpcServer.GracefulStop() + } + if len(errs) > 0 { + return errs[0] + } + return nil +} + +func (s *Server) Capabilities(_ context.Context, _ *gpb.CapabilityRequest) (*gpb.CapabilityResponse, error) { + return &gpb.CapabilityResponse{SupportedEncodings: []gpb.Encoding{gpb.Encoding_JSON}}, nil +} + +func (s *Server) Get(_ context.Context, req *gpb.GetRequest) (*gpb.GetResponse, error) { + notifications := make([]*gpb.Notification, 0, len(req.GetPath())) + for _, path := range req.GetPath() { + if len(path.GetElem()) == 0 { + return nil, status.Error(codes.InvalidArgument, "root path is not allowed") + } + log.Printf("Getting path: %v", path) + notifications = append(notifications, &gpb.Notification{ + Timestamp: time.Now().UnixNano(), + Update: []*gpb.Update{ + { + Path: path, + Val: &gpb.TypedValue{ + Value: &gpb.TypedValue_JsonVal{ + JsonVal: s.State.Get(path), + }, + }, + }, + }, + }) + } + return &gpb.GetResponse{ + Notification: notifications, + }, nil +} + +func (s *Server) Set(_ context.Context, req *gpb.SetRequest) (*gpb.SetResponse, error) { + log.Printf("Received Set request: %v", req) + res := make([]*gpb.UpdateResult, 0, len(req.GetDelete())+len(req.GetUpdate())) + for _, del := range req.GetDelete() { + log.Printf("Deleting path: %v", del) + res = append(res, &gpb.UpdateResult{ + Timestamp: time.Now().UnixNano(), + Path: del, + Op: gpb.UpdateResult_DELETE, + }) + s.State.Del(del) + } + for _, replace := range req.GetReplace() { + log.Printf("Replacing path: %v with value: %q", replace.GetPath(), replace.GetVal().GetJsonVal()) + res = append(res, &gpb.UpdateResult{ + Timestamp: time.Now().UnixNano(), + Path: replace.Path, + Op: gpb.UpdateResult_REPLACE, + }) + // Delete the existing value at the path and set the new value. + s.State.Del(replace.GetPath()) + s.State.Set(replace.GetPath(), replace.GetVal().GetJsonVal()) + } + for _, update := range req.GetUpdate() { + log.Printf("Updating path: %v with value: %q", update.GetPath(), update.GetVal().GetJsonVal()) + res = append(res, &gpb.UpdateResult{ + Timestamp: time.Now().UnixNano(), + Path: update.Path, + Op: gpb.UpdateResult_UPDATE, + }) + // The value will automatically be merged into the existing state. + s.State.Set(update.GetPath(), update.GetVal().GetJsonVal()) + } + // TODO: Handle UnionReplace + return &gpb.SetResponse{ + Response: res, + Timestamp: time.Now().UnixNano(), + }, nil +} + +func (s *Server) Subscribe(stream grpc.BidiStreamingServer[gpb.SubscribeRequest, gpb.SubscribeResponse]) error { + req, err := stream.Recv() + switch { + case err == io.EOF: + return nil + case err != nil: + return err + case req.GetSubscribe() == nil: + return status.Errorf(codes.InvalidArgument, "the subscribe request must contain a subscription definition") + } + + switch req.GetRequest().(type) { + case *gpb.SubscribeRequest_Poll: + return status.Errorf(codes.InvalidArgument, "invalid request type: %T", req.GetRequest()) + case *gpb.SubscribeRequest_Subscribe: + } + + switch mode := req.GetSubscribe().GetMode(); mode { + case gpb.SubscriptionList_ONCE: + log.Printf("Received Subscribe request with ONCE mode") + + paths := make([]*gpb.Path, 0, len(req.GetSubscribe().GetSubscription())) + for _, r := range req.GetSubscribe().GetSubscription() { + paths = append(paths, r.GetPath()) + } + + res, err := s.Get(stream.Context(), &gpb.GetRequest{ + Prefix: req.GetSubscribe().GetPrefix(), + Path: paths, + Encoding: req.GetSubscribe().GetEncoding(), + UseModels: req.GetSubscribe().GetUseModels(), + Extension: req.GetExtension(), + }) + if err != nil { + return err + } + + for _, notification := range res.GetNotification() { + if err := stream.Send(&gpb.SubscribeResponse{ + Response: &gpb.SubscribeResponse_Update{ + Update: notification, + }, + }); err != nil { + return status.Errorf(codes.Internal, "failed to send response: %v", err) + } + } + + case gpb.SubscriptionList_STREAM: + return status.Errorf(codes.Unimplemented, "subscribe method Stream not implemented") + case gpb.SubscriptionList_POLL: + return status.Errorf(codes.Unimplemented, "subscribe method Poll not implemented") + default: + return status.Errorf(codes.InvalidArgument, "unknown subscribe request mode: %v", mode) + } + + return nil +} + +// handleState handles HTTP requests to the /v1/state endpoint +// GET: returns current state as JSON +// POST: preloads nested JSON into state +// DELETE: clears all state +// Supports X-HTTP-Method-Override header for clients that can't send DELETE. +func (s *Server) handleState(w http.ResponseWriter, r *http.Request) { + method := r.Method + if override := r.Header.Get("X-HTTP-Method-Override"); override != "" { + method = override + } + switch method { + case http.MethodGet: + state, err := s.GetState() + if err != nil { + log.Printf("Failed to get state: %v", err) + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte("Internal Server Error")) + return + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write(state) + case http.MethodPost: + body, err := io.ReadAll(r.Body) + if err != nil { + log.Printf("Failed to read body: %v", err) + http.Error(w, "Failed to read body", http.StatusBadRequest) + return + } + if len(body) == 0 { + w.WriteHeader(http.StatusNoContent) + return + } + if !gjson.ValidBytes(body) { + http.Error(w, "invalid JSON", http.StatusBadRequest) + return + } + // Use Set with empty path to merge JSON into root of state + s.State.Set(&gpb.Path{}, body) + log.Printf("Merged state from JSON") + w.WriteHeader(http.StatusNoContent) + case http.MethodDelete: + s.ClearState() + w.WriteHeader(http.StatusNoContent) + default: + w.WriteHeader(http.StatusMethodNotAllowed) + } +} + +// handleClear handles POST /v1/clear to clear all state. +func (s *Server) handleClear(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + s.ClearState() + w.WriteHeader(http.StatusNoContent) +} + +// mergeJSON merges src JSON into dst JSON at the root level. +// Keys in src overwrite keys in dst. +func mergeJSON(dst, src []byte) []byte { + srcParsed := gjson.ParseBytes(src) + if !srcParsed.IsObject() { + return src + } + result := dst + srcParsed.ForEach(func(key, value gjson.Result) bool { + result, _ = sjson.SetRawBytes(result, key.String(), []byte(value.Raw)) + return true + }) + return result +} + +// State represents a JSON body that can be manipulated using [sjson] syntax. +type State struct { + sync.RWMutex + + Buf []byte + + // NX-OS behavior options + stripDMEMarkers bool + dmeMarkerValue string +} + +// stripMarkerFields removes fields with the DME marker value from JSON recursively. +// This emulates NX-OS behavior where these markers mean "unset this field" +// rather than "store this literal string". +func (s *State) stripMarkerFields(data []byte) []byte { + if !s.stripDMEMarkers || s.dmeMarkerValue == "" { + return data + } + return s.stripMarkersRecursive(data) +} + +// stripMarkersRecursive walks the JSON structure and removes marker fields at all levels. +func (s *State) stripMarkersRecursive(data []byte) []byte { + parsed := gjson.ParseBytes(data) + if !parsed.IsObject() && !parsed.IsArray() { + return data + } + + if parsed.IsArray() { + // Process each array element + var results []string + parsed.ForEach(func(_, value gjson.Result) bool { + processed := s.stripMarkersRecursive([]byte(value.Raw)) + results = append(results, string(processed)) + return true + }) + return []byte("[" + strings.Join(results, ",") + "]") + } + + // It's an object - process fields + var toDelete []string + parsed.ForEach(func(key, value gjson.Result) bool { + keyStr := key.String() + if value.Type == gjson.String && value.String() == s.dmeMarkerValue { + toDelete = append(toDelete, keyStr) + } else if value.IsObject() || value.IsArray() { + // Recurse into nested structures + processed := s.stripMarkersRecursive([]byte(value.Raw)) + data, _ = sjson.SetRawBytes(data, keyStr, processed) + } + return true + }) + for _, key := range toDelete { + data, _ = sjson.DeleteBytes(data, key) + } + return data +} + +func (s *State) Get(path *gpb.Path) []byte { + s.RLock() + defer s.RUnlock() + var sb strings.Builder + for _, elem := range path.GetElem() { + if elem.GetName() == "" { + continue + } + if sb.Len() > 0 { + sb.WriteByte('|') + } + sb.WriteString(elem.GetName()) + if len(elem.GetKey()) == 0 { + continue + } + for k, v := range elem.GetKey() { + sb.WriteByte('|') + sb.WriteString(`#(`) + sb.WriteString(k) + sb.WriteString(`=="`) + sb.WriteString(v) + sb.WriteString(`")#`) + } + } + res := gjson.GetBytes(s.Buf, sb.String()) + if !res.Exists() || (res.IsArray() && len(res.Array()) == 0) { + // Return empty bytes for non-existent paths. This triggers gnmiext's + // ErrNil handling (len(b) == 0), matching real NX-OS behavior which + // returns empty TypedValue for paths that don't exist yet. + return []byte{} + } + return []byte(res.Raw) +} + +func (s *State) Set(path *gpb.Path, raw []byte) { + s.Lock() + defer s.Unlock() + + // Strip DME marker fields if NX-OS behavior is enabled + raw = s.stripMarkerFields(raw) + + elems := path.GetElem() + + // Handle empty path - merge raw into state at root level + if len(elems) == 0 { + if len(s.Buf) == 0 { + s.Buf = raw + } else { + s.Buf = mergeJSON(s.Buf, raw) + } + return + } + + var sb strings.Builder + + for i, elem := range elems { + if elem.GetName() == "" { + continue + } + if sb.Len() > 0 { + sb.WriteByte('.') + } + sb.WriteString(elem.GetName()) + + if len(elem.GetKey()) == 0 { + continue + } + + // Find existing array index or append + var idx int + gjson.GetBytes(s.Buf, sb.String()).ForEach(func(_, r gjson.Result) bool { + for k, v := range elem.GetKey() { + if r.Get(k).String() != v { + idx++ + return true + } + } + return false + }) + sb.WriteByte('.') + sb.WriteString(strconv.Itoa(idx)) + + // Inject keys into this list element if it's not the final element + // (for the final element, keys go into raw below) + if i < len(elems)-1 { + currentPath := sb.String() + current := gjson.GetBytes(s.Buf, currentPath) + if !current.Exists() || current.Raw == "null" { + // Create the element with its keys + keyObj := make(map[string]string) + for k, v := range elem.GetKey() { + keyObj[k] = v + } + keyJSON, _ := json.Marshal(keyObj) + s.Buf, _ = sjson.SetRawBytes(s.Buf, currentPath, keyJSON) + } else { + // Element exists, ensure keys are set + for k, v := range elem.GetKey() { + if !gjson.GetBytes(s.Buf, currentPath+"."+k).Exists() { + s.Buf, _ = sjson.SetBytes(s.Buf, currentPath+"."+k, v) + } + } + } + } + } + + // For the final element, inject its keys (from the last keyed element) into raw + lastElem := elems[len(elems)-1] + for k, v := range lastElem.GetKey() { + if !gjson.GetBytes(raw, k).Exists() { + raw, _ = sjson.SetBytes(raw, k, v) + } + } + + s.Buf, _ = sjson.SetRawBytes(s.Buf, sb.String(), raw) //nolint:errcheck +} + +func (s *State) Del(path *gpb.Path) { + s.Lock() + defer s.Unlock() + var sb strings.Builder + for _, elem := range path.GetElem() { + if elem.GetName() == "" { + continue + } + if sb.Len() > 0 { + sb.WriteByte('.') + } + sb.WriteString(elem.GetName()) + if len(elem.GetKey()) == 0 { + continue + } + var ( + idx int + found bool + ) + gjson.GetBytes(s.Buf, sb.String()).ForEach(func(_, r gjson.Result) bool { + for k, v := range elem.GetKey() { + if r.Get(k).String() != v { + idx++ + return true + } + } + found = true + return false + }) + if !found { + return + } + sb.WriteByte('.') + sb.WriteString(strconv.Itoa(idx)) + } + + s.Buf, _ = sjson.DeleteBytes(s.Buf, sb.String()) //nolint:errcheck +}