Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package datadog.trace.common.metrics;

import datadog.metrics.api.Histogram;
import datadog.trace.bootstrap.otlp.metrics.OtlpHistogramPoint;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
* Projects a client-side-stats {@link Histogram} (a DDSketch over span durations recorded in
* <em>nanoseconds</em>) onto the fixed explicit-bounds histogram layout mandated by the OTLP Trace
* Metrics Export RFC.
*/
final class OtlpHistogramBuckets {
private OtlpHistogramBuckets() {}

private static final double NANOS_PER_SECOND = 1_000_000_000d;

static final double[] BOUNDS_SECONDS = {
0.002, 0.004, 0.006, 0.008, 0.01, 0.05, 0.1, 0.2, 0.4, 0.8, 1, 1.4, 2, 5, 10, 15
};

static final List<Double> EXPLICIT_BOUNDS;

static {
List<Double> bounds = new ArrayList<>(BOUNDS_SECONDS.length + 1);
for (double bound : BOUNDS_SECONDS) {
bounds.add(bound);
}
bounds.add(Double.POSITIVE_INFINITY);
EXPLICIT_BOUNDS = Collections.unmodifiableList(bounds);
}

static int bucketIndex(double seconds) {
for (int i = 0; i < BOUNDS_SECONDS.length; i++) {
if (seconds <= BOUNDS_SECONDS[i]) {
return i;
}
}
return BOUNDS_SECONDS.length; // overflow
}

/**
* Re-bins {@code histogram} (nanosecond-valued) into an {@link OtlpHistogramPoint} expressed in
* seconds with OTLP's fixed bucket layout. {@code count}, {@code min}, and {@code max} are taken
* directly from the sketch; {@code sumNanos} is the exact duration sum tracked alongside the
* sketch by {@link AggregateEntry} (the DDSketch-derived sum would only be approximate).
*/
static OtlpHistogramPoint toHistogramPoint(Histogram histogram, long sumNanos) {
long[] bucketCounts = new long[BOUNDS_SECONDS.length + 1];

List<Double> binBoundaries = histogram.getBinBoundaries();
List<Double> binCounts = histogram.getBinCounts();
for (int i = 0; i < binBoundaries.size(); i++) {
double upperSeconds = binBoundaries.get(i) / NANOS_PER_SECOND;
long count = (long) binCounts.get(i).doubleValue();
bucketCounts[bucketIndex(upperSeconds)] += count;
}

List<Double> counts = new ArrayList<>(bucketCounts.length);
for (long count : bucketCounts) {
counts.add((double) count);
}

double sumSeconds = sumNanos / NANOS_PER_SECOND;
double minSeconds = histogram.isEmpty() ? 0d : histogram.getMinValue() / NANOS_PER_SECOND;
double maxSeconds = histogram.isEmpty() ? 0d : histogram.getMaxValue() / NANOS_PER_SECOND;

return new OtlpHistogramPoint(
histogram.getCount(), EXPLICIT_BOUNDS, counts, sumSeconds, minSeconds, maxSeconds);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
package datadog.trace.common.metrics;

import static datadog.trace.bootstrap.otel.metrics.OtelInstrumentType.HISTOGRAM;
import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.LONG_ATTRIBUTE;
import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.STRING_ATTRIBUTE;
import static datadog.trace.core.otlp.common.OtlpCommonProto.I64_WIRE_TYPE;
import static datadog.trace.core.otlp.common.OtlpCommonProto.LEN_WIRE_TYPE;
import static datadog.trace.core.otlp.common.OtlpCommonProto.writeAttribute;
import static datadog.trace.core.otlp.common.OtlpCommonProto.writeI64;
import static datadog.trace.core.otlp.common.OtlpCommonProto.writeTag;
import static datadog.trace.core.otlp.common.OtlpResourceProto.RESOURCE_MESSAGE;
import static datadog.trace.core.otlp.metrics.OtlpMetricsProto.recordDataPointMessage;
import static datadog.trace.core.otlp.metrics.OtlpMetricsProto.recordMetricMessage;
import static datadog.trace.core.otlp.metrics.OtlpMetricsProto.recordScopedMetricsMessage;

import datadog.communication.serialization.GrowableBuffer;
import datadog.metrics.api.Histogram;
import datadog.trace.api.Config;
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
import datadog.trace.bootstrap.otel.common.OtelInstrumentationScope;
import datadog.trace.bootstrap.otel.metrics.OtelInstrumentDescriptor;
import datadog.trace.bootstrap.otlp.metrics.OtlpHistogramPoint;
import datadog.trace.core.otlp.common.OtlpGrpcSender;
import datadog.trace.core.otlp.common.OtlpHttpSender;
import datadog.trace.core.otlp.common.OtlpProtoBuffer;
import datadog.trace.core.otlp.common.OtlpSender;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A {@link MetricWriter} that exports the existing client-side trace (RED) stats as a single
* vendor-neutral OTLP delta-temporality histogram named {@code traces.span.sdk.metrics.duration}
* (unit {@code s}).
*
* <p>This is the parallel-to-{@link SerializingMetricWriter} OTLP export path. It hangs off the
* same in-memory aggregation ({@link ConflatingMetricsAggregator} / {@link Aggregator}) and
* consumes the same {@link AggregateEntry} stream; only the wire encoding and transport differ.
* Native msgpack stats and OTLP export are mutually exclusive (selected at the factory).
*
* <p>Assembly mirrors {@code OtlpMetricsProtoCollector}
*/
public final class OtlpStatsMetricWriter implements MetricWriter {
private static final Logger log = LoggerFactory.getLogger(OtlpStatsMetricWriter.class);

static final String METRIC_NAME = "traces.span.sdk.metrics.duration";
static final String METRIC_UNIT = "s";

private static final OtelInstrumentDescriptor METRIC_DESCRIPTOR =
new OtelInstrumentDescriptor(METRIC_NAME, HISTOGRAM, false, null, METRIC_UNIT);
private static final OtelInstrumentationScope SCOPE =
new OtelInstrumentationScope("datadog.trace.metrics", null, null);

private static final int DP_START_TIME_FIELD = 2;
private static final int DP_TIME_FIELD = 3;
private static final int DP_ATTRIBUTES_FIELD = 9;

private static final String SPAN_NAME = "span.name";
private static final String SPAN_KIND = "span.kind";
private static final String HTTP_REQUEST_METHOD = "http.request.method";
private static final String HTTP_RESPONSE_STATUS_CODE = "http.response.status_code";
private static final String HTTP_ROUTE = "http.route";
private static final String RPC_RESPONSE_STATUS_CODE = "rpc.response.status_code";
private static final String STATUS_CODE = "status.code";
private static final String STATUS_CODE_ERROR = "ERROR";
private static final String DATADOG_OPERATION_NAME = "datadog.operation.name";
private static final String DATADOG_SPAN_TYPE = "datadog.span.type";
private static final String DATADOG_SPAN_TOP_LEVEL = "datadog.span.top_level";
private static final String DATADOG_ORIGIN = "datadog.origin";

@Nullable private final OtlpSender sender;
private final boolean otelSemanticsMode;

private final GrowableBuffer buf = new GrowableBuffer(512);
private final OtlpProtoBuffer protobuf = new OtlpProtoBuffer(8192);

private long startNanos;
private long endNanos;

private int payloadBytes;
private int scopedBytes;
private int metricBytes;

public OtlpStatsMetricWriter(Config config) {
this(createSender(config), config.isTraceOtelSemanticsEnabled());
}

// visible for testing: lets tests inject a capturing sender to decode the emitted protobuf
OtlpStatsMetricWriter(@Nullable OtlpSender sender) {
this(sender, false);
}

// visible for testing: lets tests inject a capturing sender and control the semantics mode
OtlpStatsMetricWriter(@Nullable OtlpSender sender, boolean otelSemanticsMode) {
this.sender = sender;
this.otelSemanticsMode = otelSemanticsMode;
}

@Nullable
private static OtlpSender createSender(Config config) {
// mirrors OtlpMetricsService's protocol-based sender selection
switch (config.getOtlpMetricsProtocol()) {
case GRPC:
return new OtlpGrpcSender(
config.getOtlpMetricsEndpoint(),
"/opentelemetry.proto.collector.metrics.v1.MetricsService/Export",
config.getOtlpMetricsHeaders(),
config.getOtlpMetricsTimeout(),
config.getOtlpMetricsCompression());
case HTTP_PROTOBUF:
return new OtlpHttpSender(
config.getOtlpMetricsEndpoint(),
"/v1/metrics",
config.getOtlpMetricsHeaders(),
config.getOtlpMetricsTimeout(),
config.getOtlpMetricsCompression());
default:
// HTTP_JSON has no protobuf-free encoder yet; JSON transport is deferred per the plan.
log.warn(
"OTLP trace metrics export disabled: unsupported metrics protocol {}. "
+ "Set OTEL_EXPORTER_OTLP_METRICS_PROTOCOL to grpc or http/protobuf.",
config.getOtlpMetricsProtocol());
return null;
}
}

@Override
public void startBucket(int metricCount, long start, long duration) {
// start/duration arrive as epoch nanos / interval nanos (see Aggregator#report)
this.startNanos = start;
this.endNanos = start + duration;
this.payloadBytes = 0;
this.scopedBytes = 0;
this.metricBytes = 0;
}

@Override
public void add(AggregateEntry entry) {
Histogram okLatencies = entry.getOkLatencies();
if (!okLatencies.isEmpty()) {
addDataPoint(entry, okLatencies, false);
}

Histogram errorLatencies = entry.getErrorLatencies();
if (errorLatencies != null) {
addDataPoint(entry, errorLatencies, true);
}
}

private void addDataPoint(AggregateEntry entry, Histogram latencies, boolean error) {
writeDataPointAttributes(entry, error);
writeTag(buf, DP_START_TIME_FIELD, I64_WIRE_TYPE);
writeI64(buf, startNanos);
writeTag(buf, DP_TIME_FIELD, I64_WIRE_TYPE);
writeI64(buf, endNanos);
long sumNanos = error ? entry.getErrorDuration() : entry.getOkDuration();
OtlpHistogramPoint point = OtlpHistogramBuckets.toHistogramPoint(latencies, sumNanos);
metricBytes += recordDataPointMessage(buf, point, protobuf);
}

private void writeDataPointAttributes(AggregateEntry entry, boolean error) {
if (error) {
writeStringAttribute(STATUS_CODE, STATUS_CODE_ERROR);
}
// OTel semconv attrs are emitted in both modes
writeStringAttribute(SPAN_NAME, entry.getResource());
writeStringAttribute(SPAN_KIND, entry.getSpanKind());
if (entry.getHttpMethod() != null) {
writeStringAttribute(HTTP_REQUEST_METHOD, entry.getHttpMethod());
}
if (entry.getHttpStatusCode() != 0) {
writeLongAttribute(HTTP_RESPONSE_STATUS_CODE, entry.getHttpStatusCode());
}
if (entry.getHttpEndpoint() != null) {
writeStringAttribute(HTTP_ROUTE, entry.getHttpEndpoint());
}
if (entry.getGrpcStatusCode() != null) {
writeStringAttribute(RPC_RESPONSE_STATUS_CODE, entry.getGrpcStatusCode());
}
// Default (Datadog) mode: emit datadog.* per-point attributes
if (!otelSemanticsMode) {
writeStringAttribute(DATADOG_OPERATION_NAME, entry.getOperationName());
writeStringAttribute(DATADOG_SPAN_TYPE, entry.getType());
writeLongAttribute(
DATADOG_SPAN_TOP_LEVEL, entry.getTopLevelCount() == entry.getHitCount() ? 1L : 0L);
// Emit the full origin (synthetics, synthetics-browser, rum, ciapp-test, lambda, ...) when
// present; writeStringAttribute no-ops on a null value.
writeStringAttribute(DATADOG_ORIGIN, entry.getOrigin());
}
}

private void writeStringAttribute(String key, @Nullable UTF8BytesString value) {
if (value != null) {
writeStringAttribute(key, value.toString());
}
}

private void writeStringAttribute(String key, String value) {
writeTag(buf, DP_ATTRIBUTES_FIELD, LEN_WIRE_TYPE);
writeAttribute(buf, STRING_ATTRIBUTE, key, value);
}

private void writeLongAttribute(String key, long value) {
writeTag(buf, DP_ATTRIBUTES_FIELD, LEN_WIRE_TYPE);
writeAttribute(buf, LONG_ATTRIBUTE, key, value);
}

@Override
public void finishBucket() {
try {
if (metricBytes > 0) {
scopedBytes += recordMetricMessage(buf, METRIC_DESCRIPTOR, metricBytes, protobuf);
}
if (scopedBytes > 0) {
payloadBytes += recordScopedMetricsMessage(buf, SCOPE, scopedBytes, protobuf);
}
if (payloadBytes == 0) {
return;
}
payloadBytes += protobuf.recordMessage(RESOURCE_MESSAGE);
protobuf.recordMessage(buf, 1, payloadBytes);

if (sender != null) {
sender.send(protobuf.toPayload());
}
} finally {
reset();
}
}

@Override
public void reset() {
buf.reset();
protobuf.reset();
payloadBytes = 0;
scopedBytes = 0;
metricBytes = 0;
}
}
Loading
Loading