From cd598151dbc4ab30a8d1f51d43f5802e4a1580bb Mon Sep 17 00:00:00 2001 From: Matthew Li Date: Tue, 16 Jun 2026 15:24:41 -0400 Subject: [PATCH 1/8] init --- .../common/metrics/OtlpHistogramBuckets.java | 79 +++++++ .../common/metrics/OtlpStatsMetricWriter.java | 218 ++++++++++++++++++ .../core/otlp/common/OtlpResourceProto.java | 6 + .../otlp/common/OtlpResourceProtoTest.java | 11 + 4 files changed, 314 insertions(+) create mode 100644 dd-trace-core/src/main/java/datadog/trace/common/metrics/OtlpHistogramBuckets.java create mode 100644 dd-trace-core/src/main/java/datadog/trace/common/metrics/OtlpStatsMetricWriter.java diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/OtlpHistogramBuckets.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/OtlpHistogramBuckets.java new file mode 100644 index 00000000000..977bf74ead6 --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/OtlpHistogramBuckets.java @@ -0,0 +1,79 @@ +package datadog.trace.common.metrics; + +import datadog.metrics.api.Histogram; +import datadog.metrics.api.HistogramWithSum; +import datadog.trace.bootstrap.otlp.metrics.OtlpHistogramPoint; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Projects a client-side-stats {@link Histogram} (a DDSketch over span durations recorded in + * nanoseconds) onto the fixed explicit-bounds histogram layout mandated by the OTLP Trace + * Metrics Export RFC. + */ +final class OtlpHistogramBuckets { + private OtlpHistogramBuckets() {} + + private static final double NANOS_PER_SECOND = 1_000_000_000d; + + static final double[] BOUNDS_SECONDS = { + 0.002, 0.004, 0.006, 0.008, 0.01, 0.05, 0.1, 0.2, 0.4, 0.8, 1, 1.4, 2, 5, 10, 15 + }; + + static final List EXPLICIT_BOUNDS; + + static { + List bounds = new ArrayList<>(BOUNDS_SECONDS.length + 1); + for (double bound : BOUNDS_SECONDS) { + bounds.add(bound); + } + bounds.add(Double.POSITIVE_INFINITY); + EXPLICIT_BOUNDS = Collections.unmodifiableList(bounds); + } + + static int bucketIndex(double seconds) { + for (int i = 0; i < BOUNDS_SECONDS.length; i++) { + if (seconds <= BOUNDS_SECONDS[i]) { + return i; + } + } + return BOUNDS_SECONDS.length; // overflow + } + + /** + * Re-bins {@code histogram} (nanosecond-valued) into an {@link OtlpHistogramPoint} expressed in + * seconds with OTLP's fixed bucket layout. {@code count}, {@code min}, and {@code max} are taken + * directly from the sketch; {@code sum} is exact when the sketch tracks it ({@link + * HistogramWithSum}) and otherwise best-effort estimated from bin upper bounds. + */ + static OtlpHistogramPoint toHistogramPoint(Histogram histogram) { + long[] bucketCounts = new long[BOUNDS_SECONDS.length + 1]; + + List binBoundaries = histogram.getBinBoundaries(); + List binCounts = histogram.getBinCounts(); + double estimatedSumSeconds = 0d; + for (int i = 0; i < binBoundaries.size(); i++) { + double upperSeconds = binBoundaries.get(i) / NANOS_PER_SECOND; + long count = (long) binCounts.get(i).doubleValue(); + bucketCounts[bucketIndex(upperSeconds)] += count; + estimatedSumSeconds += upperSeconds * count; + } + + List counts = new ArrayList<>(bucketCounts.length); + for (long count : bucketCounts) { + counts.add((double) count); + } + + double sumSeconds = + histogram instanceof HistogramWithSum + ? ((HistogramWithSum) histogram).getSum() / NANOS_PER_SECOND + : estimatedSumSeconds; + + double minSeconds = histogram.isEmpty() ? 0d : histogram.getMinValue() / NANOS_PER_SECOND; + double maxSeconds = histogram.isEmpty() ? 0d : histogram.getMaxValue() / NANOS_PER_SECOND; + + return new OtlpHistogramPoint( + histogram.getCount(), EXPLICIT_BOUNDS, counts, sumSeconds, minSeconds, maxSeconds); + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/OtlpStatsMetricWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/OtlpStatsMetricWriter.java new file mode 100644 index 00000000000..e31371a6927 --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/OtlpStatsMetricWriter.java @@ -0,0 +1,218 @@ +package datadog.trace.common.metrics; + +import static datadog.trace.bootstrap.otel.metrics.OtelInstrumentType.HISTOGRAM; +import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.LONG_ATTRIBUTE; +import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.STRING_ATTRIBUTE; +import static datadog.trace.core.otlp.common.OtlpCommonProto.I64_WIRE_TYPE; +import static datadog.trace.core.otlp.common.OtlpCommonProto.LEN_WIRE_TYPE; +import static datadog.trace.core.otlp.common.OtlpCommonProto.writeAttribute; +import static datadog.trace.core.otlp.common.OtlpCommonProto.writeI64; +import static datadog.trace.core.otlp.common.OtlpCommonProto.writeTag; +import static datadog.trace.core.otlp.common.OtlpResourceProto.RESOURCE_MESSAGE; +import static datadog.trace.core.otlp.metrics.OtlpMetricsProto.recordDataPointMessage; +import static datadog.trace.core.otlp.metrics.OtlpMetricsProto.recordMetricMessage; +import static datadog.trace.core.otlp.metrics.OtlpMetricsProto.recordScopedMetricsMessage; + +import datadog.communication.serialization.GrowableBuffer; +import datadog.metrics.api.Histogram; +import datadog.trace.api.Config; +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; +import datadog.trace.bootstrap.otel.common.OtelInstrumentationScope; +import datadog.trace.bootstrap.otel.metrics.OtelInstrumentDescriptor; +import datadog.trace.bootstrap.otlp.metrics.OtlpHistogramPoint; +import datadog.trace.core.otlp.common.OtlpGrpcSender; +import datadog.trace.core.otlp.common.OtlpHttpSender; +import datadog.trace.core.otlp.common.OtlpProtoBuffer; +import datadog.trace.core.otlp.common.OtlpSender; +import javax.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link MetricWriter} that exports the existing client-side trace (RED) stats as a single + * vendor-neutral OTLP delta-temporality histogram named {@code traces.span.sdk.metrics.duration} + * (unit {@code s}). + * + *

This is the parallel-to-{@link SerializingMetricWriter} OTLP export path. It hangs off the + * same in-memory aggregation ({@link ConflatingMetricsAggregator} / {@link Aggregator}) and + * consumes the same {@link AggregateEntry} stream; only the wire encoding and transport differ. + * Native msgpack stats and OTLP export are mutually exclusive (selected at the factory). + * + *

Assembly mirrors {@code OtlpMetricsProtoCollector} + */ +public final class OtlpStatsMetricWriter implements MetricWriter { + private static final Logger log = LoggerFactory.getLogger(OtlpStatsMetricWriter.class); + + static final String METRIC_NAME = "traces.span.sdk.metrics.duration"; + static final String METRIC_UNIT = "s"; + + private static final OtelInstrumentDescriptor METRIC_DESCRIPTOR = + new OtelInstrumentDescriptor(METRIC_NAME, HISTOGRAM, false, null, METRIC_UNIT); + private static final OtelInstrumentationScope SCOPE = + new OtelInstrumentationScope("datadog.trace.metrics", null, null); + + private static final int DP_START_TIME_FIELD = 2; + private static final int DP_TIME_FIELD = 3; + private static final int DP_ATTRIBUTES_FIELD = 9; + + private static final String SPAN_NAME = "span.name"; + private static final String SPAN_KIND = "span.kind"; + private static final String HTTP_REQUEST_METHOD = "http.request.method"; + private static final String HTTP_RESPONSE_STATUS_CODE = "http.response.status_code"; + private static final String HTTP_ROUTE = "http.route"; + private static final String RPC_RESPONSE_STATUS_CODE = "rpc.response.status_code"; + private static final String STATUS_CODE = "status.code"; + private static final String STATUS_CODE_ERROR = "ERROR"; + + @Nullable private final OtlpSender sender; + + private final GrowableBuffer buf = new GrowableBuffer(512); + private final OtlpProtoBuffer protobuf = new OtlpProtoBuffer(8192); + + private long startNanos; + private long endNanos; + + private int payloadBytes; + private int scopedBytes; + private int metricBytes; + + public OtlpStatsMetricWriter(Config config) { + this(createSender(config)); + } + + // visible for testing: lets tests inject a capturing sender to decode the emitted protobuf + OtlpStatsMetricWriter(@Nullable OtlpSender sender) { + this.sender = sender; + } + + @Nullable + private static OtlpSender createSender(Config config) { + // mirrors OtlpMetricsService's protocol-based sender selection + switch (config.getOtlpMetricsProtocol()) { + case GRPC: + return new OtlpGrpcSender( + config.getOtlpMetricsEndpoint(), + "/opentelemetry.proto.collector.metrics.v1.MetricsService/Export", + config.getOtlpMetricsHeaders(), + config.getOtlpMetricsTimeout(), + config.getOtlpMetricsCompression()); + case HTTP_PROTOBUF: + return new OtlpHttpSender( + config.getOtlpMetricsEndpoint(), + "/v1/metrics", + config.getOtlpMetricsHeaders(), + config.getOtlpMetricsTimeout(), + config.getOtlpMetricsCompression()); + default: + // HTTP_JSON has no protobuf-free encoder yet; JSON transport is deferred per the plan. + log.debug( + "Unsupported OTLP metrics protocol for trace metrics export: {}", + config.getOtlpMetricsProtocol()); + return null; + } + } + + @Override + public void startBucket(int metricCount, long start, long duration) { + // start/duration arrive as epoch nanos / interval nanos (see Aggregator#report) + this.startNanos = start; + this.endNanos = start + duration; + this.payloadBytes = 0; + this.scopedBytes = 0; + this.metricBytes = 0; + } + + @Override + public void add(AggregateEntry entry) { + Histogram okLatencies = entry.getOkLatencies(); + if (!okLatencies.isEmpty()) { + addDataPoint(entry, okLatencies, false); + } + + Histogram errorLatencies = entry.getErrorLatencies(); + if (errorLatencies != null) { + addDataPoint(entry, errorLatencies, true); + } + } + + private void addDataPoint(AggregateEntry entry, Histogram latencies, boolean error) { + writeDataPointAttributes(entry, error); + writeTag(buf, DP_START_TIME_FIELD, I64_WIRE_TYPE); + writeI64(buf, startNanos); + writeTag(buf, DP_TIME_FIELD, I64_WIRE_TYPE); + writeI64(buf, endNanos); + OtlpHistogramPoint point = OtlpHistogramBuckets.toHistogramPoint(latencies); + metricBytes += recordDataPointMessage(buf, point, protobuf); + } + + private void writeDataPointAttributes(AggregateEntry entry, boolean error) { + // TODO(step 4): branch on isTraceOtelSemanticsEnabled() to add the datadog.* attribute set in + // default mode and to omit it in OTel-semantics mode. The OTel-semconv attributes below are + // emitted in both modes. + if (error) { + writeStringAttribute(STATUS_CODE, STATUS_CODE_ERROR); + } + writeStringAttribute(SPAN_NAME, entry.getResource()); + writeStringAttribute(SPAN_KIND, entry.getSpanKind()); + if (entry.getHttpMethod() != null) { + writeStringAttribute(HTTP_REQUEST_METHOD, entry.getHttpMethod()); + } + if (entry.getHttpStatusCode() != 0) { + writeLongAttribute(HTTP_RESPONSE_STATUS_CODE, entry.getHttpStatusCode()); + } + if (entry.getHttpEndpoint() != null) { + writeStringAttribute(HTTP_ROUTE, entry.getHttpEndpoint()); + } + if (entry.getGrpcStatusCode() != null) { + writeStringAttribute(RPC_RESPONSE_STATUS_CODE, entry.getGrpcStatusCode()); + } + } + + private void writeStringAttribute(String key, @Nullable UTF8BytesString value) { + if (value != null) { + writeStringAttribute(key, value.toString()); + } + } + + private void writeStringAttribute(String key, String value) { + writeTag(buf, DP_ATTRIBUTES_FIELD, LEN_WIRE_TYPE); + writeAttribute(buf, STRING_ATTRIBUTE, key, value); + } + + private void writeLongAttribute(String key, long value) { + writeTag(buf, DP_ATTRIBUTES_FIELD, LEN_WIRE_TYPE); + writeAttribute(buf, LONG_ATTRIBUTE, key, value); + } + + @Override + public void finishBucket() { + try { + if (metricBytes > 0) { + scopedBytes += recordMetricMessage(buf, METRIC_DESCRIPTOR, metricBytes, protobuf); + } + if (scopedBytes > 0) { + payloadBytes += recordScopedMetricsMessage(buf, SCOPE, scopedBytes, protobuf); + } + if (payloadBytes == 0) { + return; + } + payloadBytes += protobuf.recordMessage(RESOURCE_MESSAGE); + protobuf.recordMessage(buf, 1, payloadBytes); + + if (sender != null) { + sender.send(protobuf.toPayload()); + } + } finally { + reset(); + } + } + + @Override + public void reset() { + buf.reset(); + protobuf.reset(); + payloadBytes = 0; + scopedBytes = 0; + metricBytes = 0; + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpResourceProto.java b/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpResourceProto.java index 0e45aa22ab7..0a86668c234 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpResourceProto.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpResourceProto.java @@ -47,6 +47,12 @@ static byte[] buildResourceMessage(Config config) { if (!version.isEmpty()) { writeResourceAttribute(buf, "service.version", version); } + if (config.isReportHostName()) { + String hostName = config.getHostName(); + if (hostName != null && !hostName.isEmpty()) { + writeResourceAttribute(buf, "host.name", hostName); + } + } writeResourceAttribute(buf, "telemetry.sdk.name", "datadog"); writeResourceAttribute(buf, "telemetry.sdk.version", TRACER_VERSION); writeResourceAttribute(buf, "telemetry.sdk.language", "java"); diff --git a/dd-trace-core/src/test/java/datadog/trace/core/otlp/common/OtlpResourceProtoTest.java b/dd-trace-core/src/test/java/datadog/trace/core/otlp/common/OtlpResourceProtoTest.java index 541f1750554..cb613137bcc 100644 --- a/dd-trace-core/src/test/java/datadog/trace/core/otlp/common/OtlpResourceProtoTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/core/otlp/common/OtlpResourceProtoTest.java @@ -5,6 +5,7 @@ import static datadog.trace.api.config.GeneralConfig.SERVICE_NAME; import static datadog.trace.api.config.GeneralConfig.TAGS; import static datadog.trace.api.config.GeneralConfig.VERSION; +import static datadog.trace.api.config.TracerConfig.TRACE_REPORT_HOSTNAME; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -99,6 +100,16 @@ static Stream resourceMessageCases() { "service.name", "my-service", "region", "us-east", "team", "platform")), + // report-hostname disabled (default): no host.name written + Arguments.of( + "report-hostname disabled", + props(SERVICE_NAME, "my-service"), + attrs("service.name", "my-service")), + // report-hostname enabled: host.name written with the detected hostname + Arguments.of( + "report-hostname enabled", + props(SERVICE_NAME, "my-service", TRACE_REPORT_HOSTNAME, "true"), + attrs("service.name", "my-service", "host.name", Config.get().getHostName())), // all config values set together; telemetry.sdk.* keys in tags must be ignored Arguments.of( "service, env, version, and tags all set", From 3b5b8a82aab317d45af58ff25bd5010ec01a2371 Mon Sep 17 00:00:00 2001 From: Matthew Li Date: Wed, 17 Jun 2026 13:47:29 -0400 Subject: [PATCH 2/8] adding exact sums for ok/error --- .../trace/common/metrics/AggregateEntry.java | 19 +++++++++++++++---- .../common/metrics/OtlpHistogramBuckets.java | 15 ++++----------- .../common/metrics/OtlpStatsMetricWriter.java | 3 ++- 3 files changed, 21 insertions(+), 16 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java index 5bc985491de..f9538bd7005 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java @@ -139,7 +139,8 @@ final class AggregateEntry extends Hashtable.Entry { private int errorCount; private int hitCount; private int topLevelCount; - private long duration; + private long okDuration; + private long errorDuration; /** Hot-path constructor for the producer/consumer flow. Builds UTF8 fields via the caches. */ AggregateEntry(SpanSnapshot s, long keyHash) { @@ -174,11 +175,12 @@ void recordOneDuration(long tagAndDuration) { if ((tagAndDuration & ERROR_TAG) == ERROR_TAG) { tagAndDuration ^= ERROR_TAG; errorLatenciesForWrite().accept(tagAndDuration); + errorDuration += tagAndDuration; ++errorCount; } else { okLatencies.accept(tagAndDuration); + okDuration += tagAndDuration; } - duration += tagAndDuration; } int getErrorCount() { @@ -194,7 +196,15 @@ int getTopLevelCount() { } long getDuration() { - return duration; + return okDuration + errorDuration; + } + + long getOkDuration() { + return okDuration; + } + + long getErrorDuration() { + return errorDuration; } Histogram getOkLatencies() { @@ -232,7 +242,8 @@ void clear() { this.errorCount = 0; this.hitCount = 0; this.topLevelCount = 0; - this.duration = 0; + this.okDuration = 0; + this.errorDuration = 0; this.okLatencies.clear(); // errorLatencies stays null on entries that never errored. Only clear if it was allocated. if (this.errorLatencies != null) { diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/OtlpHistogramBuckets.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/OtlpHistogramBuckets.java index 977bf74ead6..a07165484ed 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/OtlpHistogramBuckets.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/OtlpHistogramBuckets.java @@ -1,7 +1,6 @@ package datadog.trace.common.metrics; import datadog.metrics.api.Histogram; -import datadog.metrics.api.HistogramWithSum; import datadog.trace.bootstrap.otlp.metrics.OtlpHistogramPoint; import java.util.ArrayList; import java.util.Collections; @@ -44,20 +43,18 @@ static int bucketIndex(double seconds) { /** * Re-bins {@code histogram} (nanosecond-valued) into an {@link OtlpHistogramPoint} expressed in * seconds with OTLP's fixed bucket layout. {@code count}, {@code min}, and {@code max} are taken - * directly from the sketch; {@code sum} is exact when the sketch tracks it ({@link - * HistogramWithSum}) and otherwise best-effort estimated from bin upper bounds. + * directly from the sketch; {@code sumNanos} is the exact duration sum tracked alongside the + * sketch by {@link AggregateEntry} (the DDSketch-derived sum would only be approximate). */ - static OtlpHistogramPoint toHistogramPoint(Histogram histogram) { + static OtlpHistogramPoint toHistogramPoint(Histogram histogram, long sumNanos) { long[] bucketCounts = new long[BOUNDS_SECONDS.length + 1]; List binBoundaries = histogram.getBinBoundaries(); List binCounts = histogram.getBinCounts(); - double estimatedSumSeconds = 0d; for (int i = 0; i < binBoundaries.size(); i++) { double upperSeconds = binBoundaries.get(i) / NANOS_PER_SECOND; long count = (long) binCounts.get(i).doubleValue(); bucketCounts[bucketIndex(upperSeconds)] += count; - estimatedSumSeconds += upperSeconds * count; } List counts = new ArrayList<>(bucketCounts.length); @@ -65,11 +62,7 @@ static OtlpHistogramPoint toHistogramPoint(Histogram histogram) { counts.add((double) count); } - double sumSeconds = - histogram instanceof HistogramWithSum - ? ((HistogramWithSum) histogram).getSum() / NANOS_PER_SECOND - : estimatedSumSeconds; - + double sumSeconds = sumNanos / NANOS_PER_SECOND; double minSeconds = histogram.isEmpty() ? 0d : histogram.getMinValue() / NANOS_PER_SECOND; double maxSeconds = histogram.isEmpty() ? 0d : histogram.getMaxValue() / NANOS_PER_SECOND; diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/OtlpStatsMetricWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/OtlpStatsMetricWriter.java index e31371a6927..a190c5748c2 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/OtlpStatsMetricWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/OtlpStatsMetricWriter.java @@ -141,7 +141,8 @@ private void addDataPoint(AggregateEntry entry, Histogram latencies, boolean err writeI64(buf, startNanos); writeTag(buf, DP_TIME_FIELD, I64_WIRE_TYPE); writeI64(buf, endNanos); - OtlpHistogramPoint point = OtlpHistogramBuckets.toHistogramPoint(latencies); + long sumNanos = error ? entry.getErrorDuration() : entry.getOkDuration(); + OtlpHistogramPoint point = OtlpHistogramBuckets.toHistogramPoint(latencies, sumNanos); metricBytes += recordDataPointMessage(buf, point, protobuf); } From 6753f801105647c102629fc7234fa9bb185f8a8c Mon Sep 17 00:00:00 2001 From: Matthew Li Date: Wed, 17 Jun 2026 13:56:54 -0400 Subject: [PATCH 3/8] remove host.name changes --- .../trace/core/otlp/common/OtlpResourceProto.java | 6 ------ .../trace/core/otlp/common/OtlpResourceProtoTest.java | 11 ----------- 2 files changed, 17 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpResourceProto.java b/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpResourceProto.java index 0a86668c234..0e45aa22ab7 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpResourceProto.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpResourceProto.java @@ -47,12 +47,6 @@ static byte[] buildResourceMessage(Config config) { if (!version.isEmpty()) { writeResourceAttribute(buf, "service.version", version); } - if (config.isReportHostName()) { - String hostName = config.getHostName(); - if (hostName != null && !hostName.isEmpty()) { - writeResourceAttribute(buf, "host.name", hostName); - } - } writeResourceAttribute(buf, "telemetry.sdk.name", "datadog"); writeResourceAttribute(buf, "telemetry.sdk.version", TRACER_VERSION); writeResourceAttribute(buf, "telemetry.sdk.language", "java"); diff --git a/dd-trace-core/src/test/java/datadog/trace/core/otlp/common/OtlpResourceProtoTest.java b/dd-trace-core/src/test/java/datadog/trace/core/otlp/common/OtlpResourceProtoTest.java index cb613137bcc..541f1750554 100644 --- a/dd-trace-core/src/test/java/datadog/trace/core/otlp/common/OtlpResourceProtoTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/core/otlp/common/OtlpResourceProtoTest.java @@ -5,7 +5,6 @@ import static datadog.trace.api.config.GeneralConfig.SERVICE_NAME; import static datadog.trace.api.config.GeneralConfig.TAGS; import static datadog.trace.api.config.GeneralConfig.VERSION; -import static datadog.trace.api.config.TracerConfig.TRACE_REPORT_HOSTNAME; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -100,16 +99,6 @@ static Stream resourceMessageCases() { "service.name", "my-service", "region", "us-east", "team", "platform")), - // report-hostname disabled (default): no host.name written - Arguments.of( - "report-hostname disabled", - props(SERVICE_NAME, "my-service"), - attrs("service.name", "my-service")), - // report-hostname enabled: host.name written with the detected hostname - Arguments.of( - "report-hostname enabled", - props(SERVICE_NAME, "my-service", TRACE_REPORT_HOSTNAME, "true"), - attrs("service.name", "my-service", "host.name", Config.get().getHostName())), // all config values set together; telemetry.sdk.* keys in tags must be ignored Arguments.of( "service, env, version, and tags all set", From f08320f8b02f7967ceb5869b3f6e4fdc5873fb7b Mon Sep 17 00:00:00 2001 From: Matthew Li Date: Wed, 17 Jun 2026 14:01:20 -0400 Subject: [PATCH 4/8] init --- .../trace/core/otlp/common/OtlpResourceProto.java | 6 ++++++ .../trace/core/otlp/common/OtlpResourceProtoTest.java | 11 +++++++++++ 2 files changed, 17 insertions(+) diff --git a/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpResourceProto.java b/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpResourceProto.java index 0e45aa22ab7..0a86668c234 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpResourceProto.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpResourceProto.java @@ -47,6 +47,12 @@ static byte[] buildResourceMessage(Config config) { if (!version.isEmpty()) { writeResourceAttribute(buf, "service.version", version); } + if (config.isReportHostName()) { + String hostName = config.getHostName(); + if (hostName != null && !hostName.isEmpty()) { + writeResourceAttribute(buf, "host.name", hostName); + } + } writeResourceAttribute(buf, "telemetry.sdk.name", "datadog"); writeResourceAttribute(buf, "telemetry.sdk.version", TRACER_VERSION); writeResourceAttribute(buf, "telemetry.sdk.language", "java"); diff --git a/dd-trace-core/src/test/java/datadog/trace/core/otlp/common/OtlpResourceProtoTest.java b/dd-trace-core/src/test/java/datadog/trace/core/otlp/common/OtlpResourceProtoTest.java index 541f1750554..cb613137bcc 100644 --- a/dd-trace-core/src/test/java/datadog/trace/core/otlp/common/OtlpResourceProtoTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/core/otlp/common/OtlpResourceProtoTest.java @@ -5,6 +5,7 @@ import static datadog.trace.api.config.GeneralConfig.SERVICE_NAME; import static datadog.trace.api.config.GeneralConfig.TAGS; import static datadog.trace.api.config.GeneralConfig.VERSION; +import static datadog.trace.api.config.TracerConfig.TRACE_REPORT_HOSTNAME; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -99,6 +100,16 @@ static Stream resourceMessageCases() { "service.name", "my-service", "region", "us-east", "team", "platform")), + // report-hostname disabled (default): no host.name written + Arguments.of( + "report-hostname disabled", + props(SERVICE_NAME, "my-service"), + attrs("service.name", "my-service")), + // report-hostname enabled: host.name written with the detected hostname + Arguments.of( + "report-hostname enabled", + props(SERVICE_NAME, "my-service", TRACE_REPORT_HOSTNAME, "true"), + attrs("service.name", "my-service", "host.name", Config.get().getHostName())), // all config values set together; telemetry.sdk.* keys in tags must be ignored Arguments.of( "service, env, version, and tags all set", From 20af48848ec107411fbf2c7ba8a29495a389c1aa Mon Sep 17 00:00:00 2001 From: Matthew Li Date: Wed, 17 Jun 2026 16:27:52 -0400 Subject: [PATCH 5/8] init tests --- .../metrics/OtlpHistogramBucketsTest.java | 153 +++++ .../metrics/OtlpStatsMetricWriterTest.java | 534 ++++++++++++++++++ 2 files changed, 687 insertions(+) create mode 100644 dd-trace-core/src/test/java/datadog/trace/common/metrics/OtlpHistogramBucketsTest.java create mode 100644 dd-trace-core/src/test/java/datadog/trace/common/metrics/OtlpStatsMetricWriterTest.java diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/OtlpHistogramBucketsTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/OtlpHistogramBucketsTest.java new file mode 100644 index 00000000000..ad0de52e0ec --- /dev/null +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/OtlpHistogramBucketsTest.java @@ -0,0 +1,153 @@ +package datadog.trace.common.metrics; + +import static datadog.trace.common.metrics.OtlpHistogramBuckets.BOUNDS_SECONDS; +import static datadog.trace.common.metrics.OtlpHistogramBuckets.EXPLICIT_BOUNDS; +import static datadog.trace.common.metrics.OtlpHistogramBuckets.bucketIndex; +import static datadog.trace.common.metrics.OtlpHistogramBuckets.toHistogramPoint; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import datadog.metrics.api.Histogram; +import datadog.metrics.api.Histograms; +import datadog.metrics.impl.DDSketchHistograms; +import datadog.trace.bootstrap.otlp.metrics.OtlpHistogramPoint; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +/** + * Unit tests for {@link OtlpHistogramBuckets}, the helper that re-bins a nanosecond-valued DDSketch + * onto the fixed OTLP explicit-bounds histogram layout (in seconds). + * + *

Both methods under test ({@code bucketIndex}, {@code toHistogramPoint}) are package-private, + * so this test lives in {@code datadog.trace.common.metrics}. + */ +class OtlpHistogramBucketsTest { + + private static final double NANOS_PER_SECOND = 1_000_000_000d; + + // Tolerance for double assertions on exactly-computed values (e.g. sum from sumNanos, integer + // counts). DDSketch-derived values like min/max use looser tolerances inline, since the sketch + // only preserves values to within its relative accuracy. + private static final double EPS = 1e-9; + + @BeforeAll + static void registerHistogramFactory() { + Histograms.register(DDSketchHistograms.FACTORY); + } + + // ── bucketIndex ────────────────────────────────────────────────────────── + + static Stream boundsWithIndex() { + return IntStream.range(0, BOUNDS_SECONDS.length) + .mapToObj(i -> Arguments.of(i, BOUNDS_SECONDS[i])); + } + + @ParameterizedTest(name = "value exactly on BOUNDS_SECONDS[{0}]={1} returns index {0}") + @MethodSource("boundsWithIndex") + void valueExactlyOnBoundReturnsThatIndex(int index, double bound) { + // <= semantics: a value exactly on a bound falls in that bound's bucket. + assertEquals(index, bucketIndex(bound)); + } + + @Test + void valueJustAboveSmallBoundFallsInNextBucket() { + // 0.002 is the first bound; a value just above it but below the second bound (0.004) + // must roll into the next index. + assertEquals(0, bucketIndex(0.002)); + assertEquals(1, bucketIndex(0.0020001)); + assertEquals(1, bucketIndex(0.004)); + } + + @Test + void valueAboveLargestBoundIsOverflow() { + // > 15s overflows to the final (16th) index. + assertEquals(BOUNDS_SECONDS.length, bucketIndex(15.0000001)); + assertEquals(BOUNDS_SECONDS.length, bucketIndex(100.0)); + // exactly 15 (the largest non-overflow bound) is the last non-overflow index. + assertEquals(BOUNDS_SECONDS.length - 1, bucketIndex(15.0)); + } + + // ── EXPLICIT_BOUNDS layout ──────────────────────────────────────────────── + + @Test + void explicitBoundsHas17EntriesEndingInInfinity() { + assertEquals(17, EXPLICIT_BOUNDS.size()); + assertEquals(Double.POSITIVE_INFINITY, EXPLICIT_BOUNDS.get(EXPLICIT_BOUNDS.size() - 1)); + } + + // ── toHistogramPoint ────────────────────────────────────────────────────── + + @Test + void toHistogramPointSummary() { + Histogram h = Histogram.newHistogram(); + // Samples spanning the layout, including a 1ns duration — the upstream-clamped minimum + // (DDSpan#finishAndAddToTrace does Math.max(1, durationNano)). 1ns is far below the smallest + // bound (0.002s) and must land in bucket 0, not be dropped. 1ms also falls in bucket 0, so the + // two sub-2ms samples share it; 100ms → bucket 6, 3s → bucket 13. + long[] samplesNanos = { + 1L, + (long) (0.001 * NANOS_PER_SECOND), + (long) (0.1 * NANOS_PER_SECOND), + (long) (3.0 * NANOS_PER_SECOND) + }; + for (long s : samplesNanos) { + h.accept(s); + } + + // Use a sumNanos that deliberately differs from the sketch's implied sum to prove the + // returned sum comes from the ARGUMENT, not the sketch. + long sumNanos = 42L * (long) NANOS_PER_SECOND; + OtlpHistogramPoint point = toHistogramPoint(h, sumNanos); + + // 17 bucket counts (the EXPLICIT_BOUNDS layout itself is covered by its own test). + assertEquals(17, point.bucketCounts.size()); + + // total count == number of samples. + assertEquals(samplesNanos.length, (long) point.count); + + // max is the 3s sample (CollapsingLowestDenseStore collapses the LOWEST bins, so the top is + // preserved accurately). The exact 1ns min is NOT recoverable: over this wide value range the + // lowest bins collapse, so getMinValue reports the collapsed bin (sub-2ms here), not 1ns. The + // tiny sample isn't lost though — that's proven by the count and bucket-0 assertions below. + // DDSketch is relative-accuracy, so min/max use loose tolerances. + assertTrue( + point.min > 0 && point.min <= BOUNDS_SECONDS[0], "min collapses into bucket 0 range"); + assertEquals(3.0, point.max, 1e-2); + + // sum equals the sumNanos ARGUMENT converted to seconds, not the sketch sum. + assertEquals(42.0, point.sum, EPS); + + // The two sub-2ms samples (1ns + 1ms) both land in bucket 0; nothing is lost: bucket counts + // sum to the total count. The count==Σbuckets invariant guards against a future histogram + // config that parks tiny values in a zero/negative store (excluded from getBinCounts). + long bucketTotal = 0; + for (int i = 0; i < point.bucketCounts.size(); i++) { + long c = (long) point.bucketCounts.get(i).doubleValue(); + bucketTotal += c; + if (i == 0) { + assertEquals(2L, c, "1ns + 1ms both land in bucket 0 (<= 0.002s)"); + } + } + assertEquals(samplesNanos.length, bucketTotal); + } + + @Test + void emptyHistogramHasZeroMinMaxAndCount() { + Histogram h = Histogram.newHistogram(); + OtlpHistogramPoint point = toHistogramPoint(h, 0L); + + assertEquals(0.0, point.count, EPS); + assertEquals(0.0, point.min, EPS); + assertEquals(0.0, point.max, EPS); + long bucketTotal = 0; + for (double c : point.bucketCounts) { + bucketTotal += (long) c; + } + assertEquals(0L, bucketTotal); + } +} diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/OtlpStatsMetricWriterTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/OtlpStatsMetricWriterTest.java new file mode 100644 index 00000000000..c47973c257f --- /dev/null +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/OtlpStatsMetricWriterTest.java @@ -0,0 +1,534 @@ +package datadog.trace.common.metrics; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.WireFormat; +import datadog.metrics.api.Histograms; +import datadog.metrics.impl.DDSketchHistograms; +import datadog.trace.core.otlp.common.OtlpPayload; +import datadog.trace.core.otlp.common.OtlpSender; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +/** + * Tests for {@link OtlpStatsMetricWriter}. Drives the writer through {@code startBucket} → {@code + * add} → {@code finishBucket} with a capturing {@link OtlpSender}, then decodes the emitted + * protobuf {@code ExportMetricsServiceRequest} ({@code MetricsData}) using protobuf's {@link + * CodedInputStream}, reusing the decode idioms from {@code OtlpMetricsProtoTest}. + * + *

Lives in {@code datadog.trace.common.metrics} to reach the package-private testing constructor + * {@code OtlpStatsMetricWriter(OtlpSender)} and the {@code AggregateEntryTestUtils} factory. + * + *

Wire layout (OTLP metrics proto): + * + *

+ *   MetricsData        { ResourceMetrics resource_metrics = 1; }
+ *   ResourceMetrics    { Resource resource = 1; ScopeMetrics scope_metrics = 2; }
+ *   ScopeMetrics       { InstrumentationScope scope = 1; Metric metrics = 2; }
+ *   Metric             { string name = 1; string unit = 3; Histogram histogram = 9; }
+ *   Histogram          { HistogramDataPoint data_points = 1; AggregationTemporality = 2; }
+ *   HistogramDataPoint { fixed64 start = 2; fixed64 time = 3; fixed64 count = 4; double sum = 5;
+ *                        fixed64 bucket_counts = 6; double explicit_bounds = 7;
+ *                        KeyValue attributes = 9; double min = 11; double max = 12; }
+ * 
+ */ +class OtlpStatsMetricWriterTest { + + private static final int TEMPORALITY_DELTA = 1; + + @BeforeAll + static void registerHistogramFactory() { + Histograms.register(DDSketchHistograms.FACTORY); + } + + // ── capturing sender ────────────────────────────────────────────────────── + + private static final class CapturingSender implements OtlpSender { + int sendCount; + byte[] lastPayload; + + @Override + public void send(OtlpPayload payload) { + sendCount++; + java.nio.ByteBuffer content = payload.getContent(); + byte[] bytes = new byte[content.remaining()]; + content.get(bytes); + lastPayload = bytes; + } + + @Override + public void shutdown() {} + } + + // ── entry builders ────────────────────────────────────────────────────── + + /** Build an entry and record {@code hits} ok durations of {@code durationNanos} each. */ + private static AggregateEntry okEntry(long durationNanos, int hits) { + AggregateEntry e = + AggregateEntryTestUtils.of( + "GET /users", + "web", + "servlet.request", + null, + "web", + 0, + false, + true, + "server", + null, + null, + null, + null); + for (int i = 0; i < hits; i++) { + e.recordOneDuration(durationNanos); + } + return e; + } + + // ── decode helpers (adapted from OtlpMetricsProtoTest) ────────────────────── + + /** A decoded histogram data point. */ + private static final class DataPoint { + long start; + long end; + long count; + double sum; + double min; + double max; + final List bucketCounts = new ArrayList<>(); + final List bounds = new ArrayList<>(); + final Map attributes = new HashMap<>(); + } + + /** A decoded metric: name, unit, temporality, and its histogram data points. */ + private static final class DecodedMetric { + String name; + String unit; + int temporality = -1; + final List dataPoints = new ArrayList<>(); + } + + /** Decodes a full {@code MetricsData} payload into the single histogram metric it carries. */ + private static DecodedMetric decode(byte[] payload) throws IOException { + CodedInputStream metricsData = CodedInputStream.newInstance(payload); + int metricsTag = metricsData.readTag(); + assertEquals(1, WireFormat.getTagFieldNumber(metricsTag), "MetricsData.resource_metrics = 1"); + CodedInputStream resourceMetrics = metricsData.readBytes().newCodedInput(); + assertTrue(metricsData.isAtEnd(), "expected exactly one ResourceMetrics"); + + DecodedMetric metric = null; + while (!resourceMetrics.isAtEnd()) { + int tag = resourceMetrics.readTag(); + int field = WireFormat.getTagFieldNumber(tag); + if (field == 2) { // ScopeMetrics + metric = parseScopeMetrics(resourceMetrics.readBytes().newCodedInput()); + } else { + resourceMetrics.skipField(tag); // Resource (field 1) etc. + } + } + assertNotNull(metric, "no ScopeMetrics found"); + return metric; + } + + private static DecodedMetric parseScopeMetrics(CodedInputStream scopeMetrics) throws IOException { + DecodedMetric metric = null; + while (!scopeMetrics.isAtEnd()) { + int tag = scopeMetrics.readTag(); + if (WireFormat.getTagFieldNumber(tag) == 2) { // Metric + metric = parseMetric(scopeMetrics.readBytes().newCodedInput()); + } else { + scopeMetrics.skipField(tag); + } + } + return metric; + } + + private static DecodedMetric parseMetric(CodedInputStream m) throws IOException { + DecodedMetric metric = new DecodedMetric(); + while (!m.isAtEnd()) { + int tag = m.readTag(); + switch (WireFormat.getTagFieldNumber(tag)) { + case 1: + metric.name = m.readString(); + break; + case 3: + metric.unit = m.readString(); + break; + case 9: // Histogram + parseHistogram(m.readBytes().newCodedInput(), metric); + break; + default: + m.skipField(tag); + } + } + return metric; + } + + private static void parseHistogram(CodedInputStream h, DecodedMetric metric) throws IOException { + while (!h.isAtEnd()) { + int tag = h.readTag(); + switch (WireFormat.getTagFieldNumber(tag)) { + case 1: // HistogramDataPoint (repeated) + metric.dataPoints.add(parseDataPoint(h.readBytes().newCodedInput())); + break; + case 2: // aggregation_temporality + metric.temporality = h.readEnum(); + break; + default: + h.skipField(tag); + } + } + } + + private static DataPoint parseDataPoint(CodedInputStream dp) throws IOException { + DataPoint p = new DataPoint(); + while (!dp.isAtEnd()) { + int tag = dp.readTag(); + switch (WireFormat.getTagFieldNumber(tag)) { + case 2: + p.start = dp.readFixed64(); + break; + case 3: + p.end = dp.readFixed64(); + break; + case 4: + p.count = dp.readFixed64(); + break; + case 5: + p.sum = dp.readDouble(); + break; + case 6: + p.bucketCounts.add(dp.readFixed64()); + break; + case 7: + p.bounds.add(dp.readDouble()); + break; + case 9: // attributes (KeyValue) + readKeyValue(dp.readBytes().newCodedInput(), p.attributes); + break; + case 11: + p.min = dp.readDouble(); + break; + case 12: + p.max = dp.readDouble(); + break; + default: + dp.skipField(tag); + } + } + return p; + } + + /** + * Reads a {@code KeyValue} into {@code out}: key (field 1) → value. Value is an {@code AnyValue} + * (field 2); we decode string (field 1) and int (field 3) variants used by this writer. + */ + private static void readKeyValue(CodedInputStream kv, Map out) + throws IOException { + String key = null; + Object value = null; + while (!kv.isAtEnd()) { + int tag = kv.readTag(); + switch (WireFormat.getTagFieldNumber(tag)) { + case 1: + key = kv.readString(); + break; + case 2: // AnyValue + value = readAnyValue(kv.readBytes().newCodedInput()); + break; + default: + kv.skipField(tag); + } + } + if (key != null) { + out.put(key, value); + } + } + + private static Object readAnyValue(CodedInputStream any) throws IOException { + Object value = null; + while (!any.isAtEnd()) { + int tag = any.readTag(); + switch (WireFormat.getTagFieldNumber(tag)) { + case 1: // string_value + value = any.readString(); + break; + case 3: // int_value + value = any.readInt64(); + break; + default: + any.skipField(tag); + } + } + return value; + } + + // ── test cases ────────────────────────────────────────────────────────── + + @Test + void okOnlyEntryProducesExactlyOneDataPoint() throws IOException { + CapturingSender sender = new CapturingSender(); + OtlpStatsMetricWriter writer = new OtlpStatsMetricWriter(sender); + + long start = SECONDS.toNanos(1_700_000_000L); + long duration = SECONDS.toNanos(10); + writer.startBucket(1, start, duration); + writer.add(okEntry(SECONDS.toNanos(1), 3)); + writer.finishBucket(); + + assertEquals(1, sender.sendCount, "exactly one payload sent"); + DecodedMetric metric = decode(sender.lastPayload); + + assertEquals("traces.span.sdk.metrics.duration", metric.name); + assertEquals("s", metric.unit); + assertEquals(TEMPORALITY_DELTA, metric.temporality, "histogram must be delta temporality"); + assertEquals(1, metric.dataPoints.size(), "ok-only → one data point"); + + DataPoint dp = metric.dataPoints.get(0); + assertEquals(start, dp.start, "start_time_unix_nano == startBucket start"); + assertEquals(start + duration, dp.end, "time_unix_nano == start + duration"); + assertEquals(3L, dp.count); + assertEquals(3L, sumBuckets(dp), "bucket counts sum to total count"); + assertEquals(17, dp.bucketCounts.size(), "17 OTLP buckets"); + assertFalse(dp.attributes.containsKey("status.code"), "ok point carries no status.code"); + } + + @Test + void okPlusErrorEntryProducesTwoDataPointsWithErrorStatus() throws IOException { + CapturingSender sender = new CapturingSender(); + OtlpStatsMetricWriter writer = new OtlpStatsMetricWriter(sender); + + AggregateEntry e = + AggregateEntryTestUtils.of( + "GET /users", + "web", + "servlet.request", + null, + "web", + 0, + false, + true, + "server", + null, + null, + null, + null); + e.recordOneDuration(SECONDS.toNanos(1)); // ok + e.recordOneDuration(SECONDS.toNanos(2)); // ok + e.recordOneDuration(SECONDS.toNanos(3) | AggregateEntry.ERROR_TAG); // error + + long start = SECONDS.toNanos(1_700_000_000L); + long duration = SECONDS.toNanos(10); + writer.startBucket(1, start, duration); + writer.add(e); + writer.finishBucket(); + + DecodedMetric metric = decode(sender.lastPayload); + assertEquals(2, metric.dataPoints.size(), "ok+error → two data points"); + + long okCount = 0; + long errorCount = 0; + DataPoint errorPoint = null; + DataPoint okPoint = null; + for (DataPoint dp : metric.dataPoints) { + if ("ERROR".equals(dp.attributes.get("status.code"))) { + errorPoint = dp; + errorCount = dp.count; + } else { + okPoint = dp; + okCount = dp.count; + } + } + assertNotNull(errorPoint, "one data point must carry status.code=ERROR"); + assertNotNull(okPoint, "one data point must omit status.code"); + assertEquals(e.getOkLatencies().getCount(), (double) okCount, 1e-9); + assertEquals(e.getErrorLatencies().getCount(), (double) errorCount, 1e-9); + assertEquals(okCount, sumBuckets(okPoint)); + assertEquals(errorCount, sumBuckets(errorPoint)); + } + + @Test + void httpAndGrpcAttributesAppearOnlyWhenSet() throws IOException { + CapturingSender sender = new CapturingSender(); + OtlpStatsMetricWriter writer = new OtlpStatsMetricWriter(sender); + + AggregateEntry e = + AggregateEntryTestUtils.of( + "GET /users/{id}", + "web", + "servlet.request", + null, + "web", + 200, + false, + true, + "server", + null, + "GET", + "/users/{id}", + "0"); + e.recordOneDuration(SECONDS.toNanos(1)); + + long start = SECONDS.toNanos(1_700_000_000L); + writer.startBucket(1, start, SECONDS.toNanos(10)); + writer.add(e); + writer.finishBucket(); + + DecodedMetric metric = decode(sender.lastPayload); + assertEquals(1, metric.dataPoints.size()); + Map attrs = metric.dataPoints.get(0).attributes; + + assertEquals("GET", attrs.get("http.request.method")); + assertEquals(200L, attrs.get("http.response.status_code")); + assertEquals("/users/{id}", attrs.get("http.route")); + assertEquals("0", attrs.get("rpc.response.status_code")); + + // a bare entry has none of these + CapturingSender sender2 = new CapturingSender(); + OtlpStatsMetricWriter writer2 = new OtlpStatsMetricWriter(sender2); + writer2.startBucket(1, start, SECONDS.toNanos(10)); + writer2.add(okEntry(SECONDS.toNanos(1), 1)); + writer2.finishBucket(); + Map bareAttrs = decode(sender2.lastPayload).dataPoints.get(0).attributes; + assertFalse(bareAttrs.containsKey("http.request.method")); + assertFalse(bareAttrs.containsKey("http.response.status_code")); + assertFalse(bareAttrs.containsKey("http.route")); + assertFalse(bareAttrs.containsKey("rpc.response.status_code")); + } + + @Test + void emptyBucketSendsNothing() { + CapturingSender sender = new CapturingSender(); + OtlpStatsMetricWriter writer = new OtlpStatsMetricWriter(sender); + + writer.startBucket(0, SECONDS.toNanos(1_700_000_000L), SECONDS.toNanos(10)); + writer.finishBucket(); // no add() + + assertEquals(0, sender.sendCount, "empty bucket must not invoke send"); + assertNull(sender.lastPayload); + } + + @Test + void nullSenderDoesNotThrowOnNonEmptyBucket() { + // mirrors the HTTP_JSON path where createSender(config) returns null. + OtlpStatsMetricWriter writer = new OtlpStatsMetricWriter((OtlpSender) null); + writer.startBucket(1, SECONDS.toNanos(1_700_000_000L), SECONDS.toNanos(10)); + writer.add(okEntry(SECONDS.toNanos(1), 2)); + try { + writer.finishBucket(); + } catch (Exception ex) { + fail("finishBucket must not throw with a null sender, but threw: " + ex); + } + } + + // ── step 4: attribute modes ────────────────────────────────────────────── + + @Test + void defaultModeCarriesDatadogAttributes() throws IOException { + CapturingSender sender = new CapturingSender(); + // otelSemanticsMode = false → datadog.* should be present + OtlpStatsMetricWriter writer = new OtlpStatsMetricWriter(sender, false); + writer.startBucket(1, SECONDS.toNanos(1_700_000_000L), SECONDS.toNanos(10)); + // use an entry where all hits are top-level: OR in TOP_LEVEL_TAG + AggregateEntry e = + AggregateEntryTestUtils.of( + "servlet.request", + "web", + "servlet.request", + null, + "web", + 0, + false, + true, + "server", + null, + null, + null, + null); + e.recordOneDuration(SECONDS.toNanos(1) | AggregateEntry.TOP_LEVEL_TAG); + writer.add(e); + writer.finishBucket(); + + Map attrs = decode(sender.lastPayload).dataPoints.get(0).attributes; + assertTrue( + attrs.containsKey("datadog.operation.name"), "operation name present in default mode"); + assertTrue(attrs.containsKey("datadog.span.type"), "span type present in default mode"); + assertTrue( + attrs.containsKey("datadog.span.top_level"), "span top-level present in default mode"); + assertEquals(1L, attrs.get("datadog.span.top_level"), "all hits top-level → 1"); + // OTel-semconv attrs are present in both modes + assertTrue(attrs.containsKey("span.name"), "span.name present in both modes"); + assertFalse(attrs.containsKey("datadog.origin"), "non-synthetic entry has no datadog.origin"); + } + + @Test + void defaultModeCarriesSyntheticsOrigin() throws IOException { + CapturingSender sender = new CapturingSender(); + OtlpStatsMetricWriter writer = new OtlpStatsMetricWriter(sender, false); + writer.startBucket(1, SECONDS.toNanos(1_700_000_000L), SECONDS.toNanos(10)); + AggregateEntry e = + AggregateEntryTestUtils.of( + "servlet.request", + "web", + "servlet.request", + null, + "web", + 0, + true, // synthetic=true + true, + "server", + null, + null, + null, + null); + e.recordOneDuration(SECONDS.toNanos(1)); + writer.add(e); + writer.finishBucket(); + + Map attrs = decode(sender.lastPayload).dataPoints.get(0).attributes; + assertEquals( + "synthetics", attrs.get("datadog.origin"), "synthetic entry carries datadog.origin"); + } + + @Test + void otelSemanticsModeOmitsDatadogAttributes() throws IOException { + CapturingSender sender = new CapturingSender(); + // otelSemanticsMode = true → datadog.* must be absent + OtlpStatsMetricWriter writer = new OtlpStatsMetricWriter(sender, true); + writer.startBucket(1, SECONDS.toNanos(1_700_000_000L), SECONDS.toNanos(10)); + writer.add(okEntry(SECONDS.toNanos(1), 1)); + writer.finishBucket(); + + Map attrs = decode(sender.lastPayload).dataPoints.get(0).attributes; + assertFalse( + attrs.containsKey("datadog.operation.name"), + "operation name absent in otel-semantics mode"); + assertFalse(attrs.containsKey("datadog.span.type"), "span type absent in otel-semantics mode"); + assertFalse( + attrs.containsKey("datadog.span.top_level"), + "span top-level absent in otel-semantics mode"); + assertFalse(attrs.containsKey("datadog.origin"), "origin absent in otel-semantics mode"); + // OTel-semconv attrs must still be present + assertTrue(attrs.containsKey("span.name"), "span.name present even in otel-semantics mode"); + } + + private static long sumBuckets(DataPoint dp) { + long total = 0; + for (long c : dp.bucketCounts) { + total += c; + } + return total; + } +} From df548ae1e588ce18badb8d050a1ed3652ca52cf1 Mon Sep 17 00:00:00 2001 From: Matthew Li Date: Thu, 18 Jun 2026 13:11:02 -0400 Subject: [PATCH 6/8] adding datadog-specific datapoint attribtues --- .../trace/common/metrics/AggregateEntry.java | 33 +++++++++++++--- .../metrics/ConflatingMetricsAggregator.java | 8 +--- .../common/metrics/OtlpStatsMetricWriter.java | 27 +++++++++++-- .../trace/common/metrics/SpanSnapshot.java | 15 ++++++-- .../common/metrics/AggregateEntryTest.java | 4 +- .../metrics/AggregateEntryTestUtils.java | 38 ++++++++++++++++++- .../common/metrics/AggregateTableTest.java | 6 +-- .../metrics/OtlpStatsMetricWriterTest.java | 17 +++++++++ 8 files changed, 121 insertions(+), 27 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java index f9538bd7005..912fcf84a1f 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java @@ -80,6 +80,9 @@ final class AggregateEntry extends Hashtable.Entry { DDCaches.newFixedSizeCache(32); private static final DDCache GRPC_STATUS_CODE_CACHE = DDCaches.newFixedSizeCache(32); + // Origin is a small fixed vocabulary (synthetics, synthetics-browser, rum, ciapp-test, lambda). + private static final DDCache ORIGIN_CACHE = + DDCaches.newFixedSizeCache(8); /** * Outer cache keyed by peer-tag name, with an inner per-name cache keyed by value. The inner @@ -108,8 +111,13 @@ final class AggregateEntry extends Hashtable.Entry { @Nullable private final UTF8BytesString grpcStatusCode; private final short httpStatusCode; - /** Whether the root span carried the {@code synthetics} origin tag (synthetic-monitoring run). */ - private final boolean synthetic; + /** + * Trace origin (e.g. {@code synthetics}, {@code rum}, {@code ciapp-test}, {@code lambda}), or + * {@code null} when the root span carried no origin. Part of the bucket key, so spans with + * distinct origins aggregate separately. The OTLP export emits this as {@code datadog.origin}; + * the native msgpack path reads {@link #isSynthetics()}, derived from it. + */ + @Nullable private final UTF8BytesString origin; /** Whether this span is the trace root ({@code parentId == 0}). */ private final boolean traceRoot; @@ -155,7 +163,7 @@ final class AggregateEntry extends Hashtable.Entry { this.httpEndpoint = canonicalizeOptional(HTTP_ENDPOINT_CACHE, s.httpEndpoint); this.grpcStatusCode = canonicalizeOptional(GRPC_STATUS_CODE_CACHE, s.grpcStatusCode); this.httpStatusCode = s.httpStatusCode; - this.synthetic = s.synthetic; + this.origin = canonicalizeOptional(ORIGIN_CACHE, s.origin); this.traceRoot = s.traceRoot; this.peerTagNames = s.peerTagSchema == null ? null : s.peerTagSchema.names; this.peerTagValues = s.peerTagValues; @@ -254,7 +262,7 @@ void clear() { boolean matches(SpanSnapshot s) { String[] snapshotNames = s.peerTagSchema == null ? null : s.peerTagSchema.names; return httpStatusCode == s.httpStatusCode - && synthetic == s.synthetic + && contentEquals(origin, s.origin) && traceRoot == s.traceRoot && contentEquals(resource, s.resourceName) && contentEquals(service, s.serviceName) @@ -295,7 +303,7 @@ static long hashOf(SpanSnapshot s) { h = LongHashingUtils.addToHash(h, s.serviceNameSource); h = LongHashingUtils.addToHash(h, s.spanType); h = LongHashingUtils.addToHash(h, s.httpStatusCode); - h = LongHashingUtils.addToHash(h, s.synthetic); + h = LongHashingUtils.addToHash(h, s.origin); h = LongHashingUtils.addToHash(h, s.traceRoot); h = LongHashingUtils.addToHash(h, s.spanKind); // Always mix in both the schema's content hash and the values' content hash, unconditionally @@ -363,8 +371,21 @@ int getHttpStatusCode() { return httpStatusCode; } + /** + * The full trace origin, or {@code null} when unset. Used by {@link OtlpStatsMetricWriter} to + * emit {@code datadog.origin}. + */ + @Nullable + UTF8BytesString getOrigin() { + return origin; + } + + /** + * Whether the origin is {@code synthetics}. Derived from {@link #origin} for the native msgpack + * writer, which emits a synthetics boolean rather than the full origin. + */ boolean isSynthetics() { - return synthetic; + return origin != null && "synthetics".contentEquals(origin); } boolean isTraceRoot() { diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java index 895ee434854..594d3e6c4c0 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java @@ -47,8 +47,6 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve private static final Map DEFAULT_HEADERS = Collections.singletonMap(DDAgentApi.DATADOG_META_TRACER_VERSION, DDTraceCoreInfo.VERSION); - private static final CharSequence SYNTHETICS_ORIGIN = "synthetics"; - private static final SpanKindFilter METRICS_ELIGIBLE_KINDS = SpanKindFilter.builder() .includeServer() @@ -346,7 +344,7 @@ private boolean publish(CoreSpan span, boolean isTopLevel) { span.getServiceNameSource(), spanType, span.getHttpStatusCode(), - isSynthetic(span), + span.getOrigin(), span.getParentId() == 0, spanKind, peerTagSchema, @@ -466,10 +464,6 @@ private static String[] capturePeerTagValues(CoreSpan span, PeerTagSchema sch return values; } - private static boolean isSynthetic(CoreSpan span) { - return span.getOrigin() != null && SYNTHETICS_ORIGIN.equals(span.getOrigin().toString()); - } - public void stop() { if (null != cancellation) { cancellation.cancel(); diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/OtlpStatsMetricWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/OtlpStatsMetricWriter.java index a190c5748c2..7103c3d7092 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/OtlpStatsMetricWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/OtlpStatsMetricWriter.java @@ -63,8 +63,13 @@ public final class OtlpStatsMetricWriter implements MetricWriter { private static final String RPC_RESPONSE_STATUS_CODE = "rpc.response.status_code"; private static final String STATUS_CODE = "status.code"; private static final String STATUS_CODE_ERROR = "ERROR"; + private static final String DATADOG_OPERATION_NAME = "datadog.operation.name"; + private static final String DATADOG_SPAN_TYPE = "datadog.span.type"; + private static final String DATADOG_SPAN_TOP_LEVEL = "datadog.span.top_level"; + private static final String DATADOG_ORIGIN = "datadog.origin"; @Nullable private final OtlpSender sender; + private final boolean otelSemanticsMode; private final GrowableBuffer buf = new GrowableBuffer(512); private final OtlpProtoBuffer protobuf = new OtlpProtoBuffer(8192); @@ -77,12 +82,18 @@ public final class OtlpStatsMetricWriter implements MetricWriter { private int metricBytes; public OtlpStatsMetricWriter(Config config) { - this(createSender(config)); + this(createSender(config), config.isTraceOtelSemanticsEnabled()); } // visible for testing: lets tests inject a capturing sender to decode the emitted protobuf OtlpStatsMetricWriter(@Nullable OtlpSender sender) { + this(sender, false); + } + + // visible for testing: lets tests inject a capturing sender and control the semantics mode + OtlpStatsMetricWriter(@Nullable OtlpSender sender, boolean otelSemanticsMode) { this.sender = sender; + this.otelSemanticsMode = otelSemanticsMode; } @Nullable @@ -147,12 +158,10 @@ private void addDataPoint(AggregateEntry entry, Histogram latencies, boolean err } private void writeDataPointAttributes(AggregateEntry entry, boolean error) { - // TODO(step 4): branch on isTraceOtelSemanticsEnabled() to add the datadog.* attribute set in - // default mode and to omit it in OTel-semantics mode. The OTel-semconv attributes below are - // emitted in both modes. if (error) { writeStringAttribute(STATUS_CODE, STATUS_CODE_ERROR); } + // OTel semconv attrs are emitted in both modes writeStringAttribute(SPAN_NAME, entry.getResource()); writeStringAttribute(SPAN_KIND, entry.getSpanKind()); if (entry.getHttpMethod() != null) { @@ -167,6 +176,16 @@ private void writeDataPointAttributes(AggregateEntry entry, boolean error) { if (entry.getGrpcStatusCode() != null) { writeStringAttribute(RPC_RESPONSE_STATUS_CODE, entry.getGrpcStatusCode()); } + // Default (Datadog) mode: emit datadog.* per-point attributes + if (!otelSemanticsMode) { + writeStringAttribute(DATADOG_OPERATION_NAME, entry.getOperationName()); + writeStringAttribute(DATADOG_SPAN_TYPE, entry.getType()); + writeLongAttribute( + DATADOG_SPAN_TOP_LEVEL, entry.getTopLevelCount() == entry.getHitCount() ? 1L : 0L); + // Emit the full origin (synthetics, synthetics-browser, rum, ciapp-test, lambda, ...) when + // present; writeStringAttribute no-ops on a null value. + writeStringAttribute(DATADOG_ORIGIN, entry.getOrigin()); + } } private void writeStringAttribute(String key, @Nullable UTF8BytesString value) { diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java index 152ac42bb55..a462e5968e9 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java @@ -16,7 +16,16 @@ final class SpanSnapshot implements InboxItem { final CharSequence serviceNameSource; final CharSequence spanType; final short httpStatusCode; - final boolean synthetic; + + /** + * Trace origin (e.g. {@code synthetics}, {@code synthetics-browser}, {@code rum}, {@code + * ciapp-test}, {@code lambda}), or {@code null} when the root span carried no origin. Captured in + * full -- rather than collapsed to a synthetics flag -- so the OTLP export can emit {@code + * datadog.origin} with the recognized value; the native msgpack path derives its synthetics + * boolean from it via {@link AggregateEntry#isSynthetics()}. + */ + final CharSequence origin; + final boolean traceRoot; final String spanKind; @@ -48,7 +57,7 @@ final class SpanSnapshot implements InboxItem { CharSequence serviceNameSource, CharSequence spanType, short httpStatusCode, - boolean synthetic, + CharSequence origin, boolean traceRoot, String spanKind, PeerTagSchema peerTagSchema, @@ -63,7 +72,7 @@ final class SpanSnapshot implements InboxItem { this.serviceNameSource = serviceNameSource; this.spanType = spanType; this.httpStatusCode = httpStatusCode; - this.synthetic = synthetic; + this.origin = origin; this.traceRoot = traceRoot; this.spanKind = spanKind; this.peerTagSchema = peerTagSchema; diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateEntryTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateEntryTest.java index 7fd767533c7..02693ee0edd 100644 --- a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateEntryTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateEntryTest.java @@ -131,7 +131,7 @@ private static SpanSnapshot snapshotWithPeerTags(String[] names, String[] values null, "type", (short) 200, - false, + null, true, "client", PeerTagSchema.testSchema(names), @@ -151,7 +151,7 @@ private static AggregateEntry newEntry() { null, "type", (short) 200, - false, + null, true, "client", null, diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateEntryTestUtils.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateEntryTestUtils.java index ed6fd5a3a7e..a36ca8b907f 100644 --- a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateEntryTestUtils.java +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateEntryTestUtils.java @@ -76,7 +76,9 @@ public static AggregateEntry of( serviceSource, type, (short) httpStatusCode, - synthetic, + // The legacy boolean maps onto the full origin field: true => "synthetics", false => + // no origin. Tests needing a non-synthetics origin use ofOrigin(...). + synthetic ? "synthetics" : null, traceRoot, spanKind == null ? null : spanKind.toString(), schema, @@ -88,6 +90,38 @@ public static AggregateEntry of( return forSnapshot(syntheticSnapshot); } + /** + * Builds a minimal {@link AggregateEntry} carrying an explicit trace {@code origin} (e.g. {@code + * rum}, {@code ciapp-test}, {@code lambda}). A trace-root server entry with no HTTP/RPC/peer-tag + * fields; durations are recorded by the caller. + */ + public static AggregateEntry ofOrigin( + CharSequence resource, + CharSequence service, + CharSequence operationName, + CharSequence type, + CharSequence spanKind, + @Nullable CharSequence origin) { + SpanSnapshot snapshot = + new SpanSnapshot( + resource, + service == null ? null : service.toString(), + operationName, + null, + type, + (short) 0, + origin, + true, + spanKind == null ? null : spanKind.toString(), + null, + null, + null, + null, + null, + 0L); + return forSnapshot(snapshot); + } + /** * Builds an {@link AggregateEntry} from {@code s} by computing its lookup hash via {@link * AggregateEntry#hashOf(SpanSnapshot)} and calling the package-private constructor directly. @@ -106,7 +140,7 @@ public static boolean equals(AggregateEntry a, AggregateEntry b) { if (a == b) return true; if (a == null || b == null) return false; return a.getHttpStatusCode() == b.getHttpStatusCode() - && a.isSynthetics() == b.isSynthetics() + && Objects.equals(a.getOrigin(), b.getOrigin()) && a.isTraceRoot() == b.isTraceRoot() && Objects.equals(a.getResource(), b.getResource()) && Objects.equals(a.getService(), b.getService()) diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java index 618ead2ab43..c3463337079 100644 --- a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java @@ -274,7 +274,7 @@ private static SpanSnapshot nullServiceKindSnapshot(String service, String spanK null, "web", (short) 200, - false, + null, true, spanKind, null, @@ -294,7 +294,7 @@ private static SpanSnapshot nullableSnapshot( serviceNameSource, type, (short) 200, - false, + null, true, "client", null, @@ -350,7 +350,7 @@ SpanSnapshot build() { null, "web", (short) 200, - false, + null, true, spanKind, peerTagSchema, diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/OtlpStatsMetricWriterTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/OtlpStatsMetricWriterTest.java index c47973c257f..cffa781cc15 100644 --- a/dd-trace-core/src/test/java/datadog/trace/common/metrics/OtlpStatsMetricWriterTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/OtlpStatsMetricWriterTest.java @@ -524,6 +524,23 @@ void otelSemanticsModeOmitsDatadogAttributes() throws IOException { assertTrue(attrs.containsKey("span.name"), "span.name present even in otel-semantics mode"); } + @Test + void defaultModePassesFullOriginThrough() throws IOException { + // A non-synthetics origin must be emitted verbatim, not collapsed to a synthetics flag. + CapturingSender sender = new CapturingSender(); + OtlpStatsMetricWriter writer = new OtlpStatsMetricWriter(sender, false); + writer.startBucket(1, SECONDS.toNanos(1_700_000_000L), SECONDS.toNanos(10)); + AggregateEntry e = + AggregateEntryTestUtils.ofOrigin( + "servlet.request", "web", "servlet.request", "web", "server", "ciapp-test"); + e.recordOneDuration(SECONDS.toNanos(1)); + writer.add(e); + writer.finishBucket(); + + Map attrs = decode(sender.lastPayload).dataPoints.get(0).attributes; + assertEquals("ciapp-test", attrs.get("datadog.origin"), "full origin emitted verbatim"); + } + private static long sumBuckets(DataPoint dp) { long total = 0; for (long c : dp.bucketCounts) { From 27dde3c58ea1e843e14811a4a4712062e782193c Mon Sep 17 00:00:00 2001 From: Matthew Li Date: Thu, 18 Jun 2026 14:07:28 -0400 Subject: [PATCH 7/8] wiring datadog resource attributes into metric writer --- .../common/metrics/OtlpStatsMetricWriter.java | 15 +++- .../core/otlp/common/OtlpResourceProto.java | 40 +++++++++++ .../metrics/OtlpStatsMetricWriterTest.java | 69 +++++++++++++++++++ .../otlp/common/OtlpResourceProtoTest.java | 26 +++++++ 4 files changed, 148 insertions(+), 2 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/OtlpStatsMetricWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/OtlpStatsMetricWriter.java index 7103c3d7092..78861930daf 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/OtlpStatsMetricWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/OtlpStatsMetricWriter.java @@ -8,7 +8,6 @@ import static datadog.trace.core.otlp.common.OtlpCommonProto.writeAttribute; import static datadog.trace.core.otlp.common.OtlpCommonProto.writeI64; import static datadog.trace.core.otlp.common.OtlpCommonProto.writeTag; -import static datadog.trace.core.otlp.common.OtlpResourceProto.RESOURCE_MESSAGE; import static datadog.trace.core.otlp.metrics.OtlpMetricsProto.recordDataPointMessage; import static datadog.trace.core.otlp.metrics.OtlpMetricsProto.recordMetricMessage; import static datadog.trace.core.otlp.metrics.OtlpMetricsProto.recordScopedMetricsMessage; @@ -23,6 +22,7 @@ import datadog.trace.core.otlp.common.OtlpGrpcSender; import datadog.trace.core.otlp.common.OtlpHttpSender; import datadog.trace.core.otlp.common.OtlpProtoBuffer; +import datadog.trace.core.otlp.common.OtlpResourceProto; import datadog.trace.core.otlp.common.OtlpSender; import javax.annotation.Nullable; import org.slf4j.Logger; @@ -71,6 +71,13 @@ public final class OtlpStatsMetricWriter implements MetricWriter { @Nullable private final OtlpSender sender; private final boolean otelSemanticsMode; + /** + * Resource attribute blob prepended to every payload. In default mode it carries the {@code + * datadog.runtime_id} and process-tag resource attributes; in OTel-semantics mode it is the plain + * vendor-neutral resource (no {@code datadog.*}). + */ + private final byte[] resourceMessage; + private final GrowableBuffer buf = new GrowableBuffer(512); private final OtlpProtoBuffer protobuf = new OtlpProtoBuffer(8192); @@ -94,6 +101,10 @@ public OtlpStatsMetricWriter(Config config) { OtlpStatsMetricWriter(@Nullable OtlpSender sender, boolean otelSemanticsMode) { this.sender = sender; this.otelSemanticsMode = otelSemanticsMode; + this.resourceMessage = + otelSemanticsMode + ? OtlpResourceProto.RESOURCE_MESSAGE + : OtlpResourceProto.RESOURCE_MESSAGE_WITH_DATADOG_ATTRS; } @Nullable @@ -216,7 +227,7 @@ public void finishBucket() { if (payloadBytes == 0) { return; } - payloadBytes += protobuf.recordMessage(RESOURCE_MESSAGE); + payloadBytes += protobuf.recordMessage(resourceMessage); protobuf.recordMessage(buf, 1, payloadBytes); if (sender != null) { diff --git a/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpResourceProto.java b/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpResourceProto.java index 0a86668c234..ffd2810a996 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpResourceProto.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpResourceProto.java @@ -9,8 +9,10 @@ import datadog.communication.serialization.GrowableBuffer; import datadog.communication.serialization.StreamingBuffer; import datadog.trace.api.Config; +import datadog.trace.api.ProcessTags; import java.util.Arrays; import java.util.HashSet; +import java.util.List; import java.util.Locale; import java.util.Set; @@ -31,9 +33,25 @@ private OtlpResourceProto() {} "telemetry.sdk.version", "telemetry.sdk.language")); + /** Prefix applied to {@code datadog.runtime_id} and process-tag resource attributes. */ + private static final String DATADOG_PREFIX = "datadog."; + + /** Vendor-neutral resource (no {@code datadog.*}). Used by the OTLP trace/metric export. */ public static final byte[] RESOURCE_MESSAGE = buildResourceMessage(Config.get()); + /** + * Resource that additionally carries {@code datadog.runtime_id} and process tags (each prefixed + * {@code datadog.}). Used by the default-mode SDK trace-metrics export; omitted in OTel-semantics + * mode. + */ + public static final byte[] RESOURCE_MESSAGE_WITH_DATADOG_ATTRS = + buildResourceMessage(Config.get(), true); + static byte[] buildResourceMessage(Config config) { + return buildResourceMessage(config, false); + } + + static byte[] buildResourceMessage(Config config, boolean includeDatadogResourceAttributes) { GrowableBuffer buf = new GrowableBuffer(512); String serviceName = config.getServiceName(); @@ -67,6 +85,10 @@ static byte[] buildResourceMessage(Config config) { } }); + if (includeDatadogResourceAttributes) { + writeDatadogResourceAttributes(buf, config); + } + OtlpProtoBuffer protobuf = new OtlpProtoBuffer(buf.capacity()); int numBytes = protobuf.recordMessage(buf, 1); byte[] resourceMessage = new byte[numBytes]; @@ -75,6 +97,24 @@ static byte[] buildResourceMessage(Config config) { return resourceMessage; } + private static void writeDatadogResourceAttributes(StreamingBuffer buf, Config config) { + String runtimeId = config.getRuntimeId(); + if (runtimeId != null && !runtimeId.isEmpty()) { + writeResourceAttribute(buf, DATADOG_PREFIX + "runtime_id", runtimeId); + } + // Process tags arrive as "key:value" pairs; emit each as datadog. = value. + List processTags = ProcessTags.getTagsAsStringList(); + if (processTags != null) { + for (String tag : processTags) { + int colon = tag.indexOf(':'); + if (colon > 0) { + writeResourceAttribute( + buf, DATADOG_PREFIX + tag.substring(0, colon), tag.substring(colon + 1)); + } + } + } + } + private static void writeResourceAttribute(StreamingBuffer buf, String key, String value) { writeTag(buf, 1, LEN_WIRE_TYPE); writeAttribute(buf, STRING_ATTRIBUTE, key, value); diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/OtlpStatsMetricWriterTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/OtlpStatsMetricWriterTest.java index cffa781cc15..377f457f69a 100644 --- a/dd-trace-core/src/test/java/datadog/trace/common/metrics/OtlpStatsMetricWriterTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/OtlpStatsMetricWriterTest.java @@ -142,6 +142,35 @@ private static DecodedMetric decode(byte[] payload) throws IOException { return metric; } + /** + * Decodes the {@code Resource.attributes} ({@code ResourceMetrics.resource = 1} → {@code + * Resource.attributes = 1}) into a key→value map, for asserting the {@code datadog.*} resource + * attributes emitted in default mode. + */ + private static Map decodeResourceAttributes(byte[] payload) throws IOException { + CodedInputStream metricsData = CodedInputStream.newInstance(payload); + metricsData.readTag(); // MetricsData.resource_metrics = 1 + CodedInputStream resourceMetrics = metricsData.readBytes().newCodedInput(); + Map attrs = new HashMap<>(); + while (!resourceMetrics.isAtEnd()) { + int tag = resourceMetrics.readTag(); + if (WireFormat.getTagFieldNumber(tag) == 1) { // Resource + CodedInputStream resource = resourceMetrics.readBytes().newCodedInput(); + while (!resource.isAtEnd()) { + int rtag = resource.readTag(); + if (WireFormat.getTagFieldNumber(rtag) == 1) { // KeyValue attributes + readKeyValue(resource.readBytes().newCodedInput(), attrs); + } else { + resource.skipField(rtag); + } + } + } else { + resourceMetrics.skipField(tag); + } + } + return attrs; + } + private static DecodedMetric parseScopeMetrics(CodedInputStream scopeMetrics) throws IOException { DecodedMetric metric = null; while (!scopeMetrics.isAtEnd()) { @@ -541,6 +570,46 @@ void defaultModePassesFullOriginThrough() throws IOException { assertEquals("ciapp-test", attrs.get("datadog.origin"), "full origin emitted verbatim"); } + // ── resource attributes (datadog.runtime_id / process tags) ──────────────── + + @Test + void defaultModeResourceCarriesRuntimeId() throws IOException { + // runtime-id is enabled by default, so default-mode payloads carry datadog.runtime_id on the + // Resource. + CapturingSender sender = new CapturingSender(); + OtlpStatsMetricWriter writer = new OtlpStatsMetricWriter(sender, false); + writer.startBucket(1, SECONDS.toNanos(1_700_000_000L), SECONDS.toNanos(10)); + writer.add(okEntry(SECONDS.toNanos(1), 1)); + writer.finishBucket(); + + Map resourceAttrs = decodeResourceAttributes(sender.lastPayload); + assertTrue( + resourceAttrs.containsKey("datadog.runtime_id"), + "default mode resource carries datadog.runtime_id"); + Object runtimeId = resourceAttrs.get("datadog.runtime_id"); + assertNotNull(runtimeId, "runtime id value present"); + assertFalse(runtimeId.toString().isEmpty(), "runtime id value non-empty"); + } + + @Test + void otelSemanticsModeResourceOmitsDatadogAttributes() throws IOException { + CapturingSender sender = new CapturingSender(); + OtlpStatsMetricWriter writer = new OtlpStatsMetricWriter(sender, true); + writer.startBucket(1, SECONDS.toNanos(1_700_000_000L), SECONDS.toNanos(10)); + writer.add(okEntry(SECONDS.toNanos(1), 1)); + writer.finishBucket(); + + Map resourceAttrs = decodeResourceAttributes(sender.lastPayload); + assertFalse( + resourceAttrs.containsKey("datadog.runtime_id"), + "otel-semantics mode resource omits datadog.runtime_id"); + for (String key : resourceAttrs.keySet()) { + assertFalse( + key.startsWith("datadog."), + "otel-semantics mode resource has no datadog.* attrs: " + key); + } + } + private static long sumBuckets(DataPoint dp) { long total = 0; for (long c : dp.bucketCounts) { diff --git a/dd-trace-core/src/test/java/datadog/trace/core/otlp/common/OtlpResourceProtoTest.java b/dd-trace-core/src/test/java/datadog/trace/core/otlp/common/OtlpResourceProtoTest.java index cb613137bcc..da3071f450d 100644 --- a/dd-trace-core/src/test/java/datadog/trace/core/otlp/common/OtlpResourceProtoTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/core/otlp/common/OtlpResourceProtoTest.java @@ -7,6 +7,7 @@ import static datadog.trace.api.config.GeneralConfig.VERSION; import static datadog.trace.api.config.TracerConfig.TRACE_REPORT_HOSTNAME; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import com.google.protobuf.CodedInputStream; @@ -17,6 +18,7 @@ import java.util.Map; import java.util.Properties; import java.util.stream.Stream; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -158,6 +160,30 @@ void testBuildResourceMessage( assertEquals(expectedAttributes, actualAttributes, "For case: " + caseName); } + /** + * The datadog-attrs variant ({@code buildResourceMessage(config, true)}) carries {@code + * datadog.runtime_id}; the plain variant omits it. (Process tags are emitted only when the + * experimental process-tag propagation is enabled, so they aren't asserted here.) + */ + @Test + void datadogResourceAttributesVariantCarriesRuntimeId() throws IOException { + Config config = Config.get(props(SERVICE_NAME, "my-service")); + + Map withDatadog = + parseResourceAttributes(OtlpResourceProto.buildResourceMessage(config, true)); + Map plain = + parseResourceAttributes(OtlpResourceProto.buildResourceMessage(config, false)); + + assertTrue( + withDatadog.containsKey("datadog.runtime_id"), + "datadog-attrs variant carries datadog.runtime_id"); + assertEquals( + config.getRuntimeId(), + withDatadog.get("datadog.runtime_id"), + "runtime id matches the config value"); + assertFalse(plain.containsKey("datadog.runtime_id"), "plain variant omits datadog.runtime_id"); + } + // ── parsing helpers ─────────────────────────────────────────────────────── /** From e11a6347190c4abbe5681a7db797ca83438d6a74 Mon Sep 17 00:00:00 2001 From: Matthew Li Date: Tue, 23 Jun 2026 09:38:49 -0400 Subject: [PATCH 8/8] more specific logging --- .../datadog/trace/common/metrics/OtlpStatsMetricWriter.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/OtlpStatsMetricWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/OtlpStatsMetricWriter.java index 7103c3d7092..28ec2c3bed3 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/OtlpStatsMetricWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/OtlpStatsMetricWriter.java @@ -116,8 +116,9 @@ private static OtlpSender createSender(Config config) { config.getOtlpMetricsCompression()); default: // HTTP_JSON has no protobuf-free encoder yet; JSON transport is deferred per the plan. - log.debug( - "Unsupported OTLP metrics protocol for trace metrics export: {}", + log.warn( + "OTLP trace metrics export disabled: unsupported metrics protocol {}. " + + "Set OTEL_EXPORTER_OTLP_METRICS_PROTOCOL to grpc or http/protobuf.", config.getOtlpMetricsProtocol()); return null; }