Skip to content

Commit e4ce09e

Browse files
CagriYoncapvital
andcommitted
fix: Kafka context propagation
Signed-off-by: Cagri Yonca <cagri@ibm.com> Co-authored-by: Paulo Vital <paulo.vital@ibm.com> Signed-off-by: Cagri Yonca <cagri@ibm.com>
1 parent 57ec636 commit e4ce09e

7 files changed

Lines changed: 668 additions & 166 deletions

File tree

src/instana/instrumentation/kafka/confluent_kafka_python.py

Lines changed: 62 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,13 @@ def trace_kafka_produce(
6666
span.set_attribute("kafka.access", "produce")
6767

6868
# context propagation
69-
headers = args[6] if len(args) > 6 else kwargs.get("headers", {})
69+
#
70+
# As stated in the official documentation at
71+
# https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#pythonclient-producer,
72+
# headers can be either a list of (key, value) pairs or a
73+
# dictionary. To maintain compatibility with the headers for the
74+
# Kafka Python library, we will use a list of tuples.
75+
headers = args[6] if len(args) > 6 else kwargs.get("headers", [])
7076
tracer.inject(
7177
span.context,
7278
Format.KAFKA_HEADERS,
@@ -75,44 +81,63 @@ def trace_kafka_produce(
7581
)
7682

7783
try:
84+
kwargs["headers"] = headers
7885
res = wrapped(*args, **kwargs)
7986
except Exception as exc:
8087
span.record_exception(exc)
8188
else:
8289
return res
8390

84-
def trace_kafka_consume(
85-
wrapped: Callable[..., InstanaConfluentKafkaConsumer.consume],
86-
instance: InstanaConfluentKafkaConsumer,
87-
args: Tuple[int, str, Tuple[Any, ...]],
88-
kwargs: Dict[str, Any],
89-
) -> List[confluent_kafka.Message]:
90-
if tracing_is_off():
91-
return wrapped(*args, **kwargs)
92-
91+
def create_span(
92+
span_type: str,
93+
topic: Optional[str] = "",
94+
headers: Optional[List[Tuple[str, bytes]]] = [],
95+
exception: Optional[str] = None,
96+
) -> None:
9397
tracer, parent_span, _ = get_tracer_tuple()
94-
9598
parent_context = (
9699
parent_span.get_span_context()
97100
if parent_span
98101
else tracer.extract(
99-
Format.KAFKA_HEADERS, {}, disable_w3c_trace_context=True
102+
Format.KAFKA_HEADERS,
103+
headers,
104+
disable_w3c_trace_context=True,
100105
)
101106
)
102-
103107
with tracer.start_as_current_span(
104108
"kafka-consumer", span_context=parent_context, kind=SpanKind.CONSUMER
105109
) as span:
106-
span.set_attribute("kafka.access", "consume")
110+
if topic:
111+
span.set_attribute("kafka.service", topic)
112+
span.set_attribute("kafka.access", span_type)
107113

108-
try:
109-
res = wrapped(*args, **kwargs)
110-
if isinstance(res, list) and len(res) > 0:
111-
span.set_attribute("kafka.service", res[0].topic())
112-
except Exception as exc:
113-
span.record_exception(exc)
114+
if exception:
115+
span.record_exception(exception)
116+
117+
def trace_kafka_consume(
118+
wrapped: Callable[..., InstanaConfluentKafkaConsumer.consume],
119+
instance: InstanaConfluentKafkaConsumer,
120+
args: Tuple[int, str, Tuple[Any, ...]],
121+
kwargs: Dict[str, Any],
122+
) -> List[confluent_kafka.Message]:
123+
if tracing_is_off():
124+
return wrapped(*args, **kwargs)
125+
126+
res = None
127+
exception = None
128+
129+
try:
130+
res = wrapped(*args, **kwargs)
131+
except Exception as exc:
132+
exception = exc
133+
finally:
134+
if res:
135+
for message in res:
136+
create_span("consume", message.topic(), message.headers())
114137
else:
115-
return res
138+
create_span("consume", exception=exception)
139+
140+
return res
116141

117142
def trace_kafka_poll(
118143
wrapped: Callable[..., InstanaConfluentKafkaConsumer.poll],
@@ -123,29 +148,24 @@ def trace_kafka_poll(
123148
if tracing_is_off():
124149
return wrapped(*args, **kwargs)
125150

126-
tracer, parent_span, _ = get_tracer_tuple()
127-
128-
parent_context = (
129-
parent_span.get_span_context()
130-
if parent_span
131-
else tracer.extract(
132-
Format.KAFKA_HEADERS, {}, disable_w3c_trace_context=True
133-
)
134-
)
135-
136-
with tracer.start_as_current_span(
137-
"kafka-consumer", span_context=parent_context, kind=SpanKind.CONSUMER
138-
) as span:
139-
span.set_attribute("kafka.access", "poll")
151+
res = None
152+
exception = None
140153

141-
try:
142-
res = wrapped(*args, **kwargs)
143-
if res:
144-
span.set_attribute("kafka.service", res.topic())
145-
except Exception as exc:
146-
span.record_exception(exc)
154+
try:
155+
res = wrapped(*args, **kwargs)
156+
except Exception as exc:
157+
exception = exc
158+
finally:
159+
if res:
160+
create_span("poll", res.topic(), res.headers())
147161
else:
148-
return res
162+
create_span(
163+
"poll",
164+
next(iter(instance.list_topics().topics)),
165+
exception=exception,
166+
)
167+
168+
return res
149169

150170
# Apply the monkey patch
151171
confluent_kafka.Producer = InstanaConfluentKafkaProducer

src/instana/instrumentation/kafka/kafka_python.py

Lines changed: 74 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
# (c) Copyright IBM Corp. 2025
22

33
try:
4-
from typing import TYPE_CHECKING, Any, Callable, Dict, Tuple
4+
import inspect
5+
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple
56

67
import kafka # noqa: F401
78
import wrapt
@@ -37,92 +38,118 @@ def trace_kafka_send(
3738
span.set_attribute("kafka.access", "send")
3839

3940
# context propagation
41+
headers = kwargs.get("headers", [])
4042
tracer.inject(
4143
span.context,
4244
Format.KAFKA_HEADERS,
43-
kwargs.get("headers", {}),
45+
headers,
4446
disable_w3c_trace_context=True,
4547
)
4648

4749
try:
50+
kwargs["headers"] = headers
4851
res = wrapped(*args, **kwargs)
4952
except Exception as exc:
5053
span.record_exception(exc)
5154
else:
5255
return res
5356

54-
@wrapt.patch_function_wrapper("kafka", "KafkaConsumer.__next__")
55-
def trace_kafka_consume(
56-
wrapped: Callable[..., "kafka.KafkaConsumer.__next__"],
57-
instance: "kafka.KafkaConsumer",
58-
args: Tuple[int, str, Tuple[Any, ...]],
59-
kwargs: Dict[str, Any],
60-
) -> "FutureRecordMetadata":
61-
if tracing_is_off():
62-
return wrapped(*args, **kwargs)
63-
57+
def create_span(
58+
span_type: str,
59+
topic: Optional[str],
60+
headers: Optional[List[Tuple[str, bytes]]] = [],
61+
exception: Optional[str] = None,
62+
) -> None:
6463
tracer, parent_span, _ = get_tracer_tuple()
65-
6664
parent_context = (
6765
parent_span.get_span_context()
6866
if parent_span
6967
else tracer.extract(
70-
Format.KAFKA_HEADERS, {}, disable_w3c_trace_context=True
68+
Format.KAFKA_HEADERS,
69+
headers,
70+
disable_w3c_trace_context=True,
7171
)
7272
)
73-
7473
with tracer.start_as_current_span(
7574
"kafka-consumer", span_context=parent_context, kind=SpanKind.CONSUMER
7675
) as span:
77-
topic = list(instance.subscription())[0]
78-
span.set_attribute("kafka.service", topic)
79-
span.set_attribute("kafka.access", "consume")
76+
if topic:
77+
span.set_attribute("kafka.service", topic)
78+
span.set_attribute("kafka.access", span_type)
79+
if exception:
80+
span.record_exception(exception)
8081

81-
try:
82-
res = wrapped(*args, **kwargs)
83-
except Exception as exc:
84-
span.record_exception(exc)
82+
@wrapt.patch_function_wrapper("kafka", "KafkaConsumer.__next__")
83+
def trace_kafka_consume(
84+
wrapped: Callable[..., "kafka.KafkaConsumer.__next__"],
85+
instance: "kafka.KafkaConsumer",
86+
args: Tuple[int, str, Tuple[Any, ...]],
87+
kwargs: Dict[str, Any],
88+
) -> "FutureRecordMetadata":
89+
if tracing_is_off():
90+
return wrapped(*args, **kwargs)
91+
92+
exception = None
93+
res = None
94+
95+
try:
96+
res = wrapped(*args, **kwargs)
97+
except Exception as exc:
98+
exception = exc
99+
finally:
100+
if res:
101+
create_span(
102+
"consume",
103+
res.topic if res else list(instance.subscription())[0],
104+
res.headers,
105+
)
85106
else:
86-
return res
107+
create_span(
108+
"consume", list(instance.subscription())[0], exception=exception
109+
)
110+
111+
return res
87112

88113
@wrapt.patch_function_wrapper("kafka", "KafkaConsumer.poll")
89114
def trace_kafka_poll(
90115
wrapped: Callable[..., "kafka.KafkaConsumer.poll"],
91116
instance: "kafka.KafkaConsumer",
92117
args: Tuple[int, str, Tuple[Any, ...]],
93118
kwargs: Dict[str, Any],
94-
) -> Dict[str, Any]:
119+
) -> Optional[Dict[str, Any]]:
95120
if tracing_is_off():
96121
return wrapped(*args, **kwargs)
97122

98-
tracer, parent_span, _ = get_tracer_tuple()
99-
100123
# The KafkaConsumer.consume() from the kafka-python-ng call the
101124
# KafkaConsumer.poll() internally, so we do not consider it here.
102-
if parent_span and parent_span.name == "kafka-consumer":
125+
if any(
126+
frame.function == "trace_kafka_consume"
127+
for frame in inspect.getouterframes(inspect.currentframe(), 2)
128+
):
103129
return wrapped(*args, **kwargs)
104130

105-
parent_context = (
106-
parent_span.get_span_context()
107-
if parent_span
108-
else tracer.extract(
109-
Format.KAFKA_HEADERS, {}, disable_w3c_trace_context=True
110-
)
111-
)
112-
113-
with tracer.start_as_current_span(
114-
"kafka-consumer", span_context=parent_context, kind=SpanKind.CONSUMER
115-
) as span:
116-
topic = list(instance.subscription())[0]
117-
span.set_attribute("kafka.service", topic)
118-
span.set_attribute("kafka.access", "poll")
119-
120-
try:
121-
res = wrapped(*args, **kwargs)
122-
except Exception as exc:
123-
span.record_exception(exc)
131+
exception = None
132+
res = None
133+
134+
try:
135+
res = wrapped(*args, **kwargs)
136+
except Exception as exc:
137+
exception = exc
138+
finally:
139+
if res:
140+
for partition, consumer_records in res.items():
141+
for message in consumer_records:
142+
create_span(
143+
"poll",
144+
partition.topic,
145+
message.headers if hasattr(message, "headers") else [],
146+
)
124147
else:
125-
return res
148+
create_span(
149+
"poll", list(instance.subscription())[0], exception=exception
150+
)
151+
152+
return res
126153

127154
logger.debug("Instrumenting Kafka (kafka-python)")
128155
except ImportError:

0 commit comments

Comments
 (0)