@@ -217,6 +217,8 @@ def __init__(self, **configs):
217217 metric_group_prefix = 'admin' ,
218218 ** self .config
219219 )
220+ # Goal: migrate all self._client calls -> self._manager (skipping compat layer)
221+ self ._manager = self ._client ._manager
220222
221223 # Get auto-discovered version from client if necessary
222224 self .config ['api_version' ] = self ._client .get_broker_version (timeout_ms = self .config ['api_version_auto_timeout_ms' ])
@@ -607,33 +609,18 @@ def _process_acl_operations(self, obj):
607609 obj ['authorized_operations' ] = list (map (lambda acl : acl .name , valid_acl_operations (obj ['authorized_operations' ])))
608610 return obj
609611
610- def _get_cluster_metadata (self , topics = None , auto_topic_creation = False ):
611- """
612- topics == None means "get all topics"
613- """
614- version = self ._client .api_version (MetadataRequest , max_version = 8 )
615- if version <= 3 :
616- if auto_topic_creation :
617- raise IncompatibleBrokerVersion (
618- "auto_topic_creation requires MetadataRequest >= v4, which"
619- " is not supported by Kafka {}"
620- .format (self .config ['api_version' ]))
621-
622- request = MetadataRequest [version ](topics = topics )
623- elif version <= 7 :
624- request = MetadataRequest [version ](
625- topics = topics ,
626- allow_auto_topic_creation = auto_topic_creation
627- )
628- else :
629- request = MetadataRequest [version ](
630- topics = topics ,
631- allow_auto_topic_creation = auto_topic_creation ,
632- include_cluster_authorized_operations = True ,
633- include_topic_authorized_operations = True ,
634- )
635-
636- metadata = self .send_request (request ).to_dict ()
612+ async def _get_cluster_metadata (self , topics ):
613+ """topics = [] for no topics, None for all."""
614+ request = MetadataRequest (
615+ topics = [
616+ MetadataRequest .MetadataRequestTopic (name = topic )
617+ for topic in topics ] if topics is not None else None ,
618+ allow_auto_topic_creation = False ,
619+ include_cluster_authorized_operations = True ,
620+ include_topic_authorized_operations = True ,
621+ )
622+ response = await self ._manager .send (request )
623+ metadata = response .to_dict ()
637624 self ._process_acl_operations (metadata )
638625 for topic in metadata ['topics' ]:
639626 self ._process_acl_operations (topic )
@@ -645,7 +632,7 @@ def list_topics(self):
645632 Returns:
646633 A list of topic name strings.
647634 """
648- metadata = self ._get_cluster_metadata ( topics = None )
635+ metadata = self ._manager . run ( self . _get_cluster_metadata , None ) # None => request all topics
649636 return [t ['name' ] for t in metadata ['topics' ]]
650637
651638 def describe_topics (self , topics = None ):
@@ -658,7 +645,7 @@ def describe_topics(self, topics=None):
658645 Returns:
659646 A list of dicts describing each topic (including partition info).
660647 """
661- metadata = self ._get_cluster_metadata ( topics = topics )
648+ metadata = self ._manager . run ( self . _get_cluster_metadata , topics )
662649 return metadata ['topics' ]
663650
664651 def describe_cluster (self ):
@@ -668,7 +655,7 @@ def describe_cluster(self):
668655 Returns:
669656 A dict with cluster-wide metadata, excluding topic details.
670657 """
671- metadata = self ._get_cluster_metadata ()
658+ metadata = self ._manager . run ( self . _get_cluster_metadata , []) # [] => no topics
672659 metadata .pop ('topics' ) # We have 'describe_topics' for this
673660 return metadata
674661
@@ -1174,7 +1161,7 @@ def _get_leader_for_partitions(self, partitions, timeout_ms=None):
11741161 partitions = set (partitions )
11751162 topics = set (tp .topic for tp in partitions )
11761163
1177- metadata = self ._get_cluster_metadata ( topics = topics )
1164+ metadata = self ._manager . run ( self . _get_cluster_metadata , topics )
11781165
11791166 leader2partitions = defaultdict (list )
11801167 valid_partitions = set ()
0 commit comments