Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func NewKubernetesAgent(e config.Env, resourceName string, kubeProvider *kuberne
RepoURL: params.HelmRepoURL,
ValuesYAML: params.HelmValues,
Fakeintake: params.FakeIntake,
V3MetricsEnabled: params.V3MetricsEnabled,
AgentFullImagePath: params.AgentFullImagePath,
ClusterAgentFullImagePath: params.ClusterAgentFullImagePath,
DualShipping: params.DualShipping,
Expand Down
24 changes: 21 additions & 3 deletions test/e2e-framework/components/datadog/agent/kubernetes_helm.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ type HelmInstallationArgs struct {
// HelmChartVersion overrides the default HelmVersion for this installation.
// When empty, HelmVersion is used.
HelmChartVersion string
// V3MetricsEnabled adds DD_SERIALIZER_EXPERIMENTAL_USE_V3_API_SERIES_ENDPOINTS and
// DD_SERIALIZER_EXPERIMENTAL_USE_V3_API_SKETCHES_ENDPOINTS env vars pointing at the
// fakeintake URL so the agent serializer uses /api/intake/metrics/v3/series instead of
// /api/v2/series.
V3MetricsEnabled bool
}

type HelmComponent struct {
Expand Down Expand Up @@ -187,7 +192,7 @@ func NewHelmInstallation(e config.Env, args HelmInstallationArgs, opts ...pulumi
values = buildLinuxHelmValues(baseName, agentImagePath, agentImageTag, clusterAgentImagePath, clusterAgentImageTag, randomClusterAgentToken.Result, !args.DisableLogsContainerCollectAll, e.TestingWorkloadDeploy(), args.FIPS, true)
}
values.configureImagePullSecret(imgPullSecret)
values.configureFakeintake(e, args.Fakeintake, args.DualShipping)
values.configureFakeintake(e, args.Fakeintake, args.DualShipping, args.V3MetricsEnabled)

defaultYAMLValues := values.ToYAMLPulumiAssetOutput()

Expand Down Expand Up @@ -239,7 +244,7 @@ func NewHelmInstallation(e config.Env, args HelmInstallationArgs, opts ...pulumi
if args.DeployWindows {
values := buildWindowsHelmValues(baseName, agentImagePath, agentImageTag, clusterAgentImagePath, clusterAgentImageTag)
values.configureImagePullSecret(imgPullSecret)
values.configureFakeintake(e, args.Fakeintake, args.DualShipping)
values.configureFakeintake(e, args.Fakeintake, args.DualShipping, args.V3MetricsEnabled)
defaultYAMLValues := values.ToYAMLPulumiAssetOutput()

var windowsValuesYAML pulumi.AssetOrArchiveArray
Expand Down Expand Up @@ -884,7 +889,7 @@ func (values HelmValues) configureImagePullSecret(secret *corev1.Secret) {
}
}

func (values HelmValues) configureFakeintake(e config.Env, fakeintake *fakeintake.Fakeintake, dualShipping bool) {
func (values HelmValues) configureFakeintake(e config.Env, fakeintake *fakeintake.Fakeintake, dualShipping bool, v3MetricsEnabled bool) {
if fakeintake == nil {
return
}
Expand Down Expand Up @@ -978,6 +983,19 @@ func (values HelmValues) configureFakeintake(e config.Env, fakeintake *fakeintak
}
}

if v3MetricsEnabled {
endpointsEnvVar = append(endpointsEnvVar,
pulumi.StringMap{
"name": pulumi.String("DD_SERIALIZER_EXPERIMENTAL_USE_V3_API_SERIES_ENDPOINTS"),
"value": pulumi.Sprintf("%s", fakeintake.URL),
},
pulumi.StringMap{
"name": pulumi.String("DD_SERIALIZER_EXPERIMENTAL_USE_V3_API_SKETCHES_ENDPOINTS"),
"value": pulumi.Sprintf("%s", fakeintake.URL),
},
)
}

for _, section := range []string{"datadog", "clusterAgent", "clusterChecksRunner"} {
if _, ok := values[section].(pulumi.Map); !ok {
continue
Expand Down
35 changes: 35 additions & 0 deletions test/e2e-framework/components/datadog/agentparams/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ type Params struct {
// parameters like the MSI flags.
AdditionalInstallParameters []string
SkipAPIKeyInConfig bool

// intakeScheme, intakeHostname, intakePort are stored by withIntakeHostname so that
// WithV3MetricsEnabled (applied after fakeintake wiring) can inject V3 endpoints config.
intakeScheme pulumi.StringInput
intakeHostname pulumi.StringInput
intakePort pulumi.IntInput
}

type Option = func(*Params) error
Expand Down Expand Up @@ -268,6 +274,11 @@ func WithPulumiResourceOptions(resources ...pulumi.ResourceOption) func(*Params)

func withIntakeHostname(scheme pulumi.StringInput, hostname pulumi.StringInput, port pulumi.IntInput) func(*Params) error {
return func(p *Params) error {
// Store so that WithV3MetricsEnabled (applied after) can build V3 endpoints config.
p.intakeScheme = scheme
p.intakeHostname = hostname
p.intakePort = port

extraConfig := pulumi.Sprintf(`dd_url: %[3]s://%[1]s:%[2]d
logs_config.logs_dd_url: %[1]s:%[2]d
logs_config.logs_no_ssl: true
Expand Down Expand Up @@ -338,6 +349,30 @@ func WithFakeintake(fakeintake *fakeintake.Fakeintake) func(*Params) error {
}
}

// WithV3MetricsEnabled opts the agent into the V3 metrics intake API for its primary fakeintake
// endpoint. It adds serializer_experimental_use_v3_api.series/sketches.endpoints pointing at the
// same URL used for dd_url, so the serializer sends to /api/intake/metrics/v3/series instead of
// /api/v2/series.
//
// Must be called after WithFakeintake (or WithIntakeHostname) so the intake URL is known.
func WithV3MetricsEnabled() func(*Params) error {
return func(p *Params) error {
if p.intakeHostname == nil || p.intakePort == nil || p.intakeScheme == nil {
return fmt.Errorf("WithV3MetricsEnabled must be called after WithFakeintake or WithIntakeHostname")
}
v3Config := pulumi.Sprintf(`serializer_experimental_use_v3_api:
series:
endpoints:
- %[3]s://%[1]s:%[2]d
sketches:
endpoints:
- %[3]s://%[1]s:%[2]d
Comment on lines +367 to +369
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Stop rerouting sketches until fakeintake can read v3 sketches

This option now enables serializer_experimental_use_v3_api.sketches.endpoints, which redirects sketch/distribution traffic to /api/intake/metrics/v3/sketches, but fakeintake still has no corresponding v3-sketch retrieval path (FilterSketches still queries /api/beta/sketches, and there is no /api/intake/metrics/v3/sketches handler in serverstore/parser.go). As soon as a v3-enabled test needs to assert sketch metrics, it will get empty results despite sketches being sent.

Useful? React with 👍 / 👎.

Comment on lines +367 to +369
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Don’t enable V3 sketches without fakeintake sketch support

This now configures serializer_experimental_use_v3_api.sketches.endpoints, so sketch/distribution payloads are sent to /api/intake/metrics/v3/sketches, but fakeintake still only exposes sketch parsing/querying on /api/beta/sketches (FilterSketches/GetSketchNames in test/fakeintake/client/client.go). In suites that enable V3 and assert on sketch metrics, those assertions will incorrectly see no data because sketches have been rerouted to an unsupported endpoint.

Useful? React with 👍 / 👎.

`, p.intakeHostname, p.intakePort, p.intakeScheme)
p.ExtraAgentConfig = append(p.ExtraAgentConfig, v3Config)
return nil
}
}

// WithLogs enables the log agent
func WithLogs() func(*Params) error {
return func(p *Params) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ type Params struct {
// HelmChartVersion overrides the default Helm chart version for this installation.
// When empty, the framework default HelmVersion is used.
HelmChartVersion string
// V3MetricsEnabled opts the agent into V3 metrics intake by injecting
// DD_SERIALIZER_EXPERIMENTAL_USE_V3_API_SERIES_ENDPOINTS pointing at the fakeintake URL.
V3MetricsEnabled bool
}

type Option = func(*Params) error
Expand Down Expand Up @@ -372,3 +375,14 @@ datadog:
return nil
}
}

// WithV3MetricsEnabled opts the agent into V3 metrics intake. When a fakeintake is configured
// it injects DD_SERIALIZER_EXPERIMENTAL_USE_V3_API_SERIES_ENDPOINTS and
// DD_SERIALIZER_EXPERIMENTAL_USE_V3_API_SKETCHES_ENDPOINTS pointing at the fakeintake URL so
// the serializer sends to /api/intake/metrics/v3/series instead of /api/v2/series.
func WithV3MetricsEnabled() func(*Params) error {
return func(p *Params) error {
p.V3MetricsEnabled = true
return nil
}
}
9 changes: 8 additions & 1 deletion test/fakeintake/aggregator/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
"metadataAggregator.go",
"metricAggregator.go",
"metricAggregatorV1.go",
"metricAggregatorV3.go",
"ncmAggregator.go",
"ndmAggregator.go",
"ndmflowAggregator.go",
Expand All @@ -35,15 +36,17 @@ go_library(
"//comp/netflow/payload",
"//pkg/metrics/event",
"//pkg/networkpath/payload",
"//pkg/proto/pbgo/dogstatsdhttp",
"//pkg/proto/pbgo/trace",
"//test/fakeintake/aggregator/internal/reader",
"//test/fakeintake/api",
"@com_github_datadog_agent_payload_v5//contimage",
"@com_github_datadog_agent_payload_v5//contlcycle",
"@com_github_datadog_agent_payload_v5//gogen",
"@com_github_datadog_agent_payload_v5//healthplatform",
"@com_github_datadog_agent_payload_v5//process",
"@com_github_datadog_agent_payload_v5//sbom",
"@com_github_datadog_zstd//:zstd",
"@com_github_klauspost_compress//zstd",
"@com_github_samber_lo//:lo",
"@com_github_tinylib_msgp//msgp",
"@org_golang_google_protobuf//proto",
Expand All @@ -65,6 +68,7 @@ go_test(
"logAggregator_test.go",
"metadataAggregator_test.go",
"metricAggregatorV1_test.go",
"metricAggregatorV3_test.go",
"metricAggregator_test.go",
"ndmAggregator_test.go",
"ndmflowAggregator_test.go",
Expand Down Expand Up @@ -105,12 +109,15 @@ go_test(
deps = [
"//pkg/metrics/event",
"//pkg/networkpath/payload",
"//pkg/proto/pbgo/dogstatsdhttp",
"//test/fakeintake/api",
"//test/fakeintake/fixtures",
"@com_github_datadog_agent_payload_v5//gogen",
"@com_github_datadog_agent_payload_v5//process",
"@com_github_datadog_zstd//:zstd",
"@com_github_kr_pretty//:pretty",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_google_protobuf//encoding/protowire",
],
)
19 changes: 16 additions & 3 deletions test/fakeintake/aggregator/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"sync"
"time"

"github.com/DataDog/zstd"
"github.com/klauspost/compress/zstd"

"github.com/DataDog/datadog-agent/test/fakeintake/api"
)
Expand Down Expand Up @@ -154,6 +154,10 @@ func (agg *Aggregator[P]) getNamesUnsorted() []string {
}

func inflate(payload []byte, encoding string) (inflated []byte, err error) {
if encoding == encodingZstd {
return inflateZstd(payload)
}

rc, err := getReadCloserForEncoding(payload, encoding)
if err != nil {
return nil, err
Expand All @@ -166,14 +170,23 @@ func inflate(payload []byte, encoding string) (inflated []byte, err error) {
return inflated, nil
}

func inflateZstd(payload []byte) ([]byte, error) {
// Metrics V3 payloads concatenate independently compressed protobuf
// headers and columns, so zstd decoding must consume all frames.
decoder, err := zstd.NewReader(nil)
if err != nil {
return nil, err
}
defer decoder.Close()
return decoder.DecodeAll(payload, nil)
}

func getReadCloserForEncoding(payload []byte, encoding string) (rc io.ReadCloser, err error) {
switch encoding {
case encodingGzip:
rc, err = gzip.NewReader(bytes.NewReader(payload))
case encodingDeflate:
rc, err = zlib.NewReader(bytes.NewReader(payload))
case encodingZstd:
rc = zstd.NewReader(bytes.NewReader(payload))
default:
rc = io.NopCloser(bytes.NewReader(payload))
}
Expand Down
13 changes: 13 additions & 0 deletions test/fakeintake/aggregator/internal/reader/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
load("@rules_go//go:def.bzl", "go_library")

go_library(
name = "reader",
srcs = ["reader.go"],
importpath = "github.com/DataDog/datadog-agent/test/fakeintake/aggregator/internal/reader",
visibility = ["//visibility:public"],
deps = [
"//pkg/proto/pbgo/dogstatsdhttp",
"@com_github_datadog_agent_payload_v5//gogen",
"@org_golang_google_protobuf//encoding/protowire",
],
)
Loading
Loading