Skip to content

Commit 888ab65

Browse files
committed
feat: add support for event bridge DSM context extraction
1 parent 3bf5778 commit 888ab65

3 files changed

Lines changed: 219 additions & 2 deletions

File tree

datadog_lambda/config.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,10 @@ def _resolve_env(self, key, default=None, cast=None, depends_on_tracing=False):
8989
data_streams_enabled = _get_env(
9090
"DD_DATA_STREAMS_ENABLED", "false", as_bool, depends_on_tracing=True
9191
)
92+
# EventBridge bus name used as the DSM `exchange` tag. The bus name is not
93+
# present in the inbound event, so it must be provided explicitly to allow
94+
# the consume checkpoint to pair with the EventBridge produce checkpoint.
95+
dsm_exchange_name = _get_env("DD_DSM_EXCHANGE_NAME")
9296
appsec_enabled = _get_env("DD_APPSEC_ENABLED", "false", as_bool)
9397
sca_enabled = _get_env("DD_APPSEC_SCA_ENABLED", "false", as_bool)
9498

datadog_lambda/tracing.py

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,43 @@ def _dsm_set_checkpoint(context_json, event_type, arn):
8282
)
8383

8484

85+
def _dsm_set_eventbridge_checkpoint(context_json, detail_type):
86+
"""Set a DSM consume checkpoint for an EventBridge event.
87+
88+
Unlike the SQS/SNS/Kinesis helper, the EventBridge edge tags include an
89+
`exchange` tag (the bus name) to mirror the produce-side tags so the
90+
consume node pairs with the produce node. The bus name is not present in
91+
the inbound event, so it is sourced from `DD_DSM_EXCHANGE_NAME` when set.
92+
The public `set_consume_checkpoint` helper cannot emit an `exchange` tag,
93+
so the lower-level processor API is used directly.
94+
"""
95+
if not config.data_streams_enabled:
96+
return
97+
98+
if not detail_type:
99+
return
100+
101+
try:
102+
from ddtrace.internal.datastreams import data_streams_processor
103+
from ddtrace.internal.datastreams.processor import PROPAGATION_KEY_BASE_64
104+
105+
processor = data_streams_processor()
106+
if not processor:
107+
return
108+
109+
carrier_get = lambda k: context_json and context_json.get(k) # noqa: E731
110+
processor.decode_pathway_b64(carrier_get(PROPAGATION_KEY_BASE_64))
111+
112+
tags = ["direction:in", "topic:" + detail_type, "type:eventbridge"]
113+
if config.dsm_exchange_name:
114+
tags.append("exchange:" + config.dsm_exchange_name)
115+
processor.set_checkpoint(tags)
116+
except Exception as e:
117+
logger.debug(
118+
f"DSM:Failed to set consume checkpoint for eventbridge {detail_type}: {e}"
119+
)
120+
121+
85122
def _convert_xray_trace_id(xray_trace_id):
86123
"""
87124
Convert X-Ray trace id (hex)'s last 63 bits to a Datadog trace id (int).
@@ -353,12 +390,32 @@ def _extract_context_from_eventbridge_sqs_event(event):
353390
This is only possible if first record in `Records` contains a
354391
`body` field which contains the EventBridge `detail` as a JSON string.
355392
"""
356-
first_record = event.get("Records")[0]
393+
records = event.get("Records")
394+
first_record = records[0]
357395
body_str = first_record.get("body")
358396
body = json.loads(body_str)
359397
detail = body.get("detail")
398+
# If `detail` is missing this is not an EventBridge -> SQS event; raising
399+
# here lets the caller fall back to the regular SQS extraction path before
400+
# any DSM checkpoint is set, avoiding double counting for plain SQS events.
360401
dd_context = detail.get("_datadog")
361402

