Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 11 additions & 26 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,25 +331,17 @@ def _find_coordinator_ids(self, group_ids):
coordinator_ids = self.send_requests(requests, response_fn=self._find_coordinator_id_process_response)
return dict(zip(group_ids, coordinator_ids))

def _send_request_to_node(self, node_id, request, wakeup=True):
def _send_request_to_node(self, node_id, request):
"""Send a Kafka protocol message to a specific broker.

Arguments:
node_id: The broker id to which to send the message.
request: The message to send.


Keyword Arguments:
wakeup (bool, optional): Optional flag to disable thread-wakeup.

Returns:
A future object that may be polled for status and results.
"""
try:
self._client.await_ready(node_id)
except Errors.KafkaConnectionError as e:
return Future().failure(e)
return self._client.send(node_id, request, wakeup)
return self._manager.send(request, node_id=node_id)

def _wait_for_futures(self, futures):
"""Block until all futures complete. If any fail, raise the encountered exception.
Expand All @@ -360,28 +352,21 @@ def _wait_for_futures(self, futures):
Raises:
The first encountered exception if a future fails.
"""
while not all(future.succeeded() for future in futures):
for future in futures:
self._client.poll(future=future)

if future.failed():
raise future.exception # pylint: disable-msg=raising-bad-type
failed = None
for future in futures:
self._manager.poll(future=future)
if failed is None and future.failed():
failed = future
if failed:
raise failed.exception # pylint: disable-msg=raising-bad-type

def send_request(self, request, node_id=None):
if node_id is None:
node_id = self._client.least_loaded_node(bootstrap_fallback=True)
self._client.await_ready(node_id)
future = self._client.send(node_id, request)
self._wait_for_futures([future]) # raises exception on failure
return future.value
return self._manager.run(self._manager.send(request, node_id=node_id))

def send_requests(self, requests_and_node_ids, response_fn=lambda x: x):
futures = []
for request, node_id in requests_and_node_ids:
if node_id is None:
node_id = self._client.least_loaded_node(bootstrap_fallback=True)
self._client.await_ready(node_id)
futures.append(self._client.send(node_id, request))
futures.append(self._manager.send(request, node_id=node_id))
self._wait_for_futures(futures)
return [response_fn(future.value) for future in futures]

Expand Down
Loading