Skip to content

Commit e9e789d

Browse files
dpkp and claude authored
AdminClient: wait_for_topics() and create_topics() wait_for_metadata option (#2856)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 681fe94 commit e9e789d

2 files changed

Lines changed: 237 additions & 3 deletions

File tree

kafka/admin/client.py

Lines changed: 93 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,8 @@ def _convert_new_topic_request(new_topic):
448448
]
449449
)
450450

451-
def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_errors=True):
451+
def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_errors=True,
452+
wait_for_metadata=False):
452453
"""Create new topics in the cluster.
453454
454455
Arguments:
@@ -460,10 +461,20 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_
460461
validate_only (bool, optional): If True, don't actually create new topics.
461462
Not supported by all versions. Default: False
462463
raise_errors (bool, optional): Whether to raise errors as exceptions. Default True.
464+
wait_for_metadata (bool, optional): If True, after the broker successfully
465+
accepts the create request, block until each new topic is visible in
466+
broker metadata with a leader assigned for every partition. Useful on
467+
KRaft clusters, where CreateTopicsResponse returning NoError does not
468+
guarantee that a subsequent MetadataRequest will see the topic. Raises
469+
ValueError if ``validate_only`` is also True. Uses a fixed 10-second timeout;
470+
call :meth:`wait_for_topics` directly for finer control.
471+
Mutually exclusive with validate_only. Default: False
463472
464473
Returns:
465474
Appropriate version of CreateTopicResponse class.
466475
"""
476+
if validate_only and wait_for_metadata:
477+
raise ValueError('validate_only and wait_for_metadata are mutually exclusive')
467478
version = self._client.api_version(CreateTopicsRequest, max_version=3)
468479
timeout_ms = self._validate_timeout(timeout_ms)
469480
if version == 0:
@@ -487,7 +498,87 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_
487498
def get_response_errors(r):
488499
for topic in r.topics:
489500
yield Errors.for_code(topic[1])
490-
return self._send_request_to_controller(request, get_errors_fn=get_response_errors, raise_errors=raise_errors)
501+
response = self._send_request_to_controller(request, get_errors_fn=get_response_errors, raise_errors=raise_errors)
502+
if wait_for_metadata: # implies not validate_only
503+
self.wait_for_topics([new_topic.name for new_topic in new_topics])
504+
return response
505+
506+
def wait_for_topics(self, topic_names, timeout_ms=10000):
    """Block until each of the given topics is ready to use.

    CreateTopicsResponse only confirms that the broker accepted the create
    request; propagating the new topics into the broker's metadata cache --
    and electing a leader for every partition -- can lag behind, especially
    on KRaft clusters. This method polls :meth:`describe_topics` roughly
    every 100ms until every requested topic both:

    - is returned with ``error_code == 0`` (topic exists and is
      visible in metadata), and
    - has ``error_code == 0`` and a leader assigned (``leader_id >= 0``)
      for every partition.

    Useful after :meth:`create_topics` (including implicit creation via
    ``allow_auto_topic_creation``) or after a delete+recreate sequence.

    Any exception raised by :meth:`describe_topics` is logged at debug
    level and treated as "not ready yet"; polling continues until the
    deadline. ``describe_topics`` is always called at least once, even
    when ``timeout_ms`` is 0.

    Arguments:
        topic_names ([str]): Topic names to wait for. An empty value
            returns immediately without querying the cluster.

    Keyword Arguments:
        timeout_ms (numeric, optional): Maximum milliseconds to wait.
            Default: 10000.

    Raises:
        KafkaTimeoutError: if any topic is still not ready when the
            deadline expires. The exception message includes the
            per-topic state from the final ``describe_topics`` call, or
            the exception it raised if that final call failed.
    """
    if not topic_names:
        return
    topic_names = list(topic_names)
    deadline = time.monotonic() + (timeout_ms / 1000.0)
    while True:
        try:
            topics = self.describe_topics(topics=topic_names)
        except Exception as exc:
            # Deliberately broad: a flaky broker should not turn a
            # recoverable wait into a hard failure. Keep the failure as
            # the per-topic reason so a timeout reports the real cause
            # instead of a misleading "missing from metadata response".
            log.debug('describe_topics failed while waiting for topic visibility: %s', exc)
            failure = 'describe_topics failed: %r' % (exc,)
            pending = {name: failure for name in topic_names}
        else:
            by_name = {t.get('name'): t for t in topics}
            pending = {}
            for name in topic_names:
                reason = self._topic_not_ready_reason(by_name.get(name))
                if reason is not None:
                    pending[name] = reason
        if not pending:
            return
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise Errors.KafkaTimeoutError(
                'Topics not ready after %sms: %s' % (timeout_ms, pending))
        # Never sleep past the deadline, so the final poll (and any
        # timeout) happens close to the requested timeout_ms.
        time.sleep(min(0.1, remaining))
