Skip to content

Commit 92d8de4

Browse files
CagriYoncapvital
andcommitted
fix: Kafka context propagation
Signed-off-by: Cagri Yonca <cagri@ibm.com> Co-authored-by: Paulo Vital <paulo.vital@ibm.com>
1 parent 306df0d commit 92d8de4

File tree

5 files changed

+339
-156
lines changed

5 files changed

+339
-156
lines changed

src/instana/instrumentation/kafka/confluent_kafka_python.py

+56-38
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,13 @@ def trace_kafka_produce(
6666
span.set_attribute("kafka.access", "produce")
6767

6868
# context propagation
69-
headers = args[6] if len(args) > 6 else kwargs.get("headers", {})
69+
#
70+
# As stated in the official documentation at
71+
# https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#pythonclient-producer,
72+
# headers can be either a list of (key, value) pairs or a
73+
# dictionary. To maintain compatibility with the headers for the
74+
# Kafka Python library, we will use a list of tuples.
75+
headers = args[6] if len(args) > 6 else kwargs.get("headers", [])
7076
tracer.inject(
7177
span.context,
7278
Format.KAFKA_HEADERS,
@@ -91,28 +97,34 @@ def trace_kafka_consume(
9197
return wrapped(*args, **kwargs)
9298

9399
tracer, parent_span, _ = get_tracer_tuple()
94-
95-
parent_context = (
96-
parent_span.get_span_context()
97-
if parent_span
98-
else tracer.extract(
99-
Format.KAFKA_HEADERS, {}, disable_w3c_trace_context=True
100+
exception = None
101+
res = []
102+
103+
try:
104+
res = wrapped(*args, **kwargs)
105+
except Exception as exc:
106+
exception = exc
107+
finally:
108+
headers = res[0].headers() if res and len(res) > 0 else []
109+
parent_context = (
110+
parent_span.get_span_context()
111+
if parent_span
112+
else tracer.extract(
113+
Format.KAFKA_HEADERS, headers, disable_w3c_trace_context=True
114+
)
100115
)
101-
)
102-
103-
with tracer.start_as_current_span(
104-
"kafka-consumer", span_context=parent_context, kind=SpanKind.CONSUMER
105-
) as span:
106-
span.set_attribute("kafka.access", "consume")
107116

108-
try:
109-
res = wrapped(*args, **kwargs)
110-
if isinstance(res, list) and len(res) > 0:
117+
with tracer.start_as_current_span(
118+
"kafka-consumer", span_context=parent_context, kind=SpanKind.CONSUMER
119+
) as span:
120+
span.set_attribute("kafka.access", "consume")
121+
if res and isinstance(res, list) and len(res) > 0:
111122
span.set_attribute("kafka.service", res[0].topic())
112-
except Exception as exc:
113-
span.record_exception(exc)
114-
else:
115-
return res
123+
124+
if exception:
125+
span.record_exception(exception)
126+
127+
return res
116128

117129
def trace_kafka_poll(
118130
wrapped: Callable[..., InstanaConfluentKafkaConsumer.poll],
@@ -124,28 +136,34 @@ def trace_kafka_poll(
124136
return wrapped(*args, **kwargs)
125137

126138
tracer, parent_span, _ = get_tracer_tuple()
127-
128-
parent_context = (
129-
parent_span.get_span_context()
130-
if parent_span
131-
else tracer.extract(
132-
Format.KAFKA_HEADERS, {}, disable_w3c_trace_context=True
139+
exception = None
140+
res = None
141+
142+
try:
143+
res = wrapped(*args, **kwargs)
144+
except Exception as exc:
145+
exception = exc
146+
finally:
147+
headers = res.headers() if res else []
148+
parent_context = (
149+
parent_span.get_span_context()
150+
if parent_span
151+
else tracer.extract(
152+
Format.KAFKA_HEADERS, headers, disable_w3c_trace_context=True
153+
)
133154
)
134-
)
135-
136-
with tracer.start_as_current_span(
137-
"kafka-consumer", span_context=parent_context, kind=SpanKind.CONSUMER
138-
) as span:
139-
span.set_attribute("kafka.access", "poll")
140155

141-
try:
142-
res = wrapped(*args, **kwargs)
156+
with tracer.start_as_current_span(
157+
"kafka-consumer", span_context=parent_context, kind=SpanKind.CONSUMER
158+
) as span:
159+
span.set_attribute("kafka.access", "poll")
143160
if res:
144161
span.set_attribute("kafka.service", res.topic())
145-
except Exception as exc:
146-
span.record_exception(exc)
147-
else:
148-
return res
162+
163+
if exception:
164+
span.record_exception(exception)
165+
166+
return res
149167

150168
# Apply the monkey patch
151169
confluent_kafka.Producer = InstanaConfluentKafkaProducer

src/instana/instrumentation/kafka/kafka_python.py

+59-43
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# (c) Copyright IBM Corp. 2025
22

33
try:
4-
from typing import TYPE_CHECKING, Any, Callable, Dict, Tuple
4+
from typing import TYPE_CHECKING, Any, Callable, Dict, Tuple, Optional
55

66
import kafka # noqa: F401
77
import wrapt
@@ -37,14 +37,16 @@ def trace_kafka_send(
3737
span.set_attribute("kafka.access", "send")
3838

3939
# context propagation
40+
headers = kwargs.get("headers", [])
4041
tracer.inject(
4142
span.context,
4243
Format.KAFKA_HEADERS,
43-
kwargs.get("headers", {}),
44+
headers,
4445
disable_w3c_trace_context=True,
4546
)
4647

4748
try:
49+
kwargs["headers"] = headers
4850
res = wrapped(*args, **kwargs)
4951
except Exception as exc:
5052
span.record_exception(exc)
@@ -62,36 +64,42 @@ def trace_kafka_consume(
6264
return wrapped(*args, **kwargs)
6365

6466
tracer, parent_span, _ = get_tracer_tuple()
65-
66-
parent_context = (
67-
parent_span.get_span_context()
68-
if parent_span
69-
else tracer.extract(
70-
Format.KAFKA_HEADERS, {}, disable_w3c_trace_context=True
67+
exception = None
68+
res = None
69+
70+
try:
71+
res = wrapped(*args, **kwargs)
72+
except Exception as exc:
73+
exception = exc
74+
finally:
75+
headers = res.headers if res else []
76+
parent_context = (
77+
parent_span.get_span_context()
78+
if parent_span
79+
else tracer.extract(
80+
Format.KAFKA_HEADERS, headers, disable_w3c_trace_context=True,
81+
)
7182
)
72-
)
73-
74-
with tracer.start_as_current_span(
75-
"kafka-consumer", span_context=parent_context, kind=SpanKind.CONSUMER
76-
) as span:
77-
topic = list(instance.subscription())[0]
78-
span.set_attribute("kafka.service", topic)
79-
span.set_attribute("kafka.access", "consume")
80-
81-
try:
82-
res = wrapped(*args, **kwargs)
83-
except Exception as exc:
84-
span.record_exception(exc)
85-
else:
86-
return res
83+
with tracer.start_as_current_span(
84+
"kafka-consumer", span_context=parent_context, kind=SpanKind.CONSUMER
85+
) as span:
86+
span.set_attribute(
87+
"kafka.service",
88+
res.topic if res else list(instance.subscription())[0]
89+
)
90+
span.set_attribute("kafka.access", "consume")
91+
92+
if exception:
93+
span.record_exception(exception)
94+
return res
8795

8896
@wrapt.patch_function_wrapper("kafka", "KafkaConsumer.poll")
8997
def trace_kafka_poll(
9098
wrapped: Callable[..., "kafka.KafkaConsumer.poll"],
9199
instance: "kafka.KafkaConsumer",
92100
args: Tuple[int, str, Tuple[Any, ...]],
93101
kwargs: Dict[str, Any],
94-
) -> Dict[str, Any]:
102+
) -> Optional[Dict[str, Any]]:
95103
if tracing_is_off():
96104
return wrapped(*args, **kwargs)
97105

@@ -102,27 +110,35 @@ def trace_kafka_poll(
102110
if parent_span and parent_span.name == "kafka-consumer":
103111
return wrapped(*args, **kwargs)
104112

105-
parent_context = (
106-
parent_span.get_span_context()
107-
if parent_span
108-
else tracer.extract(
109-
Format.KAFKA_HEADERS, {}, disable_w3c_trace_context=True
113+
exception = None
114+
res = None
115+
116+
try:
117+
res = wrapped(*args, **kwargs)
118+
except Exception as exc:
119+
exception = exc
120+
finally:
121+
headers = res.headers if res else []
122+
parent_context = (
123+
parent_span.get_span_context()
124+
if parent_span
125+
else tracer.extract(
126+
Format.KAFKA_HEADERS, headers, disable_w3c_trace_context=True
127+
)
110128
)
111-
)
112129

113-
with tracer.start_as_current_span(
114-
"kafka-consumer", span_context=parent_context, kind=SpanKind.CONSUMER
115-
) as span:
116-
topic = list(instance.subscription())[0]
117-
span.set_attribute("kafka.service", topic)
118-
span.set_attribute("kafka.access", "poll")
119-
120-
try:
121-
res = wrapped(*args, **kwargs)
122-
except Exception as exc:
123-
span.record_exception(exc)
124-
else:
125-
return res
130+
with tracer.start_as_current_span(
131+
"kafka-consumer", span_context=parent_context, kind=SpanKind.CONSUMER
132+
) as span:
133+
span.set_attribute(
134+
"kafka.service",
135+
res.topic if res else list(instance.subscription())[0],
136+
)
137+
span.set_attribute("kafka.access", "poll")
138+
139+
if exception:
140+
span.record_exception(exception)
141+
return res
126142

127143
logger.debug("Instrumenting Kafka (kafka-python)")
128144
except ImportError:

0 commit comments

Comments
 (0)