Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 34 additions & 24 deletions kafka/admin/_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,29 +118,20 @@ def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id
"""
return self._manager.run(self._async_delete_records, records_to_delete, timeout_ms, partition_leader_id)

@staticmethod
def _convert_topic_partitions(topic_partitions):
return [
(
topic,
partitions
)
for topic, partitions in topic_partitions.items()
]

def _get_all_topic_partitions(self):
def _get_all_topic_partitions(self, topics=None):
return [
(
topic['name'],
[p['partition_index'] for p in topic['partitions']]
)
for topic in self.describe_topics()
for topic in self.describe_topics(topics)
]

def _get_topic_partitions(self, topic_partitions):
if topic_partitions is None:
return self._get_all_topic_partitions()
return self._convert_topic_partitions(topic_partitions)
if isinstance(topic_partitions, dict):
return topic_partitions.items()
else:
return self._get_all_topic_partitions(topic_partitions)

def perform_leader_election(self, election_type, topic_partitions=None, timeout_ms=None, raise_errors=True):
"""Trigger leader election for the specified topic partitions.
Expand All @@ -149,8 +140,11 @@ def perform_leader_election(self, election_type, topic_partitions=None, timeout_
election_type: Type of election to attempt. 0 for Preferred, 1 for Unclean

Keyword Arguments:
topic_partitions (dict): A map of topic name strings to partition ids list.
By default, will run on all topic partitions
topic_partitions (dict, list, optional):
Either: dict of {topic_name: [partition ids]}.
Or: list of [topic_name], and election will run on all partitions for topic.
Or: None, and election runs against all topics / all partitions.
Default: None
timeout_ms (num, optional): Milliseconds to wait for the leader election process.
raise_errors (bool, optional): Whether to raise errors as exceptions. Default True.

Expand All @@ -172,14 +166,30 @@ def response_errors(r):
ignore_errors = (Errors.ElectionNotNeededError,)
return self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors, ignore_errors)

async def _async_describe_log_dirs(self, topic_partitions=(), brokers=None):
    """Fan a DescribeLogDirsRequest out to brokers and collect results.

    Keyword Arguments:
        topic_partitions: iterable of (topic_name, [partition ids]) pairs
            to query; empty queries all log dirs.
        brokers: list of broker node ids to query; None queries every
            broker currently in cluster metadata.

    Returns:
        list of dicts, one per broker: {"broker": node_id, "log_dirs": [...]}
    """
    # Reconstructed post-merge body: the scraped diff fused the old
    # single-broker implementation (and a stray old describe_log_dirs
    # def line) with this new per-broker fan-out.
    request = DescribeLogDirsRequest(topics=topic_partitions)
    responses = []
    if brokers is None:
        brokers = [broker.node_id for broker in self._manager.cluster.brokers()]
    for node_id in brokers:
        response = await self._manager.send(request, node_id=node_id)
        responses.append({"broker": node_id, "log_dirs": [result.to_dict() for result in response.results]})
    return responses

def describe_log_dirs(self, topic_partitions=None, brokers=None):
    """Send a DescribeLogDirsRequest request to brokers.

    Keyword Arguments:
        topic_partitions (dict, list, optional):
            Either: dict of {topic_name: [partition ids]}.
            Or: list of [topic_name], to query all partitions for topic.
            Or: None, to query all topics / all partitions.
            Default: None
        brokers (list, optional): List of [node_id] for brokers to query.
            If None, query is sent to all brokers. Default: None

    Returns:
        list of dicts, containing per-broker log-dir data
    """
    # Reconstructed post-merge body: the scraped diff kept the old
    # no-argument return alongside the new normalize-then-run pair.
    topic_partitions = self._get_topic_partitions(topic_partitions)
    return self._manager.run(self._async_describe_log_dirs, topic_partitions, brokers)
4 changes: 3 additions & 1 deletion kafka/cli/admin/log_dirs/describe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@ class DescribeLogDirs:
@classmethod
def add_subparser(cls, subparsers):
    """Register the 'describe' log-dirs subcommand with its CLI options.

    Adds repeatable -b/--broker (int node ids) and -t/--topic (topic
    names) filters and wires the command callback to
    ``cli.describe_log_dirs``.
    """
    # Reconstructed post-merge body: the scraped diff fused the old
    # no-argument set_defaults line with the new argument wiring.
    parser = subparsers.add_parser('describe', help='Get topic log directories for brokers')
    parser.add_argument('-b', '--broker', type=int, action='append', dest='brokers', help='Query specific broker(s)')
    parser.add_argument('-t', '--topic', type=str, action='append', dest='topics', help='Get additional data about specific topic(s)')
    parser.set_defaults(command=lambda cli, args: cli.describe_log_dirs(topic_partitions=args.topics, brokers=args.brokers))
13 changes: 13 additions & 0 deletions test/integration/test_admin_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,3 +469,16 @@ def test_perform_leader_election(kafka_admin_client, topic):
partition_set.remove(partition[0])
assert partition[1] == ElectionNotNeededError.errno
assert partition_set == set()


@pytest.mark.skipif(env_kafka_version() < (1, 0), reason="DescribeLogDirsRequest requires broker >= 1.0")
def test_describe_log_dirs(kafka_admin_client):
    """Every broker in cluster metadata reports at least one error-free log dir."""
    results = kafka_admin_client.describe_log_dirs()
    assert results
    by_broker = {entry['broker']: entry for entry in results}
    for broker in kafka_admin_client._manager.cluster.brokers():
        assert broker.node_id in by_broker
        dirs = by_broker[broker.node_id]['log_dirs']
        assert len(dirs) > 0
        for dir_info in dirs:
            assert 'log_dir' in dir_info
            assert dir_info['error_code'] == 0
Loading