403+
# The event has been confirmed as EventBridge -> SQS. Set a consume
404+
# checkpoint for every record in the batch. The message is consumed from
405+
# the SQS queue, so it follows SQS conventions (type:sqs, topic:queue ARN).
406+
if config.data_streams_enabled:
407+
for record in records:
408+
try:
409+
record_body = json.loads(record.get("body"))
410+
record_context = (record_body.get("detail") or {}).get("_datadog")
411+
_dsm_set_checkpoint(
412+
record_context, "sqs", record.get("eventSourceARN", "")
413+
)
414+
except Exception:
415+
logger.debug(
416+
"Failed to set DSM checkpoint for an EventBridge to SQS record."
417+
)
418+
362419
if is_step_function_event(dd_context):
363420
try:
364421
return extract_context_from_step_functions(dd_context, None)
@@ -379,8 +436,11 @@ def extract_context_from_eventbridge_event(event, lambda_context):
379436
that header.
380437
"""
381438
try:
382-
detail = event.get("detail")
439+
detail = event.get("detail") or {}
383440
dd_context = detail.get("_datadog")
441+
442+
_dsm_set_eventbridge_checkpoint(dd_context, event.get("detail-type"))
443+
384444
if not dd_context:
385445
return extract_context_from_lambda_context(lambda_context)
386446

tests/test_tracing.py

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
_dsm_set_checkpoint,
4747
extract_context_from_kinesis_event,
4848
extract_context_from_sqs_or_sns_event_or_context,
49+
extract_context_from_eventbridge_event,
4950
)
5051

5152
from datadog_lambda.trigger import parse_event_source
@@ -3646,3 +3647,155 @@ def test_kinesis_data_streams_disabled(self):
36463647
arn = "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream"
36473648

36483649
_dsm_set_checkpoint(context_json, event_type, arn)
3650+
3651+
# EVENTBRIDGE -> SQS TESTS
3652+
3653+
@staticmethod
3654+
def _eventbridge_sqs_record(queue_arn, pathway_ctx):
3655+
body = {
3656+
"detail-type": "MyDetailType",
3657+
"source": "my.event.source",
3658+
"detail": {
3659+
"_datadog": {
3660+
# Complete trace context so the extractor returns early and
3661+
# does not fall through to the regular SQS path.
3662+
"x-datadog-trace-id": "12345",
3663+
"x-datadog-parent-id": "67890",
3664+
"x-datadog-sampling-priority": "1",
3665+
"dd-pathway-ctx-base64": pathway_ctx,
3666+
}
3667+
},
3668+
}
3669+
return {
3670+
"eventSourceARN": queue_arn,
3671+
"eventSource": "aws:sqs",
3672+
"body": json.dumps(body),
3673+
}
3674+
3675+
def test_eventbridge_sqs_context_propagated(self):
3676+
queue_arn = "arn:aws:sqs:us-east-1:123456789012:eb-queue"
3677+
event = {"Records": [self._eventbridge_sqs_record(queue_arn, "12345")]}
3678+
3679+
extract_context_from_sqs_or_sns_event_or_context(
3680+
event, self.lambda_context, parse_event_source(event)
3681+
)
3682+
3683+
# EventBridge -> SQS is consumed from the queue, so it uses SQS tags.
3684+
self.assertEqual(self.mock_checkpoint.call_count, 1)
3685+
args, _ = self.mock_checkpoint.call_args
3686+
self.assertEqual(args[0], "sqs")
3687+
self.assertEqual(args[1], queue_arn)
3688+
carrier_get = args[2]
3689+
self.assertEqual(carrier_get("dd-pathway-ctx-base64"), "12345")
3690+
3691+
def test_eventbridge_sqs_checkpoints_all_records(self):
3692+
arn1 = "arn:aws:sqs:us-east-1:123456789012:eb-queue"
3693+
arn2 = "arn:aws:sqs:us-east-1:123456789012:eb-queue-2"
3694+
event = {
3695+
"Records": [
3696+
self._eventbridge_sqs_record(arn1, "ctx-1"),
3697+
self._eventbridge_sqs_record(arn2, "ctx-2"),
3698+
]
3699+
}
3700+
3701+
extract_context_from_sqs_or_sns_event_or_context(
3702+
event, self.lambda_context, parse_event_source(event)
3703+
)
3704+
3705+
self.assertEqual(self.mock_checkpoint.call_count, 2)
3706+
first_args, _ = self.mock_checkpoint.call_args_list[0]
3707+
second_args, _ = self.mock_checkpoint.call_args_list[1]
3708+
self.assertEqual((first_args[0], first_args[1]), ("sqs", arn1))
3709+
self.assertEqual(first_args[2]("dd-pathway-ctx-base64"), "ctx-1")
3710+
self.assertEqual((second_args[0], second_args[1]), ("sqs", arn2))
3711+
self.assertEqual(second_args[2]("dd-pathway-ctx-base64"), "ctx-2")
3712+
3713+
@patch("datadog_lambda.config.Config.data_streams_enabled", False)
3714+
def test_eventbridge_sqs_data_streams_disabled(self):
3715+
queue_arn = "arn:aws:sqs:us-east-1:123456789012:eb-queue"
3716+
event = {"Records": [self._eventbridge_sqs_record(queue_arn, "12345")]}
3717+
3718+
extract_context_from_sqs_or_sns_event_or_context(
3719+
event, self.lambda_context, parse_event_source(event)
3720+
)
3721+
3722+
self.mock_checkpoint.assert_not_called()
3723+
3724+
3725+
class TestEventBridgeDSMLogic(unittest.TestCase):
3726+
def setUp(self):
3727+
self.lambda_context = get_mock_context()
3728+
self.mock_processor = Mock()
3729+
processor_patcher = patch(
3730+
"ddtrace.internal.datastreams.data_streams_processor",
3731+
return_value=self.mock_processor,
3732+
)
3733+
processor_patcher.start()
3734+
self.addCleanup(processor_patcher.stop)
3735+
config_patcher = patch(
3736+
"datadog_lambda.config.Config.data_streams_enabled", True
3737+
)
3738+
config_patcher.start()
3739+
self.addCleanup(config_patcher.stop)
3740+
3741+
@staticmethod
3742+
def _eventbridge_event(detail_type="MyDetailType", pathway_ctx="12345"):
3743+
return {
3744+
"detail-type": detail_type,
3745+
"source": "my.event.source",
3746+
"detail": {"_datadog": {"dd-pathway-ctx-base64": pathway_ctx}},
3747+
}
3748+
3749+
def test_eventbridge_context_propagated(self):
3750+
event = self._eventbridge_event()
3751+
3752+
extract_context_from_eventbridge_event(event, self.lambda_context)
3753+
3754+
self.mock_processor.decode_pathway_b64.assert_called_once_with("12345")
3755+
self.mock_processor.set_checkpoint.assert_called_once()
3756+
(tags,), _ = self.mock_processor.set_checkpoint.call_args
3757+
self.assertIn("direction:in", tags)
3758+
self.assertIn("type:eventbridge", tags)
3759+
self.assertIn("topic:MyDetailType", tags)
3760+
self.assertFalse(any(t.startswith("exchange:") for t in tags))
3761+
3762+
@patch("datadog_lambda.config.Config.dsm_exchange_name", "my-event-bus")
3763+
def test_eventbridge_exchange_tag_from_env(self):
3764+
event = self._eventbridge_event()
3765+
3766+
extract_context_from_eventbridge_event(event, self.lambda_context)
3767+
3768+
(tags,), _ = self.mock_processor.set_checkpoint.call_args
3769+
self.assertIn("exchange:my-event-bus", tags)
3770+
self.assertIn("topic:MyDetailType", tags)
3771+
self.assertIn("type:eventbridge", tags)
3772+
3773+
def test_eventbridge_no_detail_type_skips_checkpoint(self):
3774+
event = self._eventbridge_event(detail_type=None)
3775+
3776+
extract_context_from_eventbridge_event(event, self.lambda_context)
3777+
3778+
self.mock_processor.set_checkpoint.assert_not_called()
3779+
3780+
def test_eventbridge_no_dd_context_still_checkpoints(self):
3781+
event = {"detail-type": "MyDetailType", "detail": {}}
3782+
3783+
extract_context_from_eventbridge_event(event, self.lambda_context)
3784+
3785+
self.mock_processor.decode_pathway_b64.assert_called_once_with(None)
3786+
self.mock_processor.set_checkpoint.assert_called_once()
3787+
3788+
def test_eventbridge_missing_detail_still_checkpoints(self):
3789+
event = {"detail-type": "MyDetailType"}
3790+
3791+
extract_context_from_eventbridge_event(event, self.lambda_context)
3792+
3793+
self.mock_processor.set_checkpoint.assert_called_once()
3794+
3795+
@patch("datadog_lambda.config.Config.data_streams_enabled", False)
3796+
def test_eventbridge_data_streams_disabled(self):
3797+
event = self._eventbridge_event()
3798+
3799+
extract_context_from_eventbridge_event(event, self.lambda_context)
3800+
3801+
self.mock_processor.set_checkpoint.assert_not_called()

0 commit comments

Comments
 (0)