From e36ebb233ef5c6a91546647181b17cc84030d8de Mon Sep 17 00:00:00 2001 From: Mark Kirichenko Date: Thu, 21 May 2026 16:44:31 +0200 Subject: [PATCH 1/3] feat(fakeintake): add V3 metrics intake support --- test/fakeintake/aggregator/BUILD.bazel | 9 +- test/fakeintake/aggregator/common.go | 19 +- .../aggregator/internal/reader/BUILD.bazel | 13 + .../aggregator/internal/reader/reader.go | 624 ++++++++++++++++++ .../aggregator/metricAggregatorV3.go | 154 +++++ .../aggregator/metricAggregatorV3_test.go | 277 ++++++++ test/fakeintake/client/client.go | 22 + test/fakeintake/go.mod | 2 +- test/fakeintake/server/serverstore/parser.go | 17 +- 9 files changed, 1126 insertions(+), 11 deletions(-) create mode 100644 test/fakeintake/aggregator/internal/reader/BUILD.bazel create mode 100644 test/fakeintake/aggregator/internal/reader/reader.go create mode 100644 test/fakeintake/aggregator/metricAggregatorV3.go create mode 100644 test/fakeintake/aggregator/metricAggregatorV3_test.go diff --git a/test/fakeintake/aggregator/BUILD.bazel b/test/fakeintake/aggregator/BUILD.bazel index 73b6de6599ad..88b99aeffd2a 100644 --- a/test/fakeintake/aggregator/BUILD.bazel +++ b/test/fakeintake/aggregator/BUILD.bazel @@ -17,6 +17,7 @@ go_library( "metadataAggregator.go", "metricAggregator.go", "metricAggregatorV1.go", + "metricAggregatorV3.go", "ncmAggregator.go", "ndmAggregator.go", "ndmflowAggregator.go", @@ -35,7 +36,9 @@ go_library( "//comp/netflow/payload", "//pkg/metrics/event", "//pkg/networkpath/payload", + "//pkg/proto/pbgo/dogstatsdhttp", "//pkg/proto/pbgo/trace", + "//test/fakeintake/aggregator/internal/reader", "//test/fakeintake/api", "@com_github_datadog_agent_payload_v5//contimage", "@com_github_datadog_agent_payload_v5//contlcycle", @@ -43,7 +46,7 @@ go_library( "@com_github_datadog_agent_payload_v5//healthplatform", "@com_github_datadog_agent_payload_v5//process", "@com_github_datadog_agent_payload_v5//sbom", - "@com_github_datadog_zstd//:zstd", + "@com_github_klauspost_compress//zstd", "@com_github_samber_lo//:lo", "@com_github_tinylib_msgp//msgp", "@org_golang_google_protobuf//proto", @@ -65,6 +68,7 @@ go_test( "logAggregator_test.go", "metadataAggregator_test.go", "metricAggregatorV1_test.go", + "metricAggregatorV3_test.go", "metricAggregator_test.go", "ndmAggregator_test.go", "ndmflowAggregator_test.go", @@ -105,12 +109,15 @@ go_test( deps = [ "//pkg/metrics/event", "//pkg/networkpath/payload", + "//pkg/proto/pbgo/dogstatsdhttp", "//test/fakeintake/api", "//test/fakeintake/fixtures", "@com_github_datadog_agent_payload_v5//gogen", "@com_github_datadog_agent_payload_v5//process", + "@com_github_datadog_zstd//:zstd", "@com_github_kr_pretty//:pretty", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", + "@org_golang_google_protobuf//encoding/protowire", ], ) diff --git a/test/fakeintake/aggregator/common.go b/test/fakeintake/aggregator/common.go index cd299c9958a7..0549810e3480 100644 --- a/test/fakeintake/aggregator/common.go +++ b/test/fakeintake/aggregator/common.go @@ -24,7 +24,7 @@ import ( "sync" "time" - "github.com/DataDog/zstd" + "github.com/klauspost/compress/zstd" "github.com/DataDog/datadog-agent/test/fakeintake/api" ) @@ -154,6 +154,10 @@ func (agg *Aggregator[P]) getNamesUnsorted() []string { } func inflate(payload []byte, encoding string) (inflated []byte, err error) { + if encoding == encodingZstd { + return inflateZstd(payload) + } + rc, err := getReadCloserForEncoding(payload, encoding) if err != nil { return nil, err @@ -166,14 +170,23 @@ func inflate(payload []byte, encoding string) (inflated []byte, err error) { return inflated, nil } +func inflateZstd(payload []byte) ([]byte, error) { + // Metrics V3 payloads concatenate independently compressed protobuf + // headers and columns, so zstd decoding must consume all frames. + decoder, err := zstd.NewReader(nil) + if err != nil { + return nil, err + } + defer decoder.Close() + return decoder.DecodeAll(payload, nil) +} + func getReadCloserForEncoding(payload []byte, encoding string) (rc io.ReadCloser, err error) { switch encoding { case encodingGzip: rc, err = gzip.NewReader(bytes.NewReader(payload)) case encodingDeflate: rc, err = zlib.NewReader(bytes.NewReader(payload)) - case encodingZstd: - rc = zstd.NewReader(bytes.NewReader(payload)) default: rc = io.NopCloser(bytes.NewReader(payload)) } diff --git a/test/fakeintake/aggregator/internal/reader/BUILD.bazel b/test/fakeintake/aggregator/internal/reader/BUILD.bazel new file mode 100644 index 000000000000..568747f1ebf8 --- /dev/null +++ b/test/fakeintake/aggregator/internal/reader/BUILD.bazel @@ -0,0 +1,13 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "reader", + srcs = ["reader.go"], + importpath = "github.com/DataDog/datadog-agent/test/fakeintake/aggregator/internal/reader", + visibility = ["//visibility:public"], + deps = [ + "//pkg/proto/pbgo/dogstatsdhttp", + "@com_github_datadog_agent_payload_v5//gogen", + "@org_golang_google_protobuf//encoding/protowire", + ], +) diff --git a/test/fakeintake/aggregator/internal/reader/reader.go b/test/fakeintake/aggregator/internal/reader/reader.go new file mode 100644 index 000000000000..d57553a6da3a --- /dev/null +++ b/test/fakeintake/aggregator/internal/reader/reader.go @@ -0,0 +1,624 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +// Package reader implements an iterator over encoded dogstatsd http payload. +package reader + +import ( + "encoding/binary" + "errors" + "math" + "slices" + + agentpayload "github.com/DataDog/agent-payload/v5/gogen" + pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/dogstatsdhttp" + "google.golang.org/protobuf/encoding/protowire" +) + +type originInfo = agentpayload.Origin + +// Resource holds a resource name and type. +type Resource struct { + Name string + Type string +} + +// MetricDataReader is an iterator over data contained in MetricData part of the payload. +// +// Usage: +// +// r := NewMetricDataReader(payload.MetricData) +// if err := r.Initialize(); err != nil { +// return err +// } +// for r.HaveMoreMetrics() { +// if err := r.NextMetric(); err != nil { +// return err +// } +// // Accessors for metric entry data can be called. +// for r.HaveMorePoints() { +// if err := r.NextPoint(); err != nil { +// return err +// } +// // Acessors for metric data point can be called. +// } +// } +type MetricDataReader struct { + data *pb.MetricData + + // Indexes point the next unconsumed element + metricIdx int + pointIdx int + + valsSint64Idx int + valsFloat32Idx int + valsFloat64Idx int + sketchNumBinsIdx int + sketchBinsIdx int + unitRefsIdx int + + pointsRemaining uint64 + + // Accumulators for delta encoded columns + nameRef int64 + tagsRef int64 + resourcesRef int64 + sourceTypeNameRef int64 + originInfoRef int64 + unitRef int64 + currentUnitRef int64 + timestamp int64 + + // Dicts are pre-loaded with empty element at index zero + dictNameStr []string + dictTagStr []string + dictTagsets [][]string + dictResourceStr []string + dictResources [][]*Resource + dictSourceTypeName []string + dictOriginInfo []*originInfo + dictUnitStr []string + unitRefs []int64 +} + +// NewMetricDataReader creates new reader from data. +func NewMetricDataReader(data *pb.MetricData) *MetricDataReader { + return &MetricDataReader{ + data: data, + } +} + +// Initialize reads and normalizes payload dictionaries for fast access. +func (r *MetricDataReader) Initialize() error { + var err error + if r.data == nil { + return nil + } + r.dictNameStr, err = unpackStrDict(r.data.DictNameStr) + if err != nil { + return err + } + r.dictTagStr, err = unpackStrDict(r.data.DictTagStr) + if err != nil { + return err + } + r.dictTagsets, err = r.unpackTagsetsDict() + if err != nil { + return err + } + r.dictResourceStr, err = unpackStrDict(r.data.DictResourceStr) + if err != nil { + return err + } + r.dictResources, err = r.unpackResourcesDict() + if err != nil { + return err + } + r.dictSourceTypeName, err = unpackStrDict(r.data.DictSourceTypeName) + if err != nil { + return err + } + r.dictOriginInfo, err = unpackOriginInfoDict(r.data.DictOriginInfo) + if err != nil { + return err + } + r.dictUnitStr, r.unitRefs, err = unpackUnitFields(r.data.ProtoReflect().GetUnknown()) + if err != nil { + return err + } + return nil +} + +var ( + errUnexpectedEOF = errors.New("unexpected end of column") + errOverflow = errors.New("length field overflow") + errBadReference = errors.New("invalid reference") +) + +const ( + flagHasUnit = 0x200 + + fieldDictUnitStr protowire.Number = 25 + fieldUnitRefs protowire.Number = 26 +) + +func unpackStrDict(raw []byte) ([]string, error) { + dict := []string{""} + + for len(raw) > 0 { + length, n := binary.Uvarint(raw) + if n == 0 { + return nil, errUnexpectedEOF + } + if n < 0 { + return nil, errOverflow + } + if length > uint64(math.MaxInt-n) { + return nil, errOverflow + } + end := n + int(length) + if end > len(raw) { + return nil, errUnexpectedEOF + } + str := string(raw[n:end]) + dict = append(dict, str) + raw = raw[end:] + } + return dict, nil +} + +func unpackUnitFields(raw []byte) ([]string, []int64, error) { + dictUnitStr := []string{""} + var unitRefs []int64 + + for len(raw) > 0 { + field, typ, n := protowire.ConsumeTag(raw) + if n < 0 { + return nil, nil, protowire.ParseError(n) + } + raw = raw[n:] + + switch field { + case fieldDictUnitStr: + if typ != protowire.BytesType { + return nil, nil, errBadReference + } + value, n := protowire.ConsumeBytes(raw) + if n < 0 { + return nil, nil, protowire.ParseError(n) + } + dict, err := unpackStrDict(value) + if err != nil { + return nil, nil, err + } + dictUnitStr = append(dictUnitStr, dict[1:]...) + raw = raw[n:] + case fieldUnitRefs: + if typ != protowire.BytesType { + return nil, nil, errBadReference + } + value, n := protowire.ConsumeBytes(raw) + if n < 0 { + return nil, nil, protowire.ParseError(n) + } + refs, err := unpackPackedSint64(value) + if err != nil { + return nil, nil, err + } + unitRefs = append(unitRefs, refs...) + raw = raw[n:] + default: + n := protowire.ConsumeFieldValue(field, typ, raw) + if n < 0 { + return nil, nil, protowire.ParseError(n) + } + raw = raw[n:] + } + } + + return dictUnitStr, unitRefs, nil +} + +func unpackPackedSint64(raw []byte) ([]int64, error) { + var values []int64 + for len(raw) > 0 { + value, n := protowire.ConsumeVarint(raw) + if n < 0 { + return nil, protowire.ParseError(n) + } + values = append(values, int64(value>>1)^-int64(value&1)) + raw = raw[n:] + } + return values, nil +} + +func (r *MetricDataReader) unpackTagsetsDict() ([][]string, error) { + packed := r.data.DictTagsets + tagsets := [][]string{nil} + + for len(packed) > 0 { + size := packed[0] + packed = packed[1:] + if size < 0 || size > int64(len(packed)) { + return nil, errUnexpectedEOF + } + tags := make([]string, 0, size) + + idx := int64(0) + for i := int64(0); i < size; i++ { + idx += packed[i] + + if idx < 0 { + if idx <= -math.MaxInt64 || -idx >= int64(len(tagsets)) { + return nil, errBadReference + } + tags = append(tags, tagsets[-idx]...) + } else { + if idx >= int64(len(r.dictTagStr)) { + return nil, errBadReference + } + tags = append(tags, r.dictTagStr[idx]) + } + } + packed = packed[size:] + tagsets = append(tagsets, tags) + } + return tagsets, nil +} + +func (r *MetricDataReader) unpackResourcesDict() ([][]*Resource, error) { + packedLen := r.data.DictResourceLen + packedType := r.data.DictResourceType + packedName := r.data.DictResourceName + resourcesDict := make([][]*Resource, 1, len(packedLen)+1) + + start := int64(0) + for _, size := range packedLen { + if size < 0 { + return nil, errUnexpectedEOF + } + if size > math.MaxInt64-start { + return nil, errOverflow + } + end := start + size + if end > int64(len(packedType)) || end > int64(len(packedName)) { + return nil, errBadReference + } + + typeRef := int64(0) + nameRef := int64(0) + resourcesSet := make([]*Resource, size) + resourcesDict = append(resourcesDict, resourcesSet) + for i := int64(0); i < size; i++ { + typeRef += packedType[start+i] + nameRef += packedName[start+i] + + if typeRef < 0 || typeRef >= int64(len(r.dictResourceStr)) || + nameRef < 0 || nameRef >= int64(len(r.dictResourceStr)) { + return nil, errBadReference + } + + resourcesSet[i] = &Resource{ + Type: r.dictResourceStr[typeRef], + Name: r.dictResourceStr[nameRef], + } + } + start = end + } + + return resourcesDict, nil +} + +func unpackOriginInfoDict(raw []int32) ([]*originInfo, error) { + nelem := len(raw) / 3 + if len(raw) != nelem*3 { + return nil, errUnexpectedEOF + } + dict := make([]*originInfo, 1, nelem+1) + for i := 0; i < len(raw); i += 3 { + dict = append(dict, &originInfo{ + OriginProduct: uint32(raw[i+0]), + OriginCategory: uint32(raw[i+1]), + OriginService: uint32(raw[i+2]), + }) + } + + return dict, nil +} + +// HaveMoreMetrics returns true if there are more metrics to read. +func (r *MetricDataReader) HaveMoreMetrics() bool { + if r.data == nil { + return false + } + return r.metricIdx < len(r.data.Types) +} + +// NextMetric consumes next metric entry and prepares data for access. +// +// If this method returns an error the reader is in an invalid state and calling data access methods may panic. +func (r *MetricDataReader) NextMetric() error { + if !r.HaveMoreMetrics() { + return errUnexpectedEOF + } + + if r.metricIdx > 0 { + for r.HaveMorePoints() { + if err := r.NextPoint(); err != nil { + return err + } + } + } + + r.metricIdx++ + + if r.metricIdx > len(r.data.Types) || + r.metricIdx > len(r.data.NameRefs) || + r.metricIdx > len(r.data.TagsetRefs) || + r.metricIdx > len(r.data.ResourcesRefs) || + r.metricIdx > len(r.data.SourceTypeNameRefs) || + r.metricIdx > len(r.data.OriginInfoRefs) || + r.metricIdx > len(r.data.Intervals) || + r.metricIdx > len(r.data.NumPoints) { + return errUnexpectedEOF + } + + r.pointsRemaining = r.NumPoints() + + r.nameRef += r.data.NameRefs[r.metricIdx-1] + if r.nameRef < 0 || r.nameRef >= int64(len(r.dictNameStr)) { + return errBadReference + } + + r.tagsRef += r.data.TagsetRefs[r.metricIdx-1] + if r.tagsRef < 0 || r.tagsRef >= int64(len(r.dictTagsets)) { + return errBadReference + } + + r.resourcesRef += r.data.ResourcesRefs[r.metricIdx-1] + if r.resourcesRef < 0 || r.resourcesRef >= int64(len(r.dictResources)) { + return errBadReference + } + + r.sourceTypeNameRef += r.data.SourceTypeNameRefs[r.metricIdx-1] + if r.sourceTypeNameRef < 0 || r.sourceTypeNameRef >= int64(len(r.dictSourceTypeName)) { + return errBadReference + } + + r.originInfoRef += r.data.OriginInfoRefs[r.metricIdx-1] + if r.originInfoRef < 0 || r.originInfoRef >= int64(len(r.dictOriginInfo)) { + return errBadReference + } + + r.currentUnitRef = 0 + if r.HasUnit() { + if r.unitRefsIdx >= len(r.unitRefs) { + return errUnexpectedEOF + } + r.unitRef += r.unitRefs[r.unitRefsIdx] + r.unitRefsIdx++ + if r.unitRef < 0 || r.unitRef >= int64(len(r.dictUnitStr)) { + return errBadReference + } + r.currentUnitRef = r.unitRef + } + + return nil +} + +func (r *MetricDataReader) packedType() uint64 { + return r.data.Types[r.metricIdx-1] +} + +// Type returns type of current metric entry. +func (r *MetricDataReader) Type() pb.MetricType { + return pb.MetricType(r.packedType() & 0xF) +} + +func (r *MetricDataReader) ValueType() pb.ValueType { + return pb.ValueType(r.packedType() & 0xF0) +} + +// HasUnit returns whether the current metric entry carries a unit. +func (r *MetricDataReader) HasUnit() bool { + return r.packedType()&flagHasUnit != 0 +} + +// Unit returns unit for the current metric entry. +func (r *MetricDataReader) Unit() string { + return r.dictUnitStr[r.currentUnitRef] +} + +// Name returns metric name of current metric entry. +func (r *MetricDataReader) Name() string { + return r.dictNameStr[r.nameRef] +} + +// Tags returns set of tags for current metric entry. +func (r *MetricDataReader) Tags() []string { + return r.dictTagsets[r.tagsRef] +} + +// Resources returns set of resources for current metric entry. +func (r *MetricDataReader) Resources() []*Resource { + return r.dictResources[r.resourcesRef] +} + +// SourceTypeName returns source type identifier for current metric entry. +func (r *MetricDataReader) SourceTypeName() string { + return r.dictSourceTypeName[r.sourceTypeNameRef] +} + +// Origin returns product origin information for current metric entry. +func (r *MetricDataReader) Origin() *originInfo { + return r.dictOriginInfo[r.originInfoRef] +} + +// Interval returns metric time interval for current metric entry. +func (r *MetricDataReader) Interval() uint64 { + return r.data.Intervals[r.metricIdx-1] +} + +// NumPoints returns number of data points contained in the current metric entry. +func (r *MetricDataReader) NumPoints() uint64 { + return r.data.NumPoints[r.metricIdx-1] +} + +// HaveMorePoints returns true if there are more points to read. +func (r *MetricDataReader) HaveMorePoints() bool { + return r.pointsRemaining > 0 +} + +// NextPoint consumes next urnead metric data point and prepares data for access. +// +// If this method returns an error the reader is in an invalid state and calling data access methods may panic. +func (r *MetricDataReader) NextPoint() error { + if !r.HaveMorePoints() { + return errUnexpectedEOF + } + + r.pointIdx++ + r.pointsRemaining-- + + if r.pointIdx > len(r.data.Timestamps) { + return errUnexpectedEOF + } + + switch r.Type() { + case pb.MetricType_Sketch: + r.sketchNumBinsIdx++ + if r.sketchNumBinsIdx > len(r.data.SketchNumBins) { + return errUnexpectedEOF + } + r.sketchBinsIdx += r.SketchNumBins() + switch r.ValueType() { + case pb.ValueType_Float64: + r.valsFloat64Idx += 3 + r.valsSint64Idx++ + case pb.ValueType_Float32: + r.valsFloat32Idx += 3 + r.valsSint64Idx++ + case pb.ValueType_Sint64: + r.valsSint64Idx += 4 + case pb.ValueType_Zero: + r.valsSint64Idx++ + } + default: + switch r.ValueType() { + case pb.ValueType_Float64: + r.valsFloat64Idx++ + case pb.ValueType_Float32: + r.valsFloat32Idx++ + case pb.ValueType_Sint64: + r.valsSint64Idx++ + } + } + + if r.valsFloat64Idx > len(r.data.ValsFloat64) { + return errUnexpectedEOF + } + if r.valsFloat32Idx > len(r.data.ValsFloat32) { + return errUnexpectedEOF + } + if r.valsSint64Idx > len(r.data.ValsSint64) { + return errUnexpectedEOF + } + if r.sketchBinsIdx > len(r.data.SketchBinKeys) { + return errUnexpectedEOF + } + if r.sketchBinsIdx > len(r.data.SketchBinCnts) { + return errUnexpectedEOF + } + + r.timestamp += r.data.Timestamps[r.pointIdx-1] + + return nil +} + +// Timestamp returns timestamp for current metric data point. +func (r *MetricDataReader) Timestamp() int64 { + return r.timestamp +} + +// Value returns metric value for current metric data point. +// +// Only valid to call if r.Type() != MetricType_Sketch, panics otherwise. +func (r *MetricDataReader) Value() float64 { + if r.Type() == pb.MetricType_Sketch { + panic("invalid type") + } + switch r.ValueType() { + case pb.ValueType_Float64: + return r.data.ValsFloat64[r.valsFloat64Idx-1] + case pb.ValueType_Float32: + return float64(r.data.ValsFloat32[r.valsFloat32Idx-1]) + case pb.ValueType_Sint64: + return float64(r.data.ValsSint64[r.valsSint64Idx-1]) + default: + return 0 + } +} + +// SketchSummary returns sketch summary for current metric data point. +// +// Only valid if r.Type() == MetricType_Sketch, panics otherwise. +func (r *MetricDataReader) SketchSummary() (sum, min, max float64, cnt uint64) { + if r.Type() != pb.MetricType_Sketch { + panic("invalid type") + } + + cnt = uint64(r.data.ValsSint64[r.valsSint64Idx-1]) + + switch r.ValueType() { + case pb.ValueType_Zero: + case pb.ValueType_Sint64: + sum = float64(r.data.ValsSint64[r.valsSint64Idx-4]) + min = float64(r.data.ValsSint64[r.valsSint64Idx-3]) + max = float64(r.data.ValsSint64[r.valsSint64Idx-2]) + // -1 is cnt + case pb.ValueType_Float32: + sum = float64(r.data.ValsFloat32[r.valsFloat32Idx-3]) + min = float64(r.data.ValsFloat32[r.valsFloat32Idx-2]) + max = float64(r.data.ValsFloat32[r.valsFloat32Idx-1]) + case pb.ValueType_Float64: + sum = r.data.ValsFloat64[r.valsFloat64Idx-3] + min = r.data.ValsFloat64[r.valsFloat64Idx-2] + max = r.data.ValsFloat64[r.valsFloat64Idx-1] + } + + return +} + +// SketchNumBins returns number of sketch bins for the current metric data point. +// +// Only valid if r.Type() == MetricType_Sketch, panics otherwise. +func (r *MetricDataReader) SketchNumBins() int { + if r.Type() != pb.MetricType_Sketch { + panic("invalid type") + } + return int(r.data.SketchNumBins[r.sketchNumBinsIdx-1]) +} + +// SketchCols returns sketch data columns for the current metric data +// point. +// +// Only valid if r.Type() == MetricType_Sketch, panics otherwise. +func (r *MetricDataReader) SketchCols() (k []int32, n []uint32) { + if r.Type() != pb.MetricType_Sketch { + panic("invalid type") + } + size := r.SketchNumBins() + start := r.sketchBinsIdx - size + k = slices.Clone(r.data.SketchBinKeys[start:][:size]) + n = slices.Clone(r.data.SketchBinCnts[start:][:size]) + deltaDecode(k) + return +} + +func deltaDecode(s []int32) { + for i := 1; i < len(s); i++ { + s[i] += s[i-1] + } +} diff --git a/test/fakeintake/aggregator/metricAggregatorV3.go b/test/fakeintake/aggregator/metricAggregatorV3.go new file mode 100644 index 000000000000..72e5f79e6c5a --- /dev/null +++ b/test/fakeintake/aggregator/metricAggregatorV3.go @@ -0,0 +1,154 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package aggregator + +import ( + "bytes" + "fmt" + "time" + + pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/dogstatsdhttp" + + "github.com/DataDog/datadog-agent/test/fakeintake/aggregator/internal/reader" + "github.com/DataDog/datadog-agent/test/fakeintake/api" +) + +// MetricSeriesV3 represents a single time series decoded from a V3 metrics intake payload +// (/api/intake/metrics/v3/series). The V3 format is a column-oriented protobuf encoding +// described in pkg/proto/datadog/dogstatsdhttp/payload.proto. +type MetricSeriesV3 struct { + Metric string + Tags []string + Resources []Resource + Type pb.MetricType + Unit string + Points []V3Point + collectedTime time.Time +} + +// Resource holds a resource name and type. +type Resource struct { + Name string + Type string +} + +// V3Point is a (timestamp, value) data point from a V3 series payload. +type V3Point struct { + Timestamp int64 + Value float64 +} + +func (m *MetricSeriesV3) name() string { + return m.Metric +} + +// GetTags returns the tags attached to this series. +func (m *MetricSeriesV3) GetTags() []string { + return m.Tags +} + +// GetCollectedTime returns when fakeintake received the payload. +func (m *MetricSeriesV3) GetCollectedTime() time.Time { + return m.collectedTime +} + +// ParseMetricSeriesV3 decodes a /api/intake/metrics/v3/series payload (compressed protobuf) +// into individual MetricSeriesV3 entries, one per (metric name, tagset) pair. +// Sketch entries embedded in the same payload are skipped; they are handled separately. +func ParseMetricSeriesV3(payload api.Payload) ([]*MetricSeriesV3, error) { + if len(payload.Data) == 0 || bytes.Equal(payload.Data, []byte("{}")) { + return []*MetricSeriesV3{}, nil + } + + inflated, err := inflate(payload.Data, payload.Encoding) + if err != nil { + return nil, fmt.Errorf("v3 payload inflate: %w", err) + } + if len(inflated) == 0 || bytes.Equal(inflated, []byte("{}")) { + return []*MetricSeriesV3{}, nil + } + + var p pb.Payload + if err := p.UnmarshalVT(inflated); err != nil { + return nil, fmt.Errorf("v3 payload unmarshal: %w", err) + } + + r := reader.NewMetricDataReader(p.MetricData) + if err := r.Initialize(); err != nil { + return nil, fmt.Errorf("v3 reader init: %w", err) + } + + var series []*MetricSeriesV3 + for r.HaveMoreMetrics() { + if err := r.NextMetric(); err != nil { + return nil, fmt.Errorf("v3 next metric: %w", err) + } + + metricType := r.Type() + if metricType == pb.MetricType_Sketch { + // Drain the sketch's points so the reader advances correctly, + // but don't produce a series entry — sketches have their own aggregator. + for r.HaveMorePoints() { + if err := r.NextPoint(); err != nil { + return nil, fmt.Errorf("v3 next sketch point: %w", err) + } + } + continue + } + + s := &MetricSeriesV3{ + Metric: r.Name(), + Tags: append([]string{}, r.Tags()...), + Resources: cloneResources(r.Resources()), + Type: metricType, + Unit: r.Unit(), + collectedTime: payload.Timestamp, + } + + for r.HaveMorePoints() { + if err := r.NextPoint(); err != nil { + return nil, fmt.Errorf("v3 next point: %w", err) + } + s.Points = append(s.Points, V3Point{ + Timestamp: r.Timestamp(), + Value: r.Value(), + }) + } + + series = append(series, s) + } + + return series, nil +} + +func cloneResources(resources []*reader.Resource) []Resource { + if len(resources) == 0 { + return nil + } + cloned := make([]Resource, 0, len(resources)) + for _, resource := range resources { + if resource == nil { + continue + } + cloned = append(cloned, Resource{ + Name: resource.Name, + Type: resource.Type, + }) + } + return cloned +} + +// MetricAggregatorV3 stores V3 metric series payloads received on /api/intake/metrics/v3/series. +type MetricAggregatorV3 struct { + Aggregator[*MetricSeriesV3] +} + +// NewMetricAggregatorV3 returns a new MetricAggregatorV3. +func NewMetricAggregatorV3() MetricAggregatorV3 { + return MetricAggregatorV3{ + Aggregator: newAggregator(ParseMetricSeriesV3), + } +} diff --git a/test/fakeintake/aggregator/metricAggregatorV3_test.go b/test/fakeintake/aggregator/metricAggregatorV3_test.go new file mode 100644 index 000000000000..0b49848e7a35 --- /dev/null +++ b/test/fakeintake/aggregator/metricAggregatorV3_test.go @@ -0,0 +1,277 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2025-present Datadog, Inc. + +package aggregator + +import ( + "testing" + "time" + + "github.com/DataDog/zstd" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/dogstatsdhttp" + "google.golang.org/protobuf/encoding/protowire" + + "github.com/DataDog/datadog-agent/test/fakeintake/api" +) + +const ( + testFlagHasUnit = 0x200 + + testFieldDictUnitStr protowire.Number = 25 + testFieldUnitRefs protowire.Number = 26 +) + +// buildMinimalV3Payload constructs a pb.Payload containing a single gauge metric +// "test.gauge" with tag "env:test", timestamp 1000, and value 42.0. +// +// Wire encoding of the column-oriented MetricData: +// - DictNameStr: varint(10) + "test.gauge" +// - DictTagStr: varint(8) + "env:test" +// - DictTagsets: [1, 1] — one tagset of size 1, pointing to dictTagStr[1] +// - Types: [3|48] = [51] — MetricType_Gauge | ValueType_Float64 +// - NameRefs: [1] — delta-encoded index → dictNameStr[1] +// - TagsetRefs: [1] — delta-encoded index → dictTagsets[1] +// - DictResourceStr: varint(4) + "host" + varint(6) + "node-a" +// - DictResourceLen: [1] — one resource set with one resource +// - DictResourceType: [1] — type → dictResourceStr[1] ("host") +// - DictResourceName: [2] — name → dictResourceStr[2] ("node-a") +// - ResourcesRefs: [1] — resource set → dictResources[1] +// - SourceTypeNameRefs: [0] +// - OriginInfoRefs: [0] +// - Intervals: [0] +// - NumPoints: [1] +// - Timestamps: [1000] — delta-encoded +// - ValsFloat64: [42.0] +func buildMinimalV3Payload() *pb.Payload { + nameStr := append([]byte{10}, []byte("test.gauge")...) // varint(10) + "test.gauge" + tagStr := append([]byte{8}, []byte("env:test")...) // varint(8) + "env:test" + resourceStr := append([]byte{4}, []byte("host")...) // varint(4) + "host" + resourceStr = append(resourceStr, byte(6)) + resourceStr = append(resourceStr, []byte("node-a")...) + + return &pb.Payload{ + MetricData: &pb.MetricData{ + DictNameStr: nameStr, + DictTagStr: tagStr, + DictTagsets: []int64{1, 1}, // size=1, ref=1 + DictResourceStr: resourceStr, + DictResourceLen: []int64{1}, + DictResourceType: []int64{1}, + DictResourceName: []int64{2}, + Types: []uint64{uint64(pb.MetricType_Gauge) | uint64(pb.ValueType_Float64)}, + NameRefs: []int64{1}, + TagsetRefs: []int64{1}, + ResourcesRefs: []int64{1}, + SourceTypeNameRefs: []int64{0}, + OriginInfoRefs: []int64{0}, + Intervals: []uint64{0}, + NumPoints: []uint64{1}, + Timestamps: []int64{1000}, + ValsFloat64: []float64{42.0}, + }, + } +} + +func buildV3PayloadWithUnit(unit string) *pb.Payload { + p := buildMinimalV3Payload() + p.MetricData.Types[0] |= testFlagHasUnit + + unitStr := protowire.AppendVarint(nil, uint64(len(unit))) + unitStr = append(unitStr, []byte(unit)...) + var unknown []byte + unknown = appendBytesField(unknown, testFieldDictUnitStr, unitStr) + unknown = appendBytesField(unknown, testFieldUnitRefs, []byte{2}) + p.MetricData.ProtoReflect().SetUnknown(unknown) + + return p +} + +func appendBytesField(dst []byte, field protowire.Number, value []byte) []byte { + dst = protowire.AppendTag(dst, field, protowire.BytesType) + return protowire.AppendBytes(dst, value) +} + +func TestParseMetricSeriesV3_SingleGauge(t *testing.T) { + p := buildMinimalV3Payload() + + raw, err := p.MarshalVT() + require.NoError(t, err) + + payload := api.Payload{ + Data: raw, + Encoding: encodingEmpty, // no compression + ContentType: "application/x-protobuf", + Timestamp: time.Unix(999, 0), + } + + series, err := ParseMetricSeriesV3(payload) + require.NoError(t, err) + require.Len(t, series, 1) + + s := series[0] + assert.Equal(t, "test.gauge", s.Metric) + assert.Equal(t, []string{"env:test"}, s.Tags) + require.Len(t, s.Resources, 1) + assert.Equal(t, "host", s.Resources[0].Type) + assert.Equal(t, "node-a", s.Resources[0].Name) + assert.Equal(t, pb.MetricType_Gauge, s.Type) + assert.Empty(t, s.Unit) + require.Len(t, s.Points, 1) + assert.Equal(t, int64(1000), s.Points[0].Timestamp) + assert.Equal(t, 42.0, s.Points[0].Value) + assert.Equal(t, time.Unix(999, 0), s.GetCollectedTime()) +} + +func TestParseMetricSeriesV3_Unit(t *testing.T) { + p := buildV3PayloadWithUnit("millisecond") + + raw, err := p.MarshalVT() + require.NoError(t, err) + + payload := api.Payload{ + Data: raw, + Encoding: encodingEmpty, // no compression + ContentType: "application/x-protobuf", + Timestamp: time.Unix(999, 0), + } + + series, err := ParseMetricSeriesV3(payload) + require.NoError(t, err) + require.Len(t, series, 1) + assert.Equal(t, "millisecond", series[0].Unit) +} + +func TestParseMetricSeriesV3_CompressedColumnPayload(t *testing.T) { + p := buildMinimalV3Payload() + raw, err := p.MarshalVT() + require.NoError(t, err) + + compressed := buildCompressedV3Payload(t, raw) + inflated, err := inflate(compressed, encodingZstd) + require.NoError(t, err) + require.Equal(t, raw, inflated) + + payload := api.Payload{ + Data: compressed, + Encoding: encodingZstd, + ContentType: "application/x-protobuf", + Timestamp: time.Unix(999, 0), + } + + series, err := ParseMetricSeriesV3(payload) + require.NoError(t, err) + require.Len(t, series, 1) + assert.Equal(t, "test.gauge", series[0].Metric) +} + +func TestParseMetricSeriesV3_EmptyPayload(t *testing.T) { + p := &pb.Payload{MetricData: nil} + raw, err := p.MarshalVT() + require.NoError(t, err) + + payload := api.Payload{Data: raw, Encoding: encodingEmpty, Timestamp: time.Now()} + series, err := ParseMetricSeriesV3(payload) + require.NoError(t, err) + assert.Empty(t, series) +} + +func buildCompressedV3Payload(t *testing.T, raw []byte) []byte { + t.Helper() + + fieldNum, typ, n := protowire.ConsumeTag(raw) + require.Greater(t, n, 0) + require.Equal(t, protowire.Number(3), fieldNum) + require.Equal(t, protowire.BytesType, typ) + metricData, m := protowire.ConsumeBytes(raw[n:]) + require.Greater(t, m, 0) + require.Equal(t, len(raw), n+m) + + out := zstdCompress(t, raw[:len(raw)-len(metricData)]) + for len(metricData) > 0 { + _, fieldType, fieldHeaderLen := protowire.ConsumeTag(metricData) + require.Greater(t, fieldHeaderLen, 0) + require.Equal(t, protowire.BytesType, fieldType) + fieldValue, fieldValueLen := protowire.ConsumeBytes(metricData[fieldHeaderLen:]) + require.Greater(t, fieldValueLen, 0) + headerLen := fieldHeaderLen + fieldValueLen - len(fieldValue) + out = append(out, zstdCompress(t, metricData[:headerLen])...) + out = append(out, zstdCompress(t, fieldValue)...) + metricData = metricData[headerLen+len(fieldValue):] + } + return out +} + +func zstdCompress(t *testing.T, raw []byte) []byte { + t.Helper() + compressed, err := zstd.CompressLevel(nil, raw, 1) + require.NoError(t, err) + return compressed +} + +func TestParseMetricSeriesV3_EmptyBody(t *testing.T) { + for _, payload := range []api.Payload{ + {Data: nil, Encoding: encodingEmpty, Timestamp: time.Now()}, + {Data: []byte("{}"), Encoding: encodingEmpty, Timestamp: time.Now()}, + {Data: nil, Encoding: encodingZstd, Timestamp: time.Now()}, + } { + series, err := ParseMetricSeriesV3(payload) + require.NoError(t, err) + assert.Empty(t, series) + } +} + +func TestParseMetricSeriesV3_SketchSkipped(t *testing.T) { + // A payload with a sketch entry should produce no MetricSeriesV3 items — + // sketches are intentionally drained and excluded on the series endpoint. + nameStr := append([]byte{10}, []byte("test.dist1")...) + p := &pb.Payload{ + MetricData: &pb.MetricData{ + DictNameStr: nameStr, + DictTagStr: nil, + DictTagsets: []int64{0}, + Types: []uint64{uint64(pb.MetricType_Sketch) | uint64(pb.ValueType_Zero)}, + NameRefs: []int64{1}, + TagsetRefs: []int64{0}, + ResourcesRefs: []int64{0}, + SourceTypeNameRefs: []int64{0}, + OriginInfoRefs: []int64{0}, + Intervals: []uint64{0}, + NumPoints: []uint64{1}, + Timestamps: []int64{1000}, + // sketch with zero bins and zero-valued summary + SketchNumBins: []uint64{0}, + ValsSint64: []int64{0}, // cnt + }, + } + + raw, err := p.MarshalVT() + require.NoError(t, err) + + payload := api.Payload{Data: raw, Encoding: encodingEmpty, Timestamp: time.Now()} + series, err := ParseMetricSeriesV3(payload) + require.NoError(t, err) + assert.Empty(t, series, "sketch entries must be skipped on the series endpoint") +} + +func TestParseMetricSeriesV3_Aggregator(t *testing.T) { + // End-to-end: parse through the aggregator so GetPayloadsByName works. + p := buildMinimalV3Payload() + raw, err := p.MarshalVT() + require.NoError(t, err) + + agg := NewMetricAggregatorV3() + err = agg.UnmarshallPayloads([]api.Payload{ + {Data: raw, Encoding: encodingEmpty, Timestamp: time.Now()}, + }) + require.NoError(t, err) + + byName := agg.GetPayloadsByName("test.gauge") + require.Len(t, byName, 1) + assert.Equal(t, "test.gauge", byName[0].Metric) + assert.Equal(t, []string{"env:test"}, byName[0].Tags) +} diff --git a/test/fakeintake/client/client.go b/test/fakeintake/client/client.go index 149a6eed9ad3..6be8d18ee3ab 100644 --- a/test/fakeintake/client/client.go +++ b/test/fakeintake/client/client.go @@ -66,6 +66,7 @@ import ( const ( fakeintakeIDHeader = "Fakeintake-ID" metricsEndpoint = "/api/v2/series" + metricsV3Endpoint = "/api/intake/metrics/v3/series" sketchesEndpoint = "/api/beta/sketches" intakeEndpoint = "/intake/" checkRunsEndpoint = "/api/v1/check_run" @@ -130,6 +131,7 @@ type Client struct { getBackoffDelay time.Duration metricAggregator aggregator.MetricAggregator + metricAggregatorV3 aggregator.MetricAggregatorV3 sketchAggregator aggregator.SketchAggregator checkRunAggregator aggregator.CheckRunAggregator eventAggregator aggregator.EventAggregator @@ -164,6 +166,7 @@ func NewClient(fakeIntakeURL string, opts ...Option) *Client { getBackoffDelay: 5 * time.Second, fakeIntakeURL: strings.TrimSuffix(fakeIntakeURL, "/"), metricAggregator: aggregator.NewMetricAggregator(), + metricAggregatorV3: aggregator.NewMetricAggregatorV3(), sketchAggregator: aggregator.NewSketchAggregator(), checkRunAggregator: aggregator.NewCheckRunAggregator(), eventAggregator: aggregator.NewEventAggregator(), @@ -397,6 +400,24 @@ func (c *Client) FilterSketches(name string, options ...MatchOpt[*aggregator.Ske return filterPayload(c.sketchAggregator.GetPayloadsByName(name), options...) } +// FilterMetricsV3 fetches fakeintake on `/api/intake/metrics/v3/series` and returns +// series matching `name` and any [MatchOpt](#MatchOpt) options. Use this to assert +// that the agent is sending metrics via the V3 intake API. +func (c *Client) FilterMetricsV3(name string, options ...MatchOpt[*aggregator.MetricSeriesV3]) ([]*aggregator.MetricSeriesV3, error) { + if err := c.getMetricsV3(); err != nil { + return nil, err + } + return filterPayload(c.metricAggregatorV3.GetPayloadsByName(name), options...) +} + +func (c *Client) getMetricsV3() error { + payloads, err := c.getFakePayloads(metricsV3Endpoint) + if err != nil { + return err + } + return c.metricAggregatorV3.UnmarshallPayloads(payloads) +} + // FilterCheckRuns fetches fakeintake on `/api/v1/check_run` endpoint and returns // metrics matching `name` and any [MatchOpt](#MatchOpt) options func (c *Client) FilterCheckRuns(name string, options ...MatchOpt[*aggregator.CheckRun]) ([]*aggregator.CheckRun, error) { @@ -710,6 +731,7 @@ func (c *Client) FlushServerAndResetAggregators() error { c.checkRunAggregator.Reset() c.connectionAggregator.Reset() c.metricAggregator.Reset() + c.metricAggregatorV3.Reset() c.sketchAggregator.Reset() c.logAggregator.Reset() c.apmStatsAggregator.Reset() diff --git a/test/fakeintake/go.mod b/test/fakeintake/go.mod index 21597453b038..cf042fab3424 100644 --- a/test/fakeintake/go.mod +++ b/test/fakeintake/go.mod @@ -14,6 +14,7 @@ require ( github.com/benbjohnson/clock v1.3.5 github.com/cenkalti/backoff/v5 v5.0.3 github.com/google/uuid v1.6.0 + github.com/klauspost/compress v1.18.6 github.com/kr/pretty v0.3.1 github.com/prometheus/client_golang v1.23.3-0.20251103151724-a5ae20370e5e github.com/samber/lo v1.52.0 @@ -36,7 +37,6 @@ require ( github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect - github.com/klauspost/compress v1.18.6 // indirect github.com/kr/text v0.2.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/philhofer/fwd v1.2.0 // indirect diff --git a/test/fakeintake/server/serverstore/parser.go b/test/fakeintake/server/serverstore/parser.go index bcbd1b86a744..f9f8de4dfb4c 100644 --- a/test/fakeintake/server/serverstore/parser.go +++ b/test/fakeintake/server/serverstore/parser.go @@ -13,12 +13,13 @@ import ( type parserFunc func(api.Payload) (interface{}, error) var parserMap = map[string]parserFunc{ - "/api/v2/logs": getLogPayLoadJSON, - "/api/v2/series": getMetricPayLoadJSON, - "/api/v1/series": getV1MetricPayLoadJSON, - "/api/v1/check_run": getCheckRunPayLoadJSON, - "/api/v1/connections": getConnectionsPayLoadProtobuf, - "/api/beta/sketches": getSketchPayloadProtobuf, + "/api/v2/logs": getLogPayLoadJSON, + "/api/v2/series": getMetricPayLoadJSON, + "/api/v1/series": getV1MetricPayLoadJSON, + "/api/v1/check_run": getCheckRunPayLoadJSON, + "/api/v1/connections": getConnectionsPayLoadProtobuf, + "/api/beta/sketches": getSketchPayloadProtobuf, + "/api/intake/metrics/v3/series": getMetricV3SeriesPayload, } func getLogPayLoadJSON(payload api.Payload) (interface{}, error) { @@ -45,6 +46,10 @@ func getSketchPayloadProtobuf(payload api.Payload) (interface{}, error) { return aggregator.ParseSketches(payload) } +func getMetricV3SeriesPayload(payload api.Payload) (interface{}, error) { + return aggregator.ParseMetricSeriesV3(payload) +} + // IsRouteHandled checks if a route is handled by the Datadog parsed store func IsRouteHandled(route string) bool { _, ok := parserMap[route] From 17fcca804a6969f519df206aba6e0976c70b99f0 Mon Sep 17 00:00:00 2001 From: Mark Kirichenko Date: Thu, 21 May 2026 16:45:01 +0200 Subject: [PATCH 2/3] test(e2e): add DogStatsD V3 metrics test --- .../components/datadog/agentparams/params.go | 35 +++++ .../dogstatsd-unit/dogstatsd_v3_nix_test.go | 144 ++++++++++++++++++ 2 files changed, 179 insertions(+) create mode 100644 test/new-e2e/tests/agent-metric-pipelines/dogstatsd-unit/dogstatsd_v3_nix_test.go diff --git a/test/e2e-framework/components/datadog/agentparams/params.go b/test/e2e-framework/components/datadog/agentparams/params.go index 10c3aa432b91..86d65147da59 100644 --- a/test/e2e-framework/components/datadog/agentparams/params.go +++ b/test/e2e-framework/components/datadog/agentparams/params.go @@ -60,6 +60,12 @@ type Params struct { // parameters like the MSI flags. AdditionalInstallParameters []string SkipAPIKeyInConfig bool + + // intakeScheme, intakeHostname, intakePort are stored by withIntakeHostname so that + // WithV3MetricsEnabled (applied after fakeintake wiring) can inject V3 endpoints config. + intakeScheme pulumi.StringInput + intakeHostname pulumi.StringInput + intakePort pulumi.IntInput } type Option = func(*Params) error @@ -268,6 +274,11 @@ func WithPulumiResourceOptions(resources ...pulumi.ResourceOption) func(*Params) func withIntakeHostname(scheme pulumi.StringInput, hostname pulumi.StringInput, port pulumi.IntInput) func(*Params) error { return func(p *Params) error { + // Store so that WithV3MetricsEnabled (applied after) can build V3 endpoints config. + p.intakeScheme = scheme + p.intakeHostname = hostname + p.intakePort = port + extraConfig := pulumi.Sprintf(`dd_url: %[3]s://%[1]s:%[2]d logs_config.logs_dd_url: %[1]s:%[2]d logs_config.logs_no_ssl: true @@ -338,6 +349,30 @@ func WithFakeintake(fakeintake *fakeintake.Fakeintake) func(*Params) error { } } +// WithV3MetricsEnabled opts the agent into the V3 metrics intake API for its primary fakeintake +// endpoint. It adds serializer_experimental_use_v3_api.series/sketches.endpoints pointing at the +// same URL used for dd_url, so the serializer sends to /api/intake/metrics/v3/series instead of +// /api/v2/series. +// +// Must be called after WithFakeintake (or WithIntakeHostname) so the intake URL is known. +func WithV3MetricsEnabled() func(*Params) error { + return func(p *Params) error { + if p.intakeHostname == nil || p.intakePort == nil || p.intakeScheme == nil { + return fmt.Errorf("WithV3MetricsEnabled must be called after WithFakeintake or WithIntakeHostname") + } + v3Config := pulumi.Sprintf(`serializer_experimental_use_v3_api: + series: + endpoints: + - %[3]s://%[1]s:%[2]d + sketches: + endpoints: + - %[3]s://%[1]s:%[2]d +`, p.intakeHostname, p.intakePort, p.intakeScheme) + p.ExtraAgentConfig = append(p.ExtraAgentConfig, v3Config) + return nil + } +} + // WithLogs enables the log agent func WithLogs() func(*Params) error { return func(p *Params) error { diff --git a/test/new-e2e/tests/agent-metric-pipelines/dogstatsd-unit/dogstatsd_v3_nix_test.go b/test/new-e2e/tests/agent-metric-pipelines/dogstatsd-unit/dogstatsd_v3_nix_test.go new file mode 100644 index 000000000000..082adc96d8a3 --- /dev/null +++ b/test/new-e2e/tests/agent-metric-pipelines/dogstatsd-unit/dogstatsd_v3_nix_test.go @@ -0,0 +1,144 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2025-present Datadog, Inc. + +package dogstatsdunit + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/DataDog/datadog-agent/test/e2e-framework/components/datadog/agentparams" + scenec2 "github.com/DataDog/datadog-agent/test/e2e-framework/scenarios/aws/ec2" + "github.com/DataDog/datadog-agent/test/e2e-framework/testing/e2e" + "github.com/DataDog/datadog-agent/test/e2e-framework/testing/environments" + awshost "github.com/DataDog/datadog-agent/test/e2e-framework/testing/provisioners/aws/host" + "github.com/DataDog/datadog-agent/test/fakeintake/aggregator" +) + +const ( + v3CountMetric = "e2e.metric.v3.count" + + v3HistogramMetric = "e2e.metric.v3.histogram" + v3HistogramMaxSuffix = ".max" + + v3TimingMetric = "e2e.metric.v3.timing" + v3TimingMaxSuffix = ".max" + + expectedV3TimingUnit = "millisecond" +) + +type dogstatsdV3Suite struct { + e2e.BaseSuite[environments.Host] +} + +// TestDogstatsdV3 verifies that when the V3 metrics intake API is enabled the agent sends +// DogStatsD metrics to /api/intake/metrics/v3/series instead of /api/v2/series. +func TestDogstatsdV3(t *testing.T) { + t.Parallel() + + agentOptions := []agentparams.Option{ + agentparams.WithAgentConfig(` +histogram_aggregates: + - max + - avg + - count +histogram_percentiles: + - "0.95" +`), + agentparams.WithV3MetricsEnabled(), + } + + e2e.Run(t, &dogstatsdV3Suite{}, + e2e.WithProvisioner( + awshost.Provisioner( + awshost.WithRunOptions( + // WithV3MetricsEnabled must be listed after WithFakeintake, which the + // provisioner prepends automatically, so this ordering is correct. + scenec2.WithAgentOptions(agentOptions...), + ), + ), + ), + e2e.WithStackName("dogstatsdv3"), + ) +} + +func (s *dogstatsdV3Suite) sendMetric(name string, value float32, metricType string) { + cmd := fmt.Sprintf(`bash -c 'echo -n "%s:%f|%s" > /dev/udp/127.0.0.1/8125'`, name, value, metricType) + s.Env().RemoteHost.MustExecute(cmd) +} + +// TestMetricsReachV3Endpoint sends multiple DogStatsD metric types and asserts that they +// reach the V3 intake endpoint in fakeintake and do NOT appear on the V2 endpoint. +func (s *dogstatsdV3Suite) TestMetricsReachV3Endpoint() { + require.NoError(s.T(), s.Env().FakeIntake.Client().FlushServerAndResetAggregators()) + + // Keep sending until all metric types appear on the V3 endpoint with the expected units. + require.EventuallyWithT(s.T(), func(c *assert.CollectT) { + var wg sync.WaitGroup + wg.Add(3) + go func() { defer wg.Done(); s.sendMetric(v3CountMetric, 1, "c") }() + go func() { defer wg.Done(); s.sendMetric(v3HistogramMetric, 100, "h") }() + go func() { + defer wg.Done() + s.sendMetric(v3TimingMetric, 100, "ms") + s.sendMetric(v3TimingMetric, 0.2, "ms") + s.sendMetric(v3TimingMetric, 3000, "ms") + }() + wg.Wait() + + v3Count, err := s.Env().FakeIntake.Client().FilterMetricsV3(v3CountMetric) + assert.NoError(c, err) + assertValidV3Metric(c, v3Count, "", func(value float64) bool { + return value > 0 + }, "counter must reach V3 endpoint (/api/intake/metrics/v3/series) without a unit") + + v3Histogram, err := s.Env().FakeIntake.Client().FilterMetricsV3(v3HistogramMetric + v3HistogramMaxSuffix) + assert.NoError(c, err) + assertValidV3Metric(c, v3Histogram, "", func(value float64) bool { + return value == 100 + }, "histogram .max must reach V3 endpoint (/api/intake/metrics/v3/series) without a unit") + + v3Timing, err := s.Env().FakeIntake.Client().FilterMetricsV3(v3TimingMetric + v3TimingMaxSuffix) + assert.NoError(c, err) + assertValidV3Metric(c, v3Timing, expectedV3TimingUnit, func(value float64) bool { + return value == 3000 + }, "timing .max must reach V3 endpoint (/api/intake/metrics/v3/series) with unit millisecond") + }, 2*time.Minute, 5*time.Second, "timed out waiting for all metric types on V3 endpoint") + + // Confirm that nothing leaked to the V2 endpoint for the same metric names. + v2Count, err := s.Env().FakeIntake.Client().FilterMetrics(v3CountMetric) + require.NoError(s.T(), err) + assert.Empty(s.T(), v2Count, "counter must NOT appear on V2 endpoint when V3 is enabled") + + v2Histogram, err := s.Env().FakeIntake.Client().FilterMetrics(v3HistogramMetric + v3HistogramMaxSuffix) + require.NoError(s.T(), err) + assert.Empty(s.T(), v2Histogram, "histogram must NOT appear on V2 endpoint when V3 is enabled") + + v2Timing, err := s.Env().FakeIntake.Client().FilterMetrics(v3TimingMetric + v3TimingMaxSuffix) + require.NoError(s.T(), err) + assert.Empty(s.T(), v2Timing, "timing must NOT appear on V2 endpoint when V3 is enabled") +} + +func assertValidV3Metric(c *assert.CollectT, metrics []*aggregator.MetricSeriesV3, expectedUnit string, valueMatches func(float64) bool, message string) { + if !assert.NotEmpty(c, metrics, message) { + return + } + hasMatchingPoint := false + for _, metric := range metrics { + assert.Equal(c, expectedUnit, metric.Unit, + "metric %q must have unit %q, got %q", metric.Metric, expectedUnit, metric.Unit) + for _, point := range metric.Points { + if valueMatches(point.Value) { + hasMatchingPoint = true + } + } + } + assert.Truef(c, hasMatchingPoint, "expected a matching point in %#v", metrics) +} From 3655bb2f3ec27009f551ac98427579dc5604fc6f Mon Sep 17 00:00:00 2001 From: Mark Kirichenko Date: Thu, 21 May 2026 16:45:30 +0200 Subject: [PATCH 3/3] test(e2e): add DDOT V3 metrics test --- .../datadog/dockeragentparams/params.go | 29 +++++ .../otel/otlp-ingest/otlp_v3_metrics_test.go | 101 ++++++++++++++++++ 2 files changed, 130 insertions(+) create mode 100644 test/new-e2e/tests/otel/otlp-ingest/otlp_v3_metrics_test.go diff --git a/test/e2e-framework/components/datadog/dockeragentparams/params.go b/test/e2e-framework/components/datadog/dockeragentparams/params.go index b5755c9186de..85e38e6c1c7e 100644 --- a/test/e2e-framework/components/datadog/dockeragentparams/params.go +++ b/test/e2e-framework/components/datadog/dockeragentparams/params.go @@ -59,6 +59,10 @@ type Params struct { PulumiDependsOn []pulumi.ResourceOption // FIPS is true if FIPS image is needed. FIPS bool + + // intakeURL is stored by withIntakeHostname so that WithV3MetricsEnabled + // can inject V3 endpoint config after fakeintake wiring. + intakeURL pulumi.StringInput } type Option = func(*Params) error @@ -171,6 +175,7 @@ func WithFakeintake(fakeintake *fakeintake.Fakeintake) func(*Params) error { func withIntakeHostname(url pulumi.StringInput, shouldSkipSSLValidation pulumi.BoolInput) func(*Params) error { return func(p *Params) error { + p.intakeURL = url envVars := pulumi.Map{ "DD_DD_URL": pulumi.Sprintf("%s", url), "DD_PROCESS_CONFIG_PROCESS_DD_URL": pulumi.Sprintf("%s", url), @@ -191,6 +196,30 @@ func withIntakeHostname(url pulumi.StringInput, shouldSkipSSLValidation pulumi.B } } +// WithV3MetricsEnabled opts the Agent into the V3 metrics intake API for its primary +// fakeintake endpoint. It adds serializer_experimental_use_v3_api series/sketches +// endpoints pointing at the same URL used for DD_DD_URL, so the serializer sends +// to /api/intake/metrics/v3/series instead of /api/v2/series. +// +// Must be called after WithFakeintake or WithIntake so the intake URL is known. +func WithV3MetricsEnabled() func(*Params) error { + return func(p *Params) error { + if p.intakeURL == nil { + return fmt.Errorf("WithV3MetricsEnabled must be called after WithFakeintake or WithIntake") + } + envVars := pulumi.Map{ + "DD_SERIALIZER_EXPERIMENTAL_USE_V3_API_SERIES_ENDPOINTS": pulumi.Sprintf("%s", p.intakeURL), + "DD_SERIALIZER_EXPERIMENTAL_USE_V3_API_SKETCHES_ENDPOINTS": pulumi.Sprintf("%s", p.intakeURL), + } + for key, value := range envVars { + if err := WithAgentServiceEnvVariable(key, value)(p); err != nil { + return err + } + } + return nil + } +} + type additionalLogEndpointInput struct { Hostname string `json:"host"` APIKey string `json:"api_key,omitempty"` diff --git a/test/new-e2e/tests/otel/otlp-ingest/otlp_v3_metrics_test.go b/test/new-e2e/tests/otel/otlp-ingest/otlp_v3_metrics_test.go new file mode 100644 index 000000000000..00f096fb590b --- /dev/null +++ b/test/new-e2e/tests/otel/otlp-ingest/otlp_v3_metrics_test.go @@ -0,0 +1,101 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2025-present Datadog, Inc. + +package otlpingest + +import ( + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/DataDog/datadog-agent/test/e2e-framework/components/datadog/apps" + "github.com/DataDog/datadog-agent/test/e2e-framework/components/datadog/dockeragentparams" + "github.com/DataDog/datadog-agent/test/e2e-framework/scenarios/aws/ec2docker" + "github.com/DataDog/datadog-agent/test/e2e-framework/testing/e2e" + "github.com/DataDog/datadog-agent/test/e2e-framework/testing/environments" + awsdocker "github.com/DataDog/datadog-agent/test/e2e-framework/testing/provisioners/aws/docker" + "github.com/DataDog/datadog-agent/test/fakeintake/aggregator" + fakeintakeclient "github.com/DataDog/datadog-agent/test/fakeintake/client" + "github.com/DataDog/datadog-agent/test/new-e2e/tests/otel/utils" + + "github.com/pulumi/pulumi/sdk/v3/go/pulumi" +) + +const otlpV3MetricName = "calendar-rest-go.api.counter" + +type otlpV3MetricsDockerTestSuite struct { + e2e.BaseSuite[environments.DockerHost] +} + +// TestOTLPV3MetricsDocker verifies that OTLP metrics ingested by the Agent are sent to +// /api/intake/metrics/v3/series instead of /api/v2/series when V3 metrics intake is enabled. +func TestOTLPV3MetricsDocker(t *testing.T) { + t.Parallel() + + e2e.Run(t, + &otlpV3MetricsDockerTestSuite{}, + e2e.WithProvisioner( + awsdocker.Provisioner( + awsdocker.WithRunOptions( + ec2docker.WithAgentOptions( + dockeragentparams.WithLogs(), + dockeragentparams.WithV3MetricsEnabled(), + dockeragentparams.WithAgentServiceEnvVariable("DD_OTLP_CONFIG_RECEIVER_PROTOCOLS_GRPC_ENDPOINT", pulumi.StringPtr("0.0.0.0:4317")), + dockeragentparams.WithAgentServiceEnvVariable("DD_OTLP_CONFIG_RECEIVER_PROTOCOLS_HTTP_ENDPOINT", pulumi.StringPtr("0.0.0.0:4318")), + dockeragentparams.WithAgentServiceEnvVariable("DD_LOGS_ENABLED", pulumi.StringPtr("true")), + dockeragentparams.WithAgentServiceEnvVariable("DD_OTLP_CONFIG_LOGS_ENABLED", pulumi.StringPtr("true")), + dockeragentparams.WithAgentServiceEnvVariable("DD_LOGS_CONFIG_CONTAINER_COLLECT_ALL", pulumi.StringPtr("false")), + dockeragentparams.WithAgentServiceEnvVariable("DD_OTLP_CONFIG_METRICS_RESOURCE_ATTRIBUTES_AS_TAGS", pulumi.StringPtr("true")), + dockeragentparams.WithExtraComposeManifest("calendar-rest-go", pulumi.String(strings.ReplaceAll(otlpIngestCompose, "{APPS_VERSION}", apps.Version))), + ), + ), + ), + ), + e2e.WithStackName("otlpv3metrics"), + ) +} + +func (s *otlpV3MetricsDockerTestSuite) SetupSuite() { + s.BaseSuite.SetupSuite() + defer s.CleanupOnSetupFailure() + + utils.TestCalendarAppDocker(s) +} + +func (s *otlpV3MetricsDockerTestSuite) TestOTLPMetricsReachV3Endpoint() { + require.NoError(s.T(), s.Env().FakeIntake.Client().FlushServerAndResetAggregators()) + + serviceTag := "service:" + utils.CalendarService + + var v3Metrics []*aggregator.MetricSeriesV3 + require.EventuallyWithT(s.T(), func(c *assert.CollectT) { + var err error + v3Metrics, err = s.Env().FakeIntake.Client().FilterMetricsV3( + otlpV3MetricName, + fakeintakeclient.WithTags[*aggregator.MetricSeriesV3]([]string{serviceTag}), + ) + assert.NoError(c, err) + if !assert.NotEmpty(c, v3Metrics, "OTLP counter must reach V3 endpoint (/api/intake/metrics/v3/series)") { + return + } + for _, metric := range v3Metrics { + assert.NotEmpty(c, metric.Points, "V3 series must contain decoded points") + for _, point := range metric.Points { + assert.NotZero(c, point.Timestamp, "V3 points must carry timestamps") + assert.Greater(c, point.Value, 0.0, "OTLP counter points must carry positive values") + } + } + }, 5*time.Minute, 10*time.Second, "timed out waiting for OTLP metrics on V3 endpoint") + + v2Metrics, err := s.Env().FakeIntake.Client().FilterMetrics(otlpV3MetricName) + require.NoError(s.T(), err) + assert.Empty(s.T(), v2Metrics, "OTLP metrics must NOT appear on V2 endpoint (/api/v2/series) when V3 is enabled") + + require.NotEmpty(s.T(), v3Metrics) + assert.Contains(s.T(), v3Metrics[0].Tags, serviceTag, "V3 series must carry service tag") +}