11# (c) Copyright IBM Corp. 2025
22
33try :
4- from typing import TYPE_CHECKING , Any , Callable , Dict , Tuple
4+ from typing import TYPE_CHECKING , Any , Callable , Dict , Tuple , Optional
55
66 import kafka # noqa: F401
77 import wrapt
@@ -37,14 +37,16 @@ def trace_kafka_send(
3737 span .set_attribute ("kafka.access" , "send" )
3838
3939 # context propagation
40+ headers = kwargs .get ("headers" , [])
4041 tracer .inject (
4142 span .context ,
4243 Format .KAFKA_HEADERS ,
43- kwargs . get ( " headers" , {}) ,
44+ headers ,
4445 disable_w3c_trace_context = True ,
4546 )
4647
4748 try :
49+ kwargs ["headers" ] = headers
4850 res = wrapped (* args , ** kwargs )
4951 except Exception as exc :
5052 span .record_exception (exc )
@@ -62,36 +64,42 @@ def trace_kafka_consume(
6264 return wrapped (* args , ** kwargs )
6365
6466 tracer , parent_span , _ = get_tracer_tuple ()
65-
66- parent_context = (
67- parent_span .get_span_context ()
68- if parent_span
69- else tracer .extract (
70- Format .KAFKA_HEADERS , {}, disable_w3c_trace_context = True
67+ exception = None
68+ res = None
69+
70+ try :
71+ res = wrapped (* args , ** kwargs )
72+ except Exception as exc :
73+ exception = exc
74+ finally :
75+ headers = res .headers if res else []
76+ parent_context = (
77+ parent_span .get_span_context ()
78+ if parent_span
79+ else tracer .extract (
80+ Format .KAFKA_HEADERS , headers , disable_w3c_trace_context = True ,
81+ )
7182 )
72- )
73-
74- with tracer .start_as_current_span (
75- "kafka-consumer" , span_context = parent_context , kind = SpanKind .CONSUMER
76- ) as span :
77- topic = list (instance .subscription ())[0 ]
78- span .set_attribute ("kafka.service" , topic )
79- span .set_attribute ("kafka.access" , "consume" )
80-
81- try :
82- res = wrapped (* args , ** kwargs )
83- except Exception as exc :
84- span .record_exception (exc )
85- else :
86- return res
83+ with tracer .start_as_current_span (
84+ "kafka-consumer" , span_context = parent_context , kind = SpanKind .CONSUMER
85+ ) as span :
86+ span .set_attribute (
87+ "kafka.service" ,
88+ res .topic if res else list (instance .subscription ())[0 ]
89+ )
90+ span .set_attribute ("kafka.access" , "consume" )
91+
92+ if exception :
93+ span .record_exception (exception )
94+ return res
8795
8896 @wrapt .patch_function_wrapper ("kafka" , "KafkaConsumer.poll" )
8997 def trace_kafka_poll (
9098 wrapped : Callable [..., "kafka.KafkaConsumer.poll" ],
9199 instance : "kafka.KafkaConsumer" ,
92100 args : Tuple [int , str , Tuple [Any , ...]],
93101 kwargs : Dict [str , Any ],
94- ) -> Dict [str , Any ]:
102+ ) -> Optional [ Dict [str , Any ] ]:
95103 if tracing_is_off ():
96104 return wrapped (* args , ** kwargs )
97105
@@ -102,27 +110,35 @@ def trace_kafka_poll(
102110 if parent_span and parent_span .name == "kafka-consumer" :
103111 return wrapped (* args , ** kwargs )
104112
105- parent_context = (
106- parent_span .get_span_context ()
107- if parent_span
108- else tracer .extract (
109- Format .KAFKA_HEADERS , {}, disable_w3c_trace_context = True
113+ exception = None
114+ res = None
115+
116+ try :
117+ res = wrapped (* args , ** kwargs )
118+ except Exception as exc :
119+ exception = exc
120+ finally :
121+ headers = res .headers if res else []
122+ parent_context = (
123+ parent_span .get_span_context ()
124+ if parent_span
125+ else tracer .extract (
126+ Format .KAFKA_HEADERS , headers , disable_w3c_trace_context = True
127+ )
110128 )
111- )
112129
113- with tracer .start_as_current_span (
114- "kafka-consumer" , span_context = parent_context , kind = SpanKind .CONSUMER
115- ) as span :
116- topic = list (instance .subscription ())[0 ]
117- span .set_attribute ("kafka.service" , topic )
118- span .set_attribute ("kafka.access" , "poll" )
119-
120- try :
121- res = wrapped (* args , ** kwargs )
122- except Exception as exc :
123- span .record_exception (exc )
124- else :
125- return res
130+ with tracer .start_as_current_span (
131+ "kafka-consumer" , span_context = parent_context , kind = SpanKind .CONSUMER
132+ ) as span :
133+ span .set_attribute (
134+ "kafka.service" ,
135+ res .topic if res else list (instance .subscription ())[0 ],
136+ )
137+ span .set_attribute ("kafka.access" , "poll" )
138+
139+ if exception :
140+ span .record_exception (exception )
141+ return res
126142
127143 logger .debug ("Instrumenting Kafka (kafka-python)" )
128144except ImportError :
0 commit comments