|
8 | 8 |
|
9 | 9 | import google.api_core.exceptions |
10 | 10 | import google.cloud.compute_v1 as compute_v1 |
| 11 | +import gpuhunt |
11 | 12 | from cachetools import TTLCache, cachedmethod |
12 | 13 | from google.cloud import tpu_v2 |
13 | 14 | from google.cloud.compute_v1.types.compute import Instance |
|
64 | 65 | InstanceConfiguration, |
65 | 66 | InstanceOffer, |
66 | 67 | InstanceOfferWithAvailability, |
67 | | - InstanceType, |
68 | 68 | Resources, |
69 | 69 | ) |
70 | 70 | from dstack._internal.core.models.placement import PlacementGroup, PlacementGroupProvisioningData |
@@ -136,35 +136,37 @@ def __init__(self, config: GCPConfig): |
136 | 136 |
|
137 | 137 | def get_all_offers_with_availability(self) -> List[InstanceOfferWithAvailability]: |
138 | 138 | regions = get_or_error(self.config.regions) |
| 139 | + zones_by_key: Dict[Tuple, List[str]] = {} |
| 140 | + catalog_item_filter = _make_catalog_item_filter(regions, zones_by_key) |
139 | 141 | offers = get_catalog_offers( |
140 | 142 | backend=BackendType.GCP, |
141 | | - extra_filter=_supported_instances_and_zones(regions), |
| 143 | + catalog_item_filter=catalog_item_filter, |
142 | 144 | ) |
143 | 145 | quotas: Dict[str, Dict[str, float]] = defaultdict(dict) |
144 | 146 | for region in self.regions_client.list(project=self.config.project_id): |
145 | 147 | for quota in region.quotas: |
146 | 148 | quotas[region.name][quota.metric] = quota.limit - quota.usage |
147 | 149 |
|
148 | | - offer_keys_to_offers = {} |
149 | 150 | offers_with_availability = [] |
150 | 151 | for offer in offers: |
151 | 152 | region = offer.region[:-2] # strip zone |
152 | | - key = (_unique_instance_name(offer.instance), region) |
153 | | - if key in offer_keys_to_offers: |
154 | | - offer_keys_to_offers[key].availability_zones.append(offer.region) |
155 | | - continue |
| 153 | + gpu_name = ( |
| 154 | + offer.instance.resources.gpus[0].name if offer.instance.resources.gpus else None |
| 155 | + ) |
| 156 | + key = _offer_dedup_key( |
| 157 | + offer.instance.name, offer.instance.resources.spot, gpu_name, region |
| 158 | + ) |
156 | 159 | availability = InstanceAvailability.NO_QUOTA |
157 | 160 | if _has_gpu_quota(quotas[region], offer.instance.resources): |
158 | 161 | availability = InstanceAvailability.UNKNOWN |
159 | 162 | # todo quotas: cpu, memory, global gpu, tpu |
160 | 163 | offer_with_availability = InstanceOfferWithAvailability( |
161 | 164 | **offer.dict(), |
162 | 165 | availability=availability, |
163 | | - availability_zones=[offer.region], |
| 166 | + availability_zones=zones_by_key.get(key, []), |
164 | 167 | ) |
165 | | - offer_keys_to_offers[key] = offer_with_availability |
166 | 168 | offers_with_availability.append(offer_with_availability) |
167 | | - offers_with_availability[-1].region = region |
| 169 | + offer_with_availability.region = region |
168 | 170 | return offers_with_availability |
169 | 171 |
|
170 | 172 | def get_offers_modifiers(self, requirements: Requirements) -> Iterable[OfferModifier]: |
@@ -992,37 +994,62 @@ def _find_reservation(self, configured_name: str) -> dict[str, compute_v1.Reserv |
992 | 994 | ) |
993 | 995 |
|
994 | 996 |
|
995 | | -def _supported_instances_and_zones( |
| 997 | +def _is_supported_gcp_instance(instance_name: str, gpu_name: Optional[str]) -> bool: |
| 998 | + """Check if the instance is supported by dstack.""" |
| 999 | + if _is_tpu(instance_name) and not _is_single_host_tpu(instance_name): |
| 1000 | + return False |
| 1001 | + for family in [ |
| 1002 | + "m4-", |
| 1003 | + "c4-", |
| 1004 | + "n4-", |
| 1005 | + "h3-", |
| 1006 | + "n2-", |
| 1007 | + "e2-medium", |
| 1008 | + "e2-standard-", |
| 1009 | + "e2-highmem-", |
| 1010 | + "e2-highcpu-", |
| 1011 | + "m1-", |
| 1012 | + "a2-", |
| 1013 | + "a3-", |
| 1014 | + "g2-", |
| 1015 | + ]: |
| 1016 | + if instance_name.startswith(family): |
| 1017 | + return True |
| 1018 | + if gpu_name is not None and gpu_name not in {"K80", "P4"}: |
| 1019 | + return True |
| 1020 | + return False |
| 1021 | + |
| 1022 | + |
| 1023 | +def _offer_dedup_key( |
| 1024 | + instance_name: str, spot: bool, gpu_name: Optional[str], region: str |
| 1025 | +) -> Tuple[str, bool, Optional[str], str]: |
| 1026 | + """Key for deduplicating GCP per-zone items into per-region offers.""" |
| 1027 | + return (instance_name, spot, gpu_name, region) |
| 1028 | + |
| 1029 | + |
| 1030 | +def _make_catalog_item_filter( |
996 | 1031 | regions: List[str], |
997 | | -) -> Optional[Callable[[InstanceOffer], bool]]: |
998 | | - def _filter(offer: InstanceOffer) -> bool: |
999 | | - # strip zone |
1000 | | - if offer.region[:-2] not in regions: |
| 1032 | + zones_by_key: Dict[Tuple, List[str]], |
| 1033 | +) -> Callable[[gpuhunt.CatalogItem], bool]: |
| 1034 | + """ |
| 1035 | + Returns a filter that checks region, instance support, and deduplicates |
| 1036 | + per-zone items into per-region offers. Zones are collected in `zones_by_key` |
| 1037 | + so the caller can attach them to offers later. |
| 1038 | + """ |
| 1039 | + seen: set = set() |
| 1040 | + |
| 1041 | + def _filter(item: gpuhunt.CatalogItem) -> bool: |
| 1042 | + region = item.location[:-2] |
| 1043 | + if region not in regions: |
1001 | 1044 | return False |
1002 | | - # remove multi-host TPUs for initial release |
1003 | | - if _is_tpu(offer.instance.name) and not _is_single_host_tpu(offer.instance.name): |
| 1045 | + if not _is_supported_gcp_instance(item.instance_name, item.gpu_name): |
1004 | 1046 | return False |
1005 | | - for family in [ |
1006 | | - "m4-", |
1007 | | - "c4-", |
1008 | | - "n4-", |
1009 | | - "h3-", |
1010 | | - "n2-", |
1011 | | - "e2-medium", |
1012 | | - "e2-standard-", |
1013 | | - "e2-highmem-", |
1014 | | - "e2-highcpu-", |
1015 | | - "m1-", |
1016 | | - "a2-", |
1017 | | - "a3-", |
1018 | | - "g2-", |
1019 | | - ]: |
1020 | | - if offer.instance.name.startswith(family): |
1021 | | - return True |
1022 | | - if offer.instance.resources.gpus: |
1023 | | - if offer.instance.resources.gpus[0].name not in {"K80", "P4"}: |
1024 | | - return True |
1025 | | - return False |
| 1047 | + key = _offer_dedup_key(item.instance_name, item.spot, item.gpu_name, region) |
| 1048 | + zones_by_key.setdefault(key, []).append(item.location) |
| 1049 | + if key in seen: |
| 1050 | + return False |
| 1051 | + seen.add(key) |
| 1052 | + return True |
1026 | 1053 |
|
1027 | 1054 | return _filter |
1028 | 1055 |
|
@@ -1090,17 +1117,6 @@ def _reservation_has_capacity(reservation: compute_v1.Reservation) -> bool: |
1090 | 1117 | ) |
1091 | 1118 |
|
1092 | 1119 |
|
1093 | | -def _unique_instance_name(instance: InstanceType) -> str: |
1094 | | - if instance.resources.spot: |
1095 | | - name = f"{instance.name}-spot" |
1096 | | - else: |
1097 | | - name = instance.name |
1098 | | - if not instance.resources.gpus: |
1099 | | - return name |
1100 | | - gpu = instance.resources.gpus[0] |
1101 | | - return f"{name}-{gpu.name}-{gpu.memory_mib}" |
1102 | | - |
1103 | | - |
1104 | 1120 | @dataclass |
1105 | 1121 | class GCPImage: |
1106 | 1122 | id: str |
|
0 commit comments