558+
559+
@staticmethod
560+
def _topic_not_ready_reason(topic_info):
561+
"""Return a string reason if ``topic_info`` isn't ready, else None."""
562+
if topic_info is None:
563+
return 'missing from metadata response'
564+
error_code = topic_info.get('error_code', 0)
565+
if error_code != 0:
566+
return Errors.for_code(error_code).__name__
567+
partitions = topic_info.get('partitions') or []
568+
if not partitions:
569+
return 'no partitions reported'
570+
bad = []
571+
for p in partitions:
572+
p_err = p.get('error_code', 0)
573+
idx = p.get('partition_index')
574+
if p_err != 0:
575+
bad.append('p%s=%s' % (idx, Errors.for_code(p_err).__name__))
576+
continue
577+
if p.get('leader_id', -1) < 0:
578+
bad.append('p%s=no leader' % idx)
579+
if bad:
580+
return ','.join(bad)
581+
return None
491582

492583
def delete_topics(self, topics, timeout_ms=None, raise_errors=True):
493584
"""Delete topics from the cluster.

test/admin/test_admin.py

Lines changed: 144 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import pytest
22

33
import kafka.admin
4-
from kafka.errors import IllegalArgumentError
4+
from kafka.admin.client import KafkaAdminClient
5+
from kafka.errors import IllegalArgumentError, KafkaTimeoutError, UnknownTopicOrPartitionError
56

67

78
def test_config_resource():
@@ -92,3 +93,145 @@ def test_new_topic():
9293
assert good_topic.replication_factor == -1
9394
assert good_topic.replica_assignments == {1: [1, 2, 3]}
9495
assert good_topic.topic_configs == {'key': 'value'}
96+
97+
98+
# ---------------------------------------------------------------------------
99+
# _topic_not_ready_reason (pure function, no network)
100+
# ---------------------------------------------------------------------------
101+
102+
103+
def _ready_topic(name='foo', num_partitions=1):
104+
return {
105+
'name': name,
106+
'error_code': 0,
107+
'partitions': [
108+
{'error_code': 0, 'partition_index': i, 'leader_id': 0}
109+
for i in range(num_partitions)
110+
],
111+
}
112+
113+
114+
def test_topic_not_ready_reason_missing():
    """A topic absent from the response (None) is reported as missing."""
    reason = KafkaAdminClient._topic_not_ready_reason(None)
    assert reason == 'missing from metadata response'
116+
117+
118+
def test_topic_not_ready_reason_topic_error():
    """A non-zero topic-level error code is reported by its error class name."""
    topic_info = {'name': 'foo', 'error_code': 3, 'partitions': []}
    reason = KafkaAdminClient._topic_not_ready_reason(topic_info)
    assert reason == 'UnknownTopicOrPartitionError'
122+
123+
124+
def test_topic_not_ready_reason_no_partitions():
    """An error-free topic with an empty partition list is not ready."""
    topic_info = {'name': 'foo', 'error_code': 0, 'partitions': []}
    reason = KafkaAdminClient._topic_not_ready_reason(topic_info)
    assert reason == 'no partitions reported'
128+
129+
130+
def test_topic_not_ready_reason_no_leader():
    """Only the leaderless partition is named in the reason."""
    leaderless = {'error_code': 0, 'partition_index': 0, 'leader_id': -1}
    led = {'error_code': 0, 'partition_index': 1, 'leader_id': 0}
    topic_info = {'name': 'foo', 'error_code': 0, 'partitions': [leaderless, led]}
    reason = KafkaAdminClient._topic_not_ready_reason(topic_info)
    assert reason == 'p0=no leader'
137+
138+
139+
def test_topic_not_ready_reason_partition_error():
    """A partition-level error is reported instead of the missing leader."""
    failing = {'error_code': 5, 'partition_index': 0, 'leader_id': -1}
    topic_info = {'name': 'foo', 'error_code': 0, 'partitions': [failing]}
    reason = KafkaAdminClient._topic_not_ready_reason(topic_info)
    assert reason == 'p0=LeaderNotAvailableError'
145+
146+
147+
def test_topic_not_ready_reason_partial_partition_errors():
    """Each problematic partition contributes its own entry to the reason."""
    partitions = [
        {'error_code': 0, 'partition_index': 0, 'leader_id': -1},
        {'error_code': 5, 'partition_index': 1, 'leader_id': -1},
    ]
    topic_info = {'name': 'foo', 'error_code': 0, 'partitions': partitions}
    reason = KafkaAdminClient._topic_not_ready_reason(topic_info)
    # Check both per-partition reasons without pinning the separator.
    for expected in ('p0=no leader', 'p1=LeaderNotAvailableError'):
        assert expected in reason
157+
158+
159+
def test_topic_not_ready_reason_ready():
    """A fully ready topic yields no reason."""
    reason = KafkaAdminClient._topic_not_ready_reason(_ready_topic())
    assert reason is None
161+
162+
163+
# ---------------------------------------------------------------------------
164+
# wait_for_topics (mocks describe_topics; does not hit the network)
165+
# ---------------------------------------------------------------------------
166+
167+
168+
def _bare_admin():
    """Create a KafkaAdminClient while bypassing ``__init__``.

    The instance is allocated with ``object.__new__`` so that no
    constructor logic runs; each test supplies whatever attributes the
    method under test needs.
    """
    admin_cls = KafkaAdminClient
    return object.__new__(admin_cls)
174+
175+
176+
def test_wait_for_topics_empty_list_returns_immediately(monkeypatch):
    """An empty topic list must return without ever calling describe_topics."""
    admin = _bare_admin()
    calls = []

    def fake_describe_topics(topics):
        calls.append(topics)
        return []

    # Record calls explicitly instead of relying on a crash: wait_for_topics
    # catches exceptions raised by describe_topics, so an unpatched call
    # failing on the bare instance would be swallowed and the test would
    # still pass even if the empty-list fast path were removed.
    monkeypatch.setattr(admin, 'describe_topics', fake_describe_topics)
    admin.wait_for_topics([])
    assert calls == []
181+
182+
183+
def test_wait_for_topics_ready_on_first_call(monkeypatch):
    """When every topic is ready immediately, describe_topics runs exactly once."""
    requested = []

    def describe_ready(topics):
        requested.append(topics)
        return [_ready_topic(name=topic, num_partitions=2) for topic in topics]

    client = _bare_admin()
    monkeypatch.setattr(client, 'describe_topics', describe_ready)
    client.wait_for_topics(['foo', 'bar'], timeout_ms=1000)

    assert len(requested) == 1
    assert set(requested[0]) == {'foo', 'bar'}
193+
194+
195+
def test_wait_for_topics_becomes_ready_after_retry(monkeypatch):
    """Polling continues through 'missing' and 'no leader' until the topic is ready."""
    leaderless_topic = {'name': 'foo', 'error_code': 0, 'partitions': [
        {'error_code': 0, 'partition_index': 0, 'leader_id': -1}]}
    scripted = [
        [],                          # first poll: topic missing
        [leaderless_topic],          # second poll: exists, no leader yet
        [_ready_topic(name='foo')],  # third poll: ready
    ]

    def next_response(topics):
        return scripted.pop(0)

    client = _bare_admin()
    monkeypatch.setattr(client, 'describe_topics', next_response)
    client.wait_for_topics(['foo'], timeout_ms=5000)

    # Every scripted response must have been consumed.
    assert len(scripted) == 0
211+
212+
213+
def test_wait_for_topics_timeout(monkeypatch):
    """A topic that never becomes ready raises KafkaTimeoutError naming it and its error."""
    def always_unknown(topics):
        return [{'name': 'foo', 'error_code': 3, 'partitions': []}]

    client = _bare_admin()
    monkeypatch.setattr(client, 'describe_topics', always_unknown)

    with pytest.raises(KafkaTimeoutError) as exc_info:
        client.wait_for_topics(['foo'], timeout_ms=200)

    message = str(exc_info.value)
    assert 'foo' in message
    assert 'UnknownTopicOrPartitionError' in message
222+
223+
224+
def test_wait_for_topics_describe_exception_keeps_retrying(monkeypatch):
    """An exception from describe_topics must be retried rather than
    propagated, so that a flaky broker does not turn a recoverable wait
    into a hard failure."""
    attempts = []

    def flaky_describe(topics):
        attempts.append(topics)
        if len(attempts) == 1:
            raise RuntimeError('transient')
        return [_ready_topic(name=topic) for topic in topics]

    client = _bare_admin()
    monkeypatch.setattr(client, 'describe_topics', flaky_describe)
    client.wait_for_topics(['foo'], timeout_ms=5000)

    # One failed attempt plus one successful retry.
    assert len(attempts) == 2

0 commit comments

Comments
 (0)