From 472b4b32efaf749e78336e2212f6b1047c334355 Mon Sep 17 00:00:00 2001 From: AlvinRamoutar Date: Sun, 3 May 2026 19:59:58 -0400 Subject: [PATCH] feat: az network vnet list-available-cidrs/list-used-cidrs --- .../cli/command_modules/network/_format.py | 31 +++ .../cli/command_modules/network/_help.py | 20 ++ .../cli/command_modules/network/_params.py | 8 + .../cli/command_modules/network/commands.py | 6 +- .../cli/command_modules/network/custom.py | 177 ++++++++++++++++++ .../tests/latest/test_network_commands.py | 128 +++++++++++++ 6 files changed, 369 insertions(+), 1 deletion(-) diff --git a/src/azure-cli/azure/cli/command_modules/network/_format.py b/src/azure-cli/azure/cli/command_modules/network/_format.py index b31da34a523..bbe442cf58a 100644 --- a/src/azure-cli/azure/cli/command_modules/network/_format.py +++ b/src/azure-cli/azure/cli/command_modules/network/_format.py @@ -71,6 +71,37 @@ def transform_local_gateway_table_output(result): return final_result +def transform_vnet_available_cidrs_table_output(result): + rows = [] + for entry in result: + address_prefix = entry['addressPrefixes'] + for cidr in entry.get('availableCIDRs', []): + item = OrderedDict() + item['AddressPrefix'] = address_prefix + item['CIDRAddress'] = cidr['cidrAddress'] + item['StartingAddress'] = cidr['startingAddress'] + item['EndingAddress'] = cidr['endingAddress'] + item['UsableIPs'] = cidr['usableIPs'] + rows.append(item) + return rows + + +def transform_vnet_used_cidrs_table_output(result): + rows = [] + for entry in result: + address_prefix = entry['addressPrefixes'] + for cidr in entry.get('usedCIDRs', []): + item = OrderedDict() + item['AddressPrefix'] = address_prefix + item['CIDRAddress'] = cidr['cidrAddress'] + item['StartingAddress'] = cidr['startingAddress'] + item['EndingAddress'] = cidr['endingAddress'] + item['SubnetName'] = cidr['subnetName'] + item['UsableIPs'] = cidr['usableIPs'] + rows.append(item) + return rows + + def transform_vnet_table_output(result): def _transform(result): diff --git a/src/azure-cli/azure/cli/command_modules/network/_help.py b/src/azure-cli/azure/cli/command_modules/network/_help.py index 234dfbd4ca1..29d2faa5d7a 100644 --- a/src/azure-cli/azure/cli/command_modules/network/_help.py +++ b/src/azure-cli/azure/cli/command_modules/network/_help.py @@ -5089,6 +5089,26 @@ text: az network vnet list-available-ips -g MyResourceGroup -n MyVNet """ +helps['network vnet list-available-cidrs'] = """ +type: command +short-summary: List available CIDRs in an IPv4 VNET. +examples: + - name: List available CIDRs in an IPv4 VNET for all address spaces. + text: az network vnet list-available-cidrs -g MyResourceGroup -n MyVNet + - name: List available CIDRs in an IPv4 VNET for a specific address space. + text: az network vnet list-available-cidrs -g MyResourceGroup -n MyVNet --address-prefixes "10.0.0.0/24" +""" + +helps['network vnet list-used-cidrs'] = """ +type: command +short-summary: List used CIDRs in an IPv4 VNET. +examples: + - name: List used CIDRs in an IPv4 VNET for all address spaces. + text: az network vnet list-used-cidrs -g MyResourceGroup -n MyVNet + - name: List used CIDRs in an IPv4 VNET for a specific address space. + text: az network vnet list-used-cidrs -g MyResourceGroup -n MyVNet --address-prefixes "10.0.0.0/24" +""" + helps['network vnet subnet create'] = """ type: command short-summary: Create a subnet and associate an existing NSG and route table. diff --git a/src/azure-cli/azure/cli/command_modules/network/_params.py b/src/azure-cli/azure/cli/command_modules/network/_params.py index 28c58258c94..c1a4ae9346d 100644 --- a/src/azure-cli/azure/cli/command_modules/network/_params.py +++ b/src/azure-cli/azure/cli/command_modules/network/_params.py @@ -708,6 +708,14 @@ def load_arguments(self, _): with self.argument_context('network vnet delete') as c: c.argument('virtual_network_name', local_context_attribute=None) + with self.argument_context('network vnet list-available-cidrs') as c: + c.argument('address_prefixes', nargs='+', options_list=['--address-prefixes'], + help='Space-separated list of VNet address prefixes to filter results by.', metavar='PREFIX') + + with self.argument_context('network vnet list-used-cidrs') as c: + c.argument('address_prefixes', nargs='+', options_list=['--address-prefixes'], + help='Space-separated list of VNet address prefixes to filter results by.', metavar='PREFIX') + with self.argument_context('network vnet peering') as c: c.argument('virtual_network_name', virtual_network_name_type) c.argument('virtual_network_peering_name', options_list=['--name', '-n'], help='The name of the VNet peering.', id_part='child_name_1') diff --git a/src/azure-cli/azure/cli/command_modules/network/commands.py b/src/azure-cli/azure/cli/command_modules/network/commands.py index 15127789a1b..d1daed8fa8b 100644 --- a/src/azure-cli/azure/cli/command_modules/network/commands.py +++ b/src/azure-cli/azure/cli/command_modules/network/commands.py @@ -16,7 +16,9 @@ transform_geographic_hierachy_table_output, transform_service_community_table_output, transform_waf_rule_sets_table_output, transform_network_usage_table, transform_nsg_rule_table_output, - transform_vnet_table_output, transform_effective_route_table, transform_effective_nsg, + transform_vnet_table_output, transform_vnet_available_cidrs_table_output, + transform_vnet_used_cidrs_table_output, + transform_effective_route_table, transform_effective_nsg, transform_vnet_gateway_routes_table, transform_vnet_gateway_bgp_peer_table) from azure.cli.command_modules.network._validators import ( process_ag_create_namespace, @@ -649,6 +651,8 @@ def load_command_table(self, _): self.command_table["network vnet update"] = VNetUpdate(loader=self) self.command_table['network vnet list'] = List(loader=self, table_transformer=transform_vnet_table_output) g.custom_command("list-available-ips", "list_available_ips", is_preview=True) + g.custom_command("list-available-cidrs", "list_available_cidrs", is_preview=True, table_transformer=transform_vnet_available_cidrs_table_output) + g.custom_command("list-used-cidrs", "list_used_cidrs", is_preview=True, table_transformer=transform_vnet_used_cidrs_table_output) with self.command_group("network vnet peering") as g: from .custom import VNetPeeringCreate diff --git a/src/azure-cli/azure/cli/command_modules/network/custom.py b/src/azure-cli/azure/cli/command_modules/network/custom.py index 2813ad8cd04..f09e4570d14 100644 --- a/src/azure-cli/azure/cli/command_modules/network/custom.py +++ b/src/azure-cli/azure/cli/command_modules/network/custom.py @@ -9,6 +9,7 @@ from collections import Counter, OrderedDict import socket +import struct from knack.log import get_logger from azure.mgmt.core.tools import parse_resource_id, is_valid_resource_id, resource_id @@ -5890,6 +5891,182 @@ def subnet_list_available_ips(cmd, resource_group_name, virtual_network_name, su "ip_address": start_ip, }).get("availableIPAddresses", []) +# converts 32-bit int into 4-octet IPv4 string +def _int_to_ip(n): + return socket.inet_ntoa(struct.pack('!I', n & 0xFFFFFFFF)) + +# converts 4-octet IPv4 CIDR string into a 32-bit int +def _get_network_int(ip_cidr): + ip, prefix_len = ip_cidr.split('/') + mask = (0xFFFFFFFF << (32 - int(prefix_len))) & 0xFFFFFFFF + return struct.unpack('!I', socket.inet_aton(ip))[0] & mask + +# number of IPs in CIDR block +def _block_size(prefix_len): + return 2 ** (32 - prefix_len) + +def _find_largest_aligned_cidr(start_int, available_size): + for prefix_len in range(1, 33): + size = _block_size(prefix_len) + mask = (0xFFFFFFFF << (32 - prefix_len)) & 0xFFFFFFFF + if (start_int & mask) == start_int and size <= available_size: + return prefix_len, size + return 32, 1 + + +def _fetch_vnet_and_subnets(cmd, resource_group_name, virtual_network_name): + from .aaz.latest.network.vnet import Show + from .aaz.latest.network.vnet.subnet import List as SubnetList + vnet = Show(cli_ctx=cmd.cli_ctx)(command_args={ + "name": virtual_network_name, + "resource_group": resource_group_name, + }) + subnets = list(SubnetList(cli_ctx=cmd.cli_ctx)(command_args={ + "vnet_name": virtual_network_name, + "resource_group": resource_group_name, + })) + return vnet, subnets + +def _validate_address_prefixes(address_prefixes, vnet_prefixes, virtual_network_name): + for prefix in address_prefixes: + syntactically_valid = False + try: + ip, prefix_len = prefix.split('/') + socket.inet_aton(ip) + syntactically_valid = 0 <= int(prefix_len) <= 32 + except (AttributeError, ValueError, socket.error): + pass + if not syntactically_valid or prefix not in vnet_prefixes: + raise InvalidArgumentValueError( + f"Address prefix {prefix} is not valid or does not exist in virtual network {virtual_network_name}" + ) + +# takes a raw subnet list and returns a minimal list of non-overlapping int +# ranges, together they cover the same IP space +def _merge_subnet_ranges(subnets): + used_ranges = [] + for subnet in subnets: + prefixes = subnet.get("addressPrefixes") or [subnet.get("addressPrefix")] + for prefix in prefixes: + if not prefix: + continue + start = _get_network_int(prefix) + size = _block_size(int(prefix.split('/')[1])) + used_ranges.append((start, start + size - 1)) + used_ranges.sort() + merged = [] + for rng_start, rng_end in used_ranges: + if merged and rng_start <= merged[-1][1] + 1: + merged[-1] = (merged[-1][0], max(merged[-1][1], rng_end)) + else: + merged.append((rng_start, rng_end)) + return merged + + +def list_available_cidrs(cmd, resource_group_name, virtual_network_name, address_prefixes=None): + vnet, subnets = _fetch_vnet_and_subnets(cmd, resource_group_name, virtual_network_name) + merged = _merge_subnet_ranges(subnets) + + azure_reserved_ips = 5 + vnet_prefixes = vnet["addressSpace"]["addressPrefixes"] + if address_prefixes: + _validate_address_prefixes(address_prefixes, vnet_prefixes, virtual_network_name) + vnet_prefixes = [p for p in vnet_prefixes if p in address_prefixes] + + result = [] + for vnet_cidr in vnet_prefixes: + vnet_start = _get_network_int(vnet_cidr) + vnet_prefix_len = int(vnet_cidr.split('/')[1]) + vnet_end = vnet_start + _block_size(vnet_prefix_len) - 1 + + available_cidrs = [] + current = vnet_start + + # iterate over merged used SNET ranges + for used_start, used_end in merged: + if used_end < vnet_start or used_start > vnet_end: + continue + clipped_start = max(used_start, vnet_start) + # free gap found, greedily fill with CIDR blocks... + while current < clipped_start: + gap = clipped_start - current + # ...but with the largest ^2 block... + prefix_len, size = _find_largest_aligned_cidr(current, gap) + block_end = current + size - 1 + available_cidrs.append({ + "cidrAddress": f"{_int_to_ip(current)}/{prefix_len}", + "startingAddress": _int_to_ip(current), + "endingAddress": _int_to_ip(block_end), + "usableIPs": max(0, size - azure_reserved_ips), + }) + current += size + # ...and repeats since there may still be a gap, so continue filling + current = max(current, min(used_end, vnet_end) + 1) + + # continues where we left off above by looking at trailing free space + while current <= vnet_end: + gap = vnet_end - current + 1 + prefix_len, size = _find_largest_aligned_cidr(current, gap) + block_end = current + size - 1 + available_cidrs.append({ + "cidrAddress": f"{_int_to_ip(current)}/{prefix_len}", + "startingAddress": _int_to_ip(current), + "endingAddress": _int_to_ip(block_end), + "usableIPs": max(0, size - azure_reserved_ips), + }) + current += size + + result.append({ + "addressPrefixes": vnet_cidr, + "availableCIDRs": available_cidrs, + }) + + return result + + +def list_used_cidrs(cmd, resource_group_name, virtual_network_name, address_prefixes=None): + vnet, subnets = _fetch_vnet_and_subnets(cmd, resource_group_name, virtual_network_name) + + vnet_prefixes = vnet["addressSpace"]["addressPrefixes"] + if address_prefixes: + _validate_address_prefixes(address_prefixes, vnet_prefixes, virtual_network_name) + vnet_prefixes = [p for p in vnet_prefixes if p in address_prefixes] + + result = [] + for vnet_cidr in vnet_prefixes: + vnet_start = _get_network_int(vnet_cidr) + vnet_prefix_len = int(vnet_cidr.split('/')[1]) + vnet_end = vnet_start + _block_size(vnet_prefix_len) - 1 + + used_cidrs = [] + for subnet in subnets: + prefixes = subnet.get("addressPrefixes") or [subnet.get("addressPrefix")] + # build a list of used CIDRs per address prefix + for prefix in prefixes: + if not prefix: + continue + subnet_start = _get_network_int(prefix) + if not (vnet_start <= subnet_start <= vnet_end): + continue + subnet_prefix_len = int(prefix.split('/')[1]) + subnet_end = subnet_start + _block_size(subnet_prefix_len) - 1 + used_cidrs.append({ + "cidrAddress": prefix, + "startingAddress": _int_to_ip(subnet_start), + "endingAddress": _int_to_ip(subnet_end), + "subnetName": subnet["name"], + "usableIPs": max(0, _block_size(subnet_prefix_len) - 5), + }) + + # change sort key to cidr dimension for readability + used_cidrs.sort(key=lambda x: _get_network_int(x["cidrAddress"])) + result.append({ + "addressPrefixes": vnet_cidr, + "usedCIDRs": used_cidrs, + }) + + return result + def sync_vnet_peering(cmd, resource_group_name, virtual_network_name, virtual_network_peering_name): from .aaz.latest.network.vnet.peering import Show diff --git a/src/azure-cli/azure/cli/command_modules/network/tests/latest/test_network_commands.py b/src/azure-cli/azure/cli/command_modules/network/tests/latest/test_network_commands.py index 101b0fc9682..18c46e16c87 100644 --- a/src/azure-cli/azure/cli/command_modules/network/tests/latest/test_network_commands.py +++ b/src/azure-cli/azure/cli/command_modules/network/tests/latest/test_network_commands.py @@ -5490,6 +5490,134 @@ def test_network_vnet_list_available_ips(self, resource_group): self.check('length(@)', 5) ]) + @ResourceGroupPreparer(name_prefix='cli_vnet_available_cidrs') + def test_network_vnet_list_available_cidrs(self, resource_group): + self.kwargs.update({ + 'vnet': 'vnet1', + 'rg': resource_group, + 'prefixes': '10.198.0.0/16 10.197.0.0/16', + }) + + self.cmd('network vnet create -g {rg} -n {vnet} --address-prefixes {prefixes}') + + # used CIDRs for SNET construction + for name, prefix in [ + ('s198-0-0', '10.198.0.0/24'), + ('s198-1-0', '10.198.1.0/26'), + ('s198-1-64', '10.198.1.64/26'), + ('s198-2-0', '10.198.2.0/26'), + ('s198-2-64', '10.198.2.64/26'), + ('s198-10', '10.198.10.0/24'), + ('s198-11', '10.198.11.0/24'), + ('s198-20', '10.198.20.0/24'), + ('s197-0', '10.197.0.0/24'), + ]: + self.kwargs.update({'subnet': name, 'prefix': prefix}) + self.cmd('network vnet subnet create -g {rg} --vnet-name {vnet} -n {subnet} --address-prefix {prefix} --default-outbound false') + + def as_cidr_set(cidrs): + return frozenset( + (c['cidrAddress'], c['startingAddress'], c['endingAddress'], c['usableIPs']) + for c in cidrs + ) + + # free CIDRs, expecting this matches just maybe not in the same order + expected = { + '10.198.0.0/16': frozenset([ + ('10.198.1.128/25', '10.198.1.128', '10.198.1.255', 123), + ('10.198.2.128/25', '10.198.2.128', '10.198.2.255', 123), + ('10.198.3.0/24', '10.198.3.0', '10.198.3.255', 251), + ('10.198.4.0/22', '10.198.4.0', '10.198.7.255', 1019), + ('10.198.8.0/23', '10.198.8.0', '10.198.9.255', 507), + ('10.198.12.0/22', '10.198.12.0', '10.198.15.255', 1019), + ('10.198.16.0/22', '10.198.16.0', '10.198.19.255', 1019), + ('10.198.21.0/24', '10.198.21.0', '10.198.21.255', 251), + ('10.198.22.0/23', '10.198.22.0', '10.198.23.255', 507), + ('10.198.24.0/21', '10.198.24.0', '10.198.31.255', 2043), + ('10.198.32.0/19', '10.198.32.0', '10.198.63.255', 8187), + ('10.198.64.0/18', '10.198.64.0', '10.198.127.255', 16379), + ('10.198.128.0/17', '10.198.128.0', '10.198.255.255', 32763), + ]), + '10.197.0.0/16': frozenset([ + ('10.197.1.0/24', '10.197.1.0', '10.197.1.255', 251), + ('10.197.2.0/23', '10.197.2.0', '10.197.3.255', 507), + ('10.197.4.0/22', '10.197.4.0', '10.197.7.255', 1019), + ('10.197.8.0/21', '10.197.8.0', '10.197.15.255', 2043), + ('10.197.16.0/20', '10.197.16.0', '10.197.31.255', 4091), + ('10.197.32.0/19', '10.197.32.0', '10.197.63.255', 8187), + ('10.197.64.0/18', '10.197.64.0', '10.197.127.255', 16379), + ('10.197.128.0/17', '10.197.128.0', '10.197.255.255', 32763), + ]), + } + + # both address spaces returned without filter + result = self.cmd('network vnet list-available-cidrs -g {rg} -n {vnet}').get_output_in_json() + by_prefix = {entry['addressPrefixes']: entry for entry in result} + self.assertEqual(set(by_prefix.keys()), set(expected.keys())) + for prefix, exp_cidrs in expected.items(): + self.assertEqual(as_cidr_set(by_prefix[prefix]['availableCIDRs']), exp_cidrs) + + # --address-prefixes filter test, assert per address space + for prefix, exp_cidrs in expected.items(): + result = self.cmd( + 'network vnet list-available-cidrs -g {rg} -n {vnet} --address-prefixes ' + prefix + ).get_output_in_json() + self.assertEqual(len(result), 1) + self.assertEqual(result[0]['addressPrefixes'], prefix) + self.assertEqual(as_cidr_set(result[0]['availableCIDRs']), exp_cidrs) + + @ResourceGroupPreparer(name_prefix='cli_vnet_used_cidrs') + def test_network_vnet_list_used_cidrs(self, resource_group): + self.kwargs.update({ + 'vnet': 'vnet1', + 'rg': resource_group, + 'prefixes': '10.197.0.0/16 10.196.0.0/16', + }) + + self.cmd('network vnet create -g {rg} -n {vnet} --address-prefixes {prefixes}') + + for name, prefix in [ + ('s197-0', '10.197.0.0/24'), + ('s197-10', '10.197.10.0/24'), + ('s197-11', '10.197.11.0/24'), + ('s196-0', '10.196.0.0/24'), + ]: + self.kwargs.update({'subnet': name, 'prefix': prefix}) + self.cmd('network vnet subnet create -g {rg} --vnet-name {vnet} -n {subnet} --address-prefix {prefix} --default-outbound false') + + def as_used_cidr_set(cidrs): + return frozenset( + (c['cidrAddress'], c['startingAddress'], c['endingAddress'], c['subnetName'], c['usableIPs']) + for c in cidrs + ) + + expected = { + '10.197.0.0/16': frozenset([ + ('10.197.0.0/24', '10.197.0.0', '10.197.0.255', 's197-0', 251), + ('10.197.10.0/24', '10.197.10.0', '10.197.10.255', 's197-10', 251), + ('10.197.11.0/24', '10.197.11.0', '10.197.11.255', 's197-11', 251), + ]), + '10.196.0.0/16': frozenset([ + ('10.196.0.0/24', '10.196.0.0', '10.196.0.255', 's196-0', 251), + ]), + } + + # both address spaces returned without filter + result = self.cmd('network vnet list-used-cidrs -g {rg} -n {vnet}').get_output_in_json() + by_prefix = {entry['addressPrefixes']: entry for entry in result} + self.assertEqual(set(by_prefix.keys()), set(expected.keys())) + for prefix, exp_cidrs in expected.items(): + self.assertEqual(as_used_cidr_set(by_prefix[prefix]['usedCIDRs']), exp_cidrs) + + # --address-prefixes filter test, assert per address space + for prefix, exp_cidrs in expected.items(): + result = self.cmd( + 'network vnet list-used-cidrs -g {rg} -n {vnet} --address-prefixes ' + prefix + ).get_output_in_json() + self.assertEqual(len(result), 1) + self.assertEqual(result[0]['addressPrefixes'], prefix) + self.assertEqual(as_used_cidr_set(result[0]['usedCIDRs']), exp_cidrs) + @ResourceGroupPreparer(name_prefix='cli_vnet_with_bgp_community') def test_network_vnet_with_bgp_community(self, resource_group): self.kwargs.